Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,26 @@ class VeloxMetricsApi extends MetricsApi with Logging {
/** Wraps the given expand-transformer metrics in an [[ExpandMetricsUpdater]]. */
override def genExpandTransformerMetricsUpdater(
    metrics: Map[String, SQLMetric]): MetricsUpdater = {
  new ExpandMetricsUpdater(metrics)
}

/**
 * Declares the SQL metrics reported for the generate transformer.
 *
 * The map holds the output row/vector/byte counters, the wall-time and CPU-count metrics, the
 * memory metrics, and the lazy-vector loading time.
 *
 * @param sparkContext context every metric is created with
 * @return the metrics keyed by metric name
 */
override def genGenerateTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
  // Local factories so each entry below reads as "key -> kind(description)".
  def counter(description: String): SQLMetric =
    SQLMetrics.createMetric(sparkContext, description)
  def byteSize(description: String): SQLMetric =
    SQLMetrics.createSizeMetric(sparkContext, description)
  def nanoTimer(description: String): SQLMetric =
    SQLMetrics.createNanoTimingMetric(sparkContext, description)

  // Entries stay in the original order, so the metrics are created in the same sequence.
  Map(
    "numOutputRows" -> counter("number of output rows"),
    "numOutputVectors" -> counter("number of output vectors"),
    "numOutputBytes" -> byteSize("number of output bytes"),
    "wallNanos" -> nanoTimer("time of generate"),
    "cpuCount" -> counter("cpu wall time count"),
    "peakMemoryBytes" -> byteSize("peak memory bytes"),
    "numMemoryAllocations" -> counter("number of memory allocations"),
    "loadLazyVectorTime" -> nanoTimer("time of loading lazy vectors")
  )
}

/** Wraps the given generate-transformer metrics in a [[GenerateMetricsUpdater]]. */
override def genGenerateTransformerMetricsUpdater(
    metrics: Map[String, SQLMetric]): MetricsUpdater = {
  val generateUpdater = new GenerateMetricsUpdater(metrics)
  generateUpdater
}

/**
 * Declares the metrics for the custom expand operator: a single output-row counter.
 *
 * @param sparkContext context the metric is created with
 * @return a one-entry map keyed by "numOutputRows"
 */
override def genCustomExpandMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
  val outputRows = SQLMetrics.createMetric(sparkContext, "number of output rows")
  Map("numOutputRows" -> outputRows)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.gluten.execution

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.GenerateExecTransformer.supportsGenerate
import org.apache.gluten.metrics.{GenerateMetricsUpdater, MetricsUpdater}
import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.expression.ExpressionNode
import org.apache.gluten.substrait.extensions.{AdvancedExtensionNode, ExtensionBuilder}
Expand All @@ -27,7 +27,6 @@ import org.apache.gluten.utils.PullOutProjectHelper

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.{GenerateExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.{BooleanType, IntegerType}

import com.google.protobuf.StringValue
Expand All @@ -50,19 +49,10 @@ case class GenerateExecTransformer(

@transient
override lazy val metrics =
Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of generate"),
"cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
"peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
"numMemoryAllocations" -> SQLMetrics.createMetric(
sparkContext,
"number of memory allocations")
)
BackendsApiManager.getMetricsApiInstance.genGenerateTransformerMetrics(sparkContext)

override def metricsUpdater(): MetricsUpdater = new GenerateMetricsUpdater(metrics)
override def metricsUpdater(): MetricsUpdater =
BackendsApiManager.getMetricsApiInstance.genGenerateTransformerMetricsUpdater(metrics)

/**
 * Rebuilds this transformer around a new child plan.
 *
 * The current generator, requiredChildOutput, outer and generatorOutput are passed through
 * unchanged; only the fifth argument is replaced by `newChild`.
 */
override protected def withNewChildInternal(newChild: SparkPlan): GenerateExecTransformer = {
  val rebuilt = copy(generator, requiredChildOutput, outer, generatorOutput, newChild)
  rebuilt
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@ abstract class JoinMetricsUpdaterBase(val metrics: Map[String, SQLMetric])
joinMetrics: util.ArrayList[OperatorMetrics],
singleMetrics: SingleMetric,
joinParams: JoinParams): Unit = {
assert(joinParams.postProjectionNeeded)
val postProjectMetrics = joinMetrics.remove(0)
postProjectionCpuCount += postProjectMetrics.cpuCount
postProjectionWallNanos += postProjectMetrics.wallNanos
numOutputRows += postProjectMetrics.outputRows
numOutputVectors += postProjectMetrics.outputVectors
numOutputBytes += postProjectMetrics.outputBytes
if (joinParams.postProjectionNeeded) {
val postProjectMetrics = joinMetrics.remove(0)
postProjectionCpuCount += postProjectMetrics.cpuCount
postProjectionWallNanos += postProjectMetrics.wallNanos
numOutputRows += postProjectMetrics.outputRows
numOutputVectors += postProjectMetrics.outputVectors
numOutputBytes += postProjectMetrics.outputBytes
}

updateJoinMetricsInternal(joinMetrics, joinParams)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,27 +253,28 @@ object MetricsUtil extends Logging {

mutNode.updater match {
case smj: SortMergeJoinMetricsUpdater =>
smj.updateJoinMetrics(
operatorMetrics,
metrics.getSingleMetrics,
joinParamsMap.get(operatorIdx))
val joinParams = Option(joinParamsMap.get(operatorIdx)).getOrElse {
val p = JoinParams(); p.postProjectionNeeded = false; p
}
smj.updateJoinMetrics(operatorMetrics, metrics.getSingleMetrics, joinParams)
case ju: JoinMetricsUpdaterBase =>
// JoinRel and CrossRel output two suites of metrics respectively for build and probe.
// Therefore, fetch one more suite of metrics here.
operatorMetrics.add(metrics.getOperatorMetrics(curMetricsIdx))
curMetricsIdx -= 1
ju.updateJoinMetrics(
operatorMetrics,
metrics.getSingleMetrics,
joinParamsMap.get(operatorIdx))
val joinParams = Option(joinParamsMap.get(operatorIdx)).getOrElse {
val p = JoinParams(); p.postProjectionNeeded = false; p
}
ju.updateJoinMetrics(operatorMetrics, metrics.getSingleMetrics, joinParams)
case u: UnionMetricsUpdater =>
// JoinRel outputs two suites of metrics respectively for hash build and hash probe.
// Therefore, fetch one more suite of metrics here.
operatorMetrics.add(metrics.getOperatorMetrics(curMetricsIdx))
curMetricsIdx -= 1
u.updateUnionMetrics(operatorMetrics)
case hau: HashAggregateMetricsUpdater =>
hau.updateAggregationMetrics(operatorMetrics, aggParamsMap.get(operatorIdx))
val aggParams = Option(aggParamsMap.get(operatorIdx)).getOrElse(AggregationParams())
hau.updateAggregationMetrics(operatorMetrics, aggParams)
case lu: LimitMetricsUpdater =>
// Limit over Sort is converted to TopN node in Velox, so there is only one suite of metrics
// for the two transformers. We do not update metrics for limit and leave it for sort.
Expand Down Expand Up @@ -342,30 +343,24 @@ object MetricsUtil extends Logging {
aggParamsMap: JMap[JLong, AggregationParams],
taskStatsAccumulator: TaskStatsAccumulator): IMetrics => Unit = {
imetrics =>
try {
val metrics = imetrics.asInstanceOf[Metrics]
val numNativeMetrics = metrics.inputRows.length
if (numNativeMetrics == 0) {
()
} else {
updateTransformerMetricsInternal(
mutNode,
relMap,
operatorIdx,
metrics,
numNativeMetrics - 1,
joinParamsMap,
aggParamsMap)
val metrics = imetrics.asInstanceOf[Metrics]
val numNativeMetrics = metrics.inputRows.length
if (numNativeMetrics == 0) {
()
} else {
updateTransformerMetricsInternal(
mutNode,
relMap,
operatorIdx,
metrics,
numNativeMetrics - 1,
joinParamsMap,
aggParamsMap)

// Update the task stats accumulator with the metrics.
if (metrics.taskStats != null) {
taskStatsAccumulator.add(metrics.taskStats)
}
// Update the task stats accumulator with the metrics.
if (metrics.taskStats != null) {
taskStatsAccumulator.add(metrics.taskStats)
}
} catch {
case e: Exception =>
logWarning(s"Updating native metrics failed due to ${e.getCause}.")
()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,27 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
assert(metrics("numOutputVectors").value > 0)
assert(metrics("numOutputBytes").value > 0)
}

// Generate metrics: explode(array(c1, c2)) is expected to emit exactly two output rows per raw
// input row of the scan, and the generate node must expose the lazy-vector loading metric.
runQueryAndCompare(
  "SELECT c1, col FROM metrics_t1 LATERAL VIEW explode(array(c1, c2)) t AS col") {
  df =>
    val executedPlan = df.queryExecution.executedPlan

    // The scan provides the baseline row count.
    val scanNode = find(executedPlan) {
      case _: FileSourceScanExecTransformer => true
      case _ => false
    }
    assert(scanNode.isDefined)
    val scanNodeMetrics = scanNode.get.metrics
    assert(scanNodeMetrics("rawInputRows").value > 0)

    // The generate node must be offloaded and report consistent metrics.
    val generateNode = find(executedPlan) {
      case _: GenerateExecTransformer => true
      case _ => false
    }
    assert(generateNode.isDefined)
    val generateNodeMetrics = generateNode.get.metrics
    assert(
      generateNodeMetrics("numOutputRows").value ==
        2 * scanNodeMetrics("rawInputRows").value)
    assert(generateNodeMetrics.contains("loadLazyVectorTime"))
}
}

test("Metrics of window") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ trait MetricsApi extends Serializable {

/**
 * Creates the [[MetricsUpdater]] for the expand transformer from its metric map.
 *
 * Abstract: there is no default body, so every implementation of this trait must define it.
 *
 * @param metrics the expand transformer's metrics, keyed by metric name
 * @return the updater bound to `metrics`
 */
def genExpandTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater

/**
 * Creates the SQL metrics for the generate transformer.
 *
 * This default produces no metrics: it always throws, so a backend has to override it before
 * the method can be called.
 *
 * @param sparkContext context a backend implementation creates the metrics with
 * @return never returns normally in this default implementation
 * @throws UnsupportedOperationException always; the message names this method and the concrete
 *                                       class that did not override it
 */
def genGenerateTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
  throw new UnsupportedOperationException(
    s"genGenerateTransformerMetrics is not supported by ${getClass.getName}")

/**
 * Creates the [[MetricsUpdater]] for the generate transformer from its metric map.
 *
 * This default produces no updater: it always throws, so a backend has to override it before
 * the method can be called.
 *
 * @param metrics the generate transformer's metrics, keyed by metric name
 * @return never returns normally in this default implementation
 * @throws UnsupportedOperationException always; the message names this method and the concrete
 *                                       class that did not override it
 */
def genGenerateTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
  throw new UnsupportedOperationException(
    s"genGenerateTransformerMetricsUpdater is not supported by ${getClass.getName}")

/**
 * Creates the SQL metrics map for the custom expand operator.
 *
 * Abstract: there is no default body, so every implementation of this trait must define it.
 * NOTE(review): which plan node "custom expand" refers to is not visible here — confirm.
 *
 * @param sparkContext context an implementation creates the metrics with
 * @return the metrics keyed by metric name
 */
def genCustomExpandMetrics(sparkContext: SparkContext): Map[String, SQLMetric]

def genColumnarShuffleExchangeMetrics(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.expression.ExpressionConverter
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.{JoinParams, SubstraitContext}
import org.apache.gluten.substrait.rel.RelBuilder
import org.apache.gluten.utils.SubstraitUtil

Expand Down Expand Up @@ -96,6 +96,11 @@ case class CartesianProductExecTransformer(
JoinUtils.createExtensionNode(inputLeftOutput ++ inputRightOutput, validation = false)

val operatorId = context.nextOperatorId(this.nodeName)
// Per-operator join parameters for this cross join; they are registered on the context under
// operatorId once the rel has been built.
val joinParams = new JoinParams
// NOTE(review): presumably lets the join metrics updater skip the post-projection metrics
// suite for this operator — confirm against the updater.
joinParams.postProjectionNeeded = false
// Only mark the join as conditional when a join condition is actually present.
if (condition.isDefined) {
joinParams.isWithCondition = true
}

val currRel = RelBuilder.makeCrossRel(
inputLeftRelNode,
Expand All @@ -106,6 +111,9 @@ case class CartesianProductExecTransformer(
context,
operatorId
)

context.registerJoinParam(operatorId, joinParams)

TransformContext(output, currRel)
}

Expand Down
Loading