Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/src/main/java/io/grpc/internal/MessageDeframer.java
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,11 @@ private boolean readRequiredBytes() {
int totalBytesRead = 0;
int deflatedBytesRead = 0;
try {
// Avoid allocating nextFrame when idle
if (fullStreamDecompressor == null && unprocessed.readableBytes() == 0) {
return false;
}

if (nextFrame == null) {
nextFrame = new CompositeReadableBuffer();
}
Expand Down
46 changes: 30 additions & 16 deletions core/src/main/java/io/grpc/internal/ServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
InternalLogId.allocate("Server", String.valueOf(getListenSocketsIgnoringLifecycle()));
// Fork from the passed in context so that it does not propagate cancellation, it only
// inherits values.
this.rootContext = Preconditions.checkNotNull(rootContext, "rootContext").fork();
this.rootContext = Preconditions.checkNotNull(rootContext, "rootContext")
.fork()
.withValue(io.grpc.InternalServer.SERVER_CONTEXT_KEY, ServerImpl.this);
this.decompressorRegistry = builder.decompressorRegistry;
this.compressorRegistry = builder.compressorRegistry;
this.transportFilters = Collections.unmodifiableList(
Expand Down Expand Up @@ -622,19 +624,7 @@ private void runInternal() {
// An extremely short deadline may expire before stream.setListener(jumpListener).
// This causes NPE as in issue: https://github.com/grpc/grpc-java/issues/6300
// Delay of setting cancellationListener to context will fix the issue.
final class ServerStreamCancellationListener implements Context.CancellationListener {
@Override
public void cancelled(Context context) {
Status status = statusFromCancelled(context);
if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) {
// This should rarely get run, since the client will likely cancel the stream
// before the timeout is reached.
stream.cancel(status);
}
}
}

context.addListener(new ServerStreamCancellationListener(), directExecutor());
context.addListener(new ServerStreamCancellationListener(stream), directExecutor());
}
}

Expand All @@ -648,8 +638,7 @@ private Context.CancellableContext createContext(

Context baseContext =
statsTraceCtx
.serverFilterContext(rootContext)
.withValue(io.grpc.InternalServer.SERVER_CONTEXT_KEY, ServerImpl.this);
.serverFilterContext(rootContext);

if (timeoutNanos == null) {
return baseContext.withCancellation();
Expand Down Expand Up @@ -707,6 +696,31 @@ private <WReqT, WRespT> ServerStreamListener startWrappedCall(
}
}

/**
* Propagates context cancellation to the ServerStream.
*
* This is outside of HandleServerCall because that class holds Metadata and other state needed
* only when starting the RPC. The cancellation listener will live for the life of the call, so we
* avoid that useless state being retained.
*/
static final class ServerStreamCancellationListener implements Context.CancellationListener {
private final ServerStream stream;

ServerStreamCancellationListener(ServerStream stream) {
this.stream = checkNotNull(stream, "stream");
}

@Override
public void cancelled(Context context) {
Status status = statusFromCancelled(context);
if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) {
// This should rarely get run, since the client will likely cancel the stream
// before the timeout is reached.
stream.cancel(status);
}
}
}

@Override
public InternalLogId getLogId() {
return logId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,27 +511,30 @@ public static List<ServerInterceptor> interceptors() {
}

/**
* Echo the request headers from a client into response headers and trailers. Useful for
* Echo a request header from a client into response headers and trailers. Useful for
* testing end-to-end metadata propagation.
*/
private static ServerInterceptor echoRequestHeadersInterceptor(final Metadata.Key<?>... keys) {
final Set<Metadata.Key<?>> keySet = new HashSet<>(Arrays.asList(keys));
private static <T> ServerInterceptor echoRequestHeadersInterceptor(final Metadata.Key<T> key) {
return new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
final Metadata requestHeaders,
Metadata requestHeaders,
ServerCallHandler<ReqT, RespT> next) {
if (!requestHeaders.containsKey(key)) {
return next.startCall(call, requestHeaders);
}
T value = requestHeaders.get(key);
return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendHeaders(Metadata responseHeaders) {
responseHeaders.merge(requestHeaders, keySet);
responseHeaders.put(key, value);
super.sendHeaders(responseHeaders);
}

@Override
public void close(Status status, Metadata trailers) {
trailers.merge(requestHeaders, keySet);
trailers.put(key, value);
super.close(status, trailers);
}
}, requestHeaders);
Expand All @@ -540,52 +543,48 @@ public void close(Status status, Metadata trailers) {
}

/**
* Echoes request headers with the specified key(s) from a client into response headers only.
* Echoes request headers with the specified key from a client into response headers only.
*/
private static ServerInterceptor echoRequestMetadataInHeaders(final Metadata.Key<?>... keys) {
final Set<Metadata.Key<?>> keySet = new HashSet<>(Arrays.asList(keys));
private static <T> ServerInterceptor echoRequestMetadataInHeaders(final Metadata.Key<T> key) {
return new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
final Metadata requestHeaders,
ServerCallHandler<ReqT, RespT> next) {
if (!requestHeaders.containsKey(key)) {
return next.startCall(call, requestHeaders);
}
T value = requestHeaders.get(key);
return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendHeaders(Metadata responseHeaders) {
responseHeaders.merge(requestHeaders, keySet);
responseHeaders.put(key, value);
super.sendHeaders(responseHeaders);
}

@Override
public void close(Status status, Metadata trailers) {
super.close(status, trailers);
}
}, requestHeaders);
}
};
}

/**
* Echoes request headers with the specified key(s) from a client into response trailers only.
* Echoes request headers with the specified key from a client into response trailers only.
*/
private static ServerInterceptor echoRequestMetadataInTrailers(final Metadata.Key<?>... keys) {
final Set<Metadata.Key<?>> keySet = new HashSet<>(Arrays.asList(keys));
private static <T> ServerInterceptor echoRequestMetadataInTrailers(final Metadata.Key<T> key) {
return new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
final Metadata requestHeaders,
ServerCallHandler<ReqT, RespT> next) {
if (!requestHeaders.containsKey(key)) {
return next.startCall(call, requestHeaders);
}
T value = requestHeaders.get(key);
return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendHeaders(Metadata responseHeaders) {
super.sendHeaders(responseHeaders);
}

@Override
public void close(Status status, Metadata trailers) {
trailers.merge(requestHeaders, keySet);
trailers.put(key, value);
super.close(status, trailers);
}
}, requestHeaders);
Expand Down
Loading