/**
 * Returns the statistics of the partition identified by `ident` as a string-to-string map.
 *
 * When the underlying table is a [[FileStoreTable]], the partition entries are read through a
 * snapshot reader in `ScanMode.ALL`, filtered by the partition spec derived from `ident`. If
 * at least one entry comes back, the first one is published under the
 * `PartitionStatistics.FIELD_*` keys (record count, file size in bytes, file count and last
 * file creation time), each value rendered with `toString`.
 *
 * An empty map is returned when no entry is found or when the table is not a
 * [[FileStoreTable]].
 */
override def loadPartitionMetadata(ident: InternalRow): JMap[String, String] = {
  val noMetadata = Map.empty[String, String].asJava

  table match {
    case storeTable: FileStoreTable =>
      val spec = toPaimonPartitions(Array(ident)).head
      val reader = storeTable
        .newSnapshotReader()
        .withMode(ScanMode.ALL)
        .withPartitionFilter(spec)

      // Only the first returned entry is reported, exactly like the index-0 lookup it replaces.
      reader
        .partitionEntries()
        .asScala
        .headOption
        .map {
          stats =>
            Map(
              PartitionStatistics.FIELD_RECORD_COUNT -> stats.recordCount().toString,
              PartitionStatistics.FIELD_FILE_SIZE_IN_BYTES -> stats.fileSizeInBytes().toString,
              PartitionStatistics.FIELD_FILE_COUNT -> stats.fileCount().toString,
              PartitionStatistics.FIELD_LAST_FILE_CREATION_TIME ->
                stats.lastFileCreationTime().toString
            ).asJava
        }
        .getOrElse(noMetadata)

    case _ =>
      noMetadata
  }
}
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala similarity index 84% rename from paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala index ac98a807ca16..5d85c7bd5ee9 100644 --- a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala @@ -18,13 +18,14 @@ package org.apache.paimon.spark.commands +import org.apache.paimon.partition.PartitionStatistics import org.apache.paimon.spark.catalyst.Compatibility import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.analysis.ResolvedPartitionSpec import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName -import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, ToPrettyString} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal} import org.apache.spark.sql.connector.catalog.{Identifier, SupportsPartitionManagement, TableCatalog} import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ @@ -87,7 +88,20 @@ case class PaimonShowTablePartitionCommand( val partitionValues = partitions.mkString("[", ", ", "]") results.put("Partition Values", s"$partitionValues") - // TODO "Partition Parameters", "Created Time", "Last Access", "Partition Statistics" + // Partition Parameters and Partition Statistics + val metadata = 
partitionTable.loadPartitionMetadata(row) + if (!metadata.isEmpty) { + val metadataMap = metadata.asScala + results.put( + "Partition Parameters", + s"{${metadataMap.map { case (k, v) => s"$k=$v" }.mkString(", ")}}") + + val fileSizeInBytes = + metadataMap.getOrElse(PartitionStatistics.FIELD_FILE_SIZE_IN_BYTES, "0").toLong + val recordCount = + metadataMap.getOrElse(PartitionStatistics.FIELD_RECORD_COUNT, "0").toLong + results.put("Partition Statistics", s"$recordCount rows, $fileSizeInBytes bytes") + } results .map { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTestBase.scala index 63efa3f7e0a6..31cec5fda672 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTestBase.scala @@ -98,6 +98,22 @@ abstract class DescribeTableTestBase extends PaimonSparkTestBase { ) Assertions.assertTrue( res2.select("information").collect().head.getString(0).contains("Partition Values")) + + val info2 = res2.select("information").collect().head.getString(0) + Assertions.assertTrue(info2.contains("Partition Parameters")) + Assertions.assertTrue(info2.contains(PartitionStatistics.FIELD_RECORD_COUNT)) + Assertions.assertTrue(info2.contains(PartitionStatistics.FIELD_FILE_SIZE_IN_BYTES)) + Assertions.assertTrue(info2.contains(PartitionStatistics.FIELD_FILE_COUNT)) + Assertions.assertTrue(info2.contains(PartitionStatistics.FIELD_LAST_FILE_CREATION_TIME)) + Assertions.assertTrue(info2.contains("Partition Statistics")) + Assertions.assertTrue(info2.contains("recordCount=1")) + Assertions.assertTrue(info2.contains("1 rows")) + + val res3 = + spark.sql(s"SHOW TABLE EXTENDED IN $testDB LIKE 's2' PARTITION(pt = '2024')") + val info3 = res3.select("information").collect().head.getString(0) + 
/**
 * Resolution rule that rewrites `ShowTablePartition` plans over a `ResolvedTable` into a
 * [[PaimonShowTablePartitionCommand]].
 *
 * Plans that do not match are left untouched by `resolveOperatorsDown`.
 */
case class Spark4ResolutionRules(session: SparkSession) extends Rule[LogicalPlan] {

  override def apply(plan: LogicalPlan): LogicalPlan =
    plan.resolveOperatorsDown(rewriteShowTablePartition)

  /** Replaces a matching `ShowTablePartition` node with the Paimon command. */
  private def rewriteShowTablePartition: PartialFunction[LogicalPlan, LogicalPlan] = {
    case showPartition @ ShowTablePartition(resolvedTable: ResolvedTable, _, _) =>
      // Resolve the partition spec against the table before building the command.
      val paimonSpec = PaimonResolvePartitionSpec.resolve(
        resolvedTable.catalog,
        resolvedTable.identifier,
        showPartition.partitionSpec)
      PaimonShowTablePartitionCommand(
        showPartition.output,
        resolvedTable.catalog,
        resolvedTable.identifier,
        paimonSpec)
  }
}