feat: provide per-operation concurrent limit#7329
feat: provide per-operation concurrent limit#7329codetyri0n wants to merge 1 commit intoapache:mainfrom
Conversation
| ); | ||
| } | ||
|
|
||
| #[tokio::test] |
There was a problem hiding this comment.
Test to check whether exhausting per operation semaphore is unaffected by exhausting of global semaphore
| ); | ||
| } | ||
|
|
||
| #[tokio::test] |
There was a problem hiding this comment.
Checks whether block occurs if per-operation semaphore is exhausted
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn operations_without_per_op_limit_use_global() { |
There was a problem hiding this comment.
Checks whether only the assigned operation gets per-operation limit and usage of global for other operations
dentiny
left a comment
There was a problem hiding this comment.
one more comment on unit test: most of the comments seem unnecessary -- the code is already explanatory itself
| /// # } | ||
| /// ``` | ||
| /// | ||
| /// Set per-operation concurrent limits to control different operations |
There was a problem hiding this comment.
The code example seems duplicate as with_operation_limit, do we need both?
| /// # Ok(()) | ||
| /// # } | ||
| /// ``` | ||
| pub fn with_operation_limit(mut self, op: Operation, permits: usize) -> Self { |
There was a problem hiding this comment.
Two things here:
- I think certain operations are not supported in the concurrent limit layer, for example, copy/rename/presign, but for your implementation it will pass through if users do
with_operation_limit(Operation::Copy, 1)which is a no-op, shall we check it? - Should we validate
permitsare larger than 0? otherwise the operation is permanently blocked.
| pub struct ConcurrentLimitAccessor<A: Access, S: ConcurrentLimitSemaphore> { | ||
| inner: A, | ||
| semaphore: S, | ||
| operation_limits: Option<Arc<HashMap<Operation, Arc<Semaphore>>>>, |
There was a problem hiding this comment.
Two questions:
- Should we follow the template pattern here for customized semaphore?
- Curious why do we need
Arcwrapper for semaphore?
| operation_limits: Option<Arc<HashMap<Operation, Arc<Semaphore>>>>, | |
| operation_limits: Option<Arc<HashMap<Operation, S>>>, |
| // Stat should still work because it has a dedicated per-operation | ||
| // semaphore with 10 permits, bypassing the exhausted global one. | ||
| let stat_result = timeout(Duration::from_millis(200), op.stat("any")).await; | ||
| assert!( |
There was a problem hiding this comment.
nit: I usually prefer unwrap in unit tests over is_ok, since you can see the error directly.
| } | ||
| } | ||
|
|
||
| /// A permit that can come from either the global semaphore (generic `S`) or |
There was a problem hiding this comment.
Hmmm I'm not convinced the enum here is necessary, does it work?
type Reader = ConcurrentLimitLayerWrapper<A::Reader, S:Permit>;
async fn acquire_for(&self, op: Operation) -> ConcurrentLimitPermit<S::Permit> {
if let Some(limits) = &self.operation_limits {
if let Some(sem) = limits.get(&op) {
return sem.acquire().await;
}
}
self.semaphore.acquire().await
}| pub struct ConcurrentLimitLayer<S: ConcurrentLimitSemaphore = Arc<Semaphore>> { | ||
| operation_semaphore: S, | ||
| http_semaphore: Option<S>, | ||
| operation_limits: Option<Arc<HashMap<Operation, Arc<Semaphore>>>>, |
There was a problem hiding this comment.
Instead of using a HashMap, we could simply use a struct to hold read_semaphore and write_semaphore.
We can implement the functionality step by step: first implement read, write, and list operations, then later add copy and rename.
Another question is: what is the relationship between operation_semaphore and read_semaphore? When read_semaphore is set, will we simply ignore operation_semaphore?
cc @dentiny and @codetyri0n
Which issue does this PR close?
Closes #7245
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?
yes
AI Usage Statement
Used claude opus 4.6 for code generation and manually reviewed by me, please flag if any concerns!