From ad311f8e2ffe3a1a158f053b8e25bde2f824a0b0 Mon Sep 17 00:00:00 2001 From: dentiny Date: Tue, 7 Apr 2026 03:13:54 +0000 Subject: [PATCH 1/5] feat(metrics): Add regression test for metrics recording --- core/layers/observe-metrics-common/src/lib.rs | 462 ++++++++++++++++++ 1 file changed, 462 insertions(+) diff --git a/core/layers/observe-metrics-common/src/lib.rs b/core/layers/observe-metrics-common/src/lib.rs index ba9c6c781356..2353ccc02423 100644 --- a/core/layers/observe-metrics-common/src/lib.rs +++ b/core/layers/observe-metrics-common/src/lib.rs @@ -1150,3 +1150,465 @@ impl oio::Delete for MetricsWrapper { }) } } + +#[cfg(test)] +mod tests { + use std::sync::Mutex; + use std::time::Instant; + + use futures::StreamExt; + use futures::stream; + + use super::*; + + #[derive(Debug, Clone, Default)] + struct MockInterceptor { + observations: Arc>>, + } + + impl MetricsIntercept for MockInterceptor { + fn observe(&self, labels: MetricLabels, value: MetricValue) { + self.observations.lock().unwrap().push((labels, value)); + } + } + + impl MockInterceptor { + fn has_metric(&self, name: &str) -> bool { + self.observations + .lock() + .unwrap() + .iter() + .any(|(_, v)| v.name() == name) + } + + fn count_metric(&self, name: &str) -> usize { + self.observations + .lock() + .unwrap() + .iter() + .filter(|(_, v)| v.name() == name) + .count() + } + + fn count_metric_with_status(&self, name: &str, code: StatusCode) -> usize { + self.observations + .lock() + .unwrap() + .iter() + .filter(|(l, v)| v.name() == name && l.status_code == Some(code)) + .count() + } + + fn gauge_value(&self, name: &str) -> isize { + self.observations + .lock() + .unwrap() + .iter() + .filter_map(|(_, v)| match v { + MetricValue::HttpExecuting(n) if v.name() == name => Some(*n), + MetricValue::OperationExecuting(n) if v.name() == name => Some(*n), + _ => None, + }) + .sum() + } + + fn get_duration_seconds(&self, name: &str) -> Option { + self.observations.lock().unwrap().iter().find_map(|(_, v)| { + if v.name() != name { + return None; + } + match v { + MetricValue::HttpRequestDurationSeconds(d) + | MetricValue::HttpResponseDurationSeconds(d) + | MetricValue::OperationDurationSeconds(d) + | MetricValue::OperationTtfbSeconds(d) => Some(*d), + _ => None, + } + }) + } + + fn get_value_u64(&self, name: &str) -> Option { + self.observations.lock().unwrap().iter().find_map(|(_, v)| { + if v.name() != name { + return None; + } + match v { + MetricValue::HttpResponseBytes(n) + | MetricValue::HttpRequestBytes(n) + | MetricValue::OperationBytes(n) + | MetricValue::OperationEntries(n) => Some(*n), + _ => None, + } + }) + } + } + + fn test_info() -> Arc { + Arc::new(AccessorInfo::default()) + } + + fn test_labels() -> MetricLabels { + MetricLabels::new(test_info(), Operation::Read.into_static()) + } + + enum MockFetchBehavior { + /// Return a response with the given status code and an empty body. + Respond(StatusCode), + /// Fail before receiving a response. + ConnectionError, + /// Return HTTP 200 with a body stream that yields the given items. + BodyError(Vec>), + } + + struct MockHttpFetch { + behavior: Mutex, + } + + impl HttpFetch for MockHttpFetch { + async fn fetch(&self, _req: http::Request) -> Result> { + let behavior = std::mem::replace( + &mut *self.behavior.lock().unwrap(), + MockFetchBehavior::ConnectionError, + ); + match behavior { + MockFetchBehavior::Respond(status) => { + let body = HttpBody::new(stream::empty(), None); + let resp = http::Response::builder().status(status).body(body).unwrap(); + Ok(resp) + } + MockFetchBehavior::ConnectionError => { + Err(Error::new(ErrorKind::Unexpected, "mock connection refused")) + } + MockFetchBehavior::BodyError(items) => { + let body = HttpBody::new(stream::iter(items), None); + let resp = http::Response::builder() + .status(StatusCode::OK) + .body(body) + .unwrap(); + Ok(resp) + } + } + } + } + + fn build_metrics_http_fetcher( + mock: MockInterceptor, + behavior: MockFetchBehavior, + ) -> MetricsHttpFetcher { + let inner_fetch = MockHttpFetch { + behavior: Mutex::new(behavior), + }; + MetricsHttpFetcher { + inner: Arc::new(inner_fetch) as HttpFetcher, + info: test_info(), + interceptor: mock, + } + } + + fn build_http_request() -> http::Request { + let mut req = http::Request::new(Buffer::new()); + *req.uri_mut() = "https://example.com/test".parse().unwrap(); + req.extensions_mut().insert(Operation::Read); + req + } + + #[tokio::test] + async fn test_http_client_error_records_status_error() { + let mock = MockInterceptor::default(); + let fetcher = build_metrics_http_fetcher( + mock.clone(), + MockFetchBehavior::Respond(StatusCode::NOT_FOUND), + ); + + let _ = fetcher.fetch(build_http_request()).await; + + assert_eq!( + mock.count_metric_with_status( + "opendal_http_status_errors_total", + StatusCode::NOT_FOUND + ), + 1 + ); + assert_eq!(mock.gauge_value("opendal_http_executing"), 0); + } + + #[tokio::test] + async fn test_http_server_error_records_status_error() { + let mock = MockInterceptor::default(); + let fetcher = build_metrics_http_fetcher( + mock.clone(), + MockFetchBehavior::Respond(StatusCode::INTERNAL_SERVER_ERROR), + ); + + let _ = fetcher.fetch(build_http_request()).await; + + assert_eq!( + mock.count_metric_with_status( + "opendal_http_status_errors_total", + StatusCode::INTERNAL_SERVER_ERROR + ), + 1 + ); + assert_eq!(mock.gauge_value("opendal_http_executing"), 0); + } + + #[tokio::test] + async fn test_http_success_records_full_metrics() { + let mock = MockInterceptor::default(); + let fetcher = build_metrics_http_fetcher( + mock.clone(), + MockFetchBehavior::BodyError(vec![ + Ok(Buffer::from("hello")), + Ok(Buffer::from(" world")), + ]), + ); + + let resp = fetcher.fetch(build_http_request()).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let (_, mut body) = resp.into_parts(); + let buf = body.to_buffer().await.unwrap(); + assert_eq!(buf.len(), 11); + drop(body); + + assert_eq!(mock.count_metric("opendal_http_status_errors_total"), 0); + assert_eq!(mock.count_metric("opendal_http_connection_errors_total"), 0); + assert_eq!(mock.get_value_u64("opendal_http_request_bytes"), Some(0)); + assert!( + mock.get_duration_seconds("opendal_http_request_duration_seconds") + .unwrap() + > Duration::ZERO + ); + assert_eq!(mock.get_value_u64("opendal_http_response_bytes"), Some(11)); + assert!( + mock.get_duration_seconds("opendal_http_response_duration_seconds") + .unwrap() + > Duration::ZERO + ); + assert_eq!(mock.gauge_value("opendal_http_executing"), 0); + } + + #[tokio::test] + async fn test_http_connection_error_records_connection_error() { + let mock = MockInterceptor::default(); + let fetcher = build_metrics_http_fetcher(mock.clone(), MockFetchBehavior::ConnectionError); + + let res = fetcher.fetch(build_http_request()).await; + assert!(res.is_err()); + + assert_eq!(mock.count_metric("opendal_http_connection_errors_total"), 1); + assert_eq!(mock.count_metric("opendal_http_status_errors_total"), 0); + assert_eq!(mock.gauge_value("opendal_http_executing"), 0); + } + + #[tokio::test] + async fn test_http_body_read_failure_records_metrics() { + let mock = MockInterceptor::default(); + let fetcher = build_metrics_http_fetcher( + mock.clone(), + MockFetchBehavior::BodyError(vec![ + Ok(Buffer::from("partial")), + Err(Error::new(ErrorKind::Unexpected, "connection reset")), + ]), + ); + + let resp = fetcher.fetch(build_http_request()).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let (_, mut body) = resp.into_parts(); + let res = body.to_buffer().await; + assert!(res.is_err()); + drop(body); + + assert_eq!(mock.get_value_u64("opendal_http_response_bytes"), Some(7)); + assert_eq!(mock.count_metric("opendal_http_connection_errors_total"), 1); + assert_eq!(mock.gauge_value("opendal_http_executing"), 0); + } + + #[tokio::test] + async fn test_stream_completed_records_response_metrics() { + let mock = MockInterceptor::default(); + let labels = test_labels(); + mock.observe(labels.clone(), MetricValue::HttpExecuting(1)); + let inner = stream::iter(vec![Ok(Buffer::from("hello"))]); + + let mut s = MetricsStream { + inner, + interceptor: mock.clone(), + labels, + size: 0, + start: Instant::now(), + succeeded: false, + }; + + while s.next().await.is_some() {} + drop(s); + + assert_eq!(mock.get_value_u64("opendal_http_response_bytes"), Some(5)); + assert_eq!(mock.gauge_value("opendal_http_executing"), 0); + assert_eq!(mock.count_metric("opendal_http_connection_errors_total"), 0); + } + + #[tokio::test] + async fn test_stream_dropped_early_records_response_metrics() { + let mock = MockInterceptor::default(); + let labels = test_labels(); + mock.observe(labels.clone(), MetricValue::HttpExecuting(1)); + let inner = stream::iter(vec![Ok(Buffer::from("chunk1")), Ok(Buffer::from("chunk2"))]); + + let mut s = MetricsStream { + inner, + interceptor: mock.clone(), + labels, + size: 0, + start: Instant::now(), + succeeded: false, + }; + + let _ = s.next().await; + drop(s); + + assert_eq!(mock.get_value_u64("opendal_http_response_bytes"), Some(6)); + assert_eq!(mock.gauge_value("opendal_http_executing"), 0); + assert_eq!(mock.count_metric("opendal_http_connection_errors_total"), 1); + } + + #[tokio::test] + async fn test_stream_error_records_response_metrics() { + let mock = MockInterceptor::default(); + let labels = test_labels(); + mock.observe(labels.clone(), MetricValue::HttpExecuting(1)); + let inner = stream::iter(vec![ + Ok(Buffer::from("data")), + Err(Error::new(ErrorKind::Unexpected, "read error")), + ]); + + let mut s = MetricsStream { + inner, + interceptor: mock.clone(), + labels, + size: 0, + start: Instant::now(), + succeeded: false, + }; + + while let Some(res) = s.next().await { + if res.is_err() { + break; + } + } + drop(s); + + assert_eq!(mock.get_value_u64("opendal_http_response_bytes"), Some(4)); + assert_eq!(mock.gauge_value("opendal_http_executing"), 0); + assert_eq!(mock.count_metric("opendal_http_connection_errors_total"), 1); + } + + #[test] + fn test_http_guard_cancelled_records_connection_error() { + let mock = MockInterceptor::default(); + let labels = test_labels(); + mock.observe(labels.clone(), MetricValue::HttpExecuting(1)); + let guard = ExecutingGuard::new_http(mock.clone(), labels, Instant::now()); + drop(guard); + + assert_eq!(mock.count_metric("opendal_http_connection_errors_total"), 1); + assert!(mock.has_metric("opendal_http_request_duration_seconds")); + assert_eq!(mock.gauge_value("opendal_http_executing"), 0); + } + + #[test] + fn test_http_guard_completed_only_decrements_executing() { + let mock = MockInterceptor::default(); + let labels = test_labels(); + mock.observe(labels.clone(), MetricValue::HttpExecuting(1)); + let mut guard = ExecutingGuard::new_http(mock.clone(), labels, Instant::now()); + guard.complete(); + drop(guard); + + assert_eq!(mock.count_metric("opendal_http_connection_errors_total"), 0); + assert_eq!(mock.gauge_value("opendal_http_executing"), 0); + } + + #[test] + fn test_http_guard_defused_records_nothing() { + let mock = MockInterceptor::default(); + let labels = test_labels(); + mock.observe(labels.clone(), MetricValue::HttpExecuting(1)); + let mut guard = ExecutingGuard::new_http(mock.clone(), labels, Instant::now()); + guard.complete(); + guard.defuse(); + drop(guard); + + assert_eq!(mock.gauge_value("opendal_http_executing"), 1); + assert_eq!(mock.count_metric("opendal_http_connection_errors_total"), 0); + } + + #[test] + fn test_operation_guard_cancelled_records_operation_error() { + let mock = MockInterceptor::default(); + let labels = test_labels(); + mock.observe(labels.clone(), MetricValue::OperationExecuting(1)); + let guard = ExecutingGuard::new_operation(mock.clone(), labels, Instant::now()); + drop(guard); + + assert_eq!(mock.count_metric("opendal_operation_errors_total"), 1); + assert!(mock.has_metric("opendal_operation_duration_seconds")); + assert_eq!(mock.gauge_value("opendal_operation_executing"), 0); + } + + #[test] + fn test_operation_guard_completed_only_decrements_executing() { + let mock = MockInterceptor::default(); + let labels = test_labels(); + mock.observe(labels.clone(), MetricValue::OperationExecuting(1)); + let mut guard = ExecutingGuard::new_operation(mock.clone(), labels, Instant::now()); + guard.complete(); + drop(guard); + + assert_eq!(mock.count_metric("opendal_operation_errors_total"), 0); + assert_eq!(mock.gauge_value("opendal_operation_executing"), 0); + } + + #[test] + fn test_list_wrapper_drop_records_operation_entries() { + let mock = MockInterceptor::default(); + let labels = MetricLabels::new(test_info(), Operation::List.into_static()); + mock.observe(labels.clone(), MetricValue::OperationExecuting(1)); + + let mut wrapper = MetricsWrapper { + inner: (), + interceptor: mock.clone(), + labels, + start: Instant::now(), + size: 0, + }; + wrapper.size = 42; + drop(wrapper); + + assert_eq!(mock.get_value_u64("opendal_operation_entries"), Some(42)); + assert!(mock.has_metric("opendal_operation_entries_rate")); + assert_eq!(mock.get_value_u64("opendal_operation_bytes"), None); + assert_eq!(mock.gauge_value("opendal_operation_executing"), 0); + } + + #[test] + #[ignore = "requires fix from PR #7357: MetricsWrapper should emit OperationErrorsTotal on cancellation"] + fn test_wrapper_cancelled_records_operation_error() { + let mock = MockInterceptor::default(); + let labels = MetricLabels::new(test_info(), Operation::Read.into_static()); + mock.observe(labels.clone(), MetricValue::OperationExecuting(1)); + + let wrapper = MetricsWrapper { + inner: (), + interceptor: mock.clone(), + labels, + start: Instant::now(), + size: 0, + }; + drop(wrapper); + + assert_eq!(mock.count_metric("opendal_operation_errors_total"), 1); + assert_eq!(mock.gauge_value("opendal_operation_executing"), 0); + } +} From d659fcf2de99e67ca30d6863ab8d587f8bda140b Mon Sep 17 00:00:00 2001 From: dentiny Date: Tue, 7 Apr 2026 03:24:13 +0000 Subject: [PATCH 2/5] naming --- core/layers/observe-metrics-common/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/layers/observe-metrics-common/src/lib.rs b/core/layers/observe-metrics-common/src/lib.rs index 2353ccc02423..55eb8cc58f29 100644 --- a/core/layers/observe-metrics-common/src/lib.rs +++ b/core/layers/observe-metrics-common/src/lib.rs @@ -1257,7 +1257,7 @@ mod tests { /// Fail before receiving a response. ConnectionError, /// Return HTTP 200 with a body stream that yields the given items. - BodyError(Vec>), + StreamBody(Vec>), } struct MockHttpFetch { @@ -1279,7 +1279,7 @@ mod tests { MockFetchBehavior::ConnectionError => { Err(Error::new(ErrorKind::Unexpected, "mock connection refused")) } - MockFetchBehavior::BodyError(items) => { + MockFetchBehavior::StreamBody(items) => { let body = HttpBody::new(stream::iter(items), None); let resp = http::Response::builder() .status(StatusCode::OK) @@ -1357,7 +1357,7 @@ mod tests { let mock = MockInterceptor::default(); let fetcher = build_metrics_http_fetcher( mock.clone(), - MockFetchBehavior::BodyError(vec![ + MockFetchBehavior::StreamBody(vec![ Ok(Buffer::from("hello")), Ok(Buffer::from(" world")), ]), @@ -1406,7 +1406,7 @@ mod tests { let mock = MockInterceptor::default(); let fetcher = build_metrics_http_fetcher( mock.clone(), - MockFetchBehavior::BodyError(vec![ + MockFetchBehavior::StreamBody(vec![ Ok(Buffer::from("partial")), Err(Error::new(ErrorKind::Unexpected, "connection reset")), ]), From 43d71d7a684468041182d0f0d13e60c2f222f375 Mon Sep 17 00:00:00 2001 From: dentiny Date: Tue, 7 Apr 2026 03:44:08 +0000 Subject: [PATCH 3/5] deps --- core/Cargo.lock | 1 + core/layers/observe-metrics-common/Cargo.toml | 3 +++ 2 files changed, 4 insertions(+) diff --git a/core/Cargo.lock b/core/Cargo.lock index 18c2011cd1ae..d5c8a054cd8d 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -6235,6 +6235,7 @@ dependencies = [ "futures", "http 1.4.0", "opendal-core", + "tokio", ] [[package]] diff --git a/core/layers/observe-metrics-common/Cargo.toml b/core/layers/observe-metrics-common/Cargo.toml index 2f7744fb7873..51a711799e69 100644 --- a/core/layers/observe-metrics-common/Cargo.toml +++ b/core/layers/observe-metrics-common/Cargo.toml @@ -34,3 +34,6 @@ all-features = true futures = { workspace = true } http = { workspace = true } opendal-core = { path = "../../core", version = "0.55.0", default-features = false } + +[dev-dependencies] +tokio = { workspace = true, features = ["macros"] } From 1005d0453a0bf4ae7c9d69618c011c17d35f0310 Mon Sep 17 00:00:00 2001 From: dentiny Date: Wed, 8 Apr 2026 18:38:04 +0000 Subject: [PATCH 4/5] sync latest main fix --- core/layers/observe-metrics-common/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/core/layers/observe-metrics-common/src/lib.rs b/core/layers/observe-metrics-common/src/lib.rs index 840f526020bf..e22fd9c2a05e 100644 --- a/core/layers/observe-metrics-common/src/lib.rs +++ b/core/layers/observe-metrics-common/src/lib.rs @@ -1598,7 +1598,6 @@ mod tests { } #[test] - #[ignore = "requires fix from PR #7357: MetricsWrapper should emit OperationErrorsTotal on cancellation"] fn test_wrapper_cancelled_records_operation_error() { let mock = MockInterceptor::default(); let labels = MetricLabels::new(test_info(), Operation::Read.into_static()); From 523228c7c3169a3376b407f9e59d9aa1da2e15a3 Mon Sep 17 00:00:00 2001 From: dentiny Date: Wed, 8 Apr 2026 18:59:22 +0000 Subject: [PATCH 5/5] fix api change --- core/layers/observe-metrics-common/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/layers/observe-metrics-common/src/lib.rs b/core/layers/observe-metrics-common/src/lib.rs index e22fd9c2a05e..285999c519cc 100644 --- a/core/layers/observe-metrics-common/src/lib.rs +++ b/core/layers/observe-metrics-common/src/lib.rs @@ -1587,6 +1587,7 @@ mod tests { labels, start: Instant::now(), size: 0, + completed: true, }; wrapper.size = 42; drop(wrapper); @@ -1609,6 +1610,7 @@ mod tests { labels, start: Instant::now(), size: 0, + completed: false, }; drop(wrapper);