diff --git a/src/pubsub/src/subscriber/lease_state.rs b/src/pubsub/src/subscriber/lease_state.rs index e53b9b9cf6..2cdadabe4a 100644 --- a/src/pubsub/src/subscriber/lease_state.rs +++ b/src/pubsub/src/subscriber/lease_state.rs @@ -106,21 +106,30 @@ impl AtLeastOnceInfo { } } +#[derive(Debug, Clone, Copy, PartialEq)] +pub(super) enum MessageStatus { + Leased, + /// We are currently trying to ack this message. + /// + /// We need to continue to extend these leases because the exactly-once + /// confirmed ack retry loop can take arbitrarily long. + /// + /// The client will not expire leases in this state. The server will + /// report if a lease has expired. We do not want to mask a success with + /// a `LeaseExpired` error. + Acking, + /// We are currently trying to nack this message. + /// + /// We keep it in `under_lease` to hold onto `result_tx` until the nack is confirmed, + /// but we do not want to extend its lease while we wait. + Nacking, +} + #[derive(Debug)] pub(super) struct ExactlyOnceInfo { receive_time: Instant, result_tx: Sender, - // If true, we are currently trying to ack this message. - // We need to continue to extend these leases because the exactly-once - // confirmed ack retry loop can take arbitrarily long. - // The client will not expire leases in this state. The server will - // report if a lease has expired. We do not want to mask a success with - // a `LeaseExpired` error. - pending: bool, - // If true, we are currently trying to nack this message. - // We keep it in `under_lease` to hold onto `result_tx` until the nack is confirmed, - // but we do not want to extend its lease while we wait. - nacking: bool, + status: MessageStatus, } impl ExactlyOnceInfo { @@ -128,8 +137,7 @@ impl ExactlyOnceInfo { ExactlyOnceInfo { receive_time: Instant::now(), result_tx, - pending: false, - nacking: false, + status: MessageStatus::Leased, } } } diff --git a/src/pubsub/src/subscriber/lease_state/exactly_once.rs b/src/pubsub/src/subscriber/lease_state/exactly_once.rs index 59984eec3a..b24619b516 100644 --- a/src/pubsub/src/subscriber/lease_state/exactly_once.rs +++ b/src/pubsub/src/subscriber/lease_state/exactly_once.rs @@ -13,8 +13,8 @@ // limitations under the License. use super::super::handler::AckResult; -use super::ExactlyOnceInfo; use super::MAX_IDS_PER_RPC; +use super::{ExactlyOnceInfo, MessageStatus}; use crate::error::AckError; use std::collections::HashMap; // Use a `tokio::time::Instant` to facilitate time-based unit testing. @@ -52,22 +52,22 @@ impl Leases { /// Process an ack from the application pub fn ack(&mut self, ack_id: String) { - let Some(ExactlyOnceInfo { pending, .. }) = self.under_lease.get_mut(&ack_id) else { + let Some(info) = self.under_lease.get_mut(&ack_id) else { // We already reported an error for this message, either because // it's lease expired, or because the server reported a failure in // an attempt to extend its lease. return; }; - *pending = true; + info.status = MessageStatus::Acking; self.to_ack.push(ack_id); } /// Process a nack from the application pub fn nack(&mut self, ack_id: String) { - let Some(ExactlyOnceInfo { nacking, .. }) = self.under_lease.get_mut(&ack_id) else { + let Some(info) = self.under_lease.get_mut(&ack_id) else { return; }; - *nacking = true; + info.status = MessageStatus::Nacking; self.to_nack.push(ack_id); } @@ -106,14 +106,16 @@ impl Leases { let remaining = self .under_lease .iter() - .filter_map(|(id, info)| { - if info.nacking { - None - } else if !info.pending && info.receive_time + max_lease < now { - expired.push(id.clone()); - None - } else { - Some(id.clone()) + .filter_map(|(id, info)| match info.status { + MessageStatus::Nacking => None, + MessageStatus::Acking => Some(id.clone()), + MessageStatus::Leased => { + if info.receive_time + max_lease < now { + expired.push(id.clone()); + None + } else { + Some(id.clone()) + } } }) .collect::>() @@ -186,8 +188,7 @@ mod tests { ExactlyOnceInfo { receive_time: Instant::now(), result_tx, - pending: false, - nacking: false, + status: MessageStatus::Leased, } } @@ -293,14 +294,14 @@ mod tests { .under_lease .get(&test_id(1)) .expect("ack ID should be under lease"); - assert!(!ack_id.pending, "{ack_id:?}"); + assert_eq!(ack_id.status, MessageStatus::Leased); leases.ack(test_id(1)); let ack_id = leases .under_lease .get(&test_id(1)) .expect("ack ID should be under lease"); - assert!(ack_id.pending, "{ack_id:?}"); + assert_eq!(ack_id.status, MessageStatus::Acking); } #[tokio::test] @@ -313,8 +314,7 @@ mod tests { ExactlyOnceInfo { receive_time: Instant::now() - Duration::from_secs(3), result_tx, - pending: false, - nacking: false, + status: MessageStatus::Leased, }, ); assert_eq!( @@ -539,8 +539,7 @@ mod tests { ExactlyOnceInfo { receive_time: Instant::now() - Duration::from_secs(3), result_tx, - pending: false, - nacking: false, + status: MessageStatus::Leased, }, ); @@ -550,8 +549,7 @@ mod tests { ExactlyOnceInfo { receive_time: Instant::now() - Duration::from_secs(1), result_tx, - pending: false, - nacking: false, + status: MessageStatus::Leased, }, ); @@ -611,8 +609,7 @@ mod tests { ExactlyOnceInfo { receive_time: Instant::now() - Duration::from_secs(1), result_tx, - pending: true, - nacking: false, + status: MessageStatus::Acking, }, ); @@ -622,8 +619,7 @@ mod tests { ExactlyOnceInfo { receive_time: Instant::now() - Duration::from_secs(1), result_tx, - pending: false, - nacking: false, + status: MessageStatus::Leased, }, ); @@ -645,6 +641,51 @@ mod tests { Ok(()) } + #[tokio::test] + async fn nacking_messages_are_not_extended() -> anyhow::Result<()> { + let mut leases = Leases::default(); + + let (result_tx, result_rx1) = channel(); + leases.add( + test_id(1), + ExactlyOnceInfo { + receive_time: Instant::now() - Duration::from_secs(1), + result_tx, + status: MessageStatus::Leased, + }, + ); + leases.nack(test_id(1)); + + let (result_tx, result_rx2) = channel(); + leases.add( + test_id(2), + ExactlyOnceInfo { + receive_time: Instant::now() - Duration::from_secs(1), + result_tx, + status: MessageStatus::Leased, + }, + ); + + let batches = leases.retain(Duration::ZERO); + assert!(batches.is_empty(), "{batches:?}"); + + assert_eq!( + TestLeases { + under_lease: vec![test_id(1)], + to_ack: Vec::new(), + to_nack: vec![test_id(1)], + }, + leases + ); + + let err = result_rx2.await?.expect_err("error should be returned"); + assert!(matches!(err, AckError::LeaseExpired), "{err:?}"); + + assert!(result_rx1.is_empty(), "{result_rx1:?}"); + + Ok(()) + } + #[tokio::test] async fn evict() -> anyhow::Result<()> { let mut leases = Leases::default(); @@ -657,8 +698,7 @@ mod tests { result_tx, // Even pending acks will be evicted, and satisfied with // `Shutdown` errors. - pending: true, - nacking: false, + status: MessageStatus::Acking, }, ); let (result_tx, result_rx2) = channel(); @@ -667,12 +707,27 @@ mod tests { ExactlyOnceInfo { receive_time: Instant::now(), result_tx, - pending: false, - nacking: false, + status: MessageStatus::Leased, + }, + ); + let (result_tx, result_rx3) = channel(); + leases.add( + test_id(3), + ExactlyOnceInfo { + receive_time: Instant::now(), + result_tx, + status: MessageStatus::Leased, }, ); - leases.add(test_id(3), test_info()); leases.nack(test_id(3)); + assert_eq!( + leases + .under_lease + .get(&test_id(3)) + .expect("nack is under lease") + .status, + MessageStatus::Nacking + ); assert_eq!( TestLeases { under_lease: vec![test_id(1), test_id(2), test_id(3)], @@ -692,6 +747,8 @@ mod tests { assert!(matches!(err, AckError::Shutdown(_)), "{err:?}"); let err = result_rx2.await?.expect_err("error should be returned"); assert!(matches!(err, AckError::Shutdown(_)), "{err:?}"); + let err = result_rx3.await?.expect_err("error should be returned"); + assert!(matches!(err, AckError::Shutdown(_)), "{err:?}"); Ok(()) }