Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public interface PartitionStatistics extends StructLike {
.build();

static Schema schema(Types.StructType unifiedPartitionType, int formatVersion) {
Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned");
Preconditions.checkState(formatVersion > 0, "Invalid format version: %d", formatVersion);

if (formatVersion <= 2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ private PartitionStatsHandler() {}
*/
@Deprecated
public static Schema schema(StructType unifiedPartitionType, int formatVersion) {
Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned");
Preconditions.checkState(
formatVersion > 0 && formatVersion <= TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION,
"Invalid format version: %d",
Expand Down Expand Up @@ -278,7 +277,6 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) thro
public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long snapshotId)
throws IOException {
Preconditions.checkArgument(table != null, "Invalid table: null");
Preconditions.checkArgument(Partitioning.isPartitioned(table), "Table must be partitioned");
Snapshot snapshot = table.snapshot(snapshotId);
Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s", snapshotId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -89,23 +90,53 @@ public void testPartitionStatsOnInvalidSnapshot() throws Exception {
.hasMessage("Snapshot not found: 42");
}

@Test
@TestTemplate
public void testPartitionStatsOnUnPartitionedTable() throws Exception {
Table testTable =
TestTables.create(
tempDir("unpartitioned_table"),
"unpartitioned_table",
tempDir("unpartitioned_table_" + formatVersion),
"unpartitioned_table_" + formatVersion,
SCHEMA,
PartitionSpec.unpartitioned(),
2,
formatVersion,
fileFormatProperty);

// Add a first data file and update partition stats
DataFile dataFile = FileGenerationUtil.generateDataFile(testTable, TestHelpers.Row.of());
testTable.newAppend().appendFile(dataFile).commit();

assertThatThrownBy(() -> PartitionStatsHandler.computeAndWriteStatsFile(testTable))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Table must be partitioned");
testTable
.updatePartitionStatistics()
.setPartitionStatistics(PartitionStatsHandler.computeAndWriteStatsFile(testTable))
.commit();

try (CloseableIterable<PartitionStatistics> recordIterator =
new BasePartitionStatisticsScan(testTable).scan()) {
List<PartitionStatistics> partitionStats = Lists.newArrayList(recordIterator);
assertThat(partitionStats).hasSize(1);
PartitionStatistics stats = partitionStats.get(0);
assertThat(stats.partition()).isEqualTo(GenericRecord.create(Types.StructType.of()));
assertThat(stats.dataRecordCount()).isEqualTo(dataFile.recordCount());
}

// Add a second data file and update partition stats
DataFile dataFile2 = FileGenerationUtil.generateDataFile(testTable, TestHelpers.Row.of());
testTable.newAppend().appendFile(dataFile2).commit();

testTable
.updatePartitionStatistics()
.setPartitionStatistics(PartitionStatsHandler.computeAndWriteStatsFile(testTable))
.commit();

try (CloseableIterable<PartitionStatistics> recordIterator =
new BasePartitionStatisticsScan(testTable).scan()) {
List<PartitionStatistics> partitionStats = Lists.newArrayList(recordIterator);
assertThat(partitionStats).hasSize(1);
PartitionStatistics stats = partitionStats.get(0);
assertThat(stats.partition()).isEqualTo(GenericRecord.create(Types.StructType.of()));
assertThat(stats.dataRecordCount())
.isEqualTo(dataFile.recordCount() + dataFile2.recordCount());
}
}

@TestTemplate
Expand Down
2 changes: 1 addition & 1 deletion format/spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,7 @@ The schema of the partition statistics file is as follows:

| v1 | v2 | v3 | Field id, name | Type | Description |
|----|----|----|----------------|------|-------------|
| _required_ | _required_ | _required_ | **`1 partition`** | `struct<..>` | Partition data tuple, schema based on the unified partition type considering all specs in a table |
| _required_ | _required_ | _required_ | **`1 partition`** | `struct<..>` | Partition data tuple, schema based on the unified partition type considering all specs in a table, empty for unpartitioned tables |
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy to post this change to the mailing list if needed.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes — any spec change has to go up for a vote.

And what's the use case? You want to know data files, delete files, and file size for an unpartitioned table? We can use metadata tables for that, or maybe the new v4 manifests can solve it (with less I/O)?

Also, since the name itself is partition stats, it seems odd to generate the file for an unpartitioned table.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ajantha-bhat Thanks, I'll post after exploring another approach, since Parquet writer can't write an empty struct.

We want to obtain the record count more quickly and efficiently for CBO. As mentioned in the PR description, Trino currently needs to read manifest files to estimate table size.

Even if v4 manifests address this, migration would take a long time. We need a solution that works for tables with format versions < v4 as well.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is still a behavior change for existing users.
Partition stats were computed only for partitioned tables. Now you are enabling them for non-partitioned tables with a NULL partition. Plus, as I mentioned, it is still odd for a non-partitioned table to write this file via the compute_partition_stats method. Why does a user have to call compute_partition_stats on a non-partitioned table?

I can understand that you need a quick table level stats for CBO in Trino without doing I/O of all the manifests.

a) Can we introduce new table-level stats (along with the NDV puffin file) in compute_table_stats? People can still refer to it for both partitioned and non-partitioned tables if they need whole-table-level info.
b) Or Can we check if snapshot summary already has this table level stats you are looking for? (we don't have to do multiple IO of the files in that case). If not, can we enhance snapshot summary to include it?

| _required_ | _required_ | _required_ | **`2 spec_id`** | `int` | Partition spec id |
| _required_ | _required_ | _required_ | **`3 data_record_count`** | `long` | Count of records in data files |
| _required_ | _required_ | _required_ | **`4 data_file_count`** | `int` | Count of data files |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ public void testAllDatatypePartitionWriting() throws Exception {
.hasMessage("Cannot write using unregistered internal data format: ORC");
}

@Override
public void testPartitionStatsOnUnPartitionedTable() {
assertThatThrownBy(super::testPartitionStatsOnUnPartitionedTable)
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Cannot write using unregistered internal data format: ORC");
}

@Override
public void testOptionalFieldsWriting() throws Exception {
assertThatThrownBy(super::testOptionalFieldsWriting)
Expand Down
Loading