Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ private CompletableFuture<HybridTimestamp> denyLease(ReplicationGroupId grpId, L

leaseNegotiator.cancelAgreement(grpId);

Leases leasesCurrent = leaseTracker.leasesLatest();
Leases leasesCurrent = leaseTracker.latestLeases();

Collection<Lease> currentLeases = leasesCurrent.leaseByGroupId().values();

Expand Down Expand Up @@ -418,7 +418,7 @@ private void updateLeaseBatchInternal() {

HybridTimestamp newExpirationTimestamp = new HybridTimestamp(currentTime.getPhysical() + leaseExpirationInterval, 0);

Leases leasesCurrent = leaseTracker.leasesLatest();
Leases leasesCurrent = leaseTracker.latestLeases();
Map<ReplicationGroupId, LeaseAgreement> toBeNegotiated = new HashMap<>();
Map<ReplicationGroupId, Lease> renewedLeases = new HashMap<>(leasesCurrent.leaseByGroupId().size());

Expand Down Expand Up @@ -779,6 +779,20 @@ private void processMessageInternal(String sender, PlacementDriverActorMessage m
clusterService.messagingService().respond(sender, response, correlationId);
}
});
} else {
long time = lease.getLeaseholderId() == null
? clockService.current().longValue()
: NULL_HYBRID_TIMESTAMP;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you please explain why we return NULL_HYBRID_TIMESTAMP? Why not lease.getExpirationTime().longValue()?


LOG.info("Stop lease prolongation message was received from non-leaseholder "
+ "[groupId={}, sender={}, leaseholder={}, time={}]", grpId, sender, lease.getLeaseholder(), time);

StopLeaseProlongationMessageResponse response = PLACEMENT_DRIVER_MESSAGES_FACTORY
.stopLeaseProlongationMessageResponse()
.deniedLeaseExpirationTimeLong(time)
.build();

clusterService.messagingService().respond(sender, response, correlationId);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if correlationId is null?

}
} else {
LOG.warn("Unknown message type [msg={}]", msg.getClass().getSimpleName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public Lease getLease(ReplicationGroupId grpId) {
}

/** Returns collection of latest leases, ordered by replication group. Shows all latest leases including expired ones. */
public Leases leasesLatest() {
public Leases latestLeases() {
return leases;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private int numberOfLeases(boolean accepted) {
int count = 0;
HybridTimestamp now = clockService.current();

for (Lease lease : leaseTracker.leasesLatest().leaseByGroupId().values()) {
for (Lease lease : leaseTracker.latestLeases().leaseByGroupId().values()) {
// Expired leases can be ignored.
if (lease != null && accepted == lease.isAccepted() && clockService.before(lease.getExpirationTime(), now)) {
count++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ void setUp(

when(clusterService.messagingService()).thenReturn(messagingService);

lenient().when(leaseTracker.leasesLatest()).thenReturn(leases);
lenient().when(leaseTracker.latestLeases()).thenReturn(leases);
lenient().when(leaseTracker.getLease(any(ReplicationGroupId.class))).then(i -> Lease.emptyLease(i.getArgument(0)));

when(metaStorageManager.recoveryFinishedFuture()).thenReturn(completedFuture(new Revisions(1, -1)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ CompletableFuture<Void> weakStopReplica(
// If the replica is primary, turn off the primary first.
context.replicaState = ReplicaState.RESTART_PLANNED;

LOG.info("Stopping lease prolongation due to partition restart [groupId={}].", groupId);

return replicaManager.stopLeaseProlongation(groupId, null)
.thenCompose(unused -> planDeferredReplicaStop(groupId, context, stopOperation));
} else {
Expand Down Expand Up @@ -318,6 +320,8 @@ private CompletableFuture<Void> stopReplica(
// There is some probability that the replica would be reserved after the previous lease is expired and before this method
// is called, so reservation state needs to be checked again.
if (context.reservedForPrimary) {
LOG.info("Stopping lease prolongation due to replica stop [groupId={}].", groupId);

return replicaManager.stopLeaseProlongation(groupId, null)
.thenCompose(unused -> planDeferredReplicaStop(groupId, context, stopOperation));
}
Expand Down
1 change: 1 addition & 0 deletions modules/rest/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ dependencies {
integrationTestImplementation project(':ignite-table')
integrationTestImplementation project(':ignite-transactions')
integrationTestImplementation project(':ignite-eventlog')
integrationTestImplementation project(':ignite-placement-driver-api')
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really need this?

integrationTestImplementation testFixtures(project(':ignite-core'))
integrationTestImplementation testFixtures(project(':ignite-runner'))
integrationTestImplementation testFixtures(project(':ignite-cluster-management'))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

import static io.micronaut.http.HttpStatus.BAD_REQUEST;
import static io.micronaut.http.HttpStatus.OK;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
import static org.apache.ignite.internal.rest.matcher.MicronautHttpResponseMatcher.assertThrowsProblem;
import static org.apache.ignite.internal.rest.matcher.MicronautHttpResponseMatcher.hasStatus;
import static org.apache.ignite.internal.rest.matcher.ProblemMatcher.isProblem;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;

Expand All @@ -36,20 +38,33 @@
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.ClusterConfiguration;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessage;
import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
import org.apache.ignite.internal.placementdriver.message.StopLeaseProlongationMessage;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.rest.api.recovery.RestartZonePartitionsRequest;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.wrapper.Wrappers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

/** Test for disaster recovery restart partitions command. */
@MicronautTest
public class ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPerClassIntegrationTest {
private static final PlacementDriverMessagesFactory PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();

private static final String NODE_URL = "http://localhost:" + ClusterConfiguration.DEFAULT_BASE_HTTP_PORT;

private static final String FIRST_ZONE = "first_ZONE";
Expand All @@ -65,10 +80,17 @@ public class ItDisasterRecoveryControllerRestartPartitionsTest extends ClusterPe
@BeforeAll
public void setUp() {
sql(String.format("CREATE ZONE \"%s\" storage profiles ['%s']", FIRST_ZONE, DEFAULT_AIPERSIST_PROFILE_NAME));
sql(String.format("CREATE TABLE PUBLIC.\"%s\" (id INT PRIMARY KEY, val INT) ZONE \"%s\"", TABLE_NAME,
sql(String.format("CREATE TABLE %s (id INT PRIMARY KEY, val INT) ZONE \"%s\"", TABLE_NAME,
FIRST_ZONE));
}

/** Calls {@code stopDroppingMessages()} on every running node before each test, so no earlier message filter stays active. */
@BeforeEach
public void beforeEach() {
    runningNodesList().forEach(IgniteImpl::stopDroppingMessages);
}

@Test
public void testRestartPartitionZoneNotFound() {
String unknownZone = "unknown_zone";
Expand Down Expand Up @@ -135,7 +157,6 @@ public void testRestartAllPartitions() {
}

@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-26377")
public void testRestartSpecifiedPartitions() {
MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(), FIRST_ZONE, Set.of(0, 1));

Expand All @@ -151,6 +172,64 @@ public void testRestartPartitionsByNodes() {
assertThat(client.toBlocking().exchange(post), hasStatus(OK));
}

/**
 * Checks that a partition restart request returns {@code OK} when it is issued after a new lease negotiation
 * for the partition has been observed to start.
 *
 * <p>Scenario:
 * <ol>
 *     <li>Wait for a primary replica of partition 0 of the test table's zone.</li>
 *     <li>Install a message filter on every node that completes a future once a {@link LeaseGrantedMessage}
 *     for the partition is seen, and returns {@code true} for every {@link LeaseGrantedMessageResponse}.</li>
 *     <li>Send a {@link StopLeaseProlongationMessage} for the partition from every node to every node.</li>
 *     <li>Wait until the new negotiation is observed, then issue the restart request.</li>
 * </ol>
 *
 * <p>The filters are removed in a {@code finally} block, so they do not stay installed when an assertion fails.
 */
@Test
public void testRestartPartitionDuringLeaseNegotiation() {
    IgniteImpl node = anyNode();

    int zoneId = Wrappers.unwrap(node.tables().table(TABLE_NAME), TableImpl.class).zoneId();
    ZonePartitionId partId = new ZonePartitionId(zoneId, 0);

    // The same node supplies both the placement driver and the timestamp passed to it.
    CompletableFuture<ReplicaMeta> primaryReplicaFut = node.placementDriver().awaitPrimaryReplica(
            partId,
            node.clock().now(),
            10,
            SECONDS
    );

    assertThat(primaryReplicaFut, willCompleteSuccessfully());

    log.info("Test: primary replica [groupId={}, leaseholder={}]", partId, primaryReplicaFut.join().getLeaseholder());

    CompletableFuture<?> newNegotiationFuture = new CompletableFuture<>();

    try {
        for (IgniteImpl n : runningNodesList()) {
            n.dropMessages((recp, msg) -> {
                if (msg instanceof LeaseGrantedMessage) {
                    LeaseGrantedMessage lgm = (LeaseGrantedMessage) msg;
                    if (lgm.groupId().equals(partId)) {
                        log.info("Test: new negotiation begins [groupId={}, leaseholder={}]", lgm.groupId(), recp);
                        newNegotiationFuture.complete(null);
                    }
                }

                if (msg instanceof LeaseGrantedMessageResponse) {
                    log.info("Test: lease negotiation tries to finish [accepted={}]", ((LeaseGrantedMessageResponse) msg).accepted());
                    // NOTE(review): returning true presumably makes the node drop the message, which would keep
                    // the negotiation from finishing - confirm against the dropMessages contract.
                    return true;
                }

                return false;
            });
        }

        StopLeaseProlongationMessage stopMsg = PLACEMENT_DRIVER_MESSAGES_FACTORY.stopLeaseProlongationMessage()
                .groupId(partId)
                .build();

        // The message is sent from every node to every node; the returned futures are intentionally not awaited.
        for (IgniteImpl n : runningNodesList()) {
            for (IgniteImpl recp : runningNodesList()) {
                n.clusterService().messagingService().invoke(recp.clusterService().topologyService().localMember(), stopMsg, 3000);
            }
        }

        assertThat(newNegotiationFuture, willCompleteSuccessfully());

        log.info("Test: partition restart");

        MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(), FIRST_ZONE, Set.of(0));

        assertThat(client.toBlocking().exchange(post), hasStatus(OK));
    } finally {
        // Remove the filters even when an assertion above fails.
        for (IgniteImpl n : runningNodesList()) {
            n.stopDroppingMessages();
        }
    }
}

private static Set<String> nodeNames(int count) {
return CLUSTER.runningNodes()
.map(Ignite::name)
Expand Down