diff --git a/docs/source/adding_benchmarks.mdx b/docs/source/adding_benchmarks.mdx index 73a9512760..3a024f026b 100644 --- a/docs/source/adding_benchmarks.mdx +++ b/docs/source/adding_benchmarks.mdx @@ -26,7 +26,7 @@ During evaluation, data moves through four stages: 1. gym.Env ──→ raw observations (numpy dicts) 2. Preprocessing ──→ standard LeRobot keys + task description - (preprocess_observation, add_envs_task in envs/utils.py) + (preprocess_observation in envs/utils.py, env.call("task_description")) 3. Processors ──→ env-specific then policy-specific transforms (env_preprocessor, policy_preprocessor) @@ -161,6 +161,8 @@ class MyBenchmarkEnv(gym.Env): ... ``` +**GPU-based simulators (e.g. MuJoCo with EGL rendering):** If your simulator allocates GPU/EGL contexts during `__init__`, defer that allocation to a `_ensure_env()` helper called on first `reset()`/`step()`. This avoids inheriting stale GPU handles when `AsyncVectorEnv` spawns worker processes. See `LiberoEnv._ensure_env()` for the pattern. + Also provide a factory function that returns the nested dict structure: ```python @@ -207,7 +209,7 @@ class MyBenchmarkEnvConfig(EnvConfig): def gym_kwargs(self) -> dict: return {"obs_type": self.obs_type, "render_mode": self.render_mode} - def create_envs(self, n_envs: int, use_async_envs: bool = False): + def create_envs(self, n_envs: int, use_async_envs: bool = True): """Override for multi-task benchmarks or custom env creation.""" from lerobot.envs. import create__envs return create__envs(task=self.task, n_envs=n_envs, ...) @@ -299,7 +301,7 @@ After completing the steps above, confirm that everything works: 1. **Install** — `pip install -e ".[mybenchmark]"` and verify the dependency group installs cleanly. 2. **Smoke test env creation** — call `make_env()` with your config in Python, check that the returned dict has the expected `{suite: {task_id: VectorEnv}}` shape, and that `reset()` returns observations with the right keys. -3. **Run a full eval** — `lerobot-eval --env.type= --env.task= --eval.n_episodes=1 --eval.batch_size=1 --policy.path=` to exercise the full pipeline end-to-end. +3. **Run a full eval** — `lerobot-eval --env.type= --env.task= --eval.n_episodes=1 --policy.path=` to exercise the full pipeline end-to-end. (`batch_size` defaults to auto-tuning based on CPU cores; pass `--eval.batch_size=1` to force a single environment.) 4. **Check success detection** — verify that `info["is_success"]` flips to `True` when the task is actually completed. This is what the eval loop uses to compute success rates. ## Writing a benchmark doc page @@ -311,7 +313,7 @@ Each benchmark `.mdx` page should include: - **Overview image or GIF.** - **Available tasks** — table of task suites with counts and brief descriptions. - **Installation** — `pip install -e ".[]"` plus any extra steps (env vars, system packages). -- **Evaluation** — recommended `lerobot-eval` command with `n_episodes` and `batch_size` for reproducible results. Include single-task and multi-task examples if applicable. +- **Evaluation** — recommended `lerobot-eval` command with `n_episodes` for reproducible results. `batch_size` defaults to auto; only specify it if needed. Include single-task and multi-task examples if applicable. - **Policy inputs and outputs** — observation keys with shapes, action space description. - **Recommended evaluation episodes** — how many episodes per task is standard. - **Training** — example `lerobot-train` command. diff --git a/docs/source/env_processor.mdx b/docs/source/env_processor.mdx index a03eb984db..290af3b34e 100644 --- a/docs/source/env_processor.mdx +++ b/docs/source/env_processor.mdx @@ -88,7 +88,7 @@ policy_preprocessor = NormalizerProcessorStep(stats=dataset_stats) The same policy can work with different environment processors, and the same environment processor can work with different policies: -```python +````python # Use SmolVLA policy with LIBERO environment # Use SmolVLA policy with LIBERO environment libero_preprocessor, libero_postprocessor = make_env_pre_post_processors( @@ -102,7 +102,20 @@ libero_preprocessor, libero_postprocessor = make_env_pre_post_processors( policy_cfg=act_cfg, ) act_preprocessor, act_postprocessor = make_pre_post_processors(act_cfg) -``` +```python +# Use SmolVLA policy with LIBERO environment +libero_preprocessor, libero_postprocessor = make_env_pre_post_processors( + env_cfg=libero_cfg, + policy_cfg=smolvla_cfg, +) +smolvla_preprocessor, smolvla_postprocessor = make_pre_post_processors(smolvla_cfg) + +# Or use ACT policy with the same LIBERO environment +libero_preprocessor, libero_postprocessor = make_env_pre_post_processors( + env_cfg=libero_cfg, + policy_cfg=act_cfg, +) +act_preprocessor, act_postprocessor = make_pre_post_processors(act_cfg) ### 3. **Easier Experimentation** @@ -132,7 +145,7 @@ class LiberoVelocityProcessorStep(ObservationProcessorStep): state = torch.cat([eef_pos, eef_axisangle, eef_vel, gripper_pos, gripper_vel], dim=-1) # 14D return state -``` +```` ### 4. **Cleaner Environment Code** @@ -157,7 +170,7 @@ observation = { ### Factory Function -The `make_env_pre_post_processors` function delegates to `env_cfg.get_env_processors()`: +The `make_env_pre_post_processors` function follows the same pattern as `make_pre_post_processors` for policies: ```python from lerobot.envs.factory import make_env_pre_post_processors @@ -165,30 +178,46 @@ from lerobot.envs.configs import LiberoEnv, PushtEnv # For LIBERO: Returns LiberoProcessorStep in preprocessor libero_cfg = LiberoEnv(task="libero_spatial", camera_name=["agentview"]) -env_preprocessor, env_postprocessor = make_env_pre_post_processors(libero_cfg, policy_cfg) +env_preprocessor, env_postprocessor = make_env_pre_post_processors(libero_cfg) # For other environments: Returns identity processors (no-op) pusht_cfg = PushtEnv() -env_preprocessor, env_postprocessor = make_env_pre_post_processors(pusht_cfg, policy_cfg) +env_preprocessor, env_postprocessor = make_env_pre_post_processors(pusht_cfg) ``` -### How It Works - -Each `EnvConfig` subclass can override `get_env_processors()` to return benchmark-specific -processor pipelines. The base class returns identity (no-op) processors by default. +### Implementation in `envs/factory.py` ```python -# In your EnvConfig subclass: -def get_env_processors(self): - from lerobot.processor.pipeline import PolicyProcessorPipeline - return ( - PolicyProcessorPipeline(steps=[MyProcessorStep()]), - PolicyProcessorPipeline(steps=[]), - ) -``` +def make_env_pre_post_processors( + env_cfg: EnvConfig, +) -> tuple[ + PolicyProcessorPipeline[dict[str, Any], dict[str, Any]], + PolicyProcessorPipeline[dict[str, Any], dict[str, Any]], +]: + """ + Create preprocessor and postprocessor pipelines for environment observations. + + Args: + env_cfg: The configuration of the environment. -The factory function `make_env_pre_post_processors` simply delegates to this method, -with a special case for `XVLAConfig` policies which override the env processors entirely. + Returns: + A tuple containing: + - preprocessor: Pipeline that processes environment observations + - postprocessor: Pipeline that processes environment outputs + """ + # For LIBERO environments, add the LiberoProcessorStep to preprocessor + if isinstance(env_cfg, LiberoEnv) or "libero" in env_cfg.type: + preprocessor = PolicyProcessorPipeline(steps=[LiberoProcessorStep()]) + else: + # For all other environments, return an identity preprocessor + preprocessor = PolicyProcessorPipeline(steps=[]) + + # Postprocessor is currently identity for all environments + # Future: Could add environment-specific action transformations + postprocessor = PolicyProcessorPipeline(steps=[]) + + return preprocessor, postprocessor +``` ### Integration in Evaluation @@ -209,10 +238,7 @@ def eval_main(cfg: EvalPipelineConfig): ) # Create environment processors (NEW!) - env_preprocessor, env_postprocessor = make_env_pre_post_processors( - env_cfg=cfg.env, - policy_cfg=cfg.policy, - ) + env_preprocessor, env_postprocessor = make_env_pre_post_processors(env_cfg=cfg.env) # Run evaluation with both processor types eval_policy_all( @@ -319,19 +345,18 @@ class MyEnvProcessorStep(ObservationProcessorStep): ### 2. Update Your `EnvConfig` Subclass ```python -# In src/lerobot/envs/configs.py -@EnvConfig.register_subclass("myenv") -@dataclass -class MyEnvConfig(EnvConfig): - # ... task/features/gym kwargs ... - - def get_env_processors(self): - from lerobot.processor.pipeline import PolicyProcessorPipeline - - return ( - PolicyProcessorPipeline(steps=[MyEnvProcessorStep()]), - PolicyProcessorPipeline(steps=[]), - ) +# In src/lerobot/envs/factory.py + +def make_env_pre_post_processors(env_cfg: EnvConfig): + if isinstance(env_cfg, LiberoEnv) or "libero" in env_cfg.type: + preprocessor = PolicyProcessorPipeline(steps=[LiberoProcessorStep()]) + elif isinstance(env_cfg, MyEnvConfig) or "myenv" in env_cfg.type: + preprocessor = PolicyProcessorPipeline(steps=[MyEnvProcessorStep()]) + else: + preprocessor = PolicyProcessorPipeline(steps=[]) + + postprocessor = PolicyProcessorPipeline(steps=[]) + return preprocessor, postprocessor ``` ### 3. Use in Evaluation diff --git a/docs/source/metaworld.mdx b/docs/source/metaworld.mdx index 5c4a780be8..8e629dea96 100644 --- a/docs/source/metaworld.mdx +++ b/docs/source/metaworld.mdx @@ -2,7 +2,7 @@ Meta-World is an open-source simulation benchmark for **multi-task and meta reinforcement learning** in continuous-control robotic manipulation. It bundles 50 diverse manipulation tasks using everyday objects and a common tabletop Sawyer arm, providing a standardized playground to test whether algorithms can learn many different tasks and generalize quickly to new ones. -- Paper: [Meta-World: A Benchmark and Evaluation for Multi-Task and Meta Reinforcement Learning](https://arxiv.org/abs/1910.10897) +- Paper: [Meta-World: A Benchmark and Evaluation for Multi-Task and Meta Reinforcement Learning paper](https://arxiv.org/abs/1910.10897) - GitHub: [Farama-Foundation/Metaworld](https://github.com/Farama-Foundation/Metaworld) - Project website: [metaworld.farama.org](https://metaworld.farama.org) diff --git a/src/lerobot/configs/default.py b/src/lerobot/configs/default.py index 58ed64420c..d6ad665bf1 100644 --- a/src/lerobot/configs/default.py +++ b/src/lerobot/configs/default.py @@ -65,20 +65,27 @@ class WandBConfig: class EvalConfig: n_episodes: int = 50 # `batch_size` specifies the number of environments to use in a gym.vector.VectorEnv. - batch_size: int = 50 + # Set to 0 for auto-tuning based on available CPU cores and n_episodes. + batch_size: int = 0 # `use_async_envs` specifies whether to use asynchronous environments (multiprocessing). - use_async_envs: bool = False + # Defaults to True; automatically downgraded to SyncVectorEnv when batch_size=1. + use_async_envs: bool = True def __post_init__(self) -> None: + if self.batch_size == 0: + self.batch_size = self._auto_batch_size() if self.batch_size > self.n_episodes: - raise ValueError( - "The eval batch size is greater than the number of eval episodes " - f"({self.batch_size} > {self.n_episodes}). As a result, {self.batch_size} " - f"eval environments will be instantiated, but only {self.n_episodes} will be used. " - "This might significantly slow down evaluation. To fix this, you should update your command " - f"to increase the number of episodes to match the batch size (e.g. `eval.n_episodes={self.batch_size}`), " - f"or lower the batch size (e.g. `eval.batch_size={self.n_episodes}`)." - ) + self.batch_size = self.n_episodes + + def _auto_batch_size(self) -> int: + """Pick batch_size based on CPU cores, capped by n_episodes.""" + import math + import os + + cpu_cores = os.cpu_count() or 4 + # Each async env worker needs ~1 core; leave headroom for main process + inference. + by_cpu = max(1, math.floor(cpu_cores * 0.7)) + return min(by_cpu, self.n_episodes, 64) @dataclass diff --git a/src/lerobot/envs/configs.py b/src/lerobot/envs/configs.py index 750187c05f..af5bda33f6 100644 --- a/src/lerobot/envs/configs.py +++ b/src/lerobot/envs/configs.py @@ -44,6 +44,13 @@ ) +def _make_vec_env_cls(use_async: bool, n_envs: int): + """Return the right VectorEnv constructor.""" + if use_async and n_envs > 1: + return gym.vector.AsyncVectorEnv + return gym.vector.SyncVectorEnv + + @dataclass class EnvConfig(draccus.ChoiceRegistry, abc.ABC): task: str | None = None @@ -80,8 +87,9 @@ def create_envs( """Create {suite: {task_id: VectorEnv}}. Default: single-task env via gym.make(). Multi-task benchmarks override. + AsyncVectorEnv is the default for n_envs > 1; auto-downgraded to Sync for n_envs=1. """ - env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv + env_cls = gym.vector.AsyncVectorEnv if (use_async_envs and n_envs > 1) else gym.vector.SyncVectorEnv if self.gym_id not in gym_registry: print(f"gym id '{self.gym_id}' not found, attempting to import '{self.package_name}'...") @@ -101,12 +109,17 @@ def create_envs( def _make_one(): return gym.make(self.gym_id, disable_env_checker=self.disable_env_checker, **self.gym_kwargs) + extra_kwargs: dict = {} + if env_cls is gym.vector.AsyncVectorEnv: + extra_kwargs["context"] = "forkserver" try: from gymnasium.vector import AutoresetMode - vec = env_cls([_make_one for _ in range(n_envs)], autoreset_mode=AutoresetMode.SAME_STEP) + vec = env_cls( + [_make_one for _ in range(n_envs)], autoreset_mode=AutoresetMode.SAME_STEP, **extra_kwargs + ) except ImportError: - vec = env_cls([_make_one for _ in range(n_envs)]) + vec = env_cls([_make_one for _ in range(n_envs)], **extra_kwargs) return {self.type: {0: vec}} def get_env_processors(self): @@ -394,7 +407,12 @@ def __post_init__(self): @property def gym_kwargs(self) -> dict: - kwargs: dict[str, Any] = {"obs_type": self.obs_type, "render_mode": self.render_mode} + kwargs: dict[str, Any] = { + "obs_type": self.obs_type, + "render_mode": self.render_mode, + "observation_height": self.observation_height, + "observation_width": self.observation_width, + } if self.task_ids is not None: kwargs["task_ids"] = self.task_ids return kwargs @@ -404,7 +422,7 @@ def create_envs(self, n_envs: int, use_async_envs: bool = False): if self.task is None: raise ValueError("LiberoEnv requires a task to be specified") - env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv + env_cls = _make_vec_env_cls(use_async_envs, n_envs) return create_libero_envs( task=self.task, n_envs=n_envs, @@ -473,7 +491,7 @@ def create_envs(self, n_envs: int, use_async_envs: bool = False): if self.task is None: raise ValueError("MetaWorld requires a task to be specified") - env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv + env_cls = _make_vec_env_cls(use_async_envs, n_envs) return create_metaworld_envs( task=self.task, n_envs=n_envs, diff --git a/src/lerobot/envs/libero.py b/src/lerobot/envs/libero.py index 8ddb4b68cb..1b814db524 100644 --- a/src/lerobot/envs/libero.py +++ b/src/lerobot/envs/libero.py @@ -29,6 +29,7 @@ from libero.libero import benchmark, get_libero_path from libero.libero.envs import OffScreenRenderEnv +from lerobot.envs.utils import _LazyAsyncVectorEnv from lerobot.types import RobotObservation @@ -150,7 +151,17 @@ def __init__( self.init_state_id = self.episode_index # tie each sub-env to a fixed init state - self._env = self._make_envs_task(task_suite, self.task_id) + # Extract task metadata without allocating GPU resources (safe before fork). + task = task_suite.get_task(task_id) + self.task = task.name + self.task_description = task.language + self._task_bddl_file = os.path.join( + get_libero_path("bddl_files"), task.problem_folder, task.bddl_file + ) + self._env: OffScreenRenderEnv | None = ( + None # deferred — created on first reset() inside the worker subprocess + ) + default_steps = 500 self._max_episode_steps = ( TASK_SUITE_MAX_STEPS.get(task_suite_name, default_steps) @@ -221,29 +232,33 @@ def __init__( low=ACTION_LOW, high=ACTION_HIGH, shape=(ACTION_DIM,), dtype=np.float32 ) + def _ensure_env(self) -> None: + """Create the underlying OffScreenRenderEnv on first use. + + Called inside the worker subprocess after fork(), so each worker gets + its own clean EGL context rather than inheriting a stale one from the + parent process (which causes EGL_BAD_CONTEXT crashes with AsyncVectorEnv). + """ + if self._env is not None: + return + env = OffScreenRenderEnv( + bddl_file_name=self._task_bddl_file, + camera_heights=self.observation_height, + camera_widths=self.observation_width, + ) + env.reset() + self._env = env + def render(self): + self._ensure_env() raw_obs = self._env.env._get_observations() pixels = self._format_raw_obs(raw_obs)["pixels"] image = next(iter(pixels.values())) image = image[::-1, ::-1] # flip both H and W for visualization return image - def _make_envs_task(self, task_suite: Any, task_id: int = 0): - task = task_suite.get_task(task_id) - self.task = task.name - self.task_description = task.language - task_bddl_file = os.path.join(get_libero_path("bddl_files"), task.problem_folder, task.bddl_file) - - env_args = { - "bddl_file_name": task_bddl_file, - "camera_heights": self.observation_height, - "camera_widths": self.observation_width, - } - env = OffScreenRenderEnv(**env_args) - env.reset() - return env - def _format_raw_obs(self, raw_obs: RobotObservation) -> RobotObservation: + assert self._env is not None, "_format_raw_obs called before _ensure_env()" images = {} for camera_name in self.camera_name: image = raw_obs[camera_name] @@ -295,6 +310,7 @@ def _format_raw_obs(self, raw_obs: RobotObservation) -> RobotObservation: ) def reset(self, seed=None, **kwargs): + self._ensure_env() super().reset(seed=seed) self._env.seed(seed) raw_obs = self._env.reset() @@ -321,6 +337,8 @@ def reset(self, seed=None, **kwargs): return observation, info def step(self, action: np.ndarray) -> tuple[RobotObservation, float, bool, bool, dict[str, Any]]: + self._ensure_env() + assert self._env is not None if action.ndim != 1: raise ValueError( f"Expected action to be 1-D (shape (action_dim,)), " @@ -345,7 +363,8 @@ def step(self, action: np.ndarray) -> tuple[RobotObservation, float, bool, bool, return observation, reward, terminated, truncated, info def close(self): - self._env.close() + if self._env is not None: + self._env.close() def _make_env_fns( @@ -428,6 +447,8 @@ def create_libero_envs( if task_ids_filter is not None: print(f"Restricting to task_ids={task_ids_filter}") + is_async = env_cls is gym.vector.AsyncVectorEnv + out: dict[str, dict[int, Any]] = defaultdict(dict) for suite_name in suite_names: suite = _get_suite(suite_name) @@ -436,6 +457,11 @@ def create_libero_envs( if not selected: raise ValueError(f"No tasks selected for suite '{suite_name}' (available: {total}).") + # All tasks in a suite share identical observation/action spaces. + # Probe once and reuse to avoid creating a temp env per task. + cached_obs_space: spaces.Space | None = None + cached_act_space: spaces.Space | None = None + for tid in selected: fns = _make_env_fns( suite=suite, @@ -449,8 +475,14 @@ def create_libero_envs( control_mode=control_mode, camera_name_mapping=camera_name_mapping, ) - out[suite_name][tid] = env_cls(fns) + if is_async: + lazy = _LazyAsyncVectorEnv(fns, cached_obs_space, cached_act_space) + if cached_obs_space is None: + cached_obs_space = lazy.observation_space + cached_act_space = lazy.action_space + out[suite_name][tid] = lazy + else: + out[suite_name][tid] = env_cls(fns) print(f"Built vec env | suite={suite_name} | task_id={tid} | n_envs={n_envs}") - # return plain dicts for predictability return {suite: dict(task_map) for suite, task_map in out.items()} diff --git a/src/lerobot/envs/metaworld.py b/src/lerobot/envs/metaworld.py index e9e29f3049..49c775957b 100644 --- a/src/lerobot/envs/metaworld.py +++ b/src/lerobot/envs/metaworld.py @@ -25,6 +25,7 @@ import numpy as np from gymnasium import spaces +from lerobot.envs.utils import _LazyAsyncVectorEnv from lerobot.types import RobotObservation # ---- Load configuration data from the external JSON file ---- @@ -97,8 +98,9 @@ def __init__( self.visualization_height = visualization_height self.camera_name = camera_name - self._env = self._make_envs_task(self.task) - self._max_episode_steps = self._env.max_path_length + self._env_name = self.task # already stripped of "metaworld-" prefix above + self._env = None # deferred — created on first reset() inside the worker subprocess + self._max_episode_steps = 500 # MT1 environments always have max_path_length=500 self.task_description = TASK_DESCRIPTIONS[self.task] self.expert_policy = TASK_POLICY_MAPPING[self.task]() @@ -136,6 +138,24 @@ def __init__( self.action_space = spaces.Box(low=-1, high=1, shape=(ACTION_DIM,), dtype=np.float32) + def _ensure_env(self) -> None: + """Create the underlying MetaWorld env on first use. + + Called inside the worker subprocess after fork(), so each worker gets + its own clean rendering context rather than inheriting a stale one from + the parent process (which causes crashes with AsyncVectorEnv). + """ + if self._env is not None: + return + mt1 = metaworld.MT1(self._env_name, seed=42) + env = mt1.train_classes[self._env_name](render_mode="rgb_array", camera_name=self.camera_name) + env.set_task(mt1.train_tasks[0]) + if self.camera_name == "corner2": + env.model.cam_pos[2] = [0.75, 0.075, 0.7] + env.reset() + env._freeze_rand_vec = False # otherwise no randomization + self._env = env + def render(self) -> np.ndarray: """ Render the current environment frame. @@ -143,26 +163,13 @@ def render(self) -> np.ndarray: Returns: np.ndarray: The rendered RGB image from the environment. """ + self._ensure_env() image = self._env.render() if self.camera_name == "corner2": # Images from this camera are flipped — correct them image = np.flip(image, (0, 1)) return image - def _make_envs_task(self, env_name: str): - mt1 = metaworld.MT1(env_name, seed=42) - env = mt1.train_classes[env_name](render_mode="rgb_array", camera_name=self.camera_name) - env.set_task(mt1.train_tasks[0]) - if self.camera_name == "corner2": - env.model.cam_pos[2] = [ - 0.75, - 0.075, - 0.7, - ] # corner2 position, similar to https://arxiv.org/pdf/2206.14244 - env.reset() - env._freeze_rand_vec = False # otherwise no randomization - return env - def _format_raw_obs(self, raw_obs: np.ndarray) -> RobotObservation: image = None if self._env is not None: @@ -209,6 +216,7 @@ def reset( observation (RobotObservation): The initial formatted observation. info (Dict[str, Any]): Additional info about the reset state. """ + self._ensure_env() super().reset(seed=seed) raw_obs, info = self._env.reset(seed=seed) @@ -232,6 +240,7 @@ def step(self, action: np.ndarray) -> tuple[RobotObservation, float, bool, bool, truncated (bool): Whether the episode was truncated due to a time limit. info (Dict[str, Any]): Additional environment info. """ + self._ensure_env() if action.ndim != 1: raise ValueError( f"Expected action to be 1-D (shape (action_dim,)), " @@ -263,7 +272,8 @@ def step(self, action: np.ndarray) -> tuple[RobotObservation, float, bool, bool, return observation, reward, terminated, truncated, info def close(self): - self._env.close() + if self._env is not None: + self._env.close() # ---- Main API ---------------------------------------------------------------- @@ -297,6 +307,9 @@ def create_metaworld_envs( print(f"Creating Meta-World envs | task_groups={task_groups} | n_envs(per task)={n_envs}") + is_async = env_cls is gym.vector.AsyncVectorEnv + cached_obs_space = None + cached_act_space = None out: dict[str, dict[int, Any]] = defaultdict(dict) for group in task_groups: @@ -309,7 +322,14 @@ def create_metaworld_envs( # build n_envs factories fns = [(lambda tn=task_name: MetaworldEnv(task=tn, **gym_kwargs)) for _ in range(n_envs)] - out[group][tid] = env_cls(fns) + if is_async: + lazy = _LazyAsyncVectorEnv(fns, cached_obs_space, cached_act_space) + if cached_obs_space is None: + cached_obs_space = lazy.observation_space + cached_act_space = lazy.action_space + out[group][tid] = lazy + else: + out[group][tid] = env_cls(fns) # return a plain dict for consistency return {group: dict(task_map) for group, task_map in out.items()} diff --git a/src/lerobot/envs/utils.py b/src/lerobot/envs/utils.py index fd17a67621..ff5f53735a 100644 --- a/src/lerobot/envs/utils.py +++ b/src/lerobot/envs/utils.py @@ -16,7 +16,7 @@ import importlib.util import os import warnings -from collections.abc import Mapping, Sequence +from collections.abc import Callable, Mapping, Sequence from functools import singledispatch from typing import Any @@ -29,7 +29,6 @@ from lerobot.configs.types import FeatureType, PolicyFeature from lerobot.envs.configs import EnvConfig -from lerobot.types import RobotObservation from lerobot.utils.constants import OBS_ENV_STATE, OBS_IMAGE, OBS_IMAGES, OBS_STATE, OBS_STR from lerobot.utils.utils import get_channel_first_image_shape @@ -130,59 +129,80 @@ def env_to_policy_features(env_cfg: EnvConfig) -> dict[str, PolicyFeature]: return policy_features -def are_all_envs_same_type(env: gym.vector.VectorEnv) -> bool: - first_type = type(env.envs[0]) # Get type of first env - return all(type(e) is first_type for e in env.envs) # Fast type check +def _sub_env_has_attr(env: gym.vector.VectorEnv, attr: str) -> bool: + try: + env.get_attr(attr) + return True + except (AttributeError, Exception): + return False -def check_env_attributes_and_types(env: gym.vector.VectorEnv) -> None: - with warnings.catch_warnings(): - warnings.simplefilter("once", UserWarning) # Apply filter only in this function +class _LazyAsyncVectorEnv: + """Defers AsyncVectorEnv creation until first use. - if not (hasattr(env.envs[0], "task_description") and hasattr(env.envs[0], "task")): - warnings.warn( - "The environment does not have 'task_description' and 'task'. Some policies require these features.", - UserWarning, - stacklevel=2, - ) - if not are_all_envs_same_type(env): - warnings.warn( - "The environments have different types. Make sure you infer the right task from each environment. Empty task will be passed instead.", - UserWarning, - stacklevel=2, - ) + Creating all tasks' AsyncVectorEnvs upfront spawns N_tasks × n_envs worker + processes, all of which allocate EGL/GPU resources immediately. Since tasks + are evaluated sequentially, only one task's workers need to be alive at a + time. This wrapper stores the factory functions and creates the real + AsyncVectorEnv on first reset()/step()/call(), keeping peak process count = n_envs. + """ + + def __init__( + self, + env_fns: list[Callable], + observation_space=None, + action_space=None, + ): + self._env_fns = env_fns + self._env: gym.vector.AsyncVectorEnv | None = None + self.num_envs = len(env_fns) + if observation_space is not None and action_space is not None: + self.observation_space = observation_space + self.action_space = action_space + else: + tmp = env_fns[0]() + self.observation_space = tmp.observation_space + self.action_space = tmp.action_space + tmp.close() + self.single_observation_space = self.observation_space + self.single_action_space = self.action_space + def _ensure(self) -> None: + if self._env is None: + self._env = gym.vector.AsyncVectorEnv(self._env_fns, context="forkserver", shared_memory=True) -def add_envs_task(env: gym.vector.VectorEnv, observation: RobotObservation) -> RobotObservation: - """Adds task feature to the observation dict with respect to the first environment attribute.""" - if hasattr(env.envs[0], "task_description"): - task_result = env.call("task_description") + def reset(self, **kwargs): + self._ensure() + return self._env.reset(**kwargs) - if isinstance(task_result, tuple): - task_result = list(task_result) + def step(self, actions): + self._ensure() + return self._env.step(actions) - if not isinstance(task_result, list): - raise TypeError(f"Expected task_description to return a list, got {type(task_result)}") - if not all(isinstance(item, str) for item in task_result): - raise TypeError("All items in task_description result must be strings") + def call(self, name, *args, **kwargs): + self._ensure() + return self._env.call(name, *args, **kwargs) - observation["task"] = task_result - elif hasattr(env.envs[0], "task"): - task_result = env.call("task") + def get_attr(self, name): + self._ensure() + return self._env.get_attr(name) - if isinstance(task_result, tuple): - task_result = list(task_result) + def close(self) -> None: + if self._env is not None: + self._env.close() + self._env = None - if not isinstance(task_result, list): - raise TypeError(f"Expected task to return a list, got {type(task_result)}") - if not all(isinstance(item, str) for item in task_result): - raise TypeError("All items in task result must be strings") - observation["task"] = task_result - else: # For envs without language instructions, e.g. aloha transfer cube and etc. - num_envs = observation[list(observation.keys())[0]].shape[0] - observation["task"] = ["" for _ in range(num_envs)] - return observation +def check_env_attributes_and_types(env: gym.vector.VectorEnv) -> None: + with warnings.catch_warnings(): + warnings.simplefilter("once", UserWarning) + + if not (_sub_env_has_attr(env, "task_description") and _sub_env_has_attr(env, "task")): + warnings.warn( + "The environment does not have 'task_description' and 'task'. Some policies require these features.", + UserWarning, + stacklevel=2, + ) def _close_single_env(env: Any) -> None: diff --git a/src/lerobot/processor/tokenizer_processor.py b/src/lerobot/processor/tokenizer_processor.py index 2a972ecc86..0b5305dcf0 100644 --- a/src/lerobot/processor/tokenizer_processor.py +++ b/src/lerobot/processor/tokenizer_processor.py @@ -136,8 +136,8 @@ def get_task(self, transition: EnvTransition) -> list[str] | None: # Standardize to a list of strings for the tokenizer if isinstance(task, str): return [task] - elif isinstance(task, list) and all(isinstance(t, str) for t in task): - return task + elif isinstance(task, (list, tuple)) and all(isinstance(t, str) for t in task): + return list(task) return None diff --git a/src/lerobot/scripts/lerobot_eval.py b/src/lerobot/scripts/lerobot_eval.py index e2c23ab39a..cd912280fc 100644 --- a/src/lerobot/scripts/lerobot_eval.py +++ b/src/lerobot/scripts/lerobot_eval.py @@ -73,7 +73,6 @@ from lerobot.configs.eval import EvalPipelineConfig from lerobot.envs.factory import make_env, make_env_pre_post_processors from lerobot.envs.utils import ( - add_envs_task, check_env_attributes_and_types, close_envs, preprocess_observation, @@ -166,9 +165,15 @@ def rollout( if return_observations: all_observations.append(deepcopy(observation)) - # Infer "task" from attributes of environments. - # TODO: works with SyncVectorEnv but not AsyncVectorEnv - observation = add_envs_task(env, observation) + # Infer "task" from sub-environments (prefer natural language description). + # env.call() works with both SyncVectorEnv and AsyncVectorEnv. + try: + observation["task"] = list(env.call("task_description")) + except (AttributeError, NotImplementedError): + try: + observation["task"] = list(env.call("task")) + except (AttributeError, NotImplementedError): + observation["task"] = [""] * env.num_envs # Apply environment-specific preprocessing (e.g., LiberoProcessorStep for LIBERO) observation = env_preprocessor(observation) @@ -318,8 +323,9 @@ def render_frame(env: gym.vector.VectorEnv): n_to_render_now = min(max_episodes_rendered - n_episodes_rendered, env.num_envs) if isinstance(env, gym.vector.SyncVectorEnv): ep_frames.append(np.stack([env.envs[i].render() for i in range(n_to_render_now)])) # noqa: B023 - elif isinstance(env, gym.vector.AsyncVectorEnv): + elif hasattr(env, "call"): # Here we must render all frames and discard any we don't need. + # Covers AsyncVectorEnv and _LazyAsyncVectorEnv (which wraps one). ep_frames.append(np.stack(env.call("render")[:n_to_render_now])) if max_episodes_rendered > 0: @@ -521,7 +527,7 @@ def eval_main(cfg: EvalPipelineConfig): logging.info(colored("Output dir:", "yellow", attrs=["bold"]) + f" {cfg.output_dir}") - logging.info("Making environment.") + logging.info(f"Making environment (batch_size={cfg.eval.batch_size}, async={cfg.eval.use_async_envs}).") envs = make_env( cfg.env, n_envs=cfg.eval.batch_size, @@ -755,23 +761,39 @@ def _append(key, value): ) if max_parallel_tasks <= 1: - # sequential path (single accumulator path on the main thread) - # NOTE: keeping a single-threaded accumulator avoids concurrent list appends or locks - for task_group, task_id, env in tasks: - tg, tid, metrics = task_runner(task_group, task_id, env) - _accumulate_to(tg, metrics) - per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics}) + prefetch_thread: threading.Thread | None = None + for i, (task_group, task_id, env) in enumerate(tasks): + if prefetch_thread is not None: + prefetch_thread.join() + prefetch_thread = None + + try: + tg, tid, metrics = task_runner(task_group, task_id, env) + _accumulate_to(tg, metrics) + per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics}) + finally: + env.close() + # Prefetch next task's workers *after* closing current env to prevent + # GPU memory overlap between consecutive tasks. + if i + 1 < len(tasks): + next_env = tasks[i + 1][2] + if hasattr(next_env, "_ensure"): + prefetch_thread = threading.Thread(target=next_env._ensure, daemon=True) + prefetch_thread.start() else: - # threaded path: submit all tasks, consume completions on main thread and accumulate there with cf.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor: fut2meta = {} for task_group, task_id, env in tasks: fut = executor.submit(task_runner, task_group, task_id, env) - fut2meta[fut] = (task_group, task_id) + fut2meta[fut] = (task_group, task_id, env) for fut in cf.as_completed(fut2meta): - tg, tid, metrics = fut.result() - _accumulate_to(tg, metrics) - per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics}) + tg, tid, env = fut2meta[fut] + try: + tg, tid, metrics = fut.result() + _accumulate_to(tg, metrics) + per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics}) + finally: + env.close() # compute aggregated metrics helper (robust to lists/scalars) def _agg_from_list(xs): diff --git a/tests/envs/test_dispatch.py b/tests/envs/test_dispatch.py index 7074b04c83..5bd2827f3a 100644 --- a/tests/envs/test_dispatch.py +++ b/tests/envs/test_dispatch.py @@ -90,7 +90,7 @@ def gym_kwargs(self): envs = _Env().create_envs(n_envs=2) assert "_dispatch_base_test" in envs env = envs["_dispatch_base_test"][0] - assert isinstance(env, gym.vector.SyncVectorEnv) + assert isinstance(env, gym.vector.VectorEnv) assert env.num_envs == 2 env.close() finally: diff --git a/tests/policies/test_policies.py b/tests/policies/test_policies.py index 77a74d60e6..4a8d3ab720 100644 --- a/tests/policies/test_policies.py +++ b/tests/policies/test_policies.py @@ -31,7 +31,7 @@ from lerobot.datasets.feature_utils import dataset_to_policy_features from lerobot.datasets.utils import cycle from lerobot.envs.factory import make_env, make_env_config -from lerobot.envs.utils import preprocess_observation +from lerobot.envs.utils import close_envs, preprocess_observation from lerobot.optim.factory import make_optimizer_and_scheduler from lerobot.policies.act.configuration_act import ACTConfig from lerobot.policies.act.modeling_act import ACTTemporalEnsembler @@ -224,6 +224,8 @@ def test_policy(ds_repo_id, env_name, env_kwargs, policy_name, policy_kwargs): # Test step through policy env.step(action) + close_envs(envs) + # TODO(rcadene, aliberts): This test is quite end-to-end. Move this test in test_optimizer? def test_act_backbone_lr(): diff --git a/tests/processor/test_tokenizer_processor.py b/tests/processor/test_tokenizer_processor.py index 2f1c4cc9cc..76dce25372 100644 --- a/tests/processor/test_tokenizer_processor.py +++ b/tests/processor/test_tokenizer_processor.py @@ -189,6 +189,30 @@ def test_list_of_strings_tokenization(mock_auto_tokenizer): assert attention_mask.shape == (2, 8) +@require_package("transformers") +@patch("lerobot.processor.tokenizer_processor.AutoTokenizer") +def test_tuple_of_strings_tokenization(mock_auto_tokenizer): + """Test tokenization of a tuple of strings (returned by VectorEnv.call()).""" + mock_tokenizer = MockTokenizer(vocab_size=100) + mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer + + processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=8) + + transition = create_transition( + observation={"state": torch.tensor([1.0, 2.0])}, + action=torch.tensor([0.1, 0.2]), + complementary_data={"task": ("pick up cube", "place on table")}, + ) + + result = processor(transition) + + observation = result[TransitionKey.OBSERVATION] + tokens = observation[f"{OBS_LANGUAGE}.tokens"] + attention_mask = observation[f"{OBS_LANGUAGE}.attention_mask"] + assert tokens.shape == (2, 8) + assert attention_mask.shape == (2, 8) + + @require_package("transformers") @patch("lerobot.processor.tokenizer_processor.AutoTokenizer") def test_custom_keys(mock_auto_tokenizer):