diff --git a/core/src/main/java/io/grpc/internal/MessageDeframer.java b/core/src/main/java/io/grpc/internal/MessageDeframer.java index 13a01efec0a..77bce3fb6cf 100644 --- a/core/src/main/java/io/grpc/internal/MessageDeframer.java +++ b/core/src/main/java/io/grpc/internal/MessageDeframer.java @@ -314,6 +314,11 @@ private boolean readRequiredBytes() { int totalBytesRead = 0; int deflatedBytesRead = 0; try { + // Avoid allocating nextFrame when idle + if (fullStreamDecompressor == null && unprocessed.readableBytes() == 0) { + return false; + } + if (nextFrame == null) { nextFrame = new CompositeReadableBuffer(); } diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index dc0709e1fb8..5bff096c3b8 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -151,7 +151,9 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume InternalLogId.allocate("Server", String.valueOf(getListenSocketsIgnoringLifecycle())); // Fork from the passed in context so that it does not propagate cancellation, it only // inherits values. - this.rootContext = Preconditions.checkNotNull(rootContext, "rootContext").fork(); + this.rootContext = Preconditions.checkNotNull(rootContext, "rootContext") + .fork() + .withValue(io.grpc.InternalServer.SERVER_CONTEXT_KEY, ServerImpl.this); this.decompressorRegistry = builder.decompressorRegistry; this.compressorRegistry = builder.compressorRegistry; this.transportFilters = Collections.unmodifiableList( @@ -622,19 +624,7 @@ private void runInternal() { // An extremely short deadline may expire before stream.setListener(jumpListener). // This causes NPE as in issue: https://github.com/grpc/grpc-java/issues/6300 // Delay of setting cancellationListener to context will fix the issue. - final class ServerStreamCancellationListener implements Context.CancellationListener { - @Override - public void cancelled(Context context) { - Status status = statusFromCancelled(context); - if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) { - // This should rarely get run, since the client will likely cancel the stream - // before the timeout is reached. - stream.cancel(status); - } - } - } - - context.addListener(new ServerStreamCancellationListener(), directExecutor()); + context.addListener(new ServerStreamCancellationListener(stream), directExecutor()); } } @@ -648,8 +638,7 @@ private Context.CancellableContext createContext( Context baseContext = statsTraceCtx - .serverFilterContext(rootContext) - .withValue(io.grpc.InternalServer.SERVER_CONTEXT_KEY, ServerImpl.this); + .serverFilterContext(rootContext); if (timeoutNanos == null) { return baseContext.withCancellation(); @@ -707,6 +696,31 @@ private ServerStreamListener startWrappedCall( } } + /** + * Propagates context cancellation to the ServerStream. + * + * This is outside of HandleServerCall because that class holds Metadata and other state needed + * only when starting the RPC. The cancellation listener will live for the life of the call, so we + * avoid that useless state being retained. + */ + static final class ServerStreamCancellationListener implements Context.CancellationListener { + private final ServerStream stream; + + ServerStreamCancellationListener(ServerStream stream) { + this.stream = checkNotNull(stream, "stream"); + } + + @Override + public void cancelled(Context context) { + Status status = statusFromCancelled(context); + if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) { + // This should rarely get run, since the client will likely cancel the stream + // before the timeout is reached. + stream.cancel(status); + } + } + } + @Override public InternalLogId getLogId() { return logId; diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java index a9ee9382495..573b8ec6de8 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java @@ -511,27 +511,30 @@ public static List interceptors() { } /** - * Echo the request headers from a client into response headers and trailers. Useful for + * Echo a request header from a client into response headers and trailers. Useful for * testing end-to-end metadata propagation. */ - private static ServerInterceptor echoRequestHeadersInterceptor(final Metadata.Key... keys) { - final Set> keySet = new HashSet<>(Arrays.asList(keys)); + private static ServerInterceptor echoRequestHeadersInterceptor(final Metadata.Key key) { return new ServerInterceptor() { @Override public ServerCall.Listener interceptCall( ServerCall call, - final Metadata requestHeaders, + Metadata requestHeaders, ServerCallHandler next) { + if (!requestHeaders.containsKey(key)) { + return next.startCall(call, requestHeaders); + } + T value = requestHeaders.get(key); return next.startCall(new SimpleForwardingServerCall(call) { @Override public void sendHeaders(Metadata responseHeaders) { - responseHeaders.merge(requestHeaders, keySet); + responseHeaders.put(key, value); super.sendHeaders(responseHeaders); } @Override public void close(Status status, Metadata trailers) { - trailers.merge(requestHeaders, keySet); + trailers.put(key, value); super.close(status, trailers); } }, requestHeaders); @@ -540,52 +543,48 @@ public void close(Status status, Metadata trailers) { } /** - * Echoes request headers with the specified key(s) from a client into response headers only. + * Echoes request headers with the specified key from a client into response headers only. */ - private static ServerInterceptor echoRequestMetadataInHeaders(final Metadata.Key... keys) { - final Set> keySet = new HashSet<>(Arrays.asList(keys)); + private static ServerInterceptor echoRequestMetadataInHeaders(final Metadata.Key key) { return new ServerInterceptor() { @Override public ServerCall.Listener interceptCall( ServerCall call, final Metadata requestHeaders, ServerCallHandler next) { + if (!requestHeaders.containsKey(key)) { + return next.startCall(call, requestHeaders); + } + T value = requestHeaders.get(key); return next.startCall(new SimpleForwardingServerCall(call) { @Override public void sendHeaders(Metadata responseHeaders) { - responseHeaders.merge(requestHeaders, keySet); + responseHeaders.put(key, value); super.sendHeaders(responseHeaders); } - - @Override - public void close(Status status, Metadata trailers) { - super.close(status, trailers); - } }, requestHeaders); } }; } /** - * Echoes request headers with the specified key(s) from a client into response trailers only. + * Echoes request headers with the specified key from a client into response trailers only. */ - private static ServerInterceptor echoRequestMetadataInTrailers(final Metadata.Key... keys) { - final Set> keySet = new HashSet<>(Arrays.asList(keys)); + private static ServerInterceptor echoRequestMetadataInTrailers(final Metadata.Key key) { return new ServerInterceptor() { @Override public ServerCall.Listener interceptCall( ServerCall call, final Metadata requestHeaders, ServerCallHandler next) { + if (!requestHeaders.containsKey(key)) { + return next.startCall(call, requestHeaders); + } + T value = requestHeaders.get(key); return next.startCall(new SimpleForwardingServerCall(call) { - @Override - public void sendHeaders(Metadata responseHeaders) { - super.sendHeaders(responseHeaders); - } - @Override public void close(Status status, Metadata trailers) { - trailers.merge(requestHeaders, keySet); + trailers.put(key, value); super.close(status, trailers); } }, requestHeaders);