File size: 5,543 Bytes
ef6a683 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
import time
from typing import Any, Dict
from unitree_sdk2py.core.channel import ChannelFactoryInitialize
from .base_sim import BaseSimulator
def init_channel(config: Dict[str, Any]) -> None:
"""
Initialize the communication channel for simulator/robot communication.
Args:
config: Configuration dictionary containing DOMAIN_ID and optionally INTERFACE
"""
if config.get("INTERFACE", None):
ChannelFactoryInitialize(config["DOMAIN_ID"], config["INTERFACE"])
else:
ChannelFactoryInitialize(config["DOMAIN_ID"])
class SimulatorFactory:
"""Factory class for creating different types of simulators."""
@staticmethod
def create_simulator(config: Dict[str, Any], env_name: str = "default", **kwargs):
"""
Create a simulator based on the configuration.
Args:
config: Configuration dictionary containing SIMULATOR type
env_name: Environment name
**kwargs: Additional keyword arguments for specific simulators
"""
simulator_type = config.get("SIMULATOR", "mujoco")
if simulator_type == "mujoco":
return SimulatorFactory._create_mujoco_simulator(config, env_name, **kwargs)
elif simulator_type == "robocasa":
return SimulatorFactory._create_robocasa_simulator(config, env_name, **kwargs)
else:
print(
f"Warning: Invalid simulator type: {simulator_type}. "
"If you are using run_sim_loop, please ignore this warning."
)
return None
@staticmethod
def _create_mujoco_simulator(config: Dict[str, Any], env_name: str = "default", **kwargs):
"""Create a MuJoCo simulator instance."""
camera_configs = kwargs.pop("camera_configs", {})
if len(camera_configs) > 0:
print(f"Debug: SimulatorFactory received {len(camera_configs)} camera config(s)")
env_kwargs = dict(
onscreen=kwargs.pop("onscreen", True),
offscreen=kwargs.pop("offscreen", False),
camera_configs=camera_configs,
)
return BaseSimulator(config=config, env_name=env_name, **env_kwargs)
@staticmethod
def _create_robocasa_simulator(config: Dict[str, Any], env_name: str = "default", **kwargs):
"""Create a RoboCasa simulator instance."""
from gr00t_wbc.control.envs.g1.sim.robocasa_sim import RoboCasaG1EnvServer
from gr00t_wbc.control.envs.robocasa.utils.controller_utils import (
update_robosuite_controller_configs,
)
from gr00t_wbc.control.envs.robocasa.utils.sim_utils import change_simulation_timestep
change_simulation_timestep(config["SIMULATE_DT"])
# Use default environment if not specified
if env_name == "default":
env_name = "GroundOnly"
# Get or create controller configurations
controller_configs = kwargs.get("controller_configs")
if controller_configs is None:
wbc_version = kwargs.get("wbc_version", "gear_wbc")
controller_configs = update_robosuite_controller_configs("G1", wbc_version)
# Build environment kwargs
env_kwargs = dict(
onscreen=kwargs.pop("onscreen", True),
offscreen=kwargs.pop("offscreen", False),
camera_names=kwargs.pop("camera_names", None),
camera_heights=kwargs.pop("camera_heights", None),
camera_widths=kwargs.pop("camera_widths", None),
control_freq=kwargs.pop("control_freq", 50),
controller_configs=controller_configs,
ik_indicator=kwargs.pop("ik_indicator", False),
randomize_cameras=kwargs.pop("randomize_cameras", True),
)
kwargs.update(
{
"verbose": config.pop("verbose", False),
"sim_freq": 1 / config.pop("SIMULATE_DT"),
}
)
return RoboCasaG1EnvServer(
env_name=env_name,
wbc_config=config,
env_kwargs=env_kwargs,
**kwargs,
)
@staticmethod
def start_simulator(
simulator,
as_thread: bool = True,
enable_image_publish: bool = False,
mp_start_method: str = "spawn",
camera_port: int = 5555,
):
"""
Start the simulator either as a thread or as a separate process.
Args:
simulator: The simulator instance to start
config: Configuration dictionary
as_thread: If True, start as thread; if False, start as subprocess
enable_offscreen: If True and not as_thread, start image publishing
"""
if as_thread:
simulator.start_as_thread()
else:
# Wrap in try-except to make sure simulator is properly closed upon exit.
try:
if enable_image_publish:
simulator.start_image_publish_subprocess(
start_method=mp_start_method,
camera_port=camera_port,
)
time.sleep(1)
simulator.start()
except KeyboardInterrupt:
print("+++++Simulator interrupted by user.")
except Exception as e:
print(f"++++error in simulator: {e} ++++")
finally:
print("++++closing simulator ++++")
simulator.close()
# Allow simulator to initialize
time.sleep(1)
|