
feat(datafusion): support fetch contract in PaimonTableScan #224

Open

QuakeWang wants to merge 8 commits into apache:main from QuakeWang:feat/datafusion-fetch-contract

Conversation

QuakeWang (Contributor) commented Apr 7, 2026

Purpose

Linked issue: close #220

Align DataFusion scan pruning with the core-owned limit-hint design.

Instead of introducing a DataFusion-only fetch contract, this PR now reuses core-owned split pruning by passing a conservative limit hint into read_builder.with_limit(...) when it is safe to do so.

Brief change log

  • keep the shared split-pruning rule in table_scan
  • reuse that core-owned pruning from DataFusion via read_builder.with_limit(...)
  • only push a limit hint when there are no filters or all pushed filters are Exact
  • fail open for inexact or residual filters instead of adding a scan-side exact fetch contract
  • simplify PaimonTableScan back to regular scan execution without scan-side fetch handling
  • update plan and integration tests to cover safe limit hints and the absence of scan-side fetch=...
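The guard-then-hint flow above can be sketched with a minimal mock (the `Pushdown` enum, `MockReadBuilder`, and `safe_limit_hint` are illustrative stand-ins, not the actual paimon-rust or DataFusion APIs; the real hint goes through `read_builder.with_limit(...)` in paimon-core):

```rust
// Mock of the provider-side decision: push a conservative limit hint into
// the read builder only when it is provably safe. All types here are
// hypothetical stand-ins for the real paimon-rust / DataFusion APIs.
#[derive(Clone, Copy, PartialEq)]
enum Pushdown {
    Exact,   // the scan fully enforces the filter
    Inexact, // the scan only narrows; DataFusion re-applies the filter
}

struct MockReadBuilder {
    limit: Option<usize>,
}

impl MockReadBuilder {
    fn with_limit(mut self, limit: Option<usize>) -> Self {
        self.limit = limit;
        self
    }
}

// No filters (all() is vacuously true) or only Exact filters: a limit
// hint cannot over-prune. Otherwise fail open and push no hint.
fn safe_limit_hint(filters: &[Pushdown], limit: Option<usize>) -> Option<usize> {
    if filters.iter().all(|f| *f == Pushdown::Exact) {
        limit
    } else {
        None
    }
}

fn main() {
    let b = MockReadBuilder { limit: None }
        .with_limit(safe_limit_hint(&[Pushdown::Exact], Some(10)));
    assert_eq!(b.limit, Some(10)); // safe: hint pushed

    let b = MockReadBuilder { limit: None }
        .with_limit(safe_limit_hint(&[Pushdown::Inexact], Some(10)));
    assert_eq!(b.limit, None); // inexact filter present: fail open
}
```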

Tests

API and Format

Documentation

```rust
    fetch: Option<usize>,
) -> PartitionPlan {
    if let Some(fetch) = fetch {
        let selected_splits = prune_splits_by_limit_hint(full_splits.iter().cloned(), Some(fetch));
```
Reviewer (Contributor):
Why not use read_builder.with_limit here?

QuakeWang (Author):
I did not switch this to read_builder.with_limit(...) here because the provider stage needs to retain the full split plan for later physical with_fetch() rebuilds. For OFFSET + LIMIT, DataFusion may push skip + fetch, so narrowing the provider scan too early would lose the extra skip budget. I added a short comment here to make that intent explicit.
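The OFFSET + LIMIT concern is just arithmetic: when DataFusion pushes both skip and fetch, the scan must still surface skip + fetch rows, so narrowing the provider plan to fetch alone would under-read. A minimal sketch (the function name is illustrative; saturating addition avoids overflow):

```rust
// Rows the scan must still produce when DataFusion pushes both
// skip (OFFSET) and fetch (LIMIT): skipped rows still have to
// come out of the scan before they are discarded above it.
fn scan_row_budget(skip: usize, fetch: Option<usize>) -> Option<usize> {
    // With no fetch there is no finite budget to hint.
    fetch.map(|f| skip.saturating_add(f))
}

fn main() {
    // OFFSET 10 LIMIT 5: narrowing the provider scan to 5 rows would
    // lose the 10-row skip budget; the scan actually needs 15 rows.
    assert_eq!(scan_row_budget(10, Some(5)), Some(15));
    assert_eq!(scan_row_budget(10, None), None);
}
```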

Reviewer (Contributor):
Maybe can we support fetch in the table_scan?

QuakeWang (Author):
I think this is a reasonable direction longer-term, and that is also why I moved the shared split-pruning rule into table_scan as prune_splits_by_limit_hint(...).

For this PR, I kept the full physical fetch contract in the DataFusion layer because it also needs DataFusion-specific behavior (with_fetch(), repartitioned scan planning, per-partition fetch budgets, and exact stream truncation). Moving all of that into table_scan would expand the scope beyond issue #220.

If we later want a cleaner core abstraction for fetch-aware split planning, I’d be happy to follow up on that separately.

Reviewer (Contributor):
I still think it should be placed in table_scan. If it is a meaningful feature, it belongs there; otherwise it is just a standalone DataFusion-only ability, and its significance is limited.

BTW, currently, there is no engine that supports fetch optimization.

QuakeWang (Author):
I aligned the latest commit with this direction. The PR no longer introduces a DataFusion-only fetch contract: TableProvider::scan() now only passes a conservative limit hint via read_builder.with_limit(...) when there are no filters or all pushed filters are Exact, and PaimonTableScan no longer carries with_fetch() / per-partition fetch budgets / exact stream truncation.

I also updated the plan tests to assert scan-side limit=... without scan-side fetch=... for OFFSET + LIMIT. So the remaining change is just core-owned limit-hint pruning reused by DataFusion, not a standalone fetch optimization feature. If we want true fetch optimization later, I agree it should be introduced from table_scan first.

```rust
/// limit is unknown. Planning may still stop early later if the
/// accumulated known `merged_row_count()` reaches the limit, and the
/// caller or query engine must enforce the final LIMIT.
pub fn prune_splits_by_limit_hint(
```
Reviewer (Contributor):
Do we need to modify this? Just let it as it is?

QuakeWang (Author):
Good point.

I kept this logic internal in TableScan and removed the extra public helper. I also added the regression coverage back for limit == 0 and unknown merged_row_count() so the original limit-hint behavior stays unchanged.
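The edge cases mentioned here can be modeled with a simplified sketch of the limit-hint pruning rule (real splits and `merged_row_count()` differ; in particular, the `limit == 0` behavior below is only one plausible reading, keeping no splits because LIMIT 0 needs no rows):

```rust
// Simplified model of limit-hint split pruning. Row counts may be
// unknown (None). Splits are kept until the accumulated *known* rows
// reach the limit; unknown counts contribute nothing, so they fail
// open and planning keeps going. The engine still enforces the final
// LIMIT either way. Returns how many leading splits are kept.
fn prune_by_limit_hint(split_rows: &[Option<u64>], limit: Option<u64>) -> usize {
    let Some(limit) = limit else {
        return split_rows.len(); // no hint: keep everything
    };
    let mut kept = 0;
    let mut acc = 0u64;
    for rows in split_rows {
        if acc >= limit {
            break; // known rows already cover the limit
        }
        kept += 1;
        acc += rows.unwrap_or(0); // unknown counts never satisfy the limit
    }
    kept
}

fn main() {
    // Three 100-row splits with LIMIT 150: two splits cover it.
    assert_eq!(prune_by_limit_hint(&[Some(100), Some(100), Some(100)], Some(150)), 2);
    // Unknown merged_row_count() fails open: all splits stay.
    assert_eq!(prune_by_limit_hint(&[None, None], Some(10)), 2);
    // limit == 0 (one plausible reading): no splits are needed.
    assert_eq!(prune_by_limit_hint(&[Some(5)], Some(0)), 0);
    // No limit hint: keep everything.
    assert_eq!(prune_by_limit_hint(&[Some(1), Some(1)], None), 2);
}
```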

```rust
///
/// This stays intentionally narrow: the hint is only safe when there are no
/// filters, or when every filter is exact at the table-provider boundary.
pub(crate) fn can_push_down_limit_hint(
```
Reviewer (Contributor):
I think table_scan has already made this judgment.

```rust
) -> bool {
    filters.is_empty()
        || filters.iter().all(|filter| {
            classify_filter_pushdown(filter, fields, partition_keys)
```
Reviewer (Contributor):
Another point is, can we add this judgment to read_builder.with_filter? We can clean datafusion connector here.

QuakeWang (Author):
I moved the scan-owned part of this judgment into paimon-core. The remaining DataFusion-side check is only for residual filters that are not visible to ReadBuilder::with_filter(...), because with_filter only receives the translated Paimon Predicate.

Unsupported or inexact DataFusion filters may still remain above the scan, and that residual part is still needed to decide whether LIMIT can be pushed safely. So I kept only this residual-filter guard in the DataFusion connector.
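The residual-filter argument can be made concrete with a small sketch (the `Pushdown` enum and helper names are hypothetical; the point is that `ReadBuilder::with_filter` only ever sees the translated predicate, so only the connector can see the residual set):

```rust
// Hypothetical sketch: classify each DataFusion filter at the
// table-provider boundary, then compute the residual set that stays
// above the scan. ReadBuilder::with_filter never sees these residuals,
// so the limit-hint safety check has to live in the connector.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Pushdown {
    Exact,       // fully enforced by the scan
    Inexact,     // scan narrows; DataFusion re-applies
    Unsupported, // not translatable to a Paimon Predicate at all
}

fn residual_filters(classified: &[Pushdown]) -> Vec<Pushdown> {
    classified
        .iter()
        .copied()
        .filter(|p| *p != Pushdown::Exact)
        .collect()
}

// LIMIT may only be pushed when nothing above the scan can drop rows.
fn limit_hint_is_safe(classified: &[Pushdown]) -> bool {
    residual_filters(classified).is_empty()
}

fn main() {
    use Pushdown::*;
    assert!(limit_hint_is_safe(&[Exact, Exact]));
    // An unsupported filter remains above the scan and may drop rows,
    // so pushing the limit hint could return too few rows.
    assert!(!limit_hint_is_safe(&[Exact, Unsupported]));
    assert!(!limit_hint_is_safe(&[Inexact]));
}
```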

Reviewer (Contributor):
We could just provide a method in read_builder.rs, e.g. exacted_filter_pushdown, returning a bool, and use that method in DataFusion's supports_filters_pushdown. For the limit, you can then always push it down.

And then what else would we need to do?



Development

Successfully merging this pull request may close these issues.

Support core-owned OFFSET + LIMIT scan pruning for DataFusion
