@@ -412,6 +412,20 @@ public void onShutdown() {
}
}

@Override
public long getPersistedAppliedIndex() {
@alievmirza (Contributor), Mar 26, 2026:

By the way, I had a great discussion with @rpuch, and it seems we have already fixed the possibility of having a gap between the persisted and applied index (see https://issues.apache.org/jira/browse/IGNITE-28216), so there is a chance that this whole PR could be unnecessary.

@dant3 (Contributor, Author), Apr 1, 2026:

IGNITE-28216 fixes one of the known root causes: specific missed lastApplied() bumps in partition processing. This PR adds a detection mechanism: if truncation below the applied index ever happens (from this bug or any future one), we fail the node instead of silently corrupting data.

As a result, the two are complementary. IGNITE-28216 prevents the gap; this PR catches it if it ever occurs anyway due to some other unknown bug or nuance. Without the check, a similar future bug would again cause silent corruption with no indication that something went wrong.
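The detection idea above can be sketched in isolation (a hypothetical minimal model, not the actual jraft API — class and method names are illustrative): the log layer remembers the highest index already applied to the state machine and refuses any suffix truncation below it.

```java
// Hypothetical sketch of the detection guard; not the actual jraft code.
class TruncationGuard {
    private long appliedIndex; // highest index applied to the state machine

    void onApplied(long index) {
        appliedIndex = Math.max(appliedIndex, index);
    }

    /**
     * Returns true if truncating everything after lastIndexKept is safe.
     * A false result means already-applied entries would be discarded,
     * so the node should fail fast instead of silently corrupting data.
     */
    boolean canTruncateSuffix(long lastIndexKept) {
        return lastIndexKept >= appliedIndex;
    }
}
```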

Contributor replied:

I cannot resolve the comment, but I'm satisfied with your explanation, and it seems that this PR is really needed.

// Clamp to 0 because lastAppliedIndex() returns -1 during rebalance.
long result = max(0, txStateStorage.lastAppliedIndex());

synchronized (tableProcessorsStateLock) {
for (RaftTableProcessor processor : tableProcessors.values()) {
result = max(result, processor.lastAppliedIndex());
}
}

return result;
}

/**
* Adds a given Table Partition-level Raft processor to the set of managed processors.
*
@@ -96,4 +96,14 @@ default void onConfigurationCommitted(
* Invoked once after a raft node has been shut down.
*/
void onShutdown();

/**
* Returns the last applied index persisted by the state machine.
* Called during {@code NodeImpl.init()} to prevent truncation of already-applied log entries.
*
* @return persisted applied index, or 0 if unknown.
*/
default long getPersistedAppliedIndex() {
return 0;
Contributor commented:

Shouldn't we implement it for all listeners, especially for MS/CMG?

Contributor (Author) replied:

We should; the same situation is possible there. Maybe it's worth a separate ticket, though.

Contributor replied:

I believe the real answer is "no, we should not". MS and CMG each use only one data storage per FSM, which guarantees that min(lastAppliedIndex) always matches max(lastAppliedIndex), so the troubled behavior is impossible there. This is a distribution-zone-exclusive problem.
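The single-storage argument above can be illustrated with a toy calculation (the numbers and class are hypothetical, for illustration only): when an FSM spreads its state across several storages, their per-storage applied indexes can diverge after a crash, producing a gap between the minimum and maximum applied index; with one storage, min and max are trivially equal.

```java
import java.util.stream.LongStream;

// Illustrative helper: min/max applied index across an FSM's storages.
class AppliedIndexGap {
    static long[] gap(long[] appliedPerStorage) {
        long min = LongStream.of(appliedPerStorage).min().orElse(0);
        long max = LongStream.of(appliedPerStorage).max().orElse(0);
        return new long[] { min, max };
    }
}
```

With hypothetical per-storage indexes {7, 10, 9}, entries 8..10 were applied by some storages but not all — exactly the gap the guard must detect.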

Contributor (Author) replied:

Yeah, thanks for the clarification, @ibessonov.

}
}
@@ -867,6 +867,11 @@ public RaftGroupListener getListener() {
return listener;
}

@Override
public long getPersistedAppliedIndex() {
return listener.getPersistedAppliedIndex();
}

@Override
public void onApply(Iterator iter) {
var iterWrapper = new WriteCommandIterator(iter, marshaller);
@@ -133,4 +133,14 @@ default void onRawConfigurationCommitted(ConfigurationEntry conf, long lastAppli
* @param ctx context of leader change
*/
void onStartFollowing(final LeaderChangeContext ctx);

/**
* Returns the last applied index persisted by the state machine.
* Called during {@code NodeImpl.init()} to prevent truncation of already-applied log entries.
*
* @return persisted applied index, or 0 if unknown.
*/
default long getPersistedAppliedIndex() {
return 0;
}
}
@@ -1066,6 +1066,19 @@ public boolean init(final NodeOptions opts) {
return false;
}

// Restore appliedId so that unsafeTruncateSuffix() can reject truncation of applied entries.
Contributor commented:

Why do we need this entire block of code? Please explain.

@dant3 (Contributor, Author), Apr 1, 2026:

This block restores appliedId from the state machine's persisted applied index so that the guard is effective immediately after restart, before any entries are re-applied. After a node restart, logManager.appliedId is transient and resets to 0. The unsafeTruncateSuffix() guard checks lastIndexKept < appliedId.getIndex(), but with appliedId = 0 it can never reject anything, which leaves the door open to data corruption.

Contributor replied:

Maybe this explanation is worth copying into the comment in the code.

long persistedApplied = this.options.getFsm().getPersistedAppliedIndex();
if (persistedApplied > 0) {
long term = this.logManager.getTerm(persistedApplied);
if (term > 0) {
// Term is 0 when the index is outside the log (covered by a snapshot) — skip in that case.
this.logManager.setAppliedId(new LogId(persistedApplied, term));
} else {
LOG.warn("Node {} persisted applied index {} is not in the raft log, expecting snapshot to cover it.",
getNodeId(), persistedApplied);
}
}

final Status st = this.logManager.checkConsistency();
if (!st.isOk()) {
LOG.error("Node {} is initialized with inconsistent log, status={}.", getNodeId(), st);
@@ -1049,11 +1049,25 @@ private boolean reset(final long nextLogIndex) {
}
}

private void unsafeTruncateSuffix(final long lastIndexKept, final Lock lock) {
/**
* Truncates log entries after {@code lastIndexKept}.
*
* @return {@code true} on success, {@code false} if truncation would discard applied entries (node moves to error state).
*/
private boolean unsafeTruncateSuffix(final long lastIndexKept, final Lock lock) {
if (lastIndexKept < this.appliedId.getIndex()) {
LOG.error("FATAL ERROR: Can't truncate logs before appliedId={}, lastIndexKept={}", this.appliedId,
lastIndexKept);
return;
LOG.error("Raft log suffix conflict: cannot truncate entries that have been applied to the state machine. "
Contributor commented:

Let's keep the log format consistent with everywhere else: nodeId={}, appliedId={}, lastIndexKept={} should be at the end, surrounded by []. For example: "The partition will be moved to error state [nodeId={}, appliedId={}, lastIndexKept={}]."

Contributor (Author) replied:

Done!

Contributor replied:

I think there should be no dot after the ], but I've seen people add it in other places, which honestly confuses me.

+ "The partition will be moved to error state [nodeId={}, appliedId={}, lastIndexKept={}].",
this.nodeId, this.appliedId, lastIndexKept);
lock.unlock();
try {
reportError(RaftError.EINVAL.getNumber(),
"Raft log suffix conflict: attempted to truncate applied entries, appliedId=%s, lastIndexKept=%d",
this.appliedId, lastIndexKept);
} finally {
lock.lock();
}
return false;
}

this.logsInMemory.removeFromLastWhen(entry -> entry.getId().getIndex() > lastIndexKept);
Expand All @@ -1068,6 +1082,7 @@ private void unsafeTruncateSuffix(final long lastIndexKept, final Lock lock) {
final TruncateSuffixClosure c = new TruncateSuffixClosure(lastIndexKept, lastTermKept);
offerEvent(c, EventType.TRUNCATE_SUFFIX);
lock.lock();
return true;
}

@SuppressWarnings("NonAtomicOperationOnVolatileField")
@@ -1121,7 +1136,11 @@ private boolean checkAndResolveConflict(final List<LogEntry> entries, final Stab
if (entries.get(conflictingIndex).getId().getIndex() <= this.lastLogIndex) {
// Truncate all the conflicting entries to make local logs
// consensus with the leader.
unsafeTruncateSuffix(entries.get(conflictingIndex).getId().getIndex() - 1, lock);
if (!unsafeTruncateSuffix(entries.get(conflictingIndex).getId().getIndex() - 1, lock)) {
Utils.runClosureInThread(nodeOptions.getCommonExecutor(), done,
new Status(RaftError.EINVAL, "Raft log suffix conflict with applied entries"));
return false;
}
}
this.lastLogIndex = lastLogEntry.getId().getIndex();
} // else this is a duplicated AppendEntriesRequest, we have
@@ -23,9 +23,14 @@
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.verify;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
@@ -40,6 +45,7 @@
import org.apache.ignite.raft.jraft.entity.EnumOutter;
import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.entity.LogId;
import org.apache.ignite.raft.jraft.error.RaftException;
import org.apache.ignite.raft.jraft.entity.NodeId;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.entity.RaftOutter;
@@ -476,4 +482,46 @@ public void testLastLogIndexWhenShutdown() throws Exception {
Exception e = assertThrows(IllegalStateException.class, () -> this.logManager.getLastLogIndex(true));
assertEquals("Node is shutting down", e.getMessage());
}

/** Suffix truncation below appliedId must report error and abort the append (IGNITE-25502). */
@Test
public void testSuffixTruncationBelowAppliedIndexReportsError() {
List<LogEntry> entries = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
entries.add(TestUtils.mockEntry(i, 1));
}
assertThat("Initial entries should be appended successfully", appendEntries(entries), willBe(true));
assertEquals(10, this.logManager.getLastLogIndex());

this.logManager.getLastLogId(true); // Flush disruptor so setDiskId() completes.
this.logManager.setAppliedId(new LogId(8, 1));

// Conflicting entries at index 6+ (term 2 vs existing term 1) trigger unsafeTruncateSuffix(5).
// Since 5 < appliedId.index (8), this must be rejected.
List<LogEntry> conflicting = new ArrayList<>();
for (int i = 6; i <= 12; i++) {
conflicting.add(TestUtils.mockEntry(i, 2));
}
assertThat("Append should fail due to suffix conflict with applied entries", appendEntries(conflicting), willBe(false));

verify(fsmCaller).onError(any(RaftException.class));
assertEquals(10, this.logManager.getLastLogIndex());

for (int i = 1; i <= 10; i++) {
LogEntry entry = this.logManager.getEntry(i);
assertEquals(i, entry.getId().getIndex());
assertEquals(1, entry.getId().getTerm());
}
}

private CompletableFuture<Boolean> appendEntries(List<LogEntry> entries) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
this.logManager.appendEntries(new ArrayList<>(entries), new LogManager.StableClosure() {
@Override
public void run(Status status) {
future.complete(status.isOk());
}
});
return future;
}
}
@@ -23,8 +23,10 @@
import static org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.CompletableFutures.allOf;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
@@ -72,13 +74,13 @@
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
import org.apache.ignite.raft.jraft.core.NodeImpl;
import org.apache.ignite.raft.jraft.core.State;
import org.apache.ignite.raft.jraft.option.LogStorageOptions;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.storage.LogStorage;
import org.apache.ignite.tx.TransactionOptions;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

/**
Expand All @@ -98,7 +100,6 @@ protected int initialNodes() {
return 0;
}

@Disabled("https://issues.apache.org/jira/browse/IGNITE-25502")
@Test
void enterNodeWithIndexGreaterThanCurrentMajority() throws Exception {
cluster.startAndInit(3);
@@ -148,12 +149,21 @@

startNode(2);

// Node 2 has applied entries that the new majority (nodes 0, 1) doesn't have,
// so it must go to ERROR state when the leader tries to overwrite those entries.
await()
.timeout(10, TimeUnit.SECONDS)
.untilAsserted(() ->
assertThat(raftNodeImpl(2, replicationGroup).getState(), equalTo(State.STATE_ERROR))
);

// SQL should still work via the healthy majority.
assertThat(
toPeopleFromSqlRows(executeSql(selectPeopleDml(TABLE_NAME))),
arrayWithSize(Matchers.allOf(greaterThan(0), lessThan(people.length)))
);

for (int nodeIndex = 0; nodeIndex < 3; nodeIndex++) {
for (int nodeIndex = 0; nodeIndex < 2; nodeIndex++) {
assertThat(
"nodeIndex=" + nodeIndex,
scanPeopleFromAllPartitions(nodeIndex, TABLE_NAME),