Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 194 additions & 10 deletions core/layers/concurrent-limit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#![cfg_attr(docsrs, feature(doc_cfg))]
#![deny(missing_docs)]

use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
Expand Down Expand Up @@ -82,6 +83,28 @@ impl ConcurrentLimitSemaphore for Arc<Semaphore> {
/// # }
/// ```
///
/// Set per-operation concurrent limits to control different operations
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code example seems duplicate as with_operation_limit, do we need both?

/// independently:
///
/// ```no_run
/// # use opendal_core::services;
/// # use opendal_core::Operator;
/// # use opendal_core::Result;
/// # use opendal_core::raw::Operation;
/// # use opendal_layer_concurrent_limit::ConcurrentLimitLayer;
/// #
/// # fn main() -> Result<()> {
/// let _ = Operator::new(services::Memory::default())?
/// .layer(
/// ConcurrentLimitLayer::new(1024)
/// .with_operation_limit(Operation::Read, 64)
/// .with_operation_limit(Operation::Write, 32),
/// )
/// .finish();
/// # Ok(())
/// # }
/// ```
///
/// Share a concurrent limit layer between the operators:
///
/// ```no_run
Expand All @@ -106,6 +129,7 @@ impl ConcurrentLimitSemaphore for Arc<Semaphore> {
pub struct ConcurrentLimitLayer<S: ConcurrentLimitSemaphore = Arc<Semaphore>> {
operation_semaphore: S,
http_semaphore: Option<S>,
operation_limits: Option<Arc<HashMap<Operation, Arc<Semaphore>>>>,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using a HashMap, we could simply use a struct to hold read_semaphore and write_semaphore.

We can implement the functionality step by step: first implement read, write, and list operations, then later add copy and rename.

Another question is: what is the relationship between operation_semaphore and read_semaphore? When read_semaphore is set, will we simply ignore operation_semaphore?

cc @dentiny and @codetyri0n

}

impl ConcurrentLimitLayer<Arc<Semaphore>> {
Expand All @@ -126,6 +150,47 @@ impl ConcurrentLimitLayer<Arc<Semaphore>> {
pub fn with_http_concurrent_limit(self, permits: usize) -> Self {
self.with_http_semaphore(Arc::new(Semaphore::new(permits)))
}

/// Set a concurrent limit for a specific operation type.
///
/// When a per-operation limit is configured, that operation will acquire
/// a permit from its dedicated semaphore instead of the global one. This
/// allows fine-grained control over concurrency for different operation
/// types.
///
/// Operations without a dedicated limit will continue to use the global
/// semaphore.
///
/// # Examples
///
/// Limit read and write concurrency while leaving metadata operations
/// unrestricted by the global limit:
///
/// ```no_run
/// # use opendal_core::services;
/// # use opendal_core::Operator;
/// # use opendal_core::Result;
/// # use opendal_core::raw::Operation;
/// # use opendal_layer_concurrent_limit::ConcurrentLimitLayer;
/// #
/// # fn main() -> Result<()> {
/// let _ = Operator::new(services::Memory::default())?
/// .layer(
/// ConcurrentLimitLayer::new(1024)
/// .with_operation_limit(Operation::Read, 64)
/// .with_operation_limit(Operation::Write, 32),
/// )
/// .finish();
/// # Ok(())
/// # }
/// ```
pub fn with_operation_limit(mut self, op: Operation, permits: usize) -> Self {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two things here:

  • I think certain operations are not supported in the concurrent limit layer, for example, copy/rename/presign, but for your implementation it will pass through if users do with_operation_limit(Operation::Copy, 1) which is a no-op, shall we check it?
  • Should we validate permits are larger than 0? otherwise the operation is permanently blocked.

let limits = self
.operation_limits
.get_or_insert_with(|| Arc::new(HashMap::new()));
Arc::make_mut(limits).insert(op, Arc::new(Semaphore::new(permits)));
self
}
}

impl<S: ConcurrentLimitSemaphore> ConcurrentLimitLayer<S> {
Expand All @@ -142,6 +207,7 @@ impl<S: ConcurrentLimitSemaphore> ConcurrentLimitLayer<S> {
Self {
operation_semaphore,
http_semaphore: None,
operation_limits: None,
}
}

Expand Down Expand Up @@ -172,10 +238,21 @@ where
ConcurrentLimitAccessor {
inner,
semaphore: self.operation_semaphore.clone(),
operation_limits: self.operation_limits.clone(),
}
}
}

/// A permit that can come from either the global semaphore (generic `S`) or
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm I'm not convinced the enum here is necessary, does it work?

type Reader = ConcurrentLimitLayerWrapper<A::Reader, S:Permit>;
async fn acquire_for(&self, op: Operation) -> ConcurrentLimitPermit<S::Permit> {
        if let Some(limits) = &self.operation_limits {
            if let Some(sem) = limits.get(&op) {
                return sem.acquire().await;
            }
        }
        self.semaphore.acquire().await
    }

/// a per-operation `Arc<Semaphore>`.
#[doc(hidden)]
pub enum ConcurrentLimitPermit<P> {
/// Permit from the global semaphore.
Global(P),
/// Permit from a per-operation semaphore.
PerOperation(OwnedSemaphorePermit),
}

#[doc(hidden)]
pub struct ConcurrentLimitHttpFetcher<S: ConcurrentLimitSemaphore> {
inner: HttpFetcher,
Expand Down Expand Up @@ -230,6 +307,7 @@ where
pub struct ConcurrentLimitAccessor<A: Access, S: ConcurrentLimitSemaphore> {
inner: A,
semaphore: S,
operation_limits: Option<Arc<HashMap<Operation, Arc<Semaphore>>>>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two questions:

  • Should we follow the template pattern here for customized semaphore?
  • Curious why do we need Arc wrapper for semaphore?
Suggested change
operation_limits: Option<Arc<HashMap<Operation, Arc<Semaphore>>>>,
operation_limits: Option<Arc<HashMap<Operation, S>>>,

}

impl<A: Access, S: ConcurrentLimitSemaphore> std::fmt::Debug for ConcurrentLimitAccessor<A, S> {
Expand All @@ -240,28 +318,42 @@ impl<A: Access, S: ConcurrentLimitSemaphore> std::fmt::Debug for ConcurrentLimit
}
}

impl<A: Access, S: ConcurrentLimitSemaphore> ConcurrentLimitAccessor<A, S> {
/// Acquire a permit for the given operation. If a per-operation semaphore
/// is configured for this operation, acquire from it; otherwise fall back
/// to the global semaphore.
async fn acquire_for(&self, op: Operation) -> ConcurrentLimitPermit<S::Permit> {
if let Some(limits) = &self.operation_limits {
if let Some(sem) = limits.get(&op) {
return ConcurrentLimitPermit::PerOperation(sem.clone().acquire_owned(1).await);
}
}
ConcurrentLimitPermit::Global(self.semaphore.acquire().await)
}
}

impl<A: Access, S: ConcurrentLimitSemaphore> LayeredAccess for ConcurrentLimitAccessor<A, S>
where
S::Permit: Unpin,
{
type Inner = A;
type Reader = ConcurrentLimitWrapper<A::Reader, S::Permit>;
type Writer = ConcurrentLimitWrapper<A::Writer, S::Permit>;
type Lister = ConcurrentLimitWrapper<A::Lister, S::Permit>;
type Deleter = ConcurrentLimitWrapper<A::Deleter, S::Permit>;
type Reader = ConcurrentLimitWrapper<A::Reader, ConcurrentLimitPermit<S::Permit>>;
type Writer = ConcurrentLimitWrapper<A::Writer, ConcurrentLimitPermit<S::Permit>>;
type Lister = ConcurrentLimitWrapper<A::Lister, ConcurrentLimitPermit<S::Permit>>;
type Deleter = ConcurrentLimitWrapper<A::Deleter, ConcurrentLimitPermit<S::Permit>>;

fn inner(&self) -> &Self::Inner {
&self.inner
}

async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
let _permit = self.semaphore.acquire().await;
let _permit = self.acquire_for(Operation::CreateDir).await;

self.inner.create_dir(path, args).await
}

async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let permit = self.semaphore.acquire().await;
let permit = self.acquire_for(Operation::Read).await;

self.inner
.read(path, args)
Expand All @@ -270,7 +362,7 @@ where
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let permit = self.semaphore.acquire().await;
let permit = self.acquire_for(Operation::Write).await;

self.inner
.write(path, args)
Expand All @@ -279,13 +371,13 @@ where
}

async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let _permit = self.semaphore.acquire().await;
let _permit = self.acquire_for(Operation::Stat).await;

self.inner.stat(path, args).await
}

async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
let permit = self.semaphore.acquire().await;
let permit = self.acquire_for(Operation::Delete).await;

self.inner
.delete()
Expand All @@ -294,7 +386,7 @@ where
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
let permit = self.semaphore.acquire().await;
let permit = self.acquire_for(Operation::List).await;

self.inner
.list(path, args)
Expand Down Expand Up @@ -397,6 +489,98 @@ mod tests {
);
}

#[tokio::test]
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test to check whether exhausting per operation semaphore is unaffected by exhausting of global semaphore

async fn per_operation_limit_isolates_operations() {
// Stat has its own per-operation semaphore (1 permit).
// Exhausting the global semaphore should NOT block stat.
let global_sem = Arc::new(Semaphore::new(1));
let layer = ConcurrentLimitLayer::with_semaphore(global_sem.clone())
.with_operation_limit(Operation::Stat, 1);

let op = Operator::new(services::Memory::default())
.expect("operator must build")
.layer(layer)
.finish();

// Exhaust the global semaphore externally.
let _permit = global_sem.clone().acquire_owned(1).await;

// Stat should still work because it uses its dedicated per-operation
// semaphore, not the exhausted global one.
let stat_result = timeout(Duration::from_millis(200), op.stat("any")).await;
assert!(
stat_result.is_ok(),
"stat should not be blocked by exhausted global semaphore"
);
}

#[tokio::test]
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checks whether block occurs if per-operation semaphore is exhausted

async fn per_operation_limit_blocks_same_operation() {
// Stat has its own per-operation limit of 1.
// Externally exhaust the per-operation stat semaphore, then verify
// that stat blocks.
let layer = ConcurrentLimitLayer::new(1024).with_operation_limit(Operation::Stat, 1);

// Grab a reference to the per-operation semaphore so we can
// externally exhaust it. Build the layer, then extract the
// semaphore from the operation_limits map.
let op = Operator::new(services::Memory::default())
.expect("operator must build")
.layer(layer.clone())
.finish();

// Exhaust the per-operation stat semaphore by cloning the Arc from
// the layer's internal map.
let stat_sem = layer
.operation_limits
.as_ref()
.expect("operation_limits must exist")
.get(&Operation::Stat)
.expect("stat semaphore must exist")
.clone();
let _permit = stat_sem.acquire_owned(1).await;

// Stat should block because its per-operation semaphore is exhausted.
let blocked = timeout(Duration::from_millis(50), op.stat("any")).await;
assert!(
blocked.is_err(),
"stat should be blocked by exhausted per-operation semaphore"
);
}

#[tokio::test]
async fn operations_without_per_op_limit_use_global() {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checks whether only the assigned operation gets per-operation limit and usage of global for other operations

// Only stat gets a per-operation limit. Other operations (like
// create_dir) should fall back to the global semaphore.
let global_sem = Arc::new(Semaphore::new(1));
let layer = ConcurrentLimitLayer::with_semaphore(global_sem.clone())
.with_operation_limit(Operation::Stat, 10);

let op = Operator::new(services::Memory::default())
.expect("operator must build")
.layer(layer)
.finish();

// Exhaust the global semaphore externally.
let _permit = global_sem.clone().acquire_owned(1).await;

// Stat should still work because it has a dedicated per-operation
// semaphore with 10 permits, bypassing the exhausted global one.
let stat_result = timeout(Duration::from_millis(200), op.stat("any")).await;
assert!(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I usually prefer unwrap in unit tests over is_ok, since you can see the error directly.

stat_result.is_ok(),
"stat should use per-operation semaphore, not the exhausted global one"
);

// create_dir has no per-operation limit, so it falls back to the
// global semaphore which is exhausted -- it should block.
let blocked = timeout(Duration::from_millis(50), op.create_dir("blocked/")).await;
assert!(
blocked.is_err(),
"create_dir should be blocked by exhausted global semaphore"
);
}

#[tokio::test]
async fn http_semaphore_holds_until_body_dropped() {
struct DummyFetcher;
Expand Down
Loading