Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static java.lang.Math.toIntExact;
import static java.nio.file.StandardOpenOption.CREATE_NEW;
import static java.nio.file.StandardOpenOption.WRITE;
import static org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.GROUP_DESTROY_LOG_INDEX;
import static org.apache.ignite.internal.util.IgniteUtils.atomicMoveFile;
import static org.apache.ignite.internal.util.IgniteUtils.fsyncFile;

Expand Down Expand Up @@ -137,7 +138,6 @@ class IndexFileManager {
/**
 * Index file metadata grouped by Raft Group ID.
 *
 * <p>A group's entry is removed when a destroy tombstone ({@code firstIndexKept == GROUP_DESTROY_LOG_INDEX}) is processed
 * during a checkpoint, so destroyed groups no longer accumulate metadata here.
 */
private final Map<Long, GroupIndexMeta> groupIndexMetas = new ConcurrentHashMap<>();

IndexFileManager(Path baseDir) throws IOException {
Expand Down Expand Up @@ -451,6 +451,12 @@ private void putIndexFileMeta(IndexMetaSpec metaSpec) {

long firstIndexKept = metaSpec.firstIndexKept();

if (firstIndexKept == GROUP_DESTROY_LOG_INDEX) {
groupIndexMetas.remove(groupId);

return;
}
Comment on lines 452 to +458
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This removal is keyed off firstIndexKept == GROUP_DESTROY_LOG_INDEX, but firstIndexKept is also used for regular prefix/reset tombstones and can legitimately be 0 if a group ever uses log index 0 (the segstore tests do). In that case, this would incorrectly drop all in-memory index metadata for an active group, making entries unreachable and potentially causing the GC to delete/compact files incorrectly. Please make the “group destroyed” signal unambiguous (e.g., a separate persisted flag/record marker) or add a strict invariant (validated at write/reset boundaries) that log indices are always >= 1 so 0 can never occur except as a destroy marker.

Copilot uses AI. Check for mistakes.

GroupIndexMeta existingGroupIndexMeta = groupIndexMetas.get(groupId);

if (existingGroupIndexMeta == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ class SegmentFileManager implements ManuallyCloseable {
*/
static final byte[] SWITCH_SEGMENT_RECORD = new byte[8]; // 8 zero bytes.

/**
 * Special "destroy group" sentinel value for the log reset index.
 *
 * <p>NOTE(review): {@code 0} can also be a legitimately stored log index (nothing visible here enforces indices {@code >= 1}),
 * which makes this sentinel ambiguous with a regular prefix/reset tombstone — confirm that stored log indices are always
 * {@code >= 1}, or switch to a dedicated record type / negative marker.
 */
static final long GROUP_DESTROY_LOG_INDEX = 0L;
Comment on lines +119 to +120
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GROUP_DESTROY_LOG_INDEX is set to 0, but the segstore code does not enforce log indices to be > 0 (tests in this module regularly append index 0). Overloading a valid log index as a destroy sentinel can lead to ambiguous semantics (e.g., reset may keep the entry at index 0 instead of fully discarding the group) and can also poison recovery: firstLogIndexInclusiveOnRecovery prefers segmentInfo.firstIndexKept(), so after a destroy tombstone it can return 0, which makes SegstoreLogStorage.getFirstLogIndex() return 0 (JRaft expects 1 for an empty log). Consider making the destroy marker unambiguous (dedicated record type/flag) or enforce/validate that all stored log indices are >= 1 and handle the destroy tombstone explicitly on recovery so destroyed groups recover as empty (first=1,last=0).

Suggested change
*/
static final long GROUP_DESTROY_LOG_INDEX = 0L;
* Must not overlap with a valid stored log index.
*/
static final long GROUP_DESTROY_LOG_INDEX = -1L;

Copilot uses AI. Check for mistakes.

private final String storageName;

private final Path segmentFilesDir;
Expand Down Expand Up @@ -422,6 +427,14 @@ void reset(long groupId, long nextLogIndex) throws IOException {
}
}

/**
 * Destroys all log data for the given group. Writes a tombstone using {@link #GROUP_DESTROY_LOG_INDEX} so that the GC can discard
 * the group's entries on the next compaction pass.
 *
 * <p>NOTE(review): this delegates to {@code reset} with the sentinel index; if index {@code 0} can ever be a valid stored log
 * index for a live group, the tombstone is indistinguishable from a regular reset — verify the invariant at call sites.
 *
 * @param groupId Raft group ID whose log data should be discarded.
 * @throws IOException If the tombstone could not be written.
 */
void destroyGroup(long groupId) throws IOException {
    reset(groupId, GROUP_DESTROY_LOG_INDEX);
}

private WriteBufferWithMemtable reserveBytesWithRollover(int size) throws IOException {
while (true) {
SegmentFileWithMemtable segmentFileWithMemtable = currentSegmentFile();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public LogStorage createLogStorage(String groupId, RaftOptions raftOptions) {
@Override
public void destroyLogStorage(String groupId) {
try {
fileManager.reset(convertGroupId(groupId), 1);
fileManager.destroyGroup(convertGroupId(groupId));
} catch (IOException e) {
throw new LogStorageException("Failed to destroy log storage for group " + groupId, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.raft.storage.segstore;

import static org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.GROUP_DESTROY_LOG_INDEX;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -644,4 +645,54 @@ void testGetSegmentFilePointerReturnsNullForEmptyMetaRange() throws IOException

assertThat(indexFileManager.getSegmentFilePointer(1, 2), is(nullValue()));
}

@Test
void testDestroyGroupSoleTombstoneClearsIndexMetaOnCheckpoint() throws IOException {
    // Seed two groups with one entry each and checkpoint them.
    var initialMemtable = new SingleThreadMemTable();

    initialMemtable.appendSegmentFileOffset(0, 1, 10);
    initialMemtable.appendSegmentFileOffset(1, 1, 20);

    indexFileManager.saveNewIndexMemtable(initialMemtable);

    assertThat(indexFileManager.firstLogIndexInclusive(0), is(1L));
    assertThat(indexFileManager.firstLogIndexInclusive(1), is(1L));

    // Sole tombstone: the segment file had no entries for group 0, only the destroy tombstone.
    var tombstoneMemtable = new SingleThreadMemTable();

    tombstoneMemtable.reset(0, GROUP_DESTROY_LOG_INDEX);

    indexFileManager.saveNewIndexMemtable(tombstoneMemtable);

    // All index metadata of the destroyed group must be gone.
    assertThat(indexFileManager.getSegmentFilePointer(0, 1), is(nullValue()));
    assertThat(indexFileManager.firstLogIndexInclusive(0), is(-1L));
    assertThat(indexFileManager.lastLogIndexExclusive(0), is(-1L));

    // Group 1 must be unaffected.
    assertThat(indexFileManager.getSegmentFilePointer(1, 1), is(new SegmentFilePointer(new FileProperties(0), 20)));
    assertThat(indexFileManager.firstLogIndexInclusive(1), is(1L));
}

@Test
void testDestroyGroupWithPriorDataClearsIndexMetaOnCheckpoint() throws IOException {
    // Seed two groups with one entry each and checkpoint them.
    var firstMemtable = new SingleThreadMemTable();

    firstMemtable.appendSegmentFileOffset(0, 1, 10);
    firstMemtable.appendSegmentFileOffset(1, 1, 20);

    indexFileManager.saveNewIndexMemtable(firstMemtable);

    assertThat(indexFileManager.firstLogIndexInclusive(0), is(1L));

    // Group 0 writes more entries in the next segment, then gets destroyed in the same segment.
    var secondMemtable = new SingleThreadMemTable();

    secondMemtable.appendSegmentFileOffset(0, 2, 100);
    secondMemtable.appendSegmentFileOffset(0, 3, 200);
    secondMemtable.reset(0, GROUP_DESTROY_LOG_INDEX);

    indexFileManager.saveNewIndexMemtable(secondMemtable);

    // Both the old and the new entries of group 0 must be unreachable after the tombstone is processed.
    assertThat(indexFileManager.getSegmentFilePointer(0, 1), is(nullValue()));
    assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(nullValue()));
    assertThat(indexFileManager.firstLogIndexInclusive(0), is(-1L));
    assertThat(indexFileManager.lastLogIndexExclusive(0), is(-1L));

    // Group 1 must be unaffected.
    assertThat(indexFileManager.getSegmentFilePointer(1, 1), is(new SegmentFilePointer(new FileProperties(0), 20)));
    assertThat(indexFileManager.firstLogIndexInclusive(1), is(1L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,90 @@ void testCompactionOfFileAdjacentToStaleEntryInDequeCausesCorruption() throws Ex
});
}

/**
* Verifies that after {@link SegmentFileManager#destroyGroup} is called, the GC can remove segment files belonging
* to the destroyed group.
*/
@Test
void testSegmentFilesRemovedByGcAfterGroupDestroy() throws Exception {
List<byte[]> batches = createRandomData(FILE_SIZE / 4, 10);

for (int i = 0; i < batches.size(); i++) {
appendBytes(GROUP_ID_1, batches.get(i), i);
}

await().until(this::indexFiles, hasSize(equalTo(segmentFiles().size() - 1)));

List<Path> oldSegmentFiles = segmentFiles().subList(0, segmentFiles().size() - 1);
List<Path> oldIndexFiles = indexFiles();

fileManager.destroyGroup(GROUP_ID_1);

// The destroy tombstone is in the current segment file's memtable. Trigger a rollover using a different group so the
// checkpoint thread processes the tombstone and removes GROUP_ID_1 from the in-memory index.
triggerAndAwaitCheckpoint(GROUP_ID_2, 0);

Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests verify that segment/index files are deleted/compacted after destroyGroup, but they don’t verify the primary correctness property: entries from the destroyed group must become unreadable via getEntry. Because the test data uses log index 0, a destroy implemented as reset(..., 0) can still leave index 0 readable (depending on existing memtable state), which would not be caught here. Please add assertions that getEntry(GROUP_ID_1, i) returns null for the previously appended indices (including 0), and/or consider using 1-based indices in the test to match JRaft’s empty-log contract (first=1,last=0).

Suggested change
for (int i = 0; i < batches.size(); i++) {
assertThat(getEntry(GROUP_ID_1, i), is((LogEntry) null));
}

Copilot uses AI. Check for mistakes.
for (Path segmentFile : oldSegmentFiles) {
runCompaction(segmentFile);
}

for (Path segmentFile : oldSegmentFiles) {
assertThat(segmentFile, not(exists()));
}

for (Path indexFile : oldIndexFiles) {
assertThat(indexFile, not(exists()));
}
}

/**
 * Verifies that when a group is destroyed while another group shares the same segment files, GC compacts each shared file by dropping
 * only the destroyed group's entries while preserving the surviving group's entries.
 */
@Test
void testSegmentFilesCompactedByGcAfterGroupDestroyWithSurvivingGroup() throws Exception {
    List<byte[]> batches = createRandomData(FILE_SIZE / 4, 10);

    for (int batchIndex = 0; batchIndex < batches.size(); batchIndex++) {
        appendBytes(GROUP_ID_1, batches.get(batchIndex), batchIndex);
        appendBytes(GROUP_ID_2, batches.get(batchIndex), batchIndex);
    }

    await().until(this::indexFiles, hasSize(equalTo(segmentFiles().size() - 1)));

    List<Path> sharedSegmentFiles = segmentFiles().subList(0, segmentFiles().size() - 1);

    // Destroy one group while the other remains active.
    fileManager.destroyGroup(GROUP_ID_1);

    // Trigger a rollover using GROUP_ID_2 so the checkpoint thread processes the tombstone and removes GROUP_ID_1 from
    // the in-memory index, enabling GC to compact the shared segment files.
    triggerAndAwaitCheckpoint(GROUP_ID_2, batches.size() - 1);

    for (Path sharedSegmentFile : sharedSegmentFiles) {
        FileProperties propertiesBeforeCompaction = SegmentFile.fileProperties(sharedSegmentFile);

        runCompaction(sharedSegmentFile);

        // Original file must have been replaced by a compacted generation (GROUP_ID_2 entries survive).
        assertThat(sharedSegmentFile, not(exists()));

        var compactedFileProperties =
                new FileProperties(propertiesBeforeCompaction.ordinal(), propertiesBeforeCompaction.generation() + 1);

        assertThat(fileManager.segmentFilesDir().resolve(SegmentFile.fileName(compactedFileProperties)), exists());
    }

    // GROUP_ID_2 entries must all still be readable.
    for (int i = 0; i < batches.size(); i++) {
        int expectedBatchIndex = i;

        fileManager.getEntry(GROUP_ID_2, i, entryBytes -> {
            assertThat(entryBytes, is(batches.get(expectedBatchIndex)));
            return null;
        });
    }
}

@Test
void testLogSizeBytesInitializedCorrectlyOnStartup() throws Exception {
// Fill multiple segment files to create both segment and index files.
Expand Down Expand Up @@ -717,13 +801,17 @@ public int size(LogEntry logEntry) {
}

/** Convenience overload of the checkpoint trigger that appends to {@code GROUP_ID_1}. */
private void triggerAndAwaitCheckpoint(long lastGroupIndex) throws IOException {
    triggerAndAwaitCheckpoint(GROUP_ID_1, lastGroupIndex);
}

private void triggerAndAwaitCheckpoint(long groupId, long lastGroupIndex) throws IOException {
List<Path> segmentFilesBeforeCheckpoint = segmentFiles();

// Insert some entries to trigger a rollover (and a checkpoint).
List<byte[]> batches = createRandomData(FILE_SIZE / 4, 5);

for (int i = 0; i < batches.size(); i++) {
appendBytes(GROUP_ID_1, batches.get(i), lastGroupIndex + i + 1);
appendBytes(groupId, batches.get(i), lastGroupIndex + i + 1);
}

List<Path> segmentFilesAfterCheckpoint = segmentFiles();
Expand Down