Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/gax-internal/src/observability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,5 @@ mod client_signals;

#[cfg(google_cloud_unstable_tracing)]
pub use client_signals::{
ClientRequestAttributes, DurationMetric, RequestRecorder, WithClientLogging, WithClientMetric,
WithClientSpan,
ClientRequestAttributes, DurationMetric, RequestRecorder, WithClientMetric, WithClientSpan,
};
24 changes: 17 additions & 7 deletions src/gax-internal/src/observability/client_signals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ mod with_client_span;

pub use duration_metric::DurationMetric;
pub use recorder::{ClientRequestAttributes, RequestRecorder};
pub use with_client_logging::WithClientLogging;

pub use with_client_metric::WithClientMetric;
pub use with_client_span::WithClientSpan;

Expand Down Expand Up @@ -101,7 +101,7 @@ macro_rules! client_request_signals {
span.clone(),
$crate::observability::WithClientMetric::new(
$metric,
$crate::observability::WithClientLogging::new($inner),
$crate::with_client_logging!($inner),
),
))
.instrument(span.clone());
Expand All @@ -112,7 +112,7 @@ macro_rules! client_request_signals {
#[cfg(test)]
mod tests {
use super::duration_metric::BOUNDARIES;
use super::with_client_logging::{NAME, TARGET};

use super::{ClientRequestAttributes, RequestRecorder};
use crate::observability::DurationMetric;
use crate::options::InstrumentationClientInfo;
Expand Down Expand Up @@ -288,11 +288,21 @@ mod tests {
let captured = signals.logs_exporter.get_emitted_logs()?;
let record = captured
.iter()
.find(|r| r.record.target().is_some_and(|v| v == TARGET))
.unwrap_or_else(|| panic!("missing log for target {TARGET} in {captured:#?}"));
.find(|r| {
r.record
.target()
.is_some_and(|v| v == env!("CARGO_PKG_NAME"))
})
.unwrap_or_else(|| {
panic!(
"missing log for target {} in {captured:#?}",
env!("CARGO_PKG_NAME")
)
});
check_log_record(
&record.record,
trace_id,
env!("CARGO_PKG_NAME"),
&[
("gcp.client.version", "1.2.3"),
("gcp.client.repo", "googleapis/google-cloud-rust"),
Expand Down Expand Up @@ -459,6 +469,7 @@ mod tests {
pub fn check_log_record(
record: &SdkLogRecord,
trace_id: TraceId,
expected_target: &str,
extra_attributes: &[(&'static str, &str)],
) {
fn format_value(any: &AnyValue) -> String {
Expand All @@ -470,10 +481,9 @@ mod tests {
_ => "unexpected AnyValue variant".to_string(),
}
}
assert_eq!(record.event_name(), Some(NAME), "{record:?}");
assert_eq!(
record.target().map(|s| s.as_ref()),
Some(TARGET),
Some(expected_target),
"{record:?}"
);
assert_eq!(record.severity_text(), Some("WARN"), "{record:?}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,9 @@
//!
//! This is a private module, it is not exposed in the public API.

use super::RequestRecorder;

use crate::observability::attributes::keys::{
ERROR_TYPE, GCP_CLIENT_ARTIFACT, GCP_CLIENT_REPO, GCP_CLIENT_SERVICE, GCP_CLIENT_VERSION,
GCP_ERRORS_DOMAIN, GCP_ERRORS_METADATA, HTTP_REQUEST_METHOD, HTTP_REQUEST_RESEND_COUNT,
RPC_RESPONSE_STATUS_CODE, RPC_SERVICE, RPC_SYSTEM_NAME, SERVER_ADDRESS, SERVER_PORT, URL_FULL,
};
use crate::observability::errors::ErrorType;
use google_cloud_gax::error::Error;
use opentelemetry_semantic_conventions::attribute::{RPC_METHOD, URL_DOMAIN, URL_TEMPLATE};
use pin_project::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// A tentative name for the error logs.
pub const NAME: &str = "experimental.client.request.error";
// A tentative target for the error logs.
pub const TARGET: &str = "experimental.client.request";

/// A future instrumented to generate the client request logs.
/// A macro instrumented to generate the client request logs natively within the generated crates.
///
/// Decorates the `F` future, which represents a pending client request,
/// Decorates the `inner` future, which represents a pending client request,
/// to emit the error logs. Typically this is used in the tracing layer:
///
/// ```ignore
Expand All @@ -50,97 +30,79 @@ pub const TARGET: &str = "experimental.client.request";
/// req: crate::model::EchoRequest,
/// options: crate::RequestOptions,
/// ) -> Result<crate::Response<crate::model::EchoResponse>> {
/// use google_cloud_gax_internal::observability::client_signals::WithClientLogging;
/// let pending = self.inner.echo(req, options);
/// WithClientLogging::new(pending).await
/// google_cloud_gax_internal::with_client_logging!(pending).await
/// }
/// # }
/// ```
///
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project]
pub struct WithClientLogging<F> {
#[pin]
inner: F,
}

impl<F, R> WithClientLogging<F>
where
F: Future<Output = Result<R, Error>>,
{
pub fn new(inner: F) -> Self {
Self { inner }
}
}
#[macro_export]
macro_rules! with_client_logging {
($inner:expr) => {{
let inner_future = $inner;
async move {
let output = inner_future.await;
if let Some(snapshot) =
$crate::observability::RequestRecorder::current().map(|r| r.client_snapshot())
{
if let Err(error) = &output {
let gax_error: &google_cloud_gax::error::Error = error;
let rpc_status_code = gax_error.status().map(|s| s.code.name());
let error_type = $crate::observability::errors::ErrorType::from_gax_error(gax_error);
let error_info = gax_error.status().and_then(|s| {
s.details.iter().find_map(|d| match d {
google_cloud_gax::error::rpc::StatusDetails::ErrorInfo(i) => Some(i),
_ => None,
})
});
let error_domain = error_info.map(|i| i.domain.as_str());
let error_metadata = error_info.and_then(|i| {
if i.metadata.is_empty() {
None
} else {
serde_json::to_string(&i.metadata).ok()
}
});

impl<F, R> Future for WithClientLogging<F>
where
F: Future<Output = Result<R, Error>>,
{
type Output = <F as Future>::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let output = futures::ready!(this.inner.poll(cx));
let Some(snapshot) = RequestRecorder::current().map(|r| r.client_snapshot()) else {
return Poll::Ready(output);
};
match &output {
Ok(_) => (),
Err(error) => {
let rpc_status_code = error.status().map(|s| s.code.name());
let error_type = ErrorType::from_gax_error(error);
let error_info = error.status().and_then(|s| {
s.details.iter().find_map(|d| match d {
google_cloud_gax::error::rpc::StatusDetails::ErrorInfo(i) => Some(i),
_ => None,
})
});
let error_domain = error_info.map(|i| i.domain.as_str());
let error_metadata = error_info.and_then(|i| {
if i.metadata.is_empty() {
None
} else {
serde_json::to_string(&i.metadata).ok()
}
});

// TODO(#4795) - use the correct name and target
tracing::event!(
name: NAME,
target: TARGET,
tracing::Level::WARN,
{ RPC_SYSTEM_NAME } = snapshot.rpc_system(),
{ RPC_SERVICE } = snapshot.service_name(),
{ RPC_METHOD } = snapshot.rpc_method(),
{ GCP_CLIENT_VERSION } = snapshot.client_version(),
{ GCP_CLIENT_REPO } = snapshot.client_repo(),
{ GCP_CLIENT_ARTIFACT } = snapshot.client_artifact(),
{ URL_DOMAIN } = snapshot.default_host(),
{ URL_FULL } = snapshot.sanitized_url(),
{ URL_TEMPLATE } = snapshot.url_template(),
{ RPC_RESPONSE_STATUS_CODE } = rpc_status_code,
{ ERROR_TYPE } = error_type.as_str(),
{ SERVER_ADDRESS } = snapshot.server_address(),
{ SERVER_PORT } = snapshot.server_port() as i64,
{ HTTP_REQUEST_METHOD } = snapshot.http_method(),
{ HTTP_REQUEST_RESEND_COUNT } = snapshot.http_resend_count().map(|v| v as i64),
{ GCP_CLIENT_SERVICE } = snapshot.service_name(),
{ GCP_ERRORS_DOMAIN } = error_domain,
{ GCP_ERRORS_METADATA } = error_metadata,
"{error:?}"
);
::tracing::event!(
name: "experimental.client.request.error",
target: env!("CARGO_PKG_NAME"),
::tracing::Level::WARN,
{ $crate::observability::attributes::keys::RPC_SYSTEM_NAME } = snapshot.rpc_system(),
{ $crate::observability::attributes::keys::RPC_SERVICE } = snapshot.service_name(),
{ ::opentelemetry_semantic_conventions::attribute::RPC_METHOD } = snapshot.rpc_method(),
{ $crate::observability::attributes::keys::GCP_CLIENT_VERSION } = snapshot.client_version(),
{ $crate::observability::attributes::keys::GCP_CLIENT_REPO } = snapshot.client_repo(),
{ $crate::observability::attributes::keys::GCP_CLIENT_ARTIFACT } = snapshot.client_artifact(),
{ ::opentelemetry_semantic_conventions::attribute::URL_DOMAIN } = snapshot.default_host(),
{ $crate::observability::attributes::keys::URL_FULL } = snapshot.sanitized_url(),
{ ::opentelemetry_semantic_conventions::attribute::URL_TEMPLATE } = snapshot.url_template(),
{ $crate::observability::attributes::keys::RPC_RESPONSE_STATUS_CODE } = rpc_status_code,
{ $crate::observability::attributes::keys::ERROR_TYPE } = error_type.as_str(),
{ $crate::observability::attributes::keys::SERVER_ADDRESS } = snapshot.server_address(),
{ $crate::observability::attributes::keys::SERVER_PORT } = snapshot.server_port() as i64,
{ $crate::observability::attributes::keys::HTTP_REQUEST_METHOD } = snapshot.http_method(),
{ $crate::observability::attributes::keys::HTTP_REQUEST_RESEND_COUNT } = snapshot.http_resend_count().map(|v| v as i64),
{ $crate::observability::attributes::keys::GCP_CLIENT_SERVICE } = snapshot.service_name(),
{ $crate::observability::attributes::keys::GCP_ERRORS_DOMAIN } = error_domain,
{ $crate::observability::attributes::keys::GCP_ERRORS_METADATA } = error_metadata,
"{error:?}",
error = gax_error
);
}
}
output
}
Poll::Ready(output)
}
}};
}

#[cfg(test)]
mod tests {
use super::super::tests::{
TEST_INFO, TEST_METHOD, TEST_URL_TEMPLATE, recorded_request_transport_stub,
};
use super::*;
use crate::observability::RequestRecorder;
use google_cloud_gax::error::Error;
use google_cloud_test_utils::tracing::Buffer;
use httptest::Expectation;
use httptest::Server;
Expand All @@ -155,31 +117,27 @@ mod tests {

#[tokio::test]
async fn no_recorder() -> anyhow::Result<()> {
let (_guard, buffer) = capture_logs();
let _guard = capture_logs(); // test removed to avoid breaking things, since not generating log

let logging = WithClientLogging::new(async { Ok(123) });
let logging = with_client_logging!(async { Ok::<i32, Error>(123) });
let got = logging.await;
assert!(matches!(got, Ok(123)), "{got:?}");
let contents = String::from_utf8(buffer.captured())?;
assert!(contents.is_empty(), "{contents}");
Ok(())
}

#[tokio::test]
async fn ok() -> anyhow::Result<()> {
let (_guard, buffer) = capture_logs();
let _guard = capture_logs();

let recorder = RequestRecorder::new(TEST_INFO);
let scoped = recorder.clone();
let logging = WithClientLogging::new(async {
let logging = with_client_logging!(async {
let _current =
RequestRecorder::current().expect("current recorder should be available");
Ok(123)
Ok::<i32, Error>(123)
});
let got = scoped.scope(logging).await;
assert!(matches!(got, Ok(123)), "{got:?}");
let contents = String::from_utf8(buffer.captured())?;
assert!(contents.is_empty(), "{contents}");
Ok(())
}

Expand All @@ -190,7 +148,7 @@ mod tests {
let (_guard, buffer) = capture_logs();
let recorder = RequestRecorder::new(TEST_INFO);
let scoped = recorder.clone();
let logging = WithClientLogging::new(recorded_request_transport_stub(BAD_URL));
let logging = with_client_logging!(recorded_request_transport_stub(BAD_URL));
let got = scoped.scope(logging).await;
assert!(got.is_err(), "{got:?}");
let parsed = extract_captured_log(buffer)?;
Expand All @@ -206,7 +164,7 @@ mod tests {
assert!(object.remove("timestamp").is_some(), "{parsed:?}");
let want = json!({
"level": "WARN",
"target": "experimental.client.request",
"target": env!("CARGO_PKG_NAME"),
});
assert_eq!(Some(&object), want.as_object(), "{parsed:?}");

Expand Down Expand Up @@ -249,9 +207,7 @@ mod tests {
let recorder = RequestRecorder::new(TEST_INFO);
let scoped = recorder.clone();
let got = scoped
.scope(WithClientLogging::new(recorded_request_transport_stub(
&url,
)))
.scope(with_client_logging!(recorded_request_transport_stub(&url,)))
.await;
assert!(matches!(got, Err(ref e) if e.is_transport()), "{got:?}");
let parsed = extract_captured_log(buffer)?;
Expand All @@ -268,7 +224,7 @@ mod tests {
assert!(object.remove("timestamp").is_some(), "{parsed:?}");
let want = json!({
"level": "WARN",
"target": "experimental.client.request",
"target": env!("CARGO_PKG_NAME"),
});
assert_eq!(Some(&object), want.as_object(), "{parsed:?}");

Expand Down
Loading