diff --git a/src/spanner/src/read_only_transaction.rs b/src/spanner/src/read_only_transaction.rs index ea57a75853..67ca13feb4 100644 --- a/src/spanner/src/read_only_transaction.rs +++ b/src/spanner/src/read_only_transaction.rs @@ -275,16 +275,7 @@ impl MultiUseReadOnlyTransactionBuilder { &self, options: TransactionOptions, ) -> crate::Result { - let request = crate::model::BeginTransactionRequest::default() - .set_session(self.client.session.name.clone()) - .set_options(options); - - // TODO(#4972): make request options configurable - let response = self - .client - .spanner - .begin_transaction(request, crate::RequestOptions::default()) - .await?; + let response = execute_begin_transaction(&self.client, options).await?; let transaction_selector = crate::model::TransactionSelector::default().set_id(response.id); @@ -429,6 +420,22 @@ impl MultiUseReadOnlyTransaction { } } +/// Executes an explicit `BeginTransaction` RPC on Spanner. +async fn execute_begin_transaction( + client: &crate::database_client::DatabaseClient, + options: crate::model::TransactionOptions, +) -> crate::Result { + let request = crate::model::BeginTransactionRequest::default() + .set_session(client.session.name.clone()) + .set_options(options); + + // TODO(#4972): make request options configurable + client + .spanner + .begin_transaction(request, crate::RequestOptions::default()) + .await +} + #[derive(Clone, Debug)] pub(crate) enum ReadContextTransactionSelector { Fixed(crate::model::TransactionSelector, Option), @@ -463,6 +470,32 @@ impl ReadContextTransactionSelector { } } + /// Explicitly begins a transaction if the transaction selector is a `Lazy` + /// selector and the transaction has not yet been started. This is used by + /// the client to force the start of a transaction if the first statement + /// failed. + pub(crate) async fn begin_explicitly( + &self, + client: &crate::database_client::DatabaseClient, + ) -> crate::Result<()> { + let Self::Lazy(lazy) = self else { + return Ok(()); + }; + + let options = { + let guard = lazy.lock().expect("transaction state mutex poisoned"); + let TransactionState::NotStarted(options) = &*guard else { + return Ok(()); + }; + options.clone() + }; + + let response = execute_begin_transaction(client, options).await?; + self.update(response.id, response.read_timestamp); + + Ok(()) + } + pub(crate) fn update(&self, id: bytes::Bytes, timestamp: Option) { if let Self::Lazy(lazy) = self { let mut guard = lazy.lock().expect("transaction state mutex poisoned"); @@ -517,6 +550,64 @@ impl ReadContext { options } + /// Attempts to execute an explicit `begin_transaction` RPC if the current transaction + /// selector is still in the `Lazy(NotStarted)` state. This is used as a + /// fallback mechanism when an initial implicit begin attempt failed. + async fn begin_explicitly_if_not_started(&self) -> crate::Result { + let ReadContextTransactionSelector::Lazy(lazy) = &self.transaction_selector else { + return Ok(false); + }; + let is_started = matches!(&*lazy.lock().unwrap(), TransactionState::Started(_, _)); + if is_started { + return Ok(false); + } + + self.transaction_selector + .begin_explicitly(&self.client) + .await?; + Ok(true) + } +} + +/// Helper macro to execute a streaming SQL or streaming read RPC with retry logic. +macro_rules! execute_stream_with_retry { + ($self:expr, $request:ident, $rpc_method:ident, $operation_variant:path) => {{ + let stream = match $self + .client + .spanner + // TODO(#4972): make request options configurable + .$rpc_method($request.clone(), crate::RequestOptions::default()) + .send() + .await + { + Ok(s) => s, + Err(e) => { + if $self.begin_explicitly_if_not_started().await? { + $request.transaction = Some($self.transaction_selector.selector()); + $self + .client + .spanner + // TODO(#4972): make request options configurable + .$rpc_method($request.clone(), crate::RequestOptions::default()) + .send() + .await? + } else { + return Err(e); + } + } + }; + + Ok(ResultSet::new( + stream, + Some($self.transaction_selector.clone()), + $self.precommit_token_tracker.clone(), + $self.client.clone(), + $operation_variant($request), + )) + }}; +} + +impl ReadContext { pub(crate) async fn execute_query>( &self, statement: T, @@ -528,21 +619,7 @@ impl ReadContext { .set_transaction(self.transaction_selector.selector()); request.request_options = self.amend_request_options(request.request_options); - let stream = self - .client - .spanner - // TODO(#4972): make request options configurable - .execute_streaming_sql(request.clone(), crate::RequestOptions::default()) - .send() - .await?; - - Ok(ResultSet::new( - stream, - Some(self.transaction_selector.clone()), - self.precommit_token_tracker.clone(), - self.client.clone(), - StreamOperation::Query(request), - )) + execute_stream_with_retry!(self, request, execute_streaming_sql, StreamOperation::Query) } pub(crate) async fn execute_read>( @@ -556,27 +633,15 @@ impl ReadContext { .set_transaction(self.transaction_selector.selector()); request.request_options = self.amend_request_options(request.request_options); - let stream = self - .client - .spanner - // TODO(#4972): make request options configurable - .streaming_read(request.clone(), crate::RequestOptions::default()) - .send() - .await?; - - Ok(ResultSet::new( - stream, - Some(self.transaction_selector.clone()), - self.precommit_token_tracker.clone(), - self.client.clone(), - StreamOperation::Read(request), - )) + execute_stream_with_retry!(self, request, streaming_read, StreamOperation::Read) } } #[cfg(test)] pub(crate) mod tests { use super::*; + use crate::result_set::tests::string_val; + use spanner_grpc_mock::google::spanner::v1 as mock_v1; #[test] fn auto_traits() { @@ -1019,4 +1084,345 @@ pub(crate) mod tests { Ok(()) } + + #[tokio::test] + async fn inline_begin_failure_retry_success() -> anyhow::Result<()> { + use crate::value::Value; + use gaxi::grpc::tonic::Response; + use gaxi::grpc::tonic::Status; + + let mut mock = create_session_mock(); + let mut seq = mockall::Sequence::new(); + + // 1. Initial query fails + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error"))); + + // 2. Explicit begin transaction succeeds + mock.expect_begin_transaction() + .times(1) + .in_sequence(&mut seq) + .returning(|req| { + let req = req.into_inner(); + assert_eq!( + req.session, + "projects/p/instances/i/databases/d/sessions/123" + ); + // Return a transaction with ID + Ok(Response::new(mock_v1::Transaction { + id: vec![7, 8, 9], + read_timestamp: Some(prost_types::Timestamp { + seconds: 123456789, + nanos: 0, + }), + ..Default::default() + })) + }); + + // 3. Retry of the query succeeds + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|req| { + let req = req.into_inner(); + // Ensure it uses the new transaction ID + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Id(id) => { + assert_eq!(id, vec![7, 8, 9]); + } + _ => panic!("Expected Selector::Id"), + } + Ok(Response::new(Box::pin(tokio_stream::iter(vec![Ok( + setup_select1(), + )])))) + }); + + let (db_client, _server) = setup_db_client(mock).await; + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + let mut rs = tx + .execute_query(Statement::builder("SELECT 1").build()) + .await?; + + let row = rs + .next() + .await + .ok_or_else(|| anyhow::anyhow!("Expected a row but stream cleanly exhausted"))??; + assert_eq!( + row.raw_values(), + [Value(string_val("1"))], + "The parsed row value safely matched the underlying stream chunk" + ); + + Ok(()) + } + + #[tokio::test] + async fn inline_begin_failure_retry_failure() -> anyhow::Result<()> { + use gaxi::grpc::tonic::Response; + use gaxi::grpc::tonic::Status; + + let mut mock = create_session_mock(); + let mut seq = mockall::Sequence::new(); + + // 1. Initial query fails + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error first"))); + + // 2. Explicit begin transaction succeeds + mock.expect_begin_transaction() + .times(1) + .in_sequence(&mut seq) + .returning(|_| { + Ok(Response::new(mock_v1::Transaction { + id: vec![7, 8, 9], + read_timestamp: Some(prost_types::Timestamp { + seconds: 123456789, + nanos: 0, + }), + ..Default::default() + })) + }); + + // 3. Retry of the query fails again + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error second"))); + + let (db_client, _server) = setup_db_client(mock).await; + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + let rs_result = tx + .execute_query(Statement::builder("SELECT 1").build()) + .await; + + assert!( + rs_result.is_err(), + "The failed execution bubbled upwards securely" + ); + let err_str = rs_result.unwrap_err().to_string(); + assert!( + err_str.contains("Internal error second"), + "Secondary error message accurately propagates: {}", + err_str + ); + + Ok(()) + } + + #[tokio::test] + async fn inline_begin_failure_fallback_rpc_fails() -> anyhow::Result<()> { + use gaxi::grpc::tonic::Status; + + let mut mock = create_session_mock(); + let mut seq = mockall::Sequence::new(); + + // 1. Initial query fails + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error query"))); + + // 2. Explicit begin transaction fails + mock.expect_begin_transaction() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error begin tx"))); + + let (db_client, _server) = setup_db_client(mock).await; + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + let rs_result = tx + .execute_query(Statement::builder("SELECT 1").build()) + .await; + + assert!( + rs_result.is_err(), + "The explicitly errored fallback boot securely propagated outwards" + ); + let err_str = rs_result.unwrap_err().to_string(); + assert!( + err_str.contains("Internal error begin tx"), + "Natively propagated specific BeginTx bounds: {}", + err_str + ); + + Ok(()) + } + + #[tokio::test] + async fn inline_begin_read_failure_retry_success() -> anyhow::Result<()> { + use crate::client::{KeySet, ReadRequest}; + use crate::value::Value; + use gaxi::grpc::tonic::Response; + use gaxi::grpc::tonic::Status; + + let mut mock = create_session_mock(); + let mut seq = mockall::Sequence::new(); + + // 1. Initial read fails + mock.expect_streaming_read() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error"))); + + // 2. Explicit begin transaction succeeds + mock.expect_begin_transaction() + .times(1) + .in_sequence(&mut seq) + .returning(|_| { + Ok(Response::new(mock_v1::Transaction { + id: vec![7, 8, 9], + read_timestamp: None, + ..Default::default() + })) + }); + + // 3. Retry of the read succeeds + mock.expect_streaming_read() + .times(1) + .in_sequence(&mut seq) + .returning(|req| { + let req = req.into_inner(); + // Ensure it uses the new transaction ID + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Id(id) => { + assert_eq!(id, vec![7, 8, 9]); + } + _ => panic!("Expected Selector::Id"), + } + Ok(Response::new(Box::pin(tokio_stream::iter(vec![Ok( + setup_select1(), + )])))) + }); + + let (db_client, _server) = setup_db_client(mock).await; + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + let read = ReadRequest::builder("Users", vec!["Id", "Name"]) + .with_keys(KeySet::all()) + .build(); + let mut rs = tx.execute_read(read).await?; + + let row = rs + .next() + .await + .ok_or_else(|| anyhow::anyhow!("Expected a row uniquely returned"))??; + assert_eq!( + row.raw_values(), + [Value(string_val("1"))], + "The macro correctly unpacked read arrays seamlessly" + ); + + Ok(()) + } + + #[tokio::test] + async fn single_use_query_send_error_returns_immediately() -> anyhow::Result<()> { + use crate::client::Statement; + use gaxi::grpc::tonic::Status; + + let mut mock = create_session_mock(); + + mock.expect_execute_streaming_sql() + .times(1) + .returning(|_| Err(Status::internal("Internal error single use query"))); + + mock.expect_begin_transaction().never(); + + let (db_client, _server) = setup_db_client(mock).await; + // single_use creates a Fixed selector + let tx = db_client.single_use().build(); + + let rs_result = tx + .execute_query(Statement::builder("SELECT 1").build()) + .await; + + assert!(rs_result.is_err()); + let err_str = rs_result.unwrap_err().to_string(); + assert!(err_str.contains("Internal error single use query")); + + Ok(()) + } + + #[tokio::test] + async fn inline_begin_already_started_query_send_error_returns_immediately() + -> anyhow::Result<()> { + use crate::client::Statement; + use gaxi::grpc::tonic::Status; + use spanner_grpc_mock::google::spanner::v1 as mock_v1; + + let mut mock = create_session_mock(); + let mut seq = mockall::Sequence::new(); + + mock.expect_begin_transaction().never(); + + // 1. First query executes successfully and implicitly starts the transaction. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(move |_req| { + let mut rs = setup_select1(); + rs.metadata.as_mut().unwrap().transaction = Some(mock_v1::Transaction { + id: vec![4, 5, 6], + read_timestamp: None, + ..Default::default() + }); + Ok(gaxi::grpc::tonic::Response::new(Box::pin( + tokio_stream::iter(vec![Ok(rs)]), + ))) + }); + + // 2. Second query fails immediately upon send() + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error second query"))); + + let (db_client, _server) = setup_db_client(mock).await; + + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + // Run first query (starts tx) + let mut rs = tx + .execute_query(Statement::builder("SELECT 1").build()) + .await?; + let _ = rs.next().await.expect("has row")?; + + // Run second query (fails) + let rs_result = tx + .execute_query(Statement::builder("SELECT 2").build()) + .await; + + assert!(rs_result.is_err()); + let err_str = rs_result.unwrap_err().to_string(); + assert!(err_str.contains("Internal error second query")); + + Ok(()) + } } diff --git a/src/spanner/src/result_set.rs b/src/spanner/src/result_set.rs index 6bd848b175..2d775b7412 100644 --- a/src/spanner/src/result_set.rs +++ b/src/spanner/src/result_set.rs @@ -233,7 +233,29 @@ impl ResultSet { return Ok(()); } - Err(e) + // Check if this stream included an inlined BeginTransaction option + // and has not yet returned a transaction ID. If so, we explicitly + // begin the transaction and restart the stream. + let Some(ReadContextTransactionSelector::Lazy(lazy)) = &self.transaction_selector else { + return Err(e); + }; + let is_started = matches!( + &*lazy.lock().unwrap(), + crate::read_only_transaction::TransactionState::Started(_, _) + ); + if is_started { + return Err(e); + } + + self.transaction_selector + .as_ref() + .unwrap() + .begin_explicitly(&self.client) + .await?; + + self.partial_result_sets_buffer.clear(); + self.restart_stream().await?; + Ok(()) } fn handle_stream_end(&mut self) -> crate::Result> { @@ -281,15 +303,25 @@ impl ResultSet { (None, Some(mut m)) => { let transaction = m.transaction.take(); self.metadata = Some(ResultSetMetadata::new(Some(m))); - if let (Some(selector), Some(transaction)) = - (&self.transaction_selector, transaction) - { - selector.update( - transaction.id, - transaction - .read_timestamp - .and_then(|t| wkt::Timestamp::new(t.seconds, t.nanos).ok()), - ); + if let Some(selector) = &self.transaction_selector { + if let Some(transaction) = transaction { + selector.update( + transaction.id, + transaction + .read_timestamp + .and_then(|t| wkt::Timestamp::new(t.seconds, t.nanos).ok()), + ); + } else if let ReadContextTransactionSelector::Lazy(lazy) = selector { + let is_started = matches!( + &*lazy.lock().expect("transaction state mutex poisoned"), + crate::read_only_transaction::TransactionState::Started(_, _) + ); + if !is_started { + return Err(internal_error( + "Spanner failed to return a transaction ID for a query that included a BeginTransaction option", + )); + } + } } } } @@ -336,9 +368,15 @@ impl ResultSet { } async fn restart_stream(&mut self) -> crate::Result<()> { + // Get the latest transaction selector for this transaction. + let transaction_selector = self.transaction_selector.as_ref().map(|s| s.selector()); + match &mut self.operation { StreamOperation::Query(req) => { req.resume_token = self.last_resume_token.clone(); + req.transaction = transaction_selector + .clone() + .or_else(|| req.transaction.take()); let stream = self .client .spanner @@ -349,6 +387,9 @@ impl ResultSet { } StreamOperation::Read(req) => { req.resume_token = self.last_resume_token.clone(); + req.transaction = transaction_selector + .clone() + .or_else(|| req.transaction.take()); let stream = self .client .spanner @@ -465,6 +506,7 @@ pub(crate) mod tests { use super::*; use crate::client::Spanner; use gaxi::grpc::tonic::Response; + use google_cloud_auth::credentials::anonymous::Builder as Anonymous; use prost_types::Value; use spanner_grpc_mock::MockSpanner; use spanner_grpc_mock::google::spanner::v1::spanner_server::Spanner as SpannerTrait; @@ -528,7 +570,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .with_credentials(Anonymous::new().build()) .build() .await .expect("Failed to build client"); @@ -564,6 +606,39 @@ pub(crate) mod tests { assert!(next.is_none()); } + #[tokio::test] + async fn test_result_set_metadata() -> anyhow::Result<()> { + let mut rs = run_mock_query(vec![PartialResultSet { + metadata: metadata(2), + values: vec![string_val("a"), string_val("b")], + last: true, + ..Default::default() + }]) + .await; + + // Called before next() -> returns MetadataNotAvailable + let meta_err = rs.metadata(); + assert!(meta_err.is_err()); + assert!(matches!( + meta_err.unwrap_err(), + ResultSetError::MetadataNotAvailable + )); + + // Advance to fetch metadata + let _next = rs.next().await.expect("Expected a row")?; + + // Called after next() -> returns metadata + let meta = rs.metadata(); + assert!(meta.is_ok()); + let meta = meta.unwrap(); + assert_eq!( + meta.column_names(), + &["col0".to_string(), "col1".to_string()] + ); + + Ok(()) + } + #[tokio::test] async fn test_result_set_handle_partial_result_set_error() -> anyhow::Result<()> { let mut rs = run_mock_query(vec![PartialResultSet { @@ -586,6 +661,34 @@ pub(crate) mod tests { Ok(()) } + #[tokio::test] + async fn test_result_set_handle_partial_result_set_error_immediate() -> anyhow::Result<()> { + let mut rs = run_mock_query(vec![ + PartialResultSet { + values: vec![string_val("row1")], + ..Default::default() + }, + PartialResultSet { + resume_token: b"token".to_vec(), + ..Default::default() + }, + ]) + .await; + + let res = rs.next().await; + assert!(res.is_some(), "Expected an error but got None"); + let res = res.expect("Expected some response but got None"); + assert!(res.is_err(), "Expected an error but got Ok"); + let err_str = res.expect_err("Expected should be an error").to_string(); + assert!( + err_str.contains("First PartialResultSet did not contain metadata"), + "Expected error to contain 'First PartialResultSet did not contain metadata', but got '{}'", + err_str + ); + + Ok(()) + } + #[tokio::test] async fn test_result_set_stream_ended_with_chunked_value() -> anyhow::Result<()> { let mut rs = run_mock_query(vec![PartialResultSet { @@ -725,7 +828,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .with_credentials(Anonymous::new().build()) .build() .await?; @@ -1081,7 +1184,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .with_credentials(Anonymous::new().build()) .build() .await?; @@ -1137,7 +1240,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .with_credentials(Anonymous::new().build()) .build() .await?; @@ -1207,7 +1310,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .with_credentials(Anonymous::new().build()) .build() .await?; @@ -1296,7 +1399,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .with_credentials(Anonymous::new().build()) .build() .await?; @@ -1375,7 +1478,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .with_credentials(Anonymous::new().build()) .build() .await?; @@ -1427,7 +1530,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .with_credentials(Anonymous::new().build()) .build() .await?; @@ -1448,4 +1551,368 @@ pub(crate) mod tests { Ok(()) } + + #[tokio::test] + async fn result_set_inline_begin_stream_error_fallback() -> anyhow::Result<()> { + use gaxi::grpc::tonic::Response; + use gaxi::grpc::tonic::Status; + use spanner_grpc_mock::MockSpanner; + use spanner_grpc_mock::google::spanner::v1 as mock_v1; + use spanner_grpc_mock::start; + + let mut mock = MockSpanner::new(); + let mut seq = mockall::Sequence::new(); + + // 1. Stream yields an error on the first chunk before returning transaction metadata. + // E.g., INVALID_ARGUMENT because the query is malformed. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_request| { + let stream = + tokio_stream::iter(vec![Err(Status::invalid_argument("Invalid query"))]); + Ok(Response::new( + Box::pin(stream) as ::ExecuteStreamingSqlStream, + )) + }); + + // 2. The explicit BeginTransaction fallback gets triggered. + mock.expect_begin_transaction() + .times(1) + .in_sequence(&mut seq) + .returning(|_| { + Ok(Response::new(mock_v1::Transaction { + id: vec![7, 8, 9], + read_timestamp: Some(prost_types::Timestamp { + seconds: 123456789, + nanos: 0, + }), + ..Default::default() + })) + }); + + // 3. The ResultSet gracefully restarts the stream using the transaction ID returned by BeginTransaction. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|req| { + let req = req.into_inner(); + // Ensure the explicitly yielded ID is routed into the new stream transaction selector + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Id(id) => { + assert_eq!(id, vec![7, 8, 9]); + } + _ => panic!("Expected Selector::Id"), + } + + let stream = tokio_stream::iter(vec![Ok(PartialResultSet { + metadata: metadata(1), + values: vec![string_val("1")], + ..Default::default() + })]); + Ok(Response::new( + Box::pin(stream) as ::ExecuteStreamingSqlStream, + )) + }); + + mock.expect_create_session().returning(|_| { + Ok(Response::new(Session { + name: "session".to_string(), + multiplexed: true, + ..Default::default() + })) + }); + + let (address, _server) = start("127.0.0.1:0", mock).await?; + + let client: Spanner = Spanner::builder() + .with_endpoint(address) + .with_credentials(Anonymous::new().build()) + .build() + .await?; + + let db_client = client.database_client("db").build().await?; + + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + let mut rs = tx.execute_query("SELECT 1").await?; + + let row1 = rs.next().await.ok_or_else(|| { + anyhow::anyhow!("Expected row returned successfully despite stream breaking") + })??; + assert_eq!( + row1.raw_values()[0].0, + string_val("1"), + "Verify the returned stream successfully resumed with the correct payload" + ); + + Ok(()) + } + + #[tokio::test] + async fn result_set_retry_inline_begin_transient_error() -> anyhow::Result<()> { + use gaxi::grpc::tonic::Response; + use gaxi::grpc::tonic::Status; + use spanner_grpc_mock::MockSpanner; + use spanner_grpc_mock::google::spanner::v1 as mock_v1; + use spanner_grpc_mock::start; + + let mut mock = MockSpanner::new(); + let mut seq = mockall::Sequence::new(); + + // 1. Initial stream throws UNAVAILABLE before metadata. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_request| { + let stream = + tokio_stream::iter(vec![Err(Status::unavailable("Transient network issue"))]); + Ok(Response::new( + Box::pin(stream) as ::ExecuteStreamingSqlStream, + )) + }); + + // 2. We retry the stream since it was a transient error. + // The retry should use the same transaction selector as the original request. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|req| { + let req = req.into_inner(); + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Begin(_) => {} + _ => panic!("Expected Selector::Begin on stream retry"), + } + + let mut meta = metadata(1).unwrap(); + meta.transaction = Some(mock_v1::Transaction { + id: vec![7, 8, 9], + read_timestamp: None, + ..Default::default() + }); + + let stream = tokio_stream::iter(vec![Ok(PartialResultSet { + metadata: Some(meta), + values: vec![string_val("1")], + ..Default::default() + })]); + Ok(Response::new( + Box::pin(stream) as ::ExecuteStreamingSqlStream, + )) + }); + + mock.expect_create_session().returning(|_| { + Ok(Response::new(Session { + name: "session".to_string(), + multiplexed: true, + ..Default::default() + })) + }); + + let (address, _server) = start("127.0.0.1:0", mock).await?; + + let client: Spanner = Spanner::builder() + .with_endpoint(address) + .with_credentials(Anonymous::new().build()) + .build() + .await?; + + let db_client = client.database_client("db").build().await?; + + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + let mut rs = tx.execute_query("SELECT 1").await?; + + let row1 = rs + .next() + .await + .ok_or_else(|| anyhow::anyhow!("Expected stream to recover safely"))??; + assert_eq!( + row1.raw_values()[0].0, + string_val("1"), + "Verify resumed stream returns data" + ); + + Ok(()) + } + + #[tokio::test] + async fn result_set_retry_inline_begin_id_recovered() -> anyhow::Result<()> { + use gaxi::grpc::tonic::Response; + use gaxi::grpc::tonic::Status; + use spanner_grpc_mock::MockSpanner; + use spanner_grpc_mock::google::spanner::v1 as mock_v1; + use spanner_grpc_mock::start; + + let mut mock = MockSpanner::new(); + let mut seq = mockall::Sequence::new(); + + // 1. Stream successfully returns metadata chunk then throws UNAVAILABLE on chunk 2. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_request| { + let mut meta = metadata(1).unwrap(); + meta.transaction = Some(mock_v1::Transaction { + id: vec![7, 8, 9], + read_timestamp: None, + ..Default::default() + }); + let stream = tokio_stream::iter(vec![ + Ok(PartialResultSet { + metadata: Some(meta), + values: vec![string_val("1")], + resume_token: b"token1".to_vec(), + ..Default::default() + }), + Err(Status::unavailable("Transient mid-stream network issue")), + ]); + Ok(Response::new( + Box::pin(stream) as ::ExecuteStreamingSqlStream, + )) + }); + + // 2. Stream resumes using Selector::Id. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|req| { + let req = req.into_inner(); + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Id(id) => { + assert_eq!(id, vec![7, 8, 9]); + } + _ => panic!("Expected Selector::Id on stream retry"), + } + + let stream = tokio_stream::iter(vec![Ok(PartialResultSet { + values: vec![string_val("2")], + ..Default::default() + })]); + Ok(Response::new( + Box::pin(stream) as ::ExecuteStreamingSqlStream, + )) + }); + + mock.expect_create_session().returning(|_| { + Ok(Response::new(Session { + name: "session".to_string(), + multiplexed: true, + ..Default::default() + })) + }); + + let (address, _server) = start("127.0.0.1:0", mock).await?; + + let client: Spanner = Spanner::builder() + .with_endpoint(address) + .with_credentials(Anonymous::new().build()) + .build() + .await?; + + let db_client = client.database_client("db").build().await?; + + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + let mut rs = tx.execute_query("SELECT 1").await?; + + let row1 = rs + .next() + .await + .ok_or_else(|| anyhow::anyhow!("Expected stream row1 extracted"))??; + assert_eq!( + row1.raw_values()[0].0, + string_val("1"), + "Verified chunk 1 payload" + ); + let row2 = rs + .next() + .await + .ok_or_else(|| anyhow::anyhow!("Expected stream row2 recovered"))??; + assert_eq!( + row2.raw_values()[0].0, + string_val("2"), + "Verified chunk 2 reboot dynamically intercepted ID bounds correctly" + ); + + Ok(()) + } + + #[tokio::test] + async fn result_set_inline_begin_metadata_missing_transaction_fails() -> anyhow::Result<()> { + use gaxi::grpc::tonic::Response; + use spanner_grpc_mock::MockSpanner; + use spanner_grpc_mock::start; + + let mut mock = MockSpanner::new(); + let mut seq = mockall::Sequence::new(); + + // 1. Initial stream successfully returns metadata chunk but completely lacks the `Transaction` entity. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_request| { + let stream = tokio_stream::iter(vec![Ok(PartialResultSet { + metadata: metadata(1), // Missing `.transaction` natively + values: vec![string_val("1")], + ..Default::default() + })]); + Ok(Response::new( + Box::pin(stream) as ::ExecuteStreamingSqlStream, + )) + }); + + mock.expect_create_session().returning(|_| { + Ok(Response::new(Session { + name: "session".to_string(), + multiplexed: true, + ..Default::default() + })) + }); + + let (address, _server) = start("127.0.0.1:0", mock).await?; + + let client: Spanner = Spanner::builder() + .with_endpoint(address) + .with_credentials(Anonymous::new().build()) + .build() + .await?; + + let db_client = client.database_client("db").build().await?; + + // Use explicitly deferred Lazy begin transaction! + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + let mut rs = tx.execute_query("SELECT 1").await?; + + let rs_result = rs + .next() + .await + .ok_or_else(|| anyhow::anyhow!("Expected explicit crash bound properly"))?; + assert!( + rs_result.is_err(), + "Securely aborted when metadata failed to package internal bounds properly" + ); + + let err_str = rs_result.unwrap_err().to_string(); + assert!( + err_str.contains("failed to return a transaction ID"), + "Caught implicit gap boundary: {}", + err_str + ); + + Ok(()) + } } diff --git a/tests/spanner/src/query.rs b/tests/spanner/src/query.rs index fe02427a19..0f1a54553e 100644 --- a/tests/spanner/src/query.rs +++ b/tests/spanner/src/query.rs @@ -246,6 +246,45 @@ async fn test_multi_use_read_only_transaction( Ok(()) } +pub async fn multi_use_read_only_transaction_invalid_query_fallback( + db_client: &DatabaseClient, +) -> anyhow::Result<()> { + // Start a multi-use read-only transaction with implicit begin. + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + // Expect a read timestamp to NOT have been chosen yet. + assert!(tx.read_timestamp().is_none()); + + // Execute the first query with invalid syntax. + let rs_result = tx + .execute_query(Statement::builder("SELECT * FROM NonExistentTable").build()) + .await; + + assert!( + rs_result.is_err(), + "Expected an error from an invalid query" + ); + + // The read timestamp should now be available because the transaction + // fell back to an explicit BeginTransaction. + assert!(tx.read_timestamp().is_some()); + + // It should be possible to use the transaction. + let mut rs2 = tx + .execute_query(Statement::builder("SELECT 2 AS col_int").build()) + .await?; + + let row2 = rs2.next().await.transpose()?.expect("should yield a row"); + let val2 = row2.raw_values()[0].as_string(); + assert_eq!(val2, "2"); + + Ok(()) +} + fn verify_null_row(row: &google_cloud_spanner::client::Row) { let raw_values = row.raw_values(); assert_eq!(raw_values.len(), 20, "Row should have exactly 20 columns"); diff --git a/tests/spanner/tests/driver.rs b/tests/spanner/tests/driver.rs index d27cc10dc5..2492088f67 100644 --- a/tests/spanner/tests/driver.rs +++ b/tests/spanner/tests/driver.rs @@ -26,6 +26,10 @@ mod spanner { integration_tests_spanner::query::query_with_parameters(&db_client).await?; integration_tests_spanner::query::result_set_metadata(&db_client).await?; integration_tests_spanner::query::multi_use_read_only_transaction(&db_client).await?; + integration_tests_spanner::query::multi_use_read_only_transaction_invalid_query_fallback( + &db_client, + ) + .await?; Ok(()) }