diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs
index 915bd56..cf470b5 100644
--- a/crates/paimon/src/arrow/format/parquet.rs
+++ b/crates/paimon/src/arrow/format/parquet.rs
@@ -33,7 +33,7 @@ use arrow_schema::ArrowError;
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::future::BoxFuture;
-use futures::{StreamExt, TryFutureExt};
+use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{
     ArrowPredicate, ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
 };
@@ -785,6 +785,17 @@ struct ArrowFileReader {
     r: Box<dyn FileRead>,
 }
 
+/// coalesce threshold: 1 MiB.
+const RANGE_COALESCE_BYTES: u64 = 1024 * 1024;
+/// concurrent range fetches.
+const RANGE_FETCH_CONCURRENCY: usize = 10;
+/// metadata prefetch hint: 512 KiB.
+const METADATA_SIZE_HINT: usize = 512 * 1024;
+/// Minimum range size for splitting: 4 MiB.
+/// Ranges smaller than this will not be split further to avoid
+/// excessive small IO requests whose per-request overhead dominates.
+const MIN_SPLIT_SIZE: u64 = 4 * 1024 * 1024;
+
 impl ArrowFileReader {
     fn new(file_size: u64, r: Box<dyn FileRead>) -> Self {
         Self { file_size, r }
@@ -809,14 +820,128 @@ impl AsyncFileReader for ArrowFileReader {
         self.read_bytes(range)
     }
 
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<u64>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
+        let coalesce_bytes = RANGE_COALESCE_BYTES;
+        let concurrency = RANGE_FETCH_CONCURRENCY;
+
+        async move {
+            if ranges.is_empty() {
+                return Ok(vec![]);
+            }
+
+            // Two-phase range optimization:
+            // Phase 1: Merge nearby ranges based on coalesce threshold.
+            let coalesced = merge_byte_ranges(&ranges, coalesce_bytes);
+            // Phase 2: Split large merged ranges into fixed-size chunks to
+            // better utilize concurrency.
+            let fetch_ranges = split_ranges_for_concurrency(coalesced, concurrency);
+
+            // Fetch merged ranges concurrently.
+            let r = &self.r;
+            let fetched: Vec<Bytes> = if fetch_ranges.len() <= concurrency {
+                // All ranges fit within the concurrency limit — fire them all at once.
+                futures::future::try_join_all(fetch_ranges.iter().map(|range| {
+                    r.read(range.clone())
+                        .map_err(|e| parquet::errors::ParquetError::External(format!("{e}").into()))
+                }))
+                .await?
+            } else {
+                // More ranges than concurrency slots — use buffered stream.
+                futures::stream::iter(fetch_ranges.iter().cloned())
+                    .map(|range| async move {
+                        r.read(range).await.map_err(|e| {
+                            parquet::errors::ParquetError::External(format!("{e}").into())
+                        })
+                    })
+                    .buffered(concurrency)
+                    .try_collect()
+                    .await?
+            };
+
+            // Slice the fetched data back into the originally requested
+            // ranges. A single original range may span multiple fetch
+            // chunks, so we copy from as many chunks as needed.
+            let result: parquet::errors::Result<Vec<Bytes>> = ranges
+                .iter()
+                .map(|range| {
+                    // Find the first fetch chunk whose end is past range.start.
+                    let first = fetch_ranges.partition_point(|v| v.end <= range.start);
+                    if first >= fetch_ranges.len() {
+                        return Err(parquet::errors::ParquetError::General(format!(
+                            "No fetch range covers requested range {}..{}",
+                            range.start, range.end
+                        )));
+                    }
+
+                    let need = (range.end - range.start) as usize;
+
+                    // Fast path: the original range fits entirely within one
+                    // fetch chunk — zero-copy slice.
+                    let fr = &fetch_ranges[first];
+                    if range.end <= fr.end {
+                        let start = (range.start - fr.start) as usize;
+                        let end = (range.end - fr.start) as usize;
+                        return Ok(fetched[first].slice(start..end));
+                    }
+
+                    // Slow path: the original range spans multiple fetch
+                    // chunks — copy pieces into a new buffer (mirrors Java's
+                    // copyMultiBytesToBytes).
+                    let mut buf = Vec::with_capacity(need);
+                    let mut pos = range.start;
+                    for i in first..fetch_ranges.len() {
+                        if pos >= range.end {
+                            break;
+                        }
+                        let fr = &fetch_ranges[i];
+                        let chunk = &fetched[i];
+                        let src_start = (pos - fr.start) as usize;
+                        let src_end = ((range.end.min(fr.end)) - fr.start) as usize;
+                        if src_end > chunk.len() {
+                            return Err(parquet::errors::ParquetError::General(format!(
+                                "Fetched data too short for range {}..{}: \
+                                 chunk {}..{} has {} bytes, need up to offset {}",
+                                range.start,
+                                range.end,
+                                fr.start,
+                                fr.end,
+                                chunk.len(),
+                                src_end,
+                            )));
+                        }
+                        buf.extend_from_slice(&chunk[src_start..src_end]);
+                        pos = fr.end;
+                    }
+                    if buf.len() != need {
+                        return Err(parquet::errors::ParquetError::General(format!(
+                            "Assembled {} bytes for range {}..{}, expected {}",
+                            buf.len(),
+                            range.start,
+                            range.end,
+                            need,
+                        )));
+                    }
+                    Ok(Bytes::from(buf))
+                })
+                .collect();
+            result
+        }
+        .boxed()
+    }
+
     fn get_metadata(
         &mut self,
         options: Option<&ArrowReaderOptions>,
     ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
         let metadata_opts = options.map(|o| o.metadata_options().clone());
+        let prefetch_hint = Some(METADATA_SIZE_HINT);
         Box::pin(async move {
             let file_size = self.file_size;
             let metadata = ParquetMetaDataReader::new()
+                .with_prefetch_hint(prefetch_hint)
                 .with_metadata_options(metadata_opts)
                 .load_and_finish(self, file_size)
                 .await?;
@@ -825,6 +950,84 @@ impl AsyncFileReader for ArrowFileReader {
     }
 }
 
+// ---------------------------------------------------------------------------
+// Range coalescing
+// ---------------------------------------------------------------------------
+
+/// Merge nearby byte ranges to reduce the number of requests.
+///
+/// Ranges whose gap is ≤ `coalesce` bytes are merged into a single range.
+/// The input does not need to be sorted.
+fn merge_byte_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
+    if ranges.is_empty() {
+        return vec![];
+    }
+
+    let mut sorted = ranges.to_vec();
+    sorted.sort_unstable_by_key(|r| r.start);
+
+    let mut merged = Vec::with_capacity(sorted.len());
+    let mut start_idx = 0;
+    let mut end_idx = 1;
+
+    while start_idx != sorted.len() {
+        let mut range_end = sorted[start_idx].end;
+
+        while end_idx != sorted.len()
+            && sorted[end_idx]
+                .start
+                .checked_sub(range_end)
+                .map(|delta| delta <= coalesce)
+                .unwrap_or(true)
+        {
+            range_end = range_end.max(sorted[end_idx].end);
+            end_idx += 1;
+        }
+
+        merged.push(sorted[start_idx].start..range_end);
+        start_idx = end_idx;
+        end_idx += 1;
+    }
+
+    merged
+}
+
+/// Split merged ranges into fixed-size batches to utilize concurrency.
+/// Each merged range is divided into chunks of `expected_size`,
+/// with the last chunk taking whatever remains.
+/// Ranges smaller than `2 * MIN_SPLIT_SIZE` are kept as-is to
+/// avoid excessive small IO requests.
+fn split_ranges_for_concurrency(merged: Vec<Range<u64>>, target_count: usize) -> Vec<Range<u64>> {
+    if merged.is_empty() || target_count <= 1 {
+        return merged;
+    }
+
+    let mut result = Vec::with_capacity(merged.len());
+
+    for range in &merged {
+        let length = range.end - range.start;
+        let expected_size = MIN_SPLIT_SIZE.max(length / target_count as u64 + 1);
+        let min_remain = expected_size.max(MIN_SPLIT_SIZE * 2);
+
+        let mut offset = range.start;
+        let end = range.end;
+
+        loop {
+            if offset + min_remain > end {
+                if offset < end {
+                    result.push(offset..end);
+                }
+                break;
+            } else {
+                result.push(offset..offset + expected_size);
+                offset += expected_size;
+            }
+        }
+    }
+
+    result
+}
+
 // ---------------------------------------------------------------------------
 // Tests
 // ---------------------------------------------------------------------------
@@ -878,4 +1081,76 @@ mod tests {
 
         assert!(row_filter.is_some());
     }
+
+    // -----------------------------------------------------------------------
+    // merge_byte_ranges tests
+    // -----------------------------------------------------------------------
+
+    #[test]
+    fn test_merge_byte_ranges_empty() {
+        assert_eq!(
+            super::merge_byte_ranges(&[], 1024),
+            Vec::<Range<u64>>::new()
+        );
+    }
+
+    #[test]
+    fn test_merge_byte_ranges_no_coalesce() {
+        // Ranges far apart should not be merged
+        let ranges = vec![0..100, 1_000_000..1_000_100];
+        let merged = super::merge_byte_ranges(&ranges, 1024);
+        assert_eq!(merged, vec![0..100, 1_000_000..1_000_100]);
+    }
+
+    #[test]
+    fn test_merge_byte_ranges_coalesce() {
+        // Ranges within the gap threshold should be merged
+        let ranges = vec![0..100, 200..300, 500..600];
+        let merged = super::merge_byte_ranges(&ranges, 1024);
+        assert_eq!(merged, vec![0..600]);
+    }
+
+    #[test]
+    fn test_merge_byte_ranges_zero_coalesce_gap() {
+        // With coalesce=0, ranges with a 1-byte gap should NOT merge
+        let ranges = vec![0..100, 101..200];
+        let merged = super::merge_byte_ranges(&ranges, 0);
+        assert_eq!(merged, vec![0..100, 101..200]);
+    }
+
+    // -----------------------------------------------------------------------
+    // split_ranges_for_concurrency tests
+    // -----------------------------------------------------------------------
+
+    #[test]
+    fn test_split_single_small_range() {
+        // A single range smaller than 2 * MIN_SPLIT_SIZE should not be split.
+        #[allow(clippy::single_range_in_vec_init)]
+        let merged = vec![0..1000];
+        let result = super::split_ranges_for_concurrency(merged, 4);
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0], 0..1000);
+    }
+
+    #[test]
+    fn test_split_large_range_into_batches() {
+        let mb = 1024 * 1024u64;
+        let size = 40 * mb;
+        #[allow(clippy::single_range_in_vec_init)]
+        let merged = vec![0..size];
+        let result = super::split_ranges_for_concurrency(merged, 4);
+        assert!(result.len() > 1);
+        assert_eq!(result.first().unwrap().start, 0);
+        assert_eq!(result.last().unwrap().end, size);
+        for i in 1..result.len() {
+            assert_eq!(result[i].start, result[i - 1].end);
+        }
+    }
+
+    #[test]
+    fn test_split_empty() {
+        let merged: Vec<Range<u64>> = vec![];
+        let result = super::split_ranges_for_concurrency(merged, 4);
+        assert!(result.is_empty());
+    }
 }
diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs
index 4b0ffab..6f41f11 100644
--- a/crates/paimon/src/io/file_io.rs
+++ b/crates/paimon/src/io/file_io.rs
@@ -279,7 +279,7 @@ impl FileIOBuilder {
 }
 
 #[async_trait::async_trait]
-pub trait FileRead: Send + Unpin + 'static {
+pub trait FileRead: Send + Sync + Unpin + 'static {
    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
 }