Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.iceberg;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;

/**
* Assigns destination metadata for each input record.
*
* <p>The output will have the format { {destination, partition}, data }
*/
/**
 * Assigns destination metadata for each input record.
 *
 * <p>The output will have the format { {destination, partition}, data }, where the key row
 * carries the destination table identifier string and the record's partition path.
 */
class AssignDestinationsAndPartitions
    extends PTransform<PCollection<Row>, PCollection<KV<Row, Row>>> {

  private final DynamicDestinations dynamicDestinations;
  private final IcebergCatalogConfig catalogConfig;
  static final String DESTINATION = "destination";
  static final String PARTITION = "partition";
  // Schema of the output key: {destination: string, partition: string}.
  static final org.apache.beam.sdk.schemas.Schema OUTPUT_SCHEMA =
      org.apache.beam.sdk.schemas.Schema.builder()
          .addStringField(DESTINATION)
          .addStringField(PARTITION)
          .build();

  public AssignDestinationsAndPartitions(
      DynamicDestinations dynamicDestinations, IcebergCatalogConfig catalogConfig) {
    this.dynamicDestinations = dynamicDestinations;
    this.catalogConfig = catalogConfig;
  }

  @Override
  public PCollection<KV<Row, Row>> expand(PCollection<Row> input) {
    return input
        .apply(ParDo.of(new AssignDoFn(dynamicDestinations, catalogConfig)))
        .setCoder(
            KvCoder.of(
                RowCoder.of(OUTPUT_SCHEMA), RowCoder.of(dynamicDestinations.getDataSchema())));
  }

  static class AssignDoFn extends DoFn<Row, KV<Row, Row>> {
    // Per-table caches of the partition key and row wrapper. These are intentionally
    // per-DoFn-instance (NOT static): PartitionKey.partition(...) and BeamRowWrapper.wrap(...)
    // mutate internal state, so sharing one object across DoFn instances running on different
    // worker threads would race and could produce corrupted partition paths. A single DoFn
    // instance is only ever used by one thread at a time, so instance-level caching is safe.
    // Fields are transient because the cached objects are not serialized with the DoFn;
    // they are re-created in setup() on each worker.
    private transient Map<String, PartitionKey> partitionKeys = new ConcurrentHashMap<>();
    private transient Map<String, BeamRowWrapper> wrappers = new ConcurrentHashMap<>();
    private final DynamicDestinations dynamicDestinations;
    private final IcebergCatalogConfig catalogConfig;

    AssignDoFn(DynamicDestinations dynamicDestinations, IcebergCatalogConfig catalogConfig) {
      this.dynamicDestinations = dynamicDestinations;
      this.catalogConfig = catalogConfig;
    }

    @Setup
    public void setup() {
      // Re-create the caches after deserialization (transient fields are null on workers).
      partitionKeys = new ConcurrentHashMap<>();
      wrappers = new ConcurrentHashMap<>();
    }

    @ProcessElement
    public void processElement(
        @Element Row element,
        BoundedWindow window,
        PaneInfo paneInfo,
        @Timestamp Instant timestamp,
        OutputReceiver<KV<Row, Row>> out) {
      String tableIdentifier =
          dynamicDestinations.getTableStringIdentifier(
              ValueInSingleWindow.of(element, timestamp, window, paneInfo));
      Row data = dynamicDestinations.getData(element);

      @Nullable PartitionKey partitionKey = partitionKeys.get(tableIdentifier);
      @Nullable BeamRowWrapper wrapper = wrappers.get(tableIdentifier);
      if (partitionKey == null || wrapper == null) {
        // First time we see this destination: resolve its partition spec.
        PartitionSpec spec = PartitionSpec.unpartitioned();
        Schema schema = IcebergUtils.beamSchemaToIcebergSchema(data.getSchema());
        @Nullable
        IcebergTableCreateConfig createConfig =
            dynamicDestinations.instantiateDestination(tableIdentifier).getTableCreateConfig();
        if (createConfig != null && createConfig.getPartitionFields() != null) {
          // User supplied explicit partition fields for table creation.
          spec =
              PartitionUtils.toPartitionSpec(createConfig.getPartitionFields(), data.getSchema());
        } else {
          try {
            // see if table already exists with a spec
            // TODO(ahmedabu98): improve this by periodically refreshing the table to fetch updated
            // specs
            spec = catalogConfig.catalog().loadTable(TableIdentifier.parse(tableIdentifier)).spec();
          } catch (NoSuchTableException ignored) {
            // no partition to apply
          }
        }
        partitionKey = new PartitionKey(spec, schema);
        wrapper = new BeamRowWrapper(data.getSchema(), schema.asStruct());
        partitionKeys.put(tableIdentifier, partitionKey);
        wrappers.put(tableIdentifier, wrapper);
      }
      // partition(...) mutates partitionKey in place; safe because this DoFn instance
      // (and therefore this cache entry) is used by a single thread.
      partitionKey.partition(wrapper.wrap(data));
      String partitionPath = partitionKey.toPath();

      Row destAndPartition =
          Row.withSchema(OUTPUT_SCHEMA).addValues(tableIdentifier, partitionPath).build();
      out.output(KV.of(destAndPartition, data));
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.Date;
import org.apache.beam.sdk.schemas.logicaltypes.DateTime;
import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric;
import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
import org.apache.beam.sdk.schemas.logicaltypes.Time;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.UUIDUtil;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * Adapts a Beam {@link Row} to Iceberg's {@link StructLike} interface so it can be fed to
 * {@code PartitionKey.partition(...)}.
 *
 * <p>A per-field {@link PositionalGetter} converts Beam values to the Java representation
 * Iceberg expects (e.g. millis → micros for timestamps, {@code byte[]} → {@link ByteBuffer}).
 * Fields with no registered getter fall back to the raw Beam value.
 *
 * <p>Not thread-safe: {@link #wrap(Row)} stores the row in a mutable field, so a wrapper
 * instance must only be used by one thread at a time.
 */
public class BeamRowWrapper implements StructLike {

  private final FieldType[] types;
  private final @Nullable PositionalGetter<?>[] getters;
  private @Nullable Row row = null;

  public BeamRowWrapper(Schema schema, Types.StructType struct) {
    int size = schema.getFieldCount();

    // Plain array creation; an unbounded-wildcard generic array is reifiable and legal.
    types = new FieldType[size];
    getters = new PositionalGetter<?>[size];

    for (int i = 0; i < size; i++) {
      types[i] = schema.getField(i).getType();
      // Beam and Iceberg fields are matched positionally here — assumes the Iceberg struct
      // was derived from this Beam schema in the same field order.
      getters[i] = buildGetter(types[i], struct.fields().get(i).type());
    }
  }

  /** Points this wrapper at a new row; returns {@code this} for call chaining. */
  public BeamRowWrapper wrap(@Nullable Row row) {
    this.row = row;
    return this;
  }

  @Override
  public int size() {
    return types.length;
  }

  @Override
  public <T> @Nullable T get(int pos, Class<T> javaClass) {
    Row current = row;
    if (current == null) {
      return null;
    }
    // Read the value once; the original looked it up twice (null check + fallback).
    Object value = current.getValue(pos);
    if (value == null) {
      return null;
    }
    PositionalGetter<?> getter = getters[pos];
    return javaClass.cast(getter == null ? value : getter.get(current, pos));
  }

  @Override
  public <T> void set(int pos, T value) {
    throw new UnsupportedOperationException(
        "Could not set a field in the BeamRowWrapper because rowData is read-only");
  }

  /** Extracts and converts the value at {@code pos} from a non-null Beam row. */
  private interface PositionalGetter<T> {
    T get(Row data, int pos);
  }

  /**
   * Builds a converter for one field, or returns {@code null} when the raw Beam value is
   * already in the representation Iceberg expects (e.g. INT32, INT64, DOUBLE, BOOLEAN).
   */
  private static @Nullable PositionalGetter<?> buildGetter(FieldType beamType, Type icebergType) {
    switch (beamType.getTypeName()) {
      case BYTE:
        return Row::getByte;
      case INT16:
        return Row::getInt16;
      case STRING:
        return Row::getString;
      case BYTES:
        return (row, pos) -> {
          byte[] bytes = checkStateNotNull(row.getBytes(pos));
          // Iceberg models UUIDs as 16-byte values; everything else as ByteBuffer.
          if (Type.TypeID.UUID == icebergType.typeId()) {
            return UUIDUtil.convert(bytes);
          } else {
            return ByteBuffer.wrap(bytes);
          }
        };
      case DECIMAL:
        return Row::getDecimal;
      case DATETIME:
        // Beam DATETIME is millis; Iceberg timestamps are micros.
        return (row, pos) ->
            TimeUnit.MILLISECONDS.toMicros(checkStateNotNull(row.getDateTime(pos)).getMillis());
      case ROW:
        Schema beamSchema = checkStateNotNull(beamType.getRowSchema());
        Types.StructType structType = (Types.StructType) icebergType;

        // One nested wrapper is reused across rows; safe under single-threaded use.
        BeamRowWrapper nestedWrapper = new BeamRowWrapper(beamSchema, structType);
        return (row, pos) -> nestedWrapper.wrap(row.getRow(pos));
      case LOGICAL_TYPE:
        if (beamType.isLogicalType(MicrosInstant.IDENTIFIER)) {
          return (row, pos) -> {
            Instant instant = checkStateNotNull(row.getLogicalTypeValue(pos, Instant.class));
            // epochSecond*1e6 + nano/1000 is correct for pre-epoch instants too, since
            // getNano() is always non-negative.
            return TimeUnit.SECONDS.toMicros(instant.getEpochSecond()) + instant.getNano() / 1000;
          };
        } else if (beamType.isLogicalType(DateTime.IDENTIFIER)) {
          return (row, pos) ->
              DateTimeUtil.microsFromTimestamp(
                  checkStateNotNull(row.getLogicalTypeValue(pos, LocalDateTime.class)));
        } else if (beamType.isLogicalType(Date.IDENTIFIER)) {
          return (row, pos) ->
              DateTimeUtil.daysFromDate(
                  checkStateNotNull(row.getLogicalTypeValue(pos, LocalDate.class)));
        } else if (beamType.isLogicalType(Time.IDENTIFIER)) {
          return (row, pos) ->
              DateTimeUtil.microsFromTime(
                  checkStateNotNull(row.getLogicalTypeValue(pos, LocalTime.class)));
        } else if (beamType.isLogicalType(FixedPrecisionNumeric.IDENTIFIER)) {
          return (row, pos) -> row.getLogicalTypeValue(pos, BigDecimal.class);
        } else {
          return null;
        }
      default:
        return null;
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,10 @@
public class IcebergIO {

public static WriteRows writeRows(IcebergCatalogConfig catalog) {
return new AutoValue_IcebergIO_WriteRows.Builder().setCatalogConfig(catalog).build();
return new AutoValue_IcebergIO_WriteRows.Builder()
.setCatalogConfig(catalog)
.setGroupByPartitions(false)
.build();
}

@AutoValue
Expand All @@ -397,6 +400,8 @@ public abstract static class WriteRows extends PTransform<PCollection<Row>, Iceb

abstract @Nullable Integer getDirectWriteByteLimit();

abstract boolean getGroupByPartitions();

abstract Builder toBuilder();

@AutoValue.Builder
Expand All @@ -411,6 +416,8 @@ abstract static class Builder {

abstract Builder setDirectWriteByteLimit(Integer directWriteByteLimit);

// Parameter renamed to lowerCamelCase ("GroupByPartitions" violated Java naming conventions);
// parameter names are not part of Java call sites, so this is fully source-compatible.
abstract Builder setGroupByPartitions(boolean groupByPartitions);

abstract WriteRows build();
}

Expand Down Expand Up @@ -443,6 +450,15 @@ public WriteRows withDirectWriteByteLimit(Integer directWriteByteLimit) {
return toBuilder().setDirectWriteByteLimit(directWriteByteLimit).build();
}

/**
 * Groups incoming rows by their destination partition before writing, ensuring that a given
 * bundle is written to only one partition. For partitioned tables, this helps significantly to
 * reduce the number of small files.
 */
public WriteRows groupingByPartitions() {
return toBuilder().setGroupByPartitions(true).build();
}

@Override
public IcebergWriteResult expand(PCollection<Row> input) {
List<?> allToArgs = Arrays.asList(getTableIdentifier(), getDynamicDestinations());
Expand All @@ -464,15 +480,26 @@ public IcebergWriteResult expand(PCollection<Row> input) {
IcebergUtils.isUnbounded(input),
"Must only provide direct write limit for unbounded pipelines.");
}
return input
.apply("Assign Table Destinations", new AssignDestinations(destinations))
.apply(
"Write Rows to Destinations",
new WriteToDestinations(
getCatalogConfig(),
destinations,
getTriggeringFrequency(),
getDirectWriteByteLimit()));

if (getGroupByPartitions()) {
return input
.apply(
"AssignDestinationAndPartition",
new AssignDestinationsAndPartitions(destinations, getCatalogConfig()))
.apply(
"Write Rows to Partitions",
new WriteToPartitions(getCatalogConfig(), destinations, getTriggeringFrequency()));
} else {
return input
.apply("Assign Table Destinations", new AssignDestinations(destinations))
.apply(
"Write Rows to Destinations",
new WriteToDestinations(
getCatalogConfig(),
destinations,
getTriggeringFrequency(),
getDirectWriteByteLimit()));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ public static Builder builder() {
+ " please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.")
public abstract @Nullable Map<String, String> getTableProperties();

// Whether to group records by destination partition before writing; a null value is
// treated the same as false by the transform's expand() logic.
public abstract @Nullable Boolean getGroupByPartitions();

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setTable(String table);
Expand All @@ -158,6 +160,8 @@ public abstract static class Builder {

public abstract Builder setTableProperties(Map<String, String> tableProperties);

public abstract Builder setGroupByPartitions(Boolean groupByPartitions);

public abstract Configuration build();
}

Expand Down Expand Up @@ -238,6 +242,11 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
writeTransform = writeTransform.withDirectWriteByteLimit(directWriteByteLimit);
}

@Nullable Boolean groupByPartitions = configuration.getGroupByPartitions();
if (groupByPartitions != null && groupByPartitions) {
writeTransform = writeTransform.groupingByPartitions();
}

// TODO: support dynamic destinations
IcebergWriteResult result = rows.apply(writeTransform);

Expand Down
Loading
Loading