Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
293 changes: 291 additions & 2 deletions crates/paimon/src/arrow/format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use arrow_schema::ArrowError;
use async_trait::async_trait;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{StreamExt, TryFutureExt};
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use parquet::arrow::arrow_reader::{
ArrowPredicate, ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
};
Expand Down Expand Up @@ -783,11 +783,33 @@ fn build_row_ranges_selection(
/// Adapter that exposes a paimon [`FileRead`] to the Parquet reader as an
/// `AsyncFileReader`, adding range coalescing, concurrent range fetching,
/// and metadata prefetching on top of the raw reader.
struct ArrowFileReader {
    file_size: u64,
    r: Box<dyn FileRead>,
    /// Maximum gap (in bytes) between two ranges that will be merged into a
    /// single fetch request. Defaults to 1 MiB.
    range_coalesce_bytes: u64,
    /// Maximum number of merged ranges to fetch concurrently. Defaults to 8.
    range_fetch_concurrency: usize,
    /// Hint for the number of bytes to speculatively read from the end of the
    /// file when loading Parquet metadata. A sufficiently large hint reduces
    /// footer loading from 2 round-trips to 1. Defaults to 512 KiB.
    metadata_size_hint: Option<usize>,
}

/// Default coalesce threshold: 1 MiB.
const DEFAULT_RANGE_COALESCE_BYTES: u64 = 1024 * 1024;
/// Default concurrent range fetches.
const DEFAULT_RANGE_FETCH_CONCURRENCY: usize = 8;
/// Default metadata prefetch hint: 512 KiB.
const DEFAULT_METADATA_SIZE_HINT: usize = 512 * 1024;

impl ArrowFileReader {
fn new(file_size: u64, r: Box<dyn FileRead>) -> Self {
Self { file_size, r }
Self {
file_size,
r,
range_coalesce_bytes: DEFAULT_RANGE_COALESCE_BYTES,
range_fetch_concurrency: DEFAULT_RANGE_FETCH_CONCURRENCY,
metadata_size_hint: Some(DEFAULT_METADATA_SIZE_HINT),
}
}

fn read_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
Expand All @@ -809,14 +831,73 @@ impl AsyncFileReader for ArrowFileReader {
self.read_bytes(range)
}

fn get_byte_ranges(
&mut self,
ranges: Vec<Range<u64>>,
) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
let coalesce_bytes = self.range_coalesce_bytes;
let concurrency = self.range_fetch_concurrency.max(1);

async move {
if ranges.is_empty() {
return Ok(vec![]);
}

// Two-phase range optimization:
// Phase 1: Merge nearby ranges based on coalesce threshold.
let coalesced = merge_byte_ranges(&ranges, coalesce_bytes);
// Phase 2: Split large merged ranges to utilize concurrency,
// but only at original range boundaries.
let fetch_ranges = split_ranges_for_concurrency(coalesced, &ranges, concurrency);

// Fetch merged ranges concurrently.
let r = &self.r;
let fetched: Vec<Bytes> = if fetch_ranges.len() <= concurrency {
// All ranges fit within the concurrency limit — fire them all at once.
futures::future::try_join_all(fetch_ranges.iter().map(|range| {
r.read(range.clone())
.map_err(|e| parquet::errors::ParquetError::External(format!("{e}").into()))
}))
.await?
} else {
// More ranges than concurrency slots — use buffered stream.
futures::stream::iter(fetch_ranges.iter().cloned())
.map(|range| async move {
r.read(range).await.map_err(|e| {
parquet::errors::ParquetError::External(format!("{e}").into())
})
})
.buffered(concurrency)
.try_collect()
.await?
};

// Slice the fetched data back into the originally requested ranges.
Ok(ranges
.iter()
.map(|range| {
let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If partition_point returns 0, 0-1 overflow panic will occur here. Although logically fetch_range must cover all original ranges, this is an implicit assumption. It is recommended to add a debug_assert! Or use checked_stub+more explicit error messages

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point.

let fetch_range = &fetch_ranges[idx];
let fetch_bytes = &fetched[idx];
let start = (range.start - fetch_range.start) as usize;
let end = (range.end - fetch_range.start) as usize;
fetch_bytes.slice(start..end.min(fetch_bytes.len()))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the data returned by fetch is not long enough, truncated data will be silently returned here, and downstream may obtain incomplete column chunks, leading to parsing errors or data corruption. Suggest changing it to assert or returning an error, do not silently swallow it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I will change it.

})
.collect())
}
.boxed()
}

fn get_metadata(
&mut self,
options: Option<&ArrowReaderOptions>,
) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
let metadata_opts = options.map(|o| o.metadata_options().clone());
let prefetch_hint = self.metadata_size_hint;
Box::pin(async move {
let file_size = self.file_size;
let metadata = ParquetMetaDataReader::new()
.with_prefetch_hint(prefetch_hint)
.with_metadata_options(metadata_opts)
.load_and_finish(self, file_size)
.await?;
Expand All @@ -825,6 +906,98 @@ impl AsyncFileReader for ArrowFileReader {
}
}

// ---------------------------------------------------------------------------
// Range coalescing
// ---------------------------------------------------------------------------

/// Merge nearby byte ranges to reduce the number of requests.
///
/// Ranges whose gap is ≤ `coalesce` bytes (or which overlap) are merged into
/// a single range. The input does not need to be sorted.
fn merge_byte_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
    if ranges.is_empty() {
        return vec![];
    }

    let mut sorted = ranges.to_vec();
    sorted.sort_unstable_by_key(|r| r.start);

    let mut merged = Vec::with_capacity(sorted.len());
    let mut iter = sorted.into_iter();
    let mut current = iter.next().expect("checked non-empty above");

    for next in iter {
        // Overlapping (checked_sub -> None) or within the gap threshold.
        let mergeable = next
            .start
            .checked_sub(current.end)
            .map(|gap| gap <= coalesce)
            .unwrap_or(true);

        if mergeable {
            current.end = current.end.max(next.end);
        } else {
            merged.push(current);
            current = next;
        }
    }

    merged.push(current);
    merged
}

/// Split merged ranges to utilize concurrency by repeatedly bisecting the
/// largest range at the nearest original-range boundary. This guarantees
/// every original range stays fully inside one fetch range.
///
/// Returns the input unchanged when it is empty, when `target_count <= 1`,
/// or when it already has at least `target_count` ranges.
fn split_ranges_for_concurrency(
    merged: Vec<Range<u64>>,
    original: &[Range<u64>],
    target_count: usize,
) -> Vec<Range<u64>> {
    if merged.is_empty() || target_count <= 1 || merged.len() >= target_count {
        return merged;
    }

    // Collect all original-range start points as candidate split boundaries.
    let mut boundaries: Vec<u64> = original.iter().map(|r| r.start).collect();
    boundaries.sort_unstable();
    boundaries.dedup();

    let mut result = merged;

    while result.len() < target_count {
        // Pick the largest range; `result` is non-empty here, so a max exists.
        let (idx, _) = result
            .iter()
            .enumerate()
            .max_by_key(|(_, r)| r.end - r.start)
            .expect("result is non-empty");

        let (start, end) = (result[idx].start, result[idx].end);
        let mid = start + (end - start) / 2;

        // Find the boundary closest to the midpoint that actually splits.
        // `u64::abs_diff` is exact for the full u64 domain; the previous
        // `as i64` casts could pick a wrong boundary for offsets > i64::MAX.
        let best = boundaries
            .iter()
            .copied()
            .filter(|&b| b > start && b < end)
            .min_by_key(|&b| b.abs_diff(mid));

        let Some(split_at) = best else {
            break; // No valid split point in the largest range; stop.
        };

        result[idx] = start..split_at;
        result.insert(idx + 1, split_at..end);
    }

    result
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -878,4 +1051,120 @@ mod tests {

assert!(row_filter.is_some());
}

// -----------------------------------------------------------------------
// merge_byte_ranges tests
// -----------------------------------------------------------------------

#[test]
fn test_merge_byte_ranges_empty() {
assert_eq!(
super::merge_byte_ranges(&[], 1024),
Vec::<std::ops::Range<u64>>::new()
);
}

#[test]
fn test_merge_byte_ranges_no_coalesce() {
// Ranges far apart should not be merged
let ranges = vec![0..100, 1_000_000..1_000_100];
let merged = super::merge_byte_ranges(&ranges, 1024);
assert_eq!(merged, vec![0..100, 1_000_000..1_000_100]);
}

#[test]
fn test_merge_byte_ranges_coalesce() {
// Ranges within the gap threshold should be merged
let ranges = vec![0..100, 200..300, 500..600];
let merged = super::merge_byte_ranges(&ranges, 1024);
assert_eq!(merged, vec![0..600]);
}

#[test]
fn test_merge_byte_ranges_zero_coalesce_gap() {
// With coalesce=0, ranges with a 1-byte gap should NOT merge
let ranges = vec![0..100, 101..200];
let merged = super::merge_byte_ranges(&ranges, 0);
assert_eq!(merged, vec![0..100, 101..200]);
}

// -----------------------------------------------------------------------
// split_ranges_for_concurrency tests
// -----------------------------------------------------------------------

#[test]
fn test_split_single_range() {
// One merged range from a single original — no boundary to split at.
#[allow(clippy::single_range_in_vec_init)]
let merged = vec![0..1000];
#[allow(clippy::single_range_in_vec_init)]
let original = vec![0..1000];
let result = super::split_ranges_for_concurrency(merged, &original, 4);
assert_eq!(result.len(), 1);
assert_eq!(result[0], 0..1000);
}

#[test]
fn test_split_single_range_multiple_originals() {
// One merged range containing 4 originals — bisect at boundaries.
let original = vec![0..200, 250..500, 550..750, 800..1000];
#[allow(clippy::single_range_in_vec_init)]
let merged = vec![0..1000];
let result = super::split_ranges_for_concurrency(merged, &original, 4);
assert_eq!(result.len(), 4);
assert_eq!(result[0].start, 0);
assert_eq!(result.last().unwrap().end, 1000);
for window in result.windows(2) {
assert_eq!(window[0].end, window[1].start);
}
for orig in &original {
assert!(
result
.iter()
.any(|r| r.start <= orig.start && r.end >= orig.end),
"original {orig:?} not fully contained"
);
}
}

#[test]
fn test_split_mixed_sizes() {
let original = vec![0..300, 400..700, 800..1000, 2000..2010];
let merged = vec![0..1000, 2000..2010];
let result = super::split_ranges_for_concurrency(merged, &original, 4);
assert!(result.contains(&(2000..2010)));
for orig in &original {
assert!(
result
.iter()
.any(|r| r.start <= orig.start && r.end >= orig.end),
"original {orig:?} not fully contained"
);
}
}

#[test]
fn test_split_empty() {
let merged: Vec<std::ops::Range<u64>> = vec![];
let original: Vec<std::ops::Range<u64>> = vec![];
let result = super::split_ranges_for_concurrency(merged, &original, 4);
assert!(result.is_empty());
}

#[test]
fn test_split_clustered_and_sparse() {
let original = vec![0..100, 150..250, 300..400, 1000..1010, 2000..2010];
let merged = vec![0..400, 1000..1010, 2000..2010];
let result = super::split_ranges_for_concurrency(merged, &original, 5);
assert!(result.contains(&(1000..1010)));
assert!(result.contains(&(2000..2010)));
for orig in &original {
assert!(
result
.iter()
.any(|r| r.start <= orig.start && r.end >= orig.end),
"original {orig:?} not fully contained"
);
}
}
}
2 changes: 1 addition & 1 deletion crates/paimon/src/io/file_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ impl FileIOBuilder {
}

#[async_trait::async_trait]
pub trait FileRead: Send + Unpin + 'static {
pub trait FileRead: Send + Sync + Unpin + 'static {
async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
}

Expand Down
Loading