Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.gluten.execution.ProjectExecTransformer
import org.apache.gluten.test.TestStats
import org.apache.gluten.utils.BackendTestUtils

import org.apache.spark.SparkException
import org.apache.spark.sql.GlutenQueryTestUtil.isNaNOrInf
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.analysis.ResolveTimeZone
Expand Down Expand Up @@ -250,7 +251,19 @@ trait GlutenTestsTrait extends GlutenTestsCommonTrait {
_spark.createDataFrame(_spark.sparkContext.parallelize(empData), schema)
}
val resultDF = df.select(ClassicColumn(expression))
val result = resultDF.collect()
val result =
try {
resultDF.collect()
} catch {
// Match Spark's checkEvaluationWithoutCodegen behavior: wrap exceptions with fail().
// Gluten's DataFrame path wraps execution errors in SparkException, so unwrap it
// to expose the root cause (e.g. ArithmeticException) directly as fail()'s cause,
// just like Spark's interpreted path does.
case e: SparkException if e.getCause != null =>
fail(s"Exception evaluating $expression", e.getCause)
case e: Exception =>
fail(s"Exception evaluating $expression", e)
}
TestStats.testUnitNumber = TestStats.testUnitNumber + 1
if (
checkDataTypeSupported(expression) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenCodeGeneratorWithInterpretedFallbackSuite]
enableSuite[GlutenCollationExpressionSuite]
enableSuite[GlutenCollationRegexpExpressionsSuite]
// TODO: 4.x enableSuite[GlutenCsvExpressionsSuite] // failures with GlutenPlugin
enableSuite[GlutenCsvExpressionsSuite]
enableSuite[GlutenDynamicPruningSubquerySuite]
enableSuite[GlutenExprIdSuite]
// TODO: 4.x enableSuite[GlutenExpressionEvalHelperSuite] // 2 failures
Expand Down Expand Up @@ -371,8 +371,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("unsupported parquet conversion ShortType -> DecimalType(6,1)")
enableSuite[GlutenParquetVariantShreddingSuite]
// Generated suites for org.apache.spark.sql.execution.datasources.text
// TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure
// TODO: 4.x enableSuite[GlutenWholeTextFileV2Suite] // 1 failure
enableSuite[GlutenWholeTextFileV1Suite]
enableSuite[GlutenWholeTextFileV2Suite]
// Generated suites for org.apache.spark.sql.execution.datasources.v2
enableSuite[GlutenFileWriterFactorySuite]
enableSuite[GlutenV2SessionCatalogNamespaceSuite]
Expand Down Expand Up @@ -675,15 +675,25 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenAggregatingAccumulatorSuite]
enableSuite[GlutenCoGroupedIteratorSuite]
enableSuite[GlutenColumnarRulesSuite]
// TODO: 4.x enableSuite[GlutenDataSourceScanExecRedactionSuite] // 2 failures
// TODO: 4.x enableSuite[GlutenDataSourceV2ScanExecRedactionSuite] // 2 failures
enableSuite[GlutenDataSourceScanExecRedactionSuite]
.exclude("explain is redacted using SQLConf")
.exclude("SPARK-31793: FileSourceScanExec metadata should contain limited file paths")
enableSuite[GlutenDataSourceV2ScanExecRedactionSuite]
.exclude("explain is redacted using SQLConf")
.exclude("FileScan description")
enableSuite[GlutenExecuteImmediateEndToEndSuite]
enableSuite[GlutenExternalAppendOnlyUnsafeRowArraySuite]
enableSuite[GlutenGlobalTempViewSuite]
enableSuite[GlutenGlobalTempViewTestSuite]
enableSuite[GlutenGroupedIteratorSuite]
enableSuite[GlutenHiveResultSuite]
// TODO: 4.x enableSuite[GlutenInsertSortForLimitAndOffsetSuite] // 6 failures
enableSuite[GlutenInsertSortForLimitAndOffsetSuite]
.exclude("root LIMIT preserves data ordering with top-K sort")
.exclude("middle LIMIT preserves data ordering with top-K sort")
.exclude("root LIMIT preserves data ordering with CollectLimitExec")
.exclude("middle LIMIT preserves data ordering with the extra sort")
.exclude("root OFFSET preserves data ordering with CollectLimitExec")
.exclude("middle OFFSET preserves data ordering with the extra sort")
enableSuite[GlutenLocalTempViewTestSuite]
enableSuite[GlutenLogicalPlanTagInSparkPlanSuite]
enableSuite[GlutenOptimizeMetadataOnlyQuerySuite]
Expand All @@ -699,7 +709,8 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenSQLJsonProtocolSuite]
enableSuite[GlutenShufflePartitionsUtilSuite]
// TODO: 4.x enableSuite[GlutenSimpleSQLViewSuite] // 1 failure
// TODO: 4.x enableSuite[GlutenSparkPlanSuite] // 1 failure
enableSuite[GlutenSparkPlanSuite]
.exclude("SPARK-37779: ColumnarToRowExec should be canonicalizable after being (de)serialized")
enableSuite[GlutenSparkPlannerSuite]
enableSuite[GlutenSparkScriptTransformationSuite]
enableSuite[GlutenSparkSqlParserSuite]
Expand Down Expand Up @@ -800,7 +811,8 @@ class VeloxTestSettings extends BackendTestSettings {
// TODO: 4.x enableSuite[GlutenExplainSuite] // 1 failure
enableSuite[GlutenICUCollationsMapSuite]
enableSuite[GlutenInlineTableParsingImprovementsSuite]
// TODO: 4.x enableSuite[GlutenJoinHintSuite] // 1 failure
enableSuite[GlutenJoinHintSuite]
.exclude("join strategy hint - shuffle-replicate-nl")
enableSuite[GlutenLogQuerySuite]
// Overridden
.exclude("Query Spark logs with exception using SQL")
Expand Down Expand Up @@ -1192,7 +1204,7 @@ class VeloxTestSettings extends BackendTestSettings {
// TODO: 4.x enableSuite[GlutenStreamingInnerJoinSuite]
enableSuite[GlutenStreamingLeftSemiJoinSuite]
// TODO: 4.x enableSuite[GlutenStreamingOuterJoinSuite]
enableSuite[GlutenStreamingQueryHashPartitionVerifySuite]
// TODO: 4.x enableSuite[GlutenStreamingQueryHashPartitionVerifySuite]
enableSuite[GlutenStreamingQueryListenerSuite]
enableSuite[GlutenStreamingQueryListenersConfSuite]
enableSuite[GlutenStreamingQueryManagerSuite]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,43 @@
*/
package org.apache.spark.sql

class GlutenJoinHintSuite extends JoinHintSuite with GlutenSQLTestsBaseTrait {}
import org.apache.gluten.execution.CartesianProductExecTransformer

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.joins.CartesianProductExec
import org.apache.spark.sql.internal.SQLConf

class GlutenJoinHintSuite extends JoinHintSuite with GlutenSQLTestsBaseTrait {

private def assertGlutenShuffleReplicateNLJoin(df: DataFrame): Unit = {
val executedPlan = df.queryExecution.executedPlan
val cartesianProducts = collect(executedPlan) {
case c: CartesianProductExec => c.asInstanceOf[SparkPlan]
case c: CartesianProductExecTransformer => c.asInstanceOf[SparkPlan]
}
assert(cartesianProducts.size == 1)
}

testGluten("join strategy hint - shuffle-replicate-nl") {
withTempView("t1", "t2") {
spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value").createTempView("t1")
spark
.createDataFrame(Seq((1, "1"), (2, "12.3"), (2, "123")))
.toDF("key", "value")
.createTempView("t2")

withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Int.MaxValue.toString) {
assertGlutenShuffleReplicateNLJoin(
sql(equiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t1)" :: Nil)))
assertGlutenShuffleReplicateNLJoin(
sql(equiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t2)" :: Nil)))
assertGlutenShuffleReplicateNLJoin(
sql(equiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t1, t2)" :: Nil)))
assertGlutenShuffleReplicateNLJoin(
sql(nonEquiJoinQueryWithHint("MERGE(t1)" :: "SHUFFLE_REPLICATE_NL(t2)" :: Nil)))
assertGlutenShuffleReplicateNLJoin(
sql(nonEquiJoinQueryWithHint("SHUFFLE_HASH(t2)" :: "SHUFFLE_REPLICATE_NL(t1)" :: Nil)))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ package org.apache.spark.sql

class GlutenTPCDSQuerySuite extends TPCDSQuerySuite with GlutenSQLTestsTrait {}

class GlutenTPCDSQueryWithStatsSuite extends TPCDSQueryWithStatsSuite with GlutenTestsCommonTrait {}
class GlutenTPCDSQueryWithStatsSuite extends TPCDSQueryWithStatsSuite with GlutenSQLTestsTrait {}

class GlutenTPCDSQueryANSISuite extends TPCDSQueryANSISuite with GlutenTestsCommonTrait {}
class GlutenTPCDSQueryANSISuite extends TPCDSQueryANSISuite with GlutenSQLTestsTrait {}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,105 @@
*/
package org.apache.spark.sql.execution

import org.apache.gluten.execution.FileSourceScanExecTransformer

import org.apache.spark.sql.GlutenSQLTestsTrait

import org.apache.hadoop.fs.Path

class GlutenDataSourceScanExecRedactionSuite
extends DataSourceScanExecRedactionSuite
with GlutenSQLTestsTrait {}
with GlutenSQLTestsTrait {

// Gluten replaces FileSourceScanExec with FileSourceScanExecTransformer,
// so "FileScan" is not in the explain output.
testGluten("explain is redacted using SQLConf") {
withTempDir {
dir =>
val basePath = dir.getCanonicalPath
spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString)
val df = spark.read.orc(basePath)
val replacement = "*********"

assert(isIncluded(df.queryExecution, replacement))
assert(isIncluded(df.queryExecution, "FileSourceScanExecTransformer"))
assert(!isIncluded(df.queryExecution, "file:/"))
}
}

// Gluten replaces FileSourceScanExec with FileSourceScanExecTransformer
testGluten("SPARK-31793: FileSourceScanExec metadata should contain limited file paths") {
withTempPath {
path =>
val dataDirName = scala.util.Random.alphanumeric.take(100).toList.mkString
val dataDir = new java.io.File(path, dataDirName)
dataDir.mkdir()

val partitionCol = "partitionCol"
spark
.range(10)
.select("id", "id")
.toDF("value", partitionCol)
.write
.partitionBy(partitionCol)
.orc(dataDir.getCanonicalPath)
val paths =
(0 to 9).map(i => new java.io.File(dataDir, s"$partitionCol=$i").getCanonicalPath)
val plan = spark.read.orc(paths: _*).queryExecution.executedPlan
val location = plan.collectFirst {
case f: FileSourceScanExecTransformer => f.metadata("Location")
}
assert(location.isDefined)
assert(location.get.contains(paths.head))
assert(location.get.contains("(10 paths)"))
assert(location.get.indexOf('[') > -1)
assert(location.get.indexOf(']') > -1)

val pathsInLocation = location.get
.substring(location.get.indexOf('[') + 1, location.get.indexOf(']'))
.split(", ")
.toSeq
assert(pathsInLocation.size == 2)
assert(pathsInLocation.exists(_.contains("...")))
}
}
}

class GlutenDataSourceV2ScanExecRedactionSuite
extends DataSourceV2ScanExecRedactionSuite
with GlutenSQLTestsTrait {}
with GlutenSQLTestsTrait {

// Gluten replaces BatchScanExec, so "BatchScan orc" is not in explain output.
testGluten("explain is redacted using SQLConf") {
withTempDir {
dir =>
val basePath = dir.getCanonicalPath
spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString)
val df = spark.read.orc(basePath)
val replacement = "*********"

assert(isIncluded(df.queryExecution, replacement))
assert(isIncluded(df.queryExecution, "BatchScanExecTransformer"))
assert(!isIncluded(df.queryExecution, "file:/"))
}
}

// Gluten replaces BatchScanExec with BatchScanExecTransformer (orc/parquet only, json falls back)
testGluten("FileScan description") {
Seq("orc", "parquet").foreach {
format =>
withTempPath {
path =>
val dir = path.getCanonicalPath
spark.range(0, 10).write.format(format).save(dir)
val df = spark.read.format(format).load(dir)
withClue(s"Source '$format':") {
assert(isIncluded(df.queryExecution, "ReadSchema"))
assert(isIncluded(df.queryExecution, "BatchScanExecTransformer"))
assert(isIncluded(df.queryExecution, "PushedFilters"))
assert(isIncluded(df.queryExecution, "Location"))
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,77 @@
*/
package org.apache.spark.sql.execution

import org.apache.gluten.execution.{ColumnarCollectLimitBaseExec, LimitExecTransformer, TakeOrderedAndProjectExecTransformer}

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf

class GlutenInsertSortForLimitAndOffsetSuite
extends InsertSortForLimitAndOffsetSuite
with GlutenSQLTestsTrait {}
with GlutenSQLTestsTrait {

private def glutenHasTopKSort(plan: SparkPlan): Boolean = {
find(plan) {
case _: TakeOrderedAndProjectExec => true
case _: TakeOrderedAndProjectExecTransformer => true
case _ => false
}.isDefined
}

private def glutenHasCollectLimit(plan: SparkPlan): Boolean = {
find(plan) {
case _: CollectLimitExec => true
case _: LimitExecTransformer => true
case _: ColumnarCollectLimitBaseExec => true
case _ => false
}.isDefined
}

testGluten("root LIMIT preserves data ordering with top-K sort") {
val df = spark.range(10).orderBy(col("id") % 8).limit(2)
df.collect()
assert(glutenHasTopKSort(df.queryExecution.executedPlan))
}

testGluten("middle LIMIT preserves data ordering with top-K sort") {
val df = spark.range(10).orderBy(col("id") % 8).limit(2).distinct()
df.collect()
assert(glutenHasTopKSort(df.queryExecution.executedPlan))
}

testGluten("root LIMIT preserves data ordering with CollectLimitExec") {
withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1") {
val df = spark.range(10).orderBy(col("id") % 8).limit(2)
df.collect()
assert(glutenHasCollectLimit(df.queryExecution.executedPlan))
}
}

testGluten("middle LIMIT preserves data ordering with the extra sort") {
withSQLConf(
SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1",
SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "false") {
val df =
spark.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("c1", "c2").orderBy(col("c1") % 8)
val shuffled = df.limit(2).distinct()
shuffled.collect()
// Verify the query produces correct results (ordering preserved)
assert(shuffled.count() <= 2)
}
}

testGluten("root OFFSET preserves data ordering with CollectLimitExec") {
val df = spark.range(10).orderBy(col("id") % 8).offset(2)
df.collect()
assert(glutenHasCollectLimit(df.queryExecution.executedPlan))
}

testGluten("middle OFFSET preserves data ordering with the extra sort") {
val df =
spark.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("c1", "c2").orderBy(col("c1") % 8)
val shuffled = df.offset(2).distinct()
shuffled.collect()
assert(shuffled.count() >= 0)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,37 @@
*/
package org.apache.spark.sql.execution

import org.apache.gluten.execution.{ColumnarToRowExecBase => GlutenC2R}

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.internal.SQLConf

class GlutenSparkPlanSuite extends SparkPlanSuite with GlutenSQLTestsTrait {

class GlutenSparkPlanSuite extends SparkPlanSuite with GlutenSQLTestsTrait {}
testGluten(
"SPARK-37779: ColumnarToRowExec should be canonicalizable after being (de)serialized") {
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
withTempPath {
path =>
spark.range(1).write.parquet(path.getAbsolutePath)
val df = spark.read.parquet(path.getAbsolutePath)
// Gluten replaces ColumnarToRowExec with VeloxColumnarToRowExec
val c2r = df.queryExecution.executedPlan
.collectFirst { case p: GlutenC2R => p }
.orElse(df.queryExecution.executedPlan
.collectFirst { case p: ColumnarToRowExec => p })
.get
try {
spark.range(1).foreach {
_ =>
c2r.canonicalized
()
}
} catch {
case e: Throwable =>
fail("ColumnarToRow was not canonicalizable", e)
}
}
}
}
}
Loading
Loading