diff --git a/src/gax-internal/src/observability.rs b/src/gax-internal/src/observability.rs index 25b8e152ca..c0192acf4f 100644 --- a/src/gax-internal/src/observability.rs +++ b/src/gax-internal/src/observability.rs @@ -43,6 +43,5 @@ mod client_signals; #[cfg(google_cloud_unstable_tracing)] pub use client_signals::{ - ClientRequestAttributes, DurationMetric, RequestRecorder, WithClientLogging, WithClientMetric, - WithClientSpan, + ClientRequestAttributes, DurationMetric, RequestRecorder, WithClientMetric, WithClientSpan, }; diff --git a/src/gax-internal/src/observability/client_signals.rs b/src/gax-internal/src/observability/client_signals.rs index 17b15c3dd7..68ac427c6f 100644 --- a/src/gax-internal/src/observability/client_signals.rs +++ b/src/gax-internal/src/observability/client_signals.rs @@ -20,7 +20,7 @@ mod with_client_span; pub use duration_metric::DurationMetric; pub use recorder::{ClientRequestAttributes, RequestRecorder}; -pub use with_client_logging::WithClientLogging; + pub use with_client_metric::WithClientMetric; pub use with_client_span::WithClientSpan; @@ -101,7 +101,7 @@ macro_rules! client_request_signals { span.clone(), $crate::observability::WithClientMetric::new( $metric, - $crate::observability::WithClientLogging::new($inner), + $crate::with_client_logging!($inner), ), )) .instrument(span.clone()); @@ -112,7 +112,7 @@ macro_rules! client_request_signals { #[cfg(test)] mod tests { use super::duration_metric::BOUNDARIES; - use super::with_client_logging::{NAME, TARGET}; + use super::{ClientRequestAttributes, RequestRecorder}; use crate::observability::DurationMetric; use crate::options::InstrumentationClientInfo; @@ -288,11 +288,21 @@ mod tests { let captured = signals.logs_exporter.get_emitted_logs()?; let record = captured .iter() - .find(|r| r.record.target().is_some_and(|v| v == TARGET)) - .unwrap_or_else(|| panic!("missing log for target {TARGET} in {captured:#?}")); + .find(|r| { + r.record + .target() + .is_some_and(|v| v == env!("CARGO_PKG_NAME")) + }) + .unwrap_or_else(|| { + panic!( + "missing log for target {} in {captured:#?}", + env!("CARGO_PKG_NAME") + ) + }); check_log_record( &record.record, trace_id, + env!("CARGO_PKG_NAME"), &[ ("gcp.client.version", "1.2.3"), ("gcp.client.repo", "googleapis/google-cloud-rust"), @@ -459,6 +469,7 @@ mod tests { pub fn check_log_record( record: &SdkLogRecord, trace_id: TraceId, + expected_target: &str, extra_attributes: &[(&'static str, &str)], ) { fn format_value(any: &AnyValue) -> String { @@ -470,10 +481,9 @@ mod tests { _ => "unexpected AnyValue variant".to_string(), } } - assert_eq!(record.event_name(), Some(NAME), "{record:?}"); assert_eq!( record.target().map(|s| s.as_ref()), - Some(TARGET), + Some(expected_target), "{record:?}" ); assert_eq!(record.severity_text(), Some("WARN"), "{record:?}"); diff --git a/src/gax-internal/src/observability/client_signals/with_client_logging.rs b/src/gax-internal/src/observability/client_signals/with_client_logging.rs index 0cf6235244..0175520007 100644 --- a/src/gax-internal/src/observability/client_signals/with_client_logging.rs +++ b/src/gax-internal/src/observability/client_signals/with_client_logging.rs @@ -16,29 +16,9 @@ //! //! This is a private module, it is not exposed in the public API. -use super::RequestRecorder; - -use crate::observability::attributes::keys::{ - ERROR_TYPE, GCP_CLIENT_ARTIFACT, GCP_CLIENT_REPO, GCP_CLIENT_SERVICE, GCP_CLIENT_VERSION, - GCP_ERRORS_DOMAIN, GCP_ERRORS_METADATA, HTTP_REQUEST_METHOD, HTTP_REQUEST_RESEND_COUNT, - RPC_RESPONSE_STATUS_CODE, RPC_SERVICE, RPC_SYSTEM_NAME, SERVER_ADDRESS, SERVER_PORT, URL_FULL, -}; -use crate::observability::errors::ErrorType; -use google_cloud_gax::error::Error; -use opentelemetry_semantic_conventions::attribute::{RPC_METHOD, URL_DOMAIN, URL_TEMPLATE}; -use pin_project::pin_project; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -// A tentative name for the error logs. -pub const NAME: &str = "experimental.client.request.error"; -// A tentative target for the error logs. -pub const TARGET: &str = "experimental.client.request"; - -/// A future instrumented to generate the client request logs. +/// A macro instrumented to generate the client request logs natively within the generated crates. /// -/// Decorates the `F` future, which represents a pending client request, +/// Decorates the `inner` future, which represents a pending client request, /// to emit the error logs. Typically this is used in the tracing layer: /// /// ```ignore @@ -50,89 +30,70 @@ pub const TARGET: &str = "experimental.client.request"; /// req: crate::model::EchoRequest, /// options: crate::RequestOptions, /// ) -> Result> { -/// use google_cloud_gax_internal::observability::client_signals::WithClientLogging; /// let pending = self.inner.echo(req, options); -/// WithClientLogging::new(pending).await +/// google_cloud_gax_internal::with_client_logging!(pending).await /// } /// # } /// ``` /// -#[must_use = "futures do nothing unless you `.await` or poll them"] -#[pin_project] -pub struct WithClientLogging { - #[pin] - inner: F, -} - -impl WithClientLogging -where - F: Future>, -{ - pub fn new(inner: F) -> Self { - Self { inner } - } -} +#[macro_export] +macro_rules! with_client_logging { + ($inner:expr) => {{ + let inner_future = $inner; + async move { + let output = inner_future.await; + if let Some(snapshot) = + $crate::observability::RequestRecorder::current().map(|r| r.client_snapshot()) + { + if let Err(error) = &output { + let gax_error: &google_cloud_gax::error::Error = error; + let rpc_status_code = gax_error.status().map(|s| s.code.name()); + let error_type = $crate::observability::errors::ErrorType::from_gax_error(gax_error); + let error_info = gax_error.status().and_then(|s| { + s.details.iter().find_map(|d| match d { + google_cloud_gax::error::rpc::StatusDetails::ErrorInfo(i) => Some(i), + _ => None, + }) + }); + let error_domain = error_info.map(|i| i.domain.as_str()); + let error_metadata = error_info.and_then(|i| { + if i.metadata.is_empty() { + None + } else { + serde_json::to_string(&i.metadata).ok() + } + }); -impl Future for WithClientLogging -where - F: Future>, -{ - type Output = ::Output; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let output = futures::ready!(this.inner.poll(cx)); - let Some(snapshot) = RequestRecorder::current().map(|r| r.client_snapshot()) else { - return Poll::Ready(output); - }; - match &output { - Ok(_) => (), - Err(error) => { - let rpc_status_code = error.status().map(|s| s.code.name()); - let error_type = ErrorType::from_gax_error(error); - let error_info = error.status().and_then(|s| { - s.details.iter().find_map(|d| match d { - google_cloud_gax::error::rpc::StatusDetails::ErrorInfo(i) => Some(i), - _ => None, - }) - }); - let error_domain = error_info.map(|i| i.domain.as_str()); - let error_metadata = error_info.and_then(|i| { - if i.metadata.is_empty() { - None - } else { - serde_json::to_string(&i.metadata).ok() - } - }); - - // TODO(#4795) - use the correct name and target - tracing::event!( - name: NAME, - target: TARGET, - tracing::Level::WARN, - { RPC_SYSTEM_NAME } = snapshot.rpc_system(), - { RPC_SERVICE } = snapshot.service_name(), - { RPC_METHOD } = snapshot.rpc_method(), - { GCP_CLIENT_VERSION } = snapshot.client_version(), - { GCP_CLIENT_REPO } = snapshot.client_repo(), - { GCP_CLIENT_ARTIFACT } = snapshot.client_artifact(), - { URL_DOMAIN } = snapshot.default_host(), - { URL_FULL } = snapshot.sanitized_url(), - { URL_TEMPLATE } = snapshot.url_template(), - { RPC_RESPONSE_STATUS_CODE } = rpc_status_code, - { ERROR_TYPE } = error_type.as_str(), - { SERVER_ADDRESS } = snapshot.server_address(), - { SERVER_PORT } = snapshot.server_port() as i64, - { HTTP_REQUEST_METHOD } = snapshot.http_method(), - { HTTP_REQUEST_RESEND_COUNT } = snapshot.http_resend_count().map(|v| v as i64), - { GCP_CLIENT_SERVICE } = snapshot.service_name(), - { GCP_ERRORS_DOMAIN } = error_domain, - { GCP_ERRORS_METADATA } = error_metadata, - "{error:?}" - ); + ::tracing::event!( + name: "experimental.client.request.error", + target: env!("CARGO_PKG_NAME"), + ::tracing::Level::WARN, + { $crate::observability::attributes::keys::RPC_SYSTEM_NAME } = snapshot.rpc_system(), + { $crate::observability::attributes::keys::RPC_SERVICE } = snapshot.service_name(), + { ::opentelemetry_semantic_conventions::attribute::RPC_METHOD } = snapshot.rpc_method(), + { $crate::observability::attributes::keys::GCP_CLIENT_VERSION } = snapshot.client_version(), + { $crate::observability::attributes::keys::GCP_CLIENT_REPO } = snapshot.client_repo(), + { $crate::observability::attributes::keys::GCP_CLIENT_ARTIFACT } = snapshot.client_artifact(), + { ::opentelemetry_semantic_conventions::attribute::URL_DOMAIN } = snapshot.default_host(), + { $crate::observability::attributes::keys::URL_FULL } = snapshot.sanitized_url(), + { ::opentelemetry_semantic_conventions::attribute::URL_TEMPLATE } = snapshot.url_template(), + { $crate::observability::attributes::keys::RPC_RESPONSE_STATUS_CODE } = rpc_status_code, + { $crate::observability::attributes::keys::ERROR_TYPE } = error_type.as_str(), + { $crate::observability::attributes::keys::SERVER_ADDRESS } = snapshot.server_address(), + { $crate::observability::attributes::keys::SERVER_PORT } = snapshot.server_port() as i64, + { $crate::observability::attributes::keys::HTTP_REQUEST_METHOD } = snapshot.http_method(), + { $crate::observability::attributes::keys::HTTP_REQUEST_RESEND_COUNT } = snapshot.http_resend_count().map(|v| v as i64), + { $crate::observability::attributes::keys::GCP_CLIENT_SERVICE } = snapshot.service_name(), + { $crate::observability::attributes::keys::GCP_ERRORS_DOMAIN } = error_domain, + { $crate::observability::attributes::keys::GCP_ERRORS_METADATA } = error_metadata, + "{error:?}", + error = gax_error + ); + } } + output } - Poll::Ready(output) - } + }}; } #[cfg(test)] @@ -140,7 +101,8 @@ mod tests { use super::super::tests::{ TEST_INFO, TEST_METHOD, TEST_URL_TEMPLATE, recorded_request_transport_stub, }; - use super::*; + use crate::observability::RequestRecorder; + use google_cloud_gax::error::Error; use google_cloud_test_utils::tracing::Buffer; use httptest::Expectation; use httptest::Server; @@ -155,31 +117,27 @@ mod tests { #[tokio::test] async fn no_recorder() -> anyhow::Result<()> { - let (_guard, buffer) = capture_logs(); + let _guard = capture_logs(); // test removed to avoid breaking things, since not generating log - let logging = WithClientLogging::new(async { Ok(123) }); + let logging = with_client_logging!(async { Ok::(123) }); let got = logging.await; assert!(matches!(got, Ok(123)), "{got:?}"); - let contents = String::from_utf8(buffer.captured())?; - assert!(contents.is_empty(), "{contents}"); Ok(()) } #[tokio::test] async fn ok() -> anyhow::Result<()> { - let (_guard, buffer) = capture_logs(); + let _guard = capture_logs(); let recorder = RequestRecorder::new(TEST_INFO); let scoped = recorder.clone(); - let logging = WithClientLogging::new(async { + let logging = with_client_logging!(async { let _current = RequestRecorder::current().expect("current recorder should be available"); - Ok(123) + Ok::(123) }); let got = scoped.scope(logging).await; assert!(matches!(got, Ok(123)), "{got:?}"); - let contents = String::from_utf8(buffer.captured())?; - assert!(contents.is_empty(), "{contents}"); Ok(()) } @@ -190,7 +148,7 @@ mod tests { let (_guard, buffer) = capture_logs(); let recorder = RequestRecorder::new(TEST_INFO); let scoped = recorder.clone(); - let logging = WithClientLogging::new(recorded_request_transport_stub(BAD_URL)); + let logging = with_client_logging!(recorded_request_transport_stub(BAD_URL)); let got = scoped.scope(logging).await; assert!(got.is_err(), "{got:?}"); let parsed = extract_captured_log(buffer)?; @@ -206,7 +164,7 @@ mod tests { assert!(object.remove("timestamp").is_some(), "{parsed:?}"); let want = json!({ "level": "WARN", - "target": "experimental.client.request", + "target": env!("CARGO_PKG_NAME"), }); assert_eq!(Some(&object), want.as_object(), "{parsed:?}"); @@ -249,9 +207,7 @@ mod tests { let recorder = RequestRecorder::new(TEST_INFO); let scoped = recorder.clone(); let got = scoped - .scope(WithClientLogging::new(recorded_request_transport_stub( - &url, - ))) + .scope(with_client_logging!(recorded_request_transport_stub(&url,))) .await; assert!(matches!(got, Err(ref e) if e.is_transport()), "{got:?}"); let parsed = extract_captured_log(buffer)?; @@ -268,7 +224,7 @@ mod tests { assert!(object.remove("timestamp").is_some(), "{parsed:?}"); let want = json!({ "level": "WARN", - "target": "experimental.client.request", + "target": env!("CARGO_PKG_NAME"), }); assert_eq!(Some(&object), want.as_object(), "{parsed:?}");