diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java index 4b8453c69ad..db8bf2cb588 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java @@ -20,6 +20,7 @@ import static java.lang.Math.toIntExact; import static java.nio.file.StandardOpenOption.CREATE_NEW; import static java.nio.file.StandardOpenOption.WRITE; +import static org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.GROUP_DESTROY_LOG_INDEX; import static org.apache.ignite.internal.util.IgniteUtils.atomicMoveFile; import static org.apache.ignite.internal.util.IgniteUtils.fsyncFile; @@ -137,7 +138,6 @@ class IndexFileManager { /** * Index file metadata grouped by Raft Group ID. */ - // FIXME: This map is never cleaned up, see https://issues.apache.org/jira/browse/IGNITE-27926. private final Map groupIndexMetas = new ConcurrentHashMap<>(); IndexFileManager(Path baseDir) throws IOException { @@ -451,6 +451,12 @@ private void putIndexFileMeta(IndexMetaSpec metaSpec) { long firstIndexKept = metaSpec.firstIndexKept(); + if (firstIndexKept == GROUP_DESTROY_LOG_INDEX) { + groupIndexMetas.remove(groupId); + + return; + } + GroupIndexMeta existingGroupIndexMeta = groupIndexMetas.get(groupId); if (existingGroupIndexMeta == null) { diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java index 6c9f952acb7..fb72aa22ac7 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java @@ -114,6 +114,11 @@ class SegmentFileManager implements ManuallyCloseable { */ static final byte[] SWITCH_SEGMENT_RECORD = new byte[8]; // 8 zero bytes. + /** + * Special "destroy group" sentinel value for the log reset index. + */ + static final long GROUP_DESTROY_LOG_INDEX = 0L; + private final String storageName; private final Path segmentFilesDir; @@ -422,6 +427,14 @@ void reset(long groupId, long nextLogIndex) throws IOException { } } + /** + * Destroys all log data for the given group. Writes a tombstone using {@link #GROUP_DESTROY_LOG_INDEX} so that the GC can discard + * the group's entries on the next compaction pass. + */ + void destroyGroup(long groupId) throws IOException { + reset(groupId, GROUP_DESTROY_LOG_INDEX); + } + private WriteBufferWithMemtable reserveBytesWithRollover(int size) throws IOException { while (true) { SegmentFileWithMemtable segmentFileWithMemtable = currentSegmentFile(); diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentLogStorageManager.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentLogStorageManager.java index ca663ce9037..95033ba9113 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentLogStorageManager.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentLogStorageManager.java @@ -70,7 +70,7 @@ public LogStorage createLogStorage(String groupId, RaftOptions raftOptions) { @Override public void destroyLogStorage(String groupId) { try { - fileManager.reset(convertGroupId(groupId), 1); + fileManager.destroyGroup(convertGroupId(groupId)); } catch (IOException e) { throw new LogStorageException("Failed to destroy log storage for group " + groupId, e); } diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java index 78453d5f3c6..3d70d171299 100644 --- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.raft.storage.segstore; +import static org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.GROUP_DESTROY_LOG_INDEX; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -644,4 +645,54 @@ void testGetSegmentFilePointerReturnsNullForEmptyMetaRange() throws IOException assertThat(indexFileManager.getSegmentFilePointer(1, 2), is(nullValue())); } + + @Test + void testDestroyGroupSoleTombstoneClearsIndexMetaOnCheckpoint() throws IOException { + var memtable = new SingleThreadMemTable(); + memtable.appendSegmentFileOffset(0, 1, 10); + memtable.appendSegmentFileOffset(1, 1, 20); + indexFileManager.saveNewIndexMemtable(memtable); + + assertThat(indexFileManager.firstLogIndexInclusive(0), is(1L)); + assertThat(indexFileManager.firstLogIndexInclusive(1), is(1L)); + + // Sole tombstone: the segment file had no entries for group 0, only the destroy tombstone. + var destroyMemtable = new SingleThreadMemTable(); + destroyMemtable.reset(0, GROUP_DESTROY_LOG_INDEX); + indexFileManager.saveNewIndexMemtable(destroyMemtable); + + assertThat(indexFileManager.getSegmentFilePointer(0, 1), is(nullValue())); + assertThat(indexFileManager.firstLogIndexInclusive(0), is(-1L)); + assertThat(indexFileManager.lastLogIndexExclusive(0), is(-1L)); + + // Group 1 must be unaffected. + assertThat(indexFileManager.getSegmentFilePointer(1, 1), is(new SegmentFilePointer(new FileProperties(0), 20))); + assertThat(indexFileManager.firstLogIndexInclusive(1), is(1L)); + } + + @Test + void testDestroyGroupWithPriorDataClearsIndexMetaOnCheckpoint() throws IOException { + var memtable = new SingleThreadMemTable(); + memtable.appendSegmentFileOffset(0, 1, 10); + memtable.appendSegmentFileOffset(1, 1, 20); + indexFileManager.saveNewIndexMemtable(memtable); + + assertThat(indexFileManager.firstLogIndexInclusive(0), is(1L)); + + // Group 0 writes more entries in the next segment, then gets destroyed in the same segment. + var destroyMemtable = new SingleThreadMemTable(); + destroyMemtable.appendSegmentFileOffset(0, 2, 100); + destroyMemtable.appendSegmentFileOffset(0, 3, 200); + destroyMemtable.reset(0, GROUP_DESTROY_LOG_INDEX); + indexFileManager.saveNewIndexMemtable(destroyMemtable); + + assertThat(indexFileManager.getSegmentFilePointer(0, 1), is(nullValue())); + assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(nullValue())); + assertThat(indexFileManager.firstLogIndexInclusive(0), is(-1L)); + assertThat(indexFileManager.lastLogIndexExclusive(0), is(-1L)); + + // Group 1 must be unaffected. + assertThat(indexFileManager.getSegmentFilePointer(1, 1), is(new SegmentFilePointer(new FileProperties(0), 20))); + assertThat(indexFileManager.firstLogIndexInclusive(1), is(1L)); + } } diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java index 2ce340b12f2..06fcb3384ff 100644 --- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java @@ -646,6 +646,90 @@ void testCompactionOfFileAdjacentToStaleEntryInDequeCausesCorruption() throws Ex }); } + /** + * Verifies that after {@link SegmentFileManager#destroyGroup} is called, the GC can remove segment files belonging + * to the destroyed group. + */ + @Test + void testSegmentFilesRemovedByGcAfterGroupDestroy() throws Exception { + List batches = createRandomData(FILE_SIZE / 4, 10); + + for (int i = 0; i < batches.size(); i++) { + appendBytes(GROUP_ID_1, batches.get(i), i); + } + + await().until(this::indexFiles, hasSize(equalTo(segmentFiles().size() - 1))); + + List oldSegmentFiles = segmentFiles().subList(0, segmentFiles().size() - 1); + List oldIndexFiles = indexFiles(); + + fileManager.destroyGroup(GROUP_ID_1); + + // The destroy tombstone is in the current segment file's memtable. Trigger a rollover using a different group so the + // checkpoint thread processes the tombstone and removes GROUP_ID_1 from the in-memory index. + triggerAndAwaitCheckpoint(GROUP_ID_2, 0); + + for (Path segmentFile : oldSegmentFiles) { + runCompaction(segmentFile); + } + + for (Path segmentFile : oldSegmentFiles) { + assertThat(segmentFile, not(exists())); + } + + for (Path indexFile : oldIndexFiles) { + assertThat(indexFile, not(exists())); + } + } + + /** + * Verifies that when a group is destroyed while another group shares the same segment files, GC compacts each shared file by dropping + * only the destroyed group's entries while preserving the surviving group's entries. + */ + @Test + void testSegmentFilesCompactedByGcAfterGroupDestroyWithSurvivingGroup() throws Exception { + List batches = createRandomData(FILE_SIZE / 4, 10); + + for (int i = 0; i < batches.size(); i++) { + appendBytes(GROUP_ID_1, batches.get(i), i); + appendBytes(GROUP_ID_2, batches.get(i), i); + } + + await().until(this::indexFiles, hasSize(equalTo(segmentFiles().size() - 1))); + + List oldSegmentFiles = segmentFiles().subList(0, segmentFiles().size() - 1); + + // Destroy one group while the other remains active. + fileManager.destroyGroup(GROUP_ID_1); + + // Trigger a rollover using GROUP_ID_2 so the checkpoint thread processes the tombstone and removes GROUP_ID_1 from + // the in-memory index, enabling GC to compact the shared segment files. + triggerAndAwaitCheckpoint(GROUP_ID_2, batches.size() - 1); + + for (Path segmentFile : oldSegmentFiles) { + FileProperties originalProperties = SegmentFile.fileProperties(segmentFile); + + runCompaction(segmentFile); + + // Original file must have been replaced by a compacted generation (GROUP_ID_2 entries survive). + assertThat(segmentFile, not(exists())); + + var newFileProperties = new FileProperties(originalProperties.ordinal(), originalProperties.generation() + 1); + + assertThat(fileManager.segmentFilesDir().resolve(SegmentFile.fileName(newFileProperties)), exists()); + } + + // GROUP_ID_2 entries must all still be readable. + for (int i = 0; i < batches.size(); i++) { + int index = i; + + fileManager.getEntry(GROUP_ID_2, i, bs -> { + assertThat(bs, is(batches.get(index))); + return null; + }); + } + } + @Test void testLogSizeBytesInitializedCorrectlyOnStartup() throws Exception { // Fill multiple segment files to create both segment and index files. @@ -717,13 +801,17 @@ public int size(LogEntry logEntry) { } private void triggerAndAwaitCheckpoint(long lastGroupIndex) throws IOException { + triggerAndAwaitCheckpoint(GROUP_ID_1, lastGroupIndex); + } + + private void triggerAndAwaitCheckpoint(long groupId, long lastGroupIndex) throws IOException { List segmentFilesBeforeCheckpoint = segmentFiles(); // Insert some entries to trigger a rollover (and a checkpoint). List batches = createRandomData(FILE_SIZE / 4, 5); for (int i = 0; i < batches.size(); i++) { - appendBytes(GROUP_ID_1, batches.get(i), lastGroupIndex + i + 1); + appendBytes(groupId, batches.get(i), lastGroupIndex + i + 1); } List segmentFilesAfterCheckpoint = segmentFiles();