Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private GrpcOpenTelemetry(Builder builder) {
this.optionalLabels = ImmutableList.copyOf(builder.optionalLabels);
this.openTelemetryMetricsModule = new OpenTelemetryMetricsModule(
STOPWATCH_SUPPLIER, resource, optionalLabels, builder.plugins,
builder.targetFilter);
openTelemetrySdk.getPropagators(), builder.targetFilter);
this.openTelemetryTracingModule = new OpenTelemetryTracingModule(openTelemetrySdk);
this.sink = new OpenTelemetryMetricSink(meter, enableMetrics, disableDefault, optionalLabels);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.opentelemetry.api.baggage.Baggage;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.ContextPropagators;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -100,24 +101,28 @@ final class OpenTelemetryMetricsModule {
private final boolean localityEnabled;
private final boolean backendServiceEnabled;
private final ImmutableList<OpenTelemetryPlugin> plugins;
private final ContextPropagators contextPropagators;
@Nullable
private final TargetFilter targetAttributeFilter;

OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
OpenTelemetryMetricsResource resource,
Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins) {
this(stopwatchSupplier, resource, optionalLabels, plugins, null);
Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins,
ContextPropagators contextPropagators) {
this(stopwatchSupplier, resource, optionalLabels, plugins, contextPropagators, null);
}

OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
OpenTelemetryMetricsResource resource,
Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins,
@Nullable TargetFilter targetAttributeFilter) {
ContextPropagators contextPropagators,
@Nullable TargetFilter targetAttributeFilter) {
this.resource = checkNotNull(resource, "resource");
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey());
this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey());
this.plugins = ImmutableList.copyOf(plugins);
this.contextPropagators = checkNotNull(contextPropagators, "contextPropagators");
this.targetAttributeFilter = targetAttributeFilter;
}

Expand Down Expand Up @@ -159,8 +164,7 @@ static String recordMethodName(String fullMethodName, boolean isGeneratedMethod)
return isGeneratedMethod ? fullMethodName : "other";
}

private static Context otelContextWithBaggage() {
Baggage baggage = BAGGAGE_KEY.get();
private static Context otelContextWithBaggage(Baggage baggage) {
if (baggage == null) {
return Context.current();
}
Expand Down Expand Up @@ -282,7 +286,7 @@ public void streamClosed(Status status) {
}

void recordFinishedAttempt() {
Context otelContext = otelContextWithBaggage();
Context otelContext = otelContextWithBaggage(BAGGAGE_KEY.get());
AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
.put(METHOD_KEY, fullMethodName)
.put(TARGET_KEY, target)
Expand Down Expand Up @@ -448,7 +452,7 @@ void callEnded(Status status) {
}

void recordFinishedCall() {
Context otelContext = otelContextWithBaggage();
Context otelContext = otelContextWithBaggage(BAGGAGE_KEY.get());
if (attemptsPerCall.get() == 0) {
ClientTracer tracer = newClientTracer(null);
tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
Expand Down Expand Up @@ -553,13 +557,15 @@ private static final class ServerTracer extends ServerStreamTracer {
private final Stopwatch stopwatch;
private volatile long outboundWireSize;
private volatile long inboundWireSize;
private final Context otelContext;

ServerTracer(OpenTelemetryMetricsModule module, String fullMethodName,
List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins) {
List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins, Context otelContext) {
this.module = checkNotNull(module, "module");
this.fullMethodName = fullMethodName;
this.streamPlugins = checkNotNull(streamPlugins, "streamPlugins");
this.stopwatch = module.stopwatchSupplier.get().start();
this.otelContext = checkNotNull(otelContext, "otelContext");
}

@Override
Expand All @@ -574,7 +580,7 @@ public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing));

if (module.resource.serverCallCountCounter() != null) {
module.resource.serverCallCountCounter().add(1, attribute);
module.resource.serverCallCountCounter().add(1, attribute, otelContext);
}
}

Expand Down Expand Up @@ -606,7 +612,6 @@ public void inboundWireSize(long bytes) {
*/
@Override
public void streamClosed(Status status) {
Context otelContext = otelContextWithBaggage();
if (streamClosedUpdater != null) {
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
return;
Expand All @@ -627,17 +632,23 @@ public void streamClosed(Status status) {
}
io.opentelemetry.api.common.Attributes attributes = builder.build();

Context ctxToRecord = otelContext;
Baggage currentBaggage = BAGGAGE_KEY.get();
if (currentBaggage != null && !currentBaggage.isEmpty()) {
ctxToRecord = ctxToRecord.with(currentBaggage);
}

if (module.resource.serverCallDurationCounter() != null) {
module.resource.serverCallDurationCounter()
.record(elapsedTimeNanos * SECONDS_PER_NANO, attributes, otelContext);
.record(elapsedTimeNanos * SECONDS_PER_NANO, attributes, ctxToRecord);
}
if (module.resource.serverTotalSentCompressedMessageSizeCounter() != null) {
module.resource.serverTotalSentCompressedMessageSizeCounter()
.record(outboundWireSize, attributes, otelContext);
.record(outboundWireSize, attributes, ctxToRecord);
}
if (module.resource.serverTotalReceivedCompressedMessageSizeCounter() != null) {
module.resource.serverTotalReceivedCompressedMessageSizeCounter()
.record(inboundWireSize, attributes, otelContext);
.record(inboundWireSize, attributes, ctxToRecord);
}
}
}
Expand All @@ -657,7 +668,10 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata
}
streamPlugins = Collections.unmodifiableList(streamPluginsMutable);
}
return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins);
Context context = contextPropagators.getTextMapPropagator().extract(
Context.current(), headers, MetadataGetter.getInstance());
return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins,
context);
}
}

Expand Down Expand Up @@ -717,3 +731,4 @@ public void onClose(Status status, Metadata trailers) {
}
}
}

Loading