diff --git a/pyproject.toml b/pyproject.toml index c662c6ee..c625c582 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -196,5 +196,7 @@ module = [ "torchvision", "torchvision.transforms", "firerequests", + "scipy", + "scipy.optimize", ] ignore_missing_imports = true diff --git a/test/core/test_tracker_integration.py b/test/core/test_tracker_integration.py index 31c4688b..b00e8119 100644 --- a/test/core/test_tracker_integration.py +++ b/test/core/test_tracker_integration.py @@ -17,7 +17,7 @@ from trackers.eval import evaluate_mot_sequences from trackers.io.mot import _load_mot_file, _mot_frame_to_detections, _MOTOutput -_TRACKER_IDS = ["sort", "bytetrack", "ocsort"] +_TRACKER_IDS = ["sort", "bytetrack", "ocsort", "botsort"] _METRICS = ["CLEAR", "HOTA", "Identity"] _TEST_DATA_DIR = Path(__file__).resolve().parent.parent / "data" diff --git a/test/data/tracker_expected_dancetrack.json b/test/data/tracker_expected_dancetrack.json index ebf5990b..c5c60aa8 100644 --- a/test/data/tracker_expected_dancetrack.json +++ b/test/data/tracker_expected_dancetrack.json @@ -16,5 +16,11 @@ "MOTA": 98.187, "IDF1": 74.367, "IDSW": 631 + }, + "botsort": { + "HOTA": 79.999, + "MOTA": 99.511, + "IDF1": 76.389, + "IDSW": 614 } } diff --git a/test/data/tracker_expected_sportsmot.json b/test/data/tracker_expected_sportsmot.json index fadde849..08c658a6 100644 --- a/test/data/tracker_expected_sportsmot.json +++ b/test/data/tracker_expected_sportsmot.json @@ -16,5 +16,11 @@ "MOTA": 97.791, "IDF1": 79.21, "IDSW": 917 + }, + "botsort": { + "HOTA": 85.544, + "MOTA": 98.925, + "IDF1": 80.53, + "IDSW": 1107 } } diff --git a/trackers/__init__.py b/trackers/__init__.py index 31c646df..a34ba7a9 100644 --- a/trackers/__init__.py +++ b/trackers/__init__.py @@ -7,6 +7,7 @@ from __future__ import annotations from trackers.annotators.trace import MotionAwareTraceAnnotator +from trackers.core.botsort.tracker import BoTSORTTracker from trackers.core.bytetrack.tracker import ByteTrackTracker from 
trackers.core.ocsort.tracker import OCSORTTracker from trackers.core.sort.tracker import SORTTracker @@ -22,6 +23,7 @@ from trackers.utils.converters import xcycsr_to_xyxy, xyxy_to_xcycsr __all__ = [ + "BoTSORTTracker", "ByteTrackTracker", "CoordinatesTransformation", "Dataset", diff --git a/trackers/core/botsort/__init__.py b/trackers/core/botsort/__init__.py new file mode 100644 index 00000000..8bae3857 --- /dev/null +++ b/trackers/core/botsort/__init__.py @@ -0,0 +1,8 @@ +# ------------------------------------------------------------------------ +# Trackers +# Copyright (c) 2026 Roboflow. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 [see LICENSE for details] +# ------------------------------------------------------------------------ +from .tracker import BoTSORTTracker + +__all__ = ["BoTSORTTracker"] diff --git a/trackers/core/botsort/cmc.py b/trackers/core/botsort/cmc.py new file mode 100644 index 00000000..f04526ee --- /dev/null +++ b/trackers/core/botsort/cmc.py @@ -0,0 +1,737 @@ +# ------------------------------------------------------------------------ +# Trackers +# Copyright (c) 2026 Roboflow. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 [see LICENSE for details] +# ------------------------------------------------------------------------ + +import copy +from dataclasses import dataclass +from typing import Literal + +import cv2 +import numpy as np + +CMCTMethod = Literal["orb", "sift", "sparseOptFlow", "ecc"] + + +@dataclass +class CMCConfig: + """ + Configuration for camera motion compensation (CMC). + + The CMC module estimates a global 2D affine transform `H` (2x3) between consecutive + frames. This transform is then applied to predicted track states before data + association. + + Attributes: + method: + Camera motion estimation method. + + - "orb": Feature matching using + FAST keypoints + ORB descriptors + BFMatcher (Hamming), + followed by robust affine estimation (RANSAC). 
+ Optionally masks out detection boxes so features are extracted from + background. + - "sift": Feature matching using + SIFT keypoints + SIFT descriptors + BFMatcher (L2), + followed by robust affine estimation (RANSAC). + Optionally masks out detection boxes so features are extracted from + background. "sift" generally produces fewer but more distinctive matches + than ORB at higher compute cost. + - "sparseOptFlow": Sparse optical flow using corner tracking: + goodFeaturesToTrack -> calcOpticalFlowPyrLK -> robust affine estimation + (RANSAC). + - "ecc": Global image alignment using the Enhanced Correlation Coefficient + (ECC) optimization method. This estimates a 2D Euclidean transform + directly from grayscale image intensities rather than from sparse feature + correspondences. + + downscale: + Integer downscale factor applied to frames before running CMC. + + Purpose: + - Speeds up feature extraction / optical flow. + + Behavior: + - Frames are resized to (W//downscale, H//downscale) for motion estimation. + - The resulting affine translation components H[0,2], H[1,2] are scaled back + by multiplying by `downscale`, so the transform is in original image + coordinates. + + fast_threshold: + (ORB only) Threshold for the FAST keypoint detector. + Higher values yield fewer keypoints (more selective); lower values yield + more keypoints. + + ransac_reproj_threshold: + (ORB only) RANSAC reprojection threshold in pixels passed to + OpenCV's affine estimation. It controls how far a point is allowed to + deviate from the estimated model while still being counted as an inlier. + Smaller values are stricter (reject more matches); larger values are more + tolerant. + + max_spatial_distance_frac: + (ORB only) Maximum allowed spatial displacement for a tentative match, + expressed as a fraction of (image width, image height) *after downscale*. 
+ + Example: + If max_spatial_distance_frac = 0.25 and the downscaled frame is (W, H), + then a match is rejected if |dx| >= 0.25*W or |dy| >= 0.25*H. + + Motivation: + Reject obviously incorrect descriptor matches whose displacement is + implausibly large. + + roi_min_frac: + (ORB only) Lower bound of the region-of-interest (ROI) used to select + keypoints, expressed as a fraction of frame size. Points outside the ROI + are masked out. + + Example: + roi_min_frac=0.02 means we ignore a ~2% border on each side. + + roi_max_frac: + (ORB only) Upper bound of the ROI used to select keypoints (fraction of + frame size). Together with roi_min_frac, it defines a central rectangle: + [roi_min_frac..roi_max_frac] in both x and y. + + sift_n_octave_layers: + (SIFT only) Number of octave layers used by SIFT when constructing the + scale-space pyramid. Increasing this can increase sensitivity to scale + changes, at higher compute cost. + + sift_contrast_threshold: + (SIFT only) Threshold controlling how sensitive SIFT is + to low-contrast keypoints. Lower values generally produce more keypoints; + higher values are stricter. + + sift_edge_threshold: + (SIFT only) Threshold controlling rejection of keypoints on edges. + Lower values reject more edge-like responses; higher values are more + permissive. + + sof_max_corners: + (SparseOptFlow only) `maxCorners` passed to `cv2.goodFeaturesToTrack`. + Maximum number of corners to detect for tracking. + Larger values can improve robustness (more points), but cost more compute. + + sof_quality_level: + (SparseOptFlow only) `qualityLevel` passed to `cv2.goodFeaturesToTrack`. + Minimum accepted quality of corners. A higher value keeps only stronger + corners; a lower value yields more corners (including weaker ones). + + sof_min_distance: + (SparseOptFlow only) `minDistance` passed to `cv2.goodFeaturesToTrack`. + Minimum Euclidean distance (in pixels) between returned corners. 
+ Higher values produce more spatially spread points; lower values allow + clustering. + + sof_block_size: + (SparseOptFlow only) `blockSize` passed to `cv2.goodFeaturesToTrack`. + Size of the neighborhood used to compute corner quality (structure tensor + window). + + sof_use_harris: + (SparseOptFlow only) `useHarrisDetector` passed to + `cv2.goodFeaturesToTrack`. If True, uses the Harris corner measure; + if False, uses the Shi-Tomasi measure. + + sof_k: + (SparseOptFlow only) `k` passed to `cv2.goodFeaturesToTrack`. + Harris detector free parameter. Ignored if `sof_use_harris` is False. + + ecc_number_of_iterations: + (ECC only) Maximum number of optimization iterations used by the ECC + alignment procedure. + + ecc_termination_eps: + (ECC only) Convergence tolerance used by the ECC optimizer. + Smaller values require a more precise fit and may increase runtime. + + ecc_gaussian_filter_size: + (ECC only) Gaussian filter size parameter passed to OpenCV's + `findTransformECC`. This can help stabilize optimization on noisy frames. + A value of 1 matches the current implementation. 
+ """ + + method: CMCTMethod = "sparseOptFlow" + downscale: int = 2 + + # Shared ORB and SIFT parameters (_estimate_feature_affine) + ransac_reproj_threshold: float = 3.0 + max_spatial_distance_frac: float = 0.25 + roi_min_frac: float = 0.02 + roi_max_frac: float = 0.98 + + # ORB parameters + fast_threshold: int = 20 + + # SIFT parameters + sift_n_octave_layers: int = 3 + sift_contrast_threshold: float = 0.02 + sift_edge_threshold: int = 20 + + # Sparse optical flow parameters (goodFeaturesToTrack) + sof_max_corners: int = 1000 + sof_quality_level: float = 0.01 + sof_min_distance: int = 1 + sof_block_size: int = 3 + sof_use_harris: bool = False + sof_k: float = 0.04 + + # ECC parameters + + # BoT-SORT's original - resulting in very long (=unacceptably long) execution time + # ecc_number_of_iterations: int = 5000 + # ecc_termination_eps: float = 1e-6 + + # Adjusted + ecc_number_of_iterations: int = 50 + ecc_termination_eps: float = 1e-4 + + ecc_gaussian_filter_size: int = 1 + + +class CMC: + """ + Camera motion compensation estimator and track state warper. + + Typical usage in the tracker loop: + H = cmc.estimate(frame_bgr, mask_boxes_xyxy) + CMC.apply_to_tracks(tracks, H) + + Internal state: + - Keeps previous-frame features / points depending on the chosen method. + - On the first frame (or after reset), returns identity transform. + + Notes: + - H maps points from previous frame coordinates to current frame coordinates. + - This class does not perform any drawing/visualization; it only estimates + transforms. + """ + + def __init__(self, cfg: CMCConfig | None = None) -> None: + """ + Initialize CMC. + + Args: + cfg: Optional configuration. If None, defaults are used. + + Notes: + - Detector/extractor/matcher are only created if method is "orb" or "sift". + - feature_params are only created if method is "sparseOptFlow". + - ECC optimization settings are created for "ecc". 
+ """ + self.cfg = cfg or CMCConfig() + self.downscale = max(1, int(self.cfg.downscale)) + + # ORB init (only if needed) + self.detector = None + self.extractor = None + self.matcher = None + if self.cfg.method == "orb": + self.detector = cv2.FastFeatureDetector_create(self.cfg.fast_threshold) # type: ignore[attr-defined] + self.extractor = cv2.ORB_create() # type: ignore[attr-defined] + self.matcher = cv2.BFMatcher(cv2.NORM_HAMMING) + elif self.cfg.method == "sift": + self.detector = cv2.SIFT_create( # type: ignore[attr-defined] + nOctaveLayers=self.cfg.sift_n_octave_layers, + contrastThreshold=self.cfg.sift_contrast_threshold, + edgeThreshold=int(self.cfg.sift_edge_threshold), + ) + self.extractor = cv2.SIFT_create( # type: ignore[attr-defined] + nOctaveLayers=self.cfg.sift_n_octave_layers, + contrastThreshold=self.cfg.sift_contrast_threshold, + edgeThreshold=int(self.cfg.sift_edge_threshold), + ) + self.matcher = cv2.BFMatcher(cv2.NORM_L2) + elif self.cfg.method == "sparseOptFlow": + self.feature_params = dict( + maxCorners=self.cfg.sof_max_corners, + qualityLevel=self.cfg.sof_quality_level, + minDistance=self.cfg.sof_min_distance, + blockSize=self.cfg.sof_block_size, + useHarrisDetector=self.cfg.sof_use_harris, + k=self.cfg.sof_k, + ) + elif self.cfg.method == "ecc": + self.warp_mode = cv2.MOTION_EUCLIDEAN + self.criteria = ( + cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, + self.cfg.ecc_number_of_iterations, + self.cfg.ecc_termination_eps, + ) + + self.reset() + + def reset(self) -> None: + """ + Reset internal state. + + After calling reset: + - The next `estimate()` call returns identity and initializes prev-frame state. + - This should be called when starting a new sequence or after a scene cut. 
+ """ + self._initialized = False + + # ORB state + self._prev_kps = None + self._prev_desc: np.ndarray | None = None + + # SparseOptFlow state + self._prev_frame_gray: np.ndarray | None = None + + # shape (N,1,2) from goodFeaturesToTrack + self._prev_points: np.ndarray | None = None + + def estimate( + self, frame_bgr: np.ndarray, dets_xyxy: np.ndarray | None = None + ) -> np.ndarray: + """ + Estimate global affine transform H (2x3) from previous frame to current frame. + + Args: + frame_bgr: Current frame in BGR format (uint8), shape (H, W, 3). + dets_xyxy: Optional detections (N,4) in xyxy format, in original image + scale. Used by feature-based methods (ORB and SIFT) to mask out object + regions during motion estimation. + + Returns: + H: Affine transform matrix of shape (2, 3), dtype float32. + Identity if not enough correspondences or if not initialized yet. + """ + if frame_bgr is None: + return np.eye(2, 3, dtype=np.float32) + + if self.cfg.method == "orb" or self.cfg.method == "sift": + return self._estimate_feature_affine(frame_bgr, dets_xyxy) + + if self.cfg.method == "sparseOptFlow": + return self._estimate_sparse_optflow(frame_bgr) + + if self.cfg.method == "ecc": + return self._estimate_ecc(frame_bgr) + + # fallback + return np.eye(2, 3, dtype=np.float32) + + def _estimate_feature_affine( + self, frame_bgr: np.ndarray, dets_xyxy: np.ndarray | None = None + ) -> np.ndarray: + """ + Feature affine estimation. ORB-based or SIFT-based + (different initializations of self.detector, self.extractor and self.matcher for + ORB and SIFT) + + Steps: + 1) Convert to grayscale (+ optional downscale). + 2) Create ROI mask and optionally mask out detections (background emphasis). + 3) Detect FAST keypoints and compute ORB or SIFT descriptors. + 4) KNN match descriptors against previous frame (ratio test). + 5) Filter matches by max spatial displacement and by 2.5*std inliers. + 6) Estimate affine transform with RANSAC. + 7) Scale translation back up if downscaled. 
+ + Args: + frame_bgr: Current BGR frame. + dets_xyxy: Optional detection boxes for masking (original image scale). + + Returns: + H: (2,3) affine transform mapping previous-current, float32. + """ + H_img, W_img = frame_bgr.shape[:2] + gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY) + + if self.downscale > 1: + gray = cv2.resize(gray, (W_img // self.downscale, H_img // self.downscale)) + H, W = gray.shape[:2] + + # Build mask: central ROI + remove detections (background features) + mask = np.zeros_like(gray, dtype=np.uint8) + y0 = int(self.cfg.roi_min_frac * H) + y1 = int(self.cfg.roi_max_frac * H) + x0 = int(self.cfg.roi_min_frac * W) + x1 = int(self.cfg.roi_max_frac * W) + mask[y0:y1, x0:x1] = 255 + + if dets_xyxy is not None and len(dets_xyxy) > 0: + dets = np.asarray(dets_xyxy, dtype=np.float32) / float(self.downscale) + dets = dets.astype(np.int32) + + # Safety clipping to avoid negative/out-of-bounds slicing + dets[:, 0] = np.clip(dets[:, 0], 0, W - 1) + dets[:, 2] = np.clip(dets[:, 2], 0, W - 1) + dets[:, 1] = np.clip(dets[:, 1], 0, H - 1) + dets[:, 3] = np.clip(dets[:, 3], 0, H - 1) + + for x1b, y1b, x2b, y2b in dets: + if x2b > x1b and y2b > y1b: + mask[y1b:y2b, x1b:x2b] = 0 + + # Detect + describe (ORB) + kps = self.detector.detect(gray, mask) # type: ignore[union-attr] + kps, desc = self.extractor.compute(gray, kps) # type: ignore[union-attr] + + H_aff = np.eye(2, 3, dtype=np.float32) + + # First frame init + if not self._initialized: + self._prev_kps = copy.copy(kps) + self._prev_desc = None if desc is None else copy.copy(desc) + self._initialized = True + return H_aff + + if self._prev_desc is None or desc is None or len(desc) == 0: + self._prev_kps = copy.copy(kps) + self._prev_desc = None if desc is None else copy.copy(desc) + return H_aff + + knn = self.matcher.knnMatch(self._prev_desc, desc, k=2) # type: ignore[union-attr] + if len(knn) == 0: + self._prev_kps = copy.copy(kps) + self._prev_desc = copy.copy(desc) + return H_aff + + max_spatial = 
self.cfg.max_spatial_distance_frac * np.array( + [W, H], dtype=np.float32 + ) + + prev_pts = [] + curr_pts = [] + spatial = [] + + for pair in knn: + if len(pair) < 2: + continue + m, n = pair + if m.distance < 0.9 * n.distance: + p_prev = np.array(self._prev_kps[m.queryIdx].pt, dtype=np.float32) # type: ignore[index] + p_curr = np.array(kps[m.trainIdx].pt, dtype=np.float32) + d = p_prev - p_curr + if (abs(d[0]) < max_spatial[0]) and (abs(d[1]) < max_spatial[1]): + spatial.append(d) + prev_pts.append(p_prev) + curr_pts.append(p_curr) + + if len(prev_pts) >= 5: + spatial_arr = np.asarray(spatial, dtype=np.float32) + mean = spatial_arr.mean(axis=0) + std = spatial_arr.std(axis=0) + 1e-6 + inl = np.logical_and( + np.abs(spatial_arr[:, 0] - mean[0]) < 2.5 * std[0], + np.abs(spatial_arr[:, 1] - mean[1]) < 2.5 * std[1], + ) + prev_pts_np = np.asarray(prev_pts, dtype=np.float32)[inl] + curr_pts_np = np.asarray(curr_pts, dtype=np.float32)[inl] + + if len(prev_pts_np) >= 5: + H_est, _ = cv2.estimateAffinePartial2D( + prev_pts_np, + curr_pts_np, + method=cv2.RANSAC, + ransacReprojThreshold=self.cfg.ransac_reproj_threshold, + ) + if H_est is not None: + H_aff = H_est.astype(np.float32) + if self.downscale > 1: + H_aff[0, 2] *= self.downscale + H_aff[1, 2] *= self.downscale + + self._prev_kps = copy.copy(kps) + self._prev_desc = copy.copy(desc) + return H_aff + + def _estimate_sparse_optflow(self, frame_bgr: np.ndarray) -> np.ndarray: + """ + Sparse optical-flow-based affine estimation. + + Steps: + 1) grayscale (+ optional downscale) + 2) detect corners using goodFeaturesToTrack + 3) compute correspondences via calcOpticalFlowPyrLK(prev, curr, prev_points) + 4) keep only points with status == 1 + 5) estimate affine transform with RANSAC + 6) scale translation back up if downscaled + + Args: + frame_bgr: Current BGR frame. + + Returns: + H: (2,3) affine transform mapping previous-current, float32. 
+ """ + H_img, W_img = frame_bgr.shape[:2] + frame = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY) + + H_aff = np.eye(2, 3, dtype=np.float32) + + # Downscale + if self.downscale > 1: + frame = cv2.resize( + frame, (W_img // self.downscale, H_img // self.downscale) + ) + + # Find keypoints in current frame + keypoints = cv2.goodFeaturesToTrack(frame, mask=None, **self.feature_params) # type: ignore[call-overload] + + # First frame: init and return identity + if not self._initialized: + self._prev_frame_gray = frame.copy() + self._prev_points = copy.copy(keypoints) + self._initialized = True + return H_aff + + # If we don't have points, re-init + if ( + self._prev_frame_gray is None + or self._prev_points is None + or keypoints is None + ): + self._prev_frame_gray = frame.copy() + self._prev_points = copy.copy(keypoints) + return H_aff + + # Optical flow correspondences + # calcOpticalFlowPyrLK will throw or return nonsense if we give it None + matched, status, _err = cv2.calcOpticalFlowPyrLK( # type: ignore[call-overload] + self._prev_frame_gray, frame, self._prev_points, None + ) + + if status is None or matched is None: + self._prev_frame_gray = frame.copy() + self._prev_points = copy.copy(keypoints) + return H_aff + + # Keep only good correspondences + prev_pts = [] + curr_pts = [] + # status is (N,1) or (N,) + status_flat = status.reshape(-1) + + for i in range(len(status_flat)): + if status_flat[i]: + prev_pts.append(self._prev_points[i]) + curr_pts.append(matched[i]) + + prev_pts_arr = np.array(prev_pts) + curr_pts_arr = np.array(curr_pts) + + # Find rigid matrix + if (np.size(prev_pts_arr, 0) > 4) and ( + np.size(prev_pts_arr, 0) == np.size(curr_pts_arr, 0) + ): + H_est, _ = cv2.estimateAffinePartial2D( # type: ignore[call-overload] + prev_pts_arr, curr_pts_arr, cv2.RANSAC + ) + if H_est is not None: + H_aff = H_est.astype(np.float32) + + # Handle downscale translation back to original image coords + if self.downscale > 1: + H_aff[0, 2] *= self.downscale + 
H_aff[1, 2] *= self.downscale + else: + print("Warning: not enough matching points") + + # Store to next iteration + self._prev_frame_gray = frame.copy() + # self._prev_points = copy.copy(keypoints) + self._prev_points = None if keypoints is None else keypoints.copy() + + return H_aff + + def _estimate_ecc(self, frame_bgr: np.ndarray) -> np.ndarray: + """ + ECC-based affine motion estimation. + + This method estimates a global 2D Euclidean transform between the previous + frame and the current frame using OpenCV's Enhanced Correlation Coefficient + (ECC) image alignment algorithm. + + Steps: + 1) Convert the current frame to grayscale. + 2) Optionally smooth and downscale the frame. + 3) If this is the first frame, store it and return identity. + 4) Optimize a 2x3 warp matrix aligning the previous frame to the current + frame. + 5) If optimization succeeds, return the estimated transform. + Otherwise, keep the identity transform. + 6) Store the current frame for the next call. + + Args: + frame_bgr: + Current frame in BGR format. + + Returns: + H: + Affine transform matrix of shape (2, 3), dtype float32, mapping + previous-frame coordinates to current-frame coordinates. Returns + identity if initialization has not yet occurred or if ECC optimization + fails. 
+ """ + H_img, W_img = frame_bgr.shape[:2] + frame = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY) + + H_aff = np.eye(2, 3, dtype=np.float32) + + if self.downscale > 1: + frame = cv2.GaussianBlur(frame, (3, 3), 1.5) + frame = cv2.resize( + frame, (W_img // self.downscale, H_img // self.downscale) + ) + + if not self._initialized: + self._prev_frame_gray = frame.copy() + self._initialized = True + return H_aff + + if self._prev_frame_gray is None: + self._prev_frame_gray = frame.copy() + return H_aff + + try: + _cc, H_est = cv2.findTransformECC( # type: ignore[call-overload] + self._prev_frame_gray, + frame, + H_aff, + self.warp_mode, + self.criteria, + None, + self.cfg.ecc_gaussian_filter_size, + ) + if H_est is not None: + H_aff = H_est.astype(np.float32) + except cv2.error: + print("Warning: find transform failed. Set warp as identity") + pass + + # NOTE: this line is not included in the original BoT-SORT. However, + # in a working recurrent estimator, you do need to update the previous frame + # after each call. Otherwise the next call would keep aligning against an old + # frame. + self._prev_frame_gray = frame.copy() + + return H_aff + + @staticmethod + def apply_to_tracks(tracks: list, H: np.ndarray) -> None: + """ + Apply a global affine motion transform to tracker states and covariances + in-place. + + This method updates each track according to the affine transform + + x' = R x + t + + where: + R: + 2x2 linear part of the affine transform (rotation / shear / scale-like + part). + t: + 2D translation vector. + + The input transform `H` is expected in standard OpenCV affine form: + + H = [ R | t ] + + with shape (2, 3). + + Tracker state convention: + Each track is assumed to store its Kalman state as + + [xc, yc, w, h, vxc, vyc, vw, vh]^T + + where: + xc, yc: + Bounding box center coordinates. + w, h: + Bounding box width and height. + vxc, vyc: + Velocities of the center coordinates. + vw, vh: + Velocities of the width and height. 
+ + State update logic: + The affine transform is applied only to the geometric quantities that live + in the 2D image plane as position or velocity vectors: + + 1) Center position: + [xc, yc]^T = R @ [xc, yc]^T + t + + 2) Center velocity: + [vxc, vyc]^T = R @ [vxc, vyc]^T + + 3) Width, height, and their velocities: + [w, h, vw, vh] remain unchanged + + Why width and height are not transformed here: + Width and height are scalar box dimensions, not 2D point coordinates. + In this implementation, camera motion compensation is used to correct the + object center location and its image-plane velocity, while the box size + terms are left unchanged. This keeps the compensation simple and consistent + with the state representation used by the tracker. + + Covariance update: + Each track also stores a covariance matrix `P` describing uncertainty in the + 8D Kalman state. After the mean state is transformed, the covariance is + updated using the linear transform + + P = A @ P @ A.T + + where `A` is an 8x8 block matrix that applies `R` to: + - the center position block [xc, yc] + - the center velocity block [vxc, vyc] + + and leaves the remaining state dimensions unchanged. + + Concretely: + - A[0:2, 0:2] = R + - A[4:6, 4:6] = R + - all other diagonal entries remain 1 + + Args: + tracks: + List of track objects. Each track is expected to expose: + - `state`: NumPy array of shape (8, 1) + - `P`: NumPy array of shape (8, 8) + H: + Affine transform matrix of shape (2, 3), mapping previous-frame image + coordinates to current-frame image coordinates. + + Returns: + None. + The tracks are modified in-place. + + Notes: + - If `H` is None or `tracks` is empty, this method does nothing. + - The method assumes that `H` has already been estimated in image + coordinates consistent with the tracker state. + - This method does not perform any validity checks on whether the estimated + transform is physically plausible; it simply applies the provided + transform. 
+ """ + if H is None or len(tracks) == 0: + return + + H = H.astype(np.float32) + R = H[:2, :2] + t = H[:2, 2] + + for trk in tracks: + x = trk.state.reshape(-1) + + # Update the state mean using the affine transform. + pos = x[0:2] + vel = x[4:6] + + x[0:2] = R @ pos + t + x[4:6] = R @ vel + + trk.state = x.reshape(8, 1).astype(np.float32) + + # Update the state covariance under the corresponding linear transform. + A = np.eye(8, dtype=np.float32) + A[0:2, 0:2] = R # center position + A[4:6, 4:6] = R # center velocity + # Box size terms (w, h, vw, vh) are not transformed in this implementation. + + trk.P = (A @ trk.P @ A.T).astype(np.float32) diff --git a/trackers/core/botsort/kalman_box_tracker.py b/trackers/core/botsort/kalman_box_tracker.py new file mode 100644 index 00000000..f38a1939 --- /dev/null +++ b/trackers/core/botsort/kalman_box_tracker.py @@ -0,0 +1,449 @@ +# ------------------------------------------------------------------------ +# Trackers +# Copyright (c) 2026 Roboflow. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 [see LICENSE for details] +# ------------------------------------------------------------------------ + +import numpy as np + + +class BoTSORTKalmanBoxTracker: + """ + Kalman-filter-based state estimator for a single tracked object. + + This class maintains the motion state of one object using a linear Kalman filter + with a constant-velocity model. The tracker stores the object state internally in + center-width-height form, but accepts detections and returns boxes in standard + corner format. + + Internal state vector: + [xc, yc, w, h, vxc, vyc, vw, vh]^T + + where: + xc, yc: + Bounding box center coordinates. + w, h: + Bounding box width and height. + vxc, vyc: + Velocities of the center coordinates. + vw, vh: + Velocities of the width and height. 
+ + Public input/output convention: + - input detections to `__init__()` and `update()` are expected in xyxy format: + [x1, y1, x2, y2] + - output from `get_state_bbox()` is returned in xyxy format: + [x1, y1, x2, y2] + + Kalman filter matrices used in this class: + F: + State transition matrix. Propagates the state from one frame to the next + under a constant-velocity assumption. + H: + Measurement matrix. Maps the internal 8D state to the observable 4D + measurement space [xc, yc, w, h]. + Q: + Process noise covariance. Models uncertainty in the motion model used + during prediction. + R: + Measurement noise covariance. Models uncertainty in incoming detections + during the update step. + P: + State covariance matrix. Represents the current uncertainty of the full + 8D state estimate. + + Lifecycle-related attributes: + tracker_id: + Permanent track identifier. Starts at -1 and is assigned later by the + outer tracking logic once the track is considered mature. + number_of_successful_updates: + Number of successful detection-based updates received by this track. + time_since_update: + Number of consecutive prediction steps since the last measurement update. + + Notes: + - The process and measurement noise are scaled using the current object width + and height. This makes the uncertainty proportional to object size. + - Width and height are constrained to remain positive after prediction and + update to avoid degenerate boxes. + """ + + count_id = 0 + + @classmethod + def get_next_tracker_id(cls) -> int: + next_id = cls.count_id + cls.count_id += 1 + return next_id + + def __init__(self, bbox: np.ndarray): + """ + Initialize a new track from the first observed bounding box. + + Args: + bbox: + Initial detection in xyxy format: [x1, y1, x2, y2]. + + Initialization steps: + 1) Set track-management attributes such as `tracker_id`, + `number_of_successful_updates`, and `time_since_update`. 
+ 2) Allocate the internal 8D Kalman state vector: + [xc, yc, w, h, vxc, vyc, vw, vh]^T + 3) Convert the input bounding box from xyxy to xywh form: + [xc, yc, w, h] + 4) Store that measurement in the position/size part of the state. + 5) Initialize the Kalman filter matrices F, H, Q, R, and P. + + Notes: + - Initial velocities are set to zero. + - The initial covariance matrix P is set in `_initialize_kalman_filter()` + and reflects uncertainty about both position/size and velocity. + """ + self.tracker_id = -1 + self.number_of_successful_updates = 1 + self.time_since_update = 0 + + # State mean: [xc, yc, w, h, vxc, vyc, vw, vh]^T + self.state = np.zeros((8, 1), dtype=np.float32) + + # Initialize from first detection in xyxy + measurement = self.xyxy_to_xywh(bbox) + self.state[0:4, 0] = measurement + + self._initialize_kalman_filter(measurement) + + @staticmethod + def xyxy_to_xywh(bbox: np.ndarray) -> np.ndarray: + """ + Convert a bounding box from corner format to center-size format. + + Args: + bbox: + Bounding box in xyxy format: [x1, y1, x2, y2]. + + Returns: + Bounding box in xywh format: [xc, yc, w, h]. + """ + x1, y1, x2, y2 = bbox.astype(np.float32) + w = x2 - x1 + h = y2 - y1 + xc = x1 + w / 2.0 + yc = y1 + h / 2.0 + return np.array([xc, yc, w, h], dtype=np.float32) + + @staticmethod + def xywh_to_xyxy(state_xywh: np.ndarray) -> np.ndarray: + """ + Convert a bounding box from center-size format to corner format. + + Args: + state_xywh: + Bounding box in xywh format: [xc, yc, w, h]. + + Returns: + Bounding box in xyxy format: [x1, y1, x2, y2]. + """ + xc, yc, w, h = state_xywh.astype(np.float32) + x1 = xc - w / 2.0 + y1 = yc - h / 2.0 + x2 = xc + w / 2.0 + y2 = yc + h / 2.0 + return np.array([x1, y1, x2, y2], dtype=np.float32) + + def _initialize_kalman_filter(self, measurement: np.ndarray) -> None: + """ + Initialize the Kalman filter matrices for the current track. + + Args: + measurement: + Initial object measurement in xywh format: + [xc, yc, w, h]. 
+ + This method initializes the following matrices: + + State transition matrix: + F is an 8x8 matrix defining how the state evolves from one frame to the + next. It implements a constant-velocity model: + xc <- xc + vxc + yc <- yc + vyc + w <- w + vw + h <- h + vh + while the velocity terms are carried forward unchanged. + + Measurement matrix: + H is a 4x8 matrix mapping the internal 8D state + [xc, yc, w, h, vxc, vyc, vw, vh]^T + to the observable 4D measurement + [xc, yc, w, h]^T. + In other words, only the first four state components are directly observed. + + Process noise covariance: + Q is an 8x8 diagonal matrix representing uncertainty in the motion model + used during prediction. Larger values allow the predicted state to change + more freely from frame to frame. + + Measurement noise covariance: + R is a 4x4 diagonal matrix representing uncertainty in the detector + measurements used during correction/update. + + State covariance: + P is the initial 8x8 covariance matrix representing uncertainty in the + initial state estimate. The velocity terms are initialized with larger + uncertainty than the position/size terms because they are not directly + observed in the first frame. + + Noise scaling: + The diagonal entries of Q, R, and P are scaled using the initial object + width and height. This makes the uncertainty proportional to object size: + larger objects are allowed proportionally larger absolute motion and noise. + + Notes: + - `sigma_p` controls the scale of position/size process noise. + - `sigma_v` controls the scale of velocity process noise. + - `sigma_m` controls the scale of measurement noise. + - All covariance matrices are diagonal in this implementation. + """ + self.F = np.eye(8, dtype=np.float32) + for i in range(4): + self.F[i, i + 4] = 1.0 + + self.H = np.eye(4, 8, dtype=np.float32) + + # BoT-SORT-style scale-aware noise using width/height. 
+ sigma_p = 0.05 + sigma_v = 0.00625 + sigma_m = 0.05 + + w, h = measurement[2], measurement[3] + + q_diag = np.array( + [ + (sigma_p * w) ** 2, + (sigma_p * h) ** 2, + (sigma_p * w) ** 2, + (sigma_p * h) ** 2, + (sigma_v * w) ** 2, + (sigma_v * h) ** 2, + (sigma_v * w) ** 2, + (sigma_v * h) ** 2, + ], + dtype=np.float32, + ) + self.Q = np.diag(q_diag) + + r_diag = np.array( + [ + (sigma_m * w) ** 2, + (sigma_m * h) ** 2, + (sigma_m * w) ** 2, + (sigma_m * h) ** 2, + ], + dtype=np.float32, + ) + self.R = np.diag(r_diag) + + # Initial covariance, as in original BoT-SORT KF + p_diag = np.array( + [ + (2 * sigma_p * w) ** 2, + (2 * sigma_p * h) ** 2, + (2 * sigma_p * w) ** 2, + (2 * sigma_p * h) ** 2, + (10 * sigma_v * w) ** 2, + (10 * sigma_v * h) ** 2, + (10 * sigma_v * w) ** 2, + (10 * sigma_v * h) ** 2, + ], + dtype=np.float32, + ) + self.P = np.diag(p_diag) + + def _update_process_and_measurement_noise(self) -> None: + """ + Recompute the process and measurement noise covariances from the current box + size. + + This method updates: + + Q: + Process noise covariance, used in the prediction step. + It models uncertainty in how the state changes from one frame to the next. + + R: + Measurement noise covariance, used in the update step. + It models uncertainty in the current detection measurement. + + Why this update is needed: + The scale of the uncertainty should depend on the current object size. + For example, a 2-pixel error is relatively more important for a small object + than for a large one. Therefore, the diagonal entries of Q and R are + computed from the current predicted width and height stored in the state. + + Implementation details: + - Width and height are read from the current state: + w = state[2], h = state[3] + - They are clamped to a small positive minimum to avoid zero or negative + values. + - The resulting Q and R matrices remain diagonal. + + Notes: + This method does not update P directly. 
It only refreshes the noise models + used later in `predict()` and `update()`. + """ + sigma_p = 0.05 + sigma_v = 0.00625 + sigma_m = 0.05 + + w = max(float(self.state[2, 0]), 1e-3) + h = max(float(self.state[3, 0]), 1e-3) + + q_diag = np.array( + [ + (sigma_p * w) ** 2, + (sigma_p * h) ** 2, + (sigma_p * w) ** 2, + (sigma_p * h) ** 2, + (sigma_v * w) ** 2, + (sigma_v * h) ** 2, + (sigma_v * w) ** 2, + (sigma_v * h) ** 2, + ], + dtype=np.float32, + ) + self.Q = np.diag(q_diag) + + r_diag = np.array( + [ + (sigma_m * w) ** 2, + (sigma_m * h) ** 2, + (sigma_m * w) ** 2, + (sigma_m * h) ** 2, + ], + dtype=np.float32, + ) + self.R = np.diag(r_diag) + + def predict(self) -> None: + """ + Predict the next state and covariance using the Kalman motion model. + + This method performs the Kalman filter prediction step: + + state <- F @ state + P <- F @ P @ F.T + Q + + where: + F: + State transition matrix. + P: + Current state covariance matrix. + Q: + Process noise covariance. + + Effect of the prediction: + - The center position and box size are advanced using their current + velocities. + - The covariance matrix P is propagated forward and increased by Q to + reflect additional uncertainty introduced during motion prediction. + + Additional behavior: + - The process and measurement noise matrices are refreshed first by calling + `_update_process_and_measurement_noise()`. + - Width and height are clamped to remain positive after prediction. + - `time_since_update` is incremented because this frame has not yet received + a measurement update. + + Notes: + This method does not use any detection input. It only extrapolates the track + state forward in time. 
+ """ + self._update_process_and_measurement_noise() + + # Predict state + self.state = self.F @ self.state # type: ignore[assignment] + + # Predict error (uncertainty) covariance + self.P = self.F @ self.P @ self.F.T + self.Q # type: ignore[assignment] + + # Prevent degenerate box shape + self.state[2, 0] = max(self.state[2, 0], 1e-3) + self.state[3, 0] = max(self.state[3, 0], 1e-3) + + # Increase time since update + self.time_since_update += 1 + + def update(self, bbox: np.ndarray) -> None: + """ + Correct the predicted state using a new detection. + + Args: + bbox: + Detection bounding box in xyxy format: [x1, y1, x2, y2]. + + This method performs the Kalman filter correction/update step: + + measurement = xyxy_to_xywh(bbox) + S = H @ P @ H.T + R + K = P @ H.T @ inv(S) + y = measurement - H @ state + state = state + K @ y + P = (I - K @ H) @ P + + where: + measurement: + Observed bounding box converted to [xc, yc, w, h]. + S: + Innovation covariance. Represents uncertainty in the predicted + measurement. + K: + Kalman gain. Controls how strongly the state is corrected toward + the new measurement. + y: + Innovation (also called residual), i.e. the difference between the + observed measurement and the predicted measurement. + I: + Identity matrix of appropriate size. + + Effect of the update: + - The predicted state is corrected toward the observed detection. + - The covariance matrix P is reduced to reflect increased confidence + after receiving a measurement. + + Additional behavior: + - `time_since_update` is reset to zero. + - `number_of_successful_updates` is incremented. + - Width and height are clamped to remain positive after correction. + + Notes: + The measurement only directly observes [xc, yc, w, h], not the velocity + terms. However, the velocity estimates can still change indirectly through + the Kalman gain and the state covariance structure. 
+ """ + self.time_since_update = 0 + self.number_of_successful_updates += 1 + + measurement = self.xyxy_to_xywh(bbox).reshape((4, 1)) + self._update_process_and_measurement_noise() + + # Kalman Gain + S = self.H @ self.P @ self.H.T + self.R + K = self.P @ self.H.T @ np.linalg.inv(S) + + # Innovation (residual) + y = measurement - self.H @ self.state + + # Update state + self.state = self.state + K @ y + + # Update covariance + identity_matrix = np.eye(8, dtype=np.float32) + self.P = (identity_matrix - K @ self.H) @ self.P # type: ignore[assignment] + + self.state[2, 0] = max(self.state[2, 0], 1e-3) + self.state[3, 0] = max(self.state[3, 0], 1e-3) + + def get_state_bbox(self) -> np.ndarray: + """ + Return current predicted box in xyxy format. + """ + return self.xywh_to_xyxy(self.state[0:4, 0]) diff --git a/trackers/core/botsort/tracker.py b/trackers/core/botsort/tracker.py new file mode 100644 index 00000000..696280c0 --- /dev/null +++ b/trackers/core/botsort/tracker.py @@ -0,0 +1,451 @@ +# ------------------------------------------------------------------------ +# Trackers +# Copyright (c) 2026 Roboflow. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 [see LICENSE for details] +# ------------------------------------------------------------------------ + +from copy import deepcopy +from typing import Literal, cast + +import numpy as np +import supervision as sv +from scipy.optimize import linear_sum_assignment + +from trackers.core.base import BaseTracker +from trackers.core.botsort.cmc import CMC, CMCConfig +from trackers.core.botsort.kalman_box_tracker import BoTSORTKalmanBoxTracker +from trackers.core.botsort.utils import ( + get_alive_trackers, + get_iou_matrix, +) + + +class BoTSORTTracker(BaseTracker): + """ + BoT-SORT-style multi-object tracker (IoU association + optional CMC). 
+ + The tracker maintains a list of active tracks (Kalman-filter-based) and, for each + frame, performs: + 1) Predict existing track states (Kalman predict) + 2) Split detections into high/low confidence groups + 3) Apply camera motion compensation to predicted tracks + 4) Associate high-confidence detections to tracks (IoU + assignment) + 5) Associate low-confidence detections to remaining tracks + 6) Spawn new tracks from unmatched high-confidence detections + 7) Remove tracks that have been lost for too long + + Parameters in __init__ control thresholds and lifecycle logic similarly to + ByteTrack. + + Attributes: + tracks: List of active `BoTSORTKalmanBoxTracker` objects. + maximum_frames_without_update: Max number of consecutive frames a track can go + unmatched before being removed. + minimum_consecutive_frames: Track maturity threshold before assigning a + permanent ID. + minimum_iou_threshold_first_assoc: Minimum IoU required for a valid match + in the first association step + minimum_iou_threshold_second_assoc: Minimum IoU required for a valid match + in the second association step + track_activation_threshold: Confidence threshold for spawning a new track. + high_conf_det_threshold: Confidence threshold splitting detections into + high/low groups. + enable_cmc: Whether to run camera motion compensation each frame + (if `cmc` is set). + cmc: Camera motion compensation instance (or None if disabled). + """ + + tracker_id = "botsort" + + def __init__( + self, + lost_track_buffer: int = 30, + frame_rate: float = 30.0, + track_activation_threshold: float = 0.7, + minimum_consecutive_frames: int = 2, + minimum_iou_threshold_first_assoc: float = 0.2, + minimum_iou_threshold_second_assoc: float = 0.5, + high_conf_det_threshold: float = 0.6, + enable_cmc: bool = True, + cmc_method: Literal["orb", "sift", "sparseOptFlow", "ecc"] = "sparseOptFlow", + cmc_downscale: int = 2, + ) -> None: + """ + Initialize the tracker. 
+ + Args: + lost_track_buffer: Time buffer (in frames at 30 FPS) for keeping lost tracks + alive before deletion. This is scaled by `frame_rate`. + frame_rate: Video frame rate used to scale the lost track buffer to + time-like behavior. + track_activation_threshold: Minimum detection confidence to spawn a new + track. + minimum_consecutive_frames: Number of successful updates required before + assigning a stable track ID (different than initial -1). + minimum_iou_threshold_first_assoc: Minimum IoU to accept a detection-track + association during the first association step. + minimum_iou_threshold_second_assoc: Minimum IoU to accept a detection-track + association during the second association step. + high_conf_det_threshold: Confidence threshold used to split detections into: + - high confidence: confidence >= threshold + - low confidence: confidence < threshold + enable_cmc: Whether to enable camera motion compensation (CMC). + cmc_method: CMC method string passed into `CMCConfig(method=...)`. + Supported values depend on `CMC` (e.g. "orb", "sift", "sparseOptFlow", + "ecc"). See CMCConfig. + cmc_downscale: Downscale factor used inside CMC for speed/robustness. + + Notes: + - `maximum_frames_without_update` is computed as: + int(frame_rate / 30.0 * lost_track_buffer) + to maintain consistent “seconds” worth of buffer across different FPS. + """ + # Calculate maximum frames without update based on lost_track_buffer and + # frame_rate. This scales the buffer based on the frame rate to ensure + # consistent time-based tracking across different frame rates. 
+ self.maximum_frames_without_update = int(frame_rate / 30.0 * lost_track_buffer) + self.minimum_consecutive_frames = minimum_consecutive_frames + self.minimum_iou_threshold_first_assoc = minimum_iou_threshold_first_assoc + self.minimum_iou_threshold_second_assoc = minimum_iou_threshold_second_assoc + self.track_activation_threshold = track_activation_threshold + self.high_conf_det_threshold = high_conf_det_threshold + self.tracks: list[BoTSORTKalmanBoxTracker] = [] + + self.enable_cmc = enable_cmc + self.cmc = ( + CMC(CMCConfig(method=cmc_method, downscale=cmc_downscale)) + if enable_cmc + else None + ) + + def _update_detections( + self, + tracks: list[BoTSORTKalmanBoxTracker], + detections: sv.Detections, + updated_detections: list[sv.Detections], + matched_indices: list[tuple[int, int]], + ) -> list[sv.Detections]: + """ + Apply matched detection updates to tracks and append corresponding outputs. + + For each (track_idx, det_idx) match: + - Update the track's Kalman state with the detection bbox. + - If the track is “mature” (>= minimum_consecutive_frames) and still has + tracker_id == -1, assign a new unique tracker ID. + - Create a single-row `sv.Detections` object for the matched detection and set + its tracker_id to the track ID (or -1 if not mature yet). + - Append it to `updated_detections`. + + Args: + tracks: Tracks being updated. + detections: Detections used for update. + updated_detections: Accumulator list of per-detection outputs for this + frame. + matched_indices: List of (track_row_index, detection_col_index) pairs. + + Returns: + The same `updated_detections` list, returned for convenience. + """ + # Update matched tracks with assigned detections. 
+        det_bboxes = detections.xyxy
+        for row, col in matched_indices:
+            t = tracks[row]
+            t.update(det_bboxes[col])
+            # If tracker is mature but still has ID -1, assign a new ID
+            if (
+                t.number_of_successful_updates >= self.minimum_consecutive_frames
+                and t.tracker_id == -1
+            ):  # Promotion point: a mature track earns its permanent ID here
+                t.tracker_id = BoTSORTKalmanBoxTracker.get_next_tracker_id()
+
+            new_det = deepcopy(detections[col : col + 1])
+            # deepcopy widens the static type; cast back for mypy
+            new_det = cast(sv.Detections, new_det)
+            new_det.tracker_id = np.array([t.tracker_id])
+            updated_detections.append(new_det)
+        return updated_detections
+
+    def update(  # type: ignore[override]
+        self,
+        detections: sv.Detections,
+        frame: np.ndarray | None = None,
+    ) -> sv.Detections:
+        """
+        Update the tracker with detections from the current frame.
+
+        This is the main per-frame entry point.
+
+        Args:
+            detections: Supervision detections for the current frame. Must include
+                `.xyxy`. Confidence (`detections.confidence`) is optional but
+                recommended. The method writes/overwrites `detections.tracker_id`.
+            frame: Current video frame in BGR format (H, W, 3), required if CMC is
+                enabled.
+
+        Returns:
+            A merged `sv.Detections` object containing detections from this frame with
+            `tracker_id` assigned:
+            - >= 0 indicates a confirmed track ID
+            - -1 indicates unconfirmed/untracked (e.g., new / low confidence / not yet
+            mature)
+
+        Notes:
+            - If CMC is enabled, the tracker estimates a global affine transform (2x3)
+            from the frame and uses it to warp predicted track states before
+            association.
+        """
+        if len(self.tracks) == 0 and len(detections) == 0:
+            detections.tracker_id = np.array([], dtype=int)
+            return detections
+        updated_detections: list[
+            sv.Detections
+        ] = []  # Per-detection outputs accumulated this frame; merged into a
+        # single sv.Detections at the end
+
+        # Predict new locations for existing tracks
+        for tracker in self.tracks:
+            tracker.predict()
+        # Assign a default tracker_id (-1); int dtype to match assigned track IDs
+        detections.tracker_id = -np.ones(len(detections), dtype=int)
+        # Split into high confidence boxes and lower based on
+        # self.high_conf_det_threshold
+        high_prob_detections, low_prob_detections = (
+            self._get_high_and_low_probability_detections(detections)
+        )
+
+        # CMC: warp predicted track states by the estimated camera motion
+        if self.enable_cmc and self.cmc is not None and frame is not None:
+            mask_boxes = (
+                high_prob_detections.xyxy if len(high_prob_detections) > 0 else None
+            )
+            H = self.cmc.estimate(frame, mask_boxes)
+            self.cmc.apply_to_tracks(self.tracks, H)
+
+        # Step 1: first association, with high confidence boxes
+        matched_indices, unmatched_tracks, unmatched_high_prob_detections = (
+            self._similarity_step(
+                high_prob_detections,
+                self.tracks,
+                self.minimum_iou_threshold_first_assoc,
+            )
+        )
+
+        # Update matched tracks with high-confidence detections
+        self._update_detections(
+            self.tracks,
+            high_prob_detections,
+            updated_detections,
+            matched_indices,
+        )
+
+        remaining_tracks = [self.tracks[i] for i in unmatched_tracks]
+
+        # Step 2: associate Low Probability detections with remaining tracks
+        matched_indices, unmatched_tracks, unmatched_detections = self._similarity_step(
+            low_prob_detections,
+            remaining_tracks,
+            self.minimum_iou_threshold_second_assoc,
+        )
+
+        # Update matched tracks with low-confidence detections
+        self._update_detections(
+            remaining_tracks,
+            low_prob_detections,
+            updated_detections,
+            matched_indices,
+        )
+
+        # Add unmatched low prob predictions to updated predictions
+        for det_index in unmatched_detections:
+            new_det = cast(
+                sv.Detections,
+                deepcopy(low_prob_detections[det_index : det_index + 1]),
+            )
+
+            new_det.tracker_id = np.array([-1])
+            updated_detections.append(new_det)
+
+        self._spawn_new_trackers(
+            high_prob_detections,
+            high_prob_detections.xyxy,
+            unmatched_high_prob_detections,
+            updated_detections,
+        )
+
+        # Kill lost tracks
+        self.tracks = get_alive_trackers(
+            trackers=self.tracks,
+            maximum_frames_without_update=self.maximum_frames_without_update,
+            minimum_consecutive_frames=self.minimum_consecutive_frames,
+        )
+        final_updated_detections: sv.Detections = sv.Detections.merge(
+            updated_detections
+        )
+        if len(final_updated_detections) == 0:
+            final_updated_detections.tracker_id = np.array([], dtype=int)
+        return final_updated_detections
+
+    def _get_high_and_low_probability_detections(
+        self, detections: sv.Detections
+    ) -> tuple[sv.Detections, sv.Detections]:
+        """
+        Split detections into high-confidence and low-confidence sets.
+
+        Detections with confidence <= 0.1 are discarded completely and are not
+        used by the tracker.
+
+        Rules:
+            high-confidence:
+                confidence >= self.high_conf_det_threshold
+
+            low-confidence:
+                0.1 < confidence < self.high_conf_det_threshold
+
+            discarded:
+                confidence <= 0.1
+
+        Args:
+            detections:
+                Input detections containing confidence scores.
+
+        Returns:
+            Tuple:
+                (high_confidence_detections, low_confidence_detections)
+        """
+
+        if detections.confidence is None:
+            # If no confidence information exists, treat all detections
+            # as high-confidence
+            return detections, cast(sv.Detections, detections[:0])
+
+        conf = detections.confidence
+
+        high_mask = conf >= self.high_conf_det_threshold
+        low_mask = (conf > 0.1) & (conf < self.high_conf_det_threshold)
+
+        high_confidence = cast(sv.Detections, detections[high_mask])
+        low_confidence = cast(sv.Detections, detections[low_mask])
+
+        return high_confidence, low_confidence
+
+    def _get_associated_indices(
+        self,
+        similarity_matrix: np.ndarray,
+        min_similarity_thresh: float,
+    ) -> tuple[list[tuple[int, int]], set[int], set[int]]:
+        """
+        Associate detections to tracks based on Similarity (IoU) using the
+        Jonker-Volgenant algorithm approach with no initialization instead of the
+        Hungarian algorithm as mentioned in the SORT paper, but it solves the
+        assignment problem in an optimal way.
+
+        Args:
+            similarity_matrix: Similarity matrix between tracks (rows) and
+                detections (columns).
+            min_similarity_thresh: Minimum similarity threshold for a valid match.
+
+        Returns:
+            Matched indices (list of (tracker_idx, detection_idx)), indices of
+            unmatched tracks, indices of unmatched detections.
+ """ + matched_indices = [] + n_tracks, n_detections = similarity_matrix.shape + unmatched_tracks = set(range(n_tracks)) + unmatched_detections = set(range(n_detections)) + + if n_tracks > 0 and n_detections > 0: + row_indices, col_indices = linear_sum_assignment( + similarity_matrix, maximize=True + ) + for row, col in zip(row_indices, col_indices): + if similarity_matrix[row, col] >= min_similarity_thresh: + matched_indices.append((row, col)) + unmatched_tracks.remove(row) + unmatched_detections.remove(col) + + return matched_indices, unmatched_tracks, unmatched_detections + + def _spawn_new_trackers( + self, + detections: sv.Detections, + detection_boxes: np.ndarray, + unmatched_detections: set[int], + updated_detections: list[sv.Detections], + ): + """ + Create new trackers for unmatched detections and + append detections to updated_detections detections. + + Args: + detections: Current detections. + detection_boxes: Bounding boxes for detections. + unmatched_detections: Indices of unmatched detections. 
+            updated_detections: Accumulator list of per-detection outputs
+
+        """
+        for detection_idx in unmatched_detections:
+            # Guard: require confidence scores and a valid index before reading them
+            if detections.confidence is not None and detection_idx < len(
+                detections.confidence
+            ):
+                # Confidence of this unmatched detection
+                confidence_score: float = float(detections.confidence[detection_idx])
+
+                # Only sufficiently confident detections may seed a new track
+                if confidence_score >= self.track_activation_threshold:
+                    # Spawn a tentative track; it receives a permanent ID only
+
+                    new_tracker = BoTSORTKalmanBoxTracker(
+                        bbox=detection_boxes[detection_idx]
+                    )
+                    self.tracks.append(new_tracker)
+
+                    new_det = deepcopy(detections[detection_idx : detection_idx + 1])
+                    new_det = cast(sv.Detections, new_det)  # narrow type for mypy
+                    new_det.tracker_id = np.array([-1])
+                    updated_detections.append(new_det)
+                else:
+                    pass  # Below activation threshold: dropped, not emitted
+
+    def _similarity_step(
+        self,
+        detections: sv.Detections,
+        tracks: list[BoTSORTKalmanBoxTracker],
+        thresh: float,
+    ) -> tuple[list[tuple[int, int]], set[int], set[int]]:
+        """Measures similarity based on IoU between tracks and detections and returns
+        the matches and unmatched tracks/detections. Is used for step 1 and 2 of the
+        BYTE algorithm.
+
+        Args:
+            detections: The set of object detections.
+            tracks: The list of tracks that will be matched to the detections.
+            thresh: Minimum IoU required for a valid match.
+
+        Returns:
+            A tuple containing:
+            - matched_indices: A list of (tracker_idx, detection_idx) pairs.
+            - unmatched_tracks_indices: A set of indices for tracks that
+                were not matched.
+            - unmatched_detections_indices: A set of indices for detections
+                that were not matched.
+        """
+        # Build IoU cost matrix between detections and predicted bounding boxes
+        similarity_matrix = get_iou_matrix(tracks, detections.xyxy)
+
+        # Associate detections to tracks based on the higher value of the
+        # similarity matrix, using the Jonker-Volgenant algorithm
+        # (linear_sum_assignment).
+        matched_indices, unmatched_tracks, unmatched_detections = (
+            self._get_associated_indices(similarity_matrix, thresh)
+        )
+        return matched_indices, unmatched_tracks, unmatched_detections
+
+    def reset(self) -> None:
+        """Reset tracker state by clearing all tracks and resetting ID counter.
+        Call this method when switching to a new video or scene.
+        """
+        self.tracks = []
+        BoTSORTKalmanBoxTracker.count_id = 0
+        if self.cmc is not None:
+            self.cmc.reset()
diff --git a/trackers/core/botsort/utils.py b/trackers/core/botsort/utils.py
new file mode 100644
index 00000000..3f4fdf23
--- /dev/null
+++ b/trackers/core/botsort/utils.py
@@ -0,0 +1,149 @@
+# ------------------------------------------------------------------------
+# Trackers
+# Copyright (c) 2026 Roboflow. All Rights Reserved.
+# Licensed under the Apache License, Version 2.0 [see LICENSE for details]
+# ------------------------------------------------------------------------
+
+from collections.abc import Sequence
+from copy import deepcopy
+from typing import TypeVar
+
+import numpy as np
+import supervision as sv
+
+from trackers.core.botsort.kalman_box_tracker import BoTSORTKalmanBoxTracker
+
+KalmanBoxTrackerType = TypeVar("KalmanBoxTrackerType", bound=BoTSORTKalmanBoxTracker)
+
+
+def get_alive_trackers(
+    trackers: Sequence[KalmanBoxTrackerType],
+    minimum_consecutive_frames: int,
+    maximum_frames_without_update: int,
+) -> list[KalmanBoxTrackerType]:
+    """
+    Remove dead or immature lost tracklets and get alive trackers
+    that are within `maximum_frames_without_update` AND (it's mature OR
+    it was just updated).
+
+    Args:
+        trackers: List of KalmanBoxTracker objects.
+ minimum_consecutive_frames: Number of consecutive frames that an object + must be tracked before it is considered a 'valid' track. + maximum_frames_without_update: Maximum number of frames without update + before a track is considered dead. + + Returns: + List of alive trackers. + """ + alive_trackers = [] + for tracker in trackers: + is_mature = tracker.number_of_successful_updates >= minimum_consecutive_frames + is_active = tracker.time_since_update == 0 + if tracker.time_since_update < maximum_frames_without_update and ( + is_mature or is_active + ): + alive_trackers.append(tracker) + return alive_trackers + + +def get_iou_matrix( + trackers: Sequence[KalmanBoxTrackerType], detection_boxes: np.ndarray +) -> np.ndarray: + """ + Build IOU cost matrix between detections and predicted bounding boxes + + Args: + trackers: List of KalmanBoxTracker objects. + detection_boxes: Detected bounding boxes in the + form [x1, y1, x2, y2]. + + Returns: + IOU cost matrix. + """ + predicted_boxes = np.array([t.get_state_bbox() for t in trackers]) + if len(predicted_boxes) == 0 and len(trackers) > 0: + # Handle case where get_state_bbox might return empty array + predicted_boxes = np.zeros((len(trackers), 4), dtype=np.float32) + + if len(trackers) > 0 and len(detection_boxes) > 0: + iou_matrix = sv.box_iou_batch(predicted_boxes, detection_boxes) + else: + iou_matrix = np.zeros((len(trackers), len(detection_boxes)), dtype=np.float32) + + return iou_matrix + + +def update_detections_with_track_ids( + trackers: Sequence[KalmanBoxTrackerType], + detections: sv.Detections, + detection_boxes: np.ndarray, + minimum_iou_threshold: float, + minimum_consecutive_frames: int, +) -> sv.Detections: + """ + The function prepares the updated Detections with track IDs. + If a tracker is "mature" (>= `minimum_consecutive_frames`) or recently updated, + it is assigned an ID to the detection that just updated it. + + Args: + trackers: List of BoTSORTKalmanBoxTracker objects. 
+ detections: The latest set of object detections. + detection_boxes: Detected bounding boxes in the + form [x1, y1, x2, y2]. + minimum_iou_threshold: IOU threshold for associating detections to + existing tracks. + minimum_consecutive_frames: Number of consecutive frames that an object + must be tracked before it is considered a 'valid' track. + + Returns: + A copy of the detections with `tracker_id` set + for each detection that is tracked. + """ + # Re-run association in the same way (could also store direct mapping) + final_tracker_ids = [-1] * len(detection_boxes) + + # Recalculate predicted_boxes based on current trackers after some may have + # been removed + predicted_boxes = np.array([t.get_state_bbox() for t in trackers]) + iou_matrix_final = np.zeros((len(trackers), len(detection_boxes)), dtype=np.float32) + + # Ensure predicted_boxes is properly shaped before the second iou calculation + if len(predicted_boxes) == 0 and len(trackers) > 0: + predicted_boxes = np.zeros((len(trackers), 4), dtype=np.float32) + + if len(trackers) > 0 and len(detection_boxes) > 0: + iou_matrix_final = sv.box_iou_batch(predicted_boxes, detection_boxes) + + row_indices, col_indices = np.where(iou_matrix_final > minimum_iou_threshold) + sorted_pairs = sorted( + zip(row_indices, col_indices), + key=lambda x: iou_matrix_final[x[0], x[1]], + reverse=True, + ) + used_rows: set[int] = set() + used_cols: set[int] = set() + for row, col in sorted_pairs: + # Double check index is in range + if row < len(trackers): + tracker_obj = trackers[int(row)] + # Only assign if the track is "mature" or is new but has enough hits + if (int(row) not in used_rows) and (int(col) not in used_cols): + if ( + tracker_obj.number_of_successful_updates + >= minimum_consecutive_frames + ): + # If tracker is mature but still has ID -1, assign a new ID + if tracker_obj.tracker_id == -1: + tracker_obj.tracker_id = ( + BoTSORTKalmanBoxTracker.get_next_tracker_id() + ) + final_tracker_ids[int(col)] = 
tracker_obj.tracker_id + used_rows.add(int(row)) + used_cols.add(int(col)) + + # Assign tracker IDs to the returned Detections + updated_detections = deepcopy(detections) + updated_detections.tracker_id = np.array(final_tracker_ids) + + return updated_detections