diff --git a/docs/source/so101.mdx b/docs/source/so101.mdx index 1274b8282a..d0fe8f9f93 100644 --- a/docs/source/so101.mdx +++ b/docs/source/so101.mdx @@ -408,6 +408,48 @@ The video below shows how to perform the calibration. First you need to move the +#### Auto Calibrate (Follower) + +As an alternative to manual calibration, you can use the automatic calibration script for the follower arm. +This drives each joint to its mechanical limits at low torque to find the full range of motion automatically — no manual movement required. + +> [!WARNING] +> The arm will move on its own during auto-calibration. Make sure the workspace is clear and the arm is free to move without obstruction. + +```bash +lerobot-auto-calibrate-feetech \ + --port /dev/tty.usbmodem58760431551 \ + --save \ + --robot-id my_awesome_follower_arm +``` + +The script runs a 5-stage process: + +| Stage | Description | +|-------|-------------| +| 0 | **Initialize** — stops all servos, enables torque, configures PID/acceleration | +| 2 | **Unfold** — extends joints 2–4 to avoid self-collision during calibration | +| 3 | **Calibrate joints 2–6** — probes mechanical limits in order (5→6→4→3→2) | +| 4 | **Calibrate joint 1** (shoulder_pan) and return to center | +| 5 | **Confirm** — waits for user confirmation, then releases torque | + +##### Arguments + +| Argument | Required | Default | Description | +|----------|----------|---------|-------------| +| `--port` | Yes | — | Serial port path (e.g. `/dev/ttyUSB0` or `/dev/tty.usbmodemXXX`) | +| `--save` | No | off | Write calibration to servo EEPROM and save to the local calibration file | +| `--robot-id` | No | `default` | Robot ID for the calibration file path (matches `config.id` at runtime) | +| `--motor` | No | all | Calibrate only a single motor by name (e.g. `shoulder_pan`) — skips unfolding | +| `--velocity-limit` | No | 1000 | Velocity for probing mechanical limits (Goal_Velocity in constant-speed mode) | +| `--timeout` | No | 20.0 | Timeout in seconds per direction when probing limits | +| `--unfold-angle` | No | 45.0 | Unfold angle in degrees — set to `0` to skip the unfold stage | +| `--unfold-timeout` | No | 6.0 | Timeout in seconds for each unfold movement | +| `--unfold-only` | No | off | Only run initialization + unfold (stages 0+2), skip calibration — useful for debugging | + +> [!TIP] +> For questions about auto-calibration, join the [Maker-Mods Discord](https://discord.gg/4QNq2CpJ). + #### Leader Do the same steps to calibrate the leader arm, run the following command or API example: diff --git a/pyproject.toml b/pyproject.toml index 4a1efab30b..b38a7f7209 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -228,6 +228,7 @@ lerobot-find-joint-limits="lerobot.scripts.lerobot_find_joint_limits:main" lerobot-imgtransform-viz="lerobot.scripts.lerobot_imgtransform_viz:main" lerobot-edit-dataset="lerobot.scripts.lerobot_edit_dataset:main" lerobot-setup-can="lerobot.scripts.lerobot_setup_can:main" +lerobot-auto-calibrate-feetech="lerobot.scripts.lerobot_auto_calibrate_feetech:main" # ---------------- Tool Configurations ---------------- [tool.setuptools.package-data] diff --git a/src/lerobot/motors/feetech/auto_calibration.py b/src/lerobot/motors/feetech/auto_calibration.py new file mode 100644 index 0000000000..3c8ad67791 --- /dev/null +++ b/src/lerobot/motors/feetech/auto_calibration.py @@ -0,0 +1,964 @@ +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Feetech auto-calibration and WritePosEx utilities (measure mechanical range, default range, single write position+speed+acceleration).""" + +import contextlib +import logging +import time + +import scservo_sdk as scs + +from ..motors_bus import NameOrID +from .calibration_defaults import ( + DEFAULT_ACCELERATION, + DEFAULT_POS_SPEED, + DEFAULT_TIMEOUT, + FULL_TURN, + HOMING_OFFSET_MAX_MAG, + MID_POS, + OVERLOAD_SETTLE_TIME, + SAFE_IO_INTERVAL, + SAFE_IO_RETRIES, + SO_STS_DEFAULT_RANGES, + STALL_POSITION_DELTA_THRESHOLD, + STALL_VELOCITY_THRESHOLD, + UNFOLD_OVERLOAD_SETTLE, + UNFOLD_TOLERANCE_DEG, + motor_label, +) + +COMM_ERR = (RuntimeError, ConnectionError) +"""Exception types that may be raised during servo communication: RuntimeError (Overload) and ConnectionError (no status packet).""" + +logger = logging.getLogger(__name__) + + +class FeetechCalibrationMixin: + """Provides auto mechanical range measurement, default range, write_pos_ex_and_wait/wait_until_stopped.""" + + # Single instruction writes Acceleration(41) + Goal_Position(42-43) + Goal_Time(44-45) + Goal_Velocity(46-47), 7 bytes total (matches STServo WritePosEx) + _POS_EX_START_ADDR = 41 + _POS_EX_LEN = 7 + # Min_Position_Limit(9,2) + Max_Position_Limit(11,2) contiguous 4 bytes, writable in a single instruction + _POS_LIMITS_START_ADDR = 9 + _POS_LIMITS_LEN = 4 + + def get_default_range(self, motor: NameOrID) -> tuple[int, int]: + """Return the default (range_min, range_max) used during auto-calibration for the specified motor. + + If the motor name is found in calibration_defaults.SO_STS_DEFAULT_RANGES, uses the preset values; + otherwise uses the full range (0, max_res) for that model's resolution. + """ + motor_names = self._get_motors_list(motor) + name = motor_names[0] + if name in SO_STS_DEFAULT_RANGES: + return SO_STS_DEFAULT_RANGES[name] + model = self._get_motor_model(motor) + max_res = self.model_resolution_table[model] - 1 + return (0, max_res) + + def _wait_for_stall( + self, + motor: str, + stall_confirm_samples: int, + timeout_s: float, + sample_interval_s: float, + *, + velocity_threshold: int = STALL_VELOCITY_THRESHOLD, + position_delta_threshold: int = STALL_POSITION_DELTA_THRESHOLD, + ) -> str: + """Poll for stall/limit detection, return a stop-reason string. + + Prioritizes the AND condition (near-zero velocity + stable position + Moving=0), returning immediately if met; + otherwise checks Status register BIT5 (overload) or communication error, requiring stall_confirm_samples consecutive hits. + """ + stall_count = 0 + stable_count = 0 + prev_position: int | None = None + t0 = time.monotonic() + while time.monotonic() - t0 < timeout_s: + try: + vel = self.read("Present_Velocity", motor, normalize=False) + pos = self.read("Present_Position", motor, normalize=False) + moving = self.read("Moving", motor, normalize=False) + status = self.read("Status", motor, normalize=False) + except COMM_ERR: + stable_count = 0 + stall_count += 1 + if stall_count >= stall_confirm_samples: + return f"stall confirmed({stall_confirm_samples}x): communication error" + time.sleep(sample_interval_s) + continue + + # Priority: near-zero velocity AND stable position AND Moving=0 (N consecutive times) + vel_ok = abs(vel) < velocity_threshold + pos_ok = prev_position is None or abs(pos - prev_position) < position_delta_threshold + moving_ok = moving == 0 + if vel_ok and pos_ok and moving_ok: + stable_count += 1 + if stable_count >= stall_confirm_samples: + return ( + f"limit confirmed({stall_confirm_samples}x): " + "near-zero velocity + stable position + Moving=0" + ) + else: + stable_count = 0 + prev_position = pos + + # Secondary: Status BIT5 overload + if status & 0x20: + stall_count += 1 + if stall_count >= stall_confirm_samples: + return f"stall confirmed({stall_confirm_samples}x): Status=0x{status:02X}(BIT5 overload)" + else: + stall_count = 0 + + time.sleep(sample_interval_s) + return f"timeout({timeout_s}s)" + + def _wait_for_stall_multi( + self, + motors: list[str], + stall_confirm_samples: int, + timeout_s: float, + sample_interval_s: float, + *, + velocity_threshold: int = STALL_VELOCITY_THRESHOLD, + position_delta_threshold: int = STALL_POSITION_DELTA_THRESHOLD, + ) -> tuple[dict[str, str], dict[str, int]]: + """Multi-motor stall polling: monitor all simultaneously, write Goal_Velocity=0 to each as it stalls, return after all stalled or timeout. + + Returns (reasons, positions): stop reason and stall Present_Position for each motor. + """ + still_running = set(motors) + reasons: dict[str, str] = {} + positions: dict[str, int] = {} + # Per-motor state: previous position, stable count, overload count + prev_pos: dict[str, int | None] = dict.fromkeys(motors) + stable_count: dict[str, int] = dict.fromkeys(motors, 0) + stall_count: dict[str, int] = dict.fromkeys(motors, 0) + t0 = time.monotonic() + + while still_running and (time.monotonic() - t0 < timeout_s): + for m in list(still_running): + try: + vel = self.read("Present_Velocity", m, normalize=False) + pos = self.read("Present_Position", m, normalize=False) + moving = self.read("Moving", m, normalize=False) + status = self.read("Status", m, normalize=False) + except COMM_ERR: + stall_count[m] = stall_count.get(m, 0) + 1 + if stall_count[m] >= stall_confirm_samples: + reasons[m] = f"stall confirmed({stall_confirm_samples}x): communication error" + positions[m] = self._read_with_retry("Present_Position", m) + self.write("Goal_Velocity", m, 0) + still_running.discard(m) + continue + + vel_ok = abs(vel) < velocity_threshold + pos_ok = prev_pos[m] is None or abs(pos - prev_pos[m]) < position_delta_threshold + moving_ok = moving == 0 + if vel_ok and pos_ok and moving_ok: + stable_count[m] = stable_count.get(m, 0) + 1 + if stable_count[m] >= stall_confirm_samples: + reasons[m] = ( + f"limit confirmed({stall_confirm_samples}x): " + "near-zero velocity + stable position + Moving=0" + ) + positions[m] = pos + self.write("Goal_Velocity", m, 0) + still_running.discard(m) + continue + else: + stable_count[m] = 0 + prev_pos[m] = pos + + if status & 0x20: + stall_count[m] = stall_count.get(m, 0) + 1 + if stall_count[m] >= stall_confirm_samples: + reasons[m] = ( + f"stall confirmed({stall_confirm_samples}x): Status=0x{status:02X}(BIT5 overload)" + ) + positions[m] = pos + self.write("Goal_Velocity", m, 0) + still_running.discard(m) + else: + stall_count[m] = 0 + + time.sleep(sample_interval_s) + + for m in still_running: + reasons[m] = f"timeout({timeout_s}s)" + try: + positions[m] = self.read("Present_Position", m, normalize=False) + except COMM_ERR: + positions[m] = 0 + with contextlib.suppress(COMM_ERR): + self.write("Goal_Velocity", m, 0) + return reasons, positions + + def _prepare_motors_for_range_measure(self, motors: list[str]) -> None: + """Prepare motors for range measurement: clear overload, disable torque, set Phase(BIT4=0), Homing_Offset=0, velocity mode, enable torque.""" + from .feetech import OperatingMode + + for m in motors: + self._safe_stop_and_clear_overload(m) + # self.disable_torque(m) + for m in motors: + phase_raw = self.read("Phase", m, normalize=False) + if phase_raw & 0x10: + self.write("Phase", m, phase_raw & ~0x10, normalize=False) + print( + f" [{motor_label(m)}] Phase(reg18): 0x{phase_raw:02X} -> 0x{phase_raw & ~0x10:02X} (BIT4=0 single-turn)" + ) + else: + print(f" [{motor_label(m)}] Phase(reg18): 0x{phase_raw:02X} (already single-turn)") + self.write("Homing_Offset", m, 0, normalize=False) + self.write("Operating_Mode", m, OperatingMode.VELOCITY.value) + mode = self.read("Operating_Mode", m, normalize=False) + if len(motors) == 1: + print(f" [{motor_label(m)}] Operating_Mode={mode} (expected 1, velocity mode)") + self.enable_torque(m) + if motors: + time.sleep(0.1) + + def _run_direction_until_stall( + self, + motors: list[str], + velocity: int | dict[str, int], + *, + stall_confirm_samples: int = 2, + timeout_s: float = 10.0, + sample_interval_s: float = 0.05, + initial_move_delay_s: float = 0.5, + ) -> tuple[dict[str, str], dict[str, int]]: + """Start specified motors with a single command, poll for stall then stop and clear overload. velocity can be a single int or per-motor dict. Returns (per-motor stop reason, per-motor stall position).""" + vel_dict = velocity if isinstance(velocity, dict) else dict.fromkeys(motors, velocity) + self.sync_write( + "Goal_Velocity", + vel_dict, + normalize=False, + ) + time.sleep(initial_move_delay_s) + if len(motors) == 1: + m = motors[0] + reason = self._wait_for_stall(m, stall_confirm_samples, timeout_s, sample_interval_s) + reasons = {m: reason} + positions = {m: self._read_with_retry("Present_Position", m)} + else: + reasons, positions = self._wait_for_stall_multi( + motors, stall_confirm_samples, timeout_s, sample_interval_s + ) + # for m in motors: + # self._safe_stop_and_clear_overload(m) + return reasons, positions + + def _compute_mid_and_range_from_limits( + self, + motor: str, + pos_cw: int, + pos_ccw: int, + *, + move_timeout: float = 5.0, + reference_pos: int | None = None, + ) -> tuple[int, int, int, int, int, int]: + """From CW/CCW stall positions, perform backoff, compute physical midpoint and range. Returns (range_min, range_max, mid, raw_min, raw_max, homing_offset). + If reference_pos is provided (e.g. (Present_Position+Homing_Offset)%FULL_TURN sampled during lift), skips backoff and uses it directly for arc selection.""" + arc_ccw_to_cw = (pos_cw - pos_ccw) % FULL_TURN + arc_cw_to_ccw = (pos_ccw - pos_cw) % FULL_TURN + if reference_pos is not None: + start_pos = reference_pos + print( + f" [{motor_label(motor)}] Using pre-sampled reference position start_pos={start_pos}, skipping limit backoff" + ) + else: + print("Probing reference position...") + shortest_arc = min(arc_ccw_to_cw, arc_cw_to_ccw) + steps_back = max(1, shortest_arc // 3) + back_deg = steps_back * 360.0 / FULL_TURN + print( + f" [{motor_label(motor)}] long arc: {max(arc_ccw_to_cw, arc_cw_to_ccw)} steps, " + f"short arc: {min(arc_ccw_to_cw, arc_cw_to_ccw)} steps, " + f"backoff: {steps_back} steps ({back_deg:.1f}°)" + ) + self.unfold_single_joint(motor, back_deg, move_timeout=move_timeout) + time.sleep(0.1) + present_raw = self._read_with_retry("Present_Position", motor) + homing_raw = self._read_with_retry("Homing_Offset", motor) + start_pos = (present_raw + homing_raw) % FULL_TURN + print( + f" [{motor_label(motor)}] Backed off {steps_back} steps ({back_deg:.1f}°) from limit, " + f"reference position: present={present_raw}, offset={homing_raw}, actual={start_pos}" + ) + arc_ccw_to_cw = (pos_cw - pos_ccw) % FULL_TURN + arc_cw_to_ccw = (pos_ccw - pos_cw) % FULL_TURN + start_in_arc_a = (start_pos - pos_ccw) % FULL_TURN <= arc_ccw_to_cw + if start_in_arc_a: + physical_range = arc_ccw_to_cw + mid = (pos_ccw + physical_range // 2) % FULL_TURN + else: + physical_range = arc_cw_to_ccw + mid = (pos_cw + physical_range // 2) % FULL_TURN + raw_min = min(pos_cw, pos_ccw) + raw_max = max(pos_cw, pos_ccw) + homing_offset = mid - MID_POS + homing_offset = max( + -HOMING_OFFSET_MAX_MAG, + min(HOMING_OFFSET_MAX_MAG, homing_offset), + ) + half = physical_range // 2 + range_min = max(0, min(FULL_TURN - 1, MID_POS - half)) + range_max = max(0, min(FULL_TURN - 1, MID_POS + half)) + crosses_zero = pos_ccw > pos_cw + print( + f" [{motor_label(motor)}] CW={pos_cw}, CCW={pos_ccw}, computed reference={start_pos}, " + f"range steps={physical_range} ({physical_range * 360 / FULL_TURN:.1f}°), " + f"physical midpoint={mid}, crosses zero={crosses_zero}" + ) + return range_min, range_max, mid, raw_min, raw_max, homing_offset + + def _safe_stop_and_clear_overload(self, motor: str, settle_s: float = 0.5) -> None: + """Safe stop after stall: write Goal_Velocity=0, disable torque, wait for overload/communication error to clear.""" + for _ in range(5): + try: + self.write("Goal_Velocity", motor, 0) + break + except COMM_ERR: + time.sleep(0.1) + for _ in range(5): + try: + self.disable_torque(motor) + break + except COMM_ERR: + time.sleep(0.1) + time.sleep(settle_s) + + def _read_with_retry(self, data_name: str, motor: str, retries: int = 5, interval_s: float = 0.2) -> int: + """Read with retries, used for reading registers during overload/communication error recovery.""" + for i in range(retries): + try: + return self.read(data_name, motor, normalize=False) + except COMM_ERR as e: + if i < retries - 1: + time.sleep(interval_s) + continue + raise RuntimeError( + f"_read_with_retry: all {retries} attempts failed for {data_name} on {motor}: {e}" + ) from e + raise RuntimeError(f"_read_with_retry: unable to read {data_name} on {motor}") + + def safe_read( + self, + reg: str, + motor: NameOrID, + *, + retries: int = SAFE_IO_RETRIES, + interval_s: float = SAFE_IO_INTERVAL, + ) -> int: + """Safe read with retries.""" + return self._read_with_retry( + reg, self._get_motors_list(motor)[0], retries=retries, interval_s=interval_s + ) + + def safe_write( + self, + reg: str, + motor: NameOrID, + value: int, + *, + normalize: bool = True, + retries: int = SAFE_IO_RETRIES, + interval_s: float = SAFE_IO_INTERVAL, + ) -> None: + """Safe write with retries.""" + motor_name = self._get_motors_list(motor)[0] + for i in range(retries): + try: + self.write(reg, motor_name, value, normalize=normalize) + return + except COMM_ERR as e: + if i < retries - 1: + time.sleep(interval_s) + continue + raise RuntimeError( + f"safe_write: all {retries} attempts failed for {reg}={value} on {motor_name}: {e}" + ) from e + raise RuntimeError(f"safe_write: unable to write {reg} on {motor_name}") + + def _write_torque_with_recovery( + self, motor: str, value: int, retries: int = 3, interval_s: float = 0.5 + ) -> None: + """Write Torque_Enable with type-specific error recovery and retry. + + - RuntimeError (Overload): disable torque to clear overload status, wait, then retry. + - ConnectionError (no response): wait and retry directly. + Raises if all retries are exhausted. + """ + for attempt in range(retries): + try: + self.write("Torque_Enable", motor, value) + return + except RuntimeError as e: + # Overload: disable torque to clear, wait, then retry + if attempt < retries - 1: + print( + f" [{motor_label(motor)}] Torque_Enable={value} Overload, clearing and retrying " + f"({attempt + 1}/{retries}): {e}" + ) + with contextlib.suppress(COMM_ERR): + self.write("Torque_Enable", motor, 0) + time.sleep(interval_s) + else: + raise RuntimeError( + f"_write_torque_with_recovery: all {retries} attempts failed for Torque_Enable={value} on {motor}: {e}" + ) from e + except ConnectionError as e: + # No response: wait and retry + if attempt < retries - 1: + print( + f" [{motor_label(motor)}] Torque_Enable={value} no response, waiting and retrying " + f"({attempt + 1}/{retries}): {e}" + ) + time.sleep(interval_s) + else: + raise RuntimeError( + f"_write_torque_with_recovery: all {retries} attempts failed for Torque_Enable={value} on {motor}: {e}" + ) from e + + def _clear_and_enable_torque(self, motor: str, settle_s: float = OVERLOAD_SETTLE_TIME) -> None: + """Clear overload and re-enable torque: disable torque and wait, then enable torque with recovery retry. + + Used after stall stop when reverse motion is needed, replacing bare enable_torque calls. + """ + # Disable torque to clear overload status (with retry logic) + for _ in range(5): + try: + self.write("Torque_Enable", motor, 0) + break + except COMM_ERR: + time.sleep(0.1) + time.sleep(settle_s) + # Enable torque with recovery retry + self._write_torque_with_recovery(motor, 1) + with contextlib.suppress(COMM_ERR): + self.write("Lock", motor, 1) + + def safe_write_position_limits( + self, + motor: NameOrID, + rmin: int, + rmax: int, + *, + retries: int = SAFE_IO_RETRIES, + interval_s: float = SAFE_IO_INTERVAL, + ) -> None: + """Safe write of Min/Max position limits with retries (single instruction).""" + motor_name = self._get_motors_list(motor)[0] + for i in range(retries): + try: + self.write_position_limits(motor_name, rmin, rmax) + return + except COMM_ERR as e: + if i < retries - 1: + time.sleep(interval_s) + continue + raise RuntimeError( + f"safe_write_position_limits: all {retries} attempts failed for rmin={rmin} rmax={rmax} on {motor_name}: {e}" + ) from e + raise RuntimeError(f"safe_write_position_limits: unable to write on {motor_name}") + + def safe_disable_all( + self, + motor_names: list[str] | None = None, + *, + num_try_per_motor: int = 3, + interval_s: float = 0.1, + ) -> None: + """Safely disable torque on all motors, ignoring communication errors.""" + names = motor_names if motor_names is not None else list(self.motors.keys()) + for m in names: + for _ in range(num_try_per_motor): + try: + self.write("Torque_Enable", m, 0) + break + except COMM_ERR: + time.sleep(interval_s) + + def go_to_mid( + self, + motor: NameOrID, + *, + timeout_s: float = DEFAULT_TIMEOUT, + poll_interval_s: float = 0.05, + ) -> bool: + """Move motor to midpoint (servo mode), wait until reached. Returns True if reached, False if timed out.""" + motor_name = self._get_motors_list(motor)[0] + ok = self.write_pos_ex_and_wait( + motor_name, + MID_POS, + DEFAULT_POS_SPEED, + DEFAULT_ACCELERATION, + timeout_s=timeout_s, + poll_interval_s=poll_interval_s, + ) + try: + cur = self.read("Present_Position", motor_name, normalize=False) + except COMM_ERR: + cur = -1 + if not ok: + logger.warning( + "%s return to mid timed out (%.1fs), current position=%s", + motor_label(motor_name), + timeout_s, + cur, + ) + return ok + + def measure_ranges_of_motion( + self, + motor: NameOrID, + *, + velocity_limit: int = 1000, + stall_confirm_samples: int = 2, + timeout_s: float = 10.0, + sample_interval_s: float = 0.05, + initial_move_delay_s: float = 0.5, + ) -> tuple[int, int, int, int, int, int]: + """Automatically measure the mechanical limit range of a single motor (velocity mode). + + Only one motor may be passed. Torque (Max_Torque_Limit, Torque_Limit) and acceleration (Acceleration) + should be initialized before calling; this method only drives via Goal_Velocity. + + Procedure: + 1. Set register 18 BIT4=0 (single-turn angle feedback, 0-4095) + 2. Zero out Homing_Offset + 3. Switch to Operating_Mode=1 (velocity mode), drive CW/CCW via Goal_Velocity + 4. Detect stall via Status register BIT5, obtain CW/CCW limits + 5. Independent of start position: back off from CCW limit in the reverse direction by some steps to get reference point P. + Backoff steps = min(distance to 0, distance to 4095) / 2; if CCW limit is exactly 0 or 4095, use CW limit for calculation. + + Returns: + (range_min, range_max, mid, raw_min, raw_max, homing_offset) - six integers: + - range_min, range_max: limits in offset space (0~4095, non-wrapping), for writing Min/Max position limits; + - mid: measured physical midpoint (raw encoding); + - raw_min, raw_max: measured extremes (raw encoding, may wrap around zero); + - homing_offset: offset value mid - MID_POS, for writing Homing_Offset. + """ + motor = self._get_motors_list(motor)[0] + self._prepare_motors_for_range_measure([motor]) + + cw_reasons, pos_cw_dict = self._run_direction_until_stall( + [motor], + velocity_limit, + stall_confirm_samples=stall_confirm_samples, + timeout_s=timeout_s, + sample_interval_s=sample_interval_s, + initial_move_delay_s=initial_move_delay_s, + ) + print(f" [{motor_label(motor)}] CW stop reason: {cw_reasons[motor]}") + print(f" [{motor_label(motor)}] CW stall position: {pos_cw_dict[motor]}") + + self._clear_and_enable_torque(motor) + time.sleep(0.05) + ccw_reasons, pos_ccw_dict = self._run_direction_until_stall( + [motor], + -velocity_limit, + stall_confirm_samples=stall_confirm_samples, + timeout_s=timeout_s, + sample_interval_s=sample_interval_s, + initial_move_delay_s=initial_move_delay_s, + ) + print(f" [{motor_label(motor)}] CCW stop reason: {ccw_reasons[motor]}") + print(f" [{motor_label(motor)}] CCW stall position: {pos_ccw_dict[motor]}") + + return self._compute_mid_and_range_from_limits(motor, pos_cw_dict[motor], pos_ccw_dict[motor]) + + def measure_ranges_of_motion_multi( + self, + motors: list[str], + *, + velocity_limit: int = 1000, + stall_confirm_samples: int = 2, + timeout_s: float = 10.0, + sample_interval_s: float = 0.05, + initial_move_delay_s: float = 0.5, + ccw_first: bool | dict[str, bool] = False, + reference_positions: dict[str, int] | None = None, + ) -> dict[str, tuple[int, int, int, int, int, int]]: + """Multi-motor simultaneous mechanical limit measurement: start all at once, poll for stall (write 0 to each as it stops), read positions after all stop; compute backoff and midpoint per motor individually. + + ccw_first: if True, that motor runs CCW then CW; if dict, specifies per motor (motor_name -> True/False). + reference_positions: if a motor's reference position ((Present_Position+Homing_Offset)%FULL_TURN) is provided, skip limit backoff and use it directly for arc selection. + Returns dict[motor_name, (range_min, range_max, mid, raw_min, raw_max, homing_offset)]. + """ + if not motors: + return {} + if len(motors) == 1: + m = motors[0] + t = self.measure_ranges_of_motion( + m, + velocity_limit=velocity_limit, + stall_confirm_samples=stall_confirm_samples, + timeout_s=timeout_s, + sample_interval_s=sample_interval_s, + initial_move_delay_s=initial_move_delay_s, + ) + return {m: t} + + self._prepare_motors_for_range_measure(motors) + + # Per-motor direction: CCW first or CW first + def _ccw_first(m: str) -> bool: + return ccw_first.get(m, False) if isinstance(ccw_first, dict) else bool(ccw_first) + + first_vel_dict = {m: (-velocity_limit if _ccw_first(m) else velocity_limit) for m in motors} + second_vel_dict = {m: (velocity_limit if _ccw_first(m) else -velocity_limit) for m in motors} + + first_reasons, first_pos = self._run_direction_until_stall( + motors, + first_vel_dict, + stall_confirm_samples=stall_confirm_samples, + timeout_s=timeout_s, + sample_interval_s=sample_interval_s, + initial_move_delay_s=initial_move_delay_s, + ) + for m in motors: + label = "CCW" if first_vel_dict[m] < 0 else "CW" + print( + f" [{motor_label(m)}] {label} stop reason: {first_reasons[m]}, stall position: {first_pos[m]}" + ) + for m in motors: + self._clear_and_enable_torque(m) + time.sleep(0.05) + second_reasons, second_pos = self._run_direction_until_stall( + motors, + second_vel_dict, + stall_confirm_samples=stall_confirm_samples, + timeout_s=timeout_s, + sample_interval_s=sample_interval_s, + initial_move_delay_s=initial_move_delay_s, + ) + for m in motors: + label = "CCW" if second_vel_dict[m] < 0 else "CW" + print( + f" [{motor_label(m)}] {label} stop reason: {second_reasons[m]}, stall position: {second_pos[m]}" + ) + time.sleep(OVERLOAD_SETTLE_TIME) + + # Reconstruct pos_cw, pos_ccw per motor based on first/second direction (CW = positive velocity limit, CCW = negative velocity limit) + pos_cw_dict = { + m: (first_pos[m] if first_vel_dict[m] == velocity_limit else second_pos[m]) for m in motors + } + pos_ccw_dict = { + m: (second_pos[m] if first_vel_dict[m] == velocity_limit else first_pos[m]) for m in motors + } + + result: dict[str, tuple[int, int, int, int, int, int]] = {} + for m in motors: + ref_pos = reference_positions.get(m) if reference_positions else None + result[m] = self._compute_mid_and_range_from_limits( + m, pos_cw_dict[m], pos_ccw_dict[m], reference_pos=ref_pos + ) + return result + + def _write_raw_bytes( + self, + addr: int, + motor_id: int, + data: list[int], + *, + num_retry: int = 0, + raise_on_error: bool = True, + err_msg: str = "", + ) -> tuple[int, int]: + """Write raw byte sequence to a register starting address (used by write_pos_ex_and_wait, sync_write_pos_ex, etc.).""" + for n_try in range(1 + num_retry): + comm, error = self.packet_handler.writeTxRx(self.port_handler, motor_id, addr, len(data), data) + if self._is_comm_success(comm): + break + logger.debug( + f"_write_raw_bytes @{addr} len={len(data)} id={motor_id} try={n_try}: " + + self.packet_handler.getTxRxResult(comm) + ) + if not self._is_comm_success(comm) and raise_on_error: + raise ConnectionError(f"{err_msg} {self.packet_handler.getTxRxResult(comm)}") + if self._is_error(error) and raise_on_error: + raise RuntimeError(f"{err_msg} {self.packet_handler.getRxPacketError(error)}") + return comm, error + + def write_position_limits( + self, + motor: NameOrID, + rmin: int, + rmax: int, + *, + num_retry: int = 0, + ) -> None: + """Write Min_Position_Limit(9) + Max_Position_Limit(11) in a single instruction, 4 bytes total.""" + id_ = self._get_motor_id(motor) + data = self._split_into_byte_chunks(rmin, 2) + self._split_into_byte_chunks(rmax, 2) + err_msg = f"write_position_limits(id={id_}, rmin={rmin}, rmax={rmax}) failed" + self._write_raw_bytes( + self._POS_LIMITS_START_ADDR, + id_, + data, + num_retry=num_retry, + raise_on_error=True, + err_msg=err_msg, + ) + + def read_position_limits(self, motor: NameOrID) -> tuple[int, int]: + """Read Min_Position_Limit and Max_Position_Limit, return (rmin, rmax).""" + rmin = self.read("Min_Position_Limit", motor, normalize=False) + rmax = self.read("Max_Position_Limit", motor, normalize=False) + return (rmin, rmax) + + def wait_until_stopped( + self, + motor: NameOrID, + timeout_s: float = 10.0, + poll_interval_s: float = 0.05, + ) -> bool: + """Poll the Moving register until it reads 0 or timeout (consistent with STServo read_write example). + + Returns True if stopped (Moving==0), False if timed out or read failed. + """ + t0 = time.monotonic() + while time.monotonic() - t0 < timeout_s: + try: + moving = self.read("Moving", motor, normalize=False) + except COMM_ERR: + logger.debug("wait_until_stopped: communication error reading Moving") + time.sleep(poll_interval_s) + continue + if moving == 0: + return True + time.sleep(poll_interval_s) + return False + + def write_pos_ex_and_wait( + self, + motor: NameOrID, + position: int, + speed: int, + acc: int, + timeout_s: float = 10.0, + poll_interval_s: float = 0.05, + *, + num_retry: int = 0, + ) -> bool: + """Write position+speed+acceleration in a single instruction, then poll Moving until stopped (consistent with STServo read_write example). + + Ensures servo mode (Operating_Mode=0) first, then writes Goal_Position/Goal_Velocity/Acceleration, then wait_until_stopped. + Returns True if position reached, False if timed out or write failed. + """ + try: + self.write("Operating_Mode", motor, 0) # Ensure servo mode, otherwise Goal_Position has no effect + time.sleep(0.05) + # Single instruction writes goal position, speed, acceleration (7 consecutive bytes from register 41), matching STServo WritePosEx + id_ = self._get_motor_id(motor) + pos_enc = self._encode_sign("Goal_Position", {id_: position})[id_] + speed_enc = self._encode_sign("Goal_Velocity", {id_: speed})[id_] + data = ( + [acc] + + self._split_into_byte_chunks(pos_enc, 2) + + [0, 0] + + self._split_into_byte_chunks(speed_enc, 2) + ) + self._write_raw_bytes( + self._POS_EX_START_ADDR, + id_, + data, + num_retry=num_retry, + raise_on_error=True, + err_msg=f"write_pos_ex_and_wait(id={id_}, pos={position}, speed={speed}, acc={acc}) failed", + ) + time.sleep(0.3) + except COMM_ERR: + return False + result = self.wait_until_stopped(motor, timeout_s=timeout_s, poll_interval_s=poll_interval_s) + time.sleep(0.1) + return result + + def sync_write_pos_ex( + self, + values: dict[str, tuple[int, int, int]], + *, + num_retry: int = 0, + ) -> None: + """Multi-motor RegWritePosEx to buffer, then RegAction to execute simultaneously (consistent with STServo reg_write example). + + values: motor_name -> (position, speed, acc). Multiple motors can share the same (position, speed, acc) values. + """ + + for motor_name, (position, speed, acc) in values.items(): + id_ = self._get_motor_id(motor_name) + self._get_motor_model(motor_name) + pos_enc = self._encode_sign("Goal_Position", {id_: position})[id_] + speed_enc = self._encode_sign("Goal_Velocity", {id_: speed})[id_] + data = ( + [acc] + + self._split_into_byte_chunks(pos_enc, 2) + + [0, 0] + + self._split_into_byte_chunks(speed_enc, 2) + ) + for _n_try in range(1 + num_retry): + comm, error = self.packet_handler.regWriteTxRx( + self.port_handler, id_, self._POS_EX_START_ADDR, len(data), data + ) + if self._is_comm_success(comm): + break + logger.debug( + f"sync_write_pos_ex RegWrite id={id_}: {self.packet_handler.getTxRxResult(comm)}" + ) + if self._is_error(error): + logger.warning( + f"sync_write_pos_ex RegWrite id={id_}: {self.packet_handler.getRxPacketError(error)}" + ) + comm = self.packet_handler.action(self.port_handler, scs.BROADCAST_ID) + if not self._is_comm_success(comm): + raise ConnectionError( + f"sync_write_pos_ex RegAction failed: {self.packet_handler.getTxRxResult(comm)}" + ) + + def _unfold_move_and_wait( + self, + motor: str, + goal: int, + timeout_s: float, + tolerance_deg: float = UNFOLD_TOLERANCE_DEG, + ) -> tuple[bool, int, str]: + """In servo mode, write goal via WritePosEx, poll Moving until stopped, then check position/Status to determine if reached or stalled. Error within tolerance_deg degrees is considered success (default 5 deg).""" + goal = max(0, min(goal, FULL_TURN - 1)) + pos_now = self._read_with_retry("Present_Position", motor) + print(f" [{motor_label(motor)}] current position={pos_now}, goal position={goal}") + ok = self.write_pos_ex_and_wait( + motor, + goal, + DEFAULT_POS_SPEED, + DEFAULT_ACCELERATION, + timeout_s=timeout_s, + poll_interval_s=0.05, + ) + print(f" [{motor_label(motor)}] move completed: {ok}, checking position") + time.sleep(0.3) + try: + pos = self.read("Present_Position", motor, normalize=False) + except COMM_ERR: + self._clear_overload_unfold(motor) + pos = self._read_with_retry("Present_Position", motor) + err_deg = abs(pos - goal) * 360.0 / FULL_TURN + print( + f" [{motor_label(motor)}] stall/error stop: pos={pos} ({abs(pos - goal)} steps from goal ≈ {err_deg:.1f}°)" + ) + return False, pos, "stall(communication error)" + if not ok: + self._clear_overload_unfold(motor) + err_deg = abs(pos - goal) * 360.0 / FULL_TURN + print( + f" [{motor_label(motor)}] timeout: pos={pos} ({abs(pos - goal)} steps from goal ≈ {err_deg:.1f}°)" + ) + return False, pos, "timeout" + error_deg = abs(pos - goal) * 360.0 / FULL_TURN + if error_deg <= tolerance_deg: + print( + f" [{motor_label(motor)}] reached goal: pos={pos} (error {abs(pos - goal)} steps ≈ {error_deg:.1f}°, within {tolerance_deg}°)" + ) + return True, pos, "reached" + try: + status = self.read("Status", motor, normalize=False) + except COMM_ERR: + status = 0 + if status & 0x20: + self._clear_overload_unfold(motor) + print( + f" [{motor_label(motor)}] stall stop: pos={pos} (Status=0x{status:02X} BIT5 overload, " + f"{abs(pos - goal)} steps from goal ≈ {error_deg:.1f}°)" + ) + return False, pos, f"stall(Status=0x{status:02X})" + print( + f" [{motor_label(motor)}] stopped short: pos={pos} ({abs(pos - goal)} steps from goal ≈ {error_deg:.1f}°, exceeds {tolerance_deg}°)" + ) + return False, pos, "not reached" + + def _clear_overload_unfold(self, motor: str) -> None: + """Disable torque to clear overload status, wait for recovery, then re-enable torque (used by unfold logic).""" + try: + self.write("Torque_Enable", motor, 0) + time.sleep(UNFOLD_OVERLOAD_SETTLE + 0.1) + self.write("Torque_Enable", motor, 1) + except COMM_ERR: + pass + + def unfold_single_joint( + self, + motor: str, + unfold_angle: float, + move_timeout: float, + ) -> tuple[str | None, int]: + """Unfold a single joint: try forward direction first, then reverse if forward fails. + + PID/Acceleration/Operating_Mode=0 are already configured during initialization; + in servo mode, only Goal_Position needs to be written to drive the motor. + + Returns: + (direction, steps): direction is "forward"/"reverse", or (None, 0) on failure; steps is the target step count. + """ + target_steps = int(unfold_angle / 360.0 * FULL_TURN) + print(f"\n--- Unfold {motor_label(motor)} ({target_steps} steps ≈ {unfold_angle:.1f}°) ---") + + # Calibrate midpoint (Torque_Enable=128 sets current position to 2048) + self._write_torque_with_recovery(motor, 128) + self._write_torque_with_recovery(motor, 1) + + time.sleep(0.1) + print( + f"[{motor_label(motor)}] Set current position as midpoint: Present_Position={self._read_with_retry('Present_Position', motor)}" + ) + time.sleep(0.1) + # Restore servo mode (128 may have changed mode state) and enable torque + self.write("Operating_Mode", motor, 0) + self._write_torque_with_recovery(motor, 1) + time.sleep(0.3) + # Try forward direction + # print(f" [{motor_label(motor)}] Trying forward...") + reached, pos_after, reason = self._unfold_move_and_wait(motor, MID_POS + target_steps, move_timeout) + if reached: + print(f" [{motor_label(motor)}] forward direction succeeded") + return "forward", target_steps + + abs(pos_after - MID_POS) + # print(f" [{motor_label(motor)}] forward failed ({reason}), moved {moved_fwd} steps") + + # Return to midpoint (after stall, _clear_overload_unfold has re-enabled torque; write midpoint with wait) + self.write_pos_ex_and_wait( + motor, + MID_POS, + DEFAULT_POS_SPEED, + DEFAULT_ACCELERATION, + timeout_s=5.0, + poll_interval_s=0.05, + ) + + # Try reverse direction + # print(f" [{motor_label(motor)}] Trying reverse...") + reached, pos_after, reason = self._unfold_move_and_wait(motor, MID_POS - target_steps, move_timeout) + if reached: + print(f" [{motor_label(motor)}] reverse direction succeeded") + return "reverse", target_steps + abs(MID_POS - pos_after) + # print(f" [{motor_label(motor)}] reverse also failed ({reason}), moved {moved_rev} steps, keeping current position") + print(f" [{motor_label(motor)}] unfold failed, keeping current position") + return None, 0 diff --git a/src/lerobot/motors/feetech/calibration_defaults.py b/src/lerobot/motors/feetech/calibration_defaults.py new file mode 100644 index 0000000000..079d9ee27c --- /dev/null +++ b/src/lerobot/motors/feetech/calibration_defaults.py @@ -0,0 +1,115 @@ +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Default configuration and constants for Feetech STS/SO servo auto-calibration. + +Shared by auto_calibration.py, lerobot_auto_calibrate_feetech.py, etc. +""" + +from .. import Motor, MotorNormMode + +# --------------------------------------------------------------------------- +# Default position ranges (range_min, range_max) +# --------------------------------------------------------------------------- +# Default mapping from SO/STS joint names to (range_min, range_max) in raw encoder values (4096 resolution). +# Joints not listed here will use the full range (0, max_res). +SO_STS_DEFAULT_RANGES: dict[str, tuple[int, int]] = { + "shoulder_pan": (0, 4095), + "shoulder_lift": (0, 4095), + "elbow_flex": (0, 4095), + "wrist_flex": (0, 4095), + "wrist_roll": (0, 4095), + "gripper": (0, 4095), +} + + +# SO 6-DOF arm joint names -> motor IDs (used for display, etc.) +SO_MOTOR_NUMBERS: dict[str, int] = { + "shoulder_pan": 1, + "shoulder_lift": 2, + "elbow_flex": 3, + "wrist_flex": 4, + "wrist_roll": 5, + "gripper": 6, +} + +# SO 6-DOF arm joint name list (same order as SO_MOTOR_NUMBERS) +MOTOR_NAMES: list[str] = list(SO_MOTOR_NUMBERS.keys()) + +# SO calibration motor table (name -> Motor, normalization mode for calibration/scripts) +SO_FOLLOWER_MOTORS: dict[str, Motor] = { + "shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100), + "shoulder_lift": Motor(2, "sts3215", MotorNormMode.RANGE_M100_100), + "elbow_flex": Motor(3, "sts3215", MotorNormMode.RANGE_M100_100), + "wrist_flex": Motor(4, "sts3215", MotorNormMode.RANGE_M100_100), + "wrist_roll": Motor(5, "sts3215", MotorNormMode.RANGE_M100_100), + "gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100), +} + + +def motor_label(name: str) -> str: + """Motor label for display: name(id), e.g. shoulder_pan(1).""" + n = SO_MOTOR_NUMBERS.get(name, "") + return f"{name}({n})" if n != "" else name + + +# --------------------------------------------------------------------------- +# Resolution and midpoint (4096 steps per revolution) +# --------------------------------------------------------------------------- +FULL_TURN = 4096 +MID_POS = 2047 +STS_HALF_TURN_RAW = 2047 # Same as MID_POS; used for centering and normalization + +# Homing_Offset register uses 12-bit sign-magnitude encoding (sign_bit_index=11), range [-2047, 2047] +HOMING_OFFSET_MAX_MAG = 2047 + +# --------------------------------------------------------------------------- +# Calibration / measurement parameters +# --------------------------------------------------------------------------- +DEFAULT_VELOCITY_LIMIT = 1000 # Limit-seeking velocity during calibration (constant-speed mode Goal_Velocity) +DEFAULT_MAX_TORQUE = 1000 # Maximum torque (Max_Torque_Limit) +DEFAULT_TORQUE_LIMIT = 800 # Torque limit (Torque_Limit) +DEFAULT_ACCELERATION = 50 # Acceleration (matches project configure_motors) +DEFAULT_POS_SPEED = 1000 # Default speed for servo mode WritePosEx +DEFAULT_P_COEFFICIENT = 16 # PID P coefficient (matches so_follower) +DEFAULT_I_COEFFICIENT = 0 # PID I coefficient +DEFAULT_D_COEFFICIENT = 32 # PID D coefficient +DEFAULT_TIMEOUT = 20.0 # Timeout for single-direction limit seeking during calibration (seconds) +POSITION_TOLERANCE = 20 # Tolerance for reaching target position (steps) +# Stall detection AND conditions: near-zero velocity + stable position + Moving=0 (preferred over Status BIT5) +STALL_VELOCITY_THRESHOLD = ( + 3 # Velocity near-zero threshold (|Present_Velocity| below this is considered stopped) +) +STALL_POSITION_DELTA_THRESHOLD = ( + 3 # Position change between samples below this step count is considered stationary +) +OVERLOAD_SETTLE_TIME = 0.2 # Wait time after stall before disabling torque (seconds) +SAFE_IO_RETRIES = 5 # Number of retries for safe read/write operations +SAFE_IO_INTERVAL = 0.2 # Interval between safe read/write retries (seconds) + +# --------------------------------------------------------------------------- +# Unfold parameters +# --------------------------------------------------------------------------- +DEFAULT_UNFOLD_ANGLE = 45.0 # Unfold angle (degrees) +DEFAULT_UNFOLD_TIMEOUT = 6.0 # Timeout for a single unfold movement (seconds) +UNFOLD_OVERLOAD_SETTLE = 0.3 # Wait time after stall during unfold before disabling torque (seconds) +UNFOLD_TOLERANCE_DEG = 5.0 # Unfold position tolerance: error within this many degrees is considered success + +# --------------------------------------------------------------------------- +# Calibration / unfold order (SO 6-DOF arm) +# --------------------------------------------------------------------------- +CALIBRATE_FIRST: list[str] = ["shoulder_pan"] +CALIBRATE_REST: list[str] = ["wrist_roll", "gripper", "wrist_flex", "elbow_flex", "shoulder_lift"] +UNFOLD_ORDER: list[str] = ["wrist_flex", "elbow_flex", "shoulder_lift"] diff --git a/src/lerobot/motors/feetech/feetech.py b/src/lerobot/motors/feetech/feetech.py index 58a65310de..779ae2bf70 100644 --- a/src/lerobot/motors/feetech/feetech.py +++ b/src/lerobot/motors/feetech/feetech.py @@ -19,6 +19,7 @@ from ..encoding_utils import decode_sign_magnitude, encode_sign_magnitude from ..motors_bus import Motor, MotorCalibration, NameOrID, SerialMotorsBus, Value, get_address +from .auto_calibration import FeetechCalibrationMixin from .tables import ( FIRMWARE_MAJOR_VERSION, FIRMWARE_MINOR_VERSION, @@ -95,7 +96,7 @@ def patch_setPacketTimeout(self, packet_length): # noqa: N802 self.packet_timeout = (self.tx_time_per_byte * packet_length) + (self.tx_time_per_byte * 3.0) + 50 -class FeetechMotorsBus(SerialMotorsBus): +class FeetechMotorsBus(FeetechCalibrationMixin, SerialMotorsBus): """ The FeetechMotorsBus class allows to efficiently read and write to the attached motors. It relies on the python feetech sdk to communicate with the motors, which is itself based on the dynamixel sdk. diff --git a/src/lerobot/scripts/lerobot_auto_calibrate_feetech.py b/src/lerobot/scripts/lerobot_auto_calibrate_feetech.py new file mode 100644 index 0000000000..f2c28862ba --- /dev/null +++ b/src/lerobot/scripts/lerobot_auto_calibrate_feetech.py @@ -0,0 +1,755 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Feetech STS servo auto-calibration (with unfolding). + +Full workflow (single command): + Stage 0 Initialize: stop all servos, Lock=1, configure PID/acceleration, enable torque + Stage 2 Unfold joints 2-4 (can be skipped with --unfold-angle 0) + Stage 3 Calibrate servos 2-6 (5->6->4->3->2) + Stage 4 Finally calibrate servo 1 shoulder_pan and return to center + Stage 5 Wait for user confirmation then disable torque + +Usage examples: + + lerobot-auto-calibrate-feetech --port COM3 + lerobot-auto-calibrate-feetech --port COM3 --save + lerobot-auto-calibrate-feetech --port COM3 --unfold-angle 0 + lerobot-auto-calibrate-feetech --port COM3 --save --robot-id default + lerobot-auto-calibrate-feetech --port COM3 --unfold-only # Only debug arm unfolding (Stage 0 + Stage 2) +""" + +import argparse +import contextlib +import sys +import time +from collections.abc import Callable + +import draccus + +from lerobot.motors import MotorCalibration +from lerobot.motors.feetech import FeetechMotorsBus +from lerobot.motors.feetech.auto_calibration import COMM_ERR +from lerobot.motors.feetech.calibration_defaults import ( + CALIBRATE_FIRST, + CALIBRATE_REST, + DEFAULT_ACCELERATION, + DEFAULT_D_COEFFICIENT, + DEFAULT_I_COEFFICIENT, + DEFAULT_MAX_TORQUE, + DEFAULT_P_COEFFICIENT, + DEFAULT_POS_SPEED, + DEFAULT_TIMEOUT, + DEFAULT_TORQUE_LIMIT, + DEFAULT_UNFOLD_ANGLE, + DEFAULT_UNFOLD_TIMEOUT, + DEFAULT_VELOCITY_LIMIT, + FULL_TURN, + HOMING_OFFSET_MAX_MAG, + MOTOR_NAMES, + SO_FOLLOWER_MOTORS, + STS_HALF_TURN_RAW, + UNFOLD_ORDER, + motor_label, +) +from lerobot.utils.constants import HF_LEROBOT_CALIBRATION + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Feetech servo auto-calibration (with unfolding): complete full workflow in one command." + ) + parser.add_argument( + "--port", + type=str, + required=True, + help="Serial port path, e.g. COM3 or /dev/ttyUSB0", + ) + parser.add_argument( + "--motor", + type=str, + choices=MOTOR_NAMES, + default=None, + help="Only test this motor (skip unfolding); if not specified, test all 6 motors sequentially", + ) + + cal = parser.add_argument_group("Calibration parameters") + cal.add_argument( + "--velocity-limit", + type=int, + default=DEFAULT_VELOCITY_LIMIT, + help=f"Calibration limit-detection velocity (constant-speed mode Goal_Velocity), default {DEFAULT_VELOCITY_LIMIT}", + ) + cal.add_argument( + "--timeout", + type=float, + default=DEFAULT_TIMEOUT, + help=f"Calibration single-direction limit detection timeout (seconds), default {DEFAULT_TIMEOUT}", + ) + unfold = parser.add_argument_group("Unfold parameters") + unfold.add_argument( + "--unfold-only", + action="store_true", + help="Only perform arm unfolding (Stage 0 init + Stage 2 unfold), no calibration, for debugging unfold logic", + ) + unfold.add_argument( + "--unfold-angle", + type=float, + default=DEFAULT_UNFOLD_ANGLE, + help=f"Unfold angle (degrees), set to 0 to skip unfolding. Default {DEFAULT_UNFOLD_ANGLE}", + ) + unfold.add_argument( + "--unfold-timeout", + type=float, + default=DEFAULT_UNFOLD_TIMEOUT, + help=f"Unfold single-motion wait timeout (seconds), default {DEFAULT_UNFOLD_TIMEOUT}", + ) + out = parser.add_argument_group("Output (same path and format as manual calibration)") + out.add_argument( + "--save", + action="store_true", + help="Write calibration data to servo EEPROM and save to the same local path as manual calibration (draccus format)", + ) + out.add_argument( + "--robot-id", + type=str, + default="default", + help="Robot id for saving, corresponds to path .../calibration/robots//.json, must match config.id when starting the arm", + ) + out.add_argument( + "--robot-type", + type=str, + default="so_follower", + choices=["so_follower", "so_leader"], + help="Robot type for calibration file path: 'so_follower' (default) or 'so_leader'", + ) + + return parser.parse_args() + + +# ====================== Unfolding ====================== + + +def _unfold_joints( + bus: FeetechMotorsBus, + unfold_angle: float, + unfold_timeout: float, + unfold_directions: dict[str, str | None] | None = None, +) -> None: + """Unfold joints 2-4 to avoid mechanical interference during calibration. If unfold_directions is provided, record each joint's unfold direction.""" + print(f"\n{'=' * 20} Stage 2: Unfold joints 2-4 ({unfold_angle} deg) {'=' * 20}") + for motor in UNFOLD_ORDER: + direction, _ = bus.unfold_single_joint(motor, unfold_angle, unfold_timeout) + if unfold_directions is not None and direction is not None: + unfold_directions[motor] = direction + print("\n Unfolding complete, joints 2-4 are raised. Unfold direction for each joint:") + if unfold_directions is not None: + for motor in UNFOLD_ORDER: + direction = unfold_directions.get(motor, "unknown") + print(f" {motor_label(motor)}: unfold direction = {direction}") + + +def _fold_arm( + bus: FeetechMotorsBus, + all_mins: dict[str, int], + all_maxes: dict[str, int], + all_unfold_directions: dict[str, str | None], + *, + motors: list[str] | None = None, + unfold: bool = False, + unfold_per_motor: dict[str, bool] | None = None, +) -> None: + """Fold or fully unfold specified joints. Multiple servos move simultaneously. + + Fold (unfold=False): forward unfold -> fold target range_max; reverse -> range_min; gripper fixed at range_min. + Unfold (unfold=True): target is opposite of fold, forward -> range_min, reverse -> range_max; gripper fixed at range_max. + motors: list of servos to move; if None or empty, use default order (shoulder_lift->elbow_flex->wrist_flex->gripper). + unfold_per_motor: optional, specify fold(False)/unfold(True) per joint; unlisted joints use unfold. If None, all use unfold. + """ + default_order = ["shoulder_lift", "elbow_flex", "wrist_flex", "gripper"] + fold_order = motors if motors else default_order + title = "Fold/unfold arm" if unfold_per_motor else ("Unfold" if unfold else "Fold") + " arm" + print(f"\n{'=' * 20} {title} (simultaneous) {'=' * 20}") + values: dict[str, tuple[int, int, int]] = {} + + for motor in fold_order: + if motor not in all_mins or motor not in all_maxes: + continue + per_unfold = unfold_per_motor.get(motor, unfold) if unfold_per_motor is not None else unfold + direction = all_unfold_directions.get(motor) + # Compute fold end and unfold end, then select based on per_unfold + if motor == "gripper": + fold_end = all_mins[motor] + unfold_end = all_maxes[motor] + else: + fold_end = all_maxes[motor] if direction == "reverse" else all_mins[motor] + unfold_end = all_mins[motor] if direction == "reverse" else all_maxes[motor] + target = unfold_end if per_unfold else fold_end + label = "range_max" if target == all_maxes[motor] else "range_min" + if motor == "gripper": + label += "(gripper)" if per_unfold else "(gripper forward)" + action_m = "unfold" if per_unfold else "fold" + values[motor] = (target, DEFAULT_POS_SPEED, DEFAULT_ACCELERATION) + bus.write("Operating_Mode", motor, 0) # servo mode + try: + pos = bus.read("Present_Position", motor, normalize=False) + print(f" {motor_label(motor)} current pos={pos}, {action_m} to {label}={target}.") + except COMM_ERR: + print(f" {motor_label(motor)} failed to read current pos, {action_m} to {label}={target}.") + if not values: + action = "unfold" if unfold else "fold" + print(f" No valid motors, skipping {action}.\n") + return + bus.sync_write_pos_ex(values) + time.sleep(0.3) + # Poll until all motors stop + timeout_s = 10.0 + poll_s = 0.05 + t0 = time.monotonic() + while time.monotonic() - t0 < timeout_s: + try: + if all(bus.read("Moving", m, normalize=False) == 0 for m in values): + break + except COMM_ERR: + pass + time.sleep(poll_s) + done_label = "fold/unfold" if unfold_per_motor else ("unfold" if unfold else "fold") + for m in values: + try: + pos = bus.read("Present_Position", m, normalize=False) + print(f" {motor_label(m)} after {done_label}: end pos={pos}, reached target") + except COMM_ERR: + print(f" {motor_label(m)} after {done_label}: failed to read pos, reached target") + print(f" {done_label} complete.\n") + + +def _move_arm_by_angle( + bus: FeetechMotorsBus, + all_unfold_directions: dict[str, str | None], + angle_deg: float, + *, + fold: bool = False, + motors: list[str] | None = None, + all_mins: dict[str, int] | None = None, + all_maxes: dict[str, int] | None = None, +) -> None: + """Move specified degrees from current position in unfold or fold direction. Does not detect direction; relies on all_unfold_directions. + + Direction consistent with _fold_arm: forward unfold -> position increase is unfold, decrease is fold; reverse unfold -> position decrease is unfold, increase is fold. + fold: False=unfold direction, True=fold direction. + motors: list of servos to move, None or empty uses default order (shoulder_lift->elbow_flex->wrist_flex). + all_mins/all_maxes: optional, if provided, clamp target position to limits. + """ + default_order = ["shoulder_lift", "elbow_flex", "wrist_flex"] + move_order = motors if motors else default_order + angle_steps = int(angle_deg / 360.0 * FULL_TURN) + direction_label = "fold" if fold else "unfold" + print(f"\n{'=' * 20} Relative {direction_label} {angle_deg:.1f} deg from current pos {'=' * 20}") + for motor in move_order: + if ( + all_mins is not None + and all_maxes is not None + and (motor not in all_mins or motor not in all_maxes) + ): + continue + try: + present = bus.read("Present_Position", motor, normalize=False) + except COMM_ERR: + print(f" Warning: {motor_label(motor)} failed to read current position, skipping") + continue + direction = all_unfold_directions.get(motor) + # Consistent with _fold_arm: forward -> unfold increases position, fold decreases; reverse -> opposite + if fold: + target = present - angle_steps if direction == "forward" else present + angle_steps + else: + target = present + angle_steps if direction == "forward" else present - angle_steps + if all_mins is not None and all_maxes is not None and motor in all_mins and motor in all_maxes: + target = max(all_mins[motor], min(all_maxes[motor], target)) + print(f" {motor_label(motor)} {direction_label} {angle_deg:.1f} deg: pos {present} -> {target}") + ok = bus.write_pos_ex_and_wait( + motor, + target, + DEFAULT_POS_SPEED, + DEFAULT_ACCELERATION, + timeout_s=DEFAULT_UNFOLD_TIMEOUT, + poll_interval_s=0.05, + ) + if not ok: + print(f" Warning: {motor_label(motor)} motion timed out, keeping current position") + else: + print(f" {motor_label(motor)} reached target") + print(f" {direction_label} complete.\n") + + +# ====================== Calibration ====================== + + +def _record_reference_position( + bus: FeetechMotorsBus, + motor_name: str, + out: dict[str, int], +) -> None: + """Read the motor's current reference position (Present_Position + Homing_Offset) % FULL_TURN and store in out[motor_name]. On read failure, out is not modified.""" + try: + pr = bus.read("Present_Position", motor_name, normalize=False) + ho = bus.read("Homing_Offset", motor_name, normalize=False) + out[motor_name] = (pr + ho) % FULL_TURN + except COMM_ERR: + pass + + +def _calibrate_motors( + bus: FeetechMotorsBus, + motor_names: list[str], + *, + velocity_limit: int = DEFAULT_VELOCITY_LIMIT, + timeout_s: float = DEFAULT_TIMEOUT, + ccw_first: bool = False, + unfold_directions: dict[str, str | None] | None = None, + reference_positions: dict[str, int] | None = None, +) -> dict[str, tuple[int, int, int]]: + """Calibrate a group of motors (run measure_ranges_of_motion_multi then write back and return to center). Returns {motor_name: (range_min, range_max, mid_raw)}. + If unfold_directions is provided and motors 2 and 3 are calibrated together: both move simultaneously, motor 2 fully folds, motor 3 fully unfolds; otherwise return to center as normal. + If reference_positions is provided: use that reference position to select arc for the corresponding motor, skipping limit retreat and re-read.""" + if not motor_names: + return {} + raw_results = bus.measure_ranges_of_motion_multi( + motor_names, + velocity_limit=velocity_limit, + timeout_s=timeout_s, + ccw_first=ccw_first, + reference_positions=reference_positions, + ) + print("Preparing to write registers") + result: dict[str, tuple[int, int, int]] = {} + for m in motor_names: + rmin, rmax, mid_raw, _raw_min_meas, _raw_max_meas, homing_offset = raw_results[m] + print( + f" {motor_label(m)}: after offset range_min={rmin}, range_max={rmax}, mid={mid_raw}, Homing_Offset register={homing_offset}" + ) + time.sleep(0.05) + try: + ho_before = bus.read("Homing_Offset", m, normalize=False) + min_before = bus.read("Min_Position_Limit", m, normalize=False) + max_before = bus.read("Max_Position_Limit", m, normalize=False) + print( + f" {motor_label(m)} before write: Min_Position_Limit={min_before}, Max_Position_Limit={max_before}, Homing_Offset={ho_before}" + ) + except COMM_ERR: + print(f" {motor_label(m)} before write: failed to read registers") + bus.safe_write("Homing_Offset", m, homing_offset, normalize=False) + bus.safe_write_position_limits(m, rmin, rmax) + time.sleep(0.1) + try: + ho_after = bus.read("Homing_Offset", m, normalize=False) + min_after = bus.read("Min_Position_Limit", m, normalize=False) + max_after = bus.read("Max_Position_Limit", m, normalize=False) + print( + f" {motor_label(m)} after write: Min_Position_Limit={min_after}, Max_Position_Limit={max_after}, Homing_Offset={ho_after}" + ) + except COMM_ERR: + print(f" {motor_label(m)} after write: failed to read registers") + time.sleep(0.1) + do_2_3_together = ( + unfold_directions is not None and "shoulder_lift" in motor_names and "elbow_flex" in motor_names + ) + if m == "wrist_roll": + pass + elif do_2_3_together and m in ("shoulder_lift", "elbow_flex"): + # Motors 2 and 3 keep torque locked + pass + else: + bus.go_to_mid(m) + result[m] = (rmin, rmax, mid_raw) + + return result + + +# ====================== Connection and initialization (shared) ====================== + + +def _connect_and_clear(port: str) -> FeetechMotorsBus: + """Create bus, clear residual Overload, then formally connect. Raises exception on failure.""" + bus = FeetechMotorsBus(port=port, motors=SO_FOLLOWER_MOTORS.copy()) + bus.connect(handshake=False) + print("Clearing residual servo state...") + all_zero = dict.fromkeys(MOTOR_NAMES, 0) + for _ in range(3): + with contextlib.suppress(COMM_ERR): + bus.sync_write("Goal_Velocity", all_zero) + with contextlib.suppress(COMM_ERR): + bus.sync_write("Torque_Enable", all_zero) + time.sleep(0.2) + bus.disconnect(disable_torque=False) + time.sleep(0.2) + bus.connect() + print("All servos ready.") + return bus + + +def _run_with_bus( + port: str, + interactive: bool, + body: Callable[[FeetechMotorsBus], None], +) -> int: + """Connect bus then execute body(bus), with unified handling of connection failure, KeyboardInterrupt, Exception and disconnect. Returns 0 success, 1 error, 130 user interrupt.""" + try: + bus = _connect_and_clear(port) + except Exception as e: + print(f"Connection failed: {e}", file=sys.stderr) + return 1 + try: + body(bus) + except KeyboardInterrupt: + print("\nUser interrupted, disabling all servos...") + bus.safe_disable_all() + return 130 + except Exception as e: + print(f"Exception: {e}", file=sys.stderr) + bus.safe_disable_all() + if interactive: + with contextlib.suppress(EOFError): + input("Press Enter to exit...") + return 1 + finally: + bus.disconnect() + return 0 + + +# Stage 0 initialization: write->read->compare per register, table-driven config; special items (limits, Torque_Enable) handled separately +INIT_CHECKS = [ + ("Lock", 1), + ("Return_Delay_Time", 0), + ("Operating_Mode", 0), + ("Max_Torque_Limit", DEFAULT_MAX_TORQUE), + ("Torque_Limit", DEFAULT_TORQUE_LIMIT), + ("Acceleration", DEFAULT_ACCELERATION), + ("P_Coefficient", DEFAULT_P_COEFFICIENT), + ("I_Coefficient", DEFAULT_I_COEFFICIENT), + ("D_Coefficient", DEFAULT_D_COEFFICIENT), + ("Homing_Offset", 0), +] + + +def _run_init(bus: FeetechMotorsBus, *, interactive: bool = True) -> None: + """Stage 0: Lock=1, PID, limits, Homing_Offset, enable torque. If parameter error and interactive, wait for Enter.""" + print(f"\n{'=' * 20} Stage 0: Initialize {'=' * 20}") + for m in MOTOR_NAMES: + print(f"Configuring servo: {motor_label(m)}") + try: + bus.write("Torque_Enable", m, 0) + time.sleep(0.05) + except COMM_ERR: + pass + param_set_ok = True + try: + for reg, expected in INIT_CHECKS: + bus.write(reg, m, expected, normalize=(reg != "Homing_Offset")) + time.sleep(0.01) + got = bus.read(reg, m, normalize=False) + if got != expected: + print(f" [Warning] {reg} set failed on {m}: expected={expected}, got={got}") + param_set_ok = False + # Position limits: write/read/compare separately + bus.write_position_limits(m, 0, 4095) + time.sleep(0.05) + limits = bus.read_position_limits(m) + if limits != (0, 4095): + print(f" [Warning] Position_Limits set failed on {m}: expected=(0, 4095), got={limits}") + param_set_ok = False + time.sleep(0.2) + # Finally enable torque + bus.write("Torque_Enable", m, 1) + time.sleep(0.05) + te_read = bus.read("Torque_Enable", m, normalize=False) + if te_read != 1: + print(f" [Warning] Torque_Enable failed on {m}: expected=1, got={te_read}") + param_set_ok = False + time.sleep(0.1) + except Exception as e: + print(f" [Exception] Error setting parameters on {m}: {e}") + param_set_ok = False + if not param_set_ok and interactive: + with contextlib.suppress(Exception): + input( + " [Warning] Parameter set/verify failed, check wiring and power, press Enter to force continue..." + ) + print( + f"Initialized and torque enabled (P={DEFAULT_P_COEFFICIENT}, " + f"Acc={DEFAULT_ACCELERATION}, Torque={DEFAULT_TORQUE_LIMIT})." + ) + + +# ====================== Public entry points (full calibration / unfold only / single motor) ====================== + + +def _apply_calibration_results( + results: dict[str, tuple[int, int, int]], + all_mins: dict[str, int], + all_maxes: dict[str, int], + all_mids: dict[str, int], + motor_list: list[str], +) -> None: + """Write _calibrate_motors results into all_mins / all_maxes / all_mids.""" + for m in motor_list: + all_mins[m], all_maxes[m], all_mids[m] = results[m] + + +def run_full_calibration( + port: str, + *, + save: bool = False, + robot_id: str = "default", + robot_type: str = "so_follower", + velocity_limit: int = DEFAULT_VELOCITY_LIMIT, + timeout_s: float = DEFAULT_TIMEOUT, + unfold_timeout_s: float = DEFAULT_UNFOLD_TIMEOUT, + interactive: bool = True, +) -> int: + """Full calibration workflow: initialize -> servos 2-6 (with arm raise for clearance) -> servo 1 shoulder_pan calibrated last -> fold. + If save is True: write to servo EEPROM and save to the same path and format as manual calibration (draccus, loaded at arm startup). + + Called by CLI or teleoperation programs. Returns 0 success, 1 error, 130 user interrupt. + """ + + def body(bus: FeetechMotorsBus) -> None: + all_mins: dict[str, int] = {} + all_maxes: dict[str, int] = {} + all_mids: dict[str, int] = {} + all_unfold_directions: dict[str, str | None] = {} + all_reference_positions: dict[str, int] = {} + _run_init(bus, interactive=interactive) + # Raise motor 4 by 80 degrees + direction, _ = bus.unfold_single_joint("wrist_flex", 80, unfold_timeout_s) + if direction is not None: + all_unfold_directions["wrist_flex"] = direction + time.sleep(0.1) + # Raise motors 2 and 3 and record reference positions (Present_Position + Homing_Offset, used for arc selection during calibration) + direction, _ = bus.unfold_single_joint("shoulder_lift", 15, unfold_timeout_s) + if direction is not None: + all_unfold_directions["shoulder_lift"] = direction + _record_reference_position(bus, "shoulder_lift", all_reference_positions) + direction, _ = bus.unfold_single_joint("elbow_flex", 30, unfold_timeout_s) + if direction is not None: + all_unfold_directions["elbow_flex"] = direction + _record_reference_position(bus, "elbow_flex", all_reference_positions) + time.sleep(0.1) + # Fold: retract shoulder_lift and elbow_flex + for m in ["shoulder_lift", "elbow_flex"]: + bus.go_to_mid(m) + time.sleep(0.1) + # Calibrate motors 2 and 3 using multi-motor calibration, initial rotation direction is opposite to their raise direction + # forward raise -> CCW first; reverse raise -> CW first; default CCW first if not recorded + ccw_first_2_3 = { + "shoulder_lift": all_unfold_directions.get("shoulder_lift") != "reverse", + "elbow_flex": all_unfold_directions.get("elbow_flex") != "reverse", + } + print(f"\n{'=' * 20} Calibrate motors 2 and 3 (multi-motor, opposite to raise direction) {'=' * 20}") + results_2_3 = _calibrate_motors( + bus, + ["shoulder_lift", "elbow_flex"], + velocity_limit=velocity_limit, + timeout_s=timeout_s, + ccw_first=ccw_first_2_3, + unfold_directions=all_unfold_directions, + reference_positions=all_reference_positions, + ) + _apply_calibration_results( + results_2_3, all_mins, all_maxes, all_mids, ["shoulder_lift", "elbow_flex"] + ) + _fold_arm(bus, all_mins, all_maxes, all_unfold_directions, motors=["shoulder_lift", "elbow_flex"]) + + time.sleep(0.1) + # Stage 3: Calibrate remaining motors 4, 5, 6 (multi-motor simultaneous, with arm raise for clearance) + print(f"\n{'=' * 20} Stage 3: Calibrate motors 4-6 (multi-motor simultaneous) {'=' * 20}") + _move_arm_by_angle( + bus, + all_unfold_directions, + 80, + fold=False, + motors=["elbow_flex"], + all_mins=all_mins, + all_maxes=all_maxes, + ) + calibrate_rest_remaining = ["wrist_roll", "gripper", "wrist_flex"] + results_rest = _calibrate_motors( + bus, + calibrate_rest_remaining, + velocity_limit=velocity_limit, + timeout_s=timeout_s, + reference_positions=all_reference_positions, + ) + _apply_calibration_results(results_rest, all_mins, all_maxes, all_mids, calibrate_rest_remaining) + time.sleep(0.1) + # Fold motor 3, fully unfold motor 4 (executed simultaneously in one call) + _fold_arm( + bus, + all_mins, + all_maxes, + all_unfold_directions, + motors=["elbow_flex", "wrist_flex", "gripper"], + unfold_per_motor={"elbow_flex": False, "wrist_flex": True, "gripper": False}, + ) + # Stage 4: Finally calibrate motor 1 shoulder_pan + print( + f"\n{'=' * 20} Stage 4: Calibrate {motor_label('shoulder_pan')} (motor 1) and return to center {'=' * 20}" + ) + results_pan = _calibrate_motors( + bus, ["shoulder_pan"], velocity_limit=velocity_limit, timeout_s=timeout_s + ) + _apply_calibration_results(results_pan, all_mins, all_maxes, all_mids, ["shoulder_pan"]) + time.sleep(0.1) + motors_calibrated = CALIBRATE_REST + CALIBRATE_FIRST + print(f"\n{'=' * 20} Calibration results {'=' * 20}") + for name in motors_calibrated: + offset = all_mids[name] - STS_HALF_TURN_RAW + print( + f" {motor_label(name)}: min={all_mins[name]}, max={all_maxes[name]}, " + f"mid={all_mids[name]}, offset={offset}" + ) + + _fold_arm(bus, all_mins, all_maxes, all_unfold_directions) + # Before persisting: unlock EEPROM (Lock=0) and restore all servos to servo mode (Operating_Mode=0) + for name in bus.motors: + bus.write("Lock", name, 0) + time.sleep(0.01) + bus.write("Operating_Mode", name, 0) + time.sleep(0.01) + time.sleep(1) + if interactive: + bus.safe_disable_all() + print("\nCalibration complete.") + + if save: + print(f"\n{'=' * 20} Persisting (same method as manual calibration) {'=' * 20}") + bus.safe_disable_all() + cal = {} + for name in motors_calibrated: + m = SO_FOLLOWER_MOTORS[name] + offset = all_mids[name] - STS_HALF_TURN_RAW + offset = max(-HOMING_OFFSET_MAX_MAG, min(HOMING_OFFSET_MAX_MAG, offset)) + cal[name] = MotorCalibration( + id=m.id, + drive_mode=0, + homing_offset=offset, + range_min=all_mins[name], + range_max=all_maxes[name], + ) + bus.write_calibration(cal, cache=True) + print("Calibration written to servo EEPROM.") + # Same path and format as manual calibration, loaded at arm startup + calibration_fpath = HF_LEROBOT_CALIBRATION / "robots" / robot_type / f"{robot_id}.json" + calibration_fpath.parent.mkdir(parents=True, exist_ok=True) + with open(calibration_fpath, "w") as f, draccus.config_type("json"): + draccus.dump(cal, f, indent=4) + print(f"Calibration saved to: {calibration_fpath}") + print("Disabling all servos...") + bus.safe_disable_all() + + return _run_with_bus(port, interactive, body) + + +def unfold_joints( + port: str, + angle_deg: float, + *, + timeout_s: float = DEFAULT_UNFOLD_TIMEOUT, + interactive: bool = True, +) -> int: + """Only perform Stage 0 init + unfold joints 2-4 to specified angle. For debugging unfold. Returns 0/1/130.""" + + def body(bus: FeetechMotorsBus) -> None: + _run_init(bus, interactive=interactive) + all_unfold_directions: dict[str, str | None] = {} + if angle_deg > 0: + _unfold_joints(bus, angle_deg, timeout_s, all_unfold_directions) + print(" Unfolding complete, unfold directions:") + for motor, direction in all_unfold_directions.items(): + print(f" {motor_label(motor)}: {direction}") + if interactive: + input(" Press Enter to disable torque and exit...") + else: + print(" Unfold angle is 0, unfolding skipped.") + if interactive: + input(" Press Enter to disable torque and exit...") + bus.safe_disable_all() + + return _run_with_bus(port, interactive, body) + + +def calibrate_single_motor( + port: str, + motor_name: str, + *, + velocity_limit: int = DEFAULT_VELOCITY_LIMIT, + timeout_s: float = DEFAULT_TIMEOUT, + interactive: bool = True, +) -> int: + """Only perform Stage 0 + calibrate specified motor, no folding, no saving. For testing. Returns 0/1/130.""" + + def body(bus: FeetechMotorsBus) -> None: + _run_init(bus, interactive=interactive) + print(f"\n{'=' * 20} Calibrate {motor_label(motor_name)} {'=' * 20}") + _calibrate_motors(bus, [motor_name], velocity_limit=velocity_limit, timeout_s=timeout_s) + time.sleep(0.1) + if interactive: + input(" Calibration complete, press Enter to disable torque and exit...") + bus.safe_disable_all() + + return _run_with_bus(port, interactive, body) + + +# ====================== CLI entry point ====================== + + +def main() -> int: + """CLI: dispatch to full calibration, unfold only, or single motor calibration based on arguments.""" + args = parse_args() + print(f"Serial port: {args.port}") + if getattr(args, "unfold_only", False): + print("Unfold only (--unfold-only): Stage 0 init + Stage 2 unfold, no calibration") + print(f"Unfold angle: {args.unfold_angle} deg") + return unfold_joints( + args.port, + args.unfold_angle, + timeout_s=args.unfold_timeout, + interactive=True, + ) + if args.motor is not None: + print(f"Single motor mode: {args.motor}") + return calibrate_single_motor( + args.port, + args.motor, + velocity_limit=args.velocity_limit, + timeout_s=args.timeout, + interactive=True, + ) + print(f"Full calibration: {CALIBRATE_FIRST + CALIBRATE_REST}") + return run_full_calibration( + args.port, + save=args.save, + robot_id=args.robot_id, + robot_type=args.robot_type, + velocity_limit=args.velocity_limit, + timeout_s=args.timeout, + unfold_timeout_s=args.unfold_timeout, + interactive=True, + ) + + +if __name__ == "__main__": + sys.exit(main())