diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java index b5cbf6501444..9fd6526d73a7 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java @@ -249,7 +249,7 @@ private CompletableFuture denyLease(ReplicationGroupId grpId, L leaseNegotiator.cancelAgreement(grpId); - Leases leasesCurrent = leaseTracker.leasesLatest(); + Leases leasesCurrent = leaseTracker.latestLeases(); Collection currentLeases = leasesCurrent.leaseByGroupId().values(); @@ -418,7 +418,7 @@ private void updateLeaseBatchInternal() { HybridTimestamp newExpirationTimestamp = new HybridTimestamp(currentTime.getPhysical() + leaseExpirationInterval, 0); - Leases leasesCurrent = leaseTracker.leasesLatest(); + Leases leasesCurrent = leaseTracker.latestLeases(); Map toBeNegotiated = new HashMap<>(); Map renewedLeases = new HashMap<>(leasesCurrent.leaseByGroupId().size()); @@ -779,6 +779,20 @@ private void processMessageInternal(String sender, PlacementDriverActorMessage m clusterService.messagingService().respond(sender, response, correlationId); } }); + } else { + long time = lease.getLeaseholderId() == null + ? clockService.current().longValue() + : NULL_HYBRID_TIMESTAMP; + + LOG.info("Stop lease prolongation message was received from non-leaseholder " + + "[groupId={}, sender={}, leaseholder={}, time={}]", grpId, sender, lease.getLeaseholder(), time); + + StopLeaseProlongationMessageResponse response = PLACEMENT_DRIVER_MESSAGES_FACTORY + .stopLeaseProlongationMessageResponse() + .deniedLeaseExpirationTimeLong(time) + .build(); + + clusterService.messagingService().respond(sender, response, correlationId); } } else { LOG.warn("Unknown message type [msg={}]", msg.getClass().getSimpleName()); diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java index e112fa6b896e..f2ade824a349 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java @@ -181,7 +181,7 @@ public Lease getLease(ReplicationGroupId grpId) { } /** Returns collection of latest leases, ordered by replication group. Shows all latest leases including expired ones. */ - public Leases leasesLatest() { + public Leases latestLeases() { return leases; } diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/metrics/PlacementDriverMetricSource.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/metrics/PlacementDriverMetricSource.java index bfa15036aa89..a63328c16dbe 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/metrics/PlacementDriverMetricSource.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/metrics/PlacementDriverMetricSource.java @@ -97,7 +97,7 @@ private int numberOfLeases(boolean accepted) { int count = 0; HybridTimestamp now = clockService.current(); - for (Lease lease : leaseTracker.leasesLatest().leaseByGroupId().values()) { + for (Lease lease : leaseTracker.latestLeases().leaseByGroupId().values()) { // Expired leases can be ignored. if (lease != null && accepted == lease.isAccepted() && clockService.before(lease.getExpirationTime(), now)) { count++; diff --git a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java index e23632ddef1a..72d83cd65526 100644 --- a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java +++ b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java @@ -153,7 +153,7 @@ void setUp( when(clusterService.messagingService()).thenReturn(messagingService); - lenient().when(leaseTracker.leasesLatest()).thenReturn(leases); + lenient().when(leaseTracker.latestLeases()).thenReturn(leases); lenient().when(leaseTracker.getLease(any(ReplicationGroupId.class))).then(i -> Lease.emptyLease(i.getArgument(0))); when(metaStorageManager.recoveryFinishedFuture()).thenReturn(completedFuture(new Revisions(1, -1))); diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java index 066e33319a5b..29d53fef1078 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java @@ -282,6 +282,8 @@ CompletableFuture weakStopReplica( // If is primary, turning off the primary first. context.replicaState = ReplicaState.RESTART_PLANNED; + LOG.info("Stopping lease prolongation due to partition restart [groupId={}].", groupId); + return replicaManager.stopLeaseProlongation(groupId, null) .thenCompose(unused -> planDeferredReplicaStop(groupId, context, stopOperation)); } else { @@ -318,6 +320,8 @@ private CompletableFuture stopReplica( // These is some probability that the replica would be reserved after the previous lease is expired and before this method // is called, so reservation state needs to be checked again. if (context.reservedForPrimary) { + LOG.info("Stopping lease prolongation due to replica stop [groupId={}].", groupId); + return replicaManager.stopLeaseProlongation(groupId, null) .thenCompose(unused -> planDeferredReplicaStop(groupId, context, stopOperation)); } diff --git a/modules/rest/build.gradle b/modules/rest/build.gradle index 37b751097eac..978fd32cef05 100644 --- a/modules/rest/build.gradle +++ b/modules/rest/build.gradle @@ -97,6 +97,7 @@ dependencies { integrationTestImplementation project(':ignite-table') integrationTestImplementation project(':ignite-transactions') integrationTestImplementation project(':ignite-eventlog') + integrationTestImplementation project(':ignite-placement-driver-api') integrationTestImplementation testFixtures(project(':ignite-core')) integrationTestImplementation testFixtures(project(':ignite-runner')) integrationTestImplementation testFixtures(project(':ignite-cluster-management')) diff --git a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java index f1cbf89bd8fd..4db33e159db4 100644 --- a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java +++ b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsTest.java @@ -19,11 +19,13 @@ import static io.micronaut.http.HttpStatus.BAD_REQUEST; import static io.micronaut.http.HttpStatus.OK; +import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toSet; import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME; import static org.apache.ignite.internal.rest.matcher.MicronautHttpResponseMatcher.assertThrowsProblem; import static org.apache.ignite.internal.rest.matcher.MicronautHttpResponseMatcher.hasStatus; import static org.apache.ignite.internal.rest.matcher.ProblemMatcher.isProblem; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; @@ -36,20 +38,33 @@ import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import org.apache.ignite.Ignite; import org.apache.ignite.internal.ClusterConfiguration; import org.apache.ignite.internal.ClusterPerClassIntegrationTest; +import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.placementdriver.ReplicaMeta; +import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessage; +import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse; +import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory; +import org.apache.ignite.internal.placementdriver.message.StopLeaseProlongationMessage; +import org.apache.ignite.internal.replicator.ZonePartitionId; import org.apache.ignite.internal.rest.api.recovery.RestartZonePartitionsRequest; +import org.apache.ignite.internal.table.TableImpl; +import org.apache.ignite.internal.wrapper.Wrappers; import org.hamcrest.Matcher; import org.hamcrest.Matchers; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; /** Test for disaster recovery restart partitions command. */ @MicronautTest public class ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPerClassIntegrationTest { + private static final PlacementDriverMessagesFactory PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory(); + private static final String NODE_URL = "http://localhost:" + ClusterConfiguration.DEFAULT_BASE_HTTP_PORT; private static final String FIRST_ZONE = "first_ZONE"; @@ -65,10 +80,17 @@ public class ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe @BeforeAll public void setUp() { sql(String.format("CREATE ZONE \"%s\" storage profiles ['%s']", FIRST_ZONE, DEFAULT_AIPERSIST_PROFILE_NAME)); - sql(String.format("CREATE TABLE PUBLIC.\"%s\" (id INT PRIMARY KEY, val INT) ZONE \"%s\"", TABLE_NAME, + sql(String.format("CREATE TABLE %s (id INT PRIMARY KEY, val INT) ZONE \"%s\"", TABLE_NAME, FIRST_ZONE)); } + @BeforeEach + public void beforeEach() { + for (IgniteImpl node : runningNodesList()) { + node.stopDroppingMessages(); + } + } + @Test public void testRestartPartitionZoneNotFound() { String unknownZone = "unknown_zone"; @@ -135,7 +157,6 @@ public void testRestartAllPartitions() { } @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-26377") public void testRestartSpecifiedPartitions() { MutableHttpRequest post = restartPartitionsRequest(Set.of(), FIRST_ZONE, Set.of(0, 1)); @@ -151,6 +172,64 @@ public void testRestartPartitionsByNodes() { assertThat(client.toBlocking().exchange(post), hasStatus(OK)); } + @Test + public void testRestartPartitionDuringLeaseNegotiation() { + IgniteImpl node = anyNode(); + + int zoneId = Wrappers.unwrap(node.tables().table(TABLE_NAME), TableImpl.class).zoneId(); + ZonePartitionId partId = new ZonePartitionId(zoneId, 0); + + CompletableFuture primaryReplicaFut = anyNode().placementDriver().awaitPrimaryReplica( + partId, + node.clock().now(), + 10, + SECONDS + ); + + assertThat(primaryReplicaFut, willCompleteSuccessfully()); + + log.info("Test: primary replica [groupId={}, leaseholder={}]", partId, primaryReplicaFut.join().getLeaseholder()); + + CompletableFuture newNegotiationFuture = new CompletableFuture<>(); + + for (IgniteImpl n : runningNodesList()) { + n.dropMessages((recp, msg) -> { + if (msg instanceof LeaseGrantedMessage) { + LeaseGrantedMessage lgm = (LeaseGrantedMessage) msg; + if (lgm.groupId().equals(partId)) { + log.info("Test: new negotiation begins [groupId={}, leaseholder={}]", lgm.groupId(), recp); + newNegotiationFuture.complete(null); + } + } + + if (msg instanceof LeaseGrantedMessageResponse) { + log.info("Test: lease negotiation tries to finish [accepted={}]", ((LeaseGrantedMessageResponse) msg).accepted()); + return true; + } + + return false; + }); + } + + StopLeaseProlongationMessage stopMsg = PLACEMENT_DRIVER_MESSAGES_FACTORY.stopLeaseProlongationMessage() + .groupId(partId) + .build(); + + for (IgniteImpl n : runningNodesList()) { + for (IgniteImpl recp : runningNodesList()) { + n.clusterService().messagingService().invoke(recp.clusterService().topologyService().localMember(), stopMsg, 3000); + } + } + + assertThat(newNegotiationFuture, willCompleteSuccessfully()); + + log.info("Test: partition restart"); + + MutableHttpRequest post = restartPartitionsRequest(Set.of(), FIRST_ZONE, Set.of(0)); + + assertThat(client.toBlocking().exchange(post), hasStatus(OK)); + } + private static Set nodeNames(int count) { return CLUSTER.runningNodes() .map(Ignite::name)