diff --git a/src/spanner/src/batch_read_only_transaction.rs b/src/spanner/src/batch_read_only_transaction.rs index 4617ef9e2d..ff59050803 100644 --- a/src/spanner/src/batch_read_only_transaction.rs +++ b/src/spanner/src/batch_read_only_transaction.rs @@ -16,7 +16,7 @@ use crate::database_client::DatabaseClient; use crate::model::PartitionOptions; use crate::precommit::PrecommitTokenTracker; use crate::read_only_transaction::{ - MultiUseReadOnlyTransaction, MultiUseReadOnlyTransactionBuilder, + MultiUseReadOnlyTransaction, MultiUseReadOnlyTransactionBuilder, ReadContextTransactionSelector, }; use crate::result_set::{ResultSet, StreamOperation}; use crate::statement::Statement; @@ -44,7 +44,8 @@ pub struct BatchReadOnlyTransactionBuilder { impl BatchReadOnlyTransactionBuilder { pub(crate) fn new(client: DatabaseClient) -> Self { Self { - inner: MultiUseReadOnlyTransactionBuilder::new(client), + inner: MultiUseReadOnlyTransactionBuilder::new(client) + .with_explicit_begin_transaction(true), } } @@ -148,7 +149,7 @@ impl BatchReadOnlyTransaction { .clone() .into_partition_query_request() .set_session(self.inner.context.client.session.name.clone()) - .set_transaction(self.inner.context.transaction_selector.clone()) + .set_transaction(self.inner.context.transaction_selector.selector()) .set_partition_options(options); let response = self @@ -165,7 +166,7 @@ impl BatchReadOnlyTransaction { .map(|p| Partition { inner: PartitionedOperation::Query { partition_token: p.partition_token, - transaction_selector: self.inner.context.transaction_selector.clone(), + transaction_selector: self.inner.context.transaction_selector.selector(), session_name: self.inner.context.client.session.name.clone(), statement: statement.clone(), }, @@ -203,7 +204,7 @@ impl BatchReadOnlyTransaction { .clone() .into_partition_read_request() .set_session(self.inner.context.client.session.name.clone()) - .set_transaction(self.inner.context.transaction_selector.clone()) + .set_transaction(self.inner.context.transaction_selector.selector()) .set_partition_options(options); let response = self @@ -220,7 +221,7 @@ impl BatchReadOnlyTransaction { .map(|p| Partition { inner: PartitionedOperation::Read { partition_token: p.partition_token, - transaction_selector: self.inner.context.transaction_selector.clone(), + transaction_selector: self.inner.context.transaction_selector.selector(), session_name: self.inner.context.client.session.name.clone(), read_request: read.clone(), }, @@ -344,6 +345,10 @@ impl Partition { Ok(ResultSet::new( stream, + Some(ReadContextTransactionSelector::Fixed( + transaction_selector.clone(), + None, + )), PrecommitTokenTracker::new_noop(), client.clone(), StreamOperation::Query(request), @@ -373,6 +378,10 @@ impl Partition { Ok(ResultSet::new( stream, + Some(ReadContextTransactionSelector::Fixed( + transaction_selector.clone(), + None, + )), PrecommitTokenTracker::new_noop(), client.clone(), StreamOperation::Read(request), diff --git a/src/spanner/src/read_only_transaction.rs b/src/spanner/src/read_only_transaction.rs index 3f0df51a5c..ea57a75853 100644 --- a/src/spanner/src/read_only_transaction.rs +++ b/src/spanner/src/read_only_transaction.rs @@ -19,6 +19,7 @@ use crate::precommit::PrecommitTokenTracker; use crate::result_set::{ResultSet, StreamOperation}; use crate::statement::Statement; use crate::timestamp_bound::TimestampBound; +use std::sync::{Arc, Mutex}; /// A builder for [SingleUseReadOnlyTransaction]. /// @@ -91,7 +92,10 @@ impl SingleUseReadOnlyTransactionBuilder { SingleUseReadOnlyTransaction { context: ReadContext { client: self.client, - transaction_selector, + transaction_selector: ReadContextTransactionSelector::Fixed( + transaction_selector, + None, + ), precommit_token_tracker: PrecommitTokenTracker::new_noop(), transaction_tag: None, }, @@ -204,6 +208,7 @@ impl SingleUseReadOnlyTransaction { pub struct MultiUseReadOnlyTransactionBuilder { client: DatabaseClient, timestamp_bound: Option, + explicit_begin: bool, } impl MultiUseReadOnlyTransactionBuilder { @@ -211,9 +216,44 @@ impl MultiUseReadOnlyTransactionBuilder { Self { client, timestamp_bound: None, + explicit_begin: false, } } + /// Sets whether the transaction should be explicitly started using a `BeginTransaction` RPC. + /// + /// # Example + /// ``` + /// # use google_cloud_spanner::client::Spanner; + /// # use google_cloud_spanner::client::Statement; + /// # async fn set_explicit_begin(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> { + /// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?; + /// let transaction = db_client.read_only_transaction().with_explicit_begin_transaction(true).build().await?; + /// let statement = Statement::builder("SELECT * FROM users").build(); + /// let result_set = transaction.execute_query(statement).await?; + /// # Ok(()) + /// # } + /// ``` + /// + /// By default, the Spanner client will inline the `BeginTransaction` call with the first query + /// in the transaction. This reduces the number of round-trips to Spanner that are needed for a + /// transaction. Setting this option to `true` can be beneficial for specific transaction shapes: + /// + /// 1. When the transaction executes multiple parallel queries at the start of the transaction. + /// Only one query can include a `BeginTransaction` option, and all other queries must wait for + /// the first query to return the first result before they can proceed to execute. A + /// `BeginTransaction` RPC will quickly return a transaction ID and allow all queries to start + /// execution in parallel once the transaction ID has been returned. + /// 2. When the first query in the transaction could fail. If the query fails, then it will also + /// not start a transaction and return a transaction ID. The transaction will then fall back to + /// executing a `BeginTransaction` RPC and retry the first query. + /// + /// Default is `false` (inline begin). + pub fn with_explicit_begin_transaction(mut self, explicit: bool) -> Self { + self.explicit_begin = explicit; + self + } + /// Sets the timestamp bound for the read-only transaction. /// /// # Example @@ -231,6 +271,29 @@ impl MultiUseReadOnlyTransactionBuilder { self } + async fn begin( + &self, + options: TransactionOptions, + ) -> crate::Result { + let request = crate::model::BeginTransactionRequest::default() + .set_session(self.client.session.name.clone()) + .set_options(options); + + // TODO(#4972): make request options configurable + let response = self + .client + .spanner + .begin_transaction(request, crate::RequestOptions::default()) + .await?; + + let transaction_selector = crate::model::TransactionSelector::default().set_id(response.id); + + Ok(ReadContextTransactionSelector::Fixed( + transaction_selector, + response.read_timestamp, + )) + } + /// Builds the [MultiUseReadOnlyTransaction] and starts the transaction /// by calling the `BeginTransaction` RPC. /// @@ -245,30 +308,27 @@ impl MultiUseReadOnlyTransactionBuilder { /// ``` pub async fn build(self) -> crate::Result { let read_only = ReadOnly::default().set_return_read_timestamp(true); - let read_only = match self.timestamp_bound { - Some(b) => read_only.set_timestamp_bound(b.0), + let read_only = match self.timestamp_bound.as_ref() { + Some(b) => read_only.set_timestamp_bound(b.0.clone()), None => read_only.set_strong(true), }; - let request = crate::model::BeginTransactionRequest::default() - .set_session(self.client.session.name.clone()) - .set_options(TransactionOptions::default().set_read_only(read_only)); + let options = TransactionOptions::default().set_read_only(read_only); - // TODO(#4972): make request options configurable - let response = self - .client - .spanner - .begin_transaction(request, crate::RequestOptions::default()) - .await?; + let selector = if self.explicit_begin { + self.begin(options).await? + } else { + ReadContextTransactionSelector::Lazy(Arc::new(Mutex::new( + TransactionState::NotStarted(options), + ))) + }; - let transaction_selector = crate::model::TransactionSelector::default().set_id(response.id); Ok(MultiUseReadOnlyTransaction { context: ReadContext { client: self.client, - transaction_selector, + transaction_selector: selector, precommit_token_tracker: PrecommitTokenTracker::new_noop(), transaction_tag: None, }, - read_timestamp: response.read_timestamp, }) } } @@ -297,13 +357,12 @@ impl MultiUseReadOnlyTransactionBuilder { #[derive(Debug)] pub struct MultiUseReadOnlyTransaction { pub(crate) context: ReadContext, - pub(crate) read_timestamp: Option, } impl MultiUseReadOnlyTransaction { /// Returns the read timestamp chosen for the transaction. pub fn read_timestamp(&self) -> Option { - self.read_timestamp + self.context.transaction_selector.read_timestamp() } /// Executes a query using this transaction. @@ -370,10 +429,71 @@ impl MultiUseReadOnlyTransaction { } } +#[derive(Clone, Debug)] +pub(crate) enum ReadContextTransactionSelector { + Fixed(crate::model::TransactionSelector, Option), + Lazy(Arc>), +} + +#[derive(Clone, Debug)] +pub(crate) enum TransactionState { + NotStarted(crate::model::TransactionOptions), + Started(crate::model::TransactionSelector, Option), +} + +impl TransactionState { + fn selector(&self) -> crate::model::TransactionSelector { + match self { + Self::Started(selector, _) => selector.clone(), + Self::NotStarted(options) => { + crate::model::TransactionSelector::default().set_begin(options.clone()) + } + } + } +} + +impl ReadContextTransactionSelector { + pub(crate) fn selector(&self) -> crate::model::TransactionSelector { + match self { + Self::Fixed(selector, _) => selector.clone(), + Self::Lazy(lazy) => lazy + .lock() + .expect("transaction state mutex poisoned") + .selector(), + } + } + + pub(crate) fn update(&self, id: bytes::Bytes, timestamp: Option) { + if let Self::Lazy(lazy) = self { + let mut guard = lazy.lock().expect("transaction state mutex poisoned"); + if matches!(&*guard, TransactionState::NotStarted(_)) { + *guard = TransactionState::Started( + crate::model::TransactionSelector::default().set_id(id), + timestamp, + ); + } + } + } + + pub(crate) fn read_timestamp(&self) -> Option { + match self { + Self::Fixed(_, timestamp) => *timestamp, + Self::Lazy(lazy) => { + let guard = lazy.lock().expect("transaction state mutex poisoned"); + if let TransactionState::Started(_, timestamp) = &*guard { + *timestamp + } else { + None + } + } + } + } +} + #[derive(Clone, Debug)] pub(crate) struct ReadContext { pub(crate) client: DatabaseClient, - pub(crate) transaction_selector: crate::model::TransactionSelector, + pub(crate) transaction_selector: ReadContextTransactionSelector, pub(crate) precommit_token_tracker: PrecommitTokenTracker, pub(crate) transaction_tag: Option, } @@ -405,7 +525,7 @@ impl ReadContext { .into() .into_request() .set_session(self.client.session.name.clone()) - .set_transaction(self.transaction_selector.clone()); + .set_transaction(self.transaction_selector.selector()); request.request_options = self.amend_request_options(request.request_options); let stream = self @@ -418,6 +538,7 @@ impl ReadContext { Ok(ResultSet::new( stream, + Some(self.transaction_selector.clone()), self.precommit_token_tracker.clone(), self.client.clone(), StreamOperation::Query(request), @@ -432,7 +553,7 @@ impl ReadContext { .into() .into_request() .set_session(self.client.session.name.clone()) - .set_transaction(self.transaction_selector.clone()); + .set_transaction(self.transaction_selector.selector()); request.request_options = self.amend_request_options(request.request_options); let stream = self @@ -445,6 +566,7 @@ impl ReadContext { Ok(ResultSet::new( stream, + Some(self.transaction_selector.clone()), self.precommit_token_tracker.clone(), self.client.clone(), StreamOperation::Read(request), @@ -525,9 +647,8 @@ pub(crate) mod tests { let (db_client, _server) = setup_db_client(mock).await; let tx = db_client.single_use().build(); - let ro = tx - .context - .transaction_selector + let selector = tx.context.transaction_selector.selector(); + let ro = selector .single_use() .expect("Expected SingleUse selector") .read_only() @@ -543,9 +664,8 @@ pub(crate) mod tests { std::time::Duration::from_secs(10), )) .build(); - let ro2 = tx2 - .context - .transaction_selector + let selector = tx2.context.transaction_selector.selector(); + let ro2 = selector .single_use() .expect("Expected SingleUse selector") .read_only() @@ -646,6 +766,7 @@ pub(crate) mod tests { let tx = db_client .read_only_transaction() + .with_explicit_begin_transaction(true) .build() .await .expect("Failed to start tx"); @@ -670,6 +791,102 @@ pub(crate) mod tests { } } + #[tokio::test] + async fn execute_multi_query_inline_begin() -> anyhow::Result<()> { + use super::super::result_set::tests::string_val; + use crate::client::Statement; + use crate::value::Value; + use spanner_grpc_mock::google::spanner::v1 as mock_v1; + + let mut mock = create_session_mock(); + + // No explicit begin_transaction should be called. + mock.expect_begin_transaction().never(); + + let mut seq = mockall::Sequence::new(); + + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(move |req| { + let req = req.into_inner(); + assert_eq!( + req.session, + "projects/p/instances/i/databases/d/sessions/123" + ); + + // First call: Should have Selector::Begin + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Begin(_) => {} + _ => panic!("Expected Selector::Begin"), + } + let mut rs = setup_select1(); + rs.metadata.as_mut().unwrap().transaction = Some(mock_v1::Transaction { + id: vec![4, 5, 6], + read_timestamp: Some(prost_types::Timestamp { + seconds: 987654321, + nanos: 0, + }), + ..Default::default() + }); + Ok(gaxi::grpc::tonic::Response::new(Box::pin( + tokio_stream::iter(vec![Ok(rs)]), + ))) + }); + + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(move |req| { + let req = req.into_inner(); + // Second call: Should have Selector::Id using the ID returned in the first call + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Id(id) => { + assert_eq!(id, vec![4, 5, 6]); + } + _ => panic!("Expected Selector::Id"), + } + Ok(gaxi::grpc::tonic::Response::new(Box::pin( + tokio_stream::iter(vec![Ok(setup_select1())]), + ))) + }); + + let (db_client, _server) = setup_db_client(mock).await; + + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + // The read timestamp is not available until the first query is executed. + assert!(tx.read_timestamp().is_none()); + + for i in 0..2 { + let mut rs = tx + .execute_query(Statement::builder("SELECT 1").build()) + .await?; + + let row = rs.next().await.expect("Expected a row")?; + assert_eq!(row.raw_values(), [Value(string_val("1"))]); + + let result = rs.next().await; + assert!(result.is_none(), "Expected None, got {result:?}"); + + if i == 0 { + // Read timestamp becomes available. + assert_eq!( + tx.read_timestamp() + .expect("Expected read timestamp") + .seconds(), + 987654321 + ); + } + } + + Ok(()) + } + #[tokio::test] async fn execute_single_read() { use super::super::result_set::tests::string_val; @@ -705,4 +922,101 @@ pub(crate) mod tests { let result = rs.next().await; assert!(result.is_none(), "expected None, got {result:?}"); } + + #[tokio::test] + async fn execute_multi_read() -> anyhow::Result<()> { + use super::super::result_set::tests::string_val; + use crate::client::{KeySet, ReadRequest}; + use crate::value::Value; + use spanner_grpc_mock::google::spanner::v1 as mock_v1; + + let mut mock = create_session_mock(); + + // No explicit begin_transaction should be called. + mock.expect_begin_transaction().never(); + + let mut seq = mockall::Sequence::new(); + + mock.expect_streaming_read() + .times(1) + .in_sequence(&mut seq) + .returning(move |req| { + let req = req.into_inner(); + assert_eq!( + req.session, + "projects/p/instances/i/databases/d/sessions/123" + ); + + // First call: Should have Selector::Begin + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Begin(_) => {} + _ => panic!("Expected Selector::Begin"), + } + let mut rs = setup_select1(); + rs.metadata.as_mut().unwrap().transaction = Some(mock_v1::Transaction { + id: vec![4, 5, 6], + read_timestamp: Some(prost_types::Timestamp { + seconds: 987654321, + nanos: 0, + }), + ..Default::default() + }); + Ok(gaxi::grpc::tonic::Response::new(Box::pin( + tokio_stream::iter(vec![Ok(rs)]), + ))) + }); + + mock.expect_streaming_read() + .times(1) + .in_sequence(&mut seq) + .returning(move |req| { + let req = req.into_inner(); + // Second call: Should have Selector::Id using the ID returned in the first call + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Id(id) => { + assert_eq!(id, vec![4, 5, 6]); + } + _ => panic!("Expected Selector::Id"), + } + Ok(gaxi::grpc::tonic::Response::new(Box::pin( + tokio_stream::iter(vec![Ok(setup_select1())]), + ))) + }); + + let (db_client, _server) = setup_db_client(mock).await; + + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + // The read timestamp is not available until the first query is executed. + assert!(tx.read_timestamp().is_none()); + + for i in 0..2 { + let read = ReadRequest::builder("Users", vec!["Id", "Name"]) + .with_keys(KeySet::all()) + .build(); + let mut rs = tx.execute_read(read).await?; + + let row = rs.next().await.expect("Expected a row")?; + assert_eq!(row.raw_values(), [Value(string_val("1"))]); + + let result = rs.next().await; + assert!(result.is_none(), "Expected None, got {result:?}"); + + if i == 0 { + // Read timestamp becomes available. + assert_eq!( + tx.read_timestamp() + .expect("Expected read timestamp") + .seconds(), + 987654321 + ); + } + } + + Ok(()) + } } diff --git a/src/spanner/src/read_write_transaction.rs b/src/spanner/src/read_write_transaction.rs index 9a84b1bb87..920d69bfda 100644 --- a/src/spanner/src/read_write_transaction.rs +++ b/src/spanner/src/read_write_transaction.rs @@ -100,7 +100,11 @@ impl ReadWriteTransactionBuilder { .begin_transaction(request, RequestOptions::default()) .await?; - let transaction_selector = TransactionSelector::default().set_id(response.id); + let transaction_selector = + crate::read_only_transaction::ReadContextTransactionSelector::Fixed( + TransactionSelector::default().set_id(response.id), + None, + ); Ok(ReadWriteTransaction { context: ReadContext { client: self.client.clone(), @@ -144,7 +148,7 @@ impl ReadWriteTransaction { .into() .into_request() .set_session(self.context.client.session.name.clone()) - .set_transaction(self.context.transaction_selector.clone()) + .set_transaction(self.context.transaction_selector.selector()) .set_seqno(seqno); request.request_options = self.context.amend_request_options(request.request_options); @@ -245,7 +249,7 @@ impl ReadWriteTransaction { let request = ExecuteBatchDmlRequest::default() .set_session(self.context.client.session.name.clone()) - .set_transaction(self.context.transaction_selector.clone()) + .set_transaction(self.context.transaction_selector.selector()) .set_seqno(seqno) .set_statements(statements) .set_or_clear_request_options( @@ -271,7 +275,7 @@ impl ReadWriteTransaction { } pub(crate) fn transaction_id(&self) -> crate::Result { - match &self.context.transaction_selector.selector { + match &self.context.transaction_selector.selector().selector { Some(Selector::Id(id)) => Ok(id.clone()), _ => Err(internal_error("Transaction ID is missing")), } diff --git a/src/spanner/src/result_set.rs b/src/spanner/src/result_set.rs index cfe0397cae..6bd848b175 100644 --- a/src/spanner/src/result_set.rs +++ b/src/spanner/src/result_set.rs @@ -16,6 +16,7 @@ use crate::database_client::DatabaseClient; use crate::error::internal_error; use crate::google::spanner::v1::PartialResultSet; use crate::precommit::PrecommitTokenTracker; +use crate::read_only_transaction::ReadContextTransactionSelector; use crate::result_set_metadata::ResultSetMetadata; use crate::row::Row; use crate::server_streaming::stream::PartialResultSetStream; @@ -58,6 +59,7 @@ pub struct ResultSet { safe_to_retry: bool, max_buffered_partial_result_sets: usize, retry_count: usize, + transaction_selector: Option, } /// Errors that can occur when interacting with a [`ResultSet`]. @@ -84,6 +86,7 @@ impl ResultSet { /// Creates a new result set. pub(crate) fn new( stream: PartialResultSetStream, + transaction_selector: Option, precommit_token_tracker: PrecommitTokenTracker, client: DatabaseClient, operation: StreamOperation, @@ -102,6 +105,7 @@ impl ResultSet { safe_to_retry: true, max_buffered_partial_result_sets: MAX_BUFFERED_PARTIAL_RESULT_SETS, retry_count: 0, + transaction_selector, } } @@ -274,8 +278,19 @@ impl ResultSet { (Some(_), Some(_)) => { return Err(internal_error("Additional metadata after first result set")); } - (None, Some(m)) => { + (None, Some(mut m)) => { + let transaction = m.transaction.take(); self.metadata = Some(ResultSetMetadata::new(Some(m))); + if let (Some(selector), Some(transaction)) = + (&self.transaction_selector, transaction) + { + selector.update( + transaction.id, + transaction + .read_timestamp + .and_then(|t| wkt::Timestamp::new(t.seconds, t.nanos).ok()), + ); + } } } diff --git a/tests/spanner/src/query.rs b/tests/spanner/src/query.rs index e38b51b989..fe02427a19 100644 --- a/tests/spanner/src/query.rs +++ b/tests/spanner/src/query.rs @@ -194,17 +194,40 @@ pub async fn result_set_metadata(db_client: &DatabaseClient) -> anyhow::Result<( } pub async fn multi_use_read_only_transaction(db_client: &DatabaseClient) -> anyhow::Result<()> { + for explicit_begin in [false, true] { + test_multi_use_read_only_transaction(db_client, explicit_begin).await?; + } + Ok(()) +} + +async fn test_multi_use_read_only_transaction( + db_client: &DatabaseClient, + explicit_begin: bool, +) -> anyhow::Result<()> { // Start a multi-use read-only transaction. - let tx = db_client.read_only_transaction().build().await?; + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(explicit_begin) + .build() + .await?; - // Expect a read timestamp to have been chosen. - assert!(tx.read_timestamp().is_some()); + if explicit_begin { + // Expect a read timestamp to have been chosen immediately. + assert!(tx.read_timestamp().is_some()); + } else { + // Expect a read timestamp to NOT have been chosen yet. + assert!(tx.read_timestamp().is_none()); + } // Execute the first query. let mut rs1 = tx .execute_query(Statement::builder("SELECT 1 AS col_int").build()) .await?; let row1 = rs1.next().await.transpose()?.expect("should yield a row"); + + // The read timestamp is now always available. + assert!(tx.read_timestamp().is_some()); + let val1 = row1.raw_values()[0].as_string(); assert_eq!(val1, "1"); let next1 = rs1.next().await.transpose()?;