diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ad308be..4ff2528 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -53,7 +53,7 @@ jobs: run: cargo fmt --all -- --check - name: Clippy - run: cargo clippy --all-targets --workspace -- -D warnings + run: cargo clippy --all-targets --workspace --features fulltext -- -D warnings build: runs-on: ${{ matrix.os }} @@ -66,7 +66,7 @@ jobs: steps: - uses: actions/checkout@v6 - name: Build - run: cargo build + run: cargo build --features fulltext unit: runs-on: ${{ matrix.os }} @@ -80,7 +80,7 @@ jobs: - uses: actions/checkout@v6 - name: Test - run: cargo test -p paimon --all-targets + run: cargo test -p paimon --all-targets --features fulltext env: RUST_LOG: DEBUG RUST_BACKTRACE: full diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml index 370279c..ddfa2c0 100644 --- a/crates/integrations/datafusion/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -19,6 +19,7 @@ name = "paimon-datafusion" edition.workspace = true version.workspace = true +exclude = ["testdata/"] license.workspace = true homepage = "https://paimon.apache.org/docs/rust/datafusion/" documentation = "https://docs.rs/paimon-datafusion" @@ -26,6 +27,9 @@ description = "Apache Paimon DataFusion Integration" categories = ["database"] keywords = ["paimon", "datafusion", "integrations"] +[features] +fulltext = ["paimon/fulltext"] + [dependencies] async-trait = "0.1" chrono = "0.4" @@ -37,8 +41,10 @@ tokio = { workspace = true, features = ["rt", "time", "fs"] } [dev-dependencies] arrow-array = { workspace = true } arrow-schema = { workspace = true } +flate2 = "1" parquet = { workspace = true } serde = "1" serde_json = "1" +tar = "0.4" tempfile = "3" tokio = { version = "1", features = ["macros", "rt-multi-thread"] } diff --git a/crates/integrations/datafusion/src/full_text_search.rs b/crates/integrations/datafusion/src/full_text_search.rs new file mode 100644 index 
0000000..8d4c037 --- /dev/null +++ b/crates/integrations/datafusion/src/full_text_search.rs @@ -0,0 +1,256 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! `full_text_search` table-valued function for DataFusion. +//! +//! Usage: +//! ```sql +//! SELECT * FROM full_text_search('table_name', 'column_name', 'query text', 10) +//! ``` +//! +//! 
Reference: [PaimonTableValuedFunctions.scala](https://github.com/apache/paimon/blob/master/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonTableValuedFunctions.scala) + +use std::any::Any; +use std::fmt::Debug; +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; +use datafusion::catalog::Session; +use datafusion::catalog::TableFunctionImpl; +use datafusion::datasource::{TableProvider, TableType}; +use datafusion::error::Result as DFResult; +use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::SessionContext; +use paimon::catalog::{Catalog, Identifier}; + +use crate::error::to_datafusion_error; +use crate::runtime::{await_with_runtime, block_on_with_runtime}; +use crate::table::{build_paimon_scan, PaimonTableProvider}; + +/// Register the `full_text_search` table-valued function on a [`SessionContext`]. +pub fn register_full_text_search( + ctx: &SessionContext, + catalog: Arc, + default_database: &str, +) { + ctx.register_udtf( + "full_text_search", + Arc::new(FullTextSearchFunction::new(catalog, default_database)), + ); +} + +/// Table function that performs full-text search on a Paimon table. 
+/// +/// Arguments: `(table_name STRING, column_name STRING, query_text STRING, limit INT)` +pub struct FullTextSearchFunction { + catalog: Arc, + default_database: String, +} + +impl Debug for FullTextSearchFunction { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FullTextSearchFunction") + .field("default_database", &self.default_database) + .finish() + } +} + +impl FullTextSearchFunction { + pub fn new(catalog: Arc, default_database: &str) -> Self { + Self { + catalog, + default_database: default_database.to_string(), + } + } +} + +impl TableFunctionImpl for FullTextSearchFunction { + fn call(&self, args: &[Expr]) -> DFResult> { + if args.len() != 4 { + return Err(datafusion::error::DataFusionError::Plan( + "full_text_search requires 4 arguments: (table_name, column_name, query_text, limit)".to_string(), + )); + } + + let table_name = extract_string_literal(&args[0], "table_name")?; + let column_name = extract_string_literal(&args[1], "column_name")?; + let query_text = extract_string_literal(&args[2], "query_text")?; + let limit = extract_int_literal(&args[3], "limit")?; + + if limit <= 0 { + return Err(datafusion::error::DataFusionError::Plan( + "full_text_search: limit must be positive".to_string(), + )); + } + + let identifier = parse_table_identifier(&table_name, &self.default_database)?; + + let catalog = Arc::clone(&self.catalog); + let table = block_on_with_runtime( + async move { catalog.get_table(&identifier).await }, + "full_text_search: catalog access thread panicked", + ) + .map_err(to_datafusion_error)?; + + let inner = PaimonTableProvider::try_new(table)?; + + Ok(Arc::new(FullTextSearchTableProvider { + inner, + column_name, + query_text, + limit: limit as usize, + })) + } +} + +/// A wrapper around [`PaimonTableProvider`] that injects full-text search +/// row filtering into the scan path. 
+#[derive(Debug)] +struct FullTextSearchTableProvider { + inner: PaimonTableProvider, + column_name: String, + query_text: String, + limit: usize, +} + +#[async_trait] +impl TableProvider for FullTextSearchTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> ArrowSchemaRef { + self.inner.schema() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + limit: Option, + ) -> DFResult> { + let table = self.inner.table(); + + // Use FullTextSearchBuilder to execute the search. + let row_ranges = await_with_runtime(async { + let mut builder = table.new_full_text_search_builder(); + builder + .with_text_column(&self.column_name) + .with_query_text(&self.query_text) + .with_limit(self.limit); + builder.execute().await.map_err(to_datafusion_error) + }) + .await?; + + // Convert search results to row ranges and inject into the scan. + let mut read_builder = table.new_read_builder(); + if let Some(limit) = limit { + read_builder.with_limit(limit); + } + let scan = if row_ranges.is_empty() { + read_builder.new_scan() + } else { + read_builder.new_scan().with_row_ranges(row_ranges) + }; + let plan = await_with_runtime(scan.plan()) + .await + .map_err(to_datafusion_error)?; + + let target = state.config_options().execution.target_partitions; + build_paimon_scan( + table, + &self.schema(), + &plan, + projection, + None, + limit, + target, + ) + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> DFResult> { + Ok(vec![ + TableProviderFilterPushDown::Unsupported; + filters.len() + ]) + } +} + +fn extract_string_literal(expr: &Expr, name: &str) -> DFResult { + match expr { + Expr::Literal(scalar, _) => { + let s = scalar.try_as_str().flatten().ok_or_else(|| { + datafusion::error::DataFusionError::Plan(format!( + "full_text_search: {name} must be a string literal, got: {expr}" + )) + })?; + Ok(s.to_string()) + } + _ => 
Err(datafusion::error::DataFusionError::Plan(format!( + "full_text_search: {name} must be a literal, got: {expr}" + ))), + } +} + +fn extract_int_literal(expr: &Expr, name: &str) -> DFResult { + use datafusion::common::ScalarValue; + match expr { + Expr::Literal(scalar, _) => match scalar { + ScalarValue::Int8(Some(v)) => Ok(*v as i64), + ScalarValue::Int16(Some(v)) => Ok(*v as i64), + ScalarValue::Int32(Some(v)) => Ok(*v as i64), + ScalarValue::Int64(Some(v)) => Ok(*v), + ScalarValue::UInt8(Some(v)) => Ok(*v as i64), + ScalarValue::UInt16(Some(v)) => Ok(*v as i64), + ScalarValue::UInt32(Some(v)) => Ok(*v as i64), + ScalarValue::UInt64(Some(v)) => i64::try_from(*v).map_err(|_| { + datafusion::error::DataFusionError::Plan(format!( + "full_text_search: {name} value {v} exceeds i64 range" + )) + }), + _ => Err(datafusion::error::DataFusionError::Plan(format!( + "full_text_search: {name} must be an integer literal, got: {expr}" + ))), + }, + _ => Err(datafusion::error::DataFusionError::Plan(format!( + "full_text_search: {name} must be a literal, got: {expr}" + ))), + } +} + +fn parse_table_identifier(name: &str, default_database: &str) -> DFResult { + let parts: Vec<&str> = name.split('.').collect(); + match parts.len() { + 1 => Ok(Identifier::new(default_database, parts[0])), + 2 => Ok(Identifier::new(parts[0], parts[1])), + // 3-part name: catalog.database.table — ignore catalog prefix + 3 => Ok(Identifier::new(parts[1], parts[2])), + _ => Err(datafusion::error::DataFusionError::Plan(format!( + "full_text_search: invalid table name '{name}', expected 'table', 'database.table', or 'catalog.database.table'" + ))), + } +} diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs index 8454bf7..abcf744 100644 --- a/crates/integrations/datafusion/src/lib.rs +++ b/crates/integrations/datafusion/src/lib.rs @@ -39,6 +39,8 @@ mod catalog; mod error; mod filter_pushdown; +#[cfg(feature = "fulltext")] +mod full_text_search; mod 
physical_plan; mod relation_planner; pub mod runtime; @@ -46,6 +48,8 @@ mod table; pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider}; pub use error::to_datafusion_error; +#[cfg(feature = "fulltext")] +pub use full_text_search::{register_full_text_search, FullTextSearchFunction}; pub use physical_plan::PaimonTableScan; pub use relation_planner::PaimonRelationPlanner; pub use table::PaimonTableProvider; diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index fdc4f0e..65eb07f 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -74,7 +74,7 @@ impl PaimonTableProvider { } /// Distribute `items` into `num_buckets` groups using round-robin assignment. -fn bucket_round_robin(items: Vec, num_buckets: usize) -> Vec> { +pub(crate) fn bucket_round_robin(items: Vec, num_buckets: usize) -> Vec> { let mut buckets: Vec> = (0..num_buckets).map(|_| Vec::new()).collect(); for (i, item) in items.into_iter().enumerate() { buckets[i % num_buckets].push(item); @@ -82,6 +82,49 @@ fn bucket_round_robin(items: Vec, num_buckets: usize) -> Vec> { buckets } +/// Build a [`PaimonTableScan`] from a planned [`paimon::table::Plan`]. +/// +/// Shared by [`PaimonTableProvider`] and the full-text search UDTF to avoid +/// duplicating projection, partition distribution, and scan construction. 
+pub(crate) fn build_paimon_scan( + table: &Table, + schema: &ArrowSchemaRef, + plan: &paimon::table::Plan, + projection: Option<&Vec>, + pushed_predicate: Option, + limit: Option, + target_partitions: usize, +) -> DFResult> { + let (projected_schema, projected_columns) = if let Some(indices) = projection { + let fields: Vec = indices.iter().map(|&i| schema.field(i).clone()).collect(); + let column_names: Vec = fields.iter().map(|f| f.name().clone()).collect(); + (Arc::new(Schema::new(fields)), Some(column_names)) + } else { + let column_names: Vec = schema.fields().iter().map(|f| f.name().clone()).collect(); + (schema.clone(), Some(column_names)) + }; + + let splits = plan.splits().to_vec(); + let planned_partitions: Vec> = if splits.is_empty() { + vec![Arc::from(Vec::new())] + } else { + let num_partitions = splits.len().min(target_partitions.max(1)); + bucket_round_robin(splits, num_partitions) + .into_iter() + .map(Arc::from) + .collect() + }; + + Ok(Arc::new(PaimonTableScan::new( + projected_schema, + table.clone(), + projected_columns, + pushed_predicate, + planned_partitions, + limit, + ))) +} + #[async_trait] impl TableProvider for PaimonTableProvider { fn as_any(&self) -> &dyn Any { @@ -103,23 +146,6 @@ impl TableProvider for PaimonTableProvider { filters: &[Expr], limit: Option, ) -> DFResult> { - let (projected_schema, projected_columns) = if let Some(indices) = projection { - let fields: Vec = indices - .iter() - .map(|&i| self.schema.field(i).clone()) - .collect(); - let column_names: Vec = fields.iter().map(|f| f.name().clone()).collect(); - (Arc::new(Schema::new(fields)), Some(column_names)) - } else { - let column_names: Vec = self - .schema - .fields() - .iter() - .map(|f| f.name().clone()) - .collect(); - (self.schema.clone(), Some(column_names)) - }; - // Plan splits eagerly so we know partition count upfront. 
let pushed_predicate = build_pushed_predicate(filters, self.table.schema().fields()); let mut read_builder = self.table.new_read_builder(); @@ -140,30 +166,16 @@ impl TableProvider for PaimonTableProvider { .await .map_err(to_datafusion_error)?; - // Distribute splits across DataFusion partitions, capped by the - // session's target_partitions to avoid over-sharding with many small splits. - // Each partition's splits are wrapped in Arc to avoid deep-cloning in execute(). - let splits = plan.splits().to_vec(); - let planned_partitions: Vec> = if splits.is_empty() { - // Empty plans get a single empty partition to avoid 0-partition edge cases. - vec![Arc::from(Vec::new())] - } else { - let target = state.config_options().execution.target_partitions; - let num_partitions = splits.len().min(target.max(1)); - bucket_round_robin(splits, num_partitions) - .into_iter() - .map(Arc::from) - .collect() - }; - - Ok(Arc::new(PaimonTableScan::new( - projected_schema, - self.table.clone(), - projected_columns, + let target = state.config_options().execution.target_partitions; + build_paimon_scan( + &self.table, + &self.schema, + &plan, + projection, pushed_predicate, - planned_partitions, limit, - ))) + target, + ) } fn supports_filters_pushdown( diff --git a/crates/integrations/datafusion/testdata/test_tantivy_fulltext.tar.gz b/crates/integrations/datafusion/testdata/test_tantivy_fulltext.tar.gz new file mode 100644 index 0000000..3fb2f07 Binary files /dev/null and b/crates/integrations/datafusion/testdata/test_tantivy_fulltext.tar.gz differ diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs index 5a20712..4fc26d6 100644 --- a/crates/integrations/datafusion/tests/read_tables.rs +++ b/crates/integrations/datafusion/tests/read_tables.rs @@ -754,3 +754,126 @@ async fn test_filter_row_id_from_data_evolution_table() { } } } + +// ======================= Full-Text Search Tests ======================= + +#[cfg(feature 
= "fulltext")] +mod fulltext_tests { + use std::sync::Arc; + + use datafusion::arrow::array::{Int32Array, StringArray}; + use datafusion::prelude::SessionContext; + use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options}; + use paimon_datafusion::{register_full_text_search, PaimonCatalogProvider}; + + /// Extract the bundled tar.gz into a temp dir and return (tempdir, warehouse_path). + fn extract_test_warehouse() -> (tempfile::TempDir, String) { + let archive_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")) + .join("testdata/test_tantivy_fulltext.tar.gz"); + let file = std::fs::File::open(&archive_path) + .unwrap_or_else(|e| panic!("Failed to open {}: {e}", archive_path.display())); + let decoder = flate2::read::GzDecoder::new(file); + let mut archive = tar::Archive::new(decoder); + + let tmp = tempfile::tempdir().expect("Failed to create temp dir"); + let db_dir = tmp.path().join("default.db"); + std::fs::create_dir_all(&db_dir).unwrap(); + archive.unpack(&db_dir).unwrap(); + + let warehouse = format!("file://{}", tmp.path().display()); + (tmp, warehouse) + } + + async fn create_fulltext_context() -> (SessionContext, tempfile::TempDir) { + let (tmp, warehouse) = extract_test_warehouse(); + let mut options = Options::new(); + options.set(CatalogOptions::WAREHOUSE, warehouse); + let catalog = FileSystemCatalog::new(options).expect("Failed to create catalog"); + let catalog: Arc = Arc::new(catalog); + + let ctx = SessionContext::new(); + ctx.register_catalog( + "paimon", + Arc::new(PaimonCatalogProvider::new(Arc::clone(&catalog))), + ); + register_full_text_search(&ctx, catalog, "default"); + (ctx, tmp) + } + + fn extract_id_content_rows( + batches: &[datafusion::arrow::record_batch::RecordBatch], + ) -> Vec<(i32, String)> { + let mut rows = Vec::new(); + for batch in batches { + let id_array = batch + .column_by_name("id") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("Expected Int32Array for id"); + let content_array = batch + 
.column_by_name("content") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("Expected StringArray for content"); + for i in 0..batch.num_rows() { + rows.push((id_array.value(i), content_array.value(i).to_string())); + } + } + rows.sort_by_key(|(id, _)| *id); + rows + } + + /// Search for 'paimon' — rows 0, 2, 4 mention "paimon". + #[tokio::test] + async fn test_full_text_search_paimon() { + let (ctx, _tmp) = create_fulltext_context().await; + let batches = ctx + .sql("SELECT id, content FROM full_text_search('paimon.default.test_tantivy_fulltext', 'content', 'paimon', 10)") + .await + .expect("SQL should parse") + .collect() + .await + .expect("query should execute"); + + let rows = extract_id_content_rows(&batches); + let ids: Vec = rows.iter().map(|(id, _)| *id).collect(); + assert_eq!( + ids, + vec![0, 2, 4], + "Searching 'paimon' should match rows 0, 2, 4" + ); + } + + /// Search for 'tantivy' — only row 1. + #[tokio::test] + async fn test_full_text_search_tantivy() { + let (ctx, _tmp) = create_fulltext_context().await; + let batches = ctx + .sql("SELECT id, content FROM full_text_search('paimon.default.test_tantivy_fulltext', 'content', 'tantivy', 10)") + .await + .expect("SQL should parse") + .collect() + .await + .expect("query should execute"); + + let rows = extract_id_content_rows(&batches); + let ids: Vec = rows.iter().map(|(id, _)| *id).collect(); + assert_eq!(ids, vec![1], "Searching 'tantivy' should match row 1"); + } + + /// Search for 'search' — rows 1, 3 mention "full-text search". 
+ #[tokio::test] + async fn test_full_text_search_search() { + let (ctx, _tmp) = create_fulltext_context().await; + let batches = ctx + .sql("SELECT id, content FROM full_text_search('paimon.default.test_tantivy_fulltext', 'content', 'search', 10)") + .await + .expect("SQL should parse") + .collect() + .await + .expect("query should execute"); + + let rows = extract_id_content_rows(&batches); + let ids: Vec = rows.iter().map(|(id, _)| *id).collect(); + assert!(ids.contains(&1), "Searching 'search' should match row 1"); + assert!(ids.contains(&3), "Searching 'search' should match row 3"); + } +} diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index 682adab..10e5fa0 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -20,6 +20,7 @@ categories = ["database"] description = "The rust implementation of Apache Paimon" documentation = "https://docs.rs/paimon" name = "paimon" +exclude = ["testdata/"] homepage.workspace = true repository.workspace = true @@ -30,6 +31,7 @@ version.workspace = true [features] default = ["storage-memory", "storage-fs", "storage-oss"] storage-all = ["storage-memory", "storage-fs", "storage-oss", "storage-s3"] +fulltext = ["tantivy", "tempfile"] storage-memory = ["opendal/services-memory"] storage-fs = ["opendal/services-fs"] @@ -77,9 +79,10 @@ md-5 = "0.10" regex = "1" uuid = { version = "1", features = ["v4"] } urlencoding = "2.1" +tantivy = { version = "0.22", optional = true } +tempfile = { version = "3", optional = true } [dev-dependencies] axum = { version = "0.7", features = ["macros", "tokio", "http1", "http2"] } rand = "0.8.5" - tempfile = "3" diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs index f2deafe..a68ba6b 100644 --- a/crates/paimon/src/lib.rs +++ b/crates/paimon/src/lib.rs @@ -34,6 +34,8 @@ pub mod io; mod predicate_stats; pub mod spec; pub mod table; +#[cfg(feature = "fulltext")] +pub mod tantivy; pub use catalog::Catalog; pub use catalog::CatalogFactory; diff --git 
a/crates/paimon/src/table/full_text_search_builder.rs b/crates/paimon/src/table/full_text_search_builder.rs new file mode 100644 index 0000000..783bd57 --- /dev/null +++ b/crates/paimon/src/table/full_text_search_builder.rs @@ -0,0 +1,207 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Full-text search builder for Paimon tables. +//! +//! Reference: [FullTextSearchBuilderImpl.java](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchBuilderImpl.java) + +use crate::spec::{DataField, FileKind, IndexManifest}; +use crate::table::snapshot_manager::SnapshotManager; +use crate::table::{RowRange, Table}; +use crate::tantivy::full_text_search::{FullTextSearch, SearchResult}; +use crate::tantivy::reader::TantivyFullTextReader; + +const INDEX_DIR: &str = "index"; +const TANTIVY_FULLTEXT_INDEX_TYPE: &str = "tantivy-fulltext"; + +/// Builder for executing full-text search on a Paimon table. 
+/// +/// Usage: +/// ```ignore +/// let result = table.new_full_text_search_builder() +/// .with_text_column("content") +/// .with_query_text("hello world") +/// .with_limit(10) +/// .execute() +/// .await?; +/// ``` +/// +/// Reference: `org.apache.paimon.table.source.FullTextSearchBuilder` +pub struct FullTextSearchBuilder<'a> { + table: &'a Table, + text_column: Option, + query_text: Option, + limit: Option, +} + +impl<'a> FullTextSearchBuilder<'a> { + pub(crate) fn new(table: &'a Table) -> Self { + Self { + table, + text_column: None, + query_text: None, + limit: None, + } + } + + /// Set the text column to search. + pub fn with_text_column(&mut self, name: &str) -> &mut Self { + self.text_column = Some(name.to_string()); + self + } + + /// Set the query text to search for. + pub fn with_query_text(&mut self, query: &str) -> &mut Self { + self.query_text = Some(query.to_string()); + self + } + + /// Set the top-k limit for results. + pub fn with_limit(&mut self, limit: usize) -> &mut Self { + self.limit = Some(limit); + self + } + + /// Execute the full-text search and return row ranges. + /// + /// This reads the latest snapshot, loads the index manifest, and evaluates + /// the search against Tantivy indexes. 
+ /// + /// Reference: `FullTextSearchBuilder.executeLocal()` + pub async fn execute(&self) -> crate::Result> { + let text_column = + self.text_column + .as_deref() + .ok_or_else(|| crate::Error::ConfigInvalid { + message: "Text column must be set via with_text_column()".to_string(), + })?; + let query_text = self + .query_text + .as_deref() + .ok_or_else(|| crate::Error::ConfigInvalid { + message: "Query text must be set via with_query_text()".to_string(), + })?; + let limit = self.limit.ok_or_else(|| crate::Error::ConfigInvalid { + message: "Limit must be set via with_limit()".to_string(), + })?; + + let search = FullTextSearch::new(query_text.to_string(), limit, text_column.to_string())?; + + let snapshot_manager = SnapshotManager::new( + self.table.file_io().clone(), + self.table.location().to_string(), + ); + + let snapshot = match snapshot_manager.get_latest_snapshot().await? { + Some(s) => s, + None => return Ok(Vec::new()), + }; + + let index_manifest_name = match snapshot.index_manifest() { + Some(name) => name.to_string(), + None => return Ok(Vec::new()), + }; + + let manifest_path = format!( + "{}/manifest/{}", + self.table.location().trim_end_matches('/'), + index_manifest_name + ); + let index_entries = IndexManifest::read(self.table.file_io(), &manifest_path).await?; + + evaluate_full_text_search( + self.table.file_io(), + self.table.location(), + &index_entries, + &search, + self.table.schema().fields(), + ) + .await + } +} + +/// Evaluate a full-text search query against Tantivy indexes found in the index manifest. 
+async fn evaluate_full_text_search( + file_io: &crate::io::FileIO, + table_path: &str, + index_entries: &[crate::spec::IndexManifestEntry], + search: &FullTextSearch, + schema_fields: &[DataField], +) -> crate::Result> { + let table_path = table_path.trim_end_matches('/'); + + let field_id = match find_field_id_by_name(schema_fields, &search.field_name) { + Some(id) => id, + None => return Ok(Vec::new()), + }; + + // Collect tantivy fulltext entries for the target field. + let fulltext_entries: Vec<_> = index_entries + .iter() + .filter(|e| { + e.kind == FileKind::Add + && e.index_file.index_type == TANTIVY_FULLTEXT_INDEX_TYPE + && e.index_file + .global_index_meta + .as_ref() + .is_some_and(|m| m.index_field_id == field_id) + }) + .collect(); + + if fulltext_entries.is_empty() { + return Ok(Vec::new()); + } + + let futures: Vec<_> = fulltext_entries + .into_iter() + .map(|entry| { + let global_meta = entry.index_file.global_index_meta.as_ref().unwrap(); + let path = format!("{table_path}/{INDEX_DIR}/{}", entry.index_file.file_name); + let file_name = entry.index_file.file_name.clone(); + let query_text = search.query_text.clone(); + let limit = search.limit; + let row_range_start = global_meta.row_range_start; + let input = file_io.new_input(&path); + async move { + let input = input?; + let reader = TantivyFullTextReader::from_input_file(&input) + .await + .map_err(|e| crate::Error::UnexpectedError { + message: format!( + "Failed to open Tantivy full-text index '{}': {}", + file_name, e + ), + source: None, + })?; + let result = reader.search(&query_text, limit)?; + Ok::<_, crate::Error>(result.offset(row_range_start)) + } + }) + .collect(); + + let results = futures::future::try_join_all(futures).await?; + let mut merged = SearchResult::empty(); + for r in &results { + merged = merged.or(r); + } + + Ok(merged.top_k(search.limit).to_row_ranges()) +} + +fn find_field_id_by_name(fields: &[DataField], name: &str) -> Option { + fields.iter().find(|f| f.name() == 
name).map(|f| f.id()) +} diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index 9b1d6f7..1b34324 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -19,6 +19,8 @@ pub(crate) mod bin_pack; mod bucket_filter; +#[cfg(feature = "fulltext")] +mod full_text_search_builder; pub(crate) mod global_index_scanner; mod read_builder; pub(crate) mod row_id_predicate; @@ -31,6 +33,8 @@ mod tag_manager; use crate::Result; use arrow_array::RecordBatch; +#[cfg(feature = "fulltext")] +pub use full_text_search_builder::FullTextSearchBuilder; use futures::stream::BoxStream; pub use read_builder::{ReadBuilder, TableRead}; pub use schema_manager::SchemaManager; @@ -106,6 +110,14 @@ impl Table { ReadBuilder::new(self) } + /// Create a full-text search builder. + /// + /// Reference: [FullTextSearchBuilderImpl](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextSearchBuilderImpl.java) + #[cfg(feature = "fulltext")] + pub fn new_full_text_search_builder(&self) -> FullTextSearchBuilder<'_> { + FullTextSearchBuilder::new(self) + } + /// Create a copy of this table with extra options merged into the schema. pub fn copy_with_options(&self, extra: HashMap) -> Self { Self { diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index 4e38958..e21f00b 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -329,6 +329,19 @@ impl<'a> TableScan<'a> { } } + /// Set row ranges for scan-time filtering. + /// + /// This replaces any existing row_ranges. Typically used to inject + /// results from global index lookups (e.g. full-text search). + pub fn with_row_ranges(mut self, ranges: Vec) -> Self { + self.row_ranges = if ranges.is_empty() { + None + } else { + Some(ranges) + }; + self + } + /// Plan the full scan: resolve snapshot (via options or latest), then read manifests and build DataSplits. 
/// /// Time travel is resolved from table options: diff --git a/crates/paimon/src/tantivy/directory.rs b/crates/paimon/src/tantivy/directory.rs new file mode 100644 index 0000000..32c3058 --- /dev/null +++ b/crates/paimon/src/tantivy/directory.rs @@ -0,0 +1,394 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Archive-backed Tantivy Directory implementation. +//! +//! Reads a Tantivy index packed into a single archive file. The archive format +//! (Big-Endian, compatible with Java Paimon): +//! +//! ```text +//! [fileCount: 4 bytes BE] +//! for each file: +//! [nameLen: 4 bytes BE] +//! [name: nameLen bytes UTF-8] +//! [dataLen: 8 bytes BE] +//! [data: dataLen bytes] +//! ``` +//! +//! Reference: `org.apache.paimon.tantivy.index.TantivyFullTextGlobalIndexWriter.packIndex()` +//! 
Reference: `org.apache.paimon.tantivy.index.TantivyFullTextGlobalIndexReader.parseArchiveHeader()` + +use bytes::Bytes; +use std::collections::HashMap; +use std::fmt; +use std::io; +use std::ops::Range; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; +use tantivy::directory::error::{DeleteError, LockError, OpenReadError, OpenWriteError}; +use tantivy::directory::Directory; +use tantivy::directory::{ + AntiCallToken, DirectoryLock, FileHandle, Lock, OwnedBytes, TerminatingWrite, WatchCallback, + WatchHandle, WritePtr, +}; +use tantivy::HasLen; + +use crate::io::FileRead; + +/// Metadata for a single file within the archive. +#[derive(Clone, Debug)] +struct FileMeta { + /// Absolute byte offset of the file data within the archive. + offset: u64, + length: usize, +} + +/// A read-only Tantivy `Directory` backed by an archive file. +/// +/// Only the archive header (file names, offsets, lengths) is parsed eagerly. +/// Actual file data is read on demand via `FileRead`. +#[derive(Clone)] +pub struct ArchiveDirectory { + files: Arc<HashMap<PathBuf, FileMeta>>, + reader: Arc<dyn FileRead>, + /// In-memory storage for atomic_write (used by Tantivy for meta.json). + atomic_data: Arc<Mutex<HashMap<PathBuf, Vec<u8>>>>, +} + +impl ArchiveDirectory { + /// Create an `ArchiveDirectory` from an async `FileRead`. + /// + /// Only the archive header (file names, offsets, lengths) is read eagerly. + /// Actual file data is read on demand when Tantivy requests it. + pub async fn from_reader(reader: impl FileRead, file_size: u64) -> io::Result<Self> { + let reader: Arc<dyn FileRead> = Arc::new(reader); + + if file_size < 4 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "Archive too small to contain file count", + )); + } + + // Read file count (4 bytes).
+ let buf = reader + .read(0..4) + .await + .map_err(|e| io::Error::other(e.to_string()))?; + let file_count = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]); + if file_count < 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("Negative file count in archive: {file_count}"), + )); + } + let file_count = file_count as usize; + + let mut pos: u64 = 4; + let mut files = HashMap::with_capacity(file_count); + + for _ in 0..file_count { + // Read name_len (4 bytes). + let buf = reader + .read(pos..pos + 4) + .await + .map_err(|e| io::Error::other(e.to_string()))?; + let name_len = i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]); + if name_len < 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("Negative name length in archive: {name_len}"), + )); + } + let name_len = name_len as u64; + pos += 4; + + // Read name + data_len together in a single IO call. + let meta_buf = reader + .read(pos..pos + name_len + 8) + .await + .map_err(|e| io::Error::other(e.to_string()))?; + + let name = String::from_utf8(meta_buf[..name_len as usize].to_vec()).map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Invalid UTF-8 in file name: {}", e), + ) + })?; + + let dl = name_len as usize; + let data_len = i64::from_be_bytes([ + meta_buf[dl], + meta_buf[dl + 1], + meta_buf[dl + 2], + meta_buf[dl + 3], + meta_buf[dl + 4], + meta_buf[dl + 5], + meta_buf[dl + 6], + meta_buf[dl + 7], + ]); + if data_len < 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("Negative data length in archive: {data_len}"), + )); + } + let data_len = data_len as u64; + pos += name_len + 8; + + let data_offset = pos; + files.insert( + PathBuf::from(&name), + FileMeta { + offset: data_offset, + length: data_len as usize, + }, + ); + + // Skip past file data — do NOT read it. 
+ pos += data_len; + } + + Ok(Self { + files: Arc::new(files), + reader, + atomic_data: Arc::new(Mutex::new(HashMap::new())), + }) + } +} + +/// Bridge sync Tantivy `FileHandle::read_bytes` to async `FileRead::read`. +/// +/// Uses `block_in_place` on multi-threaded tokio runtimes (no thread spawn +/// overhead). Falls back to a scoped thread for current-thread runtimes. +fn block_on_read(reader: &Arc<dyn FileRead>, range: Range<u64>) -> io::Result<Bytes> { + let handle = tokio::runtime::Handle::current(); + let do_read = || { + handle + .block_on(reader.read(range.clone())) + .map_err(|e| io::Error::other(e.to_string())) + }; + + match handle.runtime_flavor() { + tokio::runtime::RuntimeFlavor::MultiThread => tokio::task::block_in_place(do_read), + _ => { + // Current-thread runtime: block_in_place is not available, + // fall back to a scoped thread. + let reader = Arc::clone(reader); + std::thread::scope(|s| { + s.spawn(move || { + handle + .block_on(reader.read(range)) + .map_err(|e| io::Error::other(e.to_string())) + }) + .join() + .map_err(|_| io::Error::other("reader thread panicked"))? + }) + } + } +} + +impl fmt::Debug for ArchiveDirectory { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ArchiveDirectory") + .field("files", &self.files.keys().collect::<Vec<_>>()) + .finish() + } +} + +/// A `FileHandle` for a single file within the archive.
+#[derive(Clone)] +struct ArchiveFileHandle { + reader: Arc<dyn FileRead>, + file_offset: u64, + file_length: usize, +} + +impl fmt::Debug for ArchiveFileHandle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ArchiveFileHandle") + .field("offset", &self.file_offset) + .field("length", &self.file_length) + .finish() + } +} + +impl HasLen for ArchiveFileHandle { + fn len(&self) -> usize { + self.file_length + } +} + +impl FileHandle for ArchiveFileHandle { + fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> { + if range.end > self.file_length { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "Read range {:?} exceeds file length {}", + range, self.file_length + ), + )); + } + + let abs_start = self.file_offset + range.start as u64; + let abs_end = self.file_offset + range.end as u64; + let data = block_on_read(&self.reader, abs_start..abs_end)?; + Ok(OwnedBytes::new(data.to_vec())) + } +} + +impl Directory for ArchiveDirectory { + fn get_file_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>, OpenReadError> { + let meta = self + .files + .get(path) + .ok_or_else(|| OpenReadError::FileDoesNotExist(path.to_path_buf()))?; + + Ok(Arc::new(ArchiveFileHandle { + reader: self.reader.clone(), + file_offset: meta.offset, + file_length: meta.length, + })) + } + + fn exists(&self, path: &Path) -> Result<bool, OpenReadError> { + Ok(self.files.contains_key(path) || self.atomic_data.lock().unwrap().contains_key(path)) + } + + fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> { + if let Some(data) = self.atomic_data.lock().unwrap().get(path) { + return Ok(data.clone()); + } + let meta = self + .files + .get(path) + .ok_or_else(|| OpenReadError::FileDoesNotExist(path.to_path_buf()))?; + + let data = block_on_read(&self.reader, meta.offset..meta.offset + meta.length as u64) + .map_err(|e| OpenReadError::wrap_io_error(e, path.to_path_buf()))?; + Ok(data.to_vec()) + } + + fn atomic_write(&self, path: &Path, data: &[u8]) -> io::Result<()> { + self.atomic_data + 
 .lock() + .unwrap() + .insert(path.to_path_buf(), data.to_vec()); + Ok(()) + } + + fn delete(&self, _path: &Path) -> Result<(), DeleteError> { + Ok(()) + } + + fn open_write(&self, _path: &Path) -> Result<WritePtr, OpenWriteError> { + Ok(io::BufWriter::new(Box::new( + VecTerminatingWrite(Vec::new()), + ))) + } + + fn sync_directory(&self) -> io::Result<()> { + Ok(()) + } + + fn acquire_lock(&self, _lock: &Lock) -> Result<DirectoryLock, LockError> { + Ok(DirectoryLock::from(Box::new(()))) + } + + fn watch(&self, _watch_callback: WatchCallback) -> tantivy::Result<WatchHandle> { + Ok(WatchHandle::empty()) + } +} + +/// Dummy writer for lock file support. +struct VecTerminatingWrite(Vec<u8>); + +impl io::Write for VecTerminatingWrite { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.0.extend_from_slice(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl TerminatingWrite for VecTerminatingWrite { + fn terminate_ref(&mut self, _token: AntiCallToken) -> io::Result<()> { + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::io::FileIOBuilder; + use crate::tantivy::writer::TantivyFullTextWriter; + + async fn make_test_dir() -> ArchiveDirectory { + let file_io = FileIOBuilder::new("memory").build().unwrap(); + + let mut writer = TantivyFullTextWriter::new().unwrap(); + writer.add_document(0, Some("hello")).unwrap(); + writer.add_document(1, Some("world")).unwrap(); + let output = file_io.new_output("/test_archive.bin").unwrap(); + writer.finish(&output).await.unwrap(); + + let input = output.to_input_file(); + let metadata = input.metadata().await.unwrap(); + let reader = input.reader().await.unwrap(); + ArchiveDirectory::from_reader(reader, metadata.size) + .await + .unwrap() + } + + #[tokio::test] + async fn test_parse_archive() { + let dir = make_test_dir().await; + // Tantivy index files should be present.
+ assert!(!dir.files.is_empty()); + } + + #[tokio::test] + async fn test_read_file_from_archive() { + let dir = make_test_dir().await; + // Read any file from the archive and verify it's non-empty. + let first_path = dir.files.keys().next().unwrap().clone(); + let handle = dir.get_file_handle(&first_path).unwrap(); + assert!(handle.len() > 0); + let data = handle.read_bytes(0..handle.len()).unwrap(); + assert_eq!(data.len(), handle.len()); + } + + #[tokio::test] + async fn test_atomic_read_write() { + let dir = make_test_dir().await; + + // atomic_write + atomic_read + dir.atomic_write(Path::new("meta.json"), b"{}").unwrap(); + let data = dir.atomic_read(Path::new("meta.json")).unwrap(); + assert_eq!(&data, b"{}"); + } + + #[tokio::test] + async fn test_file_not_found() { + let dir = make_test_dir().await; + assert!(dir.get_file_handle(Path::new("missing.txt")).is_err()); + } +} diff --git a/crates/paimon/src/tantivy/full_text_search.rs b/crates/paimon/src/tantivy/full_text_search.rs new file mode 100644 index 0000000..77bcd03 --- /dev/null +++ b/crates/paimon/src/tantivy/full_text_search.rs @@ -0,0 +1,176 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Full-text search types for global index. +//! +//! 
Reference: [org.apache.paimon.predicate.FullTextSearch](https://github.com/apache/paimon/blob/master/paimon-common/src/main/java/org/apache/paimon/predicate/FullTextSearch.java) + +/// Full-text search predicate. +/// +/// Reference: `org.apache.paimon.predicate.FullTextSearch` +#[derive(Debug, Clone)] +pub struct FullTextSearch { + pub query_text: String, + pub field_name: String, + pub limit: usize, +} + +impl FullTextSearch { + pub fn new(query_text: String, limit: usize, field_name: String) -> crate::Result<Self> { + if query_text.is_empty() { + return Err(crate::Error::ConfigInvalid { + message: "Query text cannot be empty".to_string(), + }); + } + if limit == 0 { + return Err(crate::Error::ConfigInvalid { + message: "Limit must be positive".to_string(), + }); + } + if field_name.is_empty() { + return Err(crate::Error::ConfigInvalid { + message: "Field name cannot be empty".to_string(), + }); + } + Ok(Self { + query_text, + field_name, + limit, + }) + } +} + +/// Search result containing parallel arrays of row IDs and scores. +#[derive(Debug, Clone)] +pub struct SearchResult { + pub row_ids: Vec<u64>, + pub scores: Vec<f32>, +} + +impl SearchResult { + pub fn new(row_ids: Vec<u64>, scores: Vec<f32>) -> Self { + assert_eq!(row_ids.len(), scores.len()); + Self { row_ids, scores } + } + + pub fn empty() -> Self { + Self { + row_ids: Vec::new(), + scores: Vec::new(), + } + } + + pub fn len(&self) -> usize { + self.row_ids.len() + } + + pub fn is_empty(&self) -> bool { + self.row_ids.is_empty() + } + + /// Apply an offset to all row IDs. + pub fn offset(&self, offset: i64) -> Self { + if offset == 0 { + return self.clone(); + } + let row_ids = self + .row_ids + .iter() + .map(|&id| { + if offset >= 0 { + id.saturating_add(offset as u64) + } else { + id.saturating_sub(offset.unsigned_abs()) + } + }) + .collect(); + Self { + row_ids, + scores: self.scores.clone(), + } + } + + /// Merge two search results.
+ pub fn or(&self, other: &SearchResult) -> Self { + let mut row_ids = self.row_ids.clone(); + let mut scores = self.scores.clone(); + row_ids.extend_from_slice(&other.row_ids); + scores.extend_from_slice(&other.scores); + Self { row_ids, scores } + } + + /// Return top-k results by score (descending). + pub fn top_k(&self, k: usize) -> Self { + if self.row_ids.len() <= k { + return self.clone(); + } + let mut indices: Vec<usize> = (0..self.row_ids.len()).collect(); + indices.sort_by(|&a, &b| { + self.scores[b] + .partial_cmp(&self.scores[a]) + .unwrap_or(std::cmp::Ordering::Equal) + }); + indices.truncate(k); + let row_ids = indices.iter().map(|&i| self.row_ids[i]).collect(); + let scores = indices.iter().map(|&i| self.scores[i]).collect(); + Self { row_ids, scores } + } + + /// Convert to sorted, merged row ranges. + pub fn to_row_ranges(&self) -> Vec<crate::table::RowRange> { + if self.row_ids.is_empty() { + return Vec::new(); + } + let mut sorted: Vec<u64> = self.row_ids.clone(); + sorted.sort_unstable(); + sorted.dedup(); + let mut ranges = Vec::new(); + let mut start = sorted[0] as i64; + let mut end = start; + for &id in &sorted[1..]
{ + let id = id as i64; + if id == end + 1 { + end = id; + } else { + ranges.push(crate::table::RowRange::new(start, end)); + start = id; + end = id; + } + } + ranges.push(crate::table::RowRange::new(start, end)); + ranges + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_full_text_search_new() { + let fts = FullTextSearch::new("hello".into(), 10, "text".into()).unwrap(); + assert_eq!(fts.query_text, "hello"); + assert_eq!(fts.limit, 10); + assert_eq!(fts.field_name, "text"); + } + + #[test] + fn test_full_text_search_empty_query() { + let result = FullTextSearch::new("".into(), 10, "text".into()); + assert!(result.is_err()); + } +} diff --git a/crates/paimon/src/tantivy/mod.rs b/crates/paimon/src/tantivy/mod.rs new file mode 100644 index 0000000..0922937 --- /dev/null +++ b/crates/paimon/src/tantivy/mod.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tantivy-based full-text search for global index. 
+ +pub(crate) mod directory; +pub mod full_text_search; +pub mod reader; +pub mod writer; diff --git a/crates/paimon/src/tantivy/reader.rs b/crates/paimon/src/tantivy/reader.rs new file mode 100644 index 0000000..1748850 --- /dev/null +++ b/crates/paimon/src/tantivy/reader.rs @@ -0,0 +1,198 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tantivy full-text index reader. +//! +//! Reads a Tantivy index from an archive (packed by `TantivyFullTextWriter`) +//! and performs full-text search queries. +//! +//! Reference: `org.apache.paimon.tantivy.index.TantivyFullTextGlobalIndexReader` + +use crate::io::{FileRead, InputFile}; +use crate::tantivy::directory::ArchiveDirectory; +use crate::tantivy::full_text_search::SearchResult; +use tantivy::collector::TopDocs; +use tantivy::query::QueryParser; +use tantivy::{Index, IndexReader, ReloadPolicy}; + +/// Reader for a Tantivy full-text index stored in archive format. +pub struct TantivyFullTextReader { + reader: IndexReader, + index: Index, +} + +impl TantivyFullTextReader { + /// Open a reader from an `InputFile` (on-demand reading, no full load). 
+ pub async fn from_input_file(input: &InputFile) -> crate::Result<Self> { + let metadata = input.metadata().await?; + let reader = input.reader().await?; + Self::from_reader(reader, metadata.size).await + } + + /// Open a reader from an async `FileRead` and file size. + pub async fn from_reader(reader: impl FileRead, file_size: u64) -> crate::Result<Self> { + let directory = ArchiveDirectory::from_reader(reader, file_size) + .await + .map_err(|e| crate::Error::UnexpectedError { + message: format!("Failed to parse Tantivy archive: {}", e), + source: None, + })?; + + let index = Index::open(directory).map_err(|e| crate::Error::UnexpectedError { + message: format!("Failed to open Tantivy index from archive: {}", e), + source: None, + })?; + + let reader = index + .reader_builder() + .reload_policy(ReloadPolicy::Manual) + .try_into() + .map_err(|e| crate::Error::UnexpectedError { + message: format!("Failed to create Tantivy reader: {}", e), + source: None, + })?; + + Ok(Self { reader, index }) + } + + /// Search the index and return top-N results ranked by score.
+ pub fn search(&self, query_text: &str, limit: usize) -> crate::Result<SearchResult> { + let schema = self.index.schema(); + let text_field = schema + .get_field("text") + .map_err(|_| crate::Error::UnexpectedError { + message: "Tantivy schema missing 'text' field".to_string(), + source: None, + })?; + + let searcher = self.reader.searcher(); + let query_parser = QueryParser::for_index(&self.index, vec![text_field]); + let query = + query_parser + .parse_query(query_text) + .map_err(|e| crate::Error::UnexpectedError { + message: format!("Failed to parse query '{}': {}", query_text, e), + source: None, + })?; + + let top_docs = searcher + .search(&query, &TopDocs::with_limit(limit)) + .map_err(|e| crate::Error::UnexpectedError { + message: format!("Tantivy search failed: {}", e), + source: None, + })?; + + let mut row_ids = Vec::with_capacity(top_docs.len()); + let mut scores = Vec::with_capacity(top_docs.len()); + + for (score, doc_address) in &top_docs { + let segment_reader = searcher.segment_reader(doc_address.segment_ord); + let fast_fields = segment_reader.fast_fields().u64("row_id").map_err(|e| { + crate::Error::UnexpectedError { + message: format!("Failed to get row_id fast field: {}", e), + source: None, + } + })?; + let row_id = fast_fields.first(doc_address.doc_id).ok_or_else(|| { + crate::Error::UnexpectedError { + message: format!( + "Missing row_id for doc_id {} in segment {}", + doc_address.doc_id, doc_address.segment_ord + ), + source: None, + } + })?; + row_ids.push(row_id); + scores.push(*score); + } + + Ok(SearchResult::new(row_ids, scores)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::io::FileIOBuilder; + use crate::tantivy::writer::TantivyFullTextWriter; + + async fn create_test_reader() -> TantivyFullTextReader { + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let mut writer = TantivyFullTextWriter::new().unwrap(); + writer + .add_document(0, Some("the quick brown fox jumps over the lazy dog")) + .unwrap(); + writer + 
.add_document(1, Some("rust programming language")) + .unwrap(); + writer + .add_document(2, Some("apache paimon data lake")) + .unwrap(); + writer + .add_document(3, Some("full text search with tantivy")) + .unwrap(); + writer + .add_document(4, Some("the fox is quick and brown")) + .unwrap(); + let output = file_io.new_output("/test_reader_index.archive").unwrap(); + writer.finish(&output).await.unwrap(); + TantivyFullTextReader::from_input_file(&output.to_input_file()) + .await + .unwrap() + } + + #[tokio::test] + async fn test_search_basic() { + let reader = create_test_reader().await; + let result = reader.search("fox", 10).unwrap(); + assert_eq!(result.len(), 2); + assert!(result.row_ids.contains(&0)); + assert!(result.row_ids.contains(&4)); + } + + #[tokio::test] + async fn test_search_limit() { + let reader = create_test_reader().await; + let result = reader.search("fox", 1).unwrap(); + assert_eq!(result.len(), 1); + } + + #[tokio::test] + async fn test_search_no_match() { + let reader = create_test_reader().await; + let result = reader.search("nonexistent", 10).unwrap(); + assert!(result.is_empty()); + } + + #[tokio::test] + async fn test_search_scored() { + let reader = create_test_reader().await; + let result = reader.search("tantivy", 10).unwrap(); + assert_eq!(result.len(), 1); + assert!(result.row_ids.contains(&3)); + assert!(result.scores[0] > 0.0); + } + + #[tokio::test] + async fn test_search_with_offset() { + let reader = create_test_reader().await; + let result = reader.search("fox", 10).unwrap(); + let offset_result = result.offset(1000); + assert!(offset_result.row_ids.contains(&1000)); + assert!(offset_result.row_ids.contains(&1004)); + } +} diff --git a/crates/paimon/src/tantivy/writer.rs b/crates/paimon/src/tantivy/writer.rs new file mode 100644 index 0000000..a12ed30 --- /dev/null +++ b/crates/paimon/src/tantivy/writer.rs @@ -0,0 +1,242 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license 
agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tantivy full-text index writer. +//! +//! Writes documents (rowId + text) to a local Tantivy index, then packs the +//! index directory into a single archive file (Big-Endian, Java-compatible) +//! and writes it to an `OutputFile`. +//! +//! Reference: `org.apache.paimon.tantivy.index.TantivyFullTextGlobalIndexWriter` + +use bytes::Bytes; +use std::io::Read; + +use crate::io::OutputFile; +use tantivy::schema::{Field, NumericOptions, Schema, TEXT}; +use tantivy::{Index, IndexWriter, TantivyDocument}; + +/// Builds the fixed schema: `row_id` (u64 fast+stored+indexed) + `text` (full-text). +fn build_schema() -> (Schema, Field, Field) { + let mut builder = Schema::builder(); + let row_id_field = builder.add_u64_field( + "row_id", + NumericOptions::default() + .set_stored() + .set_indexed() + .set_fast(), + ); + let text_field = builder.add_text_field("text", TEXT); + (builder.build(), row_id_field, text_field) +} + +/// Writer for creating a Tantivy full-text index and packing it into an archive. +pub struct TantivyFullTextWriter { + writer: IndexWriter, + row_id_field: Field, + text_field: Field, + temp_dir: tempfile::TempDir, + row_count: u64, +} + +impl TantivyFullTextWriter { + /// Create a new writer. The index is built in a temporary directory. 
+ pub fn new() -> crate::Result<Self> { + let temp_dir = tempfile::tempdir().map_err(|e| crate::Error::UnexpectedError { + message: format!("Failed to create temp directory for Tantivy index: {}", e), + source: None, + })?; + + let (schema, row_id_field, text_field) = build_schema(); + let index = Index::create_in_dir(temp_dir.path(), schema).map_err(|e| { + crate::Error::UnexpectedError { + message: format!("Failed to create Tantivy index: {}", e), + source: None, + } + })?; + let writer = index + .writer(50_000_000) + .map_err(|e| crate::Error::UnexpectedError { + message: format!("Failed to create Tantivy writer: {}", e), + source: None, + })?; + + Ok(Self { + writer, + row_id_field, + text_field, + temp_dir, + row_count: 0, + }) + } + + /// Add a document with the given row ID and text content. + /// If text is None, the row ID is still incremented (null value). + pub fn add_document(&mut self, row_id: u64, text: Option<&str>) -> crate::Result<()> { + if let Some(text) = text { + let mut doc = TantivyDocument::new(); + doc.add_u64(self.row_id_field, row_id); + doc.add_text(self.text_field, text); + self.writer + .add_document(doc) + .map_err(|e| crate::Error::UnexpectedError { + message: format!("Failed to add document: {}", e), + source: None, + })?; + } + self.row_count += 1; + Ok(()) + } + + /// Commit, pack the index into an archive, and write it to the given `OutputFile`. + /// + /// Returns `false` if no documents were written (nothing is written to the file). + /// + /// Reference: `TantivyFullTextGlobalIndexWriter.packIndex()` + pub async fn finish(mut self, output: &OutputFile) -> crate::Result<bool> { + if self.row_count == 0 { + return Ok(false); + } + + self.writer + .commit() + .map_err(|e| crate::Error::UnexpectedError { + message: format!("Failed to commit Tantivy index: {}", e), + source: None, + })?; + + // Drop the writer to release file locks before packing. + drop(self.writer); + + // Stream the archive directly to the OutputFile.
+ let mut file_writer = output.writer().await?; + + // Collect file entries from the temp directory. + let mut entries: Vec<(String, std::path::PathBuf)> = Vec::new(); + for entry in + std::fs::read_dir(self.temp_dir.path()).map_err(|e| crate::Error::UnexpectedError { + message: format!("Failed to read Tantivy index directory: {}", e), + source: None, + })? + { + let entry = entry.map_err(|e| crate::Error::UnexpectedError { + message: format!("Failed to read directory entry: {}", e), + source: None, + })?; + if entry.file_type().map(|t| t.is_file()).unwrap_or(false) { + let name = entry.file_name().to_string_lossy().to_string(); + entries.push((name, entry.path())); + } + } + + // Write file count (4 bytes BE). + file_writer + .write(Bytes::from((entries.len() as i32).to_be_bytes().to_vec())) + .await?; + + // Write each file: name_len + name + data_len + data (chunked). + const CHUNK_SIZE: usize = 4 * 1024 * 1024; // 4 MiB + for (name, path) in &entries { + let name_bytes = name.as_bytes(); + file_writer + .write(Bytes::from( + (name_bytes.len() as i32).to_be_bytes().to_vec(), + )) + .await?; + file_writer.write(Bytes::from(name_bytes.to_vec())).await?; + + let file_len = std::fs::metadata(path) + .map_err(|e| crate::Error::UnexpectedError { + message: format!("Failed to stat index file '{}': {}", name, e), + source: None, + })? 
+ .len(); + file_writer + .write(Bytes::from((file_len as i64).to_be_bytes().to_vec())) + .await?; + + let mut file = + std::fs::File::open(path).map_err(|e| crate::Error::UnexpectedError { + message: format!("Failed to open index file '{}': {}", name, e), + source: None, + })?; + let mut buf = vec![0u8; CHUNK_SIZE]; + loop { + let n = file + .read(&mut buf) + .map_err(|e| crate::Error::UnexpectedError { + message: format!("Failed to read index file '{}': {}", name, e), + source: None, + })?; + if n == 0 { + break; + } + file_writer.write(Bytes::copy_from_slice(&buf[..n])).await?; + } + } + + file_writer.close().await?; + Ok(true) + } + + /// Number of rows processed (including nulls). + pub fn row_count(&self) -> u64 { + self.row_count + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::io::FileIOBuilder; + use crate::tantivy::reader::TantivyFullTextReader; + + #[tokio::test] + async fn test_write_and_read_roundtrip() { + let file_io = FileIOBuilder::new("memory").build().unwrap(); + + let mut writer = TantivyFullTextWriter::new().unwrap(); + writer.add_document(0, Some("hello world")).unwrap(); + writer.add_document(1, Some("foo bar baz")).unwrap(); + writer.add_document(2, None).unwrap(); + writer.add_document(3, Some("hello again")).unwrap(); + + let output = file_io.new_output("/test_index.archive").unwrap(); + let written = writer.finish(&output).await.unwrap(); + assert!(written); + + let input = output.to_input_file(); + let reader = TantivyFullTextReader::from_input_file(&input) + .await + .unwrap(); + let result = reader.search("hello", 10).unwrap(); + assert_eq!(result.len(), 2); + assert!(result.row_ids.contains(&0)); + assert!(result.row_ids.contains(&3)); + } + + #[tokio::test] + async fn test_empty_writer() { + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let output = file_io.new_output("/empty_index.archive").unwrap(); + + let writer = TantivyFullTextWriter::new().unwrap(); + let written = 
writer.finish(&output).await.unwrap(); + assert!(!written); + assert!(!output.exists().await.unwrap()); + } +}