Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 276 additions & 1 deletion crates/paimon/src/arrow/format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use arrow_schema::ArrowError;
use async_trait::async_trait;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{StreamExt, TryFutureExt};
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use parquet::arrow::arrow_reader::{
ArrowPredicate, ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
};
Expand Down Expand Up @@ -785,6 +785,17 @@ struct ArrowFileReader {
r: Box<dyn FileRead>,
}

/// Maximum gap (1 MiB) between two requested byte ranges for them to be
/// coalesced into a single read request.
const RANGE_COALESCE_BYTES: u64 = 1024 * 1024;
/// Maximum number of range fetches issued concurrently.
const RANGE_FETCH_CONCURRENCY: usize = 10;
/// Prefetch hint (512 KiB) handed to the Parquet metadata reader so the
/// footer can usually be loaded with a single read.
const METADATA_SIZE_HINT: usize = 512 * 1024;
/// Minimum range size for splitting: 4 MiB.
/// Ranges smaller than this will not be split further to avoid
/// excessive small IO requests whose per-request overhead dominates.
const MIN_SPLIT_SIZE: u64 = 4 * 1024 * 1024;

impl ArrowFileReader {
fn new(file_size: u64, r: Box<dyn FileRead>) -> Self {
Self { file_size, r }
Expand All @@ -809,14 +820,128 @@ impl AsyncFileReader for ArrowFileReader {
self.read_bytes(range)
}

// Fetch multiple byte ranges in one call. IO is optimized in two phases
// (coalesce nearby ranges, then split large merged ranges), the resulting
// reads are issued concurrently, and the fetched chunks are finally
// sliced back into the originally requested ranges.
fn get_byte_ranges(
    &mut self,
    ranges: Vec<Range<u64>>,
) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
    // Copy tuning constants so the async block owns everything it captures.
    let coalesce_bytes = RANGE_COALESCE_BYTES;
    let concurrency = RANGE_FETCH_CONCURRENCY;

    async move {
        if ranges.is_empty() {
            return Ok(vec![]);
        }

        // Two-phase range optimization:
        // Phase 1: Merge nearby ranges based on coalesce threshold.
        let coalesced = merge_byte_ranges(&ranges, coalesce_bytes);
        // Phase 2: Split large merged ranges to utilize concurrency,
        // but only at original range boundaries.
        let fetch_ranges = split_ranges_for_concurrency(coalesced, concurrency);

        // Fetch merged ranges concurrently.
        let r = &self.r;
        let fetched: Vec<Bytes> = if fetch_ranges.len() <= concurrency {
            // All ranges fit within the concurrency limit — fire them all at once.
            futures::future::try_join_all(fetch_ranges.iter().map(|range| {
                r.read(range.clone())
                    .map_err(|e| parquet::errors::ParquetError::External(format!("{e}").into()))
            }))
            .await?
        } else {
            // More ranges than concurrency slots — use buffered stream.
            // `buffered(concurrency)` keeps at most `concurrency` reads
            // in flight while preserving result order.
            futures::stream::iter(fetch_ranges.iter().cloned())
                .map(|range| async move {
                    r.read(range).await.map_err(|e| {
                        parquet::errors::ParquetError::External(format!("{e}").into())
                    })
                })
                .buffered(concurrency)
                .try_collect()
                .await?
        };

        // Slice the fetched data back into the originally requested
        // ranges. A single original range may span multiple fetch
        // chunks, so we copy from as many chunks as needed.
        let result: parquet::errors::Result<Vec<Bytes>> = ranges
            .iter()
            .map(|range| {
                // Find the first fetch chunk whose end is past range.start.
                // `fetch_ranges` is sorted by construction (merge sorts,
                // split preserves order), so `partition_point` is a
                // valid binary search here.
                let first = fetch_ranges.partition_point(|v| v.end <= range.start);
                if first >= fetch_ranges.len() {
                    return Err(parquet::errors::ParquetError::General(format!(
                        "No fetch range covers requested range {}..{}",
                        range.start, range.end
                    )));
                }

                // Total number of bytes the caller asked for.
                let need = (range.end - range.start) as usize;

                // Fast path: the original range fits entirely within one
                // fetch chunk — zero-copy slice.
                let fr = &fetch_ranges[first];
                if range.end <= fr.end {
                    let start = (range.start - fr.start) as usize;
                    let end = (range.end - fr.start) as usize;
                    return Ok(fetched[first].slice(start..end));
                }

                // Slow path: the original range spans multiple fetch
                // chunks — copy pieces into a new buffer (mirrors Java's
                // copyMultiBytesToBytes). Chunks split from one merged
                // range are contiguous, so a forward walk suffices.
                let mut buf = Vec::with_capacity(need);
                let mut pos = range.start;
                for i in first..fetch_ranges.len() {
                    if pos >= range.end {
                        break;
                    }
                    let fr = &fetch_ranges[i];
                    let chunk = &fetched[i];
                    let src_start = (pos - fr.start) as usize;
                    let src_end = ((range.end.min(fr.end)) - fr.start) as usize;
                    // Guard against a short read from the underlying file
                    // before slicing, which would otherwise panic.
                    if src_end > chunk.len() {
                        return Err(parquet::errors::ParquetError::General(format!(
                            "Fetched data too short for range {}..{}: \
                             chunk {}..{} has {} bytes, need up to offset {}",
                            range.start,
                            range.end,
                            fr.start,
                            fr.end,
                            chunk.len(),
                            src_end,
                        )));
                    }
                    buf.extend_from_slice(&chunk[src_start..src_end]);
                    pos = fr.end;
                }
                // Sanity check: every requested byte must have been
                // assembled, otherwise surface an error rather than
                // return a truncated buffer.
                if buf.len() != need {
                    return Err(parquet::errors::ParquetError::General(format!(
                        "Assembled {} bytes for range {}..{}, expected {}",
                        buf.len(),
                        range.start,
                        range.end,
                        need,
                    )));
                }
                Ok(Bytes::from(buf))
            })
            .collect();
        result
    }
    .boxed()
}

fn get_metadata(
&mut self,
options: Option<&ArrowReaderOptions>,
) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
let metadata_opts = options.map(|o| o.metadata_options().clone());
let prefetch_hint = Some(METADATA_SIZE_HINT);
Box::pin(async move {
let file_size = self.file_size;
let metadata = ParquetMetaDataReader::new()
.with_prefetch_hint(prefetch_hint)
.with_metadata_options(metadata_opts)
.load_and_finish(self, file_size)
.await?;
Expand All @@ -825,6 +950,84 @@ impl AsyncFileReader for ArrowFileReader {
}
}

// ---------------------------------------------------------------------------
// Range coalescing
// ---------------------------------------------------------------------------

/// Coalesce byte ranges whose gap is at most `coalesce` bytes into a
/// single range, reducing the number of read requests.
///
/// The input may be unsorted and may contain overlapping ranges; the
/// result is sorted by start offset.
fn merge_byte_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
    if ranges.is_empty() {
        return vec![];
    }

    // Order a copy by start offset so one forward pass can decide, for
    // each range, whether it extends the most recent output range.
    let mut ordered = ranges.to_vec();
    ordered.sort_unstable_by_key(|r| r.start);

    let mut out: Vec<Range<u64>> = Vec::with_capacity(ordered.len());
    for r in ordered {
        match out.last_mut() {
            // Overlapping or close enough: grow the previous range.
            // `saturating_add` keeps the comparison sound near u64::MAX,
            // matching the original `checked_sub` overflow handling.
            Some(prev) if r.start <= prev.end.saturating_add(coalesce) => {
                prev.end = prev.end.max(r.end);
            }
            // First range, or too far from the previous one.
            _ => out.push(r),
        }
    }
    out
}

/// Split each merged range into roughly `target_count` equally sized
/// chunks so concurrent fetches can be issued against it.
///
/// Chunks are never smaller than [`MIN_SPLIT_SIZE`], and a range shorter
/// than `2 * MIN_SPLIT_SIZE` is emitted unchanged, so tiny reads never
/// pay per-request overhead.
fn split_ranges_for_concurrency(merged: Vec<Range<u64>>, target_count: usize) -> Vec<Range<u64>> {
    // Nothing to split, or splitting is effectively disabled.
    if merged.is_empty() || target_count <= 1 {
        return merged;
    }

    let mut out = Vec::with_capacity(merged.len());
    for range in merged {
        let length = range.end - range.start;
        // Aim for `target_count` chunks, but never below the minimum
        // chunk size; the `+ 1` biases the division upward.
        let chunk = MIN_SPLIT_SIZE.max(length / target_count as u64 + 1);
        // If fewer than this many bytes remain, fold them into a single
        // final chunk instead of producing a tiny trailing request.
        let min_tail = chunk.max(MIN_SPLIT_SIZE * 2);

        let mut cursor = range.start;
        // Emit full-size chunks while enough bytes remain to justify
        // another split; whatever is left becomes the last chunk.
        while cursor + min_tail <= range.end {
            out.push(cursor..cursor + chunk);
            cursor += chunk;
        }
        if cursor < range.end {
            out.push(cursor..range.end);
        }
    }
    out
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -878,4 +1081,76 @@ mod tests {

assert!(row_filter.is_some());
}

// -----------------------------------------------------------------------
// merge_byte_ranges tests
// -----------------------------------------------------------------------

#[test]
fn test_merge_byte_ranges_empty() {
    // Merging an empty input yields an empty result.
    let merged = super::merge_byte_ranges(&[], 1024);
    assert!(merged.is_empty());
}

#[test]
fn test_merge_byte_ranges_no_coalesce() {
    // A gap far larger than the threshold keeps the ranges separate.
    let input = [0..100, 1_000_000..1_000_100];
    let merged = super::merge_byte_ranges(&input, 1024);
    assert_eq!(merged, input.to_vec());
}

#[test]
fn test_merge_byte_ranges_coalesce() {
    // Gaps of 100 and 200 bytes both sit inside the 1 KiB threshold,
    // so all three ranges collapse into one.
    let merged = super::merge_byte_ranges(&[0..100, 200..300, 500..600], 1024);
    assert_eq!(merged, vec![0..600]);
}

#[test]
fn test_merge_byte_ranges_zero_coalesce_gap() {
    // With a zero threshold even a one-byte gap keeps ranges apart.
    let input = vec![0..100, 101..200];
    let merged = super::merge_byte_ranges(&input, 0);
    assert_eq!(merged, input);
}

// -----------------------------------------------------------------------
// split_ranges_for_concurrency tests
// -----------------------------------------------------------------------

#[test]
fn test_split_single_small_range() {
    // A range below the 2 * MIN_SPLIT_SIZE threshold passes through intact.
    #[allow(clippy::single_range_in_vec_init)]
    let merged = vec![0..1000];
    let result = super::split_ranges_for_concurrency(merged, 4);
    assert_eq!(result, vec![0..1000]);
}

#[test]
fn test_split_large_range_into_batches() {
    // A 40 MiB range split over 4 slots must yield several chunks that
    // tile the original range contiguously, start to end.
    let size = 40 * 1024 * 1024u64;
    #[allow(clippy::single_range_in_vec_init)]
    let merged = vec![0..size];
    let result = super::split_ranges_for_concurrency(merged, 4);
    assert!(result.len() > 1);
    assert_eq!(result.first().unwrap().start, 0);
    assert_eq!(result.last().unwrap().end, size);
    for pair in result.windows(2) {
        assert_eq!(pair[1].start, pair[0].end);
    }
}

#[test]
fn test_split_empty() {
    // No input ranges means no output ranges.
    let result = super::split_ranges_for_concurrency(Vec::new(), 4);
    assert!(result.is_empty());
}
}
2 changes: 1 addition & 1 deletion crates/paimon/src/io/file_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ impl FileIOBuilder {
}

#[async_trait::async_trait]
pub trait FileRead: Send + Unpin + 'static {
pub trait FileRead: Send + Sync + Unpin + 'static {
async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
}

Expand Down
Loading