-
Notifications
You must be signed in to change notification settings - Fork 4k
MCS connection scaling interop tests for Java #12651
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
fb52bbf
f518d5e
3874631
0bad82f
51b38e2
4da06a4
df70a27
bee005f
efce818
2ac2d47
4214889
e099d1d
3d75bf8
84d9528
3136bca
c3fc7c3
93cb9ad
3719011
dbb3881
bd36d59
0216d3b
3346bf8
d27128b
a8d66e1
9b66653
809ae0f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,13 +16,18 @@ | |
|
|
||
| package io.grpc.testing.integration; | ||
|
|
||
| import static io.grpc.Grpc.TRANSPORT_ATTR_REMOTE_ADDR; | ||
|
|
||
| import com.google.common.base.Preconditions; | ||
| import com.google.common.collect.Queues; | ||
| import com.google.errorprone.annotations.concurrent.GuardedBy; | ||
| import com.google.protobuf.ByteString; | ||
| import io.grpc.Context; | ||
| import io.grpc.Contexts; | ||
| import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; | ||
| import io.grpc.Metadata; | ||
| import io.grpc.ServerCall; | ||
| import io.grpc.ServerCall.Listener; | ||
| import io.grpc.ServerCallHandler; | ||
| import io.grpc.ServerInterceptor; | ||
| import io.grpc.Status; | ||
|
|
@@ -42,10 +47,12 @@ | |
| import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; | ||
| import io.grpc.testing.integration.Messages.TestOrcaReport; | ||
| import io.grpc.testing.integration.TestServiceGrpc.AsyncService; | ||
| import java.net.SocketAddress; | ||
| import java.util.ArrayDeque; | ||
| import java.util.Arrays; | ||
| import java.util.HashMap; | ||
| import java.util.HashSet; | ||
| import java.util.Iterator; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Queue; | ||
|
|
@@ -61,8 +68,8 @@ | |
| * sent in response streams. | ||
| */ | ||
| public class TestServiceImpl implements io.grpc.BindableService, AsyncService { | ||
| static Context.Key<SocketAddress> PEER_ADDRESS_CONTEXT_KEY = Context.key("peer-address"); | ||
| private final Random random = new Random(); | ||
|
|
||
| private final ScheduledExecutorService executor; | ||
| private final ByteString compressableBuffer; | ||
| private final MetricRecorder metricRecorder; | ||
|
|
@@ -235,9 +242,27 @@ public void onNext(StreamingOutputCallRequest request) { | |
| .asRuntimeException()); | ||
| return; | ||
| } | ||
| if (whetherSendClientSocketAddressInResponse(request)) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So we just ignore most of the request? It looks like this should go through
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The other parts of the request are about chunking behavior which doesn't apply for this test. I don't need to use |
||
| responseObserver.onNext( | ||
| StreamingOutputCallResponse.newBuilder() | ||
| .setPeerSocketAddress(PEER_ADDRESS_CONTEXT_KEY.get().toString()) | ||
| .build()); | ||
| return; | ||
| } | ||
| dispatcher.enqueue(toChunkQueue(request)); | ||
| } | ||
|
|
||
| private boolean whetherSendClientSocketAddressInResponse(StreamingOutputCallRequest request) { | ||
| Iterator<ResponseParameters> responseParametersIterator = | ||
| request.getResponseParametersList().iterator(); | ||
| while (responseParametersIterator.hasNext()) { | ||
| if (responseParametersIterator.next().getFillPeerSocketAddress().getValue()) { | ||
| return true; | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| @Override | ||
| public void onCompleted() { | ||
| if (oobTestLocked) { | ||
|
|
@@ -507,7 +532,8 @@ public static List<ServerInterceptor> interceptors() { | |
| return Arrays.asList( | ||
| echoRequestHeadersInterceptor(Util.METADATA_KEY), | ||
| echoRequestMetadataInHeaders(Util.ECHO_INITIAL_METADATA_KEY), | ||
| echoRequestMetadataInTrailers(Util.ECHO_TRAILING_METADATA_KEY)); | ||
| echoRequestMetadataInTrailers(Util.ECHO_TRAILING_METADATA_KEY), | ||
| new McsScalingTestcaseInterceptor()); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -539,6 +565,22 @@ public void close(Status status, Metadata trailers) { | |
| }; | ||
| } | ||
|
|
||
| static class McsScalingTestcaseInterceptor implements ServerInterceptor { | ||
| @Override | ||
| public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, | ||
| Metadata headers, ServerCallHandler<ReqT, RespT> next) { | ||
| SocketAddress peerAddress = call.getAttributes().get(TRANSPORT_ATTR_REMOTE_ADDR); | ||
|
|
||
| // Create a new context with the peer address value | ||
| Context newContext = Context.current().withValue(PEER_ADDRESS_CONTEXT_KEY, peerAddress); | ||
| try { | ||
| return Contexts.interceptCall(newContext, call, headers, next); | ||
| } catch (Exception ex) { | ||
| throw new RuntimeException(ex); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Echoes request headers with the specified key(s) from a client into response headers only. | ||
| */ | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.