Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions src/pubsub/src/subscriber/lease_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,30 +106,35 @@ impl AtLeastOnceInfo {
}
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub(super) enum MessageStatus {
Leased,
/// We are currently trying to ack this message.
/// We need to continue to extend these leases because the exactly-once
/// confirmed ack retry loop can take arbitrarily long.
/// The client will not expire leases in this state. The server will
/// report if a lease has expired. We do not want to mask a success with
/// a `LeaseExpired` error.
Acking,
/// We are currently trying to nack this message.
/// We keep it in `under_lease` to hold onto `result_tx` until the nack is confirmed,
/// but we do not want to extend its lease while we wait.
Nacking,
}

#[derive(Debug)]
pub(super) struct ExactlyOnceInfo {
receive_time: Instant,
result_tx: Sender<AckResult>,
// If true, we are currently trying to ack this message.
// We need to continue to extend these leases because the exactly-once
// confirmed ack retry loop can take arbitrarily long.
// The client will not expire leases in this state. The server will
// report if a lease has expired. We do not want to mask a success with
// a `LeaseExpired` error.
pending: bool,
// If true, we are currently trying to nack this message.
// We keep it in `under_lease` to hold onto `result_tx` until the nack is confirmed,
// but we do not want to extend its lease while we wait.
nacking: bool,
status: MessageStatus,
}

impl ExactlyOnceInfo {
pub(super) fn new(result_tx: Sender<AckResult>) -> Self {
ExactlyOnceInfo {
receive_time: Instant::now(),
result_tx,
pending: false,
nacking: false,
status: MessageStatus::Leased,
}
}
}
Expand Down
121 changes: 89 additions & 32 deletions src/pubsub/src/subscriber/lease_state/exactly_once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
// limitations under the License.

use super::super::handler::AckResult;
use super::ExactlyOnceInfo;
use super::MAX_IDS_PER_RPC;
use super::{ExactlyOnceInfo, MessageStatus};
use crate::error::AckError;
use std::collections::HashMap;
// Use a `tokio::time::Instant` to facilitate time-based unit testing.
Expand Down Expand Up @@ -52,22 +52,22 @@ impl Leases {

/// Process an ack from the application
pub fn ack(&mut self, ack_id: String) {
let Some(ExactlyOnceInfo { pending, .. }) = self.under_lease.get_mut(&ack_id) else {
let Some(info) = self.under_lease.get_mut(&ack_id) else {
// We already reported an error for this message, either because
// it's lease expired, or because the server reported a failure in
// an attempt to extend its lease.
return;
};
*pending = true;
info.status = MessageStatus::Acking;
self.to_ack.push(ack_id);
}

/// Process a nack from the application
pub fn nack(&mut self, ack_id: String) {
let Some(ExactlyOnceInfo { nacking, .. }) = self.under_lease.get_mut(&ack_id) else {
let Some(info) = self.under_lease.get_mut(&ack_id) else {
return;
};
*nacking = true;
info.status = MessageStatus::Nacking;
self.to_nack.push(ack_id);
}

Expand Down Expand Up @@ -106,14 +106,16 @@ impl Leases {
let remaining = self
.under_lease
.iter()
.filter_map(|(id, info)| {
if info.nacking {
None
} else if !info.pending && info.receive_time + max_lease < now {
expired.push(id.clone());
None
} else {
Some(id.clone())
.filter_map(|(id, info)| match info.status {
MessageStatus::Nacking => None,
MessageStatus::Acking => Some(id.clone()),
MessageStatus::Leased => {
if info.receive_time + max_lease < now {
expired.push(id.clone());
None
} else {
Some(id.clone())
}
}
})
.collect::<Vec<_>>()
Expand Down Expand Up @@ -186,8 +188,7 @@ mod tests {
ExactlyOnceInfo {
receive_time: Instant::now(),
result_tx,
pending: false,
nacking: false,
status: MessageStatus::Leased,
}
}

Expand Down Expand Up @@ -293,14 +294,14 @@ mod tests {
.under_lease
.get(&test_id(1))
.expect("ack ID should be under lease");
assert!(!ack_id.pending, "{ack_id:?}");
assert_eq!(ack_id.status, MessageStatus::Leased);

leases.ack(test_id(1));
let ack_id = leases
.under_lease
.get(&test_id(1))
.expect("ack ID should be under lease");
assert!(ack_id.pending, "{ack_id:?}");
assert_eq!(ack_id.status, MessageStatus::Acking);
}

#[tokio::test]
Expand All @@ -313,8 +314,7 @@ mod tests {
ExactlyOnceInfo {
receive_time: Instant::now() - Duration::from_secs(3),
result_tx,
pending: false,
nacking: false,
status: MessageStatus::Leased,
},
);
assert_eq!(
Expand Down Expand Up @@ -539,8 +539,7 @@ mod tests {
ExactlyOnceInfo {
receive_time: Instant::now() - Duration::from_secs(3),
result_tx,
pending: false,
nacking: false,
status: MessageStatus::Leased,
},
);

Expand All @@ -550,8 +549,7 @@ mod tests {
ExactlyOnceInfo {
receive_time: Instant::now() - Duration::from_secs(1),
result_tx,
pending: false,
nacking: false,
status: MessageStatus::Leased,
},
);

Expand Down Expand Up @@ -611,8 +609,7 @@ mod tests {
ExactlyOnceInfo {
receive_time: Instant::now() - Duration::from_secs(1),
result_tx,
pending: true,
nacking: false,
status: MessageStatus::Acking,
},
);

Expand All @@ -622,8 +619,7 @@ mod tests {
ExactlyOnceInfo {
receive_time: Instant::now() - Duration::from_secs(1),
result_tx,
pending: false,
nacking: false,
status: MessageStatus::Leased,
},
);

Expand All @@ -645,6 +641,51 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn nacking_messages_are_not_extended() -> anyhow::Result<()> {
let mut leases = Leases::default();

let (result_tx, result_rx1) = channel();
leases.add(
test_id(1),
ExactlyOnceInfo {
receive_time: Instant::now() - Duration::from_secs(1),
result_tx,
status: MessageStatus::Leased,
},
);
leases.nack(test_id(1));

let (result_tx, result_rx2) = channel();
leases.add(
test_id(2),
ExactlyOnceInfo {
receive_time: Instant::now() - Duration::from_secs(1),
result_tx,
status: MessageStatus::Leased,
},
);

let batches = leases.retain(Duration::ZERO);
assert!(batches.is_empty(), "{batches:?}");

assert_eq!(
TestLeases {
under_lease: vec![test_id(1)],
to_ack: Vec::new(),
to_nack: vec![test_id(1)],
},
leases
);

let err = result_rx2.await?.expect_err("error should be returned");
assert!(matches!(err, AckError::LeaseExpired), "{err:?}");

assert!(result_rx1.is_empty(), "{result_rx1:?}");

Ok(())
}

#[tokio::test]
async fn evict() -> anyhow::Result<()> {
let mut leases = Leases::default();
Expand All @@ -657,8 +698,7 @@ mod tests {
result_tx,
// Even pending acks will be evicted, and satisfied with
// `Shutdown` errors.
pending: true,
nacking: false,
status: MessageStatus::Acking,
},
);
let (result_tx, result_rx2) = channel();
Expand All @@ -667,12 +707,27 @@ mod tests {
ExactlyOnceInfo {
receive_time: Instant::now(),
result_tx,
pending: false,
nacking: false,
status: MessageStatus::Leased,
},
);
let (result_tx, result_rx3) = channel();
leases.add(
test_id(3),
ExactlyOnceInfo {
receive_time: Instant::now(),
result_tx,
status: MessageStatus::Leased,
},
);
leases.add(test_id(3), test_info());
leases.nack(test_id(3));
assert_eq!(
leases
.under_lease
.get(&test_id(3))
.expect("nack is under lease")
.status,
MessageStatus::Nacking
);
assert_eq!(
TestLeases {
under_lease: vec![test_id(1), test_id(2), test_id(3)],
Expand All @@ -692,6 +747,8 @@ mod tests {
assert!(matches!(err, AckError::Shutdown(_)), "{err:?}");
let err = result_rx2.await?.expect_err("error should be returned");
assert!(matches!(err, AckError::Shutdown(_)), "{err:?}");
let err = result_rx3.await?.expect_err("error should be returned");
assert!(matches!(err, AckError::Shutdown(_)), "{err:?}");

Ok(())
}
Expand Down
Loading