diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java
new file mode 100644
index 000000000000..4e1a4f2883ad
--- /dev/null
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * Assigns destination metadata for each input record.
+ *
+ *
+ * <p>The output will have the format {@code {{destination, partition}, data}}.
+ */
+class AssignDestinationsAndPartitions
+ extends PTransform, PCollection>> {
+
+ private final DynamicDestinations dynamicDestinations;
+ private final IcebergCatalogConfig catalogConfig;
+ static final String DESTINATION = "destination";
+ static final String PARTITION = "partition";
+ static final org.apache.beam.sdk.schemas.Schema OUTPUT_SCHEMA =
+ org.apache.beam.sdk.schemas.Schema.builder()
+ .addStringField(DESTINATION)
+ .addStringField(PARTITION)
+ .build();
+
+ public AssignDestinationsAndPartitions(
+ DynamicDestinations dynamicDestinations, IcebergCatalogConfig catalogConfig) {
+ this.dynamicDestinations = dynamicDestinations;
+ this.catalogConfig = catalogConfig;
+ }
+
+ @Override
+ public PCollection> expand(PCollection input) {
+ return input
+ .apply(ParDo.of(new AssignDoFn(dynamicDestinations, catalogConfig)))
+ .setCoder(
+ KvCoder.of(
+ RowCoder.of(OUTPUT_SCHEMA), RowCoder.of(dynamicDestinations.getDataSchema())));
+ }
+
+ static class AssignDoFn extends DoFn> {
+ static final Map PARTITION_KEYS = new ConcurrentHashMap<>();
+ static final Map WRAPPERS = new ConcurrentHashMap<>();
+ private final DynamicDestinations dynamicDestinations;
+ private final IcebergCatalogConfig catalogConfig;
+
+ AssignDoFn(DynamicDestinations dynamicDestinations, IcebergCatalogConfig catalogConfig) {
+ this.dynamicDestinations = dynamicDestinations;
+ this.catalogConfig = catalogConfig;
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element Row element,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ @Timestamp Instant timestamp,
+ OutputReceiver> out) {
+ String tableIdentifier =
+ dynamicDestinations.getTableStringIdentifier(
+ ValueInSingleWindow.of(element, timestamp, window, paneInfo));
+ Row data = dynamicDestinations.getData(element);
+
+ @Nullable PartitionKey partitionKey = PARTITION_KEYS.get(tableIdentifier);
+ @Nullable BeamRowWrapper wrapper = WRAPPERS.get(tableIdentifier);
+ if (partitionKey == null || wrapper == null) {
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Schema schema = IcebergUtils.beamSchemaToIcebergSchema(data.getSchema());
+ @Nullable
+ IcebergTableCreateConfig createConfig =
+ dynamicDestinations.instantiateDestination(tableIdentifier).getTableCreateConfig();
+ if (createConfig != null && createConfig.getPartitionFields() != null) {
+ spec =
+ PartitionUtils.toPartitionSpec(createConfig.getPartitionFields(), data.getSchema());
+ } else {
+ try {
+ // see if table already exists with a spec
+ // TODO(ahmedabu98): improve this by periodically refreshing the table to fetch updated
+ // specs
+ spec = catalogConfig.catalog().loadTable(TableIdentifier.parse(tableIdentifier)).spec();
+ } catch (NoSuchTableException ignored) {
+ // no partition to apply
+ }
+ }
+ partitionKey = new PartitionKey(spec, schema);
+ wrapper = new BeamRowWrapper(data.getSchema(), schema.asStruct());
+ PARTITION_KEYS.put(tableIdentifier, partitionKey);
+ WRAPPERS.put(tableIdentifier, wrapper);
+ }
+ partitionKey.partition(wrapper.wrap(data));
+ String partitionPath = partitionKey.toPath();
+
+ Row destAndPartition =
+ Row.withSchema(OUTPUT_SCHEMA).addValues(tableIdentifier, partitionPath).build();
+ out.output(KV.of(destAndPartition, data));
+ }
+ }
+}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapper.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapper.java
new file mode 100644
index 000000000000..ad7ec4b7b04f
--- /dev/null
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapper.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.logicaltypes.Date;
+import org.apache.beam.sdk.schemas.logicaltypes.DateTime;
+import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric;
+import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
+import org.apache.beam.sdk.schemas.logicaltypes.Time;
+import org.apache.beam.sdk.values.Row;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.UUIDUtil;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class BeamRowWrapper implements StructLike {
+
+ private final FieldType[] types;
+ private final @Nullable PositionalGetter>[] getters;
+ private @Nullable Row row = null;
+
+ public BeamRowWrapper(Schema schema, Types.StructType struct) {
+ int size = schema.getFieldCount();
+
+ types = (FieldType[]) Array.newInstance(FieldType.class, size);
+ getters = (PositionalGetter[]) Array.newInstance(PositionalGetter.class, size);
+
+ for (int i = 0; i < size; i++) {
+ types[i] = schema.getField(i).getType();
+ getters[i] = buildGetter(types[i], struct.fields().get(i).type());
+ }
+ }
+
+ public BeamRowWrapper wrap(@Nullable Row row) {
+ this.row = row;
+ return this;
+ }
+
+ @Override
+ public int size() {
+ return types.length;
+ }
+
+ @Override
+ public @Nullable T get(int pos, Class javaClass) {
+ if (row == null || row.getValue(pos) == null) {
+ return null;
+ } else if (getters[pos] != null) {
+ return javaClass.cast(getters[pos].get(checkStateNotNull(row), pos));
+ }
+
+ return javaClass.cast(checkStateNotNull(row).getValue(pos));
+ }
+
+ @Override
+ public void set(int pos, T value) {
+ throw new UnsupportedOperationException(
+ "Could not set a field in the BeamRowWrapper because rowData is read-only");
+ }
+
+ private interface PositionalGetter {
+ T get(Row data, int pos);
+ }
+
+ private static @Nullable PositionalGetter> buildGetter(FieldType beamType, Type icebergType) {
+ switch (beamType.getTypeName()) {
+ case BYTE:
+ return Row::getByte;
+ case INT16:
+ return Row::getInt16;
+ case STRING:
+ return Row::getString;
+ case BYTES:
+ return (row, pos) -> {
+ byte[] bytes = checkStateNotNull(row.getBytes(pos));
+ if (Type.TypeID.UUID == icebergType.typeId()) {
+ return UUIDUtil.convert(bytes);
+ } else {
+ return ByteBuffer.wrap(bytes);
+ }
+ };
+ case DECIMAL:
+ return Row::getDecimal;
+ case DATETIME:
+ return (row, pos) ->
+ TimeUnit.MILLISECONDS.toMicros(checkStateNotNull(row.getDateTime(pos)).getMillis());
+ case ROW:
+ Schema beamSchema = checkStateNotNull(beamType.getRowSchema());
+ Types.StructType structType = (Types.StructType) icebergType;
+
+ BeamRowWrapper nestedWrapper = new BeamRowWrapper(beamSchema, structType);
+ return (row, pos) -> nestedWrapper.wrap(row.getRow(pos));
+ case LOGICAL_TYPE:
+ if (beamType.isLogicalType(MicrosInstant.IDENTIFIER)) {
+ return (row, pos) -> {
+ Instant instant = checkStateNotNull(row.getLogicalTypeValue(pos, Instant.class));
+ return TimeUnit.SECONDS.toMicros(instant.getEpochSecond()) + instant.getNano() / 1000;
+ };
+ } else if (beamType.isLogicalType(DateTime.IDENTIFIER)) {
+ return (row, pos) ->
+ DateTimeUtil.microsFromTimestamp(
+ checkStateNotNull(row.getLogicalTypeValue(pos, LocalDateTime.class)));
+ } else if (beamType.isLogicalType(Date.IDENTIFIER)) {
+ return (row, pos) ->
+ DateTimeUtil.daysFromDate(
+ checkStateNotNull(row.getLogicalTypeValue(pos, LocalDate.class)));
+ } else if (beamType.isLogicalType(Time.IDENTIFIER)) {
+ return (row, pos) ->
+ DateTimeUtil.microsFromTime(
+ checkStateNotNull(row.getLogicalTypeValue(pos, LocalTime.class)));
+ } else if (beamType.isLogicalType(FixedPrecisionNumeric.IDENTIFIER)) {
+ return (row, pos) -> row.getLogicalTypeValue(pos, BigDecimal.class);
+ } else {
+ return null;
+ }
+ default:
+ return null;
+ }
+ }
+}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
index 1d71ad549094..529095b6ca71 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
@@ -381,7 +381,10 @@
public class IcebergIO {
public static WriteRows writeRows(IcebergCatalogConfig catalog) {
- return new AutoValue_IcebergIO_WriteRows.Builder().setCatalogConfig(catalog).build();
+ return new AutoValue_IcebergIO_WriteRows.Builder()
+ .setCatalogConfig(catalog)
+ .setGroupByPartitions(false)
+ .build();
}
@AutoValue
@@ -397,6 +400,8 @@ public abstract static class WriteRows extends PTransform, Iceb
abstract @Nullable Integer getDirectWriteByteLimit();
+ abstract boolean getGroupByPartitions();
+
abstract Builder toBuilder();
@AutoValue.Builder
@@ -411,6 +416,8 @@ abstract static class Builder {
abstract Builder setDirectWriteByteLimit(Integer directWriteByteLimit);
+ abstract Builder setGroupByPartitions(boolean GroupByPartitions);
+
abstract WriteRows build();
}
@@ -443,6 +450,15 @@ public WriteRows withDirectWriteByteLimit(Integer directWriteByteLimit) {
return toBuilder().setDirectWriteByteLimit(directWriteByteLimit).build();
}
+ /**
+ * Groups incoming rows by partition before sending to writes, ensuring that a given bundle is
+ * written to only one partition. For partitioned tables, this helps significantly to reduce the
+ * number of small files.
+ */
+ public WriteRows groupingByPartitions() {
+ return toBuilder().setGroupByPartitions(true).build();
+ }
+
@Override
public IcebergWriteResult expand(PCollection input) {
List> allToArgs = Arrays.asList(getTableIdentifier(), getDynamicDestinations());
@@ -464,15 +480,26 @@ public IcebergWriteResult expand(PCollection input) {
IcebergUtils.isUnbounded(input),
"Must only provide direct write limit for unbounded pipelines.");
}
- return input
- .apply("Assign Table Destinations", new AssignDestinations(destinations))
- .apply(
- "Write Rows to Destinations",
- new WriteToDestinations(
- getCatalogConfig(),
- destinations,
- getTriggeringFrequency(),
- getDirectWriteByteLimit()));
+
+ if (getGroupByPartitions()) {
+ return input
+ .apply(
+ "AssignDestinationAndPartition",
+ new AssignDestinationsAndPartitions(destinations, getCatalogConfig()))
+ .apply(
+ "Write Rows to Partitions",
+ new WriteToPartitions(getCatalogConfig(), destinations, getTriggeringFrequency()));
+ } else {
+ return input
+ .apply("Assign Table Destinations", new AssignDestinations(destinations))
+ .apply(
+ "Write Rows to Destinations",
+ new WriteToDestinations(
+ getCatalogConfig(),
+ destinations,
+ getTriggeringFrequency(),
+ getDirectWriteByteLimit()));
+ }
}
}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
index 428ef71f23e5..d8615e128315 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
@@ -134,6 +134,8 @@ public static Builder builder() {
+ " please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.")
public abstract @Nullable Map getTableProperties();
+ public abstract @Nullable Boolean getGroupByPartitions();
+
@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setTable(String table);
@@ -158,6 +160,8 @@ public abstract static class Builder {
public abstract Builder setTableProperties(Map tableProperties);
+ public abstract Builder setGroupByPartitions(Boolean groupByPartitions);
+
public abstract Configuration build();
}
@@ -238,6 +242,11 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
writeTransform = writeTransform.withDirectWriteByteLimit(directWriteByteLimit);
}
+ @Nullable Boolean groupByPartitions = configuration.getGroupByPartitions();
+ if (groupByPartitions != null && groupByPartitions) {
+ writeTransform = writeTransform.groupingByPartitions();
+ }
+
// TODO: support dynamic destinations
IcebergWriteResult result = rows.apply(writeTransform);
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
index 82251c00e72e..fd3d5d63327c 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
@@ -23,6 +23,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.catalog.Catalog;
@@ -56,7 +57,7 @@ class RecordWriter {
partitionKey);
}
- RecordWriter(Table table, FileFormat fileFormat, String filename, PartitionKey partitionKey)
+ RecordWriter(Table table, FileFormat fileFormat, String filename, StructLike partitionKey)
throws IOException {
this.table = table;
this.fileFormat = fileFormat;
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java
new file mode 100644
index 000000000000..51f49530493c
--- /dev/null
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.io.iceberg.AssignDestinationsAndPartitions.DESTINATION;
+import static org.apache.beam.sdk.io.iceberg.AssignDestinationsAndPartitions.PARTITION;
+import static org.apache.beam.sdk.io.iceberg.RecordWriterManager.getPartitionDataPath;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.ShardedKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class WritePartitionedRowsToFiles
+ extends PTransform<
+ PCollection, Iterable>>, PCollection> {
+ private static final Logger LOG = LoggerFactory.getLogger(WritePartitionedRowsToFiles.class);
+ private final DynamicDestinations dynamicDestinations;
+ private final IcebergCatalogConfig catalogConfig;
+ private final String filePrefix;
+
+ WritePartitionedRowsToFiles(
+ IcebergCatalogConfig catalogConfig,
+ DynamicDestinations dynamicDestinations,
+ String filePrefix) {
+ this.catalogConfig = catalogConfig;
+ this.dynamicDestinations = dynamicDestinations;
+ this.filePrefix = filePrefix;
+ }
+
+ @Override
+ public PCollection expand(
+ PCollection, Iterable>> input) {
+ Schema dataSchema =
+ ((RowCoder)
+ ((IterableCoder)
+ ((KvCoder, Iterable>) input.getCoder())
+ .getValueCoder())
+ .getElemCoder())
+ .getSchema();
+ return input.apply(
+ ParDo.of(new WriteDoFn(catalogConfig, dynamicDestinations, filePrefix, dataSchema)));
+ }
+
+ private static class WriteDoFn extends DoFn, Iterable>, FileWriteResult> {
+
+ private final DynamicDestinations dynamicDestinations;
+ private final IcebergCatalogConfig catalogConfig;
+ private final String filePrefix;
+ private final Schema dataSchema;
+ static final Cache LAST_REFRESHED_TABLE_CACHE =
+ CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();
+
+ WriteDoFn(
+ IcebergCatalogConfig catalogConfig,
+ DynamicDestinations dynamicDestinations,
+ String filePrefix,
+ Schema dataSchema) {
+ this.catalogConfig = catalogConfig;
+ this.dynamicDestinations = dynamicDestinations;
+ this.filePrefix = filePrefix;
+ this.dataSchema = dataSchema;
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element KV, Iterable> element, OutputReceiver out)
+ throws Exception {
+ String tableIdentifier = checkStateNotNull(element.getKey().getKey().getString(DESTINATION));
+ String partitionPath = checkStateNotNull(element.getKey().getKey().getString(PARTITION));
+
+ IcebergDestination destination = dynamicDestinations.instantiateDestination(tableIdentifier);
+ Table table = getOrCreateTable(destination, dataSchema);
+
+ // TODO(ahmedabu98): cache this
+ Map partitionFieldMap = Maps.newHashMap();
+ for (PartitionField partitionField : table.spec().fields()) {
+ partitionFieldMap.put(partitionField.name(), partitionField);
+ }
+ partitionPath = getPartitionDataPath(partitionPath, partitionFieldMap);
+
+ StructLike partitionData =
+ table.spec().isPartitioned()
+ ? DataFiles.data(table.spec(), partitionPath)
+ : new PartitionKey(table.spec(), table.schema());
+
+ String fileName =
+ destination
+ .getFileFormat()
+ .addExtension(String.format("%s-%s", filePrefix, UUID.randomUUID()));
+
+ RecordWriter writer =
+ new RecordWriter(table, destination.getFileFormat(), fileName, partitionData);
+ for (Row row : element.getValue()) {
+ Record record = IcebergUtils.beamRowToIcebergRecord(table.schema(), row);
+ writer.write(record);
+ }
+ writer.close();
+
+ SerializableDataFile sdf = SerializableDataFile.from(writer.getDataFile(), partitionPath);
+ out.output(
+ FileWriteResult.builder()
+ .setTableIdentifier(destination.getTableIdentifier())
+ .setSerializableDataFile(sdf)
+ .build());
+ }
+
+ static final class LastRefreshedTable {
+ final Table table;
+ volatile Instant lastRefreshTime;
+ static final Duration STALENESS_THRESHOLD = Duration.ofMinutes(2);
+
+ LastRefreshedTable(Table table, Instant lastRefreshTime) {
+ this.table = table;
+ this.lastRefreshTime = lastRefreshTime;
+ }
+
+ /**
+ * Refreshes the table metadata if it is considered stale (older than 2 minutes).
+ *
+ *
This method first performs a non-synchronized check on the table's freshness. This
+ * provides a lock-free fast path that avoids synchronization overhead in the common case
+ * where the table does not need to be refreshed. If the table might be stale, it then enters
+ * a synchronized block to ensure that only one thread performs the refresh operation.
+ */
+ void refreshIfStale() {
+ // Fast path: Avoid entering the synchronized block if the table is not stale.
+ if (lastRefreshTime.isAfter(Instant.now().minus(STALENESS_THRESHOLD))) {
+ return;
+ }
+ synchronized (this) {
+ if (lastRefreshTime.isBefore(Instant.now().minus(STALENESS_THRESHOLD))) {
+ table.refresh();
+ lastRefreshTime = Instant.now();
+ }
+ }
+ }
+ }
+
+ Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) {
+ Catalog catalog = catalogConfig.catalog();
+ TableIdentifier identifier = destination.getTableIdentifier();
+ @Nullable
+ LastRefreshedTable lastRefreshedTable = LAST_REFRESHED_TABLE_CACHE.getIfPresent(identifier);
+ if (lastRefreshedTable != null && lastRefreshedTable.table != null) {
+ lastRefreshedTable.refreshIfStale();
+ return lastRefreshedTable.table;
+ }
+
+ Namespace namespace = identifier.namespace();
+ @Nullable IcebergTableCreateConfig createConfig = destination.getTableCreateConfig();
+ PartitionSpec partitionSpec =
+ createConfig != null ? createConfig.getPartitionSpec() : PartitionSpec.unpartitioned();
+ Map tableProperties =
+ createConfig != null && createConfig.getTableProperties() != null
+ ? createConfig.getTableProperties()
+ : Maps.newHashMap();
+
+ @Nullable Table table = null;
+ synchronized (LAST_REFRESHED_TABLE_CACHE) {
+ // Create namespace if it does not exist yet
+ if (!namespace.isEmpty() && catalog instanceof SupportsNamespaces) {
+ SupportsNamespaces supportsNamespaces = (SupportsNamespaces) catalog;
+ if (!supportsNamespaces.namespaceExists(namespace)) {
+ try {
+ supportsNamespaces.createNamespace(namespace);
+ LOG.info("Created new namespace '{}'.", namespace);
+ } catch (AlreadyExistsException ignored) {
+ // race condition: another worker already created this namespace
+ }
+ }
+ }
+
+ // If table exists, just load it
+ // Note: the implementation of catalog.tableExists() will load the table to check its
+ // existence. We don't use it here to avoid double loadTable() calls.
+ try {
+ table = catalog.loadTable(identifier);
+ } catch (NoSuchTableException e) { // Otherwise, create the table
+ org.apache.iceberg.Schema tableSchema =
+ IcebergUtils.beamSchemaToIcebergSchema(dataSchema);
+ try {
+ table = catalog.createTable(identifier, tableSchema, partitionSpec, tableProperties);
+ LOG.info(
+ "Created Iceberg table '{}' with schema: {}\n"
+ + ", partition spec: {}, table properties: {}",
+ identifier,
+ tableSchema,
+ partitionSpec,
+ tableProperties);
+ } catch (AlreadyExistsException ignored) {
+ // race condition: another worker already created this table
+ table = catalog.loadTable(identifier);
+ }
+ }
+ }
+ lastRefreshedTable = new LastRefreshedTable(table, Instant.now());
+ LAST_REFRESHED_TABLE_CACHE.put(identifier, lastRefreshedTable);
+ return table;
+ }
+ }
+}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToPartitions.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToPartitions.java
new file mode 100644
index 000000000000..3f7054ac9ffc
--- /dev/null
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToPartitions.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import java.util.UUID;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.ShardedKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+class WriteToPartitions extends PTransform>, IcebergWriteResult> {
+ private static final long DEFAULT_BYTES_PER_FILE = (1L << 29); // 512mb
+ private final IcebergCatalogConfig catalogConfig;
+ private final DynamicDestinations dynamicDestinations;
+ private final @Nullable Duration triggeringFrequency;
+ private final String filePrefix;
+
+ WriteToPartitions(
+ IcebergCatalogConfig catalogConfig,
+ DynamicDestinations dynamicDestinations,
+ @Nullable Duration triggeringFrequency) {
+ this.dynamicDestinations = dynamicDestinations;
+ this.catalogConfig = catalogConfig;
+ this.triggeringFrequency = triggeringFrequency;
+ // single unique prefix per write transform
+ this.filePrefix = UUID.randomUUID().toString();
+ }
+
+ @Override
+ public IcebergWriteResult expand(PCollection> input) {
+ boolean unbounded = IcebergUtils.isUnbounded(input);
+
+ GroupIntoBatches groupIntoPartitions =
+ GroupIntoBatches.ofByteSize(DEFAULT_BYTES_PER_FILE);
+ if (unbounded && triggeringFrequency != null) {
+ groupIntoPartitions.withMaxBufferingDuration(triggeringFrequency);
+ }
+
+ PCollection, Iterable>> groupedRows =
+ input
+ .apply(groupIntoPartitions.withShardedKey())
+ .setCoder(
+ KvCoder.of(
+ org.apache.beam.sdk.util.ShardedKey.Coder.of(
+ RowCoder.of(AssignDestinationsAndPartitions.OUTPUT_SCHEMA)),
+ IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema()))));
+
+ PCollection writtenFiles =
+ groupedRows.apply(
+ new WritePartitionedRowsToFiles(catalogConfig, dynamicDestinations, filePrefix));
+
+ // Commit files to tables
+ PCollection> snapshots =
+ writtenFiles.apply(new AppendFilesToTables(catalogConfig, filePrefix));
+
+ return new IcebergWriteResult(input.getPipeline(), snapshots);
+ }
+}
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
index a7349bffdfa0..03f2ac156c97 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.iceberg;
+import static java.util.Arrays.asList;
import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamRowToIcebergRecord;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
@@ -64,14 +65,22 @@
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@RunWith(JUnit4.class)
+@RunWith(Parameterized.class)
public class IcebergIOWriteTest implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(IcebergIOWriteTest.class);
+ @Parameterized.Parameters
+ public static Iterable