Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -676,24 +676,22 @@ private static void rewriteDVFile(
String sourcePrefix,
String targetPrefix)
throws IOException {
List<Blob> rewrittenBlobs = Lists.newArrayList();
try (PuffinReader reader = Puffin.read(io.newInputFile(deleteFile.location())).build()) {
// Read all blobs and rewrite them with updated referenced data file paths
try (PuffinReader reader = Puffin.read(io.newInputFile(deleteFile.location())).build();
PuffinWriter writer =
Puffin.write(outputFile).createdBy(IcebergBuild.fullVersion()).build()) {
for (Pair<org.apache.iceberg.puffin.BlobMetadata, ByteBuffer> blobPair :
reader.readAll(reader.fileMetadata().blobs())) {
org.apache.iceberg.puffin.BlobMetadata blobMetadata = blobPair.first();
ByteBuffer blobData = blobPair.second();

// Get the original properties and update the referenced data file path
Map<String, String> properties = Maps.newHashMap(blobMetadata.properties());
String referencedDataFile = properties.get("referenced-data-file");
if (referencedDataFile != null && referencedDataFile.startsWith(sourcePrefix)) {
String newReferencedDataFile = newPath(referencedDataFile, sourcePrefix, targetPrefix);
properties.put("referenced-data-file", newReferencedDataFile);
}

// Create a new blob with updated properties
rewrittenBlobs.add(
writer.write(
new Blob(
blobMetadata.type(),
blobMetadata.inputFields(),
Expand All @@ -704,11 +702,6 @@ private static void rewriteDVFile(
properties));
}
}

try (PuffinWriter writer =
Puffin.write(outputFile).createdBy(IcebergBuild.fullVersion()).build()) {
rewrittenBlobs.forEach(writer::write);
}
}

private static PositionDelete newPositionDeleteRecord(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,24 @@
*/
package org.apache.iceberg;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.BlobMetadata;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinReader;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.Pair;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -281,4 +293,75 @@ public void testRewritingMultiplePositionDeleteEntriesWithinManifestFile() throw

assertThat(deleteFileRewriteResult.toRewrite()).hasSize(2);
}

@TestTemplate
public void testRewritePositionDeleteFileRewritesDVBlobsWithoutBufferingAllBlobs()
throws IOException {
// DV (deletion vector) files are a format-version-3 feature; skip on older table versions.
assumeThat(formatVersion).as("DVs require format version 3").isGreaterThanOrEqualTo(3);

// Two referenced data files: one under the source prefix (must be rewritten to the target
// prefix) and one outside it (must pass through unchanged).
String sourcePrefix = temp.resolve("source").toString();
String targetPrefix = temp.resolve("target").toString();
String sourceReferencedDataFile = sourcePrefix + "/data/file-a.parquet";
String unchangedReferencedDataFile = temp.resolve("external/data/file-b.parquet").toString();

// Write a source Puffin file containing two DV blobs, each tagged with a "blob-id" property
// so assertions below can identify them, and distinct payloads ("alpha"/"beta").
OutputFile sourceDVFile =
Files.localOutput(temp.resolve("source/metadata/source-dv.puffin").toString());
try (PuffinWriter writer = Puffin.write(sourceDVFile).createdBy("Test 1234").build()) {
writer.write(newDVBlob(sourceReferencedDataFile, "blob-1", "alpha"));
writer.write(newDVBlob(unchangedReferencedDataFile, "blob-2", "beta"));
}

// Wrap the Puffin file in a position-delete DeleteFile entry pointing at the source-prefixed
// data file. Content offset/size are nominal values; the rewrite only reads blob metadata.
DeleteFile deleteFile =
FileMetadata.deleteFileBuilder(table.spec())
.ofPositionDeletes()
.withPath(sourceDVFile.location())
.withFileSizeInBytes(sourceDVFile.toInputFile().getLength())
.withPartition(FILE_A.partition())
.withRecordCount(2)
.withReferencedDataFile(sourceReferencedDataFile)
.withContentOffset(4)
.withContentSizeInBytes(9)
.build();

// Exercise the rewrite under test: copy the DV file to the target location, updating
// "referenced-data-file" properties that start with the source prefix.
OutputFile rewrittenDVFile =
Files.localOutput(temp.resolve("target/metadata/rewritten-dv.puffin").toString());
RewriteTablePathUtil.rewritePositionDeleteFile(
deleteFile, rewrittenDVFile, table.io(), table.spec(), sourcePrefix, targetPrefix, null);

// Verify: blob-1's referenced data file was re-prefixed, blob-2's was left alone, and the
// blob payloads themselves survived the rewrite intact and in order.
try (PuffinReader reader = Puffin.read(rewrittenDVFile.toInputFile()).build()) {
List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
assertThat(blobMetadata).hasSize(2);
assertThat(blobMetadata)
.extracting(BlobMetadata::properties)
.containsExactly(
ImmutableMap.of(
"blob-id",
"blob-1",
"referenced-data-file",
targetPrefix + "/data/file-a.parquet"),
ImmutableMap.of(
"blob-id", "blob-2", "referenced-data-file", unchangedReferencedDataFile));

assertThat(
readBlobData(reader, blobMetadata).stream()
.map(pair -> UTF_8.decode(pair.second()).toString()))
.containsExactly("alpha", "beta");
}
}

/**
 * Builds a minimal test DV blob whose properties carry the given referenced data file path and a
 * {@code blob-id} marker, with {@code data} (UTF-8 encoded) as the blob payload.
 */
private static Blob newDVBlob(String referencedDataFile, String blobId, String data) {
  ByteBuffer payload = ByteBuffer.wrap(data.getBytes(UTF_8));
  ImmutableMap<String, String> properties =
      ImmutableMap.of("blob-id", blobId, "referenced-data-file", referencedDataFile);
  // Field/snapshot/sequence values are nominal placeholders; no compression codec (null).
  return new Blob("dv", ImmutableList.of(1), 1L, 1L, payload, null, properties);
}

/** Reads the payloads for the given blob metadata entries and returns them as an immutable list. */
private static List<Pair<BlobMetadata, ByteBuffer>> readBlobData(
    PuffinReader reader, List<BlobMetadata> blobMetadata) {
  ImmutableList.Builder<Pair<BlobMetadata, ByteBuffer>> blobs = ImmutableList.builder();
  reader.readAll(blobMetadata).forEach(blobs::add);
  return blobs.build();
}
}
Loading