diff --git a/docs/assets/SNMOT-118-single-track-xyxy-vs-xcycsr.mp4 b/docs/assets/SNMOT-118-single-track-xyxy-vs-xcycsr.mp4 new file mode 100644 index 00000000..143de170 Binary files /dev/null and b/docs/assets/SNMOT-118-single-track-xyxy-vs-xcycsr.mp4 differ diff --git a/docs/index.md b/docs/index.md index edf7cdec..f82f128a 100644 --- a/docs/index.md +++ b/docs/index.md @@ -171,4 +171,14 @@ Try trackers in your browser with our [Hugging Face Playground](https://huggingf [:simple-googlecolab: Run Google Colab](https://colab.research.google.com/github/roboflow-ai/notebooks/blob/main/notebooks/how-to-track-objects-with-bytetrack-tracker.ipynb) +- **How to Track Objects with OC-SORT** + + --- + + [![](url-to-image)](https://colab.research.google.com/github/roboflow-ai/notebooks/blob/main/notebooks/how-to-track-objects-with-ocsort-tracker.ipynb) + + End-to-end example showing how to run RF-DETR detection with the OC-SORT tracker. + + [:simple-googlecolab: Run Google Colab](https://colab.research.google.com/github/roboflow-ai/notebooks/blob/main/notebooks/how-to-track-objects-with-ocsort-tracker.ipynb) + diff --git a/docs/javascripts/mathjax.js b/docs/javascripts/mathjax.js new file mode 100644 index 00000000..be1e8a19 --- /dev/null +++ b/docs/javascripts/mathjax.js @@ -0,0 +1,12 @@ +window.MathJax = { + tex: { + inlineMath: [["\\(", "\\)"], ["$", "$"]], + displayMath: [["\\[", "\\]"], ["$$", "$$"]], + processEscapes: true, + processEnvironments: true, + }, + options: { + ignoreHtmlClass: "^((?!arithmatex).)*$", + processHtmlClass: "arithmatex", + }, +}; diff --git a/docs/learn/state-estimators.md b/docs/learn/state-estimators.md new file mode 100644 index 00000000..92d120aa --- /dev/null +++ b/docs/learn/state-estimators.md @@ -0,0 +1,255 @@ +# State Estimators + +Every tracker in `trackers` uses a Kalman filter to predict where objects will appear in the next frame. The **state estimator** controls how bounding boxes are represented inside that filter. 
Different representations make different assumptions about object motion, and picking the right one can improve tracking quality without changing anything else. + +**What you'll learn:** + +- What state estimators are and why they matter +- How `XYXYStateEstimator` and `XCYCSRStateEstimator` represent bounding boxes +- When to use each representation +- How to swap the state estimator in any tracker + +--- + +## Install + +Get started by installing the package. + +```text +pip install trackers +``` + +For more options, see the [install guide](install.md). + +--- + +## What Is a State Estimator? + +A state estimator wraps a Kalman filter and defines how bounding boxes are encoded into the filter's state vector. The Kalman filter then predicts the next position of each tracked object and corrects that prediction when a new detection arrives. + +Two representations are available: + +| Estimator | State Dimensions | Representation | Aspect Ratio | +| :--------------------: | :--------------: | :---------------------------------------------------- | :-----------: | +| `XYXYStateEstimator` | 8 | Top-left and bottom-right corners + their velocities | Can change | +| `XCYCSRStateEstimator` | 7 | Center point, area, their velocities and aspect ratio | Held constant | + +They accept `[x1, y1, x2, y2]` bounding boxes on input and produce `[x1, y1, x2, y2]` bounding boxes on output. The difference is entirely in how the filter models motion internally. + +--- + +## XYXY — Corner-Based + +`XYXYStateEstimator` tracks the four corner coordinates independently. Each corner gets its own velocity term, giving the filter 8 state variables: + +``` +State: [x1, y1, x2, y2, vx1, vy1, vx2, vy2] +Measure: [x1, y1, x2, y2] +``` + +The transition matrix $F$ defines how the state evolves from one frame to the next. 
+ +State order: $[x_1, y_1, x_2, y_2, v_{x_1}, v_{y_1}, v_{x_2}, v_{y_2}]$ + +$$ +F = +\begin{bmatrix} +1 & 0 & 0 & 0 & 1 & 0 & 0 & 0 \\ +0 & 1 & 0 & 0 & 0 & 1 & 0 & 0 \\ +0 & 0 & 1 & 0 & 0 & 0 & 1 & 0 \\ +0 & 0 & 0 & 1 & 0 & 0 & 0 & 1 \\ +0 & 0 & 0 & 0 & 1 & 0 & 0 & 0 \\ +0 & 0 & 0 & 0 & 0 & 1 & 0 & 0 \\ +0 & 0 & 0 & 0 & 0 & 0 & 1 & 0 \\ +0 & 0 & 0 & 0 & 0 & 0 & 0 & 1 +\end{bmatrix} +$$ + +Equivalent update equations: + +```text +x1' = x1 + vx1 +y1' = y1 + vy1 +x2' = x2 + vx2 +y2' = y2 + vy2 +vx1' = vx1 +vy1' = vy1 +vx2' = vx2 +vy2' = vy2 +``` + +| Row | Meaning | +| :-- | :------------------------------------------------------- | +| 1-4 | Each corner coordinate is updated by adding its velocity | +| 5-8 | Velocities persist unchanged from frame to frame | + +Because each corner moves freely, the box width and height can change between frames. This makes XYXY a natural fit when objects change shape — due to camera perspective, non-rigid motion, or inconsistent detections. + +**In Trackers, this is the default** for `ByteTrackTracker` and `SORTTracker`. + +--- + +## XCYCSR — Center-Based + +`XCYCSRStateEstimator` tracks the box center, area (scale), and aspect ratio. Only the center and scale get velocity terms; aspect ratio is treated as constant. This gives 7 state variables: + +``` +State: [x_center, y_center, scale, aspect_ratio, vx, vy, vs] +Measure: [x_center, y_center, scale, aspect_ratio] +``` + +The transition matrix $F$ shows the key difference: the aspect ratio is propagated without a velocity term. 
+ +State order: $[x_c, y_c, s, r, v_x, v_y, v_s]$ + +$$ +F = +\begin{bmatrix} +1 & 0 & 0 & 0 & 1 & 0 & 0 \\ +0 & 1 & 0 & 0 & 0 & 1 & 0 \\ +0 & 0 & 1 & 0 & 0 & 0 & 1 \\ +0 & 0 & 0 & 1 & 0 & 0 & 0 \\ +0 & 0 & 0 & 0 & 1 & 0 & 0 \\ +0 & 0 & 0 & 0 & 0 & 1 & 0 \\ +0 & 0 & 0 & 0 & 0 & 0 & 1 +\end{bmatrix} +$$ + +Equivalent update equations: + +```text +x_center' = x_center + vx +y_center' = y_center + vy +scale' = scale + vs +aspect_ratio' = aspect_ratio +vx' = vx +vy' = vy +vs' = vs +``` + +| Row | Meaning | +| :-- | :-------------------------------------------------------- | +| 1-3 | Center position and scale follow constant-velocity motion | +| 4 | Aspect ratio is copied forward unchanged | +| 5-7 | Velocities persist unchanged from frame to frame | + +The aspect ratio `r = w / h` is carried forward unchanged. This acts as a regularizer — the filter resists sudden shape changes. It works well for rigid objects whose proportions stay consistent, like pedestrians walking or cars on a highway. + +**This is the default** for `OCSORTTracker`, matching the original OC-SORT paper. 
+ +--- + +## When to Use Each + +| Scenario | Recommended | Why | +| :------------------------------------------- | :--------------------: | :--------------------------------------------------------- | +| Pedestrians, vehicles, rigid objects | `XCYCSRStateEstimator` | Constant aspect ratio stabilizes predictions | +| Non-rigid or deformable objects | `XYXYStateEstimator` | Corners move independently to track shape changes | +| Noisy detections with fluctuating box sizes | `XCYCSRStateEstimator` | Aspect ratio constraint absorbs size noise | +| Strong perspective changes (camera pan/zoom) | `XYXYStateEstimator` | Box proportions shift with viewpoint; corners adapt freely | +| Default choice when unsure | `XYXYStateEstimator` | More general, fewer assumptions | + +We can also benchmark the trackers using the different State Estimators and we get: + +- In **Dancetrack**, with defaults parameters all trackers perform better with XYXYStateEstimator, but with tuned parameters, SORT tracker with XCYCSRStateEstimator gets +0.8% HOTA. +- In **Soccernet dataset**, with defaults parameters SORT tracker with XYXYStateEstimator has ~5% more HOTA than using XCYC, when tuning parameters with grid search this difference is reduced to 2%. For the other trackers we dont find significant advantages of using a different StateEstimators, just having up to 0.2% better HOTA. +- In **SportsMOT**, for OC-SORT and ByteTrack, the StateEstimator doesn't affect the performance, while for SORT XYXYStateEstimator gives a small advantage of ~2% HOTA with default parameters and 0.4% when tuning both. +- In **MOT17**, with default parameters XYXYStateEstimator performs slightly better than XCYCSRStateEstimator with SORT and ByteTrack with up to 0.7% better results, but for OC-SORT XCYCSRStateEstimator gives 0.2% better HOTA. When tuning parameters, XCYCSRStateEstimator performs the best with all the trackers by a small margin, ranging in 0.2-0.4% HOTA. 
But let's visualize where these differences are. Here is an example where using the XCYCSR state estimator associates an occluded track correctly, while using XYXY changes the ID:

<video autoplay loop muted playsinline 
+ +
+ +--- + +## Swapping the Estimator + +All trackers accept a `state_estimator_class` parameter. Import the class you want and pass it to the constructor. + +=== "ByteTrack with XCYCSR" + + ```python + from trackers import ByteTrackTracker + from trackers.utils.state_representations import XCYCSRStateEstimator + + tracker = ByteTrackTracker( + state_estimator_class=XCYCSRStateEstimator, + ) + ``` + +=== "OC-SORT with XYXY" + + ```python + from trackers import OCSORTTracker + from trackers.utils.state_representations import XYXYStateEstimator + + tracker = OCSORTTracker( + state_estimator_class=XYXYStateEstimator, + ) + ``` + +=== "SORT with XCYCSR" + + ```python + from trackers import SORTTracker + from trackers.utils.state_representations import XCYCSRStateEstimator + + tracker = SORTTracker( + state_estimator_class=XCYCSRStateEstimator, + ) + ``` + +Everything else stays the same — detection, association, and visualization work identically regardless of which estimator you choose. + +--- + +## Full Example + +Run ByteTrack with both estimators on the same video and compare the results side by side. + +```python +import cv2 + +import supervision as sv +from inference import get_model +from trackers import ByteTrackTracker +from trackers.utils.state_representations import ( + XCYCSRStateEstimator, + XYXYStateEstimator, +) + +model = get_model("rfdetr-nano") + +tracker_xyxy = ByteTrackTracker( + state_estimator_class=XYXYStateEstimator, +) +tracker_xcycsr = ByteTrackTracker( + state_estimator_class=XCYCSRStateEstimator, +) + +cap = cv2.VideoCapture("source.mp4") +while True: + ret, frame = cap.read() + if not ret: + break + + result = model.infer(frame)[0] + detections = sv.Detections.from_inference(result) + + tracked_xyxy = tracker_xyxy.update(detections) + tracked_xcycsr = tracker_xcycsr.update(detections) + + # Compare tracker_id assignments, box smoothness, etc. 
+ print(f"XYXY IDs: {tracked_xyxy.tracker_id}") + print(f"XCYCSR IDs: {tracked_xcycsr.tracker_id}") +``` + +--- + +## Takeaway + +The state estimator is a single-line change that controls how the Kalman filter models bounding box motion. Use `XCYCSRStateEstimator` when objects keep a consistent shape, and `XYXYStateEstimator` when shape varies or you want fewer assumptions. Try it on your case, the best choice depends on the scene. diff --git a/docs/trackers/comparison.md b/docs/trackers/comparison.md index f877f68b..b0057efe 100644 --- a/docs/trackers/comparison.md +++ b/docs/trackers/comparison.md @@ -81,7 +81,7 @@ Sports broadcast tracking with fast motion, camera pans, and similar-looking tar | Tracker | HOTA | IDF1 | MOTA | | :-------: | :------: | :------: | :------: | - | SORT | 70.9 | 68.9 | 95.7 | + | SORT | 70.8 | 68.9 | 95.5 | | ByteTrack | **73.0** | **72.5** | **96.4** | | OC-SORT | 71.7 | 71.4 | 95.0 | @@ -152,7 +152,7 @@ Long sequences with dense interactions and partial occlusions. Tests long-term I | Tracker | HOTA | IDF1 | MOTA | | :-------: | :------: | :------: | :------: | | SORT | **84.2** | **78.2** | **98.2** | - | ByteTrack | 84.0 | 78.1 | 97.8 | + | ByteTrack | 84.0 | 78.1 | **98.2** | | OC-SORT | 82.9 | 77.9 | 96.8 | Tuned configuration for each tracker. @@ -166,9 +166,9 @@ Long sequences with dense interactions and partial occlusions. 
Tests long-term I ByteTrack: lost_track_buffer: 30 - track_activation_threshold: 0.5 - minimum_consecutive_frames: 2 - minimum_iou_threshold: 0.1 + track_activation_threshold: 0.2 + minimum_consecutive_frames: 1 + minimum_iou_threshold: 0.05 high_conf_det_threshold: 0.5 OC-SORT: diff --git a/mkdocs.yml b/mkdocs.yml index dfa759b9..a3c97949 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -54,6 +54,8 @@ extra_css: extra_javascript: - javascripts/pycon_copy.js + - javascripts/mathjax.js + - https://unpkg.com/mathjax@3/es5/tex-mml-chtml.js - javascripts/cli_builder_framework.js - javascripts/command_builder.js @@ -75,6 +77,9 @@ markdown_extensions: pygments_lang_class: true # Enables inline code highlighting - pymdownx.inlinehilite + # Enables LaTeX-style math in Markdown + - pymdownx.arithmatex: + generic: true # Allows including content from other files - pymdownx.snippets # Enables nested code blocks and custom fences @@ -120,6 +125,7 @@ nav: - Download Datasets: learn/download.md - Evaluate Trackers: learn/evaluate.md - Detection Quality Matters: learn/detection-quality.md + - State Estimators: learn/state-estimators.md - Trackers: - Comparison: trackers/comparison.md - SORT: trackers/sort.md diff --git a/trackers/core/bytetrack/tracker.py b/trackers/core/bytetrack/tracker.py index 3792266c..b374a4e9 100644 --- a/trackers/core/bytetrack/tracker.py +++ b/trackers/core/bytetrack/tracker.py @@ -9,10 +9,12 @@ from scipy.optimize import linear_sum_assignment from trackers.core.base import BaseTracker -from trackers.core.bytetrack.kalman import ByteTrackKalmanBoxTracker -from trackers.core.sort.utils import ( - get_alive_trackers, - get_iou_matrix, +from trackers.core.bytetrack.tracklet import ByteTrackTracklet +from trackers.core.bytetrack.utils import _get_alive_tracklets +from trackers.core.sort.utils import _get_iou_matrix +from trackers.utils.state_representations import ( + BaseStateEstimator, + XYXYStateEstimator, ) @@ -53,6 +55,9 @@ class 
ByteTrackTracker(BaseTracker): detections to existing tracks. Higher values require more overlap. high_conf_det_threshold: `float` specifying threshold for separating high and low confidence detections in the two-stage association. + state_estimator_class: State estimator class to use for Kalman filter. + Defaults to `XYXYStateEstimator`. Can also use + `XCYCSRStateEstimator` for center-based representation. """ tracker_id = "bytetrack" @@ -65,6 +70,7 @@ def __init__( minimum_consecutive_frames: int = 2, minimum_iou_threshold: float = 0.1, high_conf_det_threshold: float = 0.6, + state_estimator_class: type[BaseStateEstimator] = XYXYStateEstimator, ) -> None: # Calculate maximum frames without update based on lost_track_buffer and # frame_rate. This scales the buffer based on the frame rate to ensure @@ -74,13 +80,14 @@ def __init__( self.minimum_iou_threshold = minimum_iou_threshold self.track_activation_threshold = track_activation_threshold self.high_conf_det_threshold = high_conf_det_threshold - self.tracks: list[ByteTrackKalmanBoxTracker] = [] + self.tracks: list[ByteTrackTracklet] = [] + self.state_estimator_class = state_estimator_class def update( self, detections: sv.Detections, ) -> sv.Detections: - """Update tracker state with new detections and return tracked objects. + """Update tracks state with new detections and return tracked objects. Performs Kalman filter prediction, two-stage association (high then low confidence), and initializes new tracks for unmatched detections. 
@@ -120,7 +127,7 @@ def update( low_boxes = detection_boxes[low_indices] # Step 1: associate high-confidence detections to all tracks - iou_matrix = get_iou_matrix(self.tracks, high_boxes) + iou_matrix = _get_iou_matrix(self.tracks, high_boxes) matched, unmatched_tracks, unmatched_high = self._get_associated_indices( iou_matrix, self.minimum_iou_threshold ) @@ -129,17 +136,18 @@ def update( track = self.tracks[row] track.update(high_boxes[col]) if ( - track.number_of_successful_updates >= self.minimum_consecutive_frames + track.number_of_successful_consecutive_updates + >= self.minimum_consecutive_frames and track.tracker_id == -1 ): - track.tracker_id = ByteTrackKalmanBoxTracker.get_next_tracker_id() + track.tracker_id = ByteTrackTracklet.get_next_tracker_id() out_det_indices.append(int(high_indices[col])) out_tracker_ids.append(track.tracker_id) remaining_tracks = [self.tracks[i] for i in unmatched_tracks] # Step 2: associate low-confidence detections to remaining tracks - iou_matrix = get_iou_matrix(remaining_tracks, low_boxes) + iou_matrix = _get_iou_matrix(remaining_tracks, low_boxes) matched, _, unmatched_low = self._get_associated_indices( iou_matrix, self.minimum_iou_threshold ) @@ -148,10 +156,11 @@ def update( track = remaining_tracks[row] track.update(low_boxes[col]) if ( - track.number_of_successful_updates >= self.minimum_consecutive_frames + track.number_of_successful_consecutive_updates + >= self.minimum_consecutive_frames and track.tracker_id == -1 ): - track.tracker_id = ByteTrackKalmanBoxTracker.get_next_tracker_id() + track.tracker_id = ByteTrackTracklet.get_next_tracker_id() out_det_indices.append(int(low_indices[col])) out_tracker_ids.append(track.tracker_id) @@ -161,7 +170,7 @@ def update( out_tracker_ids.append(-1) # Spawn new tracks from unmatched high-confidence detections - self._spawn_new_trackers( + self._spawn_new_tracks( detection_boxes, confidences, unmatched_high, @@ -170,10 +179,10 @@ def update( out_tracker_ids, ) - self.tracks = 
get_alive_trackers( - trackers=self.tracks, - maximum_frames_without_update=self.maximum_frames_without_update, + self.tracks = _get_alive_tracklets( # type: ignore[assignment] + tracklets=self.tracks, minimum_consecutive_frames=self.minimum_consecutive_frames, + maximum_frames_without_update=self.maximum_frames_without_update, ) # Build final sv.Detections from original by indexing @@ -223,7 +232,7 @@ def _get_associated_indices( return matched_indices, unmatched_tracks, unmatched_detections - def _spawn_new_trackers( + def _spawn_new_tracks( self, detection_boxes: np.ndarray, confidences: np.ndarray, @@ -237,7 +246,10 @@ def _spawn_new_trackers( conf = float(confidences[global_idx]) if conf >= self.track_activation_threshold: self.tracks.append( - ByteTrackKalmanBoxTracker(bbox=detection_boxes[global_idx]) + ByteTrackTracklet( + initial_bbox=detection_boxes[global_idx], + state_estimator_class=self.state_estimator_class, + ) ) out_det_indices.append(global_idx) out_tracker_ids.append(-1) @@ -247,4 +259,4 @@ def reset(self) -> None: Call this method when switching to a new video or scene. """ self.tracks = [] - ByteTrackKalmanBoxTracker.count_id = 0 + ByteTrackTracklet.count_id = 0 diff --git a/trackers/core/bytetrack/tracklet.py b/trackers/core/bytetrack/tracklet.py new file mode 100644 index 00000000..dd63fa96 --- /dev/null +++ b/trackers/core/bytetrack/tracklet.py @@ -0,0 +1,57 @@ +# ------------------------------------------------------------------------ +# Trackers +# Copyright (c) 2026 Roboflow. All Rights Reserved. 
+# Licensed under the Apache License, Version 2.0 [see LICENSE for details] +# ------------------------------------------------------------------------ + +import numpy as np + +from trackers.utils.base_tracklet import BaseTracklet +from trackers.utils.state_representations import ( + BaseStateEstimator, + XYXYStateEstimator, +) + + +class ByteTrackTracklet(BaseTracklet): + count_id: int = 0 + + def __init__( + self, + initial_bbox: np.ndarray, + state_estimator_class: type[BaseStateEstimator] = XYXYStateEstimator, + ) -> None: + super().__init__(initial_bbox, state_estimator_class) + self._configure_noise() + # Count initial bbox as first successful update (matches original + # ByteTrackKalmanBoxTracker behavior where hits started at 1) + self.number_of_successful_consecutive_updates = 1 + + def update(self, bbox: np.ndarray | None) -> None: + """Update tracklet with new observation or None if missed.""" + if bbox is not None: + self.state_estimator.update(bbox) + self.time_since_update = 0 + self.number_of_successful_consecutive_updates += 1 + else: + self.state_estimator.update(None) + self.time_since_update += 1 + self.number_of_successful_consecutive_updates = 0 + + def predict(self) -> np.ndarray: + """Predict next bounding box position.""" + self.state_estimator.predict() + self.age += 1 + return self.state_estimator.state_to_bbox() + + def get_state_bbox(self) -> np.ndarray: + """Get current bounding box estimate from the filter/state.""" + return self.state_estimator.state_to_bbox() + + def _configure_noise(self) -> None: + """Configure Kalman filter noise (original ByteTrack tuning).""" + kf = self.state_estimator.kf + self.state_estimator.set_kf_covariances( + R=kf.R * 0.1, + Q=kf.Q * 0.01, + ) diff --git a/trackers/core/bytetrack/utils.py b/trackers/core/bytetrack/utils.py new file mode 100644 index 00000000..7bb80ddd --- /dev/null +++ b/trackers/core/bytetrack/utils.py @@ -0,0 +1,43 @@ +# 
------------------------------------------------------------------------ +# Trackers +# Copyright (c) 2026 Roboflow. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 [see LICENSE for details] +# ------------------------------------------------------------------------ + +from collections.abc import Sequence + +from trackers.utils.base_tracklet import BaseTracklet + + +def _get_alive_tracklets( + tracklets: Sequence[BaseTracklet], + minimum_consecutive_frames: int, + maximum_frames_without_update: int, +) -> list[BaseTracklet]: + """ + Remove dead or immature lost tracklets and get alive trackers + that are within `maximum_frames_without_update` AND (it's mature OR + it was just updated). + + Args: + tracklets: List of BaseTracklet objects. + minimum_consecutive_frames: Number of consecutive frames that an object + must be tracked before it is considered a 'valid' track. + maximum_frames_without_update: Maximum number of frames without update + before a track is considered dead. + + Returns: + List of alive tracklets. 
+ """ + alive_tracklets = [] + for tracklet in tracklets: + is_mature = ( + tracklet.number_of_successful_consecutive_updates + >= minimum_consecutive_frames + ) + is_active = tracklet.time_since_update == 0 + if tracklet.time_since_update < maximum_frames_without_update and ( + is_mature or is_active + ): + alive_tracklets.append(tracklet) + return alive_tracklets diff --git a/trackers/core/ocsort/tracker.py b/trackers/core/ocsort/tracker.py index f84fa7b6..94b66305 100644 --- a/trackers/core/ocsort/tracker.py +++ b/trackers/core/ocsort/tracker.py @@ -4,6 +4,8 @@ # Licensed under the Apache License, Version 2.0 [see LICENSE for details] # ------------------------------------------------------------------------ +from copy import deepcopy + import numpy as np import supervision as sv from scipy.optimize import linear_sum_assignment @@ -14,7 +16,10 @@ _build_direction_consistency_matrix_batch, _get_iou_matrix, ) -from trackers.utils.state_representations import XCYCSRStateEstimator +from trackers.utils.state_representations import ( + BaseStateEstimator, + XCYCSRStateEstimator, +) class OCSORTTracker(BaseTracker): @@ -57,6 +62,9 @@ class OCSORTTracker(BaseTracker): delta_t: `int` specifying number of past frames to use for velocity estimation. Higher values provide more stable direction estimates during occlusion. + state_estimator_class: State estimator class to use for Kalman filter. + Defaults to `XCYCSRStateEstimator`. Can also use + `XYXYStateEstimator` for corner-based representation. """ tracker_id = "ocsort" @@ -70,6 +78,7 @@ def __init__( direction_consistency_weight: float = 0.2, high_conf_det_threshold: float = 0.6, delta_t: int = 3, + state_estimator_class: type[BaseStateEstimator] = XCYCSRStateEstimator, ) -> None: # Calculate maximum frames without update based on lost_track_buffer and # frame_rate. 
This scales the buffer based on the frame rate to ensure @@ -83,7 +92,7 @@ def __init__( self.tracks: list[OCSORTTracklet] = [] self.frame_count = 0 - self.state_estimator_class = XCYCSRStateEstimator + self.state_estimator_class = state_estimator_class def _get_associated_indices( self, @@ -248,7 +257,8 @@ def update(self, detections: sv.Detections) -> sv.Detections: # Build output — single index into the filtered detections preserves # all metadata (confidence, class_id, mask, data dict). if out_det_indices: - result = detections[out_det_indices] + copied_detections = deepcopy(detections) + result = copied_detections[out_det_indices] result.tracker_id = np.array(out_tracker_ids, dtype=int) else: result = sv.Detections.empty() diff --git a/trackers/core/ocsort/tracklet.py b/trackers/core/ocsort/tracklet.py index 89aa2b64..387935b3 100644 --- a/trackers/core/ocsort/tracklet.py +++ b/trackers/core/ocsort/tracklet.py @@ -8,6 +8,7 @@ import numpy as np +from trackers.utils.base_tracklet import BaseTracklet from trackers.utils.converters import ( xyxy_to_xcycsr, ) @@ -17,7 +18,7 @@ ) -class OCSORTTracklet: +class OCSORTTracklet(BaseTracklet): """Tracklet for OC-SORT tracker with ORU (Observation-centric Re-Update). Manages a single tracked object with Kalman filter state estimation. @@ -50,18 +51,17 @@ def __init__( Args: initial_bbox: Initial bounding box `[x1, y1, x2, y2]`. - kalman_filter_class: Kalman filter class to use. Instantiated + state_estimator_class: State estimator class to use. Instantiated with *initial_bbox*. Defaults to `XCYCSRKalmanFilter`. delta_t: Number of timesteps back to look for velocity estimation. Higher values use observations further in the past to estimate motion direction, providing more stable velocity estimates. 
""" - self.age = 0 # Initialize state estimator (wraps KalmanFilter + state repr) - self.kalman_filter: BaseStateEstimator = state_estimator_class(initial_bbox) - + super().__init__(initial_bbox, state_estimator_class) + self._configure_noise() # Observation history for ORU and delta_t self.delta_t = delta_t self.last_observation = initial_bbox @@ -69,28 +69,13 @@ def __init__( self.observations: dict[int, np.ndarray] = {} self.velocity: np.ndarray | None = None - # Track ID can be initialized before mature in oc-sort - # it is assigned if the frame number is less than minimum_consecutive_frames - self.tracker_id = -1 - - # Tracking counters - self.number_of_successful_consecutive_updates = 0 - self.time_since_update = 0 - # ORU: saved state for freeze/unfreeze self._frozen_state: dict | None = None self._observed = True - @classmethod - def get_next_tracker_id(cls) -> int: - """Get next available tracker ID.""" - next_id = cls.count_id - cls.count_id += 1 - return next_id - def _freeze(self) -> None: """Save Kalman filter state before track is lost (ORU mechanism).""" - self._frozen_state = self.kalman_filter.get_state() + self._frozen_state = self.state_estimator.get_state() def _unfreeze(self, new_bbox: np.ndarray) -> None: """Restore state and apply virtual trajectory (ORU mechanism). 
@@ -106,11 +91,11 @@ def _unfreeze(self, new_bbox: np.ndarray) -> None: return # Restore to frozen state - self.kalman_filter.set_state(self._frozen_state) + self.state_estimator.set_state(self._frozen_state) time_gap = self.time_since_update # this is oc-sort specific - if isinstance(self.kalman_filter, XCYCSRStateEstimator): + if isinstance(self.state_estimator, XCYCSRStateEstimator): self._unfreeze_xcycsr(new_bbox, time_gap) else: self._unfreeze_xyxy(new_bbox, time_gap) @@ -155,9 +140,9 @@ def _unfreeze_xcycsr(self, new_bbox: np.ndarray, time_gap: int) -> None: r = w / h virtual_obs = np.array([x, y, s, r]).reshape((4, 1)) - self.kalman_filter.kf.update(virtual_obs) + self.state_estimator.kf.update(virtual_obs) if i < time_gap - 1: - self.kalman_filter.kf.predict() + self.state_estimator.kf.predict() def _unfreeze_xyxy(self, new_bbox: np.ndarray, time_gap: int) -> None: """ORU interpolation for XYXY representation. @@ -174,9 +159,9 @@ def _unfreeze_xyxy(self, new_bbox: np.ndarray, time_gap: int) -> None: for i in range(time_gap): virtual_obs = (last_xyxy + (i + 1) * delta).reshape((4, 1)) - self.kalman_filter.kf.update(virtual_obs) + self.state_estimator.kf.update(virtual_obs) if i < time_gap - 1: - self.kalman_filter.kf.predict() + self.state_estimator.kf.predict() def get_k_previous_obs(self) -> np.ndarray | None: """Get observation from delta_t steps ago. @@ -239,7 +224,7 @@ def update(self, bbox: np.ndarray | None) -> None: # Update KF with the real observation # (after ORU this is the final update at the correct time step; # without ORU this is the normal measurement update) - self.kalman_filter.update(bbox) + self.state_estimator.update(bbox) self._observed = True self.time_since_update = 0 @@ -252,7 +237,7 @@ def update(self, bbox: np.ndarray | None) -> None: if self._observed: self._freeze() self._observed = False - self.kalman_filter.update(None) + self.state_estimator.update(None) def predict(self) -> np.ndarray: """Predict next bounding box position. 
@@ -260,14 +245,14 @@ def predict(self) -> np.ndarray: Returns: Predicted bounding box `[x1, y1, x2, y2]`. """ - self.kalman_filter.predict() + self.state_estimator.predict() self.age += 1 if self.time_since_update > 0: self.number_of_successful_consecutive_updates = 0 self.time_since_update += 1 - return self.kalman_filter.state_to_bbox() + return self.state_estimator.state_to_bbox() def get_state_bbox(self) -> np.ndarray: """Get current bounding box estimate from Kalman filter. @@ -275,7 +260,26 @@ def get_state_bbox(self) -> np.ndarray: Returns: Current bounding box estimate `[x1, y1, x2, y2]`. """ - return self.kalman_filter.state_to_bbox() + return self.state_estimator.state_to_bbox() + + def _configure_noise(self) -> None: + """Configure Kalman filter noise matrices (OC-SORT paper tuning).""" + kf = self.state_estimator.kf + R = kf.R + P = kf.P + Q = kf.Q + if isinstance(self.state_estimator, XCYCSRStateEstimator): + R[2:, 2:] *= 10.0 + P[4:, 4:] *= 1000.0 + P *= 10.0 + Q[-1, -1] *= 0.01 + Q[4:, 4:] *= 0.01 + else: + # XYXY: same velocity uncertainty scaling + P[4:, 4:] *= 1000.0 + P *= 10.0 + Q[4:, 4:] *= 0.01 + self.state_estimator.set_kf_covariances(R=R, Q=Q, P=P) def resolve_tracker_id( self, diff --git a/trackers/core/sort/kalman.py b/trackers/core/sort/kalman.py deleted file mode 100644 index 144fde58..00000000 --- a/trackers/core/sort/kalman.py +++ /dev/null @@ -1,147 +0,0 @@ -# ------------------------------------------------------------------------ -# Trackers -# Copyright (c) 2026 Roboflow. All Rights Reserved. -# Licensed under the Apache License, Version 2.0 [see LICENSE for details] -# ------------------------------------------------------------------------ - -import numpy as np -from numpy.typing import NDArray - - -class SORTKalmanBoxTracker: - """ - The `SORTKalmanBoxTracker` class represents the internals of a single - tracked object (bounding box), with a Kalman filter to predict and update - its position. 
- - Attributes: - tracker_id: Unique identifier for the tracker. - number_of_successful_updates: Number of times the object has been - updated successfully. - time_since_update: Number of frames since the last update. - state: State vector of the bounding box. - F: State transition matrix. - H: Measurement matrix. - Q: Process noise covariance matrix. - R: Measurement noise covariance matrix. - P: Error covariance matrix. - count_id: Class variable to assign unique IDs to each tracker. - - Args: - bbox: Initial bounding box in the form [x1, y1, x2, y2]. - """ - - count_id: int = 0 - state: NDArray[np.float32] - F: NDArray[np.float32] - H: NDArray[np.float32] - Q: NDArray[np.float32] - R: NDArray[np.float32] - P: NDArray[np.float32] - - @classmethod - def get_next_tracker_id(cls) -> int: - next_id = cls.count_id - cls.count_id += 1 - return next_id - - def __init__(self, bbox: NDArray[np.float64]) -> None: - # Initialize with a temporary ID of -1 - # Will be assigned a real ID when the track is considered mature - self.tracker_id = -1 - - # Number of hits indicates how many times the object has been - # updated successfully - self.number_of_successful_updates = 1 - # Number of frames since the last update - self.time_since_update = 0 - - # For simplicity, we keep a small state vector: - # (x, y, x2, y2, vx, vy, vx2, vy2). - # We'll store the bounding box in "self.state" - self.state = np.zeros((8, 1), dtype=np.float32) - - # Initialize state directly from the first detection - bbox_float: NDArray[np.float32] = bbox.astype(np.float32) - self.state[0, 0] = bbox_float[0] - self.state[1, 0] = bbox_float[1] - self.state[2, 0] = bbox_float[2] - self.state[3, 0] = bbox_float[3] - - # Basic constant velocity model - self._initialize_kalman_filter() - - def _initialize_kalman_filter(self) -> None: - """ - Sets up the matrices for the Kalman filter. - """ - # State transition matrix (F): 8x8 - # We assume a constant velocity model. 
Positions are incremented by - # velocity each step. - self.F = np.eye(8, dtype=np.float32) - for i in range(4): - self.F[i, i + 4] = 1.0 - - # Measurement matrix (H): we directly measure x1, y1, x2, y2 - self.H = np.eye(4, 8, dtype=np.float32) # 4x8 - - # Process covariance matrix (Q) - self.Q = np.eye(8, dtype=np.float32) * 0.01 - - # Measurement covariance (R): noise in detection - self.R = np.eye(4, dtype=np.float32) * 0.1 - - # Error covariance matrix (P) - self.P = np.eye(8, dtype=np.float32) - - def predict(self) -> None: - """ - Predict the next state of the bounding box (applies the state transition). - """ - # Predict state - self.state = (self.F @ self.state).astype(np.float32) - # Predict error covariance - self.P = (self.F @ self.P @ self.F.T + self.Q).astype(np.float32) - - # Increase time since update - self.time_since_update += 1 - - def update(self, bbox: NDArray[np.float64]) -> None: - """ - Updates the state with a new detected bounding box. - - Args: - bbox: Detected bounding box in the form [x1, y1, x2, y2]. - """ - self.time_since_update = 0 - self.number_of_successful_updates += 1 - - # Kalman Gain - S: NDArray[np.float32] = (self.H @ self.P @ self.H.T + self.R).astype( - np.float32 - ) - K: NDArray[np.float32] = (self.P @ self.H.T @ np.linalg.inv(S)).astype( - np.float32 - ) - - # Residual - measurement: NDArray[np.float32] = bbox.reshape((4, 1)).astype(np.float32) - y: NDArray[np.float32] = ( - measurement - self.H @ self.state - ) # y should be float32 (4,1) - - # Update state - self.state = (self.state + K @ y).astype(np.float32) - - # Update covariance - identity_matrix: NDArray[np.float32] = np.eye(8, dtype=np.float32) - self.P = ((identity_matrix - K @ self.H) @ self.P).astype(np.float32) - - def get_state_bbox(self) -> NDArray[np.float32]: - """ - Returns the current bounding box estimate from the state vector. - - Returns: - The bounding box [x1, y1, x2, y2]. 
- """ - return self.state[:4, 0].flatten().astype(np.float32) diff --git a/trackers/core/sort/tracker.py b/trackers/core/sort/tracker.py index 5e0f2a5f..37b93c44 100644 --- a/trackers/core/sort/tracker.py +++ b/trackers/core/sort/tracker.py @@ -9,10 +9,14 @@ from scipy.optimize import linear_sum_assignment from trackers.core.base import BaseTracker -from trackers.core.sort.kalman import SORTKalmanBoxTracker +from trackers.core.sort.tracklet import SORTTracklet from trackers.core.sort.utils import ( - get_alive_trackers, - get_iou_matrix, + _get_alive_tracklets, + _get_iou_matrix, +) +from trackers.utils.state_representations import ( + BaseStateEstimator, + XYXYStateEstimator, ) @@ -51,6 +55,9 @@ class SORTTracker(BaseTracker): threshold, tracks are assigned `tracker_id` of `-1`. minimum_iou_threshold: `float` specifying IoU threshold for associating detections to existing tracks. Higher values require more overlap. + state_estimator_class: State estimator class to use for Kalman filter. + Defaults to `XYXYStateEstimator`. Can also use + `XYXYStateEstimator` for corner-based representation. """ tracker_id = "sort" @@ -62,6 +69,7 @@ def __init__( track_activation_threshold: float = 0.25, minimum_consecutive_frames: int = 3, minimum_iou_threshold: float = 0.3, + state_estimator_class: type[BaseStateEstimator] = XYXYStateEstimator, ) -> None: # Calculate maximum frames without update based on lost_track_buffer and # frame_rate. 
This scales the buffer based on the frame rate to ensure @@ -70,28 +78,29 @@ def __init__( self.minimum_consecutive_frames = minimum_consecutive_frames self.minimum_iou_threshold = minimum_iou_threshold self.track_activation_threshold = track_activation_threshold + self.state_estimator_class = state_estimator_class - # Active trackers - self.trackers: list[SORTKalmanBoxTracker] = [] + # Active tracklets + self.tracklets: list[SORTTracklet] = [] def _get_associated_indices( self, iou_matrix: np.ndarray, detection_boxes: np.ndarray ) -> tuple[list[tuple[int, int]], set[int], set[int]]: """ - Associate detections to trackers based on IOU + Associate detections to tracklets based on IOU Args: iou_matrix: IOU cost matrix. detection_boxes: Detected bounding boxes in the form [x1, y1, x2, y2]. Returns: - Matched indices, unmatched trackers, unmatched detections. + Matched indices, unmatched tracklets, unmatched detections. """ matched_indices = [] - unmatched_trackers = set(range(len(self.trackers))) + unmatched_tracklets = set(range(len(self.tracklets))) unmatched_detections = set(range(len(detection_boxes))) - if len(self.trackers) > 0 and len(detection_boxes) > 0: + if len(self.tracklets) > 0 and len(detection_boxes) > 0: # Find optimal assignment using scipy.optimize.linear_sum_assignment. 
# Note that it uses a a modified Jonker-Volgenant algorithm with no # initialization instead of the Hungarian algorithm as mentioned in the @@ -100,26 +109,24 @@ def _get_associated_indices( for row, col in zip(row_indices, col_indices): if iou_matrix[row, col] >= self.minimum_iou_threshold: matched_indices.append((row, col)) - unmatched_trackers.remove(row) + unmatched_tracklets.remove(row) unmatched_detections.remove(col) - return matched_indices, unmatched_trackers, unmatched_detections + return matched_indices, unmatched_tracklets, unmatched_detections - def _spawn_new_trackers( + def _spawn_new_tracklets( self, - confidences: np.ndarray | None, + confidences: np.ndarray, detection_boxes: np.ndarray, unmatched_detections: set[int], ) -> None: for detection_idx in unmatched_detections: - if ( - confidences is None - or detection_idx >= len(confidences) - or confidences[detection_idx] >= self.track_activation_threshold - ): - self.trackers.append( - SORTKalmanBoxTracker(detection_boxes[detection_idx]) + if confidences[detection_idx] >= self.track_activation_threshold: + new_tracker = SORTTracklet( + detection_boxes[detection_idx], + state_estimator_class=self.state_estimator_class, ) + self.tracklets.append(new_tracker) def update(self, detections: sv.Detections) -> sv.Detections: """Update tracker state with new detections and return tracked objects. @@ -135,7 +142,7 @@ def update(self, detections: sv.Detections) -> sv.Detections: `sv.Detections` with `tracker_id` assigned for each detection. Unmatched or immature tracks have `tracker_id` of `-1`. 
""" - if len(self.trackers) == 0 and len(detections) == 0: + if len(self.tracklets) == 0 and len(detections) == 0: detections.tracker_id = np.array([], dtype=int) return detections @@ -143,37 +150,43 @@ def update(self, detections: sv.Detections) -> sv.Detections: detections.xyxy if len(detections) > 0 else np.array([]).reshape(0, 4) ) - for tracker in self.trackers: - tracker.predict() + for tracklet in self.tracklets: + tracklet.predict() + + iou_matrix = _get_iou_matrix(self.tracklets, detection_boxes) - iou_matrix = get_iou_matrix(self.trackers, detection_boxes) - matched_indices, _, unmatched_detections = self._get_associated_indices( - iou_matrix, detection_boxes + # Associate detections to tracklets based on IOU + matched_indices, unmatched_tracklets, unmatched_detections = ( + self._get_associated_indices(iou_matrix, detection_boxes) ) - # Update matched trackers and record the det_idx -> tracker mapping - matched_tracker_for_det: dict[int, SORTKalmanBoxTracker] = {} + # Update matched tracklets and record the det_idx -> tracklet mapping + matched_tracklet_for_det: dict[int, SORTTracklet] = {} for row, col in matched_indices: - self.trackers[row].update(detection_boxes[col]) - matched_tracker_for_det[col] = self.trackers[row] + self.tracklets[row].update(detection_boxes[col]) + matched_tracklet_for_det[col] = self.tracklets[row] - self._spawn_new_trackers( + # Update non matched for increasing time_since_update + for index in unmatched_tracklets: + self.tracklets[index].update(None) + self._spawn_new_tracklets( detections.confidence, detection_boxes, unmatched_detections ) - self.trackers = get_alive_trackers( - self.trackers, + # Remove dead tracklets + self.tracklets = _get_alive_tracklets( # type: ignore[assignment] + self.tracklets, self.minimum_consecutive_frames, self.maximum_frames_without_update, ) # Build tracker_ids from the recorded mapping (no deepcopy, no re-IoU) tracker_ids = np.full(len(detection_boxes), -1, dtype=int) - for det_idx, tracker 
in matched_tracker_for_det.items(): - if tracker.number_of_successful_updates >= self.minimum_consecutive_frames: - if tracker.tracker_id == -1: - tracker.tracker_id = SORTKalmanBoxTracker.get_next_tracker_id() - tracker_ids[det_idx] = tracker.tracker_id + for det_idx, tracklet in matched_tracklet_for_det.items(): + if tracklet.number_of_successful_updates >= self.minimum_consecutive_frames: + if tracklet.tracker_id == -1: + tracklet.tracker_id = SORTTracklet.get_next_tracker_id() + tracker_ids[det_idx] = tracklet.tracker_id detections.tracker_id = tracker_ids return detections @@ -182,5 +195,5 @@ def reset(self) -> None: """Reset tracker state by clearing all tracks and resetting ID counter. Call this method when switching to a new video or scene. """ - self.trackers = [] - SORTKalmanBoxTracker.count_id = 0 + self.tracklets = [] + SORTTracklet.count_id = 0 diff --git a/trackers/core/sort/tracklet.py b/trackers/core/sort/tracklet.py new file mode 100644 index 00000000..486a25da --- /dev/null +++ b/trackers/core/sort/tracklet.py @@ -0,0 +1,75 @@ +# ------------------------------------------------------------------------ +# Trackers +# Copyright (c) 2026 Roboflow. All Rights Reserved. 
+# Licensed under the Apache License, Version 2.0 [see LICENSE for details] +# ------------------------------------------------------------------------ + +import numpy as np + +from trackers.utils.base_tracklet import BaseTracklet +from trackers.utils.state_representations import ( +    BaseStateEstimator, +    XCYCSRStateEstimator, +    XYXYStateEstimator, +) + + +class SORTTracklet(BaseTracklet): +    count_id: int = 0 + +    def __init__( +        self, +        initial_bbox: np.ndarray, +        state_estimator_class: type[BaseStateEstimator] = XYXYStateEstimator, +    ) -> None: +        super().__init__(initial_bbox, state_estimator_class) +        self._configure_noise() +        # Match legacy SORTKalmanBoxTracker behavior (hit count started at 1) +        self.number_of_successful_updates = ( +            1  # SORT doesn't use number_of_successful_consecutive_updates +        ) + +    def update(self, bbox: np.ndarray | None) -> None: +        """Update tracklet with new observation or None if missed.""" +        if bbox is not None: +            self.state_estimator.update(bbox) +            self.time_since_update = 0 +            self.number_of_successful_updates += 1 +        else: +            self.state_estimator.update(None) +            self.time_since_update += 1 + +    def predict(self) -> np.ndarray: +        """Predict next bounding box position.""" +        self.state_estimator.predict() +        self.age += 1 +        return self.state_estimator.state_to_bbox() + +    def get_state_bbox(self) -> np.ndarray: +        """Get current bounding box estimate from the filter/state.""" +        return self.state_estimator.state_to_bbox() + +    def _configure_noise(self) -> None: +        """Configure Kalman filter noise matrices: OC-SORT paper tuning for +        the XCYCSR representation, classic SORT defaults for XYXY.""" +        kf = self.state_estimator.kf +        R = kf.R +        P = kf.P +        Q = kf.Q +        if isinstance(self.state_estimator, XCYCSRStateEstimator): +            R[2:, 2:] *= 10.0 +            P[4:, 4:] *= 1000.0 +            P *= 10.0 +            Q[-1, -1] *= 0.01 +            Q[4:, 4:] *= 0.01 +        else: +            # Process covariance matrix (Q) +            Q = np.eye(8, dtype=np.float64) * 0.01 + +            # Measurement covariance (R): noise in detection +            R = np.eye(4, dtype=np.float64)
* 0.1 + + # Error covariance matrix (P) + P = np.eye(8, dtype=np.float64) + + self.state_estimator.set_kf_covariances(R=R, Q=Q, P=P) diff --git a/trackers/core/sort/utils.py b/trackers/core/sort/utils.py index 987f90f4..23bcd020 100644 --- a/trackers/core/sort/utils.py +++ b/trackers/core/sort/utils.py @@ -5,148 +5,67 @@ # ------------------------------------------------------------------------ from collections.abc import Sequence -from copy import deepcopy -from typing import TypeVar import numpy as np import supervision as sv -from trackers.core.bytetrack.kalman import ByteTrackKalmanBoxTracker -from trackers.core.sort.kalman import SORTKalmanBoxTracker +from trackers.core.sort.tracklet import SORTTracklet +from trackers.utils.base_tracklet import BaseTracklet -KalmanBoxTrackerType = TypeVar( - "KalmanBoxTrackerType", bound=SORTKalmanBoxTracker | ByteTrackKalmanBoxTracker -) - -def get_alive_trackers( - trackers: Sequence[KalmanBoxTrackerType], +def _get_alive_tracklets( + tracklets: Sequence[SORTTracklet], minimum_consecutive_frames: int, maximum_frames_without_update: int, -) -> list[KalmanBoxTrackerType]: +) -> list[SORTTracklet]: """ Remove dead or immature lost tracklets and get alive trackers that are within `maximum_frames_without_update` AND (it's mature OR it was just updated). Args: - trackers: List of KalmanBoxTracker objects. + tracklets: List of SORTTracklet objects. minimum_consecutive_frames: Number of consecutive frames that an object must be tracked before it is considered a 'valid' track. maximum_frames_without_update: Maximum number of frames without update before a track is considered dead. Returns: - List of alive trackers. + List of alive tracklets. 
""" - alive_trackers = [] - for tracker in trackers: - is_mature = tracker.number_of_successful_updates >= minimum_consecutive_frames - is_active = tracker.time_since_update == 0 - if tracker.time_since_update < maximum_frames_without_update and ( + alive_tracklets = [] + for tracklet in tracklets: + is_mature = tracklet.number_of_successful_updates >= minimum_consecutive_frames + is_active = tracklet.time_since_update == 0 + if tracklet.time_since_update < maximum_frames_without_update and ( is_mature or is_active ): - alive_trackers.append(tracker) - return alive_trackers + alive_tracklets.append(tracklet) + return alive_tracklets -def get_iou_matrix( - trackers: Sequence[KalmanBoxTrackerType], detection_boxes: np.ndarray +def _get_iou_matrix( + tracks: Sequence[BaseTracklet], detection_boxes: np.ndarray ) -> np.ndarray: """ Build IOU cost matrix between detections and predicted bounding boxes Args: - trackers: List of KalmanBoxTracker objects. + tracks: List of BaseTracklet objects. detection_boxes: Detected bounding boxes in the form [x1, y1, x2, y2]. Returns: IOU cost matrix. 
""" - predicted_boxes = np.array([t.get_state_bbox() for t in trackers]) - if len(predicted_boxes) == 0 and len(trackers) > 0: + predicted_boxes = np.array([t.get_state_bbox() for t in tracks]) + if len(predicted_boxes) == 0 and len(tracks) > 0: # Handle case where get_state_bbox might return empty array - predicted_boxes = np.zeros((len(trackers), 4), dtype=np.float32) + predicted_boxes = np.zeros((len(tracks), 4), dtype=np.float32) - if len(trackers) > 0 and len(detection_boxes) > 0: + if len(tracks) > 0 and len(detection_boxes) > 0: iou_matrix = sv.box_iou_batch(predicted_boxes, detection_boxes) else: - iou_matrix = np.zeros((len(trackers), len(detection_boxes)), dtype=np.float32) + iou_matrix = np.zeros((len(tracks), len(detection_boxes)), dtype=np.float32) return iou_matrix - - -def update_detections_with_track_ids( - trackers: Sequence[KalmanBoxTrackerType], - detections: sv.Detections, - detection_boxes: np.ndarray, - minimum_iou_threshold: float, - minimum_consecutive_frames: int, -) -> sv.Detections: - """ - The function prepares the updated Detections with track IDs. - If a tracker is "mature" (>= `minimum_consecutive_frames`) or recently updated, - it is assigned an ID to the detection that just updated it. - - Args: - trackers: List of SORTKalmanBoxTracker objects. - detections: The latest set of object detections. - detection_boxes: Detected bounding boxes in the - form [x1, y1, x2, y2]. - minimum_iou_threshold: IOU threshold for associating detections to - existing tracks. - minimum_consecutive_frames: Number of consecutive frames that an object - must be tracked before it is considered a 'valid' track. - - Returns: - A copy of the detections with `tracker_id` set - for each detection that is tracked. 
- """ - # Re-run association in the same way (could also store direct mapping) - final_tracker_ids = [-1] * len(detection_boxes) - - # Recalculate predicted_boxes based on current trackers after some may have - # been removed - predicted_boxes = np.array([t.get_state_bbox() for t in trackers]) - iou_matrix_final = np.zeros((len(trackers), len(detection_boxes)), dtype=np.float32) - - # Ensure predicted_boxes is properly shaped before the second iou calculation - if len(predicted_boxes) == 0 and len(trackers) > 0: - predicted_boxes = np.zeros((len(trackers), 4), dtype=np.float32) - - if len(trackers) > 0 and len(detection_boxes) > 0: - iou_matrix_final = sv.box_iou_batch(predicted_boxes, detection_boxes) - - row_indices, col_indices = np.where(iou_matrix_final > minimum_iou_threshold) - sorted_pairs = sorted( - zip(row_indices, col_indices), - key=lambda x: iou_matrix_final[x[0], x[1]], - reverse=True, - ) - used_rows: set[int] = set() - used_cols: set[int] = set() - for row, col in sorted_pairs: - # Double check index is in range - if row < len(trackers): - tracker_obj = trackers[int(row)] - # Only assign if the track is "mature" or is new but has enough hits - if (int(row) not in used_rows) and (int(col) not in used_cols): - if ( - tracker_obj.number_of_successful_updates - >= minimum_consecutive_frames - ): - # If tracker is mature but still has ID -1, assign a new ID - if tracker_obj.tracker_id == -1: - tracker_obj.tracker_id = ( - SORTKalmanBoxTracker.get_next_tracker_id() - ) - final_tracker_ids[int(col)] = tracker_obj.tracker_id - used_rows.add(int(row)) - used_cols.add(int(col)) - - # Assign tracker IDs to the returned Detections - updated_detections = deepcopy(detections) - updated_detections.tracker_id = np.array(final_tracker_ids) - - return updated_detections diff --git a/trackers/utils/base_tracklet.py b/trackers/utils/base_tracklet.py new file mode 100644 index 00000000..77477885 --- /dev/null +++ b/trackers/utils/base_tracklet.py @@ -0,0 +1,51 @@ +# 
------------------------------------------------------------------------ +# Trackers +# Copyright (c) 2026 Roboflow. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 [see LICENSE for details] +# ------------------------------------------------------------------------ + +from abc import ABC, abstractmethod + +import numpy as np + +from trackers.utils.state_representations import BaseStateEstimator + + +class BaseTracklet(ABC): + """ + Abstract base class for all tracker-specific tracklets. + Provides common interface and attributes for tracklet management. + """ + + count_id: int = 0 + + def __init__( + self, bbox: np.ndarray, state_estimator_class: type[BaseStateEstimator] + ) -> None: + self.age = 0 + self.state_estimator: BaseStateEstimator = state_estimator_class(bbox) + + self.tracker_id = -1 + self.time_since_update = 0 + self.number_of_successful_consecutive_updates = 0 + + @classmethod + def get_next_tracker_id(cls) -> int: + next_id = cls.count_id + cls.count_id += 1 + return next_id + + @abstractmethod + def update(self, bbox: np.ndarray | None) -> None: + """Update tracklet with new observation or None if missed.""" + pass + + @abstractmethod + def predict(self) -> np.ndarray: + """Predict next bounding box position.""" + pass + + @abstractmethod + def get_state_bbox(self) -> np.ndarray: + """Get current bounding box estimate from the filter/state.""" + pass diff --git a/trackers/utils/state_representations.py b/trackers/utils/state_representations.py index f1e93a22..17b878b5 100644 --- a/trackers/utils/state_representations.py +++ b/trackers/utils/state_representations.py @@ -126,6 +126,26 @@ def set_state(self, state: dict) -> None: """ self.kf.set_state(state) + def set_kf_covariances( + self, + R: np.ndarray | None = None, + Q: np.ndarray | None = None, + P: np.ndarray | None = None, + ) -> None: + """Set Kalman filter parameters. + + Args: + R: Measurement noise covariance matrix. + Q: Process noise covariance matrix. 
+ P: Error covariance matrix. + """ + if R is not None: + self.kf.R = R + if Q is not None: + self.kf.Q = Q + if P is not None: + self.kf.P = P + class XCYCSRStateEstimator(BaseStateEstimator): """Center-based Kalman filter with 7 state dimensions and 4 measurements. @@ -157,13 +177,6 @@ def _create_filter(self, bbox: np.ndarray) -> KalmanFilter: # Measurement function: observe (x, y, s, r) from state kf.H = np.eye(4, 7, dtype=np.float64) - # Noise tuning (from OC-SORT paper) - kf.R[2:, 2:] *= 10.0 - kf.P[4:, 4:] *= 1000.0 # high uncertainty for velocities - kf.P *= 10.0 - kf.Q[-1, -1] *= 0.01 - kf.Q[4:, 4:] *= 0.01 - # Initialise state with first observation kf.x[:4] = xyxy_to_xcycsr(bbox).reshape((4, 1)) @@ -211,12 +224,6 @@ def _create_filter(self, bbox: np.ndarray) -> KalmanFilter: # Measurement function: observe (x1, y1, x2, y2) from state kf.H = np.eye(4, 8, dtype=np.float64) - # Noise tuning (similar scaling to XCYCSR version) - kf.R *= 1.0 # measurement noise - kf.P[4:, 4:] *= 1000.0 # high uncertainty for velocities - kf.P *= 10.0 - kf.Q[4:, 4:] *= 0.01 - # Initialise state with first observation (direct XYXY) kf.x[:4] = bbox.reshape((4, 1))