velox/functions/sparksql/aggregates/CollectSetAggregate.cpp (45 changes: 13 additions & 32 deletions)

```diff
@@ -161,19 +161,10 @@ class SparkCollectSetAggregate
       SBase::clearNull(group);
       auto tracker = SBase::trackRowSize(group);
       auto decodedIndex = SBase::decoded_.index(i);
-      if (ignoreNulls_) {
-        SBase::value(group)->addNonNullValues(
-            *baseArray,
-            decodedIndex,
-            SBase::decodedElements_,
-            SBase::allocator_);
-      } else {
-        SBase::value(group)->addValues(
-            *baseArray,
-            decodedIndex,
-            SBase::decodedElements_,
-            SBase::allocator_);
-      }
+      // Intermediate results already have null filtering applied by the
+      // partial step. Always preserve all elements (including nulls) here.
```
Collaborator:
> Intermediate results already have null filtering applied by the partial step.

Velox supports flushing during partial aggregation. When this happens, the intermediate results are left unaggregated, with the final aggregation step responsible for processing them. Could this cause any result issues?

Contributor Author:
Good point! This is safe because:

1. When the partial step flushes (toIntermediate is called), null filtering is already applied there:
   - ignoreNulls_=true: null inputs become empty arrays (size=0), so the intermediate output contains no null elements.
   - ignoreNulls_=false: Base::toIntermediate wraps each value (including nulls) into [value] arrays.
2. The final step receives pre-filtered data: since toIntermediate already handles null filtering, addIntermediateResults just needs to merge arrays, and using addValues (preserve everything) is correct in both cases.
3. No behavior change: before this fix, the final/intermediate nodes had ignoreNulls_={false} by default, which also always used addValues in the intermediate path. This change makes that explicit and removes the dead addNonNullValues branch that was never reachable in the final step.

```diff
+      SBase::value(group)->addValues(
+          *baseArray, decodedIndex, SBase::decodedElements_, SBase::allocator_);
     });
   }
```

```diff
@@ -193,29 +184,19 @@ class SparkCollectSetAggregate
       }
       SBase::clearNull(group);
       auto decodedIndex = SBase::decoded_.index(i);
-      if (ignoreNulls_) {
-        accumulator->addNonNullValues(
-            *baseArray,
-            decodedIndex,
-            SBase::decodedElements_,
-            SBase::allocator_);
-      } else {
-        accumulator->addValues(
-            *baseArray,
-            decodedIndex,
-            SBase::decodedElements_,
-            SBase::allocator_);
-      }
+      // Intermediate results already have null filtering applied by the
+      // partial step. Always preserve all elements (including nulls) here.
+      accumulator->addValues(
+          *baseArray, decodedIndex, SBase::decodedElements_, SBase::allocator_);
     });
   }

 private:
-  // Initialized via setConstantInputs() from the constant boolean argument.
-  // Default is false (conservative: keeps nulls). In partial+final mode,
-  // the final node doesn't receive the boolean constant, so it uses this
-  // default, which is safe because the partial node already handles null
-  // filtering based on the actual constant value.
-  bool ignoreNulls_{false};
+  // Default to true (Spark's default: IGNORE NULLS). Updated by
+  // setConstantInputs() when a 2-arg signature provides an explicit value.
+  // Only used in addRawInput (partial/single step); intermediate/final
+  // steps always preserve all elements from the partial output.
+  bool ignoreNulls_{true};
 };

 std::unique_ptr<exec::Aggregate> createSetAgg(
```
```diff
@@ -306,6 +306,33 @@ TEST_F(CollectSetAggregateTest, unknownType) {
   testAggregations({data}, {"c0"}, {"collect_set(c0, true)"}, {}, {expected});
 }

+// Verify the 1-arg collect_set(c0) defaults to ignoring nulls (Spark default).
+TEST_F(CollectSetAggregateTest, defaultIgnoreNulls) {
+  auto data = makeRowVector({
+      makeNullableFlatVector<int32_t>(
+          {1, 2, std::nullopt, 4, 5, std::nullopt, 4, 2}),
+  });
+
+  auto expected = makeRowVector({
+      makeArrayVectorFromJson<int32_t>({"[1, 2, 4, 5]"}),
+  });
+
+  // 1-arg signature: no explicit ignoreNulls boolean.
+  testAggregations(
+      {data}, {}, {"collect_set(c0)"}, {"spark_array_sort(a0)"}, {expected});
+
+  // All null inputs: returns empty array (nulls ignored).
+  data = makeRowVector({
+      makeAllNullFlatVector<int32_t>(5),
+  });
+
+  expected = makeRowVector({
+      makeArrayVectorFromJson<int32_t>({"[]"}),
+  });
+
+  testAggregations({data}, {}, {"collect_set(c0)"}, {}, {expected});
+}
+
 // Verify that collect_set(c0, true) correctly ignores null inputs.
 TEST_F(CollectSetAggregateTest, explicitIgnoreNullsTrue) {
   auto data = makeRowVector({
```