diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java index b9ac0f9906ce..77b4f01ee5ea 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java @@ -423,21 +423,53 @@ public void endInput() throws IOException { } private void writeToManifestUptoLatestCheckpoint(long checkpointId) throws IOException { - if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) { + + if (!dataFilesPerCheckpoint.containsKey(checkpointId)) { dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA); } - for (Map.Entry> writeResultsOfCheckpoint : - writeResultsSinceLastSnapshot.entrySet()) { - dataFilesPerCheckpoint.put( - writeResultsOfCheckpoint.getKey(), - writeToManifest(writeResultsOfCheckpoint.getKey(), writeResultsOfCheckpoint.getValue())); + Map> pendingWriteResults = Maps.newHashMap(); + for (Map.Entry> entry : writeResultsSinceLastSnapshot.entrySet()) { + long assignedCheckpointId = computeCheckpointId(checkpointId, entry); + pendingWriteResults + .computeIfAbsent(assignedCheckpointId, k -> Lists.newArrayList()) + .addAll(entry.getValue()); + } + + for (Map.Entry> entry : pendingWriteResults.entrySet()) { + dataFilesPerCheckpoint.put(entry.getKey(), writeToManifest(entry.getKey(), entry.getValue())); } // Clear the local buffer for current checkpoint. writeResultsSinceLastSnapshot.clear(); } + /** + * in case of unaligned checkpoints, data files that were part of checkpoint N in the writer may + * have to become part of a later checkpoint in the committer if: + * + *
+   * <ul>
+   *   <li>previous files were already committed for checkpoint N. We have to keep the manifests for
+   *       new files under a later key, otherwise they are discarded during recovery after a crash
+   *   <li>we already have a manifest of files to be committed for checkpoint N, even though it
+   *       might not have been committed yet. In this case, we must not overwrite the manifests we
+   *       already have, and we must keep them consistent with our checkpoint
+   * </ul>
+ */ + private long computeCheckpointId(long checkpointId, Map.Entry> entry) { + long sourceCheckpointId = entry.getKey(); + + boolean sourceCheckpointIdAlreadyCommitted = sourceCheckpointId <= maxCommittedCheckpointId; + boolean sourceCheckpointIdHasDataInSnapshot = + dataFilesPerCheckpoint.containsKey(sourceCheckpointId); + // for aligned checkpoints, both conditions will be false and the upstream operator's checkpoint + // ID + // will be chosen. + return sourceCheckpointIdAlreadyCommitted || sourceCheckpointIdHasDataInSnapshot + ? checkpointId + : sourceCheckpointId; + } + /** * Write all the complete data files to a newly created manifest file and return the manifest's * avro serialized bytes. diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index abe77e795b09..dc7baefc5a9c 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -1076,6 +1076,220 @@ public void testSpecEvolution() throws Exception { } } + /** + * (unaligned checkpoints) With unaligned checkpoints a writer subtask may deliver its write + * result after {@code snapshotState(N)} has already fired. ("post-barrier data"). This test + * verifies that post-barrier data for a checkpoint that never completes is not lost, it must be + * committed together with the next successful checkpoint. Also covers the case where the + * successful checkpoint itself has post-barrier data, which must then wait for the checkpoint + * after that. + * + *
+   *   processElement(dataA, checkpointId=1)
+   *   snapshotState(1)
+   *   processElement(dataB, checkpointId=1)
+   *   // checkpoint 1 never completes
+   *   processElement(dataC, checkpointId=2)
+   *   snapshotState(2)
+   *   processElement(dataD, checkpointId=2)
+   *   notifyCheckpointComplete(2)   // commits dataA, dataB, dataC
+   *   snapshotState(3)
+   *   notifyCheckpointComplete(3)   // commits dataD
+   * </pre>
+ */ + @TestTemplate + public void testPostBarrierDataSurvivesFailedCheckpoint() throws Exception { + long timestamp = 0; + JobID jobId = new JobID(); + OperatorID operatorId; + try (OneInputStreamOperatorTestHarness harness = + createStreamSink(jobId)) { + harness.setup(); + harness.open(); + operatorId = harness.getOperator().getOperatorID(); + + assertSnapshotSize(0); + assertMaxCommittedCheckpointId(jobId, operatorId, -1L); + + RowData rowA = SimpleDataUtil.createRowData(1, "early-checkpoint1"); + DataFile dataFileA = writeDataFile("data-A", ImmutableList.of(rowA)); + RowData rowB = SimpleDataUtil.createRowData(2, "post-barrier-checkpoint1"); + DataFile dataFileB = writeDataFile("data-B", ImmutableList.of(rowB)); + RowData rowC = SimpleDataUtil.createRowData(3, "early-checkpoint2"); + DataFile dataFileC = writeDataFile("data-C", ImmutableList.of(rowC)); + RowData rowD = SimpleDataUtil.createRowData(4, "post-barrier-checkpoint2"); + DataFile dataFileD = writeDataFile("data-D", ImmutableList.of(rowD)); + + long checkpoint1 = 1; + long checkpoint2 = 2; + long checkpoint3 = 3; + + harness.processElement(of(checkpoint1, dataFileA), ++timestamp); + harness.snapshot(checkpoint1, ++timestamp); + assertFlinkManifests(1); + + // post-barrier, arrives after snapshotState(1); checkpoint1 then FAILS (no notify) + harness.processElement(of(checkpoint1, dataFileB), ++timestamp); + + harness.processElement(of(checkpoint2, dataFileC), ++timestamp); + harness.snapshot(checkpoint2, ++timestamp); + + // post-barrier, arrives after snapshotState(2), before notify(2) + harness.processElement(of(checkpoint2, dataFileD), ++timestamp); + + harness.notifyOfCompletedCheckpoint(checkpoint2); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(rowA, rowB, rowC), branch); + assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint2); + + harness.snapshot(checkpoint3, ++timestamp); + harness.notifyOfCompletedCheckpoint(checkpoint3); + SimpleDataUtil.assertTableRows(table, 
ImmutableList.of(rowA, rowB, rowC, rowD), branch); + assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint3); + } + } + + /** + * (unaligned checkpoints) Post-barrier data for checkpoint N arrives after {@code + * snapshotState(N)} but before {@code notifyCheckpointComplete(N)}, so it is not included in the + * Iceberg commit for checkpoint N. This test verifies that such data survives a crash/recovery: + * after notify(N) commits the pre-barrier data for N and the job crashes before notify(N+1), + * recovery from checkpoint N+1 must still commit the post-barrier data for N and any data + * snapshotted before the crash for N+1. + * + *
+   *   processElement(dataA, checkpointId=1)
+   *   snapshotState(1)
+   *   processElement(dataB, checkpointId=1)
+   *   notifyCheckpointComplete(1)           // commits dataA only
+   *   processElement(dataC, checkpointId=2)
+   *   snapshotState(2)
+   *   // crash, no notifyCheckpointComplete(2)
+   *   recover from checkpoint2              // dataB and dataC committed on initializeState
+   * </pre>
+ */ + @TestTemplate + public void testPostBarrierDataMergedWithEarlyDataOnRecovery() throws Exception { + long timestamp = 0; + JobID jobId = new JobID(); + OperatorID operatorId; + OperatorSubtaskState snapshot; + + RowData rowA = SimpleDataUtil.createRowData(1, "pre-barrier-checkpoint1"); + DataFile dataFileA = writeDataFile("data-A", ImmutableList.of(rowA)); + RowData rowB = SimpleDataUtil.createRowData(2, "post-barrier-checkpoint1"); + DataFile dataFileB = writeDataFile("data-B", ImmutableList.of(rowB)); + RowData rowC = SimpleDataUtil.createRowData(3, "pre-barrier-checkpoint2"); + DataFile dataFileC = writeDataFile("data-C", ImmutableList.of(rowC)); + + long checkpoint1 = 1; + long checkpoint2 = 2; + + try (OneInputStreamOperatorTestHarness harness = + createStreamSink(jobId)) { + harness.setup(); + harness.open(); + operatorId = harness.getOperator().getOperatorID(); + + assertSnapshotSize(0); + assertMaxCommittedCheckpointId(jobId, operatorId, -1L); + + harness.processElement(of(checkpoint1, dataFileA), ++timestamp); + harness.snapshot(checkpoint1, ++timestamp); + assertFlinkManifests(1); + + // post-barrier, arrives after snapshotState(1), before notify(1) + harness.processElement(of(checkpoint1, dataFileB), ++timestamp); + + harness.notifyOfCompletedCheckpoint(checkpoint1); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(rowA), branch); + assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint1); + assertFlinkManifests(0); + + harness.processElement(of(checkpoint2, dataFileC), ++timestamp); + snapshot = harness.snapshot(checkpoint2, ++timestamp); + // crash, notifyCheckpointComplete(2) never fires + } + + try (OneInputStreamOperatorTestHarness harness = + createStreamSink(jobId)) { + harness.getStreamConfig().setOperatorID(operatorId); + harness.setup(); + harness.initializeState(snapshot); + harness.open(); + + SimpleDataUtil.assertTableRows(table, ImmutableList.of(rowA, rowB, rowC), branch); + assertMaxCommittedCheckpointId(jobId, 
operatorId, checkpoint2); + } + } + + /** + * (unaligned checkpoints) Combines the failed-checkpoint scenario from {@link + * #testPostBarrierDataSurvivesFailedCheckpoint} with crash+recovery: checkpoint N fails and + * checkpoint N+1 also never completes (crash before notify). Recovery from checkpoint N+1 must + * commit both the pre-barrier data from the failed checkpoint N and the post-barrier data that + * arrived after snapshotState(N). + * + *
+   *   processElement(dataA, checkpointId=1)
+   *   snapshotState(1)
+   *   processElement(dataB, checkpointId=1)
+   *   // checkpoint 1 never completes
+   *   processElement(dataC, checkpointId=2)
+   *   snapshotState(2)
+   *   // crash, no notifyCheckpointComplete(2)
+   *   recover from checkpoint2    // all three files committed on initializeState
+   * </pre>
+ */ + @TestTemplate + public void testPostBarrierDataForFailedCheckpointSurvivesRecovery() throws Exception { + long timestamp = 0; + JobID jobId = new JobID(); + OperatorID operatorId; + OperatorSubtaskState snapshot; + + RowData rowA = SimpleDataUtil.createRowData(1, "pre-barrier-checkpoint1"); + DataFile dataFileA = writeDataFile("data-A", ImmutableList.of(rowA)); + RowData rowB = SimpleDataUtil.createRowData(2, "post-barrier-checkpoint1"); + DataFile dataFileB = writeDataFile("data-B", ImmutableList.of(rowB)); + RowData rowC = SimpleDataUtil.createRowData(3, "pre-barrier-checkpoint2"); + DataFile dataFileC = writeDataFile("data-C", ImmutableList.of(rowC)); + + long checkpoint1 = 1; + long checkpoint2 = 2; + + try (OneInputStreamOperatorTestHarness harness = + createStreamSink(jobId)) { + harness.setup(); + harness.open(); + operatorId = harness.getOperator().getOperatorID(); + + assertSnapshotSize(0); + assertMaxCommittedCheckpointId(jobId, operatorId, -1L); + + harness.processElement(of(checkpoint1, dataFileA), ++timestamp); + harness.snapshot(checkpoint1, ++timestamp); + assertFlinkManifests(1); + + // post-barrier, arrives after snapshotState(1); checkpoint1 then fails (no notify) + harness.processElement(of(checkpoint1, dataFileB), ++timestamp); + + harness.processElement(of(checkpoint2, dataFileC), ++timestamp); + snapshot = harness.snapshot(checkpoint2, ++timestamp); + // crash, notifyCheckpointComplete(2) never fires + } + + try (OneInputStreamOperatorTestHarness harness = + createStreamSink(jobId)) { + harness.getStreamConfig().setOperatorID(operatorId); + harness.setup(); + harness.initializeState(snapshot); + harness.open(); + + SimpleDataUtil.assertTableRows(table, ImmutableList.of(rowA, rowB, rowC), branch); + assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint2); + } + } + private int getStagingManifestSpecId(OperatorStateStore operatorStateStore, long checkPointId) throws Exception { ListState> checkpointsState =