|
|
from pathlib import Path |
|
|
import subprocess |
|
|
import sys |
|
|
import gymnasium as gym |
|
|
from gymnasium import spaces |
|
|
import numpy as np |
|
|
from huggingface_hub import snapshot_download |
|
|
import os |
|
|
import signal |
|
|
# Runs at import time: calls huggingface_hub.snapshot_download for the
# "lerobot/unitree-g1-mujoco" repo. The return value is discarded.
# NOTE(review): presumably this pre-populates the local Hugging Face cache
# with the assets run_sim.py needs — confirm against run_sim.py.
snapshot_download("lerobot/unitree-g1-mujoco") |
|
|
|
|
|
|
|
|
def make_env(n_envs=1, use_async_envs=False, **kwargs):
    """Launch ``run_sim.py`` as a subprocess and return a placeholder env.

    The simulator script is looked up in the directory containing this file
    and started with the current Python interpreter. The resulting process
    handle is wrapped in a minimal ``gym.Env`` whose observations and rewards
    are constant zeros.

    Args:
        n_envs: Not used by this function.
        use_async_envs: Not used by this function.
        **kwargs: Not used by this function.

    Returns:
        A ``DummyEnv`` instance whose ``process`` attribute is the
        ``subprocess.Popen`` handle of the launched simulator.
    """
    repo_dir = Path(__file__).parent
    run_sim = repo_dir / "run_sim.py"

    print("=" * 60)
    print("launching run_sim.py as subprocess")
    print("path:", run_sim)
    print("=" * 60)

    # Use the interpreter running this module so the child shares its
    # environment (installed packages, virtualenv).
    proc = subprocess.Popen([sys.executable, str(run_sim)])
    print(f"simulator started as pid={proc.pid}")

    class DummyEnv(gym.Env):
        """Placeholder environment that only carries the simulator process."""

        metadata = {"render_modes": []}

        def __init__(self, process):
            """Store the simulator process and declare 1-D [-1, 1] spaces."""
            super().__init__()
            self.process = process
            self.action_space = spaces.Box(-1, 1, shape=(1,), dtype=np.float32)
            self.observation_space = spaces.Box(-1, 1, shape=(1,), dtype=np.float32)

        def reset(self, seed=None, options=None):
            """Seed the base env and return a zero observation with empty info."""
            super().reset(seed=seed, options=options)
            obs = np.zeros(1, dtype=np.float32)
            info = {}
            return obs, info

        def step(self, action):
            """Ignore ``action``; return (zero obs, 0.0, False, False, {})."""
            return np.zeros(1, dtype=np.float32), 0.0, False, False, {}

        def close(self):
            """Do nothing; the simulator is left running (see ``kill_sim``)."""
            pass

        def kill_sim(self):
            """Stop the simulator subprocess if it is still running.

            Requests termination, waits up to two seconds, and falls back to
            a forced kill when the process has not exited by then.
            """
            if self.process.poll() is None:
                print("killing simulator subprocess...")
                self.process.terminate()
                try:
                    self.process.wait(timeout=2)
                except subprocess.TimeoutExpired:
                    print("force killing simulator subprocess...")
                    self.process.kill()
                    # Reap the killed child so it does not linger as a
                    # zombie and so ``poll()`` reports its exit status.
                    self.process.wait()

    return DummyEnv(proc)
|
|
|