Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -423,21 +423,53 @@ public void endInput() throws IOException {
}

private void writeToManifestUptoLatestCheckpoint(long checkpointId) throws IOException {
if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {

if (!dataFilesPerCheckpoint.containsKey(checkpointId)) {
dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
}

for (Map.Entry<Long, List<WriteResult>> writeResultsOfCheckpoint :
writeResultsSinceLastSnapshot.entrySet()) {
dataFilesPerCheckpoint.put(
writeResultsOfCheckpoint.getKey(),
writeToManifest(writeResultsOfCheckpoint.getKey(), writeResultsOfCheckpoint.getValue()));
Map<Long, List<WriteResult>> pendingWriteResults = Maps.newHashMap();
for (Map.Entry<Long, List<WriteResult>> entry : writeResultsSinceLastSnapshot.entrySet()) {
long assignedCheckpointId = computeCheckpointId(checkpointId, entry);
pendingWriteResults
.computeIfAbsent(assignedCheckpointId, k -> Lists.newArrayList())
.addAll(entry.getValue());
}

for (Map.Entry<Long, List<WriteResult>> entry : pendingWriteResults.entrySet()) {
dataFilesPerCheckpoint.put(entry.getKey(), writeToManifest(entry.getKey(), entry.getValue()));
}

// Clear the local buffer for current checkpoint.
writeResultsSinceLastSnapshot.clear();
}

/**
* in case of unaligned checkpoints, data files that were part of checkpoint N in the writer may
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT

Suggested change
* in case of unaligned checkpoints, data files that were part of checkpoint N in the writer may
* In case of unaligned checkpoints, data files that were part of checkpoint N in the writer may

* have to become part of a later checkpoint in the committer if:
*
* <ul>
* <li>previous files were already committed for checkpoint N. We have to keep the manifests for
* new files under a later key, otherwise they are discarded during recovery after a crash
* <li>we already have a manifest of files to be committed for checkpoint N, even though it
* might not have been committed yet. In this case, we must not overwrite the manifests we
* already have, and we must keep them consistent with our checkpoint
* </ul>
*/
private long computeCheckpointId(long checkpointId, Map.Entry<Long, List<WriteResult>> entry) {
long sourceCheckpointId = entry.getKey();

boolean sourceCheckpointIdAlreadyCommitted = sourceCheckpointId <= maxCommittedCheckpointId;
boolean sourceCheckpointIdHasDataInSnapshot =
dataFilesPerCheckpoint.containsKey(sourceCheckpointId);
// for aligned checkpoints, both conditions will be false and the upstream operator's checkpoint
// ID
// will be chosen.
Comment on lines +466 to +467
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// ID
// will be chosen.
// ID will be chosen.

NIT

return sourceCheckpointIdAlreadyCommitted || sourceCheckpointIdHasDataInSnapshot
? checkpointId
: sourceCheckpointId;
Comment on lines +468 to +470
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this correct under all scenarios? This will mean that we include files of an older snapshot into the current checkpoint, but IMHO they need to be in the following snapshot (sourceCheckpointId + 1), not in the latest. There can be outstanding WriteResults for multiple snapshots, which have not been committed to Iceberg.

Otherwise, we violate the order in case of deletions.

}

/**
* Write all the complete data files to a newly created manifest file and return the manifest's
* avro serialized bytes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,220 @@ public void testSpecEvolution() throws Exception {
}
}

/**
 * (unaligned checkpoints) With unaligned checkpoints a writer subtask may deliver its write
 * result after {@code snapshotState(N)} has already fired. ("post-barrier data"). This test
 * verifies that post-barrier data for a checkpoint that never completes is not lost, it must be
 * committed together with the next successful checkpoint. Also covers the case where the
 * successful checkpoint itself has post-barrier data, which must then wait for the checkpoint
 * after that.
 *
 * <pre>
 * processElement(dataA, checkpointId=1)
 * snapshotState(1)
 * processElement(dataB, checkpointId=1)
 * // checkpoint 1 never completes
 * processElement(dataC, checkpointId=2)
 * snapshotState(2)
 * processElement(dataD, checkpointId=2)
 * notifyCheckpointComplete(2) // commits dataA, dataB, dataC
 * snapshotState(3)
 * notifyCheckpointComplete(3) // commits dataD
 * </pre>
 */
@TestTemplate
public void testPostBarrierDataSurvivesFailedCheckpoint() throws Exception {
long timestamp = 0;
JobID jobId = new JobID();
OperatorID operatorId;
try (OneInputStreamOperatorTestHarness<FlinkWriteResult, Void> harness =
createStreamSink(jobId)) {
harness.setup();
harness.open();
operatorId = harness.getOperator().getOperatorID();

// Fresh table: nothing snapshotted or committed yet.
assertSnapshotSize(0);
assertMaxCommittedCheckpointId(jobId, operatorId, -1L);

RowData rowA = SimpleDataUtil.createRowData(1, "early-checkpoint1");
DataFile dataFileA = writeDataFile("data-A", ImmutableList.of(rowA));
RowData rowB = SimpleDataUtil.createRowData(2, "post-barrier-checkpoint1");
DataFile dataFileB = writeDataFile("data-B", ImmutableList.of(rowB));
RowData rowC = SimpleDataUtil.createRowData(3, "early-checkpoint2");
DataFile dataFileC = writeDataFile("data-C", ImmutableList.of(rowC));
RowData rowD = SimpleDataUtil.createRowData(4, "post-barrier-checkpoint2");
DataFile dataFileD = writeDataFile("data-D", ImmutableList.of(rowD));

long checkpoint1 = 1;
long checkpoint2 = 2;
long checkpoint3 = 3;

harness.processElement(of(checkpoint1, dataFileA), ++timestamp);
harness.snapshot(checkpoint1, ++timestamp);
assertFlinkManifests(1);

// post-barrier, arrives after snapshotState(1); checkpoint1 then FAILS (no notify)
harness.processElement(of(checkpoint1, dataFileB), ++timestamp);

harness.processElement(of(checkpoint2, dataFileC), ++timestamp);
harness.snapshot(checkpoint2, ++timestamp);

// post-barrier, arrives after snapshotState(2), before notify(2)
harness.processElement(of(checkpoint2, dataFileD), ++timestamp);

// notify(2) commits everything snapshotted so far: A and C, plus post-barrier B from the
// failed checkpoint 1. D arrived after snapshot(2) and must wait for checkpoint 3.
harness.notifyOfCompletedCheckpoint(checkpoint2);
SimpleDataUtil.assertTableRows(table, ImmutableList.of(rowA, rowB, rowC), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint2);

// Checkpoint 3 carries only the leftover post-barrier dataD.
harness.snapshot(checkpoint3, ++timestamp);
harness.notifyOfCompletedCheckpoint(checkpoint3);
SimpleDataUtil.assertTableRows(table, ImmutableList.of(rowA, rowB, rowC, rowD), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint3);
}
}

/**
 * (unaligned checkpoints) Post-barrier data for checkpoint N arrives after {@code
 * snapshotState(N)} but before {@code notifyCheckpointComplete(N)}, so it is not included in the
 * Iceberg commit for checkpoint N. This test verifies that such data survives a crash/recovery:
 * after notify(N) commits the pre-barrier data for N and the job crashes before notify(N+1),
 * recovery from checkpoint N+1 must still commit the post-barrier data for N and any data
 * snapshotted before the crash for N+1.
 *
 * <pre>
 * processElement(dataA, checkpointId=1)
 * snapshotState(1)
 * processElement(dataB, checkpointId=1)
 * notifyCheckpointComplete(1) // commits dataA only
 * processElement(dataC, checkpointId=2)
 * snapshotState(2)
 * // crash, no notifyCheckpointComplete(2)
 * recover from checkpoint2 // dataB and dataC committed on initializeState
 * </pre>
 */
@TestTemplate
public void testPostBarrierDataMergedWithEarlyDataOnRecovery() throws Exception {
long timestamp = 0;
JobID jobId = new JobID();
OperatorID operatorId;
OperatorSubtaskState snapshot;

RowData rowA = SimpleDataUtil.createRowData(1, "pre-barrier-checkpoint1");
DataFile dataFileA = writeDataFile("data-A", ImmutableList.of(rowA));
RowData rowB = SimpleDataUtil.createRowData(2, "post-barrier-checkpoint1");
DataFile dataFileB = writeDataFile("data-B", ImmutableList.of(rowB));
RowData rowC = SimpleDataUtil.createRowData(3, "pre-barrier-checkpoint2");
DataFile dataFileC = writeDataFile("data-C", ImmutableList.of(rowC));

long checkpoint1 = 1;
long checkpoint2 = 2;

try (OneInputStreamOperatorTestHarness<FlinkWriteResult, Void> harness =
createStreamSink(jobId)) {
harness.setup();
harness.open();
operatorId = harness.getOperator().getOperatorID();

// Fresh table: nothing snapshotted or committed yet.
assertSnapshotSize(0);
assertMaxCommittedCheckpointId(jobId, operatorId, -1L);

harness.processElement(of(checkpoint1, dataFileA), ++timestamp);
harness.snapshot(checkpoint1, ++timestamp);
assertFlinkManifests(1);

// post-barrier, arrives after snapshotState(1), before notify(1)
harness.processElement(of(checkpoint1, dataFileB), ++timestamp);

// notify(1) commits only pre-barrier dataA; dataB is still buffered in the committer.
harness.notifyOfCompletedCheckpoint(checkpoint1);
SimpleDataUtil.assertTableRows(table, ImmutableList.of(rowA), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint1);
assertFlinkManifests(0);

// snapshot(2) captures both the buffered post-barrier dataB and pre-barrier dataC.
harness.processElement(of(checkpoint2, dataFileC), ++timestamp);
snapshot = harness.snapshot(checkpoint2, ++timestamp);
// crash, notifyCheckpointComplete(2) never fires
}

// Simulate restart of the same operator: restore from the checkpoint-2 state.
try (OneInputStreamOperatorTestHarness<FlinkWriteResult, Void> harness =
createStreamSink(jobId)) {
harness.getStreamConfig().setOperatorID(operatorId);
harness.setup();
harness.initializeState(snapshot);
harness.open();

// Recovery commits the uncommitted manifests: dataB (post-barrier for 1) and dataC.
SimpleDataUtil.assertTableRows(table, ImmutableList.of(rowA, rowB, rowC), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint2);
}
}

/**
 * (unaligned checkpoints) Combines the failed-checkpoint scenario from {@link
 * #testPostBarrierDataSurvivesFailedCheckpoint} with crash+recovery: checkpoint N fails and
 * checkpoint N+1 also never completes (crash before notify). Recovery from checkpoint N+1 must
 * commit both the pre-barrier data from the failed checkpoint N and the post-barrier data that
 * arrived after snapshotState(N).
 *
 * <pre>
 * processElement(dataA, checkpointId=1)
 * snapshotState(1)
 * processElement(dataB, checkpointId=1)
 * // checkpoint 1 never completes
 * processElement(dataC, checkpointId=2)
 * snapshotState(2)
 * // crash, no notifyCheckpointComplete(2)
 * recover from checkpoint2 // all three files committed on initializeState
 * </pre>
 */
@TestTemplate
public void testPostBarrierDataForFailedCheckpointSurvivesRecovery() throws Exception {
long timestamp = 0;
JobID jobId = new JobID();
OperatorID operatorId;
OperatorSubtaskState snapshot;

RowData rowA = SimpleDataUtil.createRowData(1, "pre-barrier-checkpoint1");
DataFile dataFileA = writeDataFile("data-A", ImmutableList.of(rowA));
RowData rowB = SimpleDataUtil.createRowData(2, "post-barrier-checkpoint1");
DataFile dataFileB = writeDataFile("data-B", ImmutableList.of(rowB));
RowData rowC = SimpleDataUtil.createRowData(3, "pre-barrier-checkpoint2");
DataFile dataFileC = writeDataFile("data-C", ImmutableList.of(rowC));

long checkpoint1 = 1;
long checkpoint2 = 2;

try (OneInputStreamOperatorTestHarness<FlinkWriteResult, Void> harness =
createStreamSink(jobId)) {
harness.setup();
harness.open();
operatorId = harness.getOperator().getOperatorID();

// Fresh table: nothing snapshotted or committed yet.
assertSnapshotSize(0);
assertMaxCommittedCheckpointId(jobId, operatorId, -1L);

harness.processElement(of(checkpoint1, dataFileA), ++timestamp);
harness.snapshot(checkpoint1, ++timestamp);
assertFlinkManifests(1);

// post-barrier, arrives after snapshotState(1); checkpoint1 then fails (no notify)
harness.processElement(of(checkpoint1, dataFileB), ++timestamp);

// snapshot(2) must capture dataA (from failed checkpoint 1), post-barrier dataB, and dataC.
harness.processElement(of(checkpoint2, dataFileC), ++timestamp);
snapshot = harness.snapshot(checkpoint2, ++timestamp);
// crash, notifyCheckpointComplete(2) never fires
}

// Simulate restart of the same operator: restore from the checkpoint-2 state.
try (OneInputStreamOperatorTestHarness<FlinkWriteResult, Void> harness =
createStreamSink(jobId)) {
harness.getStreamConfig().setOperatorID(operatorId);
harness.setup();
harness.initializeState(snapshot);
harness.open();

// Recovery commits all three files, none were ever committed before the crash.
SimpleDataUtil.assertTableRows(table, ImmutableList.of(rowA, rowB, rowC), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint2);
}
}

private int getStagingManifestSpecId(OperatorStateStore operatorStateStore, long checkPointId)
throws Exception {
ListState<SortedMap<Long, byte[]>> checkpointsState =
Expand Down
Loading