diff --git a/crates/integrations/datafusion/src/filter_pushdown.rs b/crates/integrations/datafusion/src/filter_pushdown.rs index 91c65d3..55250a5 100644 --- a/crates/integrations/datafusion/src/filter_pushdown.rs +++ b/crates/integrations/datafusion/src/filter_pushdown.rs @@ -20,15 +20,72 @@ use datafusion::logical_expr::expr::InList; use datafusion::logical_expr::{Between, BinaryExpr, Expr, Operator, TableProviderFilterPushDown}; use paimon::spec::{DataField, DataType, Datum, Predicate, PredicateBuilder}; -pub(crate) fn classify_filter_pushdown( +#[derive(Debug)] +struct SingleFilterAnalysis { + translated_predicates: Vec, + has_untranslated_residual: bool, +} + +#[derive(Debug)] +pub(crate) struct FilterPushdownAnalysis { + pub(crate) pushed_predicate: Option, + pub(crate) has_untranslated_residual: bool, +} + +fn analyze_filter(filter: &Expr, fields: &[DataField]) -> SingleFilterAnalysis { + let translator = FilterTranslator::new(fields); + if let Some(predicate) = translator.translate(filter) { + return SingleFilterAnalysis { + translated_predicates: vec![predicate], + has_untranslated_residual: false, + }; + } + + SingleFilterAnalysis { + translated_predicates: split_conjunction(filter) + .into_iter() + .filter_map(|expr| translator.translate(expr)) + .collect(), + has_untranslated_residual: true, + } +} + +pub(crate) fn analyze_filters(filters: &[Expr], fields: &[DataField]) -> FilterPushdownAnalysis { + let mut translated_predicates = Vec::new(); + let mut has_untranslated_residual = false; + + for filter in filters { + let analysis = analyze_filter(filter, fields); + translated_predicates.extend(analysis.translated_predicates); + has_untranslated_residual |= analysis.has_untranslated_residual; + } + + FilterPushdownAnalysis { + pushed_predicate: if translated_predicates.is_empty() { + None + } else { + Some(Predicate::and(translated_predicates)) + }, + has_untranslated_residual, + } +} + +#[cfg(test)] +pub(crate) fn build_pushed_predicate(filters: &[Expr], fields: &[DataField]) -> Option { + analyze_filters(filters, fields).pushed_predicate +} + +pub(crate) fn classify_filter_pushdown( filter: &Expr, fields: &[DataField], - partition_keys: &[String], -) -> TableProviderFilterPushDown { + is_exact_filter_pushdown: F, +) -> TableProviderFilterPushDown +where + F: Fn(&Predicate) -> bool, +{ let translator = FilterTranslator::new(fields); - if translator.translate(filter).is_some() { - let partition_translator = FilterTranslator::for_allowed_columns(fields, partition_keys); - if partition_translator.translate(filter).is_some() { + if let Some(predicate) = translator.translate(filter) { + if is_exact_filter_pushdown(&predicate) { TableProviderFilterPushDown::Exact } else { TableProviderFilterPushDown::Inexact @@ -43,21 +100,6 @@ pub(crate) fn classify_filter_pushdown( } } -pub(crate) fn build_pushed_predicate(filters: &[Expr], fields: &[DataField]) -> Option { - let translator = FilterTranslator::new(fields); - let pushed: Vec<_> = filters - .iter() - .flat_map(split_conjunction) - .filter_map(|filter| translator.translate(filter)) - .collect(); - - if pushed.is_empty() { - None - } else { - Some(Predicate::and(pushed)) - } -} - fn split_conjunction(expr: &Expr) -> Vec<&Expr> { match expr { Expr::BinaryExpr(BinaryExpr { @@ -75,7 +117,6 @@ fn split_conjunction(expr: &Expr) -> Vec<&Expr> { struct FilterTranslator<'a> { fields: &'a [DataField], - allowed_columns: Option<&'a [String]>, predicate_builder: PredicateBuilder, } @@ -83,15 +124,6 @@ impl<'a> FilterTranslator<'a> { fn new(fields: &'a [DataField]) -> Self { Self { fields, - allowed_columns: None, - predicate_builder: PredicateBuilder::new(fields), - } - } - - fn for_allowed_columns(fields: &'a [DataField], allowed_columns: &'a [String]) -> Self { - Self { - fields, - allowed_columns: Some(allowed_columns), predicate_builder: PredicateBuilder::new(fields), } } @@ -240,12 +272,6 @@ impl<'a> FilterTranslator<'a> { return None; }; - if let Some(allowed_columns) = self.allowed_columns { - if !allowed_columns.iter().any(|column| column == name) { - return None; - } - } - self.fields.iter().find(|field| field.name() == name) } } @@ -352,22 +378,39 @@ mod tests { use super::*; use datafusion::common::Column; use datafusion::logical_expr::{expr::InList, lit, TableProviderFilterPushDown}; - use paimon::spec::{IntType, VarCharType}; + use paimon::catalog::Identifier; + use paimon::io::FileIOBuilder; + use paimon::spec::{IntType, Schema, TableSchema, VarCharType}; + use paimon::table::Table; + + fn test_table() -> Table { + let file_io = FileIOBuilder::new("file").build().unwrap(); + let table_schema = TableSchema::new( + 0, + &Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("dt", DataType::VarChar(VarCharType::string_type())) + .column("hr", DataType::Int(IntType::new())) + .partition_keys(["dt", "hr"]) + .build() + .unwrap(), + ); + Table::new( + file_io, + Identifier::new("default", "t"), + "/tmp/test-filter-pushdown".to_string(), + table_schema, + ) + } fn test_fields() -> Vec { - vec![ - DataField::new(0, "id".to_string(), DataType::Int(IntType::new())), - DataField::new( - 1, - "dt".to_string(), - DataType::VarChar(VarCharType::string_type()), - ), - DataField::new(2, "hr".to_string(), DataType::Int(IntType::new())), - ] + test_table().schema().fields().to_vec() } - fn partition_keys() -> Vec { - vec!["dt".to_string(), "hr".to_string()] + fn is_exact_filter_pushdown(predicate: &Predicate) -> bool { + test_table() + .new_read_builder() + .is_exact_filter_pushdown(predicate) } #[test] @@ -387,11 +430,59 @@ mod tests { let filter = Expr::Column(Column::from_name("dt")).eq(lit("2024-01-01")); assert_eq!( - classify_filter_pushdown(&filter, &fields, &partition_keys()), + classify_filter_pushdown(&filter, &fields, is_exact_filter_pushdown), TableProviderFilterPushDown::Exact ); } + #[test] + fn test_analyze_filters_for_supported_data_filter_has_no_untranslated_residual() { + let fields = test_fields(); + let filters = vec![Expr::Column(Column::from_name("id")).gt(lit(10))]; + let analysis = analyze_filters(&filters, &fields); + + assert_eq!( + analysis + .pushed_predicate + .expect("data filter should translate") + .to_string(), + "id > 10" + ); + assert!(!analysis.has_untranslated_residual); + } + + #[test] + fn test_analyze_filters_marks_partial_translation_as_untranslated_residual() { + let fields = test_fields(); + let filters = vec![Expr::Column(Column::from_name("dt")) + .eq(lit("2024-01-01")) + .and(Expr::Not(Box::new( + Expr::Column(Column::from_name("hr")).eq(lit(10)), + )))]; + let analysis = analyze_filters(&filters, &fields); + + assert_eq!( + analysis + .pushed_predicate + .expect("supported conjunct should still translate") + .to_string(), + "dt = '2024-01-01'" + ); + assert!(analysis.has_untranslated_residual); + } + + #[test] + fn test_analyze_filters_marks_unsupported_filter_as_untranslated_residual() { + let fields = test_fields(); + let filters = vec![Expr::Not(Box::new( + Expr::Column(Column::from_name("dt")).eq(lit("2024-01-01")), + ))]; + let analysis = analyze_filters(&filters, &fields); + + assert!(analysis.pushed_predicate.is_none()); + assert!(analysis.has_untranslated_residual); + } + #[test] fn test_translate_reversed_partition_comparison() { let fields = test_fields(); @@ -448,7 +539,7 @@ mod tests { let filter = Expr::Column(Column::from_name("id")).gt(lit(10)); assert_eq!( - classify_filter_pushdown(&filter, &fields, &partition_keys()), + classify_filter_pushdown(&filter, &fields, is_exact_filter_pushdown), TableProviderFilterPushDown::Inexact ); } @@ -474,7 +565,7 @@ mod tests { .and(Expr::Column(Column::from_name("id")).gt(lit(10))); assert_eq!( - classify_filter_pushdown(&filter, &fields, &partition_keys()), + classify_filter_pushdown(&filter, &fields, is_exact_filter_pushdown), TableProviderFilterPushDown::Inexact ); } @@ -500,7 +591,7 @@ mod tests { )); assert_eq!( - classify_filter_pushdown(&filter, &fields, &partition_keys()), + classify_filter_pushdown(&filter, &fields, is_exact_filter_pushdown), TableProviderFilterPushDown::Unsupported ); } diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs index 8454bf7..1ae212f 100644 --- a/crates/integrations/datafusion/src/lib.rs +++ b/crates/integrations/datafusion/src/lib.rs @@ -46,6 +46,5 @@ mod table; pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider}; pub use error::to_datafusion_error; -pub use physical_plan::PaimonTableScan; pub use relation_planner::PaimonRelationPlanner; pub use table::PaimonTableProvider; diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs index 48aa546..1f06658 100644 --- a/crates/integrations/datafusion/src/physical_plan/mod.rs +++ b/crates/integrations/datafusion/src/physical_plan/mod.rs @@ -17,4 +17,4 @@ pub(crate) mod scan; -pub use scan::PaimonTableScan; +pub(crate) use scan::PaimonTableScan; diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index 1a181eb..7389a44 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -40,7 +40,7 @@ use crate::error::to_datafusion_error; /// and the resulting splits are distributed across DataFusion execution partitions /// so that DataFusion can schedule them in parallel. #[derive(Debug)] -pub struct PaimonTableScan { +pub(crate) struct PaimonTableScan { table: Table, /// Projected column names (if None, reads all columns). projected_columns: Option>, @@ -52,7 +52,7 @@ pub struct PaimonTableScan { /// Wrapped in `Arc` to avoid deep-cloning `DataSplit` metadata in `execute()`. planned_partitions: Vec>, plan_properties: PlanProperties, - /// Optional limit on the number of rows to return. + /// Optional limit hint pushed to paimon-core planning. limit: Option, } @@ -81,10 +81,6 @@ impl PaimonTableScan { } } - pub fn table(&self) -> &Table { - &self.table - } - #[cfg(test)] pub(crate) fn planned_partitions(&self) -> &[Arc<[DataSplit]>] { &self.planned_partitions @@ -95,7 +91,8 @@ impl PaimonTableScan { self.pushed_predicate.as_ref() } - pub fn limit(&self) -> Option { + #[cfg(test)] + pub(crate) fn limit(&self) -> Option { self.limit } } diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index fdc4f0e..f24f332 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -30,7 +30,9 @@ use datafusion::physical_plan::ExecutionPlan; use paimon::table::Table; use crate::error::to_datafusion_error; -use crate::filter_pushdown::{build_pushed_predicate, classify_filter_pushdown}; +#[cfg(test)] +use crate::filter_pushdown::build_pushed_predicate; +use crate::filter_pushdown::{analyze_filters, classify_filter_pushdown}; use crate::physical_plan::PaimonTableScan; use crate::runtime::await_with_runtime; @@ -76,8 +78,8 @@ impl PaimonTableProvider { /// Distribute `items` into `num_buckets` groups using round-robin assignment. fn bucket_round_robin(items: Vec, num_buckets: usize) -> Vec> { let mut buckets: Vec> = (0..num_buckets).map(|_| Vec::new()).collect(); - for (i, item) in items.into_iter().enumerate() { - buckets[i % num_buckets].push(item); + for (index, item) in items.into_iter().enumerate() { + buckets[index % num_buckets].push(item); } buckets } @@ -121,14 +123,13 @@ impl TableProvider for PaimonTableProvider { }; // Plan splits eagerly so we know partition count upfront. - let pushed_predicate = build_pushed_predicate(filters, self.table.schema().fields()); + let filter_analysis = analyze_filters(filters, self.table.schema().fields()); let mut read_builder = self.table.new_read_builder(); - if let Some(filter) = pushed_predicate.clone() { + if let Some(filter) = filter_analysis.pushed_predicate.clone() { read_builder.with_filter(filter); } - // Push the limit hint to paimon-core planning to reduce splits when possible. - // DataFusion still enforces the final LIMIT semantics. - if let Some(limit) = limit { + let pushed_limit = limit.filter(|_| !filter_analysis.has_untranslated_residual); + if let Some(limit) = pushed_limit { read_builder.with_limit(limit); } let scan = read_builder.new_scan(); @@ -160,9 +161,9 @@ impl TableProvider for PaimonTableProvider { projected_schema, self.table.clone(), projected_columns, - pushed_predicate, + filter_analysis.pushed_predicate, planned_partitions, - limit, + pushed_limit, ))) } @@ -171,11 +172,15 @@ impl TableProvider for PaimonTableProvider { filters: &[&Expr], ) -> DFResult> { let fields = self.table.schema().fields(); - let partition_keys = self.table.schema().partition_keys(); + let read_builder = self.table.new_read_builder(); Ok(filters .iter() - .map(|filter| classify_filter_pushdown(filter, fields, partition_keys)) + .map(|filter| { + classify_filter_pushdown(filter, fields, |predicate| { + read_builder.is_exact_filter_pushdown(predicate) + }) + }) .collect()) } } @@ -238,14 +243,9 @@ mod tests { async fn plan_partitions( provider: &PaimonTableProvider, filters: Vec, + limit: Option, ) -> Vec> { - let config = SessionConfig::new().with_target_partitions(8); - let ctx = SessionContext::new_with_config(config); - let state = ctx.state(); - let plan = provider - .scan(&state, None, &filters, None) - .await - .expect("scan() should succeed"); + let plan = plan_scan(provider, filters, limit).await; let scan = plan .as_any() .downcast_ref::() @@ -254,6 +254,20 @@ mod tests { scan.planned_partitions().to_vec() } + async fn plan_scan( + provider: &PaimonTableProvider, + filters: Vec, + limit: Option, + ) -> Arc { + let config = SessionConfig::new().with_target_partitions(8); + let ctx = SessionContext::new_with_config(config); + let state = ctx.state(); + provider + .scan(&state, None, &filters, limit) + .await + .expect("scan() should succeed") + } + fn extract_dt_partition_set(planned_partitions: &[Arc<[DataSplit]>]) -> BTreeSet { planned_partitions .iter() @@ -291,7 +305,7 @@ mod tests { async fn test_scan_partition_filter_plans_matching_partition_set() { let provider = create_provider("partitioned_log_table").await; let planned_partitions = - plan_partitions(&provider, vec![col("dt").eq(lit("2024-01-01"))]).await; + plan_partitions(&provider, vec![col("dt").eq(lit("2024-01-01"))], None).await; assert_eq!( extract_dt_partition_set(&planned_partitions), @@ -305,6 +319,7 @@ mod tests { let planned_partitions = plan_partitions( &provider, vec![col("dt").eq(lit("2024-01-01")).and(col("id").gt(lit(1)))], + None, ) .await; @@ -319,10 +334,11 @@ mod tests { let provider = create_provider("multi_partitioned_log_table").await; let dt_only_partitions = - plan_partitions(&provider, vec![col("dt").eq(lit("2024-01-01"))]).await; + plan_partitions(&provider, vec![col("dt").eq(lit("2024-01-01"))], None).await; let dt_hr_partitions = plan_partitions( &provider, vec![col("dt").eq(lit("2024-01-01")).and(col("hr").eq(lit(10)))], + None, ) .await; @@ -339,6 +355,39 @@ mod tests { ); } + #[tokio::test] + async fn test_scan_partially_translated_filter_keeps_partition_pruning_but_skips_limit_hint() { + let provider = create_provider("multi_partitioned_log_table").await; + let filter = col("dt") + .eq(lit("2024-01-01")) + .and(Expr::Not(Box::new(col("hr").eq(lit(10))))); + let full_plan = plan_partitions(&provider, vec![filter.clone()], None).await; + let plan = plan_scan(&provider, vec![filter], Some(1)).await; + let scan = plan + .as_any() + .downcast_ref::() + .expect("Expected PaimonTableScan"); + + assert_eq!(scan.limit(), None); + assert_eq!( + extract_dt_hr_partition_set(scan.planned_partitions()), + BTreeSet::from([ + ("2024-01-01".to_string(), 10), + ("2024-01-01".to_string(), 20), + ]), + ); + assert_eq!( + scan.planned_partitions() + .iter() + .map(|partition| partition.len()) + .sum::(), + full_plan + .iter() + .map(|partition| partition.len()) + .sum::() + ); + } + #[tokio::test] async fn test_scan_keeps_pushed_predicate_for_execute() { let provider = create_provider("partitioned_log_table").await; @@ -361,4 +410,51 @@ mod tests { assert_eq!(scan.pushed_predicate(), Some(&expected)); } + + #[tokio::test] + async fn test_scan_applies_limit_hint_only_when_safe() { + let provider = create_provider("partitioned_log_table").await; + let full_plan = plan_partitions(&provider, vec![], None).await; + let plan = plan_scan(&provider, vec![], Some(1)).await; + let scan = plan + .as_any() + .downcast_ref::() + .expect("Expected PaimonTableScan"); + + assert_eq!(scan.limit(), Some(1)); + assert!( + scan.planned_partitions() + .iter() + .map(|partition| partition.len()) + .sum::() + < full_plan + .iter() + .map(|partition| partition.len()) + .sum::() + ); + } + + #[tokio::test] + async fn test_scan_keeps_limit_but_skips_limit_pruning_for_data_filters() { + let provider = create_provider("partitioned_log_table").await; + let filter = col("id").gt(lit(1)); + let full_plan = plan_partitions(&provider, vec![filter.clone()], None).await; + let plan = plan_scan(&provider, vec![filter], Some(1)).await; + let scan = plan + .as_any() + .downcast_ref::() + .expect("Expected PaimonTableScan"); + + assert_eq!(scan.limit(), Some(1)); + assert_eq!( + scan.planned_partitions() + .iter() + .map(|partition| partition.len()) + .sum::(), + full_plan + .iter() + .map(|partition| partition.len()) + .sum::() + ); + } } diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs index 5a20712..b51264b 100644 --- a/crates/integrations/datafusion/tests/read_tables.rs +++ b/crates/integrations/datafusion/tests/read_tables.rs @@ -22,6 +22,7 @@ use datafusion::arrow::array::{Array, Int32Array, StringArray}; use datafusion::catalog::CatalogProvider; use datafusion::datasource::TableProvider; use datafusion::logical_expr::{col, lit, TableProviderFilterPushDown}; +use datafusion::physical_plan::{displayable, ExecutionPlan}; use datafusion::prelude::{SessionConfig, SessionContext}; use paimon::catalog::Identifier; use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options}; @@ -97,6 +98,14 @@ async fn collect_query( ctx.sql(sql).await?.collect().await } +async fn create_physical_plan( + table_name: &str, + sql: &str, +) -> datafusion::error::Result> { + let ctx = create_context(table_name).await; + ctx.sql(sql).await?.create_physical_plan().await +} + fn extract_id_name_rows( batches: &[datafusion::arrow::record_batch::RecordBatch], ) -> Vec<(i32, String)> { @@ -121,6 +130,17 @@ fn extract_id_name_rows( rows } +fn format_physical_plan(plan: &Arc) -> String { + displayable(plan.as_ref()).indent(true).to_string() +} + +fn paimon_scan_lines(plan_text: &str) -> Vec<&str> { + plan_text + .lines() + .filter(|line| line.contains("PaimonTableScan:")) + .collect() +} + #[tokio::test] async fn test_read_log_table_via_datafusion() { let actual_rows = read_rows("simple_log_table").await; @@ -302,52 +322,185 @@ async fn test_mixed_and_filter_keeps_residual_datafusion_filter() { assert_eq!(actual_rows, vec![(2, "bob".to_string())]); } -/// Test limit pushdown: ensures that LIMIT queries return the correct number of rows. #[tokio::test] -async fn test_limit_pushdown() { - // Test append-only table (simple_log_table) - { - let batches = collect_query( - "simple_log_table", - "SELECT id, name FROM simple_log_table LIMIT 2", - ) - .await - .expect("Limit query should succeed"); - - let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); - assert_eq!(total_rows, 2, "LIMIT 2 should return exactly 2 rows"); - } +async fn test_partially_translated_filter_keeps_partition_pruning_and_correctness() { + let sql = "SELECT id, name FROM multi_partitioned_log_table WHERE dt = '2024-01-01' AND hr + 1 > 20 LIMIT 1"; + let plan = create_physical_plan("multi_partitioned_log_table", sql) + .await + .expect("Physical plan creation should succeed"); + let plan_text = format_physical_plan(&plan); + let scan_lines = paimon_scan_lines(&plan_text); - // Test data evolution table - { - let batches = collect_query( - "data_evolution_table", - "SELECT id, name FROM data_evolution_table LIMIT 3", - ) + assert!( + !scan_lines.is_empty(), + "plan should contain a PaimonTableScan, plan:\n{plan_text}" + ); + assert!( + scan_lines + .iter() + .any(|line| line.contains("predicate=dt = '2024-01-01'")), + "The translated partition predicate should still be pushed into PaimonTableScan, plan:\n{plan_text}" + ); + assert!( + scan_lines.iter().all(|line| !line.contains("fetch=")), + "Partially translated filters should not revive the removed fetch contract, plan:\n{plan_text}" + ); + + let batches = collect_query("multi_partitioned_log_table", sql) .await - .expect("Limit query on data evolution table should succeed"); + .expect("Partially translated filter + LIMIT query should succeed"); + let rows = extract_id_name_rows(&batches); - let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); - assert_eq!( - total_rows, 3, - "LIMIT 3 should return exactly 3 rows for data evolution table" - ); + assert_eq!( + rows, + vec![(3, "carol".to_string())], + "The residual filter should still be enforced above the scan" + ); +} - // Verify the data is from the merged result (not raw files) - let mut rows = extract_id_name_rows(&batches); - rows.sort_by_key(|(id, _)| *id); +#[tokio::test] +async fn test_limit_pushdown_on_data_evolution_table_returns_merged_rows() { + let batches = collect_query( + "data_evolution_table", + "SELECT id, name FROM data_evolution_table LIMIT 3", + ) + .await + .expect("Limit query on data evolution table should succeed"); - // LIMIT 3 returns ids 1, 2, 3 with merged values - assert_eq!( - rows, - vec![ - (1, "alice-v2".to_string()), - (2, "bob".to_string()), - (3, "carol-v2".to_string()), - ], - "Data evolution table LIMIT 3 should return merged rows" - ); - } + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!( + total_rows, 3, + "LIMIT 3 should return exactly 3 rows for data evolution table" + ); + + let mut rows = extract_id_name_rows(&batches); + rows.sort_by_key(|(id, _)| *id); + + assert_eq!( + rows, + vec![ + (1, "alice-v2".to_string()), + (2, "bob".to_string()), + (3, "carol-v2".to_string()), + ], + "Data evolution table LIMIT 3 should return merged rows" + ); +} + +#[tokio::test] +async fn test_limit_pushdown_marks_safe_scan_limit_hint_and_keeps_correctness() { + let sql = "SELECT id, name FROM simple_log_table LIMIT 2"; + let plan = create_physical_plan("simple_log_table", sql) + .await + .expect("Physical plan creation should succeed"); + let plan_text = format_physical_plan(&plan); + let scan_lines = paimon_scan_lines(&plan_text); + + assert!( + scan_lines + .iter() + .any(|line| line.contains("limit=2") && !line.contains("fetch=")), + "Safe LIMIT query should push a scan limit hint into PaimonTableScan, plan:\n{plan_text}" + ); + + let batches = collect_query("simple_log_table", sql) + .await + .expect("LIMIT query should succeed"); + let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum(); + assert_eq!(total_rows, 2, "LIMIT 2 should still return exactly 2 rows"); +} + +#[tokio::test] +async fn test_offset_limit_pushdown_keeps_correctness_without_fetch_contract() { + let sql = "SELECT id, name FROM partitioned_log_table OFFSET 1 LIMIT 1"; + let plan = create_physical_plan("partitioned_log_table", sql) + .await + .expect("Physical plan creation should succeed"); + let plan_text = format_physical_plan(&plan); + let scan_lines = paimon_scan_lines(&plan_text); + + assert!( + plan_text.contains("GlobalLimitExec"), + "OFFSET queries should keep a GlobalLimitExec in DataFusion, plan:\n{plan_text}" + ); + assert!( + scan_lines.iter().all(|line| !line.contains("fetch=")), + "OFFSET + LIMIT should not rely on the removed DataFusion fetch contract in PaimonTableScan, plan:\n{plan_text}" + ); + + let batches = collect_query("partitioned_log_table", sql) + .await + .expect("OFFSET + LIMIT query should succeed"); + + let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum(); + assert_eq!( + total_rows, 1, + "OFFSET 1 LIMIT 1 should still return exactly 1 row" + ); +} + +#[tokio::test] +async fn test_inexact_filter_limit_keeps_correctness_without_fetch_contract() { + let sql = "SELECT id, name FROM partitioned_log_table WHERE id > 1 LIMIT 1"; + let plan = create_physical_plan("partitioned_log_table", sql) + .await + .expect("Physical plan creation should succeed"); + let plan_text = format_physical_plan(&plan); + let scan_lines = paimon_scan_lines(&plan_text); + + assert!( + !scan_lines.is_empty(), + "plan should contain a PaimonTableScan, plan:\n{plan_text}" + ); + assert!( + scan_lines.iter().all(|line| !line.contains("fetch=")), + "Inexact filter queries should not revive the removed fetch contract, plan:\n{plan_text}" + ); + + let batches = collect_query("partitioned_log_table", sql) + .await + .expect("Inexact filter + LIMIT query should succeed"); + let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum(); + assert_eq!( + total_rows, 1, + "Inexact filter + LIMIT should still return exactly 1 row" + ); +} + +#[tokio::test] +async fn test_residual_filter_limit_keeps_connector_limit_and_correctness() { + let sql = "SELECT id, name FROM simple_log_table WHERE id + 1 > 3 LIMIT 1"; + let plan = create_physical_plan("simple_log_table", sql) + .await + .expect("Physical plan creation should succeed"); + let plan_text = format_physical_plan(&plan); + let scan_lines = paimon_scan_lines(&plan_text); + + assert!( + !scan_lines.is_empty(), + "plan should contain a PaimonTableScan, plan:\n{plan_text}" + ); + assert!( + scan_lines.iter().all(|line| !line.contains("fetch=")), + "Residual filter queries should not revive the removed fetch contract, plan:\n{plan_text}" + ); + assert!( + scan_lines + .iter() + .all(|line| !line.contains("limit=")), + "Residual filter queries should not push a scan limit hint when residual filters stay above the scan, plan:\n{plan_text}" + ); + + let batches = collect_query("simple_log_table", sql) + .await + .expect("Residual filter + LIMIT query should succeed"); + let rows = extract_id_name_rows(&batches); + + assert_eq!( + rows, + vec![(3, "carol".to_string())], + "Residual filter + LIMIT should still return the matching row" + ); } // ======================= Catalog Provider Tests ======================= diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs index c9be32c..81fd713 100644 --- a/crates/paimon/src/table/read_builder.rs +++ b/crates/paimon/src/table/read_builder.rs @@ -37,6 +37,25 @@ struct NormalizedFilter { bucket_predicate: Option, } +/// Whether a translated predicate is exact at the table-provider boundary. +/// +/// Exact filters are fully enforced by paimon-core scan planning using only +/// partition-owned semantics, without requiring residual filtering above the +/// scan. +fn is_exact_filter_pushdown_for_schema( + fields: &[DataField], + partition_keys: &[String], + filter: &Predicate, +) -> bool { + if partition_keys.is_empty() { + return false; + } + + let (_, data_predicates) = + split_partition_and_data_predicates(filter.clone(), fields, partition_keys); + data_predicates.is_empty() +} + fn split_scan_predicates(table: &Table, filter: Predicate) -> (Option, Vec) { let partition_keys = table.schema().partition_keys(); if partition_keys.is_empty() { @@ -148,6 +167,18 @@ impl<'a> ReadBuilder<'a> { self } + /// Whether a translated predicate is exact at the table-provider boundary. + /// + /// Exact filters are fully enforced by paimon-core scan planning, without + /// requiring residual filtering above the scan. + pub fn is_exact_filter_pushdown(&self, filter: &Predicate) -> bool { + is_exact_filter_pushdown_for_schema( + self.table.schema().fields(), + self.table.schema().partition_keys(), + filter, + ) + } + /// Set row ID ranges `[from, to]` (inclusive) for filtering in data evolution mode. pub fn with_row_ranges(&mut self, ranges: Vec) -> &mut Self { self.row_ranges = if ranges.is_empty() { @@ -160,8 +191,8 @@ impl<'a> ReadBuilder<'a> { /// Push a row-limit hint down to scan planning. /// - /// This allows the scan to generate fewer splits when possible. The hint is - /// applied based on the `merged_row_count()` of each split. + /// This allows paimon-core scan planning to generate fewer splits when the + /// current scan state keeps split-level `merged_row_count()` conservative. /// /// Note: This method does not guarantee that exactly `limit` rows will be /// returned by [`TableRead`]. It is only a pushdown hint for planning. @@ -357,6 +388,49 @@ mod tests { .collect() } + fn simple_table() -> Table { + let file_io = FileIOBuilder::new("file").build().unwrap(); + let table_schema = TableSchema::new( + 0, + &Schema::builder() + .column("dt", DataType::VarChar(VarCharType::string_type())) + .column("id", DataType::Int(IntType::new())) + .partition_keys(["dt"]) + .build() + .unwrap(), + ); + Table::new( + file_io, + Identifier::new("default", "t"), + "/tmp/test-read-builder".to_string(), + table_schema, + ) + } + + #[test] + fn test_exact_filter_pushdown_is_true_for_partition_only_filter() { + let table = simple_table(); + let predicate = PredicateBuilder::new(table.schema().fields()) + .equal("dt", crate::spec::Datum::String("2024-01-01".to_string())) + .unwrap(); + + let builder = table.new_read_builder(); + + assert!(builder.is_exact_filter_pushdown(&predicate)); + } + + #[test] + fn test_exact_filter_pushdown_is_false_for_data_filter() { + let table = simple_table(); + let predicate = PredicateBuilder::new(table.schema().fields()) + .greater_than("id", crate::spec::Datum::Int(1)) + .unwrap(); + + let builder = table.new_read_builder(); + + assert!(!builder.is_exact_filter_pushdown(&predicate)); + } + #[tokio::test] async fn test_new_read_pushes_filter_to_reader_when_filter_column_not_projected() { let tempdir = tempdir().unwrap(); diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index c7f6908..e1b0b19 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -295,6 +295,18 @@ fn partition_matches_predicate( } } +/// Whether scan-owned pruning still preserves `merged_row_count()` as a safe +/// row-count hint. +/// +/// Data predicates and row ranges can reduce rows within a split after planning, +/// so split-level row counts stop being a conservative bound for final rows. +pub(super) fn can_push_down_limit_hint_for_scan( + data_predicates: &[Predicate], + row_ranges: Option<&[RowRange]>, +) -> bool { + data_predicates.is_empty() && row_ranges.is_none() +} + /// TableScan for full table scan (no incremental, no predicate). /// /// Reference: [pypaimon.read.table_scan.TableScan](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/read/table_scan.py) @@ -397,6 +409,9 @@ impl<'a> TableScan<'a> { Some(l) => l, None => return splits, }; + if limit == 0 { + return Vec::new(); + } if splits.is_empty() { return splits; @@ -411,14 +426,10 @@ impl<'a> TableScan<'a> { limited_splits.push(split); scanned_row_count += merged_count; if scanned_row_count >= limit as i64 { - // We likely have enough rows for the limit hint. return limited_splits; } } None => { - // Can't compute merged row count, so keep this split and - // rely on the caller or query engine to enforce the final - // LIMIT. limited_splits.push(split); } } @@ -427,6 +438,10 @@ impl<'a> TableScan<'a> { limited_splits } + fn can_push_down_limit_hint(&self) -> bool { + can_push_down_limit_hint_for_scan(&self.data_predicates, self.row_ranges.as_deref()) + } + async fn plan_snapshot(&self, snapshot: Snapshot) -> crate::Result { let file_io = self.table.file_io(); let table_path = self.table.location(); @@ -708,7 +723,7 @@ impl<'a> TableScan<'a> { // With data predicates or row_ranges, merged_row_count() reflects pre-filter // row counts, so stopping early could return fewer rows than the limit. - let splits = if self.data_predicates.is_empty() && self.row_ranges.is_none() { + let splits = if self.can_push_down_limit_hint() { self.apply_limit_pushdown(splits) } else { splits @@ -720,15 +735,19 @@ impl<'a> TableScan<'a> { #[cfg(test)] mod tests { - use super::partition_matches_predicate; + use super::{partition_matches_predicate, TableScan}; + use crate::catalog::Identifier; + use crate::io::FileIOBuilder; use crate::spec::{ - stats::BinaryTableStats, ArrayType, BinaryRowBuilder, DataField, DataFileMeta, DataType, - Datum, DeletionVectorMeta, FileKind, IndexFileMeta, IndexManifestEntry, IntType, Predicate, - PredicateBuilder, PredicateOperator, VarCharType, + stats::BinaryTableStats, ArrayType, BinaryRow, BinaryRowBuilder, DataField, DataFileMeta, + DataType, Datum, DeletionVectorMeta, FileKind, IndexFileMeta, IndexManifestEntry, IntType, + Predicate, PredicateBuilder, PredicateOperator, Schema as PaimonSchema, TableSchema, + VarCharType, }; use crate::table::bucket_filter::{compute_target_buckets, extract_predicate_for_keys}; - use crate::table::source::DeletionFile; + use crate::table::source::{DataSplit, DataSplitBuilder, DeletionFile}; use crate::table::stats_filter::{data_file_matches_predicates, group_by_overlapping_row_id}; + use crate::table::Table; use crate::Error; use chrono::{DateTime, Utc}; @@ -842,6 +861,98 @@ mod tests { } } + fn limit_test_table() -> Table { + let file_io = FileIOBuilder::new("file").build().unwrap(); + let schema = PaimonSchema::builder().build().unwrap(); + let table_schema = TableSchema::new(0, &schema); + Table::new( + file_io, + Identifier::new("test_db", "test_table"), + "/tmp/test-table".to_string(), + table_schema, + ) + } + + fn limit_test_split(file_name: &str, row_count: i64) -> DataSplit { + let mut file = test_data_file_meta(Vec::new(), Vec::new(), Vec::new(), row_count); + file.file_name = file_name.to_string(); + + DataSplitBuilder::new() + .with_snapshot(1) + .with_partition(BinaryRow::new(0)) + .with_bucket(0) + .with_bucket_path(format!("file:/tmp/{file_name}")) + .with_total_buckets(1) + .with_data_files(vec![file]) + .with_raw_convertible(true) + .build() + .unwrap() + } + + fn limit_test_split_with_unknown_merged_row_count( + file_name: &str, + row_count: i64, + ) -> DataSplit { + let mut file = test_data_file_meta(Vec::new(), Vec::new(), Vec::new(), row_count); + file.file_name = file_name.to_string(); + + DataSplitBuilder::new() + .with_snapshot(1) + .with_partition(BinaryRow::new(0)) + .with_bucket(0) + .with_bucket_path(format!("file:/tmp/{file_name}")) + .with_total_buckets(1) + .with_data_files(vec![file]) + .with_data_deletion_files(vec![Some(DeletionFile::new( + format!("file:/tmp/{file_name}.dv"), + 0, + 0, + None, + ))]) + .with_raw_convertible(true) + .build() + .unwrap() + } + + fn split_file_names(splits: &[DataSplit]) -> Vec<&str> { + splits + .iter() + .map(|split| split.data_files()[0].file_name.as_str()) + .collect() + } + + #[test] + fn test_apply_limit_pushdown_zero_returns_empty() { + let table = limit_test_table(); + let scan = TableScan::new(&table, None, vec![], None, Some(0), None); + let splits = vec![ + limit_test_split("a.parquet", 2), + limit_test_split("b.parquet", 3), + ]; + + let pruned = scan.apply_limit_pushdown(splits); + + assert!(pruned.is_empty()); + } + + #[test] + fn test_apply_limit_pushdown_keeps_unknown_merged_row_count() { + let table = limit_test_table(); + let scan = TableScan::new(&table, None, vec![], None, Some(3), None); + let splits = vec![ + limit_test_split("a.parquet", 2), + limit_test_split_with_unknown_merged_row_count("b.parquet", 4), + limit_test_split("c.parquet", 3), + ]; + + let pruned = scan.apply_limit_pushdown(splits); + + assert_eq!( + split_file_names(&pruned), + vec!["a.parquet", "b.parquet", "c.parquet"] + ); + } + #[test] fn test_partition_matches_predicate_decode_failure_fails_open() { let predicate = PredicateBuilder::new(&partition_string_field())