Skip to content
64 changes: 64 additions & 0 deletions crates/integrations/datafusion/src/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,22 @@ pub(crate) fn build_pushed_predicate(filters: &[Expr], fields: &[DataField]) ->
}
}

/// Whether it is safe to pass a row-count hint down to paimon-core planning.
///
/// This stays intentionally narrow: the hint is only safe when there are no
/// filters, or when every filter is exact at the table-provider boundary.
pub(crate) fn can_push_down_limit_hint(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think table_scan has already made this judgment.

filters: &[Expr],
fields: &[DataField],
partition_keys: &[String],
) -> bool {
filters.is_empty()
|| filters.iter().all(|filter| {
classify_filter_pushdown(filter, fields, partition_keys)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another point is, can we add this judgment to read_builder.with_filter? We can clean datafusion connector here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved the scan-owned part of this judgment into paimon-core. The remaining DataFusion-side check is only for residual filters that are not visible to ReadBuilder::with_filter(...), because with_filter only receives the translated Paimon Predicate.

Unsupported or inexact DataFusion filters may still remain above the scan, and that residual part is still needed to decide whether LIMIT can be pushed safely. So I kept only this residual-filter guard in the DataFusion connector.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can just provide a method in read_builder.rs, exacted_filter_pushdown return bool.
You can just use this method to datafusion supports_filters_pushdown. For limit, you can just push it always.

And then what else do we need to do?

== TableProviderFilterPushDown::Exact
})
}

fn split_conjunction(expr: &Expr) -> Vec<&Expr> {
match expr {
Expr::BinaryExpr(BinaryExpr {
Expand Down Expand Up @@ -392,6 +408,54 @@ mod tests {
);
}

#[test]
fn test_can_push_down_limit_hint_without_filters() {
    // No residual filters at all: pushing the limit hint is trivially safe.
    assert!(can_push_down_limit_hint(&[], &test_fields(), &partition_keys()));
}

#[test]
fn test_can_push_down_limit_hint_for_exact_filters() {
    let fields = test_fields();
    // Equality predicates on partition columns classify as exact, so the
    // limit hint may accompany them down to planning.
    let dt_filter = Expr::Column(Column::from_name("dt")).eq(lit("2024-01-01"));
    let hr_filter = Expr::Column(Column::from_name("hr")).eq(lit(10));
    let filters = [dt_filter, hr_filter];

    assert!(can_push_down_limit_hint(&filters, &fields, &partition_keys()));
}

#[test]
fn test_can_push_down_limit_hint_rejects_inexact_filters() {
    let fields = test_fields();
    // A range predicate on a non-partition column is at best inexact, so
    // the limit hint must be withheld.
    let inexact = vec![Expr::Column(Column::from_name("id")).gt(lit(10))];

    let safe = can_push_down_limit_hint(&inexact, &fields, &partition_keys());
    assert!(!safe);
}

#[test]
fn test_can_push_down_limit_hint_rejects_unsupported_filters() {
    let fields = test_fields();
    // A NOT-wrapped predicate is treated as unsupported by the classifier,
    // so it must also block the limit hint.
    let negated = Expr::Not(Box::new(
        Expr::Column(Column::from_name("dt")).eq(lit("2024-01-01")),
    ));

    assert!(!can_push_down_limit_hint(
        std::slice::from_ref(&negated),
        &fields,
        &partition_keys()
    ));
}

#[test]
fn test_translate_reversed_partition_comparison() {
let fields = test_fields();
Expand Down
1 change: 0 additions & 1 deletion crates/integrations/datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,5 @@ mod table;

pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider};
pub use error::to_datafusion_error;
pub use physical_plan::PaimonTableScan;
pub use relation_planner::PaimonRelationPlanner;
pub use table::PaimonTableProvider;
2 changes: 1 addition & 1 deletion crates/integrations/datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@

pub(crate) mod scan;

pub use scan::PaimonTableScan;
pub(crate) use scan::PaimonTableScan;
11 changes: 4 additions & 7 deletions crates/integrations/datafusion/src/physical_plan/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::error::to_datafusion_error;
/// and the resulting splits are distributed across DataFusion execution partitions
/// so that DataFusion can schedule them in parallel.
#[derive(Debug)]
pub struct PaimonTableScan {
pub(crate) struct PaimonTableScan {
table: Table,
/// Projected column names (if None, reads all columns).
projected_columns: Option<Vec<String>>,
Expand All @@ -52,7 +52,7 @@ pub struct PaimonTableScan {
/// Wrapped in `Arc` to avoid deep-cloning `DataSplit` metadata in `execute()`.
planned_partitions: Vec<Arc<[DataSplit]>>,
plan_properties: PlanProperties,
/// Optional limit on the number of rows to return.
/// Optional limit hint pushed to paimon-core planning.
limit: Option<usize>,
}

Expand Down Expand Up @@ -81,10 +81,6 @@ impl PaimonTableScan {
}
}

pub fn table(&self) -> &Table {
&self.table
}

#[cfg(test)]
pub(crate) fn planned_partitions(&self) -> &[Arc<[DataSplit]>] {
&self.planned_partitions
Expand All @@ -95,7 +91,8 @@ impl PaimonTableScan {
self.pushed_predicate.as_ref()
}

pub fn limit(&self) -> Option<usize> {
#[cfg(test)]
pub(crate) fn limit(&self) -> Option<usize> {
self.limit
}
}
Expand Down
85 changes: 69 additions & 16 deletions crates/integrations/datafusion/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use datafusion::physical_plan::ExecutionPlan;
use paimon::table::Table;

use crate::error::to_datafusion_error;
use crate::filter_pushdown::{build_pushed_predicate, classify_filter_pushdown};
use crate::filter_pushdown::{
build_pushed_predicate, can_push_down_limit_hint, classify_filter_pushdown,
};
use crate::physical_plan::PaimonTableScan;
use crate::runtime::await_with_runtime;

Expand Down Expand Up @@ -76,8 +78,8 @@ impl PaimonTableProvider {
/// Distribute `items` into `num_buckets` groups using round-robin assignment.
///
/// Item `k` lands in bucket `k % num_buckets`, preserving relative order
/// inside each bucket. NOTE(review): panics on `num_buckets == 0` when
/// `items` is non-empty (remainder by zero) — callers pass target-partition
/// counts, which are presumably >= 1.
fn bucket_round_robin<T>(items: Vec<T>, num_buckets: usize) -> Vec<Vec<T>> {
    let mut buckets: Vec<Vec<T>> = std::iter::repeat_with(Vec::new)
        .take(num_buckets)
        .collect();
    for (slot, item) in items.into_iter().enumerate() {
        buckets[slot % num_buckets].push(item);
    }
    buckets
}
Expand Down Expand Up @@ -126,9 +128,14 @@ impl TableProvider for PaimonTableProvider {
if let Some(filter) = pushed_predicate.clone() {
read_builder.with_filter(filter);
}
// Push the limit hint to paimon-core planning to reduce splits when possible.
// DataFusion still enforces the final LIMIT semantics.
if let Some(limit) = limit {
let pushed_limit = limit.filter(|_| {
can_push_down_limit_hint(
filters,
self.table.schema().fields(),
self.table.schema().partition_keys(),
)
});
if let Some(limit) = pushed_limit {
read_builder.with_limit(limit);
}
let scan = read_builder.new_scan();
Expand Down Expand Up @@ -162,7 +169,7 @@ impl TableProvider for PaimonTableProvider {
projected_columns,
pushed_predicate,
planned_partitions,
limit,
pushed_limit,
)))
}

Expand Down Expand Up @@ -238,14 +245,9 @@ mod tests {
async fn plan_partitions(
provider: &PaimonTableProvider,
filters: Vec<Expr>,
limit: Option<usize>,
) -> Vec<Arc<[DataSplit]>> {
let config = SessionConfig::new().with_target_partitions(8);
let ctx = SessionContext::new_with_config(config);
let state = ctx.state();
let plan = provider
.scan(&state, None, &filters, None)
.await
.expect("scan() should succeed");
let plan = plan_scan(provider, filters, limit).await;
let scan = plan
.as_any()
.downcast_ref::<PaimonTableScan>()
Expand All @@ -254,6 +256,20 @@ mod tests {
scan.planned_partitions().to_vec()
}

/// Run `TableProvider::scan` against a fresh 8-partition session and
/// return the resulting execution plan.
async fn plan_scan(
    provider: &PaimonTableProvider,
    filters: Vec<Expr>,
    limit: Option<usize>,
) -> Arc<dyn ExecutionPlan> {
    let config = SessionConfig::new().with_target_partitions(8);
    let ctx = SessionContext::new_with_config(config);
    let state = ctx.state();
    let plan = provider.scan(&state, None, &filters, limit).await;
    plan.expect("scan() should succeed")
}

fn extract_dt_partition_set(planned_partitions: &[Arc<[DataSplit]>]) -> BTreeSet<String> {
planned_partitions
.iter()
Expand Down Expand Up @@ -291,7 +307,7 @@ mod tests {
async fn test_scan_partition_filter_plans_matching_partition_set() {
let provider = create_provider("partitioned_log_table").await;
let planned_partitions =
plan_partitions(&provider, vec![col("dt").eq(lit("2024-01-01"))]).await;
plan_partitions(&provider, vec![col("dt").eq(lit("2024-01-01"))], None).await;

assert_eq!(
extract_dt_partition_set(&planned_partitions),
Expand All @@ -305,6 +321,7 @@ mod tests {
let planned_partitions = plan_partitions(
&provider,
vec![col("dt").eq(lit("2024-01-01")).and(col("id").gt(lit(1)))],
None,
)
.await;

Expand All @@ -319,10 +336,11 @@ mod tests {
let provider = create_provider("multi_partitioned_log_table").await;

let dt_only_partitions =
plan_partitions(&provider, vec![col("dt").eq(lit("2024-01-01"))]).await;
plan_partitions(&provider, vec![col("dt").eq(lit("2024-01-01"))], None).await;
let dt_hr_partitions = plan_partitions(
&provider,
vec![col("dt").eq(lit("2024-01-01")).and(col("hr").eq(lit(10)))],
None,
)
.await;

Expand Down Expand Up @@ -361,4 +379,39 @@ mod tests {

assert_eq!(scan.pushed_predicate(), Some(&expected));
}

#[tokio::test]
async fn test_scan_applies_limit_hint_only_when_safe() {
    let provider = create_provider("partitioned_log_table").await;

    // Baseline: how many splits planning produces without any limit hint.
    let full_plan = plan_partitions(&provider, vec![], None).await;
    let baseline_splits: usize = full_plan.iter().map(|partition| partition.len()).sum();

    let plan = plan_scan(&provider, vec![], Some(1)).await;
    let scan = plan
        .as_any()
        .downcast_ref::<PaimonTableScan>()
        .expect("Expected PaimonTableScan");
    let limited_splits: usize = scan
        .planned_partitions()
        .iter()
        .map(|partition| partition.len())
        .sum();

    // With no filters the hint is safe: it reaches the scan and prunes splits.
    assert_eq!(scan.limit(), Some(1));
    assert!(limited_splits < baseline_splits);
}

#[tokio::test]
async fn test_scan_skips_limit_hint_for_inexact_filters() {
    let provider = create_provider("partitioned_log_table").await;
    let inexact_filter = col("id").gt(lit(1));

    let plan = plan_scan(&provider, vec![inexact_filter], Some(1)).await;
    let scan = plan
        .as_any()
        .downcast_ref::<PaimonTableScan>()
        .expect("Expected PaimonTableScan");

    // The residual `id > 1` filter is inexact, so the limit hint is dropped.
    assert_eq!(scan.limit(), None);
}
}
Loading
Loading