diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java index 435f79129204..f15ceb1d3d04 100644 --- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java +++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java @@ -676,15 +676,14 @@ private static void rewriteDVFile( String sourcePrefix, String targetPrefix) throws IOException { - List<Blob> rewrittenBlobs = Lists.newArrayList(); - try (PuffinReader reader = Puffin.read(io.newInputFile(deleteFile.location())).build()) { - // Read all blobs and rewrite them with updated referenced data file paths + try (PuffinReader reader = Puffin.read(io.newInputFile(deleteFile.location())).build(); + PuffinWriter writer = + Puffin.write(outputFile).createdBy(IcebergBuild.fullVersion()).build()) { for (Pair<org.apache.iceberg.puffin.BlobMetadata, ByteBuffer> blobPair : reader.readAll(reader.fileMetadata().blobs())) { org.apache.iceberg.puffin.BlobMetadata blobMetadata = blobPair.first(); ByteBuffer blobData = blobPair.second(); - // Get the original properties and update the referenced data file path Map<String, String> properties = Maps.newHashMap(blobMetadata.properties()); String referencedDataFile = properties.get("referenced-data-file"); if (referencedDataFile != null && referencedDataFile.startsWith(sourcePrefix)) { @@ -692,8 +691,7 @@ private static void rewriteDVFile( properties.put("referenced-data-file", newReferencedDataFile); } - // Create a new blob with updated properties - rewrittenBlobs.add( + writer.write( new Blob( blobMetadata.type(), blobMetadata.inputFields(), @@ -704,11 +702,6 @@ private static void rewriteDVFile( properties)); } } - - try (PuffinWriter writer = - Puffin.write(outputFile).createdBy(IcebergBuild.fullVersion()).build()) { - rewrittenBlobs.forEach(writer::write); - } } private static PositionDelete newPositionDeleteRecord( diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java 
b/core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java index bedd8dd66d71..14bbc87e431f 100644 --- a/core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java +++ b/core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java @@ -18,12 +18,24 @@ */ package org.apache.iceberg; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assumptions.assumeThat; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; import java.util.Set; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.puffin.Blob; +import org.apache.iceberg.puffin.BlobMetadata; +import org.apache.iceberg.puffin.Puffin; +import org.apache.iceberg.puffin.PuffinReader; +import org.apache.iceberg.puffin.PuffinWriter; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.Pair; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -281,4 +293,75 @@ public void testRewritingMultiplePositionDeleteEntriesWithinManifestFile() throw assertThat(deleteFileRewriteResult.toRewrite()).hasSize(2); } + + @TestTemplate + public void testRewritePositionDeleteFileRewritesDVBlobsWithoutBufferingAllBlobs() + throws IOException { + assumeThat(formatVersion).as("DVs require format version 3").isGreaterThanOrEqualTo(3); + + String sourcePrefix = temp.resolve("source").toString(); + String targetPrefix = temp.resolve("target").toString(); + String sourceReferencedDataFile = sourcePrefix + "/data/file-a.parquet"; + String unchangedReferencedDataFile = temp.resolve("external/data/file-b.parquet").toString(); + + OutputFile sourceDVFile = + 
Files.localOutput(temp.resolve("source/metadata/source-dv.puffin").toString()); + try (PuffinWriter writer = Puffin.write(sourceDVFile).createdBy("Test 1234").build()) { + writer.write(newDVBlob(sourceReferencedDataFile, "blob-1", "alpha")); + writer.write(newDVBlob(unchangedReferencedDataFile, "blob-2", "beta")); + } + + DeleteFile deleteFile = + FileMetadata.deleteFileBuilder(table.spec()) + .ofPositionDeletes() + .withPath(sourceDVFile.location()) + .withFileSizeInBytes(sourceDVFile.toInputFile().getLength()) + .withPartition(FILE_A.partition()) + .withRecordCount(2) + .withReferencedDataFile(sourceReferencedDataFile) + .withContentOffset(4) + .withContentSizeInBytes(9) + .build(); + + OutputFile rewrittenDVFile = + Files.localOutput(temp.resolve("target/metadata/rewritten-dv.puffin").toString()); + RewriteTablePathUtil.rewritePositionDeleteFile( + deleteFile, rewrittenDVFile, table.io(), table.spec(), sourcePrefix, targetPrefix, null); + + try (PuffinReader reader = Puffin.read(rewrittenDVFile.toInputFile()).build()) { + List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs(); + assertThat(blobMetadata).hasSize(2); + assertThat(blobMetadata) + .extracting(BlobMetadata::properties) + .containsExactly( + ImmutableMap.of( + "blob-id", + "blob-1", + "referenced-data-file", + targetPrefix + "/data/file-a.parquet"), + ImmutableMap.of( + "blob-id", "blob-2", "referenced-data-file", unchangedReferencedDataFile)); + + assertThat( + readBlobData(reader, blobMetadata).stream() + .map(pair -> UTF_8.decode(pair.second()).toString())) + .containsExactly("alpha", "beta"); + } + } + + private static Blob newDVBlob(String referencedDataFile, String blobId, String data) { + return new Blob( + "dv", + ImmutableList.of(1), + 1L, + 1L, + ByteBuffer.wrap(data.getBytes(UTF_8)), + null, + ImmutableMap.of("blob-id", blobId, "referenced-data-file", referencedDataFile)); + } + + private static List<Pair<BlobMetadata, ByteBuffer>> readBlobData( + PuffinReader reader, List<BlobMetadata> blobMetadata) { + return 
ImmutableList.copyOf(reader.readAll(blobMetadata)); + } }