diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala index f13217442f9f..43399d5da539 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala @@ -358,6 +358,26 @@ class VeloxMetricsApi extends MetricsApi with Logging { override def genExpandTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater = new ExpandMetricsUpdater(metrics) + override def genGenerateTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = + Map( + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), + "numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), + "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of generate"), + "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"), + "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"), + "numMemoryAllocations" -> SQLMetrics.createMetric( + sparkContext, + "number of memory allocations"), + "loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric( + sparkContext, + "time of loading lazy vectors") + ) + + override def genGenerateTransformerMetricsUpdater( + metrics: Map[String, SQLMetric]): MetricsUpdater = + new GenerateMetricsUpdater(metrics) + override def genCustomExpandMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala index c4b2e84d6768..7c99c9673363 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala @@ -18,7 +18,7 @@ package org.apache.gluten.execution import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.GenerateExecTransformer.supportsGenerate -import org.apache.gluten.metrics.{GenerateMetricsUpdater, MetricsUpdater} +import org.apache.gluten.metrics.MetricsUpdater import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.expression.ExpressionNode import org.apache.gluten.substrait.extensions.{AdvancedExtensionNode, ExtensionBuilder} @@ -27,7 +27,6 @@ import org.apache.gluten.utils.PullOutProjectHelper import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.{GenerateExec, ProjectExec, SparkPlan} -import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.types.{BooleanType, IntegerType} import com.google.protobuf.StringValue @@ -50,19 +49,10 @@ case class GenerateExecTransformer( @transient override lazy val metrics = - Map( - "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), - "numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), - "numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), - "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of generate"), - "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"), - "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"), - "numMemoryAllocations" -> SQLMetrics.createMetric( - sparkContext, - "number of memory allocations") - ) + BackendsApiManager.getMetricsApiInstance.genGenerateTransformerMetrics(sparkContext) - override def metricsUpdater(): MetricsUpdater = new GenerateMetricsUpdater(metrics) + override def metricsUpdater(): MetricsUpdater = + BackendsApiManager.getMetricsApiInstance.genGenerateTransformerMetricsUpdater(metrics) override protected def withNewChildInternal(newChild: SparkPlan): GenerateExecTransformer = copy(generator, requiredChildOutput, outer, generatorOutput, newChild) diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala index cf894b9da466..b056cd36a8ed 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala @@ -46,13 +46,14 @@ abstract class JoinMetricsUpdaterBase(val metrics: Map[String, SQLMetric]) joinMetrics: util.ArrayList[OperatorMetrics], singleMetrics: SingleMetric, joinParams: JoinParams): Unit = { - assert(joinParams.postProjectionNeeded) - val postProjectMetrics = joinMetrics.remove(0) - postProjectionCpuCount += postProjectMetrics.cpuCount - postProjectionWallNanos += postProjectMetrics.wallNanos - numOutputRows += postProjectMetrics.outputRows - numOutputVectors += postProjectMetrics.outputVectors - numOutputBytes += postProjectMetrics.outputBytes + if (joinParams.postProjectionNeeded) { + val postProjectMetrics = joinMetrics.remove(0) + postProjectionCpuCount += postProjectMetrics.cpuCount + postProjectionWallNanos += postProjectMetrics.wallNanos + numOutputRows += postProjectMetrics.outputRows + numOutputVectors += postProjectMetrics.outputVectors + numOutputBytes += postProjectMetrics.outputBytes + } updateJoinMetricsInternal(joinMetrics, joinParams) } diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index 09430fdd70ec..0567ccc8ceb6 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -253,19 +253,19 @@ object MetricsUtil extends Logging { mutNode.updater match { case smj: SortMergeJoinMetricsUpdater => - smj.updateJoinMetrics( - operatorMetrics, - metrics.getSingleMetrics, - joinParamsMap.get(operatorIdx)) + val joinParams = Option(joinParamsMap.get(operatorIdx)).getOrElse { + val p = JoinParams(); p.postProjectionNeeded = false; p + } + smj.updateJoinMetrics(operatorMetrics, metrics.getSingleMetrics, joinParams) case ju: JoinMetricsUpdaterBase => // JoinRel and CrossRel output two suites of metrics respectively for build and probe. // Therefore, fetch one more suite of metrics here. operatorMetrics.add(metrics.getOperatorMetrics(curMetricsIdx)) curMetricsIdx -= 1 - ju.updateJoinMetrics( - operatorMetrics, - metrics.getSingleMetrics, - joinParamsMap.get(operatorIdx)) + val joinParams = Option(joinParamsMap.get(operatorIdx)).getOrElse { + val p = JoinParams(); p.postProjectionNeeded = false; p + } + ju.updateJoinMetrics(operatorMetrics, metrics.getSingleMetrics, joinParams) case u: UnionMetricsUpdater => // JoinRel outputs two suites of metrics respectively for hash build and hash probe. // Therefore, fetch one more suite of metrics here. @@ -273,7 +273,8 @@ object MetricsUtil extends Logging { curMetricsIdx -= 1 u.updateUnionMetrics(operatorMetrics) case hau: HashAggregateMetricsUpdater => - hau.updateAggregationMetrics(operatorMetrics, aggParamsMap.get(operatorIdx)) + val aggParams = Option(aggParamsMap.get(operatorIdx)).getOrElse(AggregationParams()) + hau.updateAggregationMetrics(operatorMetrics, aggParams) case lu: LimitMetricsUpdater => // Limit over Sort is converted to TopN node in Velox, so there is only one suite of metrics // for the two transformers. We do not update metrics for limit and leave it for sort. @@ -342,30 +343,24 @@ object MetricsUtil extends Logging { aggParamsMap: JMap[JLong, AggregationParams], taskStatsAccumulator: TaskStatsAccumulator): IMetrics => Unit = { imetrics => - try { - val metrics = imetrics.asInstanceOf[Metrics] - val numNativeMetrics = metrics.inputRows.length - if (numNativeMetrics == 0) { - () - } else { - updateTransformerMetricsInternal( - mutNode, - relMap, - operatorIdx, - metrics, - numNativeMetrics - 1, - joinParamsMap, - aggParamsMap) + val metrics = imetrics.asInstanceOf[Metrics] + val numNativeMetrics = metrics.inputRows.length + if (numNativeMetrics == 0) { + () + } else { + updateTransformerMetricsInternal( + mutNode, + relMap, + operatorIdx, + metrics, + numNativeMetrics - 1, + joinParamsMap, + aggParamsMap) - // Update the task stats accumulator with the metrics. - if (metrics.taskStats != null) { - taskStatsAccumulator.add(metrics.taskStats) - } + // Update the task stats accumulator with the metrics. + if (metrics.taskStats != null) { + taskStatsAccumulator.add(metrics.taskStats) } - } catch { - case e: Exception => - logWarning(s"Updating native metrics failed due to ${e.getCause}.") - () } } } diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala index 247de220d002..35edc4fa6e25 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala @@ -152,6 +152,27 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa assert(metrics("numOutputVectors").value > 0) assert(metrics("numOutputBytes").value > 0) } + + runQueryAndCompare( + "SELECT c1, col FROM metrics_t1 LATERAL VIEW explode(array(c1, c2)) t AS col") { + df => + val scan = find(df.queryExecution.executedPlan) { + case _: FileSourceScanExecTransformer => true + case _ => false + } + assert(scan.isDefined) + val scanMetrics = scan.get.metrics + assert(scanMetrics("rawInputRows").value > 0) + + val generate = find(df.queryExecution.executedPlan) { + case _: GenerateExecTransformer => true + case _ => false + } + assert(generate.isDefined) + val genMetrics = generate.get.metrics + assert(genMetrics("numOutputRows").value == 2 * scanMetrics("rawInputRows").value) + assert(genMetrics.contains("loadLazyVectorTime")) + } } test("Metrics of window") { diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala index 896979b100c5..93e7c1fb9ddc 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala @@ -82,6 +82,12 @@ trait MetricsApi extends Serializable { def genExpandTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater + def genGenerateTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = + throw new UnsupportedOperationException() + + def genGenerateTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater = + throw new UnsupportedOperationException() + def genCustomExpandMetrics(sparkContext: SparkContext): Map[String, SQLMetric] def genColumnarShuffleExchangeMetrics( diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala index 868e6f1a9ec7..8e6df159342a 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala @@ -20,7 +20,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.expression.ExpressionConverter import org.apache.gluten.extension.columnar.transition.Convention import org.apache.gluten.metrics.MetricsUpdater -import org.apache.gluten.substrait.SubstraitContext +import org.apache.gluten.substrait.{JoinParams, SubstraitContext} import org.apache.gluten.substrait.rel.RelBuilder import org.apache.gluten.utils.SubstraitUtil @@ -96,6 +96,11 @@ case class CartesianProductExecTransformer( JoinUtils.createExtensionNode(inputLeftOutput ++ inputRightOutput, validation = false) val operatorId = context.nextOperatorId(this.nodeName) + val joinParams = new JoinParams + joinParams.postProjectionNeeded = false + if (condition.isDefined) { + joinParams.isWithCondition = true + } val currRel = RelBuilder.makeCrossRel( inputLeftRelNode, @@ -106,6 +111,9 @@ case class CartesianProductExecTransformer( context, operatorId ) + + context.registerJoinParam(operatorId, joinParams) + TransformContext(output, currRel) }