Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
fb52bbf
Save changes.
kannanjgithub Jan 19, 2026
f518d5e
Save changes.
kannanjgithub Jan 20, 2026
3874631
Save changes.
kannanjgithub Jan 20, 2026
0bad82f
Save changes.
kannanjgithub Jan 21, 2026
51b38e2
Fix enum test.
kannanjgithub Jan 21, 2026
4da06a4
Revert temp changes.
kannanjgithub Jan 21, 2026
df70a27
Revert temp changes.
kannanjgithub Jan 21, 2026
bee005f
Try with server streaming alone.
kannanjgithub Jan 23, 2026
efce818
Interceptor logic.
kannanjgithub Jan 31, 2026
2ac2d47
Closing the stream after the rpc is done.
kannanjgithub Feb 10, 2026
4214889
Style fixes.
kannanjgithub Feb 10, 2026
e099d1d
Fix test name.
kannanjgithub Feb 10, 2026
3d75bf8
Fix style warnings.
kannanjgithub Feb 10, 2026
84d9528
Fix build.
kannanjgithub Feb 10, 2026
3136bca
Review comments - Build channel separately for MCS connection scaling…
kannanjgithub Feb 12, 2026
c3fc7c3
Address Review comments.
kannanjgithub Feb 12, 2026
93cb9ad
Expand the test name on the client side as well.
kannanjgithub Feb 13, 2026
3719011
Add debug print statements to diagnose why server is not starting.
kannanjgithub Feb 13, 2026
dbb3881
Revert "Add debug print statements to diagnose why server is not star…
kannanjgithub Feb 13, 2026
bd36d59
Add temp debug stmts for server start.
kannanjgithub Feb 13, 2026
0216d3b
Rename request proto field.
kannanjgithub Feb 19, 2026
3346bf8
Rename request proto field.
kannanjgithub Feb 19, 2026
d27128b
Rename response proto field.
kannanjgithub Feb 19, 2026
a8d66e1
Address review comments.
kannanjgithub Feb 20, 2026
9b66653
Specify MCS limit directly via command line arg.
kannanjgithub Mar 31, 2026
809ae0f
Merge remote-tracking branch 'origin/mcs-interop-tests' into mcs-inte…
kannanjgithub Mar 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions android-interop-testing/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ android {
srcDirs += "${projectDir}/../interop-testing/src/main/java/"
setIncludes(["io/grpc/android/integrationtest/**",
"io/grpc/testing/integration/AbstractInteropTest.java",
"io/grpc/testing/integration/TestCases.java",
"io/grpc/testing/integration/TestServiceImpl.java",
"io/grpc/testing/integration/Util.java"])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public enum TestCases {
RPC_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on the same channel"),
CHANNEL_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on a new channel"),
ORCA_PER_RPC("report backend metrics per query"),
ORCA_OOB("report backend metrics out-of-band");
ORCA_OOB("report backend metrics out-of-band"),
MCS_CS("max concurrent streaming connection scaling");

private final String description;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.testing.integration;

import static com.google.common.truth.Truth.assertThat;
import static io.grpc.testing.integration.TestCases.MCS_CS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
Expand Down Expand Up @@ -70,6 +71,7 @@
import io.grpc.testing.integration.Messages.TestOrcaReport;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Arrays;
Expand Down Expand Up @@ -563,7 +565,11 @@ private void runTest(TestCases testCase) throws Exception {
tester.testOrcaOob();
break;
}


case MCS_CS: {
tester.testMcs();
break;
}
default:
throw new IllegalArgumentException("Unknown test case: " + testCase);
}
Expand Down Expand Up @@ -596,9 +602,10 @@ private ClientInterceptor maybeCreateAdditionalMetadataInterceptor(
}

private class Tester extends AbstractInteropTest {

@Override
protected ManagedChannelBuilder<?> createChannelBuilder() {
boolean useGeneric = false;
boolean useGeneric = testCase.equals(MCS_CS.toString()) ? true : false;
ChannelCredentials channelCredentials;
if (customCredentialsType != null) {
useGeneric = true; // Retain old behavior; avoids erroring if incompatible
Expand Down Expand Up @@ -658,7 +665,17 @@ protected ManagedChannelBuilder<?> createChannelBuilder() {
if (serverHostOverride != null) {
channelBuilder.overrideAuthority(serverHostOverride);
}
if (serviceConfig != null) {
if (testCase.equals(MCS_CS.toString())) {
channelBuilder.disableServiceConfigLookUp();
try {
@SuppressWarnings("unchecked")
Map<String, ?> serviceConfigMap = (Map<String, ?>) JsonParser.parse(
"{\"connection_scaling\":{\"max_connections_per_subchannel\": 2}}");
channelBuilder.defaultServiceConfig(serviceConfigMap);
} catch (IOException e) {
throw new RuntimeException(e);
}
} else if (serviceConfig != null) {
channelBuilder.disableServiceConfigLookUp();
channelBuilder.defaultServiceConfig(serviceConfig);
}
Expand Down Expand Up @@ -979,31 +996,16 @@ public void testOrcaOob() throws Exception {
.build();

final int retryLimit = 5;
BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
final Object lastItem = new Object();
StreamingOutputCallResponseObserver streamingOutputCallResponseObserver =
new StreamingOutputCallResponseObserver();
StreamObserver<StreamingOutputCallRequest> streamObserver =
asyncStub.fullDuplexCall(new StreamObserver<StreamingOutputCallResponse>() {

@Override
public void onNext(StreamingOutputCallResponse value) {
queue.add(value);
}

@Override
public void onError(Throwable t) {
queue.add(t);
}

@Override
public void onCompleted() {
queue.add(lastItem);
}
});
asyncStub.fullDuplexCall(streamingOutputCallResponseObserver);

streamObserver.onNext(StreamingOutputCallRequest.newBuilder()
.setOrcaOobReport(answer)
.addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build());
assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class);
assertThat(streamingOutputCallResponseObserver.take())
.isInstanceOf(StreamingOutputCallResponse.class);
int i = 0;
for (; i < retryLimit; i++) {
Thread.sleep(1000);
Expand All @@ -1016,7 +1018,7 @@ public void onCompleted() {
streamObserver.onNext(StreamingOutputCallRequest.newBuilder()
.setOrcaOobReport(answer2)
.addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build());
assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class);
assertThat(streamingOutputCallResponseObserver.isCompleted).isTrue();

for (i = 0; i < retryLimit; i++) {
Thread.sleep(1000);
Expand All @@ -1026,8 +1028,6 @@ public void onCompleted() {
}
}
assertThat(i).isLessThan(retryLimit);
streamObserver.onCompleted();
assertThat(queue.take()).isSameInstanceAs(lastItem);
}

@Override
Expand All @@ -1054,6 +1054,78 @@ protected ServerBuilder<?> getHandshakerServerBuilder() {
protected int operationTimeoutMillis() {
return 15000;
}

class StreamingOutputCallResponseObserver implements
StreamObserver<StreamingOutputCallResponse> {
private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
private volatile boolean isCompleted = true;

@Override
public void onNext(StreamingOutputCallResponse value) {
queue.add(value);
}

@Override
public void onError(Throwable t) {
queue.add(t);
}

@Override
public void onCompleted() {
isCompleted = true;
}

Object take() throws InterruptedException {
return queue.take();
}
}

public void testMcs() throws Exception {
StreamingOutputCallResponseObserver responseObserver1 =
new StreamingOutputCallResponseObserver();
StreamObserver<StreamingOutputCallRequest> streamObserver1 =
asyncStub.fullDuplexCall(responseObserver1);
StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
.setPayload(Payload.newBuilder().setBody(
ByteString.copyFromUtf8(MCS_CS.description())).build()).build();
streamObserver1.onNext(request);
Object responseObj = responseObserver1.take();
StreamingOutputCallResponse callResponse = (StreamingOutputCallResponse) responseObj;
String clientSocketAddressInCall1 = new String(callResponse.getPayload().getBody()
.toByteArray(), UTF_8);
assertThat(clientSocketAddressInCall1).isNotEmpty();

StreamingOutputCallResponseObserver responseObserver2 =
new StreamingOutputCallResponseObserver();
StreamObserver<StreamingOutputCallRequest> streamObserver2 =
asyncStub.fullDuplexCall(responseObserver2);
streamObserver2.onNext(request);
callResponse = (StreamingOutputCallResponse) responseObserver2.take();
String clientSocketAddressInCall2 =
new String(callResponse.getPayload().getBody().toByteArray(), UTF_8);

assertThat(clientSocketAddressInCall1).isEqualTo(clientSocketAddressInCall2);

// The first connection is at max rpc call count of 2, so the 3rd rpc will cause a new
// connection to be created in the same subchannel and not get queued.
StreamingOutputCallResponseObserver responseObserver3 =
new StreamingOutputCallResponseObserver();
StreamObserver<StreamingOutputCallRequest> streamObserver3 =
asyncStub.fullDuplexCall(responseObserver3);
streamObserver3.onNext(request);
callResponse = (StreamingOutputCallResponse) responseObserver3.take();
String clientSocketAddressInCall3 =
new String(callResponse.getPayload().getBody().toByteArray(), UTF_8);

assertThat(clientSocketAddressInCall3).isNotEqualTo(clientSocketAddressInCall1);

streamObserver1.onCompleted();
assertThat(responseObserver1.isCompleted).isTrue();
streamObserver2.onCompleted();
assertThat(responseObserver2.isCompleted).isTrue();
streamObserver3.onCompleted();
assertThat(responseObserver3.isCompleted).isTrue();
}
}

private static String validTestCasesHelpText() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,20 @@

package io.grpc.testing.integration;

import static io.grpc.Grpc.TRANSPORT_ATTR_REMOTE_ADDR;
import static io.grpc.testing.integration.TestCases.MCS_CS;
import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.protobuf.ByteString;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
Expand All @@ -42,6 +49,7 @@
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
import io.grpc.testing.integration.Messages.TestOrcaReport;
import io.grpc.testing.integration.TestServiceGrpc.AsyncService;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -61,8 +69,8 @@
* sent in response streams.
*/
public class TestServiceImpl implements io.grpc.BindableService, AsyncService {
static Context.Key<SocketAddress> PEER_ADDRESS_CONTEXT_KEY = Context.key("peer-address");
private final Random random = new Random();

private final ScheduledExecutorService executor;
private final ByteString compressableBuffer;
private final MetricRecorder metricRecorder;
Expand Down Expand Up @@ -235,6 +243,17 @@ public void onNext(StreamingOutputCallRequest request) {
.asRuntimeException());
return;
}
if (new String(request.getPayload().getBody().toByteArray(), UTF_8)
.equals(MCS_CS.description())) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please no. There's two regular approaches we use for tweaking the server's behavior:

  1. A field in the request proto (e.g., fill_username)
  2. Header metadata. I don't think we actually use this approach in the vanilla interop today, but we do do it for psm interop

(fill_username and the like are because of mistakes in the past where grpc-java verified the result message exactly. That means you can't add new fields to have them be ignored. We did want to verify the result was what we expected, but it would have probably been better to just verify the payload. But the behavior hasn't been changed, so we still need these knobs that enable result fields.)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added new fields in request and response for setting the client socket address.

SocketAddress peerAddress = PEER_ADDRESS_CONTEXT_KEY.get();
ByteString payload = ByteString.copyFromUtf8(peerAddress.toString());
StreamingOutputCallResponse.Builder responseBuilder =
StreamingOutputCallResponse.newBuilder();
responseBuilder.setPayload(
Payload.newBuilder()
.setBody(payload));
responseObserver.onNext(responseBuilder.build());
}
dispatcher.enqueue(toChunkQueue(request));
}

Expand Down Expand Up @@ -507,7 +526,8 @@ public static List<ServerInterceptor> interceptors() {
return Arrays.asList(
echoRequestHeadersInterceptor(Util.METADATA_KEY),
echoRequestMetadataInHeaders(Util.ECHO_INITIAL_METADATA_KEY),
echoRequestMetadataInTrailers(Util.ECHO_TRAILING_METADATA_KEY));
echoRequestMetadataInTrailers(Util.ECHO_TRAILING_METADATA_KEY),
new McsScalingTestcaseInterceptor());
}

/**
Expand Down Expand Up @@ -539,6 +559,22 @@ public void close(Status status, Metadata trailers) {
};
}

static class McsScalingTestcaseInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers, ServerCallHandler<ReqT, RespT> next) {
SocketAddress peerAddress = call.getAttributes().get(TRANSPORT_ATTR_REMOTE_ADDR);

// Create a new context with the peer address value
Context newContext = Context.current().withValue(PEER_ADDRESS_CONTEXT_KEY, peerAddress);
try {
return Contexts.interceptCall(newContext, call, headers, next);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}

/**
* Echoes request headers with the specified key(s) from a client into response headers only.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public void run() {
private int port = 8080;
private boolean useTls = true;
private boolean useAlts = false;
private boolean useMcs = false;

private ScheduledExecutorService executor;
private Server server;
Expand Down Expand Up @@ -118,6 +119,9 @@ void parseArgs(String[] args) {
usage = true;
break;
}
} else if ("use_mcs".equals(key)) {
useMcs = Boolean.parseBoolean(value);
addressType = Util.AddressType.IPV4; // To use NettyServerBuilder
} else {
System.err.println("Unknown argument: " + key);
usage = true;
Expand Down Expand Up @@ -186,6 +190,9 @@ void start() throws Exception {
if (v4Address != null && !v4Address.equals(localV4Address)) {
((NettyServerBuilder) serverBuilder).addListenAddress(v4Address);
}
if (useMcs) {
((NettyServerBuilder) serverBuilder).maxConcurrentCallsPerConnection(2);
}
break;
case IPV6:
List<SocketAddress> v6Addresses = Util.getV6Addresses(port);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public void testCaseNamesShouldMapToEnums() {
"cancel_after_first_response",
"timeout_on_sleeping_server",
"orca_per_rpc",
"orca_oob"
"orca_oob",
"mcs_cs",
};

// additional test cases
Expand Down