Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/lerobot/configs/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ class EvalConfig:
# `batch_size` specifies the number of environments to use in a gym.vector.VectorEnv.
batch_size: int = 50
# `use_async_envs` specifies whether to use asynchronous environments (multiprocessing).
use_async_envs: bool = False
# Defaults to True; automatically downgraded to SyncVectorEnv when batch_size=1.
use_async_envs: bool = True

def __post_init__(self) -> None:
if self.batch_size > self.n_episodes:
Expand Down
13 changes: 7 additions & 6 deletions src/lerobot/envs/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,14 @@ def gym_kwargs(self) -> dict:
def create_envs(
self,
n_envs: int,
use_async_envs: bool = False,
use_async_envs: bool = True,
) -> dict[str, dict[int, gym.vector.VectorEnv]]:
"""Create {suite: {task_id: VectorEnv}}.

Default: single-task env via gym.make(). Multi-task benchmarks override.
AsyncVectorEnv is the default for n_envs > 1; auto-downgraded to Sync for n_envs=1.
"""
env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv
env_cls = gym.vector.AsyncVectorEnv if (use_async_envs and n_envs > 1) else gym.vector.SyncVectorEnv

if self.gym_id not in gym_registry:
print(f"gym id '{self.gym_id}' not found, attempting to import '{self.package_name}'...")
Expand Down Expand Up @@ -388,12 +389,12 @@ def gym_kwargs(self) -> dict:
kwargs["task_ids"] = self.task_ids
return kwargs

def create_envs(self, n_envs: int, use_async_envs: bool = False):
def create_envs(self, n_envs: int, use_async_envs: bool = True):
from lerobot.envs.libero import create_libero_envs

if self.task is None:
raise ValueError("LiberoEnv requires a task to be specified")
env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv
env_cls = gym.vector.AsyncVectorEnv if (use_async_envs and n_envs > 1) else gym.vector.SyncVectorEnv
return create_libero_envs(
task=self.task,
n_envs=n_envs,
Expand Down Expand Up @@ -456,12 +457,12 @@ def gym_kwargs(self) -> dict:
"render_mode": self.render_mode,
}

def create_envs(self, n_envs: int, use_async_envs: bool = False):
def create_envs(self, n_envs: int, use_async_envs: bool = True):
from lerobot.envs.metaworld import create_metaworld_envs

if self.task is None:
raise ValueError("MetaWorld requires a task to be specified")
env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv
env_cls = gym.vector.AsyncVectorEnv if (use_async_envs and n_envs > 1) else gym.vector.SyncVectorEnv
return create_metaworld_envs(
task=self.task,
n_envs=n_envs,
Expand Down
2 changes: 1 addition & 1 deletion src/lerobot/envs/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def make_env_pre_post_processors(
def make_env(
cfg: EnvConfig | str,
n_envs: int = 1,
use_async_envs: bool = False,
use_async_envs: bool = True,
hub_cache_dir: str | None = None,
trust_remote_code: bool = False,
) -> dict[str, dict[int, gym.vector.VectorEnv]]:
Expand Down
52 changes: 35 additions & 17 deletions src/lerobot/envs/libero.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,17 @@ def __init__(

self.init_state_id = self.episode_index # tie each sub-env to a fixed init state

self._env = self._make_envs_task(task_suite, self.task_id)
# Extract task metadata without allocating GPU resources (safe before fork).
task = task_suite.get_task(task_id)
self.task = task.name
self.task_description = task.language
self._task_bddl_file = os.path.join(
get_libero_path("bddl_files"), task.problem_folder, task.bddl_file
)
self._env: OffScreenRenderEnv | None = (
None # deferred — created on first reset() inside the worker subprocess
)

default_steps = 500
self._max_episode_steps = (
TASK_SUITE_MAX_STEPS.get(task_suite_name, default_steps)
Expand Down Expand Up @@ -221,28 +231,32 @@ def __init__(
low=ACTION_LOW, high=ACTION_HIGH, shape=(ACTION_DIM,), dtype=np.float32
)

def _ensure_env(self) -> None:
    """Lazily build the wrapped OffScreenRenderEnv the first time it is needed.

    Construction is postponed so that it happens inside the worker subprocess
    (after fork()). Each worker then owns a fresh EGL context instead of
    inheriting a stale one from the parent process, which is what triggers
    EGL_BAD_CONTEXT crashes under AsyncVectorEnv.

    Does nothing when the simulator already exists.
    """
    if self._env is None:
        sim_kwargs = {
            "bddl_file_name": self._task_bddl_file,
            "camera_heights": self.observation_height,
            "camera_widths": self.observation_width,
        }
        simulator = OffScreenRenderEnv(**sim_kwargs)
        simulator.reset()
        # Publish the simulator only once it has been reset successfully.
        self._env = simulator

def render(self):
    """Return the current camera image, flipped for visualization.

    Makes sure the simulator exists, pulls the raw observations from the
    inner environment, formats them, and returns the ``["pixels"]["image"]``
    entry with both leading axes reversed.
    """
    self._ensure_env()
    formatted = self._format_raw_obs(self._env.env._get_observations())
    frame = formatted["pixels"]["image"]
    # Flip both H and W for visualization.
    return frame[::-1, ::-1]

def _make_envs_task(self, task_suite: Any, task_id: int = 0):
    """Build and reset an OffScreenRenderEnv for one task of ``task_suite``.

    Also records the task's name and language description on ``self``.

    Args:
        task_suite: Object exposing ``get_task(task_id)``.
        task_id: Index of the task to load from the suite.

    Returns:
        The freshly created environment, already reset once.
    """
    selected = task_suite.get_task(task_id)
    self.task = selected.name
    self.task_description = selected.language

    bddl_path = os.path.join(
        get_libero_path("bddl_files"),
        selected.problem_folder,
        selected.bddl_file,
    )
    sim = OffScreenRenderEnv(
        bddl_file_name=bddl_path,
        camera_heights=self.observation_height,
        camera_widths=self.observation_width,
    )
    sim.reset()
    return sim

def _format_raw_obs(self, raw_obs: RobotObservation) -> RobotObservation:
assert self._env is not None, "_format_raw_obs called before _ensure_env()"
images = {}
for camera_name in self.camera_name:
image = raw_obs[camera_name]
Expand Down Expand Up @@ -294,6 +308,7 @@ def _format_raw_obs(self, raw_obs: RobotObservation) -> RobotObservation:
)

def reset(self, seed=None, **kwargs):
self._ensure_env()
super().reset(seed=seed)
self._env.seed(seed)
raw_obs = self._env.reset()
Expand All @@ -320,6 +335,8 @@ def reset(self, seed=None, **kwargs):
return observation, info

def step(self, action: np.ndarray) -> tuple[RobotObservation, float, bool, bool, dict[str, Any]]:
self._ensure_env()
assert self._env is not None
if action.ndim != 1:
raise ValueError(
f"Expected action to be 1-D (shape (action_dim,)), "
Expand Down Expand Up @@ -350,7 +367,8 @@ def step(self, action: np.ndarray) -> tuple[RobotObservation, float, bool, bool,
return observation, reward, terminated, truncated, info

def close(self):
    """Release the underlying simulator, if one was ever created.

    The simulator is built lazily, so ``self._env`` may still be ``None``
    when ``close()`` is called (for example when the env was never reset).
    In that case there is nothing to release and the call is a no-op.
    """
    # Only the guarded call is kept: closing unconditionally first would fail
    # on a never-built env and would close a built one twice.
    if self._env is not None:
        self._env.close()


def _make_env_fns(
Expand Down
43 changes: 26 additions & 17 deletions src/lerobot/envs/metaworld.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ def __init__(
self.visualization_height = visualization_height
self.camera_name = camera_name

self._env = self._make_envs_task(self.task)
self._max_episode_steps = self._env.max_path_length
self._env_name = self.task # already stripped of "metaworld-" prefix above
self._env = None # deferred — created on first reset() inside the worker subprocess
self._max_episode_steps = 500 # MT1 environments always have max_path_length=500
self.task_description = TASK_DESCRIPTIONS[self.task]

self.expert_policy = TASK_POLICY_MAPPING[self.task]()
Expand Down Expand Up @@ -136,33 +137,38 @@ def __init__(

self.action_space = spaces.Box(low=-1, high=1, shape=(ACTION_DIM,), dtype=np.float32)

def _ensure_env(self) -> None:
    """Lazily build the wrapped MetaWorld environment the first time it is needed.

    Construction is postponed so that it happens inside the worker subprocess
    (after fork()). Each worker then owns a fresh rendering context instead
    of inheriting a stale one from the parent process, which causes crashes
    under AsyncVectorEnv.

    Does nothing when the environment already exists.
    """
    if self._env is None:
        benchmark = metaworld.MT1(self._env_name, seed=42)
        env_cls = benchmark.train_classes[self._env_name]
        sim = env_cls(render_mode="rgb_array", camera_name=self.camera_name)
        sim.set_task(benchmark.train_tasks[0])
        if self.camera_name == "corner2":
            # Custom position for the corner2 camera.
            sim.model.cam_pos[2] = [0.75, 0.075, 0.7]
        sim.reset()
        sim._freeze_rand_vec = False  # otherwise no randomization
        # Publish the environment only once it is fully configured.
        self._env = sim

def render(self) -> np.ndarray:
    """
    Produce the current frame of the environment.

    The environment is created on demand before rendering.

    Returns:
        np.ndarray: The rendered RGB image from the environment.
    """
    self._ensure_env()
    frame = self._env.render()
    if self.camera_name != "corner2":
        return frame
    # Images from the corner2 camera come out flipped; correct them.
    return np.flip(frame, (0, 1))

def _make_envs_task(self, env_name: str):
    """Build, configure and reset the MetaWorld environment named ``env_name``.

    Args:
        env_name: Key used both for ``metaworld.MT1`` and for looking up the
            environment class in its ``train_classes``.

    Returns:
        The environment, with its first training task set and reset once.
    """
    benchmark = metaworld.MT1(env_name, seed=42)
    env_cls = benchmark.train_classes[env_name]
    sim = env_cls(render_mode="rgb_array", camera_name=self.camera_name)
    sim.set_task(benchmark.train_tasks[0])
    if self.camera_name == "corner2":
        # corner2 position, similar to https://arxiv.org/pdf/2206.14244
        sim.model.cam_pos[2] = [0.75, 0.075, 0.7]
    sim.reset()
    sim._freeze_rand_vec = False  # otherwise no randomization
    return sim

def _format_raw_obs(self, raw_obs: np.ndarray) -> RobotObservation:
image = None
if self._env is not None:
Expand Down Expand Up @@ -209,6 +215,7 @@ def reset(
observation (RobotObservation): The initial formatted observation.
info (Dict[str, Any]): Additional info about the reset state.
"""
self._ensure_env()
super().reset(seed=seed)

raw_obs, info = self._env.reset(seed=seed)
Expand All @@ -232,6 +239,7 @@ def step(self, action: np.ndarray) -> tuple[RobotObservation, float, bool, bool,
truncated (bool): Whether the episode was truncated due to a time limit.
info (Dict[str, Any]): Additional environment info.
"""
self._ensure_env()
if action.ndim != 1:
raise ValueError(
f"Expected action to be 1-D (shape (action_dim,)), "
Expand Down Expand Up @@ -263,7 +271,8 @@ def step(self, action: np.ndarray) -> tuple[RobotObservation, float, bool, bool,
return observation, reward, terminated, truncated, info

def close(self):
    """Release the underlying MetaWorld environment, if one was ever created.

    The environment is built lazily, so ``self._env`` may still be ``None``
    when ``close()`` is called (for example when the env was never reset).
    In that case there is nothing to release and the call is a no-op.
    """
    # Only the guarded call is kept: closing unconditionally first would fail
    # on a never-built env and would close a built one twice.
    if self._env is not None:
        self._env.close()


# ---- Main API ----------------------------------------------------------------
Expand Down
55 changes: 34 additions & 21 deletions src/lerobot/scripts/lerobot_eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
"""

import concurrent.futures as cf
import copy
import json
import logging
import threading
Expand All @@ -56,7 +57,6 @@
from contextlib import nullcontext
from copy import deepcopy
from dataclasses import asdict
from functools import partial
from pathlib import Path
from pprint import pformat
from typing import Any, TypedDict
Expand All @@ -73,7 +73,6 @@
from lerobot.configs.eval import EvalPipelineConfig
from lerobot.envs.factory import make_env, make_env_pre_post_processors
from lerobot.envs.utils import (
add_envs_task,
check_env_attributes_and_types,
close_envs,
preprocess_observation,
Expand Down Expand Up @@ -166,9 +165,9 @@ def rollout(
if return_observations:
all_observations.append(deepcopy(observation))

# Infer "task" from attributes of environments.
# TODO: works with SyncVectorEnv but not AsyncVectorEnv
observation = add_envs_task(env, observation)
# Infer "task" from sub-environments.
# env.call() works with both SyncVectorEnv and AsyncVectorEnv.
observation["task"] = env.call("task")

# Apply environment-specific preprocessing (e.g., LiberoProcessorStep for LIBERO)
observation = env_preprocessor(observation)
Expand Down Expand Up @@ -734,34 +733,48 @@ def _append(key, value):
group_acc[group]["video_paths"].extend(paths)
overall["video_paths"].extend(paths)

def _make_thread_policy(p: PreTrainedPolicy) -> PreTrainedPolicy:
    """Return a per-thread view of ``p``: shared weights, independent state.

    ``copy.copy()`` creates a new Python object whose ``_parameters`` dict is
    a shared reference, so the tensor storage is reused and no extra VRAM is
    needed. Calling ``reset()`` on the copy then rebinds its mutable state
    (action queues etc.) to fresh objects owned by that thread.

    Note: this does NOT work for ACT with temporal_ensemble_coeff, because
    that policy's ``reset()`` mutates a shared sub-object. Use
    max_parallel_tasks=1 for that config.
    """
    per_thread = copy.copy(p)
    # Rebind mutable per-rollout state on the copy only; the source policy is untouched.
    per_thread.reset()
    return per_thread

# Choose runner (sequential vs threaded)
task_runner = partial(
run_one,
policy=policy,
env_preprocessor=env_preprocessor,
env_postprocessor=env_postprocessor,
preprocessor=preprocessor,
postprocessor=postprocessor,
n_episodes=n_episodes,
max_episodes_rendered=max_episodes_rendered,
videos_dir=videos_dir,
return_episode_data=return_episode_data,
start_seed=start_seed,
)
_runner_kwargs = {
"env_preprocessor": env_preprocessor,
"env_postprocessor": env_postprocessor,
"preprocessor": preprocessor,
"postprocessor": postprocessor,
"n_episodes": n_episodes,
"max_episodes_rendered": max_episodes_rendered,
"videos_dir": videos_dir,
"return_episode_data": return_episode_data,
"start_seed": start_seed,
}

if max_parallel_tasks <= 1:
# sequential path (single accumulator path on the main thread)
# NOTE: keeping a single-threaded accumulator avoids concurrent list appends or locks
for task_group, task_id, env in tasks:
tg, tid, metrics = task_runner(task_group, task_id, env)
tg, tid, metrics = run_one(task_group, task_id, env, policy=policy, **_runner_kwargs)
_accumulate_to(tg, metrics)
per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
else:
# threaded path: submit all tasks, consume completions on main thread and accumulate there
# threaded path: each thread gets a shallow policy copy (shared weights, independent state)
with cf.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor:
fut2meta = {}
for task_group, task_id, env in tasks:
fut = executor.submit(task_runner, task_group, task_id, env)
fut = executor.submit(
run_one, task_group, task_id, env, policy=_make_thread_policy(policy), **_runner_kwargs
)
fut2meta[fut] = (task_group, task_id)
for fut in cf.as_completed(fut2meta):
tg, tid, metrics = fut.result()
Expand Down