Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,12 @@ public enum ProtocolBitmaskFeature {
/**
* Client supports SQL_UPDATE_COUNTERS_2 error extension (single binary value instead of array).
*/
SQL_UPDATE_COUNTERS_2(18);
SQL_UPDATE_COUNTERS_2(18),

/**
* Allow rolling back direct transactions using the first request id.
*/
TX_ROLLBACK_USING_FIRST_REQUEST(19);

private static final EnumSet<ProtocolBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
EnumSet.allOf(ProtocolBitmaskFeature.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,7 @@ public void testServerReturnsAllItsFeatures() throws IOException {
expected.set(16);
expected.set(17);
expected.set(18);
expected.set(19);

assertEquals(expected, supportedFeatures);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ public class ClientHandlerModule implements IgniteComponent, PlatformComputeTran
ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES,
ProtocolBitmaskFeature.SQL_PARTITION_AWARENESS_TABLE_NAME,
ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_DISCARD,
ProtocolBitmaskFeature.SQL_UPDATE_COUNTERS_2
ProtocolBitmaskFeature.SQL_UPDATE_COUNTERS_2,
ProtocolBitmaskFeature.TX_ROLLBACK_USING_FIRST_REQUEST
));

/** Connection id generator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING;
import static org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES;
import static org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK;
import static org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_ROLLBACK_USING_FIRST_REQUEST;
import static org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
Expand Down Expand Up @@ -96,6 +97,7 @@
import org.apache.ignite.client.handler.requests.table.ClientSchemasGetRequest;
import org.apache.ignite.client.handler.requests.table.ClientStreamerBatchSendRequest;
import org.apache.ignite.client.handler.requests.table.ClientStreamerWithReceiverBatchSendRequest;
import org.apache.ignite.client.handler.requests.table.ClientTableCommon;
import org.apache.ignite.client.handler.requests.table.ClientTableGetQualifiedRequest;
import org.apache.ignite.client.handler.requests.table.ClientTableGetRequest;
import org.apache.ignite.client.handler.requests.table.ClientTablePartitionPrimaryReplicasGetRequest;
Expand Down Expand Up @@ -190,6 +192,7 @@
import org.apache.ignite.security.exception.InvalidCredentialsException;
import org.apache.ignite.security.exception.UnsupportedAuthenticationTypeException;
import org.apache.ignite.sql.SqlBatchException;
import org.apache.ignite.table.IgniteTables;
import org.apache.ignite.tx.RetriableTransactionException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
Expand Down Expand Up @@ -292,6 +295,24 @@ public class ClientInboundMessageHandler

private final HandshakeEventLoopSwitcher handshakeEventLoopSwitcher;

/**
* Tracks mappings between the first {@code requestId} and the {@code resourceId}
* holding the Tx object for directly mapped transactions. The process is not localized.
*
* <p><b>Mappings are created:</b>
* <ul>
* <li>When a direct transaction is created in {@link ClientTableCommon#readTx(ClientMessageUnpacker, HybridTimestampTracker,
* ClientResourceRegistry, TxManager, IgniteTables, NotificationSender, long[], long, Map)}.
* </li>
* </ul>
*
* <p><b>Mappings are removed:</b>
* <ul>
* <li>During a commit or rollback request. Hook is added at creation.</li>
* </ul>
*/
private final Map<Long, Long> firstReqToTxResMap = new ConcurrentHashMap<>();

/**
* Constructor.
*
Expand Down Expand Up @@ -557,6 +578,7 @@ private void handshakeSuccess(
actualFeatures.clear(TX_DELAYED_ACKS.featureId());
actualFeatures.clear(TX_PIGGYBACK.featureId());
actualFeatures.clear(TX_ALLOW_NOOP_ENLIST.featureId());
actualFeatures.clear(TX_ROLLBACK_USING_FIRST_REQUEST.featureId());

actualFeatures.clear(SQL_DIRECT_TX_MAPPING.featureId());
} else {
Expand Down Expand Up @@ -945,70 +967,72 @@ private CompletableFuture<ResponseWriter> processOperation(
return ClientTableGetRequest.process(in, igniteTables);

case ClientOp.TUPLE_UPSERT:
return ClientTupleUpsertRequest.process(
in, igniteTables, resources, txManager, clockService, notificationSender(requestId), tsTracker);
return ClientTupleUpsertRequest.process(in, igniteTables, resources, txManager, clockService, notificationSender(requestId),
tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_GET:
return ClientTupleGetRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker);
return ClientTupleGetRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker, requestId,
firstReqToTxResMap);

case ClientOp.TUPLE_UPSERT_ALL:
return ClientTupleUpsertAllRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_GET_ALL:
return ClientTupleGetAllRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker,
clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS));
requestId, firstReqToTxResMap, clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS));

case ClientOp.TUPLE_GET_AND_UPSERT:
return ClientTupleGetAndUpsertRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_INSERT:
return ClientTupleInsertRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_INSERT_ALL:
return ClientTupleInsertAllRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_REPLACE:
return ClientTupleReplaceRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_REPLACE_EXACT:
return ClientTupleReplaceExactRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_GET_AND_REPLACE:
return ClientTupleGetAndReplaceRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_DELETE:
return ClientTupleDeleteRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_DELETE_ALL:
return ClientTupleDeleteAllRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_DELETE_EXACT:
return ClientTupleDeleteExactRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_DELETE_ALL_EXACT:
return ClientTupleDeleteAllExactRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_GET_AND_DELETE:
return ClientTupleGetAndDeleteRequest.process(in, igniteTables, resources, txManager, clockService,
notificationSender(requestId), tsTracker);
notificationSender(requestId), tsTracker, requestId, firstReqToTxResMap);

case ClientOp.TUPLE_CONTAINS_KEY:
return ClientTupleContainsKeyRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker);
return ClientTupleContainsKeyRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker, requestId,
firstReqToTxResMap);

case ClientOp.TUPLE_CONTAINS_ALL_KEYS:
return ClientTupleContainsAllKeysRequest.process(in, igniteTables, resources, txManager, clockService, tsTracker,
clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS));
requestId, firstReqToTxResMap, clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS));

case ClientOp.JDBC_CONNECT:
return ClientJdbcConnectRequest.execute(in, jdbcQueryEventHandler, resolveCurrentUsername());
Expand Down Expand Up @@ -1057,7 +1081,7 @@ private CompletableFuture<ResponseWriter> processOperation(
clientContext.hasFeature(TX_PIGGYBACK), clientContext.hasFeature(TX_DIRECT_MAPPING_SEND_REMOTE_WRITES), tsTracker);

case ClientOp.TX_ROLLBACK:
return ClientTransactionRollbackRequest.process(in, resources, metrics, igniteTables,
return ClientTransactionRollbackRequest.process(in, resources, metrics, igniteTables, firstReqToTxResMap,
clientContext.hasFeature(TX_PIGGYBACK), clientContext.hasFeature(TX_DIRECT_MAPPING_SEND_REMOTE_WRITES));

case ClientOp.COMPUTE_EXECUTE:
Expand Down Expand Up @@ -1112,6 +1136,7 @@ private CompletableFuture<ResponseWriter> processOperation(
igniteTables,
clockService,
notificationSender(requestId),
firstReqToTxResMap,
resolveCurrentUsername(),
clientContext.hasFeature(SQL_MULTISTATEMENT_SUPPORT),
clientContext.hasFeature(SQL_PARTITION_AWARENESS_TABLE_NAME),
Expand Down Expand Up @@ -1143,12 +1168,12 @@ partitionOperationsExecutor, in, queryProcessor, requestId, cancelHandles, tsTra

case ClientOp.SQL_QUERY_META:
return ClientSqlQueryMetadataRequest.process(
partitionOperationsExecutor, in, queryProcessor, resources, tsTracker
partitionOperationsExecutor, in, queryProcessor, resources, tsTracker, requestId, firstReqToTxResMap
);

case ClientOp.SQL_EXEC_BATCH:
return ClientSqlExecuteBatchRequest.process(
in, queryProcessor, resources, requestId, cancelHandles, tsTracker,
in, queryProcessor, resources, requestId, cancelHandles, tsTracker, firstReqToTxResMap,
resolveCurrentUsername()
);

Expand Down Expand Up @@ -1218,6 +1243,7 @@ private void processOperationInternal(
if (err != null) {
writeError(requestId, opCode, (Throwable) err, ctx, false, guard);
metrics.requestsFailedIncrement();
firstReqToTxResMap.remove(requestId);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class ClientSqlExecuteBatchRequest {
* @param cancelHandleMap Registry of handlers. Request must register itself in this registry before switching to another
* thread.
* @param username Authenticated user name.
* @param reqToTxMap Tracker for first request of direct transactions.
* @return Future representing result of operation.
*/
public static CompletableFuture<ResponseWriter> process(
Expand All @@ -56,6 +57,7 @@ public static CompletableFuture<ResponseWriter> process(
long requestId,
Map<Long, CancelHandle> cancelHandleMap,
HybridTimestampTracker tsTracker,
Map<Long, Long> reqToTxMap,
String username
) {
CancelHandle cancelHandle = CancelHandle.create();
Expand All @@ -68,7 +70,9 @@ public static CompletableFuture<ResponseWriter> process(
null,
null,
null,
null
null,
requestId,
reqToTxMap
);

ClientSqlProperties props = new ClientSqlProperties(in, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class ClientSqlExecuteRequest {
* transaction.
* @param notificationSender Notification sender is required to send acknowledge for underlying write operation within a remote
* transaction.
* @param reqToTxMap Tracker for first request of direct transactions.
* @param username Authenticated user name or {@code null} for unknown user.
* @return Future representing result of operation.
*/
Expand All @@ -97,6 +98,7 @@ public static CompletableFuture<ResponseWriter> process(
IgniteTables tables,
ClockService clockService,
NotificationSender notificationSender,
Map<Long, Long> reqToTxMap,
@Nullable String username,
boolean sqlMultistatementsSupported,
boolean sqlPartitionAwarenessQualifiedNameSupported,
Expand All @@ -118,7 +120,9 @@ public static CompletableFuture<ResponseWriter> process(
txManager,
tables,
notificationSender,
resIdHolder
resIdHolder,
requestId,
reqToTxMap
);

ClientSqlProperties props = new ClientSqlProperties(in, sqlMultistatementsSupported);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.ignite.client.handler.ClientResourceRegistry;
Expand All @@ -42,16 +43,20 @@ public class ClientSqlQueryMetadataRequest {
* @param in Unpacker.
* @param processor SQL API.
* @param resources Resources.
* @param requestId Id of the request.
* @param reqToTxMap Tracker for first request of direct transactions.
* @return Future representing result of operation.
*/
public static CompletableFuture<ResponseWriter> process(
Executor operationExecutor,
ClientMessageUnpacker in,
QueryProcessor processor,
ClientResourceRegistry resources,
HybridTimestampTracker tsTracker
HybridTimestampTracker tsTracker,
long requestId,
Map<Long, Long> reqToTxMap
) {
CompletableFuture<InternalTransaction> txFut = readTx(in, tsTracker, resources, null, null, null, null);
CompletableFuture<InternalTransaction> txFut = readTx(in, tsTracker, resources, null, null, null, null, requestId, reqToTxMap);

String schema = in.unpackString();
String query = in.unpackString();
Expand Down
Loading