2 changes: 2 additions & 0 deletions pyproject.toml
@@ -220,6 +220,8 @@ lerobot-replay="lerobot.scripts.lerobot_replay:main"
 lerobot-setup-motors="lerobot.scripts.lerobot_setup_motors:main"
 lerobot-teleoperate="lerobot.scripts.lerobot_teleoperate:main"
 lerobot-eval="lerobot.scripts.lerobot_eval:main"
+lerobot-eval-parallel="lerobot.scripts.lerobot_eval_parallel:main"
+lerobot-eval-autotune="lerobot.scripts.lerobot_eval_autotune:main"
 lerobot-train="lerobot.scripts.lerobot_train:main"
 lerobot-train-tokenizer="lerobot.scripts.lerobot_train_tokenizer:main"
 lerobot-dataset-viz="lerobot.scripts.lerobot_dataset_viz:main"
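The lerobot-eval-parallel and lerobot-eval-autotune scripts themselves are not part of the hunks shown here, only their console-script registrations. As a rough, hypothetical sketch (not the PR's actual implementation), a shard launcher could spawn one lerobot-eval process per shard using the --eval.shard_id / --eval.num_shards flags added in default.py below:

import subprocess
import sys

def launch(num_shards: int, eval_args: list[str]) -> None:
    # One independent lerobot-eval process per shard; each writes its own
    # shard_<k>_of_<n>.json output (see lerobot_eval.py below).
    procs = [
        subprocess.Popen(
            ["lerobot-eval", *eval_args, f"--eval.shard_id={k}", f"--eval.num_shards={num_shards}"]
        )
        for k in range(num_shards)
    ]
    sys.exit(max(p.wait() for p in procs))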
11 changes: 11 additions & 0 deletions src/lerobot/configs/default.py
@@ -69,6 +69,11 @@ class EvalConfig:
     # `use_async_envs` specifies whether to use asynchronous environments (multiprocessing).
     # Defaults to True; automatically downgraded to SyncVectorEnv when batch_size=1.
     use_async_envs: bool = True
+    # Sharding: split n_episodes across independent processes.
+    # shard_id=0, num_shards=1 is the default (no sharding, existing behaviour).
+    # Set via lerobot_eval_parallel or manually: --eval.shard_id=K --eval.num_shards=N
+    shard_id: int = 0
+    num_shards: int = 1
 
     def __post_init__(self) -> None:
         if self.batch_size > self.n_episodes:
@@ -80,6 +85,12 @@ def __post_init__(self) -> None:
                 f"to increase the number of episodes to match the batch size (e.g. `eval.n_episodes={self.batch_size}`), "
                 f"or lower the batch size (e.g. `eval.batch_size={self.n_episodes}`)."
             )
+        if self.num_shards < 1:
+            raise ValueError(f"`num_shards` must be >= 1, got {self.num_shards}")
+        if not (0 <= self.shard_id < self.num_shards):
+            raise ValueError(
+                f"`shard_id` must be in [0, num_shards), got shard_id={self.shard_id}, num_shards={self.num_shards}"
+            )
 
 
 @dataclass
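The shard fields above define a round-robin split of episode indices (implemented by _shard_episodes in lerobot_eval.py below). A quick self-contained check of the semantics, with assumed example values n_episodes=10 and num_shards=4:

# Every episode index lands in exactly one shard; shard sizes differ by at most 1.
n_episodes, num_shards = 10, 4
shards = [list(range(k, n_episodes, num_shards)) for k in range(num_shards)]
print(shards)  # [[0, 4, 8], [1, 5, 9], [2, 6], [3, 7]]
assert sorted(i for shard in shards for i in shard) == list(range(n_episodes))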
77 changes: 57 additions & 20 deletions src/lerobot/scripts/lerobot_eval.py
@@ -47,16 +47,17 @@
 """
 
 import concurrent.futures as cf
+import copy
 import json
 import logging
+import math
 import threading
 import time
 from collections import defaultdict
 from collections.abc import Callable
 from contextlib import nullcontext
 from copy import deepcopy
 from dataclasses import asdict
-from functools import partial
 from pathlib import Path
 from pprint import pformat
 from typing import Any, TypedDict
@@ -92,6 +93,14 @@
 )
 
 
+def _shard_episodes(n_episodes: int, shard_id: int, num_shards: int) -> list[int]:
+    """Return the episode indices assigned to this shard (round-robin distribution).
+
+    Example: _shard_episodes(10, 1, 4) -> [1, 5, 9]
+    """
+    return list(range(shard_id, n_episodes, num_shards))
+
+
 def rollout(
     env: gym.vector.VectorEnv,
     policy: PreTrainedPolicy,
Expand Down Expand Up @@ -553,6 +562,14 @@ def eval_main(cfg: EvalPipelineConfig):
     # Create environment-specific preprocessor and postprocessor (e.g., for LIBERO environments)
     env_preprocessor, env_postprocessor = make_env_pre_post_processors(env_cfg=cfg.env, policy_cfg=cfg.policy)
 
+    # Sharding: each shard runs a subset of n_episodes with non-overlapping seeds.
+    shard_id = cfg.eval.shard_id
+    num_shards = cfg.eval.num_shards
+    episodes_for_shard = _shard_episodes(cfg.eval.n_episodes, shard_id, num_shards)
+    n_per_shard = len(episodes_for_shard)
+    # Shift the seed so each shard gets a different, non-overlapping seed range.
+    shard_seed = (cfg.seed or 0) + shard_id * math.ceil(cfg.eval.n_episodes / num_shards)
+
     with torch.no_grad(), torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext():
         info = eval_policy_all(
             envs=envs,
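To make the seed shift above concrete: with assumed values cfg.seed=1000, n_episodes=10, and num_shards=4, each shard is offset by ceil(10 / 4) = 3 seeds. No shard is assigned more than 3 episodes (shard sizes are 3, 3, 2, 2), so the per-shard seed ranges 1000-1002, 1003-1005, 1006-1008, and 1009-1011 cannot collide, assuming the evaluator consumes one seed per episode starting at start_seed:

import math

for shard_id in range(4):
    shard_seed = 1000 + shard_id * math.ceil(10 / 4)
    print(shard_id, shard_seed)  # 0 1000, 1 1003, 2 1006, 3 1009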
@@ -561,10 +578,10 @@
             env_postprocessor=env_postprocessor,
             preprocessor=preprocessor,
             postprocessor=postprocessor,
-            n_episodes=cfg.eval.n_episodes,
+            n_episodes=n_per_shard,
             max_episodes_rendered=10,
             videos_dir=Path(cfg.output_dir) / "videos",
-            start_seed=cfg.seed,
+            start_seed=shard_seed,
             max_parallel_tasks=cfg.env.max_parallel_tasks,
         )
     print("Overall Aggregated Metrics:")
@@ -577,8 +594,13 @@
     # Close all vec envs
     close_envs(envs)
 
-    # Save info
-    with open(Path(cfg.output_dir) / "eval_info.json", "w") as f:
+    # Save info — use shard-specific filename when running in parallel mode.
+    if num_shards > 1:
+        out_path = Path(cfg.output_dir) / f"shard_{shard_id}_of_{num_shards}.json"
+    else:
+        out_path = Path(cfg.output_dir) / "eval_info.json"
+    out_path.parent.mkdir(parents=True, exist_ok=True)
+    with open(out_path, "w") as f:
         json.dump(info, f, indent=2)
 
     logging.info("End of eval")
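Each shard therefore writes shard_<k>_of_<n>.json instead of eval_info.json. The diff does not show how results are recombined (presumably lerobot-eval-parallel handles that), but a minimal merge sketch, assuming only the filenames above and standard JSON contents:

import json
from pathlib import Path

def load_shard_infos(output_dir: Path, num_shards: int) -> list[dict]:
    # Collect the per-shard eval infos; how to aggregate the metrics inside
    # depends on the eval_info.json schema, which this diff does not show.
    return [
        json.loads((output_dir / f"shard_{k}_of_{num_shards}.json").read_text())
        for k in range(num_shards)
    ]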
@@ -738,34 +760,49 @@ def _append(key, value):
         group_acc[group]["video_paths"].extend(paths)
         overall["video_paths"].extend(paths)
 
+    def _make_thread_policy(p: PreTrainedPolicy) -> PreTrainedPolicy:
+        """Shallow copy sharing weight tensors, with independent per-thread state.
+
+        copy.copy() gives a new Python object whose _parameters dict is a shared
+        reference (same tensor storage, zero extra VRAM). reset() then rebinds
+        mutable state (action queues etc.) to fresh per-thread objects.
+
+        Note: does NOT work for ACT with temporal_ensemble_coeff — that policy's
+        reset() mutates a shared sub-object. Use max_parallel_tasks=1 for that config.
+        """
+        thread_p = copy.copy(p)
+        thread_p.reset()
+        return thread_p
+
     # Choose runner (sequential vs threaded)
-    task_runner = partial(
-        run_one,
-        policy=policy,
-        env_preprocessor=env_preprocessor,
-        env_postprocessor=env_postprocessor,
-        preprocessor=preprocessor,
-        postprocessor=postprocessor,
-        n_episodes=n_episodes,
-        max_episodes_rendered=max_episodes_rendered,
-        videos_dir=videos_dir,
-        return_episode_data=return_episode_data,
-        start_seed=start_seed,
-    )
+    _runner_kwargs = {
+        "env_preprocessor": env_preprocessor,
+        "env_postprocessor": env_postprocessor,
+        "preprocessor": preprocessor,
+        "postprocessor": postprocessor,
+        "n_episodes": n_episodes,
+        "max_episodes_rendered": max_episodes_rendered,
+        "videos_dir": videos_dir,
+        "return_episode_data": return_episode_data,
+        "start_seed": start_seed,
+    }
 
     if max_parallel_tasks <= 1:
         for task_group, task_id, env in tasks:
             try:
-                tg, tid, metrics = task_runner(task_group, task_id, env)
+                tg, tid, metrics = run_one(task_group, task_id, env, policy=policy, **_runner_kwargs)
                 _accumulate_to(tg, metrics)
                 per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics})
             finally:
                 env.close()
     else:
+        # threaded path: each thread gets a shallow policy copy (shared weights, independent state)
         with cf.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor:
             fut2meta = {}
             for task_group, task_id, env in tasks:
-                fut = executor.submit(task_runner, task_group, task_id, env)
+                fut = executor.submit(
+                    run_one, task_group, task_id, env, policy=_make_thread_policy(policy), **_runner_kwargs
+                )
                 fut2meta[fut] = (task_group, task_id, env)
             for fut in cf.as_completed(fut2meta):
                 tg, tid, env = fut2meta[fut]
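The shallow-copy trick in _make_thread_policy can be demonstrated on a stand-in torch.nn.Module (a hypothetical TinyPolicy, not the real PreTrainedPolicy): copy.copy duplicates the instance __dict__ but not the tensors it references, so weights stay shared while reset-style attribute rebinding stays per-copy.

import copy
import torch

class TinyPolicy(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = torch.nn.Linear(4, 2)
        self.reset()

    def reset(self):
        # Rebinds the attribute, giving each copy its own queue object.
        self._action_queue = []

p = TinyPolicy()
q = copy.copy(p)
q.reset()

assert q.linear.weight.data_ptr() == p.linear.weight.data_ptr()  # shared storage
p._action_queue.append("a")
assert q._action_queue == []  # independent per-copy state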