diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java index de7ddd282989..16da6fda3850 100644 --- a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java +++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java @@ -117,7 +117,12 @@ public enum ProtocolBitmaskFeature { /** * Client supports SQL_UPDATE_COUNTERS_2 error extension (single binary value instead of array). */ - SQL_UPDATE_COUNTERS_2(18); + SQL_UPDATE_COUNTERS_2(18), + + /** + * Allow rolling back direct transactions using the first request id. + */ + TX_ROLLBACK_USING_FIRST_REQUEST(19); private static final EnumSet ALL_FEATURES_AS_ENUM_SET = EnumSet.allOf(ProtocolBitmaskFeature.class); diff --git a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java index b87b84c8ef14..ea38273ca24d 100644 --- a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java +++ b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java @@ -565,6 +565,7 @@ public void testServerReturnsAllItsFeatures() throws IOException { expected.set(16); expected.set(17); expected.set(18); + expected.set(19); assertEquals(expected, supportedFeatures); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java index b7e6a357fc6d..52259fe94924 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java @@ -103,7 +103,8 @@ public class ClientHandlerModule implements IgniteComponent, PlatformComputeTran ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES, ProtocolBitmaskFeature.SQL_PARTITION_AWARENESS_TABLE_NAME, ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_DISCARD, - ProtocolBitmaskFeature.SQL_UPDATE_COUNTERS_2 + ProtocolBitmaskFeature.SQL_UPDATE_COUNTERS_2, + ProtocolBitmaskFeature.TX_ROLLBACK_USING_FIRST_REQUEST )); /** Connection id generator. diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java index 895872994ea9..85c3bd86a2e1 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java @@ -29,6 +29,7 @@ import static org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING; import static org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES; import static org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK; +import static org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_ROLLBACK_USING_FIRST_REQUEST; import static org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP; import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; @@ -96,6 +97,7 @@ import org.apache.ignite.client.handler.requests.table.ClientSchemasGetRequest; import org.apache.ignite.client.handler.requests.table.ClientStreamerBatchSendRequest; import org.apache.ignite.client.handler.requests.table.ClientStreamerWithReceiverBatchSendRequest; +import org.apache.ignite.client.handler.requests.table.ClientTableCommon; import org.apache.ignite.client.handler.requests.table.ClientTableGetQualifiedRequest; import org.apache.ignite.client.handler.requests.table.ClientTableGetRequest; import org.apache.ignite.client.handler.requests.table.ClientTablePartitionPrimaryReplicasGetRequest; @@ -190,6 +192,7 @@ import org.apache.ignite.security.exception.InvalidCredentialsException; import org.apache.ignite.security.exception.UnsupportedAuthenticationTypeException; import org.apache.ignite.sql.SqlBatchException; +import org.apache.ignite.table.IgniteTables; import org.apache.ignite.tx.RetriableTransactionException; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; @@ -292,6 +295,24 @@ public class ClientInboundMessageHandler private final HandshakeEventLoopSwitcher handshakeEventLoopSwitcher; + /** + * Tracks mappings between the first {@code requestId} and the {@code resourceId} + * holding the Tx object for directly mapped transactions. The process is not localized. + * + *

Mappings are created: + *

+ * + *

Mappings are removed: + *

+ */ + private final Map firstReqToTxResMap = new ConcurrentHashMap<>(); + /** * Constructor. * @@ -557,6 +578,7 @@ private void handshakeSuccess( actualFeatures.clear(TX_DELAYED_ACKS.featureId()); actualFeatures.clear(TX_PIGGYBACK.featureId()); actualFeatures.clear(TX_ALLOW_NOOP_ENLIST.featureId()); + actualFeatures.clear(TX_ROLLBACK_USING_FIRST_REQUEST.featureId()); actualFeatures.clear(SQL_DIRECT_TX_MAPPING.featureId()); } else { @@ -945,70 +967,72 @@ private CompletableFuture processOperation( return ClientTableGetRequest.process(in, igniteTables); case ClientOp.TUPLE_UPSERT: - return ClientTupleUpsertRequest.process( - in, igniteTables, resources, txManager, clockService, notificationSender(requestId), tsTracker); + return ClientTupleUpsertRequest.process(in, igniteTables, resources, txManager, clockService, notificationSender(requestId), + tsTracker, requestId, firstReqToTxResMap); case ClientOp.TUPLE_GET: - return ClientTupleGetRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker); + return ClientTupleGetRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker, requestId, + firstReqToTxResMap); case ClientOp.TUPLE_UPSERT_ALL: return ClientTupleUpsertAllRequest.process(in, igniteTables, resources, txManager, clockService, - notificationSender(requestId), tsTracker); + notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap); case ClientOp.TUPLE_GET_ALL: return ClientTupleGetAllRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker, - clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS)); + requestId, firstReqToTxResMap, clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS)); case ClientOp.TUPLE_GET_AND_UPSERT: return ClientTupleGetAndUpsertRequest.process(in, igniteTables, resources, txManager, clockService, - notificationSender(requestId), tsTracker); + notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap); case ClientOp.TUPLE_INSERT: return ClientTupleInsertRequest.process(in, igniteTables, resources, txManager, clockService, - notificationSender(requestId), tsTracker); + notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap); case ClientOp.TUPLE_INSERT_ALL: return ClientTupleInsertAllRequest.process(in, igniteTables, resources, txManager, clockService, - notificationSender(requestId), tsTracker); + notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap); case ClientOp.TUPLE_REPLACE: return ClientTupleReplaceRequest.process(in, igniteTables, resources, txManager, clockService, - notificationSender(requestId), tsTracker); + notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap); case ClientOp.TUPLE_REPLACE_EXACT: return ClientTupleReplaceExactRequest.process(in, igniteTables, resources, txManager, clockService, - notificationSender(requestId), tsTracker); + notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap); case ClientOp.TUPLE_GET_AND_REPLACE: return ClientTupleGetAndReplaceRequest.process(in, igniteTables, resources, txManager, clockService, - notificationSender(requestId), tsTracker); + notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap); case ClientOp.TUPLE_DELETE: return ClientTupleDeleteRequest.process(in, igniteTables, resources, txManager, clockService, - notificationSender(requestId), tsTracker); + notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap); case ClientOp.TUPLE_DELETE_ALL: return ClientTupleDeleteAllRequest.process(in, igniteTables, resources, txManager, clockService, - notificationSender(requestId), tsTracker); + notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap); case ClientOp.TUPLE_DELETE_EXACT: return ClientTupleDeleteExactRequest.process(in, igniteTables, resources, txManager, clockService, - notificationSender(requestId), tsTracker); + notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap); case ClientOp.TUPLE_DELETE_ALL_EXACT: return ClientTupleDeleteAllExactRequest.process(in, igniteTables, resources, txManager, clockService, - notificationSender(requestId), tsTracker); + notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap); case ClientOp.TUPLE_GET_AND_DELETE: return ClientTupleGetAndDeleteRequest.process(in, igniteTables, resources, txManager, clockService, - notificationSender(requestId), tsTracker); + notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap); case ClientOp.TUPLE_CONTAINS_KEY: - return ClientTupleContainsKeyRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker); + return ClientTupleContainsKeyRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker, requestId, + firstReqToTxResMap); case ClientOp.TUPLE_CONTAINS_ALL_KEYS: return ClientTupleContainsAllKeysRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker, - clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS)); + requestId, firstReqToTxResMap, clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS)); case ClientOp.JDBC_CONNECT: return ClientJdbcConnectRequest.execute(in, jdbcQueryEventHandler, resolveCurrentUsername()); @@ -1057,7 +1081,7 @@ private CompletableFuture processOperation( clientContext.hasFeature(TX_PIGGYBACK), clientContext.hasFeature(TX_DIRECT_MAPPING_SEND_REMOTE_WRITES), tsTracker); case ClientOp.TX_ROLLBACK: - return ClientTransactionRollbackRequest.process(in, resources, metrics, igniteTables, + return ClientTransactionRollbackRequest.process(in, resources, metrics, igniteTables, firstReqToTxResMap, clientContext.hasFeature(TX_PIGGYBACK), clientContext.hasFeature(TX_DIRECT_MAPPING_SEND_REMOTE_WRITES)); case ClientOp.COMPUTE_EXECUTE: @@ -1112,6 +1136,7 @@ private CompletableFuture processOperation( igniteTables, clockService, notificationSender(requestId), + firstReqToTxResMap, resolveCurrentUsername(), clientContext.hasFeature(SQL_MULTISTATEMENT_SUPPORT), clientContext.hasFeature(SQL_PARTITION_AWARENESS_TABLE_NAME), @@ -1143,12 +1168,12 @@ partitionOperationsExecutor, in, queryProcessor, requestId, cancelHandles, tsTra case ClientOp.SQL_QUERY_META: return ClientSqlQueryMetadataRequest.process( - partitionOperationsExecutor, in, queryProcessor, resources, tsTracker + partitionOperationsExecutor, in, queryProcessor, resources, tsTracker, requestId, firstReqToTxResMap ); case ClientOp.SQL_EXEC_BATCH: return ClientSqlExecuteBatchRequest.process( - in, queryProcessor, resources, requestId, cancelHandles, tsTracker, + in, queryProcessor, resources, requestId, cancelHandles, tsTracker, firstReqToTxResMap, resolveCurrentUsername() ); @@ -1218,6 +1243,7 @@ private void processOperationInternal( if (err != null) { writeError(requestId, opCode, (Throwable) err, ctx, false, guard); metrics.requestsFailedIncrement(); + firstReqToTxResMap.remove(requestId); return; } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java index 413fa317c4f9..6032ad96ade0 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java @@ -47,6 +47,7 @@ public class ClientSqlExecuteBatchRequest { * @param cancelHandleMap Registry of handlers. Request must register itself in this registry before switching to another * thread. * @param username Authenticated user name. + * @param reqToTxMap Tracker for first request of direct transactions. * @return Future representing result of operation. */ public static CompletableFuture process( @@ -56,6 +57,7 @@ public static CompletableFuture process( long requestId, Map cancelHandleMap, HybridTimestampTracker tsTracker, + Map reqToTxMap, String username ) { CancelHandle cancelHandle = CancelHandle.create(); @@ -68,7 +70,9 @@ public static CompletableFuture process( null, null, null, - null + null, + requestId, + reqToTxMap ); ClientSqlProperties props = new ClientSqlProperties(in, false); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java index cbcd37de96b2..01048bfd28bd 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java @@ -79,6 +79,7 @@ public class ClientSqlExecuteRequest { * transaction. * @param notificationSender Notification sender is required to send acknowledge for underlying write operation within a remote * transaction. + * @param reqToTxMap Tracker for first request of direct transactions. * @param username Authenticated user name or {@code null} for unknown user. * @return Future representing result of operation. */ @@ -97,6 +98,7 @@ public static CompletableFuture process( IgniteTables tables, ClockService clockService, NotificationSender notificationSender, + Map reqToTxMap, @Nullable String username, boolean sqlMultistatementsSupported, boolean sqlPartitionAwarenessQualifiedNameSupported, @@ -118,7 +120,9 @@ public static CompletableFuture process( txManager, tables, notificationSender, - resIdHolder + resIdHolder, + requestId, + reqToTxMap ); ClientSqlProperties props = new ClientSqlProperties(in, sqlMultistatementsSupported); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java index 49db22c22bb8..9a08d3630a96 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java @@ -19,6 +19,7 @@ import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import org.apache.ignite.client.handler.ClientResourceRegistry; @@ -42,6 +43,8 @@ public class ClientSqlQueryMetadataRequest { * @param in Unpacker. * @param processor SQL API. * @param resources Resources. + * @param requestId Id of the request. + * @param reqToTxMap Tracker for first request of direct transactions. * @return Future representing result of operation. */ public static CompletableFuture process( @@ -49,9 +52,11 @@ public static CompletableFuture process( ClientMessageUnpacker in, QueryProcessor processor, ClientResourceRegistry resources, - HybridTimestampTracker tsTracker + HybridTimestampTracker tsTracker, + long requestId, + Map reqToTxMap ) { - CompletableFuture txFut = readTx(in, tsTracker, resources, null, null, null, null); + CompletableFuture txFut = readTx(in, tsTracker, resources, null, null, null, null, requestId, reqToTxMap); String schema = in.unpackString(); String query = in.unpackString(); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java index 3a26a7afc553..e40fcce319d9 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java @@ -30,6 +30,7 @@ import java.util.BitSet; import java.util.Collection; import java.util.EnumSet; +import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.ClientResource; @@ -421,6 +422,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) { * @param txManager Tx manager. * @param notificationSender Notification sender. * @param resourceIdHolder Resource id holder. + * @param requestId Id of the request. + * @param reqToTxMap Tracker for first request of direct transactions. * @return Transaction, if present, or null. */ public static CompletableFuture<@Nullable InternalTransaction> readTx( @@ -430,7 +433,9 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) { @Nullable TxManager txManager, @Nullable IgniteTables tables, @Nullable NotificationSender notificationSender, - long[] resourceIdHolder + long[] resourceIdHolder, + long requestId, + Map reqToTxMap ) { return readTx( in, @@ -440,6 +445,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) { tables, notificationSender, resourceIdHolder, + requestId, + reqToTxMap, EnumSet.noneOf(RequestOptions.class) ); } @@ -453,6 +460,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) { * @param txManager Tx manager. * @param notificationSender Notification sender. * @param resourceIdHolder Resource id holder. + * @param requestId Id of the request. + * @param reqToTxMap Tracker for first request of direct transactions. * @param options Request options. Defines how a request is processed. * @return Transaction, if present, or null. */ @@ -464,6 +473,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) { @Nullable IgniteTables tables, @Nullable NotificationSender notificationSender, long[] resourceIdHolder, + long requestId, + Map reqToTxMap, EnumSet options ) { if (in.tryUnpackNil()) { @@ -505,11 +516,18 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) { }); InternalTxOptions txOptions = builder.build(); - var tx = startExplicitTx(tsUpdater, txManager, HybridTimestamp.nullableHybridTimestamp(observableTs), readOnly, txOptions); + var tx = new DirectTransactionWithFirstRequest( + startExplicitTx(tsUpdater, txManager, HybridTimestamp.nullableHybridTimestamp(observableTs), readOnly, txOptions), + reqToTxMap, + requestId + ); // Attach resource id only on first direct request. resourceIdHolder[0] = resources.put(new ClientResource(tx, tx::rollbackAsync)); + // Record the mapping between first request and resourceId. + reqToTxMap.put(requestId, resourceIdHolder[0]); + return completedFuture(tx); } else if (id == TX_ID_DIRECT) { assert txManager != null : "Transaction manager must be specified to process directly mapped requests."; @@ -589,9 +607,11 @@ static CompletableFuture readOrStartImplicitTx( IgniteTables tables, EnumSet options, @Nullable NotificationSender notificationSender, - long[] resourceIdHolder + long[] resourceIdHolder, + long requestId, + Map reqToTxMap ) { - return readTx(in, readTs, resources, txManager, tables, notificationSender, resourceIdHolder, options) + return readTx(in, readTs, resources, txManager, tables, notificationSender, resourceIdHolder, requestId, reqToTxMap, options) .thenApply(tx -> { if (tx == null) { // Implicit transactions do not use an observation timestamp because RW never depends on it, diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsAllKeysRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsAllKeysRequest.java index ba79df0406d9..facb35070773 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsAllKeysRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsAllKeysRequest.java @@ -23,6 +23,7 @@ import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY; import java.util.EnumSet; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.ClientResourceRegistry; import org.apache.ignite.client.handler.ResponseWriter; @@ -46,6 +47,8 @@ public class ClientTupleContainsAllKeysRequest { * @param txManager Transaction manager. * @param clockService Clock service. * @param tsTracker Tracker. + * @param requestId Id of the request. + * @param reqToTxMap Tracker for first request of direct transactions. * @param supportsOptions {@code True} if supports tx options. * @return Future. */ @@ -56,11 +59,13 @@ public static CompletableFuture process( TxManager txManager, ClockService clockService, HybridTimestampTracker tsTracker, + long requestId, + Map reqToTxMap, boolean supportsOptions ) { EnumSet options = supportsOptions ? of(KEY_ONLY, HAS_OPTIONS) : of(KEY_ONLY); - return ClientTuplesRequestBase.readAsync(in, tables, resources, txManager, null, tsTracker, options) + return ClientTuplesRequestBase.readAsync(in, tables, resources, txManager, null, tsTracker, options, requestId, reqToTxMap) .thenCompose(req -> req.table().recordView().containsAllAsync(req.tx(), req.tuples()) .thenApply(containsAll -> out -> { writeTxMeta(out, tsTracker, clockService, req); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java index 721f3d02cfa9..a901b274bf72 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java @@ -21,7 +21,9 @@ import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta; import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY; import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.READ_ONLY; +import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.ClientResourceRegistry; import org.apache.ignite.client.handler.ResponseWriter; @@ -42,6 +44,8 @@ public class ClientTupleContainsKeyRequest { * @param tables Ignite tables. * @param resources Resource registry. * @param txManager Transaction manager. + * @param requestId Id of the request. + * @param reqToTxMap Tracker for first request of direct transactions. * @return Future. */ public static CompletableFuture process( @@ -50,9 +54,11 @@ public static CompletableFuture process( ClientResourceRegistry resources, TxManager txManager, ClockService clockService, - HybridTimestampTracker tsTracker + HybridTimestampTracker tsTracker, + long requestId, + Map reqToTxMap ) { - return ClientTupleRequestBase.readAsync(in, tables, resources, txManager, null, tsTracker, of(READ_ONLY, KEY_ONLY)) + return readAsync(in, tables, resources, txManager, null, tsTracker, of(READ_ONLY, KEY_ONLY), requestId, reqToTxMap) .thenCompose(req -> req.table().recordView().containsAsync(req.tx(), req.tuple()) .thenApply(res -> out -> { writeTxMeta(out, tsTracker, clockService, req); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java index e89d35f410cb..f22263d3bf84 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java @@ -22,6 +22,7 @@ import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta; import static org.apache.ignite.client.handler.requests.table.ClientTuplesRequestBase.readAsync; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.ClientResourceRegistry; import org.apache.ignite.client.handler.NotificationSender; @@ -44,6 +45,8 @@ public class ClientTupleDeleteAllExactRequest { * @param tables Ignite tables. * @param resources Resource registry. * @param txManager Ignite transactions. + * @param requestId Id of the request. + * @param reqToTxMap Tracker for first request of direct transactions. * @return Future. */ public static CompletableFuture process( @@ -53,13 +56,26 @@ public static CompletableFuture process( TxManager txManager, ClockService clockService, NotificationSender notificationSender, - HybridTimestampTracker tsTracker + HybridTimestampTracker tsTracker, + long requestId, + Map reqToTxMap ) { - return readAsync(in, tables, resources, txManager, notificationSender, tsTracker, noneOf(RequestOptions.class)) + return readAsync( + in, + tables, + resources, + txManager, + notificationSender, + tsTracker, + noneOf(RequestOptions.class), + requestId, + reqToTxMap + ) .thenCompose(req -> req.table().recordView().deleteAllExactAsync(req.tx(), req.tuples()) .thenApply(skippedTuples -> out -> { writeTxMeta(out, tsTracker, clockService, req); writeTuples(out, skippedTuples, req.table().schemaView()); - })); + }) + ); } } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java index 48afbf9a32db..93165ba6ccdc 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java @@ -21,7 +21,9 @@ import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuples; import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta; import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY; +import static org.apache.ignite.client.handler.requests.table.ClientTuplesRequestBase.readAsync; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.ClientResourceRegistry; import org.apache.ignite.client.handler.NotificationSender; @@ -44,6 +46,8 @@ public class ClientTupleDeleteAllRequest { * @param tables Ignite tables. * @param resources Resource registry. * @param txManager Ignite transactions. + * @param requestId Id of the request. + * @param reqToTxMap Tracker for first request of direct transactions. * @return Future. */ public static CompletableFuture process( @@ -53,9 +57,11 @@ public static CompletableFuture process( TxManager txManager, ClockService clockService, NotificationSender notificationSender, - HybridTimestampTracker tsTracker + HybridTimestampTracker tsTracker, + long requestId, + Map reqToTxMap ) { - return ClientTuplesRequestBase.readAsync(in, tables, resources, txManager, notificationSender, tsTracker, of(KEY_ONLY)) + return readAsync(in, tables, resources, txManager, notificationSender, tsTracker, of(KEY_ONLY), requestId, reqToTxMap) .thenCompose(req -> req.table().recordView().deleteAllAsync(req.tx(), req.tuples()) .thenApply(skippedTuples -> out -> { writeTxMeta(out, tsTracker, clockService, req); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java index ad0c0640d99d..89b38cf317a5 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java @@ -21,6 +21,7 @@ import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta; import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.ClientResourceRegistry; import org.apache.ignite.client.handler.NotificationSender; @@ -43,6 +44,8 @@ public class ClientTupleDeleteExactRequest { * @param tables Ignite tables. * @param resources Resource registry. * @param txManager Ignite transactions. + * @param requestId Id of the request. + * @param reqToTxMap Tracker for first request of direct transactions. * @return Future. */ public static CompletableFuture process( @@ -52,14 +55,27 @@ public static CompletableFuture process( TxManager txManager, ClockService clockService, NotificationSender notificationSender, - HybridTimestampTracker tsTracker + HybridTimestampTracker tsTracker, + long requestId, + Map reqToTxMap ) { - return readAsync(in, tables, resources, txManager, notificationSender, tsTracker, noneOf(RequestOptions.class)) + return readAsync( + in, + tables, + resources, + txManager, + notificationSender, + tsTracker, + noneOf(RequestOptions.class), + requestId, + reqToTxMap + ) .thenCompose(req -> req.table().recordView().deleteExactAsync(req.tx(), req.tuple()) - .thenApply(res -> out -> { - writeTxMeta(out, tsTracker, clockService, req); - out.packInt(req.table().schemaView().lastKnownSchemaVersion()); - out.packBoolean(res); - })); + .thenApply(res -> out -> { + writeTxMeta(out, tsTracker, clockService, req); + out.packInt(req.table().schemaView().lastKnownSchemaVersion()); + out.packBoolean(res); + }) + ); } } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java index adce4d98d984..c1ec25adf6aa 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java @@ -20,7 +20,9 @@ import static java.util.EnumSet.of; import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta; import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY; +import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.ClientResourceRegistry; import org.apache.ignite.client.handler.NotificationSender; @@ -42,6 +44,8 @@ public class ClientTupleDeleteRequest { * @param tables Ignite tables. * @param resources Resource registry. * @param txManager Ignite transactions. + * @param requestId Id of the request. + * @param reqToTxMap Tracker for first request of direct transactions. * @return Future. */ public static CompletableFuture process( @@ -51,9 +55,11 @@ public static CompletableFuture process( TxManager txManager, ClockService clockService, NotificationSender notificationSender, - HybridTimestampTracker tsTracker + HybridTimestampTracker tsTracker, + long requestId, + Map reqToTxMap ) { - return ClientTupleRequestBase.readAsync(in, tables, resources, txManager, notificationSender, tsTracker, of(KEY_ONLY)) + return readAsync(in, tables, resources, txManager, notificationSender, tsTracker, of(KEY_ONLY), requestId, reqToTxMap) .thenCompose(req -> req.table().recordView().deleteAsync(req.tx(), req.tuple()) .thenApply(res -> out -> { writeTxMeta(out, tsTracker, clockService, req); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java index b777792b5030..33602db49585 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java @@ -24,6 +24,7 @@ import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY; import java.util.EnumSet; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.ClientResourceRegistry; import org.apache.ignite.client.handler.ResponseWriter; @@ -48,6 +49,8 @@ public class ClientTupleGetAllRequest { * @param txManager Transaction manager. * @param clockService Clock service. * @param tsTracker Tracker. + * @param requestId Id of the request. + * @param reqToTxMap Tracker for first request of direct transactions. * @param supportsOptions {@code True} if supports tx options. * @return Future. */ @@ -58,11 +61,13 @@ public static CompletableFuture process( TxManager txManager, ClockService clockService, HybridTimestampTracker tsTracker, + long requestId, + Map reqToTxMap, boolean supportsOptions ) { EnumSet options = supportsOptions ? of(KEY_ONLY, HAS_OPTIONS) : of(KEY_ONLY); - return ClientTuplesRequestBase.readAsync(in, tables, resources, txManager, null, tsTracker, options) + return ClientTuplesRequestBase.readAsync(in, tables, resources, txManager, null, tsTracker, options, requestId, reqToTxMap) .thenCompose(req -> { return req.table().recordView().getAllAsync(req.tx(), req.tuples()) .thenApply(resTuples -> out -> { diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java index d53c30e79b18..8e0597eb3905 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java @@ -20,7 +20,9 @@ import static java.util.EnumSet.of; import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta; import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY; +import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.ClientResourceRegistry; import org.apache.ignite.client.handler.NotificationSender; @@ -43,6 +45,8 @@ public class ClientTupleGetAndDeleteRequest { * @param tables Ignite tables. * @param resources Resource registry. * @param txManager Ignite transactions. + * @param requestId Id of the request. + * @param reqToTxMap Tracker for first request of direct transactions. * @return Future. */ public static CompletableFuture process( @@ -52,9 +56,11 @@ public static CompletableFuture process( TxManager txManager, ClockService clockService, NotificationSender notificationSender, - HybridTimestampTracker tsTracker + HybridTimestampTracker tsTracker, + long requestId, + Map reqToTxMap ) { - return ClientTupleRequestBase.readAsync(in, tables, resources, txManager, notificationSender, tsTracker, of(KEY_ONLY)) + return readAsync(in, tables, resources, txManager, notificationSender, tsTracker, of(KEY_ONLY), requestId, reqToTxMap) .thenCompose(req -> req.table().recordView().getAndDeleteAsync(req.tx(), req.tuple()) .thenApply(resTuple -> out -> { writeTxMeta(out, tsTracker, clockService, req); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java index d1b99440d76e..1361d79a67a8 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java @@ -21,6 +21,7 @@ import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta; import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.ClientResourceRegistry; import org.apache.ignite.client.handler.NotificationSender; @@ -44,6 +45,8 @@ public class ClientTupleGetAndReplaceRequest { * @param tables Ignite tables. * @param resources Resource registry. * @param txManager Ignite transactions. + * @param requestId Id of the request. + * @param reqToTxMap Tracker for first request of direct transactions. * @return Future. */ public static CompletableFuture process( @@ -53,13 +56,26 @@ public static CompletableFuture process( TxManager txManager, ClockService clockService, NotificationSender notificationSender, - HybridTimestampTracker tsTracker + HybridTimestampTracker tsTracker, + long requestId, + Map reqToTxMap ) { - return readAsync(in, tables, resources, txManager, notificationSender, tsTracker, noneOf(RequestOptions.class)) + return readAsync( + in, + tables, + resources, + txManager, + notificationSender, + tsTracker, + noneOf(RequestOptions.class), + requestId, + reqToTxMap + ) .thenCompose(req -> req.table().recordView().getAndReplaceAsync(req.tx(), req.tuple()) .thenApply(resTuple -> out -> { writeTxMeta(out, tsTracker, clockService, req); ClientTableCommon.writeTupleOrNil(out, resTuple, TuplePart.KEY_AND_VAL, req.table().schemaView()); - })); + }) + ); } } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java index cef414d79176..51d45033f6f3 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java @@ -21,6 +21,7 @@ import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta; import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.ClientResourceRegistry; import org.apache.ignite.client.handler.NotificationSender; @@ -44,6 +45,8 @@ public class ClientTupleGetAndUpsertRequest { * @param tables Ignite tables. * @param resources Resource registry. * @param txManager Ignite transactions. + * @param requestId Id of the request. + * @param reqToTxMap Tracker for first request of direct transactions. * @return Future. */ public static CompletableFuture process( @@ -53,13 +56,25 @@ public static CompletableFuture process( TxManager txManager, ClockService clockService, NotificationSender notificationSender, - HybridTimestampTracker tsTracker + HybridTimestampTracker tsTracker, + long requestId, + Map reqToTxMap ) { - return readAsync(in, tables, resources, txManager, notificationSender, tsTracker, noneOf(RequestOptions.class)) + return readAsync(in, + tables, + resources, + txManager, + notificationSender, + tsTracker, + noneOf(RequestOptions.class), + requestId, + reqToTxMap + ) .thenCompose(req -> req.table().recordView().getAndUpsertAsync(req.tx(), req.tuple()) .thenApply(resTuple -> out -> { writeTxMeta(out, tsTracker, clockService, req); ClientTableCommon.writeTupleOrNil(out, resTuple, TuplePart.KEY_AND_VAL, req.table().schemaView()); - })); + }) + ); } } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java index f53333c25f24..7ce868f1de1e 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java @@ -21,7 +21,9 @@ import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta; import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY; import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.READ_ONLY; +import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.ClientResourceRegistry; import org.apache.ignite.client.handler.ResponseWriter; @@ -43,6 +45,8 @@ public class ClientTupleGetRequest { * @param tables Ignite tables. * @param resources Resource registry. * @param clockService Clock service. + * @param requestId Id of the request. + * @param reqToTxMap Tracker for first request of direct transactions. * @return Future. */ public static CompletableFuture process( @@ -51,9 +55,11 @@ public static CompletableFuture process( ClientResourceRegistry resources, TxManager txManager, ClockService clockService, - HybridTimestampTracker tsTracker + HybridTimestampTracker tsTracker, + long requestId, + Map reqToTxMap ) { - return ClientTupleRequestBase.readAsync(in, tables, resources, txManager, null, tsTracker, of(READ_ONLY, KEY_ONLY)) + return readAsync(in, tables, resources, txManager, null, tsTracker, of(READ_ONLY, KEY_ONLY), requestId, reqToTxMap) .thenCompose(req -> req.table().recordView().getAsync(req.tx(), req.tuple()) .thenApply(res -> out -> { writeTxMeta(out, tsTracker, clockService, req); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java index e750152ecd7d..0a0c8ec466ce 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java @@ -22,6 +22,7 @@ import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta; import static org.apache.ignite.client.handler.requests.table.ClientTuplesRequestBase.readAsync; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.ClientResourceRegistry; import org.apache.ignite.client.handler.NotificationSender; @@ -46,6 +47,8 @@ public class ClientTupleInsertAllRequest { * @param txManager Ignite transactions. * @param clockService Clock service. * @param notificationSender Notification sender. + * @param requestId Id of the request. + * @param reqToTxMap Tracker for first request of direct transactions. * @return Future. */ public static CompletableFuture process( @@ -55,13 +58,25 @@ public static CompletableFuture process( TxManager txManager, ClockService clockService, NotificationSender notificationSender, - HybridTimestampTracker tsTracker + HybridTimestampTracker tsTracker, + long requestId, + Map reqToTxMap ) { - return readAsync(in, tables, resources, txManager, notificationSender, tsTracker, noneOf(RequestOptions.class)) + return readAsync(in, + tables, + resources, + txManager, + notificationSender, + tsTracker, + noneOf(RequestOptions.class), + requestId, + reqToTxMap + ) .thenCompose(req -> req.table().recordView().insertAllAsync(req.tx(), req.tuples()) .thenApply(skippedTuples -> out -> { writeTxMeta(out, tsTracker, clockService, req); writeTuples(out, skippedTuples, req.table().schemaView()); - })); + }) + ); } } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java index 566f735b5943..480292dc6ac9 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java @@ -21,6 +21,7 @@ import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta; import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.ClientResourceRegistry; import org.apache.ignite.client.handler.NotificationSender; @@ -45,6 +46,8 @@ public class ClientTupleInsertRequest { * @param txManager Ignite transactions. * @param clockService Clock service. * @param notificationSender Notification sender. + * @param requestId Id of the request. + * @param reqToTxMap Tracker for first request of direct transactions. * @return Future. */ public static CompletableFuture process( @@ -54,14 +57,26 @@ public static CompletableFuture process( TxManager txManager, ClockService clockService, NotificationSender notificationSender, - HybridTimestampTracker tsTracker + HybridTimestampTracker tsTracker, + long requestId, + Map reqToTxMap ) { - return readAsync(in, tables, resources, txManager, notificationSender, tsTracker, noneOf(RequestOptions.class)) + return readAsync(in, + tables, + resources, + txManager, + notificationSender, + tsTracker, + noneOf(RequestOptions.class), + requestId, + reqToTxMap + ) .thenCompose(req -> req.table().recordView().insertAsync(req.tx(), req.tuple()) .thenApply(res -> out -> { writeTxMeta(out, tsTracker, clockService, req); out.packInt(req.table().schemaView().lastKnownSchemaVersion()); out.packBoolean(res); - })); + }) + ); } } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java index 1d3813335262..8717c2c297ac 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java @@ -20,7 +20,9 @@ import static java.util.EnumSet.of; import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta; import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.READ_SECOND_TUPLE; +import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.ClientResourceRegistry; import org.apache.ignite.client.handler.NotificationSender; @@ -42,6 +44,8 @@ public class ClientTupleReplaceExactRequest { * @param tables Ignite tables. * @param resources Resource registry. * @param txManager Ignite transactions. + * @param requestId Id of the request. + * @param reqToTxMap Tracker for first request of direct transactions. * @return Future. */ public static CompletableFuture process( @@ -51,9 +55,11 @@ public static CompletableFuture process( TxManager txManager, ClockService clockService, NotificationSender notificationSender, - HybridTimestampTracker tsTracker + HybridTimestampTracker tsTracker, + long requestId, + Map reqToTxMap ) { - return ClientTupleRequestBase.readAsync(in, tables, resources, txManager, notificationSender, tsTracker, of(READ_SECOND_TUPLE)) + return readAsync(in, tables, resources, txManager, notificationSender, tsTracker, of(READ_SECOND_TUPLE), requestId, reqToTxMap) .thenCompose(req -> req.table().recordView().replaceExactAsync(req.tx(), req.tuple(), req.tuple2()) .thenApply(res -> out -> { writeTxMeta(out, tsTracker, clockService, req); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java index 361793d7c1e4..13092510d24d 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java @@ -21,6 +21,7 @@ import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta; import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.ClientResourceRegistry; import org.apache.ignite.client.handler.NotificationSender; @@ -43,6 +44,8 @@ public class ClientTupleReplaceRequest { * @param tables Ignite tables. * @param resources Resource registry. * @param txManager Ignite transactions. + * @param requestId Id of the request. + * @param reqToTxMap Tracker for first request of direct transactions. * @return Future. */ public static CompletableFuture process( @@ -52,14 +55,27 @@ public static CompletableFuture process( TxManager txManager, ClockService clockService, NotificationSender notificationSender, - HybridTimestampTracker tsTracker + HybridTimestampTracker tsTracker, + long requestId, + Map reqToTxMap ) { - return readAsync(in, tables, resources, txManager, notificationSender, tsTracker, noneOf(RequestOptions.class)) + return readAsync( + in, + tables, + resources, + txManager, + notificationSender, + tsTracker, + noneOf(RequestOptions.class), + requestId, + reqToTxMap + ) .thenCompose(req -> req.table().recordView().replaceAsync(req.tx(), req.tuple()) .thenApply(res -> out -> { writeTxMeta(out, tsTracker, clockService, req); out.packInt(req.table().schemaView().lastKnownSchemaVersion()); out.packBoolean(res); - })); + }) + ); } } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java index eefed432cd8e..1b5b74a4a658 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java @@ -25,6 +25,7 @@ import java.util.BitSet; import java.util.EnumSet; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.ClientResourceRegistry; import org.apache.ignite.client.handler.NotificationSender; @@ -91,7 +92,9 @@ public static CompletableFuture readAsync( TxManager txManager, @Nullable NotificationSender notificationSender, HybridTimestampTracker tsTracker, - EnumSet options + EnumSet options, + long requestId, + Map reqToTxMap ) { int tableId = in.unpackInt(); @@ -105,7 +108,9 @@ public static CompletableFuture readAsync( tables, options, notificationSender, - resIdHolder + resIdHolder, + requestId, + reqToTxMap ); int schemaId = in.unpackInt(); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java index f9cc36b449f7..792c3f1bddeb 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java @@ -21,6 +21,7 @@ import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta; import static org.apache.ignite.client.handler.requests.table.ClientTuplesRequestBase.readAsync; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.ClientResourceRegistry; import org.apache.ignite.client.handler.NotificationSender; @@ -43,6 +44,8 @@ public class ClientTupleUpsertAllRequest { * @param tables Ignite tables. * @param resources Resource registry. * @param txManager Ignite transactions. + * @param requestId Id of the request. + * @param reqToTxMap Tracker for first request of direct transactions. * @return Future. */ public static CompletableFuture process( @@ -52,13 +55,26 @@ public static CompletableFuture process( TxManager txManager, ClockService clockService, NotificationSender notificationSender, - HybridTimestampTracker tsTracker + HybridTimestampTracker tsTracker, + long requestId, + Map reqToTxMap ) { - return readAsync(in, tables, resources, txManager, notificationSender, tsTracker, noneOf(RequestOptions.class)) + return readAsync( + in, + tables, + resources, + txManager, + notificationSender, + tsTracker, + noneOf(RequestOptions.class), + requestId, + reqToTxMap + ) .thenCompose(req -> req.table().recordView().upsertAllAsync(req.tx(), req.tuples()) .thenApply(v -> out -> { writeTxMeta(out, tsTracker, clockService, req); out.packInt(req.table().schemaView().lastKnownSchemaVersion()); - })); + }) + ); } } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java index b80cbc2d405d..34decea9ad23 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java @@ -21,6 +21,7 @@ import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta; import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.ClientResourceRegistry; import org.apache.ignite.client.handler.NotificationSender; @@ -36,6 +37,7 @@ * Client tuple upsert request. */ public class ClientTupleUpsertRequest { + /** * Processes the request. * @@ -45,6 +47,8 @@ public class ClientTupleUpsertRequest { * @param txManager Ignite transactions. * @param clockService Clock service. * @param notificationSender Notification sender. + * @param requestId Id of the request. + * @param reqToTxMap Tracker for first request of direct transactions. * @return Future. */ public static CompletableFuture process( @@ -54,13 +58,26 @@ public static CompletableFuture process( TxManager txManager, ClockService clockService, NotificationSender notificationSender, - HybridTimestampTracker tsTracker + HybridTimestampTracker tsTracker, + long requestId, + Map reqToTxMap ) { - return readAsync(in, tables, resources, txManager, notificationSender, tsTracker, noneOf(RequestOptions.class)) + return readAsync( + in, + tables, + resources, + txManager, + notificationSender, + tsTracker, + noneOf(RequestOptions.class), + requestId, + reqToTxMap + ) .thenCompose(req -> req.table().recordView().upsertAsync(req.tx(), req.tuple()) .thenApply(v -> out -> { writeTxMeta(out, tsTracker, clockService, req); out.packInt(req.table().schemaView().lastKnownSchemaVersion()); - })); + }) + ); } } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java index c0dbcb6d0d69..cfb610121de4 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java @@ -26,6 +26,7 @@ import java.util.BitSet; import java.util.EnumSet; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.ClientResourceRegistry; import org.apache.ignite.client.handler.NotificationSender; @@ -75,14 +76,26 @@ public static CompletableFuture readAsync( TxManager txManager, @Nullable NotificationSender notificationSender, HybridTimestampTracker tsTracker, - EnumSet options + EnumSet options, + long requestId, + Map reqToTxMap ) { int tableId = in.unpackInt(); long[] resIdHolder = {0}; - CompletableFuture txFut = - readOrStartImplicitTx(in, tsTracker, resources, txManager, tables, options, notificationSender, resIdHolder); + CompletableFuture txFut = readOrStartImplicitTx( + in, + tsTracker, + resources, + txManager, + tables, + options, + notificationSender, + resIdHolder, + requestId, + reqToTxMap + ); int schemaId = in.unpackInt(); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/DirectTransactionWithFirstRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/DirectTransactionWithFirstRequest.java new file mode 100644 index 000000000000..0f2f4ffcba67 --- /dev/null +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/DirectTransactionWithFirstRequest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.client.handler.requests.table; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment; +import org.apache.ignite.internal.tx.TxState; +import org.apache.ignite.internal.wrapper.Wrapper; +import org.apache.ignite.tx.TransactionException; +import org.jetbrains.annotations.Nullable; + +class DirectTransactionWithFirstRequest implements InternalTransaction, Wrapper { + private final InternalTransaction base; + + // We could also just accept a lambda. + private final Map reqToTxMap; + + private final long firstReqId; + + DirectTransactionWithFirstRequest(InternalTransaction base, Map reqToTxMap, long firstReqId) { + this.base = base; + this.reqToTxMap = reqToTxMap; + this.firstReqId = firstReqId; + } + + @Override + public UUID id() { + return base.id(); + } + + @Override + public PendingTxPartitionEnlistment enlistedPartition(ZonePartitionId replicationGroupId) { + return base.enlistedPartition(replicationGroupId); + } + + @Override + public TxState state() { + return base.state(); + } + + @Override + public boolean assignCommitPartition(ZonePartitionId commitPartitionId) { + return base.assignCommitPartition(commitPartitionId); + } + + @Override + public ZonePartitionId commitPartition() { + return base.commitPartition(); + } + + @Override + public void enlist(ZonePartitionId replicationGroupId, int tableId, String primaryNodeConsistentId, long consistencyToken) { + base.enlist(replicationGroupId, tableId, primaryNodeConsistentId, consistencyToken); + } + + @Override + public @Nullable HybridTimestamp readTimestamp() { + return base.readTimestamp(); + } + + @Override + public HybridTimestamp schemaTimestamp() { + return base.schemaTimestamp(); + } + + @Override + public UUID coordinatorId() { + return base.coordinatorId(); + } + + @Override + public boolean implicit() { + return base.implicit(); + } + + @Override + public boolean remote() { + return base.remote(); + } + + @Override + public boolean remoteOnCoordinator() { + return base.remoteOnCoordinator(); + } + + @Override + public CompletableFuture finish(boolean commit, @Nullable HybridTimestamp executionTimestamp, boolean full, + @Nullable Throwable finishReason) { + return base.finish(commit, executionTimestamp, full, finishReason).whenComplete((v, err) -> removeMapping()); + } + + @Override + public boolean isFinishingOrFinished() { + return base.isFinishingOrFinished(); + } + + @Override + public long getTimeout() { + return base.getTimeout(); + } + + @Override + public CompletableFuture kill() { + return base.kill().whenComplete((v, err) -> removeMapping()); + } + + @Override + public CompletableFuture rollbackWithExceptionAsync(Throwable throwable) { + return base.rollbackWithExceptionAsync(throwable).whenComplete((v, err) -> removeMapping()); + } + + @Override + public boolean isRolledBackWithTimeoutExceeded() { + return base.isRolledBackWithTimeoutExceeded(); + } + + @Override + public void processDelayedAck(Object val, @Nullable Throwable err) { + base.processDelayedAck(val, err); + } + + @Override + public void commit() throws TransactionException { + try { + base.commit(); + } finally { + removeMapping(); + } + } + + @Override + public CompletableFuture commitAsync() { + return base.commitAsync().whenComplete((v, err) -> removeMapping()); + } + + @Override + public void rollback() throws TransactionException { + try { + base.rollback(); + } finally { + removeMapping(); + } + } + + @Override + public CompletableFuture rollbackAsync() { + return base.rollbackAsync().whenComplete((v, err) -> removeMapping()); + } + + @Override + public boolean isReadOnly() { + return base.isReadOnly(); + } + + public InternalTransaction base() { + return base; + } + + @Override + public T unwrap(Class classToUnwrap) { + return (T) base; + } + + private void removeMapping() { + reqToTxMap.remove(firstReqId); + } +} diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java index df377d133aeb..d2d267032567 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment; import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl; import org.apache.ignite.internal.util.ExceptionUtils; +import org.apache.ignite.internal.wrapper.Wrappers; import org.apache.ignite.tx.TransactionException; /** @@ -89,7 +90,7 @@ public static CompletableFuture process( // Update causality. Used to assign commit timestamp after all enlistments. clockService.updateClock(HybridTimestamp.hybridTimestamp(causality)); - ReadWriteTransactionImpl tx0 = (ReadWriteTransactionImpl) tx; + ReadWriteTransactionImpl tx0 = Wrappers.unwrap(tx, ReadWriteTransactionImpl.class); // Enforce cleanup. tx0.noRemoteWrites(sendRemoteWritesFlag && in.unpackBoolean()); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java index c6492dfa99ab..9e90e9598e68 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java @@ -19,6 +19,7 @@ import static org.apache.ignite.client.handler.requests.tx.ClientTransactionCommitRequest.merge; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.ignite.client.handler.ClientHandlerMetricSource; import org.apache.ignite.client.handler.ClientResourceRegistry; @@ -29,6 +30,9 @@ import org.apache.ignite.internal.table.TableViewInternal; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl; +import org.apache.ignite.internal.wrapper.Wrappers; +import org.apache.ignite.lang.ErrorGroups.Client; +import org.apache.ignite.lang.IgniteException; /** * Client transaction rollback request. @@ -41,6 +45,7 @@ public class ClientTransactionRollbackRequest { * @param resources Resources. * @param metrics Metrics. * @param igniteTables Tables facade. + * @param reqToTxMap Tracker for first request of direct transactions. * @param enableDirectMapping Enable direct mapping. * @param sendRemoteWritesFlag Send remote writes flag. * @return Future. @@ -50,37 +55,56 @@ public static CompletableFuture process( ClientResourceRegistry resources, ClientHandlerMetricSource metrics, IgniteTablesInternal igniteTables, + Map reqToTxMap, boolean enableDirectMapping, boolean sendRemoteWritesFlag ) throws IgniteInternalCheckedException { long resourceId = in.unpackLong(); - InternalTransaction tx = resources.remove(resourceId).get(InternalTransaction.class); + InternalTransaction tx; - if (enableDirectMapping && !tx.isReadOnly()) { - // Attempt to merge server and client transactions. - int cnt = in.unpackInt(); // Number of direct enlistments. - for (int i = 0; i < cnt; i++) { - int tableId = in.unpackInt(); - int partId = in.unpackInt(); - String consistentId = in.unpackString(); - long token = in.unpackLong(); + if (!enableDirectMapping) { + tx = resources.remove(resourceId).get(InternalTransaction.class); + } else if (resourceId < 0) { + // Direct mapping was enabled, but the user does not know the resourceId, so he sent the first req id. + long reqId = -resourceId; + var actualResourceId = reqToTxMap.get(reqId); - TableViewInternal table = igniteTables.cachedTable(tableId); + // Is it ok to reuse this error?? + if (actualResourceId == null) { + throw new IgniteException(Client.RESOURCE_NOT_FOUND_ERR, "Failed to find resource from requestId: " + reqId); + } + + tx = resources.remove(actualResourceId).get(InternalTransaction.class); + // Will not remove right away from reqToTxMap, it will be remove automatically on rollback. + } else { + tx = resources.remove(resourceId).get(InternalTransaction.class); + + if (!tx.isReadOnly()) { + // Attempt to merge server and client transactions. + int cnt = in.unpackInt(); // Number of direct enlistments. + for (int i = 0; i < cnt; i++) { + int tableId = in.unpackInt(); + int partId = in.unpackInt(); + String consistentId = in.unpackString(); + long token = in.unpackLong(); - if (table != null) { - merge(table.internalTable(), partId, consistentId, token, tx, false); + TableViewInternal table = igniteTables.cachedTable(tableId); + + if (table != null) { + merge(table.internalTable(), partId, consistentId, token, tx, false); + } } - } - if (cnt > 0) { - in.unpackLong(); // Unpack causality. + if (cnt > 0) { + in.unpackLong(); // Unpack causality. - ReadWriteTransactionImpl tx0 = (ReadWriteTransactionImpl) tx; + ReadWriteTransactionImpl tx0 = Wrappers.unwrap(tx, ReadWriteTransactionImpl.class); - // Enforce cleanup. - tx0.noRemoteWrites(sendRemoteWritesFlag && in.unpackBoolean()); + // Enforce cleanup. + tx0.noRemoteWrites(sendRemoteWritesFlag && in.unpackBoolean()); + } } } diff --git a/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientTransactionsTest.java b/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientTransactionsTest.java index aedee88476be..1b53f2bc8428 100644 --- a/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientTransactionsTest.java +++ b/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientTransactionsTest.java @@ -21,11 +21,14 @@ import static java.util.Collections.emptyList; import static java.util.Comparator.comparing; import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn; import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.startsWith; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -58,6 +61,7 @@ import java.util.stream.Stream; import org.apache.ignite.Ignite; import org.apache.ignite.client.IgniteClient; +import org.apache.ignite.client.handler.ClientInboundMessageHandler; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.client.sql.ClientSql; import org.apache.ignite.internal.client.sql.PartitionMappingProvider; @@ -68,6 +72,8 @@ import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.tx.Lock; import org.apache.ignite.internal.tx.TxState; +import org.apache.ignite.internal.tx.impl.DeadlockPreventionPolicyImpl.TxIdComparators; +import org.apache.ignite.internal.tx.impl.TxManagerImpl; import org.apache.ignite.internal.util.CollectionUtils; import org.apache.ignite.lang.ErrorGroups; import org.apache.ignite.lang.ErrorGroups.Transactions; @@ -87,8 +93,9 @@ import org.apache.ignite.tx.TransactionException; import org.apache.ignite.tx.TransactionOptions; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIf; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -1397,34 +1404,166 @@ public void testRollbackDoesNotBlockOnLockConflict(KillTestContext ctx) { assertThat(kvView.removeAllAsync(null, Arrays.asList(key0, key, key2)), willSucceedFast()); } - @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-27947") - public void testRollbackDoesNotBlockOnLockConflictDuringFirstRequest() throws InterruptedException { - // Note: reversed tx priority is required for this test. - ClientTable table = (ClientTable) table(); - KeyValueView kvView = table().keyValueView(); + static boolean isUsingReverseComparator() { + TxIdComparators comparator = IgniteTestUtils.getFieldValue(null, TxManagerImpl.class, "DEFAULT_TX_ID_COMPARATOR"); + return comparator == TxIdComparators.REVERSED; + } - Map map = table.partitionDistribution().primaryReplicasAsync().join(); - List tuples0 = generateKeysForPartition(100, 10, map, 0, table); + @EnabledIf("org.apache.ignite.internal.client.ItThinClientTransactionsTest#isUsingReverseComparator") + @Nested + class OnConflictDuringFirstRequest { + class Data { + final IgniteImpl ignite; + final List tuples; + final ClientLazyTransaction tx1; + final ClientLazyTransaction tx2; + final CompletableFuture req2Fut; + + Data( + IgniteImpl ignite, + List tuples, + ClientLazyTransaction tx1, + ClientLazyTransaction tx2, + CompletableFuture req2Fut + ) { + this.ignite = ignite; + this.tuples = tuples; + this.tx1 = tx1; + this.tx2 = tx2; + this.req2Fut = req2Fut; + } + } - // We need a waiter for this scenario. - Tuple key = tuples0.get(0); - Tuple val = val("1"); + Data prepareBlockedTransaction(KillTestContext ctx) throws InterruptedException { + ClientTable table = (ClientTable) table(); + ClientSql sql = (ClientSql) client().sql(); + + Map map = table.partitionDistribution().primaryReplicasAsync().join(); + Entry mapping = map.entrySet().iterator().next(); + List tuples0 = generateKeysForPartition(100, 10, map, (int) mapping.getKey().id(), table); + Ignite server = server(mapping.getValue()); + IgniteImpl ignite = unwrapIgniteImpl(server); + + // Init SQL mappings. + Tuple key0 = tuples0.get(0); + sql.execute(format("INSERT INTO %s (%s, %s) VALUES (?, ?)", TABLE_NAME, COLUMN_KEY, COLUMN_VAL), + key0.intValue(0), key0.intValue(0) + ""); + await().atMost(2, TimeUnit.SECONDS) + .until(() -> sql.partitionAwarenessCachedMetas().stream().allMatch(PartitionMappingProvider::ready)); - ClientLazyTransaction tx1 = (ClientLazyTransaction) client().transactions().begin(); - ClientLazyTransaction tx2 = (ClientLazyTransaction) client().transactions().begin(); + // We need a waiter for this scenario. + Tuple key = tuples0.get(1); + + ClientLazyTransaction tx2 = (ClientLazyTransaction) client().transactions().begin(); + ClientLazyTransaction tx1 = (ClientLazyTransaction) client().transactions().begin(); + + // Starts the transaction. + assertThat(ctx.put.apply(client(), tx1, key), willSucceedIn(120, TimeUnit.SECONDS)); + + await().atMost(3, TimeUnit.SECONDS).until(() -> { + Iterator locks = ignite.txManager().lockManager().locks(tx1.startedTx().txId()); + + int count = CollectionUtils.count(locks); + return count == 2; + }); + + // Will wait for lock. + CompletableFuture fut2 = ctx.put.apply(client(), tx2, key); + Thread.sleep(500); + + assertThat(fut2.isDone(), is(false)); + IgniteTestUtils.assertThrows(AssertionError.class, () -> ClientTransaction.get(tx2), "Transaction is starting"); + + return new Data(ignite, tuples0, tx1, tx2, fut2); + } - kvView.put(tx1, key, val); + @ParameterizedTest + @MethodSource("org.apache.ignite.internal.client.ItThinClientTransactionsTest#killTestContextFactory") + public void testRollbackDoesNotBlock(KillTestContext ctx) throws InterruptedException { + var test = prepareBlockedTransaction(ctx); - // Will wait for lock. - CompletableFuture fut2 = kvView.putAsync(tx2, key, val); - assertFalse(fut2.isDone()); + // Rollback should not be blocked. + assertThat(test.tx2.rollbackAsync(), willSucceedIn(1, TimeUnit.SECONDS)); + assertThat(test.req2Fut, willThrowFast( + ctx.expectedErr, + "Can't acquire a lock because transaction is already finished")); - Thread.sleep(500); + assertThat(test.tx1.rollbackAsync(), willSucceedIn(1, TimeUnit.SECONDS)); - // Rollback should not be blocked. - assertThat(tx2.rollbackAsync(), willSucceedFast()); - assertThat(tx1.rollbackAsync(), willSucceedFast()); + var ex = assertThrows(TransactionException.class, () -> ClientTransaction.get(test.tx2)); + assertThat(ex.getMessage(), containsString("Transaction is already finished")); + assertThat(ex.getMessage(), containsString("committed=false")); + + KeyValueView kvView = table().keyValueView(); + assertThat(kvView.removeAllAsync(null, test.tuples.subList(0, 2)), willSucceedIn(5, TimeUnit.SECONDS)); + } + + @ParameterizedTest + @MethodSource("org.apache.ignite.internal.client.ItThinClientTransactionsTest#killTestContextFactory") + public void testOperationsBlockWaitingForLock(KillTestContext ctx) throws InterruptedException { + var test = prepareBlockedTransaction(ctx); + + CompletableFuture fut3 = ctx.put.apply(client(), test.tx2, test.tuples.get(2)); + + assertDoesNotThrow(test.tx1::startedTx); + + assertThat(test.tx1.rollbackAsync(), willSucceedIn(1, TimeUnit.SECONDS)); + + // After the lock is open, the requests are free to complete. + assertThat(test.req2Fut, willSucceedIn(1, TimeUnit.SECONDS)); + assertThat(fut3, willSucceedIn(1, TimeUnit.SECONDS)); + + assertThat(test.tx2.rollbackAsync(), willSucceedIn(1, TimeUnit.SECONDS)); + + KeyValueView kvView = table().keyValueView(); + assertThat(kvView.removeAllAsync(null, test.tuples.subList(0, 3)), willSucceedIn(5, TimeUnit.SECONDS)); + } + + @ParameterizedTest + @MethodSource("org.apache.ignite.internal.client.ItThinClientTransactionsTest#killTestContextFactory") + public void testCancelByRequestIdNotAvailable(KillTestContext ctx) throws InterruptedException { + var test = prepareBlockedTransaction(ctx); + + // Remove firstReqMapping from the server side. + { + Map firstReqToTxResMap = IgniteTestUtils.getFieldValue( + test.ignite.clientInboundMessageHandler(), + ClientInboundMessageHandler.class, + "firstReqToTxResMap" + ); + + CompletableFuture reqInfoFut = IgniteTestUtils.getFieldValue(test.tx2, ClientLazyTransaction.class, + "requestInfoFuture"); + Object requestInfo = reqInfoFut.join(); + long firstReqId = IgniteTestUtils.getFieldValue(requestInfo, "firstReqId"); + firstReqToTxResMap.remove(firstReqId); + } + + // Will block because of the error. + var rollbackTx2Fut1 = test.tx2.rollbackAsync(); + Thread.sleep(1_000); + assertThat(rollbackTx2Fut1.isDone(), is(false)); + + // Do another concurrent rollback call just to make sure. + // If we allow multiple rollback requests by id to be sent concurrently, the outcome might be different. + var rollbackTx2Fut2 = test.tx2.rollbackAsync(); + Thread.sleep(1_000); + assertThat(rollbackTx2Fut2.isDone(), is(false)); + + // Now unblock the transaction. + assertThat(test.tx1.rollbackAsync(), willSucceedIn(1, TimeUnit.SECONDS)); + + // Requests should rollback. + assertThat(rollbackTx2Fut1, willSucceedIn(1, TimeUnit.SECONDS)); + assertThat(rollbackTx2Fut2, willSucceedIn(1, TimeUnit.SECONDS)); + + var ex = assertThrows(TransactionException.class, () -> ClientTransaction.get(test.tx2)); + assertThat(ex.getMessage(), containsString("Transaction is already finished")); + assertThat(ex.getMessage(), containsString("committed=false")); + + KeyValueView kvView = table().keyValueView(); + assertThat(kvView.removeAllAsync(null, test.tuples.subList(0, 2)), willSucceedIn(5, TimeUnit.SECONDS)); + } } @ParameterizedTest diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java index d932247b62f5..6c9c7c5711ac 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java @@ -17,8 +17,9 @@ package org.apache.ignite.internal.client; +import java.util.ArrayList; +import java.util.List; import org.apache.ignite.internal.client.proto.ClientMessagePacker; -import org.jetbrains.annotations.Nullable; /** * Thin client payload output channel. @@ -33,8 +34,8 @@ public class PayloadOutputChannel implements AutoCloseable { /** Client request ID. */ private final long requestId; - /** Action to be executed when the payload is sent. */ - private volatile @Nullable Runnable onSentAction; + /** Actions to be executed when the payload is sent. */ + private final List onSentActions; /** * Constructor. @@ -47,6 +48,7 @@ public class PayloadOutputChannel implements AutoCloseable { this.ch = ch; this.out = out; this.requestId = requestId; + this.onSentActions = new ArrayList<>(); } /** @@ -88,11 +90,11 @@ public void close() { * @param action Action to be executed. */ public void onSent(Runnable action) { - this.onSentAction = action; + this.onSentActions.add(action); } /** Returns an action, if any, that should be executed when the payload is sent successfully. */ - @Nullable Runnable onSentAction() { - return onSentAction; + List onSentActions() { + return onSentActions; } } diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java index 5c198f1da98e..f36ba5073bd1 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java @@ -33,6 +33,7 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.BitSet; +import java.util.Collections; import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -106,7 +107,8 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS, ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES, ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_DISCARD, - ProtocolBitmaskFeature.SQL_UPDATE_COUNTERS_2 + ProtocolBitmaskFeature.SQL_UPDATE_COUNTERS_2, + ProtocolBitmaskFeature.TX_ROLLBACK_USING_FIRST_REQUEST )); /** Minimum supported heartbeat interval. */ @@ -405,6 +407,7 @@ private CompletableFuture send( payloadWriter.accept(payloadCh); } + var actions = Collections.unmodifiableList(payloadCh.onSentActions()); write(req).addListener(f -> { if (!f.isSuccess()) { String msg = "Failed to send request async [id=" + id + ", op=" + opCode + ", remoteAddress=" + cfg.getAddress() + "]"; @@ -420,8 +423,7 @@ private CompletableFuture send( } else { metrics.requestsSentIncrement(); - Runnable action = payloadCh.onSentAction(); - if (action != null) { + for (Runnable action : actions) { asyncContinuationExecutor.execute(action); } } diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java index 93c5b9a6fe9e..719446cae3c9 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java @@ -366,13 +366,21 @@ public CompletableFuture> executeAsyncInternal( return txStartFut.thenCompose(tx -> ch.serviceAsync( ClientOp.SQL_EXEC, - payloadWriter(ctx, transaction, cancellationToken, queryModifiers, statement, arguments, shouldTrackOperation), + DirectTxUtils.payloadWriter( + ctx, + transaction, + payloadWriter(ctx, transaction, cancellationToken, queryModifiers, statement, arguments, shouldTrackOperation) + ), payloadReader(ctx, mapper, tx, statement), () -> DirectTxUtils.resolveChannel(ctx, ch, shouldTrackOperation, tx, mapping), null, false ).handle((BiFunction, Throwable, CompletableFuture>>) (r, err) -> { if (err != null) { + if (DirectTxUtils.tryHandleErrorOnFirstRequest(ctx, ch, -1)) { + return failedFuture(err); + } + if (tx != null && shouldRecordTransactionFailure(err)) { tx.recordOperationFailure(err); } @@ -381,26 +389,7 @@ public CompletableFuture> executeAsyncInternal( return failedFuture(err); } - if (ctx.enlistmentToken != null) { - // In case of direct mapping error need to rollback the tx on coordinator. - return tx.rollbackAsync().handle((ignored, err0) -> { - if (err0 != null) { - err.addSuppressed(err0); - } - - sneakyThrow(err); - return null; - }); - } else { - return tx.rollbackAndDiscardDirectMappings(false).handle((ignored, err0) -> { - if (err0 != null) { - err.addSuppressed(err0); - } - - sneakyThrow(err); - return null; - }); - } + return DirectTxUtils.handleErrorOnOtherRequests(ctx, tx, err); } return completedFuture(r); diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java index 3b90a75e62a7..9d9920fa6807 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java @@ -496,7 +496,7 @@ private CompletableFuture doSchemaOutInOpAsync( return txStartFut.thenCompose(tx0 -> { return ch.serviceAsync( opCode, - w -> writer.accept(schema, w, ctx), + DirectTxUtils.payloadWriter(ctx, tx, w -> writer.accept(schema, w, ctx)), r -> readSchemaAndReadData(schema, r, reader, defaultValue, responseSchemaRequired, ctx, tx0), () -> DirectTxUtils.resolveChannel(ctx, ch, ClientOp.isWrite(opCode), tx0, pm), retryPolicyOverride, @@ -507,13 +507,7 @@ private CompletableFuture doSchemaOutInOpAsync( if (ex != null) { Throwable cause = ex; - if (ctx.firstReqFut != null) { - // Create failed transaction. - ClientTransaction failed = new ClientTransaction(ctx.channel, ch, id, ctx.readOnly, null, - ctx.pm, null, ch.observableTimestamp(), 0); - failed.fail(); - ctx.firstReqFut.complete(failed); - // Txn was not started, rollback is not required. + if (DirectTxUtils.tryHandleErrorOnFirstRequest(ctx, ch, id)) { fut.completeExceptionally(unwrapCause(ex)); return null; } @@ -546,37 +540,20 @@ private CompletableFuture doSchemaOutInOpAsync( cause = cause.getCause(); } + } - if (tx0 == null) { - fut.completeExceptionally(ex); - } else { - tx0.rollbackAndDiscardDirectMappings(false).handle((ignored, err0) -> { - if (err0 != null) { - ex.addSuppressed(err0); - } - - fut.completeExceptionally(ex); - - return (T) null; - }); - } + if (tx0 == null) { + fut.completeExceptionally(ex); } else { - // In case of direct mapping error we need to rollback the tx on coordinator. - tx0.rollbackAsync().handle((ignored, err0) -> { - if (err0 != null) { - ex.addSuppressed(err0); - } - - fut.completeExceptionally(ex); - - return (T) null; - }); + DirectTxUtils.handleErrorOnOtherRequests(ctx, tx0, ex) + .whenComplete((ignored, err0) -> fut.completeExceptionally(err0)); } + + return null; } else { fut.complete(ret); + return null; } - - return null; }); }); }).exceptionally(ex -> { diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java index 70bf1765f9fd..76845240c818 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java @@ -17,14 +17,18 @@ package org.apache.ignite.internal.client.tx; +import static java.util.concurrent.CompletableFuture.failedFuture; +import static org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_ROLLBACK_USING_FIRST_REQUEST; import static org.apache.ignite.internal.client.tx.ClientTransactions.USE_CONFIGURED_TIMEOUT_DEFAULT; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import java.util.EnumSet; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import org.apache.ignite.internal.client.ClientChannel; import org.apache.ignite.internal.client.ReliableChannel; +import org.apache.ignite.internal.client.proto.ClientOp; import org.apache.ignite.internal.client.proto.tx.ClientInternalTxOptions; import org.apache.ignite.internal.hlc.HybridTimestampTracker; import org.apache.ignite.internal.lang.IgniteBiTuple; @@ -39,12 +43,19 @@ * Lazy client transaction. Will be actually started on the first operation. */ public class ClientLazyTransaction implements Transaction { + private static final CompletableFuture NOT_SUPPORTED_FUTURE = + failedFuture(new UnsupportedOperationException("TX_ROLLBACK_USING_FIRST_REQUEST is not supported")); + private final long observableTimestamp; private final @Nullable TransactionOptions options; private final EnumSet txOptions; + private final AtomicBoolean cancelled = new AtomicBoolean(false); + + private final CompletableFuture requestInfoFuture = new CompletableFuture<>(); + private volatile CompletableFuture tx; ClientLazyTransaction(HybridTimestampTracker observableTimestamp, @Nullable TransactionOptions options) { @@ -94,12 +105,35 @@ public void rollback() throws TransactionException { public CompletableFuture rollbackAsync() { var tx0 = tx; + // TODO: IGNITE-28405 This is really fishy. It will probably let you reuse a transaction after calling a rollback :( if (tx0 == null) { // No operations were performed, nothing to rollback. return nullCompletedFuture(); } - return tx0.thenCompose(ClientTransaction::rollbackAsync); + // If the transaction is not started. Issue the rollback and wait for the server response. + if (!tx0.isDone() && cancelled.compareAndSet(false, true)) { + return requestInfoFuture + .thenCompose(reqInfo -> { + ClientChannel ch = reqInfo.ch; + if (ch.protocolContext().isFeatureSupported(TX_ROLLBACK_USING_FIRST_REQUEST)) { + return ch.serviceAsync(ClientOp.TX_ROLLBACK, w -> w.out().packLong(-reqInfo.firstReqId), r -> null); + } else { + return NOT_SUPPORTED_FUTURE; + } + }) + .handle((res, e) -> { + // If ok, we don't need to wait for the response. If error, let's block. + if (e == null) { + return nullCompletedFuture(); + } else { + return tx0.thenCompose(ClientTransaction::rollbackAsync); + } + }) + .thenCompose(f -> (CompletableFuture) f); + } else { + return tx0.thenCompose(ClientTransaction::rollbackAsync); + } } @Override @@ -221,8 +255,23 @@ public long observableTimestamp() { return txOptions; } + public void updateRequestInfo(long firstReqId, ClientChannel ch) { + boolean s = this.requestInfoFuture.complete(new RequestInfo(firstReqId, ch)); + assert s : "Transaction request info was previously set"; + } + @Override public String toString() { return S.toString(this); } + + private static class RequestInfo { + private final long firstReqId; + private final ClientChannel ch; + + private RequestInfo(long firstReqId, ClientChannel ch) { + this.firstReqId = firstReqId; + this.ch = ch; + } + } } diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java index 811a5689593b..0639e5ad9294 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java @@ -23,6 +23,7 @@ import static org.apache.ignite.internal.client.proto.tx.ClientTxUtils.TX_ID_DIRECT; import static org.apache.ignite.internal.client.proto.tx.ClientTxUtils.TX_ID_FIRST_DIRECT; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow; import static org.apache.ignite.lang.ErrorGroups.Client.CONNECTION_ERR; import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; @@ -34,6 +35,7 @@ import org.apache.ignite.internal.client.PartitionMapping; import org.apache.ignite.internal.client.PayloadInputChannel; import org.apache.ignite.internal.client.PayloadOutputChannel; +import org.apache.ignite.internal.client.PayloadWriter; import org.apache.ignite.internal.client.ReliableChannel; import org.apache.ignite.internal.client.WriteContext; import org.apache.ignite.internal.client.proto.ClientMessageUnpacker; @@ -282,6 +284,81 @@ public static CompletableFuture resolveChannel( }); } + /** + * If the current request is the first request of a direct translation, add a listener to the {@link PayloadWriter}. + * + * @param ctx The {@link WriteContext} that holds transactional context information. + * @param tx The client transaction associated with the request, or {@code null} if none. + * @param base Request native {@link PayloadWriter}. + * @return The {@link PayloadWriter} that should be used on the request. + */ + public static PayloadWriter payloadWriter(WriteContext ctx, @Nullable Transaction tx, PayloadWriter base) { + if (ctx.firstReqFut != null && tx instanceof ClientLazyTransaction) { + return poc -> { + base.accept(poc); + + var clientLazyTx = (ClientLazyTransaction) tx; + long requestId = poc.requestId(); + ClientChannel cc = poc.clientChannel(); + poc.onSent(() -> clientLazyTx.updateRequestInfo(requestId, cc)); + }; + } else { + return base; + } + } + + /** + * Try to handle error on first request. Returns false if not in the context of the first request. + * This method essentially populates context with a failed request instance. + * + * @param ctx The {@link WriteContext} that holds transactional context information. + * @param ch The {@link ReliableChannel} used to resolve the actual communication channel. + * @param id Client Table Id. + * @return Whether the error was handled or not. + */ + public static boolean tryHandleErrorOnFirstRequest(WriteContext ctx, ReliableChannel ch, long id) { + if (ctx.firstReqFut == null) { + return false; + } + + // Create failed transaction. + ClientTransaction failed = new ClientTransaction(ctx.channel, ch, id, ctx.readOnly, null, + ctx.pm, null, ch.observableTimestamp(), 0); + failed.fail(); + ctx.firstReqFut.complete(failed); + // Txn was not started, rollback is not required. + return true; + } + + /** + * Handles errors after the first request. + * Essentially call the rollback on the transaction and appends any errors to the original error. + * + * @param ctx The {@link WriteContext} that holds transactional context information. + * @param tx The client transaction. + * @param err The error to be handled. + * @param type of the expected future. + * @return A completable future what always fails with the original error plus any suppressed errors. + */ + public static CompletableFuture handleErrorOnOtherRequests(WriteContext ctx, ClientTransaction tx, Throwable err) { + CompletableFuture rollback; + if (ctx.enlistmentToken != null) { + // In case of direct mapping error need to rollback the tx on coordinator. + rollback = tx.rollbackAsync(); + } else { + rollback = tx.rollbackAndDiscardDirectMappings(false); + } + + return rollback.handle((ignored, err0) -> { + if (err0 != null) { + err.addSuppressed(err0); + } + + sneakyThrow(err); + return null; + }); + } + private static CompletableFuture resolveChannelInner( WriteContext ctx, ReliableChannel ch,