diff --git a/docs/source/queries/schema-agreement.md b/docs/source/queries/schema-agreement.md index b9a519b4da..aa70a84ebc 100644 --- a/docs/source/queries/schema-agreement.md +++ b/docs/source/queries/schema-agreement.md @@ -1,11 +1,11 @@ # Schema agreement -Sometimes after performing queries some nodes have not been updated so we need a mechanism that checks if every node have agreed schema version. -There are four methods in `Session` that assist us. +Sometimes after performing queries some nodes have not been updated, so we need a mechanism that checks if every node have agreed on schema version. +There is a number of methods in `Session` that assist us. Every method raise `QueryError` if something goes wrong, but they should never raise any errors, unless there is a DB or connection malfunction. ### Checking schema version -`Session::fetch_schema_version` returns an `Uuid` of local node's schema version. +`Session::fetch_schema_version` returns an `Uuid` of local node's schema version. ```rust # extern crate scylla; @@ -19,7 +19,9 @@ println!("Local schema version is: {}", session.fetch_schema_version().await?); ### Awaiting schema agreement -`Session::await_schema_agreement` returns a `Future` that can be `await`ed on as long as schema is not in an agreement. +`Session::await_schema_agreement` returns a `Future` that can be `await`ed as long as schema is not in an agreement. +However, it won't wait forever; `SessionConfig` defines a timeout that limits the time of waiting. If the timeout elapses, +the return value is `false`, otherwise it is `true`. ```rust # extern crate scylla; @@ -31,28 +33,10 @@ session.await_schema_agreement().await?; # } ``` -### Awaiting with timeout -We can also set timeout in milliseconds with `Session::await_timed_schema_agreement`. -It takes one argument, an `std::time::Duration` value that tells how long our driver should await for schema agreement. If the timeout is met the return value is `false` otherwise it is `true`. - -```rust -# extern crate scylla; -# use scylla::Session; -# use std::error::Error; -# use std::time::Duration; -# async fn check_only_compiles(session: &Session) -> Result<(), Box> { -if session.await_timed_schema_agreement(Duration::from_secs(5)).await? { // wait for 5 seconds - println!("SCHEMA AGREED"); -} else { - println!("SCHEMA IS NOT IN AGREEMENT - TIMED OUT"); -} -# Ok(()) -# } -``` - -### Checking for schema interval -If schema is not agreed driver sleeps for a duration before checking it again. Default value is 200 milliseconds but it can be changed with `SessionBuilder::schema_agreement_interval`. +### Interval of checking for schema agreement +If the schema is not agreed upon, the driver sleeps for a duration before checking it again. The default value is 200 milliseconds, +but it can be changed with `SessionBuilder::schema_agreement_interval`. ```rust # extern crate scylla; @@ -70,15 +54,15 @@ SessionBuilder::new() ``` ### Checking if schema is in agreement now -If you want to check if schema is in agreement now without retrying after failure you can use `Session::check_schema_agreement` function. +If you want to check if schema is in agreement now, without retrying after failure, you can use `Session::check_schema_agreement` function. ```rust # extern crate scylla; # use scylla::Session; # use std::error::Error; # async fn check_only_compiles(session: &Session) -> Result<(), Box> { -if session.check_schema_agreement().await? { +if session.check_schema_agreement().await? { println!("SCHEMA AGREED"); } else { println!("SCHEMA IS NOT IN AGREEMENT"); @@ -86,5 +70,3 @@ if session.check_schema_agreement().await? { # Ok(()) # } ``` - - diff --git a/examples/schema_agreement.rs b/examples/schema_agreement.rs index b1825ddfbe..44e2b32859 100644 --- a/examples/schema_agreement.rs +++ b/examples/schema_agreement.rs @@ -20,17 +20,12 @@ async fn main() -> Result<()> { let schema_version = session.fetch_schema_version().await?; println!("Schema version: {}", schema_version); - session.await_schema_agreement().await?; // without timeout example session.query("CREATE KEYSPACE IF NOT EXISTS ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}", &[]).await?; - if session - .await_timed_schema_agreement(Duration::from_secs(5)) - .await? - { - // with timeout example - println!("Timed schema is in agreement"); + if session.await_schema_agreement().await? { + println!("Schema is in agreement"); } else { - println!("Timed schema is NOT in agreement"); + println!("Schema is NOT in agreement"); } session .query( diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index 9fb7450d3f..629efdf3b7 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -148,7 +148,8 @@ pub struct Session { default_execution_profile_handle: ExecutionProfileHandle, schema_agreement_interval: Duration, metrics: Arc, - auto_await_schema_agreement_timeout: Option, + schema_agreement_timeout: Duration, + schema_agreement_automatic_waiting: bool, refresh_metadata_on_auto_schema_agreement: bool, keyspace_name: ArcSwapOption, tracing_info_fetch_attempts: NonZeroU32, @@ -170,7 +171,7 @@ impl std::fmt::Debug for Session { .field("metrics", &self.metrics) .field( "auto_await_schema_agreement_timeout", - &self.auto_await_schema_agreement_timeout, + &self.schema_agreement_timeout, ) .finish() } @@ -204,7 +205,6 @@ pub struct SessionConfig { pub authenticator: Option>, - pub schema_agreement_interval: Duration, pub connect_timeout: Duration, /// Size of the per-node connection pool, i.e. how many connections the driver should keep to each node. @@ -229,9 +229,21 @@ pub struct SessionConfig { /// If `None`, connections are never closed due to lack of response to a keepalive message. pub keepalive_timeout: Option, - /// Controls the timeout for the automatic wait for schema agreement after sending a schema-altering statement. - /// If `None`, the automatic schema agreement is disabled. - pub auto_await_schema_agreement_timeout: Option, + /// How often the driver should ask if schema is in agreement. + pub schema_agreement_interval: Duration, + + /// Controls the timeout for waiting for schema agreement. + /// This works both for manual awaiting schema agreement and for + /// automatic waiting after a schema-altering statement is sent. + pub schema_agreement_timeout: Duration, + + /// Controls whether schema agreement is automatically awaited + /// after sending a schema-altering statement. + pub schema_agreement_automatic_waiting: bool, + + /// If true, full schema metadata is fetched after successfully reaching a schema agreement. + /// It is true by default but can be disabled if successive schema-altering statements should be performed. + pub refresh_metadata_on_auto_schema_agreement: bool, /// The address translator is used to translate addresses received from ScyllaDB nodes /// (either with cluster metadata or with an event) to addresses that can be used to @@ -244,10 +256,6 @@ pub struct SessionConfig { /// re-establishing the control connection. pub host_filter: Option>, - /// If true, full schema metadata is fetched after successfully reaching a schema agreement. - /// It is true by default but can be disabled if successive schema-altering statements should be performed. - pub refresh_metadata_on_auto_schema_agreement: bool, - /// If the driver is to connect to ScyllaCloud, there is a config for it. #[cfg(feature = "cloud")] pub cloud_config: Option>, @@ -331,7 +339,8 @@ impl SessionConfig { fetch_schema_metadata: true, keepalive_interval: Some(Duration::from_secs(30)), keepalive_timeout: Some(Duration::from_secs(30)), - auto_await_schema_agreement_timeout: Some(std::time::Duration::from_secs(60)), + schema_agreement_timeout: Duration::from_secs(60), + schema_agreement_automatic_waiting: true, address_translator: None, host_filter: None, refresh_metadata_on_auto_schema_agreement: true, @@ -587,7 +596,8 @@ impl Session { default_execution_profile_handle, schema_agreement_interval: config.schema_agreement_interval, metrics: Arc::new(Metrics::new()), - auto_await_schema_agreement_timeout: config.auto_await_schema_agreement_timeout, + schema_agreement_timeout: config.schema_agreement_timeout, + schema_agreement_automatic_waiting: config.schema_agreement_automatic_waiting, refresh_metadata_on_auto_schema_agreement: config .refresh_metadata_on_auto_schema_agreement, keyspace_name: ArcSwapOption::default(), // will be set by use_keyspace @@ -766,10 +776,8 @@ impl Session { contents: &str, response: &NonErrorQueryResponse, ) -> Result<(), QueryError> { - if let Some(timeout) = self.auto_await_schema_agreement_timeout { - if response.as_schema_change().is_some() - && !self.await_timed_schema_agreement(timeout).await? - { + if self.schema_agreement_automatic_waiting { + if response.as_schema_change().is_some() && !self.await_schema_agreement().await? { // TODO: The TimeoutError should allow to provide more context. // For now, print an error to the logs error!( @@ -1840,20 +1848,20 @@ impl Session { last_error.map(Result::Err) } - pub async fn await_schema_agreement(&self) -> Result<(), QueryError> { + async fn await_schema_agreement_indefinitely(&self) -> Result<(), QueryError> { while !self.check_schema_agreement().await? { tokio::time::sleep(self.schema_agreement_interval).await } Ok(()) } - pub async fn await_timed_schema_agreement( - &self, - timeout_duration: Duration, - ) -> Result { - timeout(timeout_duration, self.await_schema_agreement()) - .await - .map_or(Ok(false), |res| res.and(Ok(true))) + pub async fn await_schema_agreement(&self) -> Result { + timeout( + self.schema_agreement_timeout, + self.await_schema_agreement_indefinitely(), + ) + .await + .map_or(Ok(false), |res| res.and(Ok(true))) } pub async fn check_schema_agreement(&self) -> Result { diff --git a/scylla/src/transport/session_builder.rs b/scylla/src/transport/session_builder.rs index 8804e10e9b..8c87e450a2 100644 --- a/scylla/src/transport/session_builder.rs +++ b/scylla/src/transport/session_builder.rs @@ -717,8 +717,8 @@ impl GenericSessionBuilder { self } - /// Enables automatic wait for schema agreement and sets the timeout for it. - /// By default, it is enabled and the timeout is 60 seconds. + /// Sets the timeout for waiting for schema agreement. + /// By default, the timeout is 60 seconds. /// /// # Example /// ``` @@ -726,19 +726,19 @@ impl GenericSessionBuilder { /// # async fn example() -> Result<(), Box> { /// let session: Session = SessionBuilder::new() /// .known_node("127.0.0.1:9042") - /// .auto_schema_agreement_timeout(std::time::Duration::from_secs(120)) + /// .schema_agreement_timeout(std::time::Duration::from_secs(120)) /// .build() /// .await?; /// # Ok(()) /// # } /// ``` - pub fn auto_schema_agreement_timeout(mut self, timeout: Duration) -> Self { - self.config.auto_await_schema_agreement_timeout = Some(timeout); + pub fn schema_agreement_timeout(mut self, timeout: Duration) -> Self { + self.config.schema_agreement_timeout = timeout; self } - /// Disables automatic wait for schema agreement. - /// By default, it is enabled and the timeout is 60 seconds. + /// Controls automatic waiting for schema agreement after a schema-altering + /// statement is sent. By default, it is enabled. /// /// # Example /// ``` @@ -752,8 +752,8 @@ impl GenericSessionBuilder { /// # Ok(()) /// # } /// ``` - pub fn no_auto_schema_agreement(mut self) -> Self { - self.config.auto_await_schema_agreement_timeout = None; + pub fn auto_await_schema_agreement(mut self, enabled: bool) -> Self { + self.config.schema_agreement_automatic_waiting = enabled; self } diff --git a/scylla/src/transport/session_test.rs b/scylla/src/transport/session_test.rs index 35e2f7bf4d..ead2481be6 100644 --- a/scylla/src/transport/session_test.rs +++ b/scylla/src/transport/session_test.rs @@ -1117,12 +1117,8 @@ async fn test_await_schema_agreement() { #[tokio::test] async fn test_await_timed_schema_agreement() { - use std::time::Duration; let session = create_new_session_builder().build().await.unwrap(); - session - .await_timed_schema_agreement(Duration::from_millis(50)) - .await - .unwrap(); + session.await_schema_agreement().await.unwrap(); } #[tokio::test]