Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.apache.ignite.client.handler.requests.table.ClientSchemasGetRequest;
import org.apache.ignite.client.handler.requests.table.ClientStreamerBatchSendRequest;
import org.apache.ignite.client.handler.requests.table.ClientStreamerWithReceiverBatchSendRequest;
import org.apache.ignite.client.handler.requests.table.ClientTableCommon;
import org.apache.ignite.client.handler.requests.table.ClientTableGetQualifiedRequest;
import org.apache.ignite.client.handler.requests.table.ClientTableGetRequest;
import org.apache.ignite.client.handler.requests.table.ClientTablePartitionPrimaryReplicasGetRequest;
Expand Down Expand Up @@ -189,6 +190,7 @@
import org.apache.ignite.security.exception.InvalidCredentialsException;
import org.apache.ignite.security.exception.UnsupportedAuthenticationTypeException;
import org.apache.ignite.sql.SqlBatchException;
import org.apache.ignite.table.IgniteTables;
import org.apache.ignite.tx.RetriableTransactionException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
Expand Down Expand Up @@ -291,6 +293,25 @@ public class ClientInboundMessageHandler

private final HandshakeEventLoopSwitcher handshakeEventLoopSwitcher;

/**
* Tracks mappings between the first {@code requestId} and the {@code resourceId}
* holding the Tx object for directly mapped transactions. The process is not localized.
*
* <p><b>Mappings are created:</b>
* <ul>
* <li>When a direct transaction is created in {@link ClientTableCommon#readTx(ClientMessageUnpacker, HybridTimestampTracker,
* ClientResourceRegistry, TxManager, IgniteTables, NotificationSender, long[], long, Map)}.
* </li>
* </ul>
*
* <p><b>Mappings are removed:</b>
* <ul>
* <li>During a rollback request.</li>
* <li>After the first request response is sent to the client.</li>
* </ul>
*/
private final Map<Long, Long> firstReqToTxResMap = new ConcurrentHashMap<>();

/**
* Constructor.
*
Expand Down Expand Up @@ -899,70 +920,72 @@ private CompletableFuture<ResponseWriter> processOperation(
return ClientTableGetRequest.process(in, igniteTables);

case ClientOp.TUPLE_UPSERT:
return ClientTupleUpsertRequest.process(
in, igniteTables, resources, txManager, clockService, notificationSender(requestId), tsTracker);
return ClientTupleUpsertRequest.process(in, igniteTables, resources, txManager, clockService, notificationSender(requestId),
tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_GET:
return ClientTupleGetRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker);
return ClientTupleGetRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker, requestId,
firstReqToTxResMap);

case ClientOp.TUPLE_UPSERT_ALL:
return ClientTupleUpsertAllRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_GET_ALL:
return ClientTupleGetAllRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker,
clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS));
requestId, firstReqToTxResMap, clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS));

case ClientOp.TUPLE_GET_AND_UPSERT:
return ClientTupleGetAndUpsertRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_INSERT:
return ClientTupleInsertRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_INSERT_ALL:
return ClientTupleInsertAllRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_REPLACE:
return ClientTupleReplaceRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_REPLACE_EXACT:
return ClientTupleReplaceExactRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_GET_AND_REPLACE:
return ClientTupleGetAndReplaceRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_DELETE:
return ClientTupleDeleteRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_DELETE_ALL:
return ClientTupleDeleteAllRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_DELETE_EXACT:
return ClientTupleDeleteExactRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_DELETE_ALL_EXACT:
return ClientTupleDeleteAllExactRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_GET_AND_DELETE:
return ClientTupleGetAndDeleteRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_CONTAINS_KEY:
return ClientTupleContainsKeyRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker);
return ClientTupleContainsKeyRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker, requestId,
firstReqToTxResMap);

case ClientOp.TUPLE_CONTAINS_ALL_KEYS:
return ClientTupleContainsAllKeysRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker,
clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS));
requestId, firstReqToTxResMap, clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS));

case ClientOp.JDBC_CONNECT:
return ClientJdbcConnectRequest.execute(in, jdbcQueryEventHandler, resolveCurrentUsername());
Expand Down Expand Up @@ -1011,7 +1034,7 @@ private CompletableFuture<ResponseWriter> processOperation(
clientContext.hasFeature(TX_PIGGYBACK), clientContext.hasFeature(TX_DIRECT_MAPPING_SEND_REMOTE_WRITES), tsTracker);

case ClientOp.TX_ROLLBACK:
return ClientTransactionRollbackRequest.process(in, resources, metrics, igniteTables,
return ClientTransactionRollbackRequest.process(in, resources, metrics, igniteTables, firstReqToTxResMap,
clientContext.hasFeature(TX_PIGGYBACK), clientContext.hasFeature(TX_DIRECT_MAPPING_SEND_REMOTE_WRITES));

case ClientOp.COMPUTE_EXECUTE:
Expand Down Expand Up @@ -1066,6 +1089,7 @@ private CompletableFuture<ResponseWriter> processOperation(
igniteTables,
clockService,
notificationSender(requestId),
firstReqToTxResMap,
resolveCurrentUsername(),
clientContext.hasFeature(SQL_MULTISTATEMENT_SUPPORT),
clientContext.hasFeature(SQL_PARTITION_AWARENESS_TABLE_NAME),
Expand Down Expand Up @@ -1097,12 +1121,12 @@ partitionOperationsExecutor, in, queryProcessor, requestId, cancelHandles, tsTra

case ClientOp.SQL_QUERY_META:
return ClientSqlQueryMetadataRequest.process(
partitionOperationsExecutor, in, queryProcessor, resources, tsTracker
partitionOperationsExecutor, in, queryProcessor, resources, tsTracker, requestId, firstReqToTxResMap
);

case ClientOp.SQL_EXEC_BATCH:
return ClientSqlExecuteBatchRequest.process(
in, queryProcessor, resources, requestId, cancelHandles, tsTracker,
in, queryProcessor, resources, requestId, cancelHandles, tsTracker, firstReqToTxResMap,
resolveCurrentUsername()
);

Expand Down Expand Up @@ -1171,6 +1195,7 @@ private void processOperationInternal(
if (err != null) {
writeError(requestId, opCode, (Throwable) err, ctx, false);
metrics.requestsFailedIncrement();
firstReqToTxResMap.remove(requestId);
return;
}

Expand Down Expand Up @@ -1200,6 +1225,8 @@ private void processOperationInternal(
writeError(requestId, opCode, e, ctx, false);
metrics.requestsFailedIncrement();
}

firstReqToTxResMap.remove(requestId);
Copy link
Copy Markdown
Contributor

@ptupitsyn ptupitsyn Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we remove the mapping once the response is sent, but the client might have requested cancellation just before it received our response:

client: send request
server: firstReqToTxResMap.add
server: send response, `firstReqToTxResMap.remove`
client: cancel operation
server: returns an error?

Can we remove the firstReqToTxResMap later? For example, when the tx is cleaned up (removed from the resource registry)?

This should simplify client-side logic too, we can always rollback using the first request id.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Pavel,
Yeah, you are right.
The rollback message using the firstReqId may already be waiting to be processed when we remove here.
In this scenario, the server would respond with an error, but we are not interested in that response.
That's why we also attach to the actual transaction future from the server and execute the rollback using the normal method if necessary. If we received the first request response concurrently, we just resend the rollback using the normal way, even if the first rollback was already sent.
But I forgot to add this scenario to the tests. I had it in the first version but forgot to port it to this one.

Delaying the removal of the mapping is also possible, at the expense of slightly more complexity on the server-side. I thought about it during the impl. On the client-side it would probably be simpler as you said, since we would rely on the server response for the first rollback message.

I'll add the test and implement this to see how it looks.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you recheck @ptupitsyn ??

});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class ClientSqlExecuteBatchRequest {
* @param cancelHandleMap Registry of handlers. Request must register itself in this registry before switching to another
* thread.
* @param username Authenticated user name.
* @param reqToTxMap Tracker for first request of direct transactions.
* @return Future representing result of operation.
*/
public static CompletableFuture<ResponseWriter> process(
Expand All @@ -56,6 +57,7 @@ public static CompletableFuture<ResponseWriter> process(
long requestId,
Map<Long, CancelHandle> cancelHandleMap,
HybridTimestampTracker tsTracker,
Map<Long, Long> reqToTxMap,
String username
) {
CancelHandle cancelHandle = CancelHandle.create();
Expand All @@ -68,7 +70,9 @@ public static CompletableFuture<ResponseWriter> process(
null,
null,
null,
null
null,
requestId,
reqToTxMap
);

ClientSqlProperties props = new ClientSqlProperties(in, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class ClientSqlExecuteRequest {
* transaction.
* @param notificationSender Notification sender is required to send acknowledge for underlying write operation within a remote
* transaction.
* @param reqToTxMap Tracker for first request of direct transactions.
* @param username Authenticated user name or {@code null} for unknown user.
* @return Future representing result of operation.
*/
Expand All @@ -97,6 +98,7 @@ public static CompletableFuture<ResponseWriter> process(
IgniteTables tables,
ClockService clockService,
NotificationSender notificationSender,
Map<Long, Long> reqToTxMap,
@Nullable String username,
boolean sqlMultistatementsSupported,
boolean sqlPartitionAwarenessQualifiedNameSupported,
Expand All @@ -118,7 +120,9 @@ public static CompletableFuture<ResponseWriter> process(
txManager,
tables,
notificationSender,
resIdHolder
resIdHolder,
requestId,
reqToTxMap
);

ClientSqlProperties props = new ClientSqlProperties(in, sqlMultistatementsSupported);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.ignite.client.handler.ClientResourceRegistry;
Expand All @@ -42,16 +43,20 @@ public class ClientSqlQueryMetadataRequest {
* @param in Unpacker.
* @param processor SQL API.
* @param resources Resources.
* @param requestId Id of the request.
* @param reqToTxMap Tracker for first request of direct transactions.
* @return Future representing result of operation.
*/
public static CompletableFuture<ResponseWriter> process(
Executor operationExecutor,
ClientMessageUnpacker in,
QueryProcessor processor,
ClientResourceRegistry resources,
HybridTimestampTracker tsTracker
HybridTimestampTracker tsTracker,
long requestId,
Map<Long, Long> reqToTxMap
) {
CompletableFuture<InternalTransaction> txFut = readTx(in, tsTracker, resources, null, null, null, null);
CompletableFuture<InternalTransaction> txFut = readTx(in, tsTracker, resources, null, null, null, null, requestId, reqToTxMap);

String schema = in.unpackString();
String query = in.unpackString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.BitSet;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientResource;
Expand Down Expand Up @@ -421,6 +422,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
* @param txManager Tx manager.
* @param notificationSender Notification sender.
* @param resourceIdHolder Resource id holder.
* @param requestId Id of the request.
* @param reqToTxMap Tracker for first request of direct transactions.
* @return Transaction, if present, or null.
*/
public static CompletableFuture<@Nullable InternalTransaction> readTx(
Expand All @@ -430,7 +433,9 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
@Nullable TxManager txManager,
@Nullable IgniteTables tables,
@Nullable NotificationSender notificationSender,
long[] resourceIdHolder
long[] resourceIdHolder,
long requestId,
Map<Long, Long> reqToTxMap
) {
return readTx(
in,
Expand All @@ -440,6 +445,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
tables,
notificationSender,
resourceIdHolder,
requestId,
reqToTxMap,
EnumSet.noneOf(RequestOptions.class)
);
}
Expand All @@ -453,6 +460,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
* @param txManager Tx manager.
* @param notificationSender Notification sender.
* @param resourceIdHolder Resource id holder.
* @param requestId Id of the request.
* @param reqToTxMap Tracker for first request of direct transactions.
* @param options Request options. Defines how a request is processed.
* @return Transaction, if present, or null.
*/
Expand All @@ -464,6 +473,8 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
@Nullable IgniteTables tables,
@Nullable NotificationSender notificationSender,
long[] resourceIdHolder,
long requestId,
Map<Long, Long> reqToTxMap,
EnumSet<RequestOptions> options
) {
if (in.tryUnpackNil()) {
Expand Down Expand Up @@ -510,6 +521,9 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
// Attach resource id only on first direct request.
resourceIdHolder[0] = resources.put(new ClientResource(tx, tx::rollbackAsync));

// Record the mapping between first request and resourceId.
reqToTxMap.put(requestId, resourceIdHolder[0]);

return completedFuture(tx);
} else if (id == TX_ID_DIRECT) {
assert txManager != null : "Transaction manager must be specified to process directly mapped requests.";
Expand Down Expand Up @@ -589,9 +603,11 @@ static CompletableFuture<InternalTransaction> readOrStartImplicitTx(
IgniteTables tables,
EnumSet<RequestOptions> options,
@Nullable NotificationSender notificationSender,
long[] resourceIdHolder
long[] resourceIdHolder,
long requestId,
Map<Long, Long> reqToTxMap
) {
return readTx(in, readTs, resources, txManager, tables, notificationSender, resourceIdHolder, options)
return readTx(in, readTs, resources, txManager, tables, notificationSender, resourceIdHolder, requestId, reqToTxMap, options)
.thenApply(tx -> {
if (tx == null) {
// Implicit transactions do not use an observation timestamp because RW never depends on it,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;

import java.util.EnumSet;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientResourceRegistry;
import org.apache.ignite.client.handler.ResponseWriter;
Expand All @@ -46,6 +47,8 @@ public class ClientTupleContainsAllKeysRequest {
* @param txManager Transaction manager.
* @param clockService Clock service.
* @param tsTracker Tracker.
* @param requestId Id of the request.
* @param reqToTxMap Tracker for first request of direct transactions.
* @param supportsOptions {@code True} if supports tx options.
* @return Future.
*/
Expand All @@ -56,11 +59,13 @@ public static CompletableFuture<ResponseWriter> process(
TxManager txManager,
ClockService clockService,
HybridTimestampTracker tsTracker,
long requestId,
Map<Long, Long> reqToTxMap,
boolean supportsOptions
) {
EnumSet<RequestOptions> options = supportsOptions ? of(KEY_ONLY, HAS_OPTIONS) : of(KEY_ONLY);

return ClientTuplesRequestBase.readAsync(in, tables, resources, txManager, null, tsTracker, options)
return ClientTuplesRequestBase.readAsync(in, tables, resources, txManager, null, tsTracker, options, requestId, reqToTxMap)
.thenCompose(req -> req.table().recordView().containsAllAsync(req.tx(), req.tuples())
.thenApply(containsAll -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
Expand Down
Loading