diff --git a/api/src/testFixtures/java/io/grpc/testing/StatusSubject.java b/api/src/testFixtures/java/io/grpc/testing/StatusSubject.java new file mode 100644 index 00000000000..e1f5515d403 --- /dev/null +++ b/api/src/testFixtures/java/io/grpc/testing/StatusSubject.java @@ -0,0 +1,69 @@ +/* + * Copyright 2026 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.testing; + +import static com.google.common.truth.Fact.fact; + +import com.google.common.truth.FailureMetadata; +import com.google.common.truth.Subject; +import io.grpc.Status; +import javax.annotation.Nullable; + +/** Propositions for {@link Status} subjects. */ +public final class StatusSubject extends Subject { + + private static final Subject.Factory statusFactory = new Factory(); + + public static Subject.Factory status() { + return statusFactory; + } + + private final Status actual; + + private StatusSubject(FailureMetadata metadata, @Nullable Status subject) { + super(metadata, subject); + this.actual = subject; + } + + /** Fails if the subject is not OK. */ + public void isOk() { + if (actual == null) { + failWithActual("expected to be OK but was", "null"); + } else if (!actual.isOk()) { + failWithoutActual( + fact("expected to be OK but was", actual.getCode()), + fact("description", actual.getDescription()), + fact("cause", actual.getCause())); + } + } + + /** Fails if the subject does not have the given code. */ + public void hasCode(Status.Code expectedCode) { + if (actual == null) { + failWithActual("expected to have code " + expectedCode + " but was", "null"); + } else { + check("getCode()").that(actual.getCode()).isEqualTo(expectedCode); + } + } + + private static final class Factory implements Subject.Factory { + @Override + public StatusSubject createSubject(FailureMetadata metadata, @Nullable Status that) { + return new StatusSubject(metadata, that); + } + } +} diff --git a/binder/build.gradle b/binder/build.gradle index 0da3f97ceee..a67567309eb 100644 --- a/binder/build.gradle +++ b/binder/build.gradle @@ -54,6 +54,7 @@ dependencies { testImplementation project(':grpc-testing') testImplementation project(':grpc-inprocess') testImplementation testFixtures(project(':grpc-core')) + testImplementation testFixtures(project(':grpc-api')) androidTestAnnotationProcessor libraries.auto.value androidTestImplementation project(':grpc-testing') diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderServer.java b/binder/src/main/java/io/grpc/binder/internal/BinderServer.java index 96685a2f8bd..f913775fcbe 100644 --- a/binder/src/main/java/io/grpc/binder/internal/BinderServer.java +++ b/binder/src/main/java/io/grpc/binder/internal/BinderServer.java @@ -70,6 +70,7 @@ public final class BinderServer implements InternalServer, LeakSafeOneWayBinder. private final LeakSafeOneWayBinder hostServiceBinder; private final BinderTransportSecurity.ServerPolicyChecker serverPolicyChecker; private final InboundParcelablePolicy inboundParcelablePolicy; + private final OneWayBinderProxy.Decorator clientBinderDecorator; @GuardedBy("this") private ServerListener listener; @@ -92,6 +93,7 @@ private BinderServer(Builder builder) { ImmutableList.copyOf(checkNotNull(builder.streamTracerFactories, "streamTracerFactories")); this.serverPolicyChecker = BinderInternal.createPolicyChecker(builder.serverSecurityPolicy); this.inboundParcelablePolicy = builder.inboundParcelablePolicy; + this.clientBinderDecorator = builder.clientBinderDecorator; hostServiceBinder = new LeakSafeOneWayBinder(this); } @@ -183,7 +185,7 @@ public synchronized boolean handleTransaction(int code, Parcel parcel) { executorServicePool, attrsBuilder.build(), streamTracerFactories, - OneWayBinderProxy.IDENTITY_DECORATOR, + clientBinderDecorator, callbackBinder); transport.start(listener.transportCreated(transport)); return true; @@ -225,6 +227,7 @@ public static class Builder { SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE); ServerSecurityPolicy serverSecurityPolicy = SecurityPolicies.serverInternalOnly(); InboundParcelablePolicy inboundParcelablePolicy = InboundParcelablePolicy.DEFAULT; + OneWayBinderProxy.Decorator clientBinderDecorator = OneWayBinderProxy.IDENTITY_DECORATOR; public BinderServer build() { return new BinderServer(this); @@ -295,5 +298,19 @@ public Builder setInboundParcelablePolicy(InboundParcelablePolicy inboundParcela checkNotNull(inboundParcelablePolicy, "inboundParcelablePolicy"); return this; } + + /** + * Sets the {@link OneWayBinderProxy.Decorator} to be applied to this server's "client Binders". + * + *

Tests can use this to capture post-setup transactions from server to client. The specified + * decorator will be applied every time a client connects. The decorated result will be used for + * all subsequent transactions to this client from the new ServerTransport. + * + *

Optional, {@link OneWayBinderProxy#IDENTITY_DECORATOR} is the default. + */ + public Builder setClientBinderDecorator(OneWayBinderProxy.Decorator clientBinderDecorator) { + this.clientBinderDecorator = checkNotNull(clientBinderDecorator); + return this; + } } } diff --git a/binder/src/main/java/io/grpc/binder/internal/BlockPool.java b/binder/src/main/java/io/grpc/binder/internal/BlockPool.java index 3c58abdd80b..985e465ab4b 100644 --- a/binder/src/main/java/io/grpc/binder/internal/BlockPool.java +++ b/binder/src/main/java/io/grpc/binder/internal/BlockPool.java @@ -40,7 +40,7 @@ final class BlockPool { * The size of each standard block. (Currently 16k) The block size must be at least as large as * the maximum header list size. */ - private static final int BLOCK_SIZE = Math.max(16 * 1024, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE); + static final int BLOCK_SIZE = Math.max(16 * 1024, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE); /** * Maximum number of blocks to keep around. (Max 128k). This limit is a judgement call. 128k is diff --git a/binder/src/test/java/io/grpc/binder/internal/RobolectricBinderTransportTest.java b/binder/src/test/java/io/grpc/binder/internal/RobolectricBinderTransportTest.java index 8282f5e1025..aed55fd1926 100644 --- a/binder/src/test/java/io/grpc/binder/internal/RobolectricBinderTransportTest.java +++ b/binder/src/test/java/io/grpc/binder/internal/RobolectricBinderTransportTest.java @@ -24,6 +24,7 @@ import static io.grpc.binder.internal.BinderTransport.SETUP_TRANSPORT; import static io.grpc.binder.internal.BinderTransport.SHUTDOWN_TRANSPORT; import static io.grpc.binder.internal.BinderTransport.WIRE_FORMAT_VERSION; +import static io.grpc.testing.StatusSubject.status; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.Assume.assumeTrue; import static org.mockito.ArgumentMatchers.any; @@ -47,15 +48,20 @@ import com.google.common.collect.ImmutableList; import com.google.common.truth.TruthJUnit; import io.grpc.Attributes; +import io.grpc.CallOptions; import io.grpc.InternalChannelz.SocketStats; +import io.grpc.Metadata; import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.binder.AndroidComponentAddress; import io.grpc.binder.ApiConstants; import io.grpc.binder.AsyncSecurityPolicy; import io.grpc.binder.SecurityPolicies; +import io.grpc.binder.internal.OneWayBinderProxies.*; import io.grpc.binder.internal.SettableAsyncSecurityPolicy.AuthRequest; import io.grpc.internal.AbstractTransportTest; +import io.grpc.internal.ClientStream; +import io.grpc.internal.ClientStreamListenerBase; import io.grpc.internal.ClientTransport; import io.grpc.internal.ClientTransportFactory.ClientTransportOptions; import io.grpc.internal.ConnectionClientTransport; @@ -66,7 +72,9 @@ import io.grpc.internal.MockServerTransportListener; import io.grpc.internal.ObjectPool; import io.grpc.internal.SharedResourcePool; +import java.io.InputStream; import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import org.junit.Before; @@ -124,6 +132,8 @@ public final class RobolectricBinderTransportTest extends AbstractTransportTest ServiceInfo serviceInfo; private int nextServerAddress; + private BlockingBinderDecorator blockingDecorator = + new BlockingBinderDecorator<>(); @Parameter(value = 0) public boolean preAuthServersParam; @@ -167,27 +177,34 @@ public void requestRealisticBindServiceBehavior() { shadowOf(application).setUnbindServiceCallsOnServiceDisconnected(false); } - @Override - protected InternalServer newServer(List streamTracerFactories) { + BinderServer.Builder newServerBuilder() { AndroidComponentAddress listenAddr = AndroidComponentAddress.forBindIntent( new Intent() .setClassName(serviceInfo.packageName, serviceInfo.name) .setAction("io.grpc.action.BIND." + nextServerAddress++)); - BinderServer binderServer = - new BinderServer.Builder() - .setListenAddress(listenAddr) - .setExecutorPool(serverExecutorPool) - .setExecutorServicePool(executorServicePool) - .setStreamTracerFactories(streamTracerFactories) - .build(); + return new BinderServer.Builder() + .setListenAddress(listenAddr) + .setExecutorPool(serverExecutorPool) + .setExecutorServicePool(executorServicePool) + .setStreamTracerFactories(List.of()); + } + void registerServerWithRobolectric(BinderServer server) { + AndroidComponentAddress listenAddr = (AndroidComponentAddress) server.getListenSocketAddress(); shadowOf(application.getPackageManager()).addServiceIfNotPresent(listenAddr.getComponent()); shadowOf(application) .setComponentNameAndServiceForBindServiceForIntent( - listenAddr.asBindIntent(), listenAddr.getComponent(), binderServer.getHostBinder()); - return binderServer; + listenAddr.asBindIntent(), listenAddr.getComponent(), server.getHostBinder()); + } + + @Override + protected InternalServer newServer(List streamTracerFactories) { + BinderServer server = + newServerBuilder().setStreamTracerFactories(streamTracerFactories).build(); + registerServerWithRobolectric(server); + return server; } @Override @@ -433,4 +450,248 @@ public void flowControlPushBack() {} @Ignore("See BinderTransportTest#serverAlreadyListening") @Override public void serverAlreadyListening() {} + + @Test + public void singleTxnMsgsDeliveredToServerOutOfOrder() throws Exception { + server.start(serverListener); + client = + newClientTransportBuilder() + .setFactory( + newClientTransportFactoryBuilder() + .setBinderDecorator(blockingDecorator) + .buildClientTransportFactory()) + .build(); + runIfNotNull(client.start(mockClientTransportListener)); + blockingDecorator.putNextResult(takeNextBinder(blockingDecorator)); // Endpoint binder. + QueueingOneWayBinderProxy queueingServerProxy = + new QueueingOneWayBinderProxy(takeNextBinder(blockingDecorator)); // Server binder. + blockingDecorator.putNextResult(queueingServerProxy); + + verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); + + ClientStream stream = + client.newStream(methodDescriptor, new Metadata(), CallOptions.DEFAULT, noopTracers); + ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); + stream.start(clientStreamListener); + stream.writeMessage(methodDescriptor.streamRequest("one")); + stream.writeMessage(methodDescriptor.streamRequest("two")); + stream.halfClose(); + + // Expect one transaction for headers, one for each message, and one for half-close. + QueueingOneWayBinderProxy.Transaction txHeaders = takeNextTransaction(queueingServerProxy); + QueueingOneWayBinderProxy.Transaction tx1 = takeNextTransaction(queueingServerProxy); + QueueingOneWayBinderProxy.Transaction tx2 = takeNextTransaction(queueingServerProxy); + QueueingOneWayBinderProxy.Transaction txHalfClose = takeNextTransaction(queueingServerProxy); + + // Deliver messages out of order! + queueingServerProxy.deliver(txHeaders); + queueingServerProxy.deliver(tx2); + queueingServerProxy.deliver(tx1); + queueingServerProxy.deliver(txHalfClose); + + MockServerTransportListener serverTransportListener = + serverListener.takeListenerOrFail(TIMEOUT_MS, MILLISECONDS); + MockServerTransportListener.StreamCreation serverStreamCreation = + serverTransportListener.takeStreamOrFail(TIMEOUT_MS, MILLISECONDS); + serverStreamCreation.stream.request(2); + + // Expect the server to deliver the messages in the order they were originally sent. + InputStream msg1 = takeNextMessage(serverStreamCreation.listener.messageQueue); + assertThat(methodDescriptor.parseResponse(msg1)).isEqualTo("one"); + + InputStream msg2 = takeNextMessage(serverStreamCreation.listener.messageQueue); + assertThat(methodDescriptor.parseResponse(msg2)).isEqualTo("two"); + + assertThat(serverStreamCreation.listener.awaitHalfClosed(TIMEOUT_MS, MILLISECONDS)).isTrue(); + serverStreamCreation.stream.close(Status.OK, new Metadata()); + + assertAbout(status()).that(clientStreamListener.awaitClose(TIMEOUT_MS, MILLISECONDS)).isOk(); + assertAbout(status()) + .that(serverStreamCreation.listener.awaitClose(TIMEOUT_MS, MILLISECONDS)) + .isOk(); + } + + @Test + public void msgFragmentsDeliveredToServerOutOfOrder() throws Exception { + server.start(serverListener); + client = + newClientTransportBuilder() + .setFactory( + newClientTransportFactoryBuilder() + .setBinderDecorator(blockingDecorator) + .buildClientTransportFactory()) + .build(); + runIfNotNull(client.start(mockClientTransportListener)); + blockingDecorator.putNextResult(takeNextBinder(blockingDecorator)); // Endpoint binder. + QueueingOneWayBinderProxy queueingServerProxy = + new QueueingOneWayBinderProxy(takeNextBinder(blockingDecorator)); // Server binder. + blockingDecorator.putNextResult(queueingServerProxy); + + verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); + + ClientStream stream = + client.newStream(methodDescriptor, new Metadata(), CallOptions.DEFAULT, noopTracers); + ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); + stream.start(clientStreamListener); + + String largeMessage = newStringOfLength(BlockPool.BLOCK_SIZE + 1); + stream.writeMessage(methodDescriptor.streamRequest(largeMessage)); + stream.halfClose(); + + // Expect the client to split largeMessage into two transactions, plus headers and half-close. + QueueingOneWayBinderProxy.Transaction txHeaders = takeNextTransaction(queueingServerProxy); + QueueingOneWayBinderProxy.Transaction tx1 = takeNextTransaction(queueingServerProxy); + QueueingOneWayBinderProxy.Transaction tx2 = takeNextTransaction(queueingServerProxy); + QueueingOneWayBinderProxy.Transaction txHalfClose = takeNextTransaction(queueingServerProxy); + + // Deliver fragments out of order! + queueingServerProxy.deliver(txHeaders); + queueingServerProxy.deliver(tx2); + queueingServerProxy.deliver(tx1); + queueingServerProxy.deliver(txHalfClose); + + // Verify that the server reassembles the transactions correctly. + MockServerTransportListener serverTransportListener = + serverListener.takeListenerOrFail(TIMEOUT_MS, MILLISECONDS); + MockServerTransportListener.StreamCreation serverStreamCreation = + serverTransportListener.takeStreamOrFail(TIMEOUT_MS, MILLISECONDS); + serverStreamCreation.stream.request(1); + InputStream msg = takeNextMessage(serverStreamCreation.listener.messageQueue); + assertThat(methodDescriptor.parseResponse(msg)).isEqualTo(largeMessage); + + assertThat(serverStreamCreation.listener.awaitHalfClosed(TIMEOUT_MS, MILLISECONDS)).isTrue(); + serverStreamCreation.stream.close(Status.OK, new Metadata()); + + assertAbout(status()).that(clientStreamListener.awaitClose(TIMEOUT_MS, MILLISECONDS)).isOk(); + assertAbout(status()) + .that(serverStreamCreation.listener.awaitClose(TIMEOUT_MS, MILLISECONDS)) + .isOk(); + } + + @Test + public void singleTxnMsgsDeliveredToClientOutOfOrder() throws Exception { + server = newServerBuilder().setClientBinderDecorator(blockingDecorator).build(); + registerServerWithRobolectric((BinderServer) server); + server.start(serverListener); + + client = newClientTransport(server); + runIfNotNull(client.start(mockClientTransportListener)); + + QueueingOneWayBinderProxy queueingClientProxy = + new QueueingOneWayBinderProxy(takeNextBinder(blockingDecorator)); + blockingDecorator.putNextResult(queueingClientProxy); + + // Deliver the setup transaction without interference. + queueingClientProxy.deliver(takeNextTransaction(queueingClientProxy)); + verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); + + ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); + ClientStream stream = + client.newStream(methodDescriptor, new Metadata(), CallOptions.DEFAULT, noopTracers); + stream.start(clientStreamListener); + stream.halfClose(); + stream.request(2); + + MockServerTransportListener serverTransportListener = + serverListener.takeListenerOrFail(TIMEOUT_MS, MILLISECONDS); + MockServerTransportListener.StreamCreation serverStreamCreation = + serverTransportListener.takeStreamOrFail(TIMEOUT_MS, MILLISECONDS); + + serverStreamCreation.stream.writeMessage(methodDescriptor.streamResponse("one")); + serverStreamCreation.stream.writeMessage(methodDescriptor.streamResponse("two")); + serverStreamCreation.stream.close(Status.OK, new Metadata()); + + // Expect one transaction from the server for each message. + QueueingOneWayBinderProxy.Transaction tx1 = takeNextTransaction(queueingClientProxy); + QueueingOneWayBinderProxy.Transaction tx2 = takeNextTransaction(queueingClientProxy); + QueueingOneWayBinderProxy.Transaction txClose = takeNextTransaction(queueingClientProxy); + + // Deliver messages to the client out of order! + queueingClientProxy.deliver(tx2); + queueingClientProxy.deliver(tx1); + queueingClientProxy.deliver(txClose); + + // Client should deliver messages to the application in the order sent. + InputStream msg1 = takeNextMessage(clientStreamListener.messageQueue); + assertThat(methodDescriptor.parseResponse(msg1)).isEqualTo("one"); + InputStream msg2 = takeNextMessage(clientStreamListener.messageQueue); + assertThat(methodDescriptor.parseResponse(msg2)).isEqualTo("two"); + + assertAbout(status()).that(clientStreamListener.awaitClose(TIMEOUT_MS, MILLISECONDS)).isOk(); + assertAbout(status()) + .that(serverStreamCreation.listener.awaitClose(TIMEOUT_MS, MILLISECONDS)) + .isOk(); + } + + @Test + public void msgFragmentsDeliveredToClientOutOfOrder() throws Exception { + server = newServerBuilder().setClientBinderDecorator(blockingDecorator).build(); + registerServerWithRobolectric((BinderServer) server); + server.start(serverListener); + + client = newClientTransport(server); + runIfNotNull(client.start(mockClientTransportListener)); + + QueueingOneWayBinderProxy queueingClientProxy = + new QueueingOneWayBinderProxy(takeNextBinder(blockingDecorator)); + blockingDecorator.putNextResult(queueingClientProxy); + + // Deliver the setup transaction without interference. + queueingClientProxy.deliver(takeNextTransaction(queueingClientProxy)); + verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); + + ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); + ClientStream stream = + client.newStream(methodDescriptor, new Metadata(), CallOptions.DEFAULT, noopTracers); + stream.start(clientStreamListener); + stream.request(1); + + MockServerTransportListener serverTransportListener = + serverListener.takeListenerOrFail(TIMEOUT_MS, MILLISECONDS); + MockServerTransportListener.StreamCreation serverStreamCreation = + serverTransportListener.takeStreamOrFail(TIMEOUT_MS, MILLISECONDS); + + String largeMessage = newStringOfLength(BlockPool.BLOCK_SIZE + 1); + serverStreamCreation.stream.writeMessage(methodDescriptor.streamResponse(largeMessage)); + serverStreamCreation.stream.flush(); + + // Expect the client to split largeMessage into two transactions. + QueueingOneWayBinderProxy.Transaction tx1 = takeNextTransaction(queueingClientProxy); + QueueingOneWayBinderProxy.Transaction tx2 = takeNextTransaction(queueingClientProxy); + + // Deliver them to the client out of order! + queueingClientProxy.deliver(tx2); + queueingClientProxy.deliver(tx1); + + // Client should reassemble the message correctly. + InputStream msg = takeNextMessage(clientStreamListener.messageQueue); + assertThat(methodDescriptor.parseResponse(msg)).isEqualTo(largeMessage); + } + + private static OneWayBinderProxy takeNextBinder( + BlockingBinderDecorator decorator) throws InterruptedException { + OneWayBinderProxy proxy = decorator.takeNextRequest(TIMEOUT_MS, MILLISECONDS); + assertThat(proxy).isNotNull(); + return proxy; + } + + private static QueueingOneWayBinderProxy.Transaction takeNextTransaction( + QueueingOneWayBinderProxy proxy) throws InterruptedException { + QueueingOneWayBinderProxy.Transaction tx = proxy.pollNextTransaction(TIMEOUT_MS, MILLISECONDS); + assertThat(tx).isNotNull(); + return tx; + } + + private static InputStream takeNextMessage(BlockingQueue messageQueue) + throws InterruptedException { + InputStream msg = messageQueue.poll(TIMEOUT_MS, MILLISECONDS); + assertThat(msg).isNotNull(); + return msg; + } + + private static String newStringOfLength(int numChars) { + char[] chars = new char[numChars]; + java.util.Arrays.fill(chars, 'x'); + return new String(chars); + } } diff --git a/binder/src/androidTest/java/io/grpc/binder/internal/OneWayBinderProxies.java b/binder/src/testFixtures/java/io/grpc/binder/internal/OneWayBinderProxies.java similarity index 67% rename from binder/src/androidTest/java/io/grpc/binder/internal/OneWayBinderProxies.java rename to binder/src/testFixtures/java/io/grpc/binder/internal/OneWayBinderProxies.java index 4abdb2c03dd..c7eee06e73a 100644 --- a/binder/src/androidTest/java/io/grpc/binder/internal/OneWayBinderProxies.java +++ b/binder/src/testFixtures/java/io/grpc/binder/internal/OneWayBinderProxies.java @@ -18,6 +18,7 @@ import android.os.RemoteException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; /** A collection of {@link OneWayBinderProxy}-related test helpers. */ @@ -42,6 +43,18 @@ public OneWayBinderProxy takeNextRequest() throws InterruptedException { return requests.take(); } + /** + * Returns the next {@link OneWayBinderProxy} that needs decorating, blocking for up to the + * specified timeout if it hasn't yet been provided to {@link #decorate}. + * + *

Follow this with a call to {@link #putNextResult(OneWayBinderProxy)} to provide the result + * of {@link #decorate} and unblock the waiting caller. + */ + public OneWayBinderProxy takeNextRequest(long timeout, TimeUnit unit) + throws InterruptedException { + return requests.poll(timeout, unit); + } + /** Provides the next value to return from {@link #decorate}. */ public void putNextResult(T next) throws InterruptedException { results.put(next); @@ -119,6 +132,49 @@ public void transact(int code, ParcelHolder data) throws RemoteException { } } + /** A {@link OneWayBinderProxy} that queues transactions for a test to deliver manually later. */ + public static final class QueueingOneWayBinderProxy extends OneWayBinderProxy { + public static final class Transaction { + public final int code; + private final ParcelHolder parcel; + + public Transaction(int code, ParcelHolder parcel) { + this.code = code; + this.parcel = parcel; + } + } + + private final BlockingQueue queue = new LinkedBlockingQueue<>(); + private final OneWayBinderProxy wrapped; + + public QueueingOneWayBinderProxy(OneWayBinderProxy wrapped) { + super(wrapped.getDelegate()); + this.wrapped = wrapped; + } + + @Override + public void transact(int code, ParcelHolder data) throws RemoteException { + queue.add(new Transaction(code, new ParcelHolder(data.release()))); + } + + /** + * Returns the next transaction that was queued in order, waiting up to the specified timeout. + */ + public Transaction pollNextTransaction(long timeout, TimeUnit unit) + throws InterruptedException { + return queue.poll(timeout, unit); + } + + /** + * Delivers a previously queued transaction to its original destination. + * + * @throws IllegalStateException if transaction was already delivered once before + */ + public void deliver(Transaction transaction) throws RemoteException { + wrapped.transact(transaction.code, transaction.parcel); + } + } + // Cannot be instantiated. private OneWayBinderProxies() {} ; diff --git a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java index 5d6b88a1392..61ab4850650 100644 --- a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java +++ b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java @@ -185,7 +185,7 @@ public void log(ChannelLogLevel level, String messageFormat, Object... args) {} protected final ClientStreamTracer[] tracers = new ClientStreamTracer[] { clientStreamTracer1, clientStreamTracer2 }; - private final ClientStreamTracer[] noopTracers = new ClientStreamTracer[] { + protected final ClientStreamTracer[] noopTracers = new ClientStreamTracer[] { new ClientStreamTracer() {} }; @@ -2195,7 +2195,7 @@ public void streamCreated(Attributes transportAttrs, Metadata metadata) { } } - private static class StringMarshaller implements MethodDescriptor.Marshaller { + protected static class StringMarshaller implements MethodDescriptor.Marshaller { public static final StringMarshaller INSTANCE = new StringMarshaller(); @Override