From 393915b3a513b61c0249546bab5a559108832868 Mon Sep 17 00:00:00 2001 From: Egor Kuts Date: Tue, 7 Apr 2026 21:00:39 +0400 Subject: [PATCH] ignite-28305 backpressure to limit in flight partition operations per node --- .../ignite/client/handler/TestServer.java | 2 + .../client/handler/ClientHandlerModule.java | 7 ++ .../handler/ClientInboundMessageHandler.java | 38 +++++-- .../client/TestClientHandlerModule.java | 2 + .../org/apache/ignite/client/TestServer.java | 2 + .../lang/ReplicaOverloadedException.java | 33 ++++++ .../PartitionOperationInFlightLimiter.java | 101 ++++++++++++++++++ .../rebalance/ItRebalanceDistributedTest.java | 2 + .../partition/replicator/fixtures/Node.java | 2 + .../TxCleanupRecoveryRequestHandler.java | 16 +-- .../PartitionReplicaLifecycleManagerTest.java | 2 + .../ItPlacementDriverReplicaSideTest.java | 2 + .../internal/replicator/ReplicaManager.java | 31 +++++- .../ReplicationConfigurationSchema.java | 10 ++ .../replicator/ReplicaManagerTest.java | 2 + .../runner/app/ItIgniteNodeRestartTest.java | 2 + .../ignite/internal/app/IgniteImpl.java | 7 ++ .../configuration/ignite-snapshot.bin | Bin 5687 -> 5727 bytes .../distributed/ReplicaUnavailableTest.java | 2 + .../distributed/TableManagerRecoveryTest.java | 2 + .../ignite/distributed/ItTxTestCluster.java | 2 + 21 files changed, 244 insertions(+), 23 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/lang/ReplicaOverloadedException.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/util/PartitionOperationInFlightLimiter.java diff --git a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java index 230e2ebe32ec..3d48ddf72c27 100644 --- a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java +++ b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java @@ -48,6 +48,7 @@ import org.apache.ignite.internal.table.IgniteTablesInternal; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.TestInfo; @@ -144,6 +145,7 @@ ClientHandlerModule start(TestInfo testInfo) { EventLog.NOOP, new TestLowWatermark(), Runnable::run, + new PartitionOperationInFlightLimiter(0), () -> true ); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java index b7e6a357fc6d..64b8e47216f0 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java @@ -74,6 +74,7 @@ import org.apache.ignite.internal.table.IgniteTablesInternal; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter; import org.apache.ignite.lang.IgniteException; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; @@ -163,6 +164,8 @@ public class ClientHandlerModule implements IgniteComponent, PlatformComputeTran private final Executor partitionOperationsExecutor; + private final PartitionOperationInFlightLimiter partitionOperationInFlightLimiter; + private final ConcurrentHashMap> computeExecutors = new ConcurrentHashMap<>(); @TestOnly @@ -186,6 +189,7 @@ public class ClientHandlerModule implements IgniteComponent, PlatformComputeTran * @param eventLog Event log. * @param lowWatermark Low watermark. * @param partitionOperationsExecutor Executor for a partition operation. + * @param partitionOperationInFlightLimiter In-flight limiter for partition operations. * @param ddlBatchingSuggestionEnabled Boolean supplier indicates whether the suggestion related DDL batching is enabled. */ public ClientHandlerModule( @@ -207,6 +211,7 @@ public ClientHandlerModule( EventLog eventLog, LowWatermark lowWatermark, Executor partitionOperationsExecutor, + PartitionOperationInFlightLimiter partitionOperationInFlightLimiter, Supplier ddlBatchingSuggestionEnabled ) { assert igniteTables != null; @@ -252,6 +257,7 @@ public ClientHandlerModule( this.clientConnectorConfiguration = clientConnectorConfiguration; this.ddlBatchingSuggestionEnabled = ddlBatchingSuggestionEnabled; this.partitionOperationsExecutor = partitionOperationsExecutor; + this.partitionOperationInFlightLimiter = partitionOperationInFlightLimiter; } /** {@inheritDoc} */ @@ -471,6 +477,7 @@ private ClientInboundMessageHandler createInboundMessageHandler( connectionId, primaryReplicaTracker, partitionOperationsExecutor, + partitionOperationInFlightLimiter, SUPPORTED_FEATURES, Map.of(), computeExecutors::remove, diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java index 7cea0def2768..62ba0a988ba4 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java @@ -153,6 +153,7 @@ import org.apache.ignite.internal.jdbc.proto.JdbcQueryCursorHandler; import org.apache.ignite.internal.lang.IgniteExceptionMapperUtil; import org.apache.ignite.internal.lang.IgniteInternalCheckedException; +import org.apache.ignite.internal.lang.ReplicaOverloadedException; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.network.ClusterService; @@ -180,6 +181,7 @@ import org.apache.ignite.internal.tx.TransactionKilledException; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.util.ExceptionUtils; +import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter; import org.apache.ignite.lang.CancelHandle; import org.apache.ignite.lang.ErrorGroups.Compute; import org.apache.ignite.lang.ErrorGroups.Sql; @@ -278,6 +280,8 @@ public class ClientInboundMessageHandler private final Executor partitionOperationsExecutor; + private final PartitionOperationInFlightLimiter partitionOperationInFlightLimiter; + private final BitSet features; private final Map extensions; @@ -309,6 +313,7 @@ public class ClientInboundMessageHandler * @param connectionId Connection ID. * @param primaryReplicaTracker Primary replica tracker. * @param partitionOperationsExecutor Partition operations executor. + * @param partitionOperationInFlightLimiter In-flight limiter for partition operations. * @param features Features. * @param extensions Extensions. * @param eventLog Event log. @@ -330,6 +335,7 @@ public ClientInboundMessageHandler( long connectionId, ClientPrimaryReplicaTracker primaryReplicaTracker, Executor partitionOperationsExecutor, + PartitionOperationInFlightLimiter partitionOperationInFlightLimiter, BitSet features, Map extensions, Function> computeConnectionFunc, @@ -373,6 +379,7 @@ public ClientInboundMessageHandler( this.eventLog = eventLog; this.primaryReplicaTracker = primaryReplicaTracker; this.partitionOperationsExecutor = partitionOperationsExecutor; + this.partitionOperationInFlightLimiter = partitionOperationInFlightLimiter; this.handshakeEventLoopSwitcher = handshakeEventLoopSwitcher; jdbcQueryCursorHandler = new JdbcQueryCursorHandlerImpl(resources); @@ -882,19 +889,28 @@ private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker i if (ClientOp.isPartitionOperation(opCode)) { long requestId0 = requestId; int opCode0 = opCode; + if (!partitionOperationInFlightLimiter.tryAcquire()) { + in.close(); - partitionOperationsExecutor.execute(() -> { - try { - processOperationInternal(ctx, in, requestId0, opCode0, guard); - } catch (Throwable t) { - in.close(); - - writeError(requestId0, opCode0, t, ctx, false, guard); + writeError(requestId0, opCode0, new ReplicaOverloadedException(), ctx, false, guard); - metrics.requestsFailedIncrement(); - metrics.requestsActiveDecrement(); - } - }); + metrics.requestsFailedIncrement(); + } else { + partitionOperationsExecutor.execute(() -> { + try { + processOperationInternal(ctx, in, requestId0, opCode0, guard); + } catch (Throwable t) { + in.close(); + + writeError(requestId0, opCode0, t, ctx, false, guard); + + metrics.requestsFailedIncrement(); + metrics.requestsActiveDecrement(); + } finally { + partitionOperationInFlightLimiter.release(); + } + }); + } } else { processOperationInternal(ctx, in, requestId, opCode, guard); } diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java index 8c86fe7ed6bd..13c3f1d47f25 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java +++ b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java @@ -65,6 +65,7 @@ import org.apache.ignite.internal.schema.AlwaysSyncedSchemaSyncService; import org.apache.ignite.internal.security.authentication.AuthenticationManager; import org.apache.ignite.internal.table.IgniteTablesInternal; +import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter; import org.apache.ignite.lang.IgniteException; import org.jetbrains.annotations.Nullable; @@ -272,6 +273,7 @@ protected void initChannel(Channel ch) { new TestLowWatermark() ), Runnable::run, + new PartitionOperationInFlightLimiter(0), features, randomExtensions(), unused -> null, diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java index 013462ff3aed..73a741ffb1fa 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java +++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java @@ -79,6 +79,7 @@ import org.apache.ignite.internal.security.authentication.AuthenticationManagerImpl; import org.apache.ignite.internal.security.configuration.SecurityConfiguration; import org.apache.ignite.internal.table.IgniteTablesInternal; +import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter; import org.apache.ignite.network.NetworkAddress; import org.jetbrains.annotations.Nullable; import org.mockito.Mockito; @@ -290,6 +291,7 @@ public void log(String type, Supplier eventProvider) { EventLog.NOOP, new TestLowWatermark(), Runnable::run, + new PartitionOperationInFlightLimiter(0), () -> true ); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/lang/ReplicaOverloadedException.java b/modules/core/src/main/java/org/apache/ignite/internal/lang/ReplicaOverloadedException.java new file mode 100644 index 000000000000..c5d5fb8f9baa --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/lang/ReplicaOverloadedException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.lang; + +import static org.apache.ignite.lang.ErrorGroups.Replicator.GROUP_OVERLOADED_ERR; + +/** + * Thrown when the node has reached the maximum number of in-flight partition operations + * ({@code replication.maxInFlightPartitionOperations}) and cannot accept new requests. + */ +public class ReplicaOverloadedException extends IgniteInternalException { + private static final long serialVersionUID = -6023736883539658779L; + + /** Constructor. */ + public ReplicaOverloadedException() { + super(GROUP_OVERLOADED_ERR, "Node is overloaded: max in-flight partition operations limit reached."); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/PartitionOperationInFlightLimiter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/PartitionOperationInFlightLimiter.java new file mode 100644 index 000000000000..5a0b4d7608ef --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/PartitionOperationInFlightLimiter.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import java.util.concurrent.Semaphore; +import java.util.function.IntSupplier; +import org.jetbrains.annotations.Nullable; + +/** + * Limits the number of in-flight partition operations (queued or executing) across the replica manager and thin-client connector. + * + *

When the limit is zero or less, all operations are permitted unconditionally. + * When positive, {@link #tryAcquire()} returns {@code false} once the limit is reached and the caller should reject the request. + * A permit must be released via {@link #release()} upon operation completes. + */ +public class PartitionOperationInFlightLimiter { + private volatile Semaphore semaphore; + + private final @Nullable IntSupplier limitSupplier; + + private volatile boolean initialized; + + /** + * Constructor. + * + * @param maxInFlightPartitionOperations Max number of in-flight partition operations, or <= 0 to disable the limit. + */ + public PartitionOperationInFlightLimiter(int maxInFlightPartitionOperations) { + this.semaphore = maxInFlightPartitionOperations <= 0 ? null : new Semaphore(maxInFlightPartitionOperations); + this.limitSupplier = null; + this.initialized = true; + } + + /** + * Constructor that initializes the limit lazily from the given supplier on first use. + * Allows lazy init on first usage. + * + * @param maxInFlightPartitionOperationsSupplier Supplier of the maximum number of in-flight partition operations, or 0 to disable. + */ + public PartitionOperationInFlightLimiter(@Nullable IntSupplier maxInFlightPartitionOperationsSupplier) { + this.limitSupplier = maxInFlightPartitionOperationsSupplier; + this.initialized = false; + } + + /** + * Attempts to acquire a permit. + * + * @return {@code true} if a permit was acquired or the limit is disabled; {@code false} if the limit is reached. + */ + public boolean tryAcquire() { + Semaphore s = resolvedSemaphore(); + return s == null || s.tryAcquire(); + } + + /** + * Releases a previously acquired permit. + * Must only be called after a successful {@link #tryAcquire()} when the limit is enabled. + */ + public void release() { + Semaphore s = resolvedSemaphore(); + + if (s != null) { + s.release(); + } + } + + private @Nullable Semaphore resolvedSemaphore() { + if (initialized) { + return semaphore; + } + synchronized (this) { + if (initialized) { + return semaphore; + } + if (limitSupplier != null) { + int limit = limitSupplier.getAsInt(); + + if (limit != 0) { + this.semaphore = new Semaphore(limit); + } + } + this.initialized = true; + } + return semaphore; + } +} diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index 02684f96d6d4..9d702922e81f 100644 --- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -258,6 +258,7 @@ import org.apache.ignite.internal.tx.message.TxMessageGroup; import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage; import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; +import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.vault.persistence.PersistentVaultService; import org.apache.ignite.network.NetworkAddress; @@ -1476,6 +1477,7 @@ private class Node { Set.of(PartitionReplicationMessageGroup.class, TxMessageGroup.class), placementDriver, threadPoolsManager.partitionOperationsExecutor(), + new PartitionOperationInFlightLimiter(0), partitionIdleSafeTimePropagationPeriodMsSupplier, new NoOpFailureManager(), new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()), diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java index d61f25985abc..b73fcd014c42 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java @@ -195,6 +195,7 @@ import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage; import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage; import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; +import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; @@ -668,6 +669,7 @@ public CompletableFuture invoke(Condition condition, Operation success, Set.of(PartitionReplicationMessageGroup.class, TxMessageGroup.class), placementDriverManager.placementDriver(), threadPoolsManager.partitionOperationsExecutor(), + new PartitionOperationInFlightLimiter(0), partitionIdleSafeTimePropagationPeriodMsSupplier, new NoOpFailureManager(), new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()), diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxCleanupRecoveryRequestHandler.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxCleanupRecoveryRequestHandler.java index 9fd5ddeedbdc..54ad62a5b711 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxCleanupRecoveryRequestHandler.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxCleanupRecoveryRequestHandler.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.failure.FailureProcessor; import org.apache.ignite.internal.lang.IgniteBiTuple; import org.apache.ignite.internal.lang.IgniteInternalException; +import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.IgniteThrottledLogger; import org.apache.ignite.internal.logger.Loggers; @@ -156,13 +157,14 @@ private CompletableFuture callCleanup(TxMeta txMeta, UUID txId) { txMeta.commitTimestamp(), txId ).exceptionally(throwable -> { - throttledLog.warn( - "Failed to cleanup transaction", - "Failed to cleanup transaction {}.", - throwable, - formatTxInfo(txId, txManager) - ); - + if (!hasCause(throwable, NodeStoppingException.class)) { + throttledLog.warn( + "Failed to cleanup transaction", + "Failed to cleanup transaction {}.", + throwable, + formatTxInfo(txId, txManager) + ); + } return null; }); } diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java index c2fed07bfc42..d7d9abcc1f18 100644 --- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java +++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java @@ -121,6 +121,7 @@ import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbPartitionStorage; import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage; import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbStorage; +import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.internal.util.SafeTimeValuesTracker; import org.apache.ignite.internal.worker.ThreadAssertions; @@ -243,6 +244,7 @@ void setUp( Set.of(), placementDriver, executorService, + new PartitionOperationInFlightLimiter(0), () -> Long.MAX_VALUE, failureManager, null, diff --git a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java index 92484f02a39c..d3212672a0f8 100644 --- a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java +++ b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java @@ -107,6 +107,7 @@ import org.apache.ignite.internal.thread.IgniteThreadFactory; import org.apache.ignite.internal.topology.TestLogicalTopologyService; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.raft.jraft.option.PermissiveSafeTimeValidator; @@ -231,6 +232,7 @@ public void beforeTest(TestInfo testInfo) { Set.of(ReplicaMessageTestGroup.class), new TestPlacementDriver(primaryReplicaSupplier), partitionOperationsExecutor, + new PartitionOperationInFlightLimiter(0), () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS, mock(FailureProcessor.class), // TODO: IGNITE-22222 can't pass ThreadLocalPartitionCommandsMarshaller there due to dependency loop diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java index 8ff7610e762c..3209ce21543e 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java @@ -71,6 +71,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.ComponentStoppingException; import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.lang.ReplicaOverloadedException; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.IgniteThrottledLogger; import org.apache.ignite.internal.logger.Loggers; @@ -124,11 +125,11 @@ import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest; import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage; import org.apache.ignite.internal.replicator.message.TimestampAware; -import org.apache.ignite.internal.thread.ExecutorChooser; import org.apache.ignite.internal.thread.IgniteThreadFactory; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteStripedBusyLock; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.internal.util.TrackerClosedException; import org.apache.ignite.lang.IgniteException; @@ -225,6 +226,9 @@ public class ReplicaManager extends AbstractEventProducer> messageGroupsToHandle, PlacementDriver placementDriver, Executor requestsExecutor, + PartitionOperationInFlightLimiter partitionOperationInFlightLimiter, LongSupplier idleSafeTimePropagationPeriodMsSupplier, FailureProcessor failureProcessor, @Nullable Marshaller raftCommandsMarshaller, @@ -293,6 +299,7 @@ public ReplicaManager( this.placementDriverMessageHandler = this::onPlacementDriverMessageReceived; this.placementDriver = placementDriver; this.requestsExecutor = requestsExecutor; + this.partitionOperationInFlightLimiter = partitionOperationInFlightLimiter; this.idleSafeTimePropagationPeriodMsSupplier = idleSafeTimePropagationPeriodMsSupplier; this.failureProcessor = failureProcessor; this.raftCommandsMarshaller = raftCommandsMarshaller; @@ -321,6 +328,22 @@ public ReplicaManager( } private void onReplicaMessageReceived(NetworkMessage message, InternalClusterNode sender, @Nullable Long correlationId) { + if (!partitionOperationInFlightLimiter.tryAcquire()) { + clusterNetSvc.messagingService().respond( + sender.name(), + prepareReplicaErrorResponse(false, new ReplicaOverloadedException()), + correlationId); + + return; + } + try { + handleReplicaMessage(message, sender, correlationId); + } finally { + partitionOperationInFlightLimiter.release(); + } + } + + private void handleReplicaMessage(NetworkMessage message, InternalClusterNode sender, @Nullable Long correlationId) { if (!(message instanceof ReplicaRequest)) { return; } @@ -983,12 +1006,10 @@ private CompletableFuture stopReplicaInternal(ReplicationGroupId replic /** {@inheritDoc} */ @Override public CompletableFuture startAsync(ComponentContext componentContext) { - ExecutorChooser replicaMessagesExecutorChooser = message -> requestsExecutor; - - clusterNetSvc.messagingService().addMessageHandler(ReplicaMessageGroup.class, replicaMessagesExecutorChooser, handler); + clusterNetSvc.messagingService().addMessageHandler(ReplicaMessageGroup.class, handler); clusterNetSvc.messagingService().addMessageHandler(PlacementDriverMessageGroup.class, placementDriverMessageHandler); messageGroupsToHandle.forEach( - mg -> clusterNetSvc.messagingService().addMessageHandler(mg, replicaMessagesExecutorChooser, handler) + mg -> clusterNetSvc.messagingService().addMessageHandler(mg, handler) ); scheduledIdleSafeTimeSyncExecutor.scheduleAtFixedRate( this::idleSafeTimeSync, diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/configuration/ReplicationConfigurationSchema.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/configuration/ReplicationConfigurationSchema.java index a53ac60c03ed..9d762def24d7 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/configuration/ReplicationConfigurationSchema.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/configuration/ReplicationConfigurationSchema.java @@ -70,4 +70,14 @@ public class ReplicationConfigurationSchema { @Range(min = 1) @Value(hasDefault = true) public int batchSizeBytes = DEFAULT_BATCH_SIZE_BYTES; + + /** + * Maximum number of in-flight partition operations (queued or executing) on this node. + * When the limit is reached, new partition operation requests are rejected with an overload error. + * Applies to both replica manager (inter-node) and client connector (thin client) partition operations. + * Zero means no limit. + */ + @Range(min = 0) + @Value(hasDefault = true) + public int maxInFlightPartitionOperations = 0; } diff --git a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java index 22e0b5704e4f..ed566b53bd01 100644 --- a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java +++ b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java @@ -88,6 +88,7 @@ import org.apache.ignite.internal.thread.ExecutorChooser; import org.apache.ignite.internal.thread.IgniteThreadFactory; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.raft.jraft.option.PermissiveSafeTimeValidator; @@ -158,6 +159,7 @@ void startReplicaManager( Set.of(), placementDriver, requestsExecutor, + new PartitionOperationInFlightLimiter(0), () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS, new NoOpFailureManager(), marshaller, diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index c470b932a80f..225a858e63ee 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -237,6 +237,7 @@ import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; import org.apache.ignite.internal.util.ByteUtils; import org.apache.ignite.internal.util.Cursor; +import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.version.DefaultIgniteProductVersionSource; import org.apache.ignite.internal.worker.CriticalWorkerWatchdog; @@ -635,6 +636,7 @@ public CompletableFuture invoke(Condition condition, List su Set.of(PartitionReplicationMessageGroup.class, TxMessageGroup.class), placementDriverManager.placementDriver(), threadPoolsManager.partitionOperationsExecutor(), + new PartitionOperationInFlightLimiter(0), partitionIdleSafeTimePropagationPeriodMsSupplier, failureProcessor, new ThreadLocalPartitionCommandsMarshaller(clusterSvc.serializationRegistry()), diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 41844c92b345..2bb4649ec5b2 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -298,6 +298,7 @@ import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage; import org.apache.ignite.internal.tx.message.TxMessageGroup; import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage; +import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.vault.persistence.PersistentVaultService; import org.apache.ignite.internal.version.DefaultIgniteProductVersionSource; @@ -976,6 +977,10 @@ public class IgniteImpl implements Ignite { var validationSchemasSource = new CatalogValidationSchemasSource(catalogManager, schemaManager, indexMetaStorage); + PartitionOperationInFlightLimiter partitionOperationInFlightLimiter = new PartitionOperationInFlightLimiter( + () -> replicationConfig.maxInFlightPartitionOperations().value() + ); + replicaMgr = new ReplicaManager( clusterSvc, cmgMgr, @@ -984,6 +989,7 @@ public class IgniteImpl implements Ignite { Set.of(PartitionReplicationMessageGroup.class, TxMessageGroup.class), placementDriverMgr.placementDriver(), threadPoolsManager.partitionOperationsExecutor(), + partitionOperationInFlightLimiter, partitionIdleSafeTimePropagationPeriodMsSupplier, failureManager, raftMarshaller, @@ -1358,6 +1364,7 @@ public class IgniteImpl implements Ignite { eventLog, lowWatermark, threadPoolsManager.partitionOperationsExecutor(), + partitionOperationInFlightLimiter, () -> suggestionsConfiguration.sequentialDdlExecution().enabled().value() ); diff --git a/modules/runner/src/test/resources/compatibility/configuration/ignite-snapshot.bin b/modules/runner/src/test/resources/compatibility/configuration/ignite-snapshot.bin index 31dac9e2ca7599b09cc94379fb8dad96bb3351d5..c3e11d419825a3babb2f0d34c9d9566df88c2322 100644 GIT binary patch delta 5534 zcmV;P6=CYPEZ-~*P)h>@6aWYa2mn?(ijfUCe@L>EGbH2yGgGIwrcMc4fHuN%Y&i+L zHUIree&7cN+JHbd`Z7$QTaRw_L+WnnpBCcWBjbQ$zxDe+EeFkU>-QF!I)vlao0dh9 zFMen-Z`wvKvSzp){njQ9$DV^|+hWdyOal*b!kqR!vvD{3%~0UXk!b3XIJRi1*Y9?E zf32TyTF7x24?#o+fBq1C%?Pz6{vC4VSS(7g!$o5bwm9?TS3Ys36wCjW4!el4Y5Wmv zdxjjFVy`fZZoZ+i39ZavIf|0La*d~G?Wo~zhz63_uE$*LaT3ny=g%^dL?arJIc5RB zBa{*!LYFeMlsJNwdrapUQDR5`9Uy1Ae=?EQoH+7>FI;>dHA$H>6=2WdynlZy`ktfL zjWGZB?%n(MzrK6>{?l(CKm7XZ$KT%m`m>BeqNxc*Q~xu59STB-CVp>ycqiJSc#5pW zLo~33>_bknK$(_k(8d&VY(IE-LS93lA3x={6BJPX!02MmJao0+@JSY(y~yn(e--zDn%6hZ|7 z1#;mW%wL%ISXh%opPt6?d9$I#ievs3$yMkJ0^BR&&aemasA9=3xh)OXQ?eFyg^D|*mEJ|1GnC!9Rvf~rfTc&Y6si2C^F^3NP_iD_N}fBxBWm?IY^ z3snb_V&vhcfOxp|LtpKQ!L}OL-bg><#ZZVf226Gdx`!_O61SnvPS6z-5#xj~dkLyU zVl#6XV#~uEGJGwTkc&VnF9d6Vo|n20Fx?QW_R}pJ!n?P}rnndQ9IB--Dg`if{QcMX%?Hiu$7)b0zFsCl;dLnfC|`auqN@zQIcfTyzccEotK-Q1Xl&?EBpr zwjOWg6vW~J%hD$7Gm@nZ**c{ZQk1tN=GdW>yqse;A&IVeL(nQo`d>)A!-ndya+A@f@a1=bBvo~+?;dfsU~jDv3K}$%#qKTho%^4_lp<}u>Y=LHwnjh ziF_y>N{|{wiXMY{f~z=9AVf}cZKFU6yz@RFnbu)Fz>H4re+mrR6>IeiUih#G{uzaT zm-JYCOESM&`VUM(BaX9LKr5D~K5 zOF)mR_cPS*B?C|tKErdj2Pa1ky!NJ03#ptSLIV$fjw+t(d1J zw(A-d_S?bSe~mXReJ36zL}!(u)wif8Gad24IHzJDRU_PhMz-yeH6;&Y^UGi|!Jf>p zxLm*zF-wrN$;~+BMFM%c5J?v`T$`=xxd>x(t1YoFcUxuc{uBMQj^VRi)MSk07k4wF zBiZXsP6LC5I;Es zyocM-jqc^RTQ;VR0}}7a9|RRDr_-*--C`d0e~&$tbge2|e95!n!m%pTXd}9EGL`qP zh*G|VDDDPhkmt07-9xH|Nh3N+(*QpO*ypjxSCgg!uIgo?W;slm?-RGOz?7mX7nFRj ziUcI;WPn|a_+3CDDRnC-^;`=XJrwvZvBYTQY=}z&&jjTtqkFy<7-oJgF3qB9T0oKD}&0&cSx=idRr z`~iD}+3>66*5aDjuJn~k=%Z@S9rsk#i7dx6+{yjCKdxIunQr&lD9c3__8GNRt>`Bn;z_jgc~zds*$l9g^am(^iuvwqPbiqqaHLob zhDj9`^7*csFde8-hUzn{JQhH5SR=Rtx#Z9l6ekVW$3?GYD6;->NU=x+?sr7^sLE?r zrD)pl<&@#8JY`hf0mmWHD+#3yPkw<2%e#--ippVp2~8?qJv4!6+Yk)o&q&C^iW!Kn zv^hkxiSGrjjACwopLGhm>(sB37POf^Wz)OxQCd$fI@qlgt2o*ylg4e=!nZrCyz{B* zjlu`^g1!D^^ulW6I{M%h?VEL60~@*T?X3R7u>gx9f~#-*o=8wihYoW68RJz`&^Dqg zFJ~z4U6Hpe3|VZ!x-0-)rCr203_XtfW@k9>{}4=VF#^PY@f4C-y@FC-`4D>b)|pT; zo$>xN_9m3Q3=rrk@pYIfEo>HyMFVvqh1iq2Vt}}+jwr|5mZo};J#@BTMQXvr+*^cI z1QiHUKsg32k}g$}q58^LK{^sC6sH9r+LzSSG(0mZhDh-Mb`X#Q-*s2~Ez^ij8{=X0 ze$-`PPjQZa7@**J;7~FbL$cSz&(q&GIvd#`>`LITNmo%j)j~(Cb%Z>2(Q~|c=}EB$OnnvSFc!UzvZci_y#_-F zw+^64fdqr876Dm|6o{|1aR}@HQ9)YHECz|O<-qHIyIG!b?vB`J@vlF?Hu10}2Mn11 z8pQfiqMnHe&8Ue!Y)$4GO=F5O*VEP%WvVHiFh!ZBDAN>WnqntY>|~0aYQH5k9`flH zyQzmYCc>BqVhQ_bh3V`Q_ zDeS&~G`LP$(u){8e90S{c$S6M$9DoaP9{sWq6rq3#Q|M6n?UX>9}iJGMmmgiXf7QZ z3?Y01S7uE>40sF)6VM1azX<(yoTH(QTw-TR91IB=B#;M~{dL%sHZ(v)0QdwH|4v~m z(ZLn}%&}O1i^8z|aNg~Qs8Q_j2LX8(Lh4CZEYYU*=rRJ*;~C(?l50t%?y?6BDO4@z zxExO#^VhgHC{ucmLU*GYj}jbh_|q`=M2v~e1-&FiD)46o$M8p8U0U>t(-nRuAz_3O zvoOdmtVPv^TM%uNY6~9-KqxM;dw-NFlaUKEe}QSeozDXO(D4W z^!~RG2U)=hqnf*Pk&w+f=y*lK9rUaqsYcZOV8uwck?xvGw;9>`<%Y_XO zxYl=x749St=>&{WoV!rSqbZjnORPMH--#bUhf;ZrMpx7gI`>!oyZ`nEU&Xh>>p$+U z??=~H@!#EEzxze!(mKDw#?mj@RI0k^f5|`QzGyckA0J#CQNgdCSnR0Y<#b8 zby($SDG#q+3ncw13c83*Yn9pOz|~Ja?oMkhm^t7%pdN&r>+GcYlHnTNXLR4$c|LDi zFP;#$%V09WUKPDpil$G9`GomXMbc=jtF^t*?4sHTTJa)EY(<=X-d>(rw9{0~e|F&G zj=bdkMp%klc7rT|ByY5i+eqr2r6C!4$ngam#b)h))_Ff67R;38*LeJ_Ub(S)B#yLA zSGU8_;P&#X*gS&lQ)CW@u&eGfz_8&XjwqWJFCkjCGV_>2{tmy@CZ=M`*vM7f5IC$- zhSdb!wo~AALu7Y{@=^V^eKx zmNk?23Q>sKTY~OGfNspbSxFC+uK6{*;=6!aH7o-y{gA<`@R+ebTnk9ru2hN>keBtwJfuvm}6 zHLilyXoOGcY)^Y;8c5?Vmu)~MG1Lv;30oTB42l?9D|NiC60KUE}U#PhN z12s_!Yc!F%3L@?;1S%1>XFEr-&>h=1GF1g=1;xluoH?gbe6)z8 z9Qg59*gLA`?bpmvp?UkY(R(-Z_N(Tr*1Y{{9`@V&D$p}}*iQ?$#k|OA1lI^|?F3iT zA?%W15$B#M%4g`YY4uzNIV8YRGGi~)qd}*8Ek3WVFTegFK5y@D`r_kzXK;UeFTc1R z4Eh7{*}u8bm$8R&C$zXHB&?V8hy{um#E>8#Ck9@vmlWS>vzx-NF7tTd%yz$z- z`h&A&f0byse?%+Hyf6B?j(@A>HHM>vxQ$8I_b^W?2VQ?44?5XeF| z+H%E*`P09NtFK8iNr;D_;S3`i7Qss(52D9EM~&@TW)+PEJZR-yzi0eXH2PQ%t{>`b zXumT_u{~urICmoluKuFLs5?dJL?-dKU~i_LEANgA3G;u$8IOaB&W4ja5h8!+DWj(} z7hI#KbV$_EU{zj&Uew)g;h_fzPpJ=O6aJdN9?Ks5YyP^MEs$e0cEH$yhNe5o7juaf z(i_0PEHp?g69ncJ_){Z`v z|0ledVH>R!g};RZ=;O}R!{H;|9m~RQh$=_XBXh#RyoR=<54S(QeS1oVZzY&YmJ8>M zM0BkD@W5t#ec1aB#@-7F_x5lcU8Rh=i*Rn88%m}#{(uBkWS4z0poD+akSka=HK2`o zZQOM^v6r@_b$Dx8WjTbjq74s!S!K{~5aB$SY;L5MA_?`}@HrzEz^&*Ciaz@E;!)#l6?Gfd6uZNu}DE3#x&_)F|Y?oLnFvosEYzlc3 zc?x_BIj)1xv;7Z!%Ef;lzoWXua@)>jkyX3Dzg7Msr_n~57)0& z9Cl9W2dB=$J@#w^A6E7k!5vD9zWU`d10Y(h^6EF$b2g-+Jz7FQ6|n@ZyWo_KK!Zn; zYnv!_J_VdwklTNM0#(sJ0juZ*U_qVP#E%=a+ruY%mHw%XGb%K;*rt!K6zXR;W3v!#2u?|(6 z6U@dbH8GiREs5?cg_FwhjP+?n&x>Vgqv}pA(M4BMFwBL-erF<-hu9G_2NWFGh;HPq z!8Xg6j1-y{RP31nK7{33wx*4_LIGz3JP{HA$$QE|ACw-Qv|(*Xvvgj&Gi*I3kyW^g z*^IA^jxm2xNizgRF%1tnK6tN~g$C&s{#p=)QQCqwN(7COV>3r=P^1zVLG;E7${D$p zpN_mlQT%Rmf6Nhaddz|(*_9w^qs<$~AsZu`3KEMp<&#n}p3Eq*APFC9QFl`>S8JNn zM%VDyTf58VF7m=4J3!^;5gL8N=zVAObtn^0d;G_?5i-xS@6bi%Lby`AGh)=n zTE-l0R8=pEUcSqx>vmyX38LX$!n-VSl$V}D7?uKS#H1q>TpU{&axhD{e*9=0GhUqm z_QOL<@kShy{Zc^Chcyk86Acc?%7$=Jbp-zpP)i301pQJy#T5Vmv8Vw6P)h*<6aW+e g2nYxORyvAY1pQJy#T5Vmv8a>z6*vY5761SM0Ibl5@6aWYa2mrW*YLN{%e+b#h84_}UnWPSLfHuN%Y&i+L zHUIree&7cN+JHbd`Z7$QTaSLJrS6vgX(7%%@(^(Bw|@Vp<)Ark{oW!|hj83_)3PY? z#Sbm!P20#t)(p3!-`d3C*mDqVTg;h|Y2YDFnA5&vHtuG>83~*@5=}i4#}+O12i;D; zfA#ZC3pozsA&BVU&mW?%8KJhszeTPbi$w``xM=kCw%{Npwp_LgdM^VyOuJIJD9X0$7(LfU0^_Yu2PQp3;{8>hlXhdT&$1LEt zgi_)|=u(E35=XFdkLf%kO6=&rL*z_XeVL+sBS8q!#P6*S??gKkPm#5_ zj|R4oeaJ}`DAN)R+L&UF?RyVT$ZH7nC@H zKrKIg{P_OkArw&!>3!_jBkVn6f3NpyVK-cEIkFzd9oC;9e1ThCPr+6+^B!^|g_u$Pw-Ydf@iwKXbq(OY;)&f6tb~94VMA zG#yB)k%ylG;^Ed0eXS=3uhqEri}WL2jD%Waz+{)8d)UG+aU0s~1YI!^F-{1xm!L{C zHZzA2wmi%s!`DKDTm&+CAy@4spnpKjR@-n~6G#l67iP%D)IHg?L`DPyOM zoial>rrbH=uTDPK&-}3(nhs0;~EkNM4OVSEQbr2 zMGAX(OLJ_y8ck_CRvOITIMi1ZYg2S|EvSh7KAB)n<`^6gEk!dSQFUJU%)`jOe`kyW)u(`96zB+? z;moa?SFj0NA9YgBv`=T~3wVSR3(@b)djtl#3K$^Y;3Whux(4}{v~dwAdBzU*{q77~ zk2i7(Lb$-XvHf1ZY6?M;+Y!sF7V@58b891EZ0Ib@d+0q&6ZJPI-UOe}KRtZvqP zdJl^oB#^Z~Pd9FkadV8DW856$=A1K6HF0x}y~Celj(pBMG{wMmzlhO*_1_ijCgB(_ zkq@Oq2~wj-(PvOka22NsgveR0Z4^j>cismi(>kmNn6b%Se}O@}LRP=vg%6A1pHcXC zNsq<1B=Z}_^hM%R*L(hDBRI$Kw~bxO7IRq5PzD&J_Yr&`zAUKe)v_UdHh|0u5h1(1 z1oWtOKSTX~G5|&4Gdy?uaB}3pYrhm~p_LOvXyD<`QRO{=2L%L>cE&gb6}pbT74x*j zc3q>wemj`EfANN8;KZYZ*sLW>fUEEFcIEhge?J*DA@!-co!XOG$hE?rGe+NkU@)pA1ln$aE^(v<=;HalV z<**}Z^c@?x`)G4GP@~O8n~gTFAA(%+2Hs~x`B7KtsLOrV>UiKr$Bm9RQOCt>tENnA znk3!4ngdT3ue%VI#E^A8>Nw0o^HY!Wy!a*pSO!A~xyJUV zkICOee@$pJd7oV(>yff4VEQSnIet=>=gPid>~?5;HP%*+q|kLWq;k%E(K9HHumr7@ z1nPceBOBtt$MW`55DQZ1`pO))#D41vQ?!8iDk*SXXRL*rVeaLPllC5I;Es zyocM-jqc^RT{fnT0}}7a?*$bqr_-*--LgFFe;<1)=~`8`_>yO%g=1Bw(MELTWGe4n z5v6=3x)>u!l;Dn+J%DKAkdl%GfEb?UV*Xh+hA>CXT&Bq3&qh8Q*kU zWsU!n{J2htGTrX8QI`Bn;z_LYc~zds*$lCh^am(^hWYkuUl^FqXsp-_ zhDj9`^7*coFde8dhUzn{JQhH5SR=Rtx#Um@ij#)xAj8{!T+la2b zoT0pTMc%Sl$l?{O%L33<+C_}R(Brsoc82r*55d$HBS0K~Pa&DrD=78V4`EktoCzh< z86P}jZ$jD25P_Z&Ux%5}!e+sPXrL~n5PMQr3=mhf5#@N>(o_$!htBq^$Sio6dyBA& zpaMY(D94~h(xpl=RF{kuq$80+aa!=9eMwE1hG#~_5GfwQ4gzxEyY7m=Wg5|GV?2u9 zkGc%(Db5jp0~9@P!E=$xi+}wgwuy%=Ibgv2*C5uH z67@_(Xhu!+VQVtiSQ=B5xt_MBC{s=8gel52MVY23(-b?IVkcAVRQoNV@sLj^?4};p zED>gjFiV8iON1Fhe4gkr40=b5db~`#lKV7N_23= zKXWYPZ&4VwAI`gdA2o^{elH+zLr6V|CEB!99$iL2c02=oSaL0i)Lr(VA%&{t9GBy1 zv-~yg4a$_>hFcJ8lWhwh2v8U$h!>{7oQSXo2-rcx&760As54vA;t~v56c-(%8x2@_59RHXOb0L<7IN zmD|z0%b^?7ZcIB7a#&^0orhO%1%HzM6rDy3Z@$XxbKvSHA9ts<70m2>4`>G=8{VBP zUou={`;6^7JJ07$>%|l5b{R}2*sEgqO40O*y`3T%3BenhNW;7n&Tz(z*u0Tp3 zS(g#)s{0HuZ1{*H%BICjh?cF)Jm!$U!*8{Tsn{|$auqiO4y%-5HDL+gDR8Rz0Bwlzko%c1b@j$fC>r761urTZ$Fq$h36JY%67|~f!<+d8Kvmj7-Ank z1=!&P(JMPG%7)OQ9B~)2l(wV|h0f*z4y)?3Exnp)uet%R`8HMf|u`TYoP#L6h*|8!7iU zl}mFrH(*ppWxiBq`rDFpQ{bKDH<{L9g*U_4uE3yOZR25he|;#)p8yjq)j-?P>3< zJ@#xL5VvL{?AX^{cyw#Q3!~Pen!TKKoyz!JW=qYAP%} z#NG+-xq(5hn;Rud|ve~zy2XUZ|<%K;^TW~cz1IrzvvBzgQ55wTwm+U*u%IJ zTHF&7)=PTC0!0jBNRW>c1FzOgif^^qP2pFUdAx9DyWh>rj8k|0!P&CEN;F)e6=psV zef8qss(*Qn;V2<)W772<%#+H2*WbrDFaJ@#+~;B6C!i|?ve1oQx#Gk8>EFcFcS$ly zh=-uz3?myB!Al?yV#hy6jqO@y6^#WvXyshLWBgJy`dAOHAL?vqzcWd(J!LjHcOwVB zEpKCQdlh%5D4ob8{ub7osprbOv7mm`-EQHaWUMA4WFkT(o48wWh`4}I?UE}kjiT6hJGf6Ae2=|oS|b*C z7DNcvAl7%Ed$y<4hguWi_0F-G$c>uH;S2i1Os)R5uxaTP{J2-rG~%nydK$jKgr9?B{WjxwFaRFP0MTldoxQ0lupxEP zkq7}z#1gdbic>ZM4W8PqZMx0*lmTi%ZvP2Xi~R(w#TI}Cb7m7iZqROzp6FHjr#8;0 zf3VnMn?9D(1fWG0`T#WPrp;YNjD%h5aSXzU&GU#Oiihk)3@QL!6|;(C&Vm%i4P#1Q zd&mOrHCJ&`2H7l-Vy!ugE_7vx78c#5@-|+`?LGSiXK@-cuI(p!DdZ z4QoT1rSsaIVe2u8tinyqW_)dIjEPE`At;Jzc*ya=d&Mj?$hPp;f+~#D7PK)Uf9L@@ zHgm)VMH+!I#BQvhoRM4k>Bu6A;@5u%4;&Gv&n!riT?vvl*1T~XvN5u$AhBpuK1?Lz z$i14H!Wji~w(8_PLe>j@ga_n?}GMB|Gf|98>0cXJmWPMg~ DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS, new NoOpFailureManager(), mock(ThreadLocalPartitionCommandsMarshaller.class), diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java index 373e2c35d2ef..6713abbda5ae 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java @@ -163,6 +163,7 @@ import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage; import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource; import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage; +import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.raft.jraft.option.PermissiveSafeTimeValidator; @@ -456,6 +457,7 @@ private void startComponents() throws Exception { Set.of(), placementDriver, partitionOperationsExecutor, + new PartitionOperationInFlightLimiter(0), () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS, failureProcessor, null, diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java index df1c99bbbee2..b5853ab2a525 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java @@ -190,6 +190,7 @@ import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.Lazy; +import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.internal.util.SafeTimeValuesTracker; import org.apache.ignite.network.NetworkAddress; @@ -506,6 +507,7 @@ public void prepareCluster() throws Exception { Set.of(PartitionReplicationMessageGroup.class, TxMessageGroup.class), placementDriver, partitionOperationsExecutor, + new PartitionOperationInFlightLimiter(0), this::getSafeTimePropagationTimeout, new NoOpFailureManager(), commandMarshaller,