Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,26 @@ class VeloxMetricsApi extends MetricsApi with Logging {
override def genExpandTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
new ExpandMetricsUpdater(metrics)

/**
 * SQL metrics registered for the generate transformer (e.g. LATERAL VIEW explode)
 * on the Velox backend. Keys must match the names the native metrics updater reports.
 */
override def genGenerateTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
  // Plain counters.
  val countMetrics = Seq(
    "numOutputRows" -> "number of output rows",
    "numOutputVectors" -> "number of output vectors",
    "cpuCount" -> "cpu wall time count",
    "numMemoryAllocations" -> "number of memory allocations"
  ).map { case (name, desc) => name -> SQLMetrics.createMetric(sparkContext, desc) }
  // Byte-sized metrics.
  val sizeMetrics = Seq(
    "numOutputBytes" -> "number of output bytes",
    "peakMemoryBytes" -> "peak memory bytes"
  ).map { case (name, desc) => name -> SQLMetrics.createSizeMetric(sparkContext, desc) }
  // Nanosecond timings.
  val timingMetrics = Seq(
    "wallNanos" -> "time of generate",
    "loadLazyVectorTime" -> "time of loading lazy vectors"
  ).map { case (name, desc) => name -> SQLMetrics.createNanoTimingMetric(sparkContext, desc) }
  (countMetrics ++ sizeMetrics ++ timingMetrics).toMap
}

/** Builds the updater that writes native generate-operator metrics into the given SQL metrics. */
override def genGenerateTransformerMetricsUpdater(
    metrics: Map[String, SQLMetric]): MetricsUpdater = {
  new GenerateMetricsUpdater(metrics)
}

override def genCustomExpandMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.gluten.execution

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.GenerateExecTransformer.supportsGenerate
import org.apache.gluten.metrics.{GenerateMetricsUpdater, MetricsUpdater}
import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.expression.ExpressionNode
import org.apache.gluten.substrait.extensions.{AdvancedExtensionNode, ExtensionBuilder}
Expand All @@ -27,7 +27,6 @@ import org.apache.gluten.utils.PullOutProjectHelper

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.{GenerateExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.{BooleanType, IntegerType}

import com.google.protobuf.StringValue
Expand All @@ -50,19 +49,10 @@ case class GenerateExecTransformer(

@transient
override lazy val metrics =
Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of generate"),
"cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
"peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
"numMemoryAllocations" -> SQLMetrics.createMetric(
sparkContext,
"number of memory allocations")
)
BackendsApiManager.getMetricsApiInstance.genGenerateTransformerMetrics(sparkContext)

override def metricsUpdater(): MetricsUpdater = new GenerateMetricsUpdater(metrics)
override def metricsUpdater(): MetricsUpdater = {
  // Delegate to the backend-specific metrics API so each backend can supply its own updater.
  val metricsApi = BackendsApiManager.getMetricsApiInstance
  metricsApi.genGenerateTransformerMetricsUpdater(metrics)
}

// Rebuilds this node with a replacement child plan; all other constructor
// arguments are passed through unchanged.
override protected def withNewChildInternal(newChild: SparkPlan): GenerateExecTransformer =
    copy(generator, requiredChildOutput, outer, generatorOutput, newChild)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ object MetricsUtil extends Logging {
}
} catch {
case e: Exception =>
logWarning(s"Updating native metrics failed due to ${e.getCause}.")
logError(s"Updating native metrics failed due to ${e.getCause}." + e)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we throw? I met a similar issue a few days ago. cc @rui-mo

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall I make the change in this PR?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so, let's change it and verify CI

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhztheplayer removing the try-catch required fixes in a few more places, but all UTs are passing now

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@acvictor what kinds of failures are you seeing after removing the try-catch? As I recall, it was added to guard against cases where the metrics updater isn’t properly implemented for certain operators.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rui-mo these were the only test failures

- rewrite unbounded window *** FAILED ***
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 104.0 failed 1 times, most recent failure: Lost task 0.0 in stage 104.0 (TID 66) (3e255c7568a5 executor driver): java.lang.NullPointerException: Cannot invoke "org.apache.gluten.substrait.JoinParams.postProjectionNeeded()" because "joinParams" is null
	at org.apache.gluten.metrics.JoinMetricsUpdaterBase.updateJoinMetrics(JoinMetricsUpdater.scala:49)
	at org.apache.gluten.metrics.MetricsUtil$.updateTransformerMetricsInternal(MetricsUtil.scala:268)
	at org.apache.gluten.metrics.MetricsUtil$.$anonfun$genMetricsUpdatingFunction$3(MetricsUtil.scala:357)
	at org.apache.gluten.metrics.MetricsUtil$.$anonfun$genMetricsUpdatingFunction$3$adapted(MetricsUtil.scala:344)
	at org.apache.gluten.backendsapi.velox.VeloxIteratorApi.$anonfun$genFinalStageIterator$2(VeloxIteratorApi.scala:282)
- test cross join *** FAILED ***
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 948.0 failed 1 times, most recent failure: Lost task 0.0 in stage 948.0 (TID 812) (3e255c7568a5 executor driver): java.lang.NullPointerException: Cannot invoke "org.apache.gluten.substrait.JoinParams.postProjectionNeeded()" because "joinParams" is null
	at org.apache.gluten.metrics.JoinMetricsUpdaterBase.updateJoinMetrics(JoinMetricsUpdater.scala:49)
	at org.apache.gluten.metrics.MetricsUtil$.updateTransformerMetricsInternal(MetricsUtil.scala:268)
	at org.apache.gluten.metrics.MetricsUtil$.$anonfun$genMetricsUpdatingFunction$3(MetricsUtil.scala:357)
	at org.apache.gluten.metrics.MetricsUtil$.$anonfun$genMetricsUpdatingFunction$3$adapted(MetricsUtil.scala:344)
	at org.apache.gluten.backendsapi.velox.VeloxIteratorApi.$anonfun$genFinalStageIterator$2(VeloxIteratorApi.scala:282)
- Columnar cartesian product with other join *** FAILED ***
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1044.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1044.0 (TID 877) (3e255c7568a5 executor driver): org.apache.gluten.exception.GlutenException: org.apache.gluten.exception.GlutenException: Error during calling Java code from native code: java.lang.NullPointerException: Cannot invoke "org.apache.gluten.substrait.JoinParams.postProjectionNeeded()" because "joinParams" is null
	at org.apache.gluten.metrics.JoinMetricsUpdaterBase.updateJoinMetrics(JoinMetricsUpdater.scala:49)
	at org.apache.gluten.metrics.MetricsUtil$.updateTransformerMetricsInternal(MetricsUtil.scala:268)
	at org.apache.gluten.metrics.MetricsUtil$.$anonfun$updateTransformerMetricsInternal$2(MetricsUtil.scala:310)
	at org.apache.gluten.metrics.MetricsUtil$.$anonfun$updateTransformerMetricsInternal$2$adapted(MetricsUtil.scala:302)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rui-mo do you have any concerns with removing the try-catch?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see CI is green, which means the metrics strictly match in all tests. Waiting for @rui-mo to confirm here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @acvictor, if I understand correctly, are these failures already addressed in this PR? If so, it works for me to proceed with this PR first.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes @rui-mo. I have fixed all in this PR.

()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,27 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
assert(metrics("numOutputVectors").value > 0)
assert(metrics("numOutputBytes").value > 0)
}

// Exercise the generate transformer metrics: explode(array(c1, c2)) emits
// exactly two output rows per input row, which lets us pin numOutputRows
// against the scan's rawInputRows.
runQueryAndCompare(
  "SELECT c1, col FROM metrics_t1 LATERAL VIEW explode(array(c1, c2)) t AS col") {
  df =>
    // Locate the columnar file scan to read its raw input row count.
    val scan = find(df.queryExecution.executedPlan) {
      case _: FileSourceScanExecTransformer => true
      case _ => false
    }
    assert(scan.isDefined)
    val scanMetrics = scan.get.metrics
    assert(scanMetrics("rawInputRows").value > 0)

    // Locate the offloaded generate node produced by the LATERAL VIEW.
    val generate = find(df.queryExecution.executedPlan) {
      case _: GenerateExecTransformer => true
      case _ => false
    }
    assert(generate.isDefined)
    val genMetrics = generate.get.metrics
    // explode over a 2-element array doubles the row count.
    assert(genMetrics("numOutputRows").value == 2 * scanMetrics("rawInputRows").value)
    // Velox-specific metric registered via genGenerateTransformerMetrics.
    assert(genMetrics.contains("loadLazyVectorTime"))
}
}

test("Metrics of window") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ trait MetricsApi extends Serializable {

def genExpandTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater

/**
 * Metrics for the generate transformer. Backends that offload
 * GenerateExecTransformer must override this; the default reports the
 * operation as unsupported. A message is included so the failing backend
 * API is identifiable from the stack trace.
 */
def genGenerateTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
  throw new UnsupportedOperationException(
    "genGenerateTransformerMetrics is not supported by this backend")

/**
 * Metrics updater for the generate transformer. Backends that offload
 * GenerateExecTransformer must override this; the default reports the
 * operation as unsupported. A message is included so the failing backend
 * API is identifiable from the stack trace.
 */
def genGenerateTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
  throw new UnsupportedOperationException(
    "genGenerateTransformerMetricsUpdater is not supported by this backend")

def genCustomExpandMetrics(sparkContext: SparkContext): Map[String, SQLMetric]

def genColumnarShuffleExchangeMetrics(
Expand Down
Loading