From 43cd3a5d7c16929c9bfd452c5132bf967f32f22e Mon Sep 17 00:00:00 2001
From: Guosmilesmile <511955993@qq.com>
Date: Sun, 5 Apr 2026 22:55:03 +0800
Subject: [PATCH] Data: Add TCK tests for metrics collection in
 BaseFormatModelTests

---
 .../iceberg/data/BaseFormatModelTests.java    | 444 +++++++++++++++++-
 1 file changed, 440 insertions(+), 4 deletions(-)

diff --git a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java
index e295b5fbc1bb..506477d065b6 100644
--- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java
+++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java
@@ -26,8 +26,11 @@
 import static org.assertj.core.api.Assumptions.assumeThat;
 import static org.junit.jupiter.api.Assumptions.assumeFalse;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -35,9 +38,11 @@
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestTables;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
@@ -56,9 +61,13 @@
 import org.apache.iceberg.io.DeleteSchemaUtil;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.FieldSource;
@@ -77,6 +86,8 @@ protected boolean supportsBatchReads() {
     return false;
   }
 
+  @TempDir private File tableDir;
+
   private static final FileFormat[] FILE_FORMATS =
       new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC};
 
@@ -92,11 +103,14 @@ protected boolean supportsBatchReads() {
   static final String FEATURE_CASE_SENSITIVE = "caseSensitive";
   static final String FEATURE_SPLIT = "split";
   static final String FEATURE_REUSE_CONTAINERS = "reuseContainers";
+  static final String FEATURE_METRIC_COLLECT = "metricCollect";
 
   private static final Map<FileFormat, String[]> MISSING_FEATURES =
       Map.of(
           FileFormat.AVRO,
-          new String[] {FEATURE_FILTER, FEATURE_CASE_SENSITIVE, FEATURE_SPLIT},
+          new String[] {
+            FEATURE_FILTER, FEATURE_CASE_SENSITIVE, FEATURE_SPLIT, FEATURE_METRIC_COLLECT
+          },
           FileFormat.ORC,
           new String[] {FEATURE_REUSE_CONTAINERS});
 
@@ -398,9 +412,6 @@ void testReaderBuilderFilter(FileFormat fileFormat) throws IOException {
     assumeSupports(fileFormat, FEATURE_FILTER);
 
     Schema schema = SCHEMA;
-    new Schema(
-        Types.NestedField.required(1, "id", Types.IntegerType.get()),
-        Types.NestedField.required(2, "data", Types.StringType.get()));
 
     // Generate records with known id values [0, count)
     int count = 10000;
@@ -628,6 +639,321 @@ void testReaderBuilderRecordsPerBatchNotSupported(FileFormat fileFormat) throws
         .isInstanceOf(UnsupportedOperationException.class);
   }
 
+  @AfterEach
+  void clearTestTables() {
+    // Drop tables registered through TestTables after every test, even a failing one, so that
+    // the next invocation can create its "test" table again.
+    TestTables.clearTables();
+  }
+
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testDataWriterMetricsCollection(FileFormat fileFormat) throws IOException {
+    DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+    Schema schema = dataGenerator.schema();
+    FileWriterBuilder<DataWriter<Record>, Object> writerBuilder =
+        FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile);
+
+    DataWriter<Record> writer =
+        writerBuilder.schema(schema).spec(PartitionSpec.unpartitioned()).build();
+
+    List<Record> genericRecords = dataGenerator.generateRecords();
+
+    try (writer) {
+      genericRecords.forEach(writer::write);
+    }
+
+    DataFile dataFile = writer.toDataFile();
+
+    assertThat(dataFile).isNotNull();
+    assertThat(dataFile.recordCount()).isEqualTo(genericRecords.size());
+
+    // Avro collects only the record count, so column-level metrics cannot be verified
+    assumeSupports(fileFormat, FEATURE_METRIC_COLLECT);
+
+    assertValueAndNullCounts(
+        schema, genericRecords, dataFile.valueCounts(), dataFile.nullValueCounts());
+    assertBounds(schema, genericRecords, dataFile.lowerBounds(), dataFile.upperBounds());
+
+    assertThat(dataFile.columnSizes()).isNotNull().isNotEmpty();
+  }
+
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testDataWriterMetricsWithNoneMode(FileFormat fileFormat) throws IOException {
+
+    // Avro collects only the record count, so column-level metrics cannot be verified
+    assumeSupports(fileFormat, FEATURE_METRIC_COLLECT);
+
+    DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+    Schema schema = dataGenerator.schema();
+
+    TestTables.TestTable table =
+        TestTables.create(
+            tableDir,
+            "test",
+            schema,
+            PartitionSpec.unpartitioned(),
+            3,
+            ImmutableMap.of(TableProperties.DEFAULT_WRITE_METRICS_MODE, "none"));
+
+    MetricsConfig noneConfig = MetricsConfig.forTable(table);
+
+    FileWriterBuilder<DataWriter<Record>, Object> writerBuilder =
+        FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile);
+
+    DataWriter<Record> writer =
+        writerBuilder
+            .schema(schema)
+            .spec(PartitionSpec.unpartitioned())
+            .metricsConfig(noneConfig)
+            .build();
+
+    List<Record> genericRecords = dataGenerator.generateRecords();
+
+    try (writer) {
+      genericRecords.forEach(writer::write);
+    }
+
+    DataFile dataFile = writer.toDataFile();
+
+    assertThat(dataFile).isNotNull();
+    assertThat(dataFile.recordCount()).isEqualTo(genericRecords.size());
+
+    // In "none" mode no column-level counts or bounds are collected; the metric maps themselves
+    // may be null, so use the null-safe helpers instead of dereferencing them.
+    assertCountsNull(schema, dataFile.valueCounts(), dataFile.nullValueCounts());
+    assertBoundsNull(schema, dataFile.lowerBounds(), dataFile.upperBounds());
+  }
+
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testDataWriterMetricsWithCountsMode(FileFormat fileFormat) throws IOException {
+    // Avro collects only the record count, so column-level metrics cannot be verified
+    assumeSupports(fileFormat, FEATURE_METRIC_COLLECT);
+
+    DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+    Schema schema = dataGenerator.schema();
+    TestTables.TestTable table =
+        TestTables.create(
+            tableDir,
+            "test",
+            schema,
+            PartitionSpec.unpartitioned(),
+            3,
+            ImmutableMap.of(TableProperties.DEFAULT_WRITE_METRICS_MODE, "counts"));
+
+    MetricsConfig countsConfig = MetricsConfig.forTable(table);
+
+    FileWriterBuilder<DataWriter<Record>, Object> writerBuilder =
+        FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile);
+
+    DataWriter<Record> writer =
+        writerBuilder
+            .schema(schema)
+            .spec(PartitionSpec.unpartitioned())
+            .metricsConfig(countsConfig)
+            .build();
+
+    List<Record> genericRecords = dataGenerator.generateRecords();
+
+    try (writer) {
+      genericRecords.forEach(writer::write);
+    }
+
+    DataFile dataFile = writer.toDataFile();
+
+    assertThat(dataFile).isNotNull();
+    assertThat(dataFile.recordCount()).isEqualTo(genericRecords.size());
+
+    // In "counts" mode, valueCounts and nullValueCounts should be present, while lowerBounds and
+    // upperBounds should be null.
+    assertValueAndNullCounts(
+        schema, genericRecords, dataFile.valueCounts(), dataFile.nullValueCounts());
+    assertBoundsNull(schema, dataFile.lowerBounds(), dataFile.upperBounds());
+  }
+
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testEqualityDeleteWriterMetricsCollection(FileFormat fileFormat) throws IOException {
+
+    // Avro collects only the record count, so column-level metrics cannot be verified
+    assumeSupports(fileFormat, FEATURE_METRIC_COLLECT);
+
+    DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+    Schema schema = dataGenerator.schema();
+    FileWriterBuilder<EqualityDeleteWriter<Record>, Object> writerBuilder =
+        FormatModelRegistry.equalityDeleteWriteBuilder(fileFormat, Record.class, encryptedFile);
+
+    EqualityDeleteWriter<Record> writer =
+        writerBuilder
+            .schema(schema)
+            .spec(PartitionSpec.unpartitioned())
+            .equalityFieldIds(1)
+            .build();
+
+    List<Record> genericRecords = dataGenerator.generateRecords();
+
+    try (writer) {
+      genericRecords.forEach(writer::write);
+    }
+
+    DeleteFile deleteFile = writer.toDeleteFile();
+
+    assertThat(deleteFile).isNotNull();
+    assertThat(deleteFile.recordCount()).isEqualTo(genericRecords.size());
+
+    assertValueAndNullCounts(
+        schema, genericRecords, deleteFile.valueCounts(), deleteFile.nullValueCounts());
+    assertBounds(schema, genericRecords, deleteFile.lowerBounds(), deleteFile.upperBounds());
+  }
+
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testPositionDeleteWriterMetricsSingleFile(FileFormat fileFormat) throws IOException {
+    Schema positionDeleteSchema = DeleteSchemaUtil.pathPosSchema();
+
+    FileWriterBuilder<PositionDeleteWriter<T>, ?> writerBuilder =
+        FormatModelRegistry.positionDeleteWriteBuilder(fileFormat, encryptedFile);
+
+    // Only reference a single data file, which takes the copyWithoutFieldCounts branch:
+    // counts are removed but bounds are preserved.
+    List<PositionDelete<T>> deletes =
+        ImmutableList.of(
+            PositionDelete.<T>create().set("d-file-1.parquet", 0L),
+            PositionDelete.<T>create().set("d-file-1.parquet", 5L),
+            PositionDelete.<T>create().set("d-file-1.parquet", 3L));
+
+    List<Record> genericRecords =
+        deletes.stream()
+            .map(
+                d ->
+                    GenericRecord.create(positionDeleteSchema)
+                        .copy(
+                            DELETE_FILE_PATH.name(), d.path(),
+                            DELETE_FILE_POS.name(), d.pos()))
+            .toList();
+
+    PositionDeleteWriter<T> writer = writerBuilder.spec(PartitionSpec.unpartitioned()).build();
+    try (writer) {
+      deletes.forEach(writer::write);
+    }
+
+    DeleteFile deleteFile = writer.toDeleteFile();
+
+    assertThat(deleteFile).isNotNull();
+    assertThat(deleteFile.recordCount()).isEqualTo(deletes.size());
+
+    assumeSupports(fileFormat, FEATURE_METRIC_COLLECT);
+
+    // Single file reference: valueCounts and nullValueCounts for file_path and pos are removed
+    assertCountsNull(positionDeleteSchema, deleteFile.valueCounts(), deleteFile.nullValueCounts());
+    // Single file reference: bounds are preserved
+    assertBounds(
+        positionDeleteSchema, genericRecords, deleteFile.lowerBounds(), deleteFile.upperBounds());
+  }
+
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testPositionDeleteWriterMetricsMultipleFiles(FileFormat fileFormat) throws IOException {
+    Schema positionDeleteSchema = DeleteSchemaUtil.pathPosSchema();
+
+    FileWriterBuilder<PositionDeleteWriter<T>, ?> writerBuilder =
+        FormatModelRegistry.positionDeleteWriteBuilder(fileFormat, encryptedFile);
+
+    // Reference multiple data files, which takes the copyWithoutFieldCountsAndBounds branch:
+    // both counts and bounds are removed.
+    List<PositionDelete<T>> deletes =
+        ImmutableList.of(
+            PositionDelete.<T>create().set("d-file-1.parquet", 0L),
+            PositionDelete.<T>create().set("d-file-1.parquet", 5L),
+            PositionDelete.<T>create().set("d-file-2.parquet", 3L));
+
+    PositionDeleteWriter<T> writer = writerBuilder.spec(PartitionSpec.unpartitioned()).build();
+    try (writer) {
+      deletes.forEach(writer::write);
+    }
+
+    DeleteFile deleteFile = writer.toDeleteFile();
+
+    assertThat(deleteFile).isNotNull();
+    assertThat(deleteFile.recordCount()).isEqualTo(deletes.size());
+
+    assumeSupports(fileFormat, FEATURE_METRIC_COLLECT);
+
+    // Multiple file references: all metrics (counts and bounds) for file_path and pos are removed
+    assertCountsNull(positionDeleteSchema, deleteFile.valueCounts(), deleteFile.nullValueCounts());
+    assertBoundsNull(positionDeleteSchema, deleteFile.lowerBounds(), deleteFile.upperBounds());
+  }
+
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testDataWriterMetricsWithPerColumnMode(FileFormat fileFormat) throws IOException {
+    // Avro collects only the record count, so column-level metrics cannot be verified
+    assumeSupports(fileFormat, FEATURE_METRIC_COLLECT);
+
+    DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+    Schema schema = dataGenerator.schema();
+
+    // Default mode is "counts", col_b is overridden to "full", col_a is overridden to "none"
+    TestTables.TestTable table =
+        TestTables.create(
+            tableDir,
+            "test",
+            schema,
+            PartitionSpec.unpartitioned(),
+            3,
+            ImmutableMap.of(
+                TableProperties.DEFAULT_WRITE_METRICS_MODE,
+                "counts",
+                TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX + "col_b",
+                "full",
+                TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX + "col_a",
+                "none"));
+
+    MetricsConfig perColumnConfig = MetricsConfig.forTable(table);
+
+    FileWriterBuilder<DataWriter<Record>, Object> writerBuilder =
+        FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile);
+
+    DataWriter<Record> writer =
+        writerBuilder
+            .schema(schema)
+            .spec(PartitionSpec.unpartitioned())
+            .metricsConfig(perColumnConfig)
+            .build();
+
+    List<Record> genericRecords = dataGenerator.generateRecords();
+
+    try (writer) {
+      genericRecords.forEach(writer::write);
+    }
+
+    DataFile dataFile = writer.toDataFile();
+
+    assertThat(dataFile).isNotNull();
+    assertThat(dataFile.recordCount()).isEqualTo(genericRecords.size());
+
+    // col_a: mode=none -> no valueCounts, nullValueCounts, bounds
+    Schema noneSchema = new Schema(schema.findField("col_a"));
+    assertCountsNull(noneSchema, dataFile.valueCounts(), dataFile.nullValueCounts());
+    assertBoundsNull(noneSchema, dataFile.lowerBounds(), dataFile.upperBounds());
+
+    // col_b: mode=full -> valueCounts, nullValueCounts, and bounds all present
+    Schema fullSchema = new Schema(schema.findField("col_b"));
+    assertValueAndNullCounts(
+        fullSchema, genericRecords, dataFile.valueCounts(), dataFile.nullValueCounts());
+    assertBounds(fullSchema, genericRecords, dataFile.lowerBounds(), dataFile.upperBounds());
+
+    // col_c, col_d, col_e: mode=counts (default) -> valueCounts and nullValueCounts present,
+    // but no bounds
+    Schema countsSchema =
+        new Schema(schema.findField("col_c"), schema.findField("col_d"), schema.findField("col_e"));
+    assertValueAndNullCounts(
+        countsSchema, genericRecords, dataFile.valueCounts(), dataFile.nullValueCounts());
+    assertBoundsNull(countsSchema, dataFile.lowerBounds(), dataFile.upperBounds());
+  }
+
   private void readAndAssertGenericRecords(
       FileFormat fileFormat, Schema schema, List<Record> expected) throws IOException {
     InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
@@ -638,6 +964,7 @@ private void readAndAssertGenericRecords(
             .build()) {
       readRecords = ImmutableList.copyOf(reader);
     }
+
     DataTestHelpers.assertEquals(schema.asStruct(), expected, readRecords);
   }
 
@@ -719,4 +1046,113 @@ private static String splitSizeProperty(FileFormat fileFormat) {
             "No split size property defined for format: " + fileFormat);
     };
   }
+
+  /** Verifies value and null counts of every top-level primitive column against the records. */
+  private void assertValueAndNullCounts(
+      Schema schema,
+      List<Record> genericRecords,
+      Map<Integer, Long> valueCounts,
+      Map<Integer, Long> nullValueCounts) {
+    for (Types.NestedField field : schema.columns()) {
+      if (field.type().isPrimitiveType()) {
+        assertThat(valueCounts).containsKey(field.fieldId());
+        assertThat(nullValueCounts).containsKey(field.fieldId());
+
+        long nullCount =
+            genericRecords.stream().filter(r -> r.getField(field.name()) == null).count();
+
+        assertThat(valueCounts.get(field.fieldId())).isEqualTo(genericRecords.size());
+        assertThat(nullValueCounts.get(field.fieldId())).isEqualTo(nullCount);
+      }
+    }
+  }
+
+  /** Verifies lower and upper bounds of every top-level primitive column against the records. */
+  private void assertBounds(
+      Schema schema,
+      List<Record> genericRecords,
+      Map<Integer, ByteBuffer> lowerBounds,
+      Map<Integer, ByteBuffer> upperBounds) {
+    for (Types.NestedField field : schema.columns()) {
+      if (field.type().isPrimitiveType()) {
+        assertThat(lowerBounds).containsKey(field.fieldId());
+        assertThat(upperBounds).containsKey(field.fieldId());
+
+        ByteBuffer lowerBuffer = lowerBounds.get(field.fieldId());
+        ByteBuffer upperBuffer = upperBounds.get(field.fieldId());
+
+        Comparator<Object> cmp = Comparators.forType(field.type().asPrimitiveType());
+
+        Object[] minMax = computeMinMax(genericRecords, field, cmp);
+        Object expectedMin = minMax[0];
+        Object expectedMax = minMax[1];
+
+        if (expectedMin != null) {
+          assertThat(lowerBuffer).isNotNull();
+          Object actualLower = Conversions.fromByteBuffer(field.type(), lowerBuffer);
+          assertThat(cmp.compare(actualLower, expectedMin)).isEqualTo(0);
+        }
+
+        if (expectedMax != null) {
+          assertThat(upperBuffer).isNotNull();
+          Object actualUpper = Conversions.fromByteBuffer(field.type(), upperBuffer);
+          assertThat(cmp.compare(actualUpper, expectedMax)).isEqualTo(0);
+        }
+      }
+    }
+  }
+
+  /** Returns [min, max] of the non-null, non-NaN values of a field; both null if none exist. */
+  private static Object[] computeMinMax(
+      List<Record> records, Types.NestedField field, Comparator<Object> cmp) {
+    Object min = null;
+    Object max = null;
+    for (Record record : records) {
+      Object value = record.getField(field.name());
+      if (value == null) {
+        continue;
+      }
+
+      if (value instanceof Float && ((Float) value).isNaN()) {
+        continue;
+      }
+
+      if (value instanceof Double && ((Double) value).isNaN()) {
+        continue;
+      }
+
+      if (min == null || cmp.compare(value, min) < 0) {
+        min = value;
+      }
+
+      if (max == null || cmp.compare(value, max) > 0) {
+        max = value;
+      }
+    }
+
+    return new Object[] {min, max};
+  }
+
+  /** Verifies that no bounds were recorded for any top-level primitive column. */
+  private void assertBoundsNull(
+      Schema schema, Map<Integer, ByteBuffer> lowerBounds, Map<Integer, ByteBuffer> upperBounds) {
+    for (Types.NestedField field : schema.columns()) {
+      if (field.type().isPrimitiveType()) {
+        assertThat(lowerBounds == null || lowerBounds.get(field.fieldId()) == null).isTrue();
+        assertThat(upperBounds == null || upperBounds.get(field.fieldId()) == null).isTrue();
+      }
+    }
+  }
+
+  /** Verifies that no value or null counts were recorded for any top-level primitive column. */
+  private void assertCountsNull(
+      Schema schema, Map<Integer, Long> valueCounts, Map<Integer, Long> nullValueCounts) {
+    for (Types.NestedField field : schema.columns()) {
+      if (field.type().isPrimitiveType()) {
+        assertThat(valueCounts == null || valueCounts.get(field.fieldId()) == null).isTrue();
+        assertThat(nullValueCounts == null || nullValueCounts.get(field.fieldId()) == null)
+            .isTrue();
+      }
+    }
+  }
 }