Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 78 additions & 40 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
Expand Down Expand Up @@ -80,6 +81,12 @@ enum TransactionType {
private boolean hasLastOpCommitted;
private final MetricsReporter reporter;

private Schema replaceSchema;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: This change turned out to be more breaking than I expected. If we want to proceed, see if this can be cleaned up

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I realise this is the sort of change that would require a dev-list discussion; I wanted to experiment with this approach first.)

private PartitionSpec replaceSpec;
private SortOrder replaceSortOrder;
private String replaceLocation;
private Map<String, String> replaceProperties;

BaseTransaction(
String tableName, TableOperations ops, TransactionType type, TableMetadata start) {
this(tableName, ops, type, start, LoggingMetricsReporter.instance());
Expand All @@ -101,6 +108,17 @@ enum TransactionType {
this.type = type;
this.hasLastOpCommitted = true;
this.reporter = reporter;

// For replace-style transactions, the provided TableMetadata contains the information needed to
// build the replaced table state. This is stored so the replacement can be re-applied on top of
// refreshed metadata on commit.
if (type == TransactionType.REPLACE_TABLE || type == TransactionType.CREATE_OR_REPLACE_TABLE) {
this.replaceSchema = start.schema();
this.replaceSpec = start.spec();
this.replaceSortOrder = start.sortOrder();
this.replaceLocation = start.location();
this.replaceProperties = ImmutableMap.copyOf(start.properties());
}
}

@Override
Expand Down Expand Up @@ -260,12 +278,8 @@ public void commitTransaction() {
commitCreateTransaction();
break;

case REPLACE_TABLE:
commitReplaceTransaction(false);
break;

case CREATE_OR_REPLACE_TABLE:
commitReplaceTransaction(true);
case REPLACE_TABLE, CREATE_OR_REPLACE_TABLE:
commitReplaceTransaction();
break;

case SIMPLE:
Expand Down Expand Up @@ -298,9 +312,13 @@ private void commitCreateTransaction() {
}
}

private void commitReplaceTransaction(boolean orCreate) {
private void commitReplaceTransaction() {
Map<String, String> props = base != null ? base.properties() : current.properties();

Set<Long> startingSnapshots =
base != null
? base.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet())
: ImmutableSet.of();
try {
Tasks.foreach(ops)
.retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
Expand All @@ -315,40 +333,27 @@ private void commitReplaceTransaction(boolean orCreate) {
.onlyRetryOn(CommitFailedException.class)
.run(
underlyingOps -> {
try {
underlyingOps.refresh();
} catch (NoSuchTableException e) {
if (!orCreate) {
throw e;
}
}

// because this is a replace table, it will always completely replace the table
// metadata. even if it was just updated.
if (base != underlyingOps.current()) {
this.base = underlyingOps.current(); // just refreshed
}
applyUpdates(underlyingOps);

underlyingOps.commit(base, current);
});

} catch (CommitStateUnknownException e) {
throw e;

} catch (PendingUpdateFailedException e) {
cleanUpOnCommitFailure();
throw e.wrapped();

} catch (RuntimeException e) {
// the commit failed and no files were committed. clean up each update.
if (!ops.requireStrictCleanup() || e instanceof CleanableFailure) {
cleanAllUpdates();
cleanUpOnCommitFailure();
}

throw e;

} finally {
// replace table never needs to retry because the table state is completely replaced. because
// retries are not
// a concern, it is safe to delete all the deleted files from individual operations
deleteUncommittedFiles(deletedFiles);
}

cleanUpAfterCommitSuccess(startingSnapshots);
}

private void commitSimpleTransaction() {
Expand Down Expand Up @@ -381,6 +386,7 @@ private void commitSimpleTransaction() {
} catch (PendingUpdateFailedException e) {
cleanUpOnCommitFailure();
throw e.wrapped();

} catch (RuntimeException e) {
if (!ops.requireStrictCleanup() || e instanceof CleanableFailure) {
cleanUpOnCommitFailure();
Expand All @@ -389,8 +395,18 @@ private void commitSimpleTransaction() {
throw e;
}

// the commit succeeded
cleanUpAfterCommitSuccess(startingSnapshots);
}

/**
 * Cleans up after a transaction commit that failed without committing any files.
 *
 * <p>First calls {@code cleanAllUpdates()} to clean up each pending update, then passes {@code
 * deletedFiles} to {@code deleteUncommittedFiles} to remove files that were never committed.
 */
private void cleanUpOnCommitFailure() {
// the commit failed and no files were committed. clean up each update.
cleanAllUpdates();

// delete all the uncommitted files
deleteUncommittedFiles(deletedFiles);
}

private void cleanUpAfterCommitSuccess(Set<Long> startingSnapshots) {
try {
// clean up the data files that were deleted by each operation. first, get the list of
// committed manifests to ensure that no committed manifest is deleted.
Expand Down Expand Up @@ -420,14 +436,6 @@ private void commitSimpleTransaction() {
}
}

private void cleanUpOnCommitFailure() {
// the commit failed and no files were committed. clean up each update.
cleanAllUpdates();

// delete all the uncommitted files
deleteUncommittedFiles(deletedFiles);
}

private void cleanAllUpdates() {
Tasks.foreach(updates)
.suppressFailureWhenFinished()
Expand Down Expand Up @@ -459,10 +467,40 @@ private void deleteUncommittedFiles(Iterable<String> paths) {
}

private void applyUpdates(TableOperations underlyingOps) {
if (base != underlyingOps.refresh()) {
// use refreshed the metadata
try {
underlyingOps.refresh();
Copy link
Copy Markdown
Contributor Author

@smaheshwar-pltr smaheshwar-pltr Jan 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: TestHiveCreateReplaceTable#testCreateOrReplaceTableTxnTableDeletedConcurrently shows an NPE: this refresh returns null instead of throwing NoSuchTableException. Consider handling that here, fixing it if it's a bug, or leaving it for now (as that may be how concurrent appends to a dropped table failed prior to this PR).

} catch (NoSuchTableException e) {
if (type == TransactionType.CREATE_OR_REPLACE_TABLE) {
return;
}
throw e;
}

if (base != underlyingOps.current()) {
// use the refreshed metadata
this.base = underlyingOps.current();
this.current = underlyingOps.current();

this.current =
switch (type) {
case REPLACE_TABLE, CREATE_OR_REPLACE_TABLE ->
// Even if we are dealing with a replace-style transaction, we need to re-apply
// updates on top of the refreshed metadata's replacement, because of (1) possible
// row lineage requirements, and (2) to not overwrite the metadata with an outdated
// replacement that may cause history loss or table corruption.
underlyingOps
.current()
.buildReplacement(
replaceSchema,
replaceSpec,
replaceSortOrder,
replaceLocation,
replaceProperties);
case SIMPLE -> underlyingOps.current();
case CREATE_TABLE ->
throw new IllegalStateException(
"Transaction update application not expected for create transactions");
};

for (PendingUpdate update : updates) {
// re-commit each update in the chain to apply it and update current
try {
Expand Down
13 changes: 10 additions & 3 deletions core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -2655,15 +2655,20 @@ public void testReplaceTableKeepsSnapshotLog() {
.containsExactly(snapshotBeforeReplace, snapshotAfterReplace);
}

@Test
public void testConcurrentReplaceTransactions() {
@ParameterizedTest
@ValueSource(ints = {2, 3})
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#15091 shows the failure of this V3 test, prior to this PR

public void testConcurrentReplaceTransactions(int formatVersion) {
C catalog = catalog();

if (requiresNamespaceCreate()) {
catalog.createNamespace(NS);
}

Transaction transaction = catalog.buildTable(TABLE, SCHEMA).createTransaction();
Transaction transaction =
catalog
.buildTable(TABLE, SCHEMA)
.withProperty("format-version", String.valueOf(formatVersion))
.createTransaction();
transaction.newFastAppend().appendFile(FILE_A).commit();
transaction.commitTransaction();

Expand Down Expand Up @@ -2691,6 +2696,8 @@ public void testConcurrentReplaceTransactions() {
secondReplace.commitTransaction();

Table afterSecondReplace = catalog.loadTable(TABLE);
// All three successfully committed snapshots should be present
assertThat(afterSecondReplace.snapshots()).hasSize(3);
Comment on lines +2699 to +2700
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#15090 shows the failure of this added line, prior to this PR

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add a new, separate test that shows exactly where things fail with V3?

Copy link
Copy Markdown
Contributor Author

@smaheshwar-pltr smaheshwar-pltr Jan 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense - I've updated the PR description in the meantime to cover this.
(Any concurrent change to a table's snapshots causes the replace transaction to fail entirely for the REST catalog, due to server-side row-lineage validation. I'll open an issue to track this, actually.)

assertThat(afterSecondReplace.schema().asStruct())
.as("Table schema should match the original schema")
.isEqualTo(original.schema().asStruct());
Expand Down
Loading