Skip to content

Commit

Permalink
session: await_schema_agreement() gets implicit timeout
Browse files Browse the repository at this point in the history
Similarly to
- what Java driver 3 does in regard to awaiting schema
agreement's API,
- what was recently done in Rust driver in regard to tracing info's API,

the API of awaiting schema agreement is altered so that:
- `await_timed_schema_agreement()`, which takes explicit timeout, is
  removed,
- `await_schema_agreement()` is now bound with a timeout that is set
  globally per-`Session` in `SessionConfig`.

The motivation is that it could lead to application's deadlock when
`await_schema_agreement()` was called with no timeout given and the
cluster stoped synchronising (so schema agreement would be never
reached), It is rarely desirable to block the calling application for
arbitrarily long time, especially that some extreme situations are
possible, such as a network partition between nodes.

`SessionBuilder`s API is accomodated to the changes. Docs are updated as
well.
  • Loading branch information
wprzytula committed Aug 3, 2023
1 parent d93e3a8 commit 7a846ea
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 75 deletions.
40 changes: 11 additions & 29 deletions docs/source/queries/schema-agreement.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# Schema agreement

Sometimes after performing queries some nodes have not been updated so we need a mechanism that checks if every node have agreed schema version.
There are four methods in `Session` that assist us.
Sometimes after performing queries some nodes have not been updated, so we need a mechanism that checks if every node have agreed on schema version.
There is a number of methods in `Session` that assist us.
Every method raise `QueryError` if something goes wrong, but they should never raise any errors, unless there is a DB or connection malfunction.

### Checking schema version
`Session::fetch_schema_version` returns an `Uuid` of local node's schema version.
`Session::fetch_schema_version` returns an `Uuid` of local node's schema version.

```rust
# extern crate scylla;
Expand All @@ -19,7 +19,9 @@ println!("Local schema version is: {}", session.fetch_schema_version().await?);

### Awaiting schema agreement

`Session::await_schema_agreement` returns a `Future` that can be `await`ed on as long as schema is not in an agreement.
`Session::await_schema_agreement` returns a `Future` that can be `await`ed as long as schema is not in an agreement.
However, it won't wait forever; `SessionConfig` defines a timeout that limits the time of waiting. If the timeout elapses,
the return value is `false`, otherwise it is `true`.

```rust
# extern crate scylla;
Expand All @@ -31,28 +33,10 @@ session.await_schema_agreement().await?;
# }
```

### Awaiting with timeout
We can also set timeout in milliseconds with `Session::await_timed_schema_agreement`.
It takes one argument, an `std::time::Duration` value that tells how long our driver should await for schema agreement. If the timeout is met the return value is `false` otherwise it is `true`.

```rust
# extern crate scylla;
# use scylla::Session;
# use std::error::Error;
# use std::time::Duration;
# async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
if session.await_timed_schema_agreement(Duration::from_secs(5)).await? { // wait for 5 seconds
println!("SCHEMA AGREED");
} else {
println!("SCHEMA IS NOT IN AGREEMENT - TIMED OUT");
}
# Ok(())
# }
```

### Checking for schema interval
If schema is not agreed driver sleeps for a duration before checking it again. Default value is 200 milliseconds but it can be changed with `SessionBuilder::schema_agreement_interval`.
### Interval of checking for schema agreement

If the schema is not agreed upon, the driver sleeps for a duration before checking it again. The default value is 200 milliseconds,
but it can be changed with `SessionBuilder::schema_agreement_interval`.

```rust
# extern crate scylla;
Expand All @@ -70,21 +54,19 @@ SessionBuilder::new()
```

### Checking if schema is in agreement now
If you want to check if schema is in agreement now without retrying after failure you can use `Session::check_schema_agreement` function.

If you want to check if schema is in agreement now, without retrying after failure, you can use `Session::check_schema_agreement` function.

```rust
# extern crate scylla;
# use scylla::Session;
# use std::error::Error;
# async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
if session.check_schema_agreement().await? {
if session.check_schema_agreement().await? {
println!("SCHEMA AGREED");
} else {
println!("SCHEMA IS NOT IN AGREEMENT");
}
# Ok(())
# }
```


11 changes: 3 additions & 8 deletions examples/schema_agreement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,12 @@ async fn main() -> Result<()> {
let schema_version = session.fetch_schema_version().await?;
println!("Schema version: {}", schema_version);

session.await_schema_agreement().await?; // without timeout example
session.query("CREATE KEYSPACE IF NOT EXISTS ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}", &[]).await?;

if session
.await_timed_schema_agreement(Duration::from_secs(5))
.await?
{
// with timeout example
println!("Timed schema is in agreement");
if session.await_schema_agreement().await? {
println!("Schema is in agreement");
} else {
println!("Timed schema is NOT in agreement");
println!("Schema is NOT in agreement");
}
session
.query(
Expand Down
56 changes: 32 additions & 24 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ pub struct Session {
default_execution_profile_handle: ExecutionProfileHandle,
schema_agreement_interval: Duration,
metrics: Arc<Metrics>,
auto_await_schema_agreement_timeout: Option<Duration>,
schema_agreement_timeout: Duration,
schema_agreement_automatic_waiting: bool,
refresh_metadata_on_auto_schema_agreement: bool,
keyspace_name: ArcSwapOption<String>,
tracing_info_fetch_attempts: NonZeroU32,
Expand All @@ -170,7 +171,7 @@ impl std::fmt::Debug for Session {
.field("metrics", &self.metrics)
.field(
"auto_await_schema_agreement_timeout",
&self.auto_await_schema_agreement_timeout,
&self.schema_agreement_timeout,
)
.finish()
}
Expand Down Expand Up @@ -204,7 +205,6 @@ pub struct SessionConfig {

pub authenticator: Option<Arc<dyn AuthenticatorProvider>>,

pub schema_agreement_interval: Duration,
pub connect_timeout: Duration,

/// Size of the per-node connection pool, i.e. how many connections the driver should keep to each node.
Expand All @@ -229,9 +229,21 @@ pub struct SessionConfig {
/// If `None`, connections are never closed due to lack of response to a keepalive message.
pub keepalive_timeout: Option<Duration>,

/// Controls the timeout for the automatic wait for schema agreement after sending a schema-altering statement.
/// If `None`, the automatic schema agreement is disabled.
pub auto_await_schema_agreement_timeout: Option<Duration>,
/// How often the driver should ask if schema is in agreement.
pub schema_agreement_interval: Duration,

/// Controls the timeout for waiting for schema agreement.
/// This works both for manual awaiting schema agreement and for
/// automatic waiting after a schema-altering statement is sent.
pub schema_agreement_timeout: Duration,

/// Controls whether schema agreement is automatically awaited
/// after sending a schema-altering statement.
pub schema_agreement_automatic_waiting: bool,

/// If true, full schema metadata is fetched after successfully reaching a schema agreement.
/// It is true by default but can be disabled if successive schema-altering statements should be performed.
pub refresh_metadata_on_auto_schema_agreement: bool,

/// The address translator is used to translate addresses received from ScyllaDB nodes
/// (either with cluster metadata or with an event) to addresses that can be used to
Expand All @@ -244,10 +256,6 @@ pub struct SessionConfig {
/// re-establishing the control connection.
pub host_filter: Option<Arc<dyn HostFilter>>,

/// If true, full schema metadata is fetched after successfully reaching a schema agreement.
/// It is true by default but can be disabled if successive schema-altering statements should be performed.
pub refresh_metadata_on_auto_schema_agreement: bool,

/// If the driver is to connect to ScyllaCloud, there is a config for it.
#[cfg(feature = "cloud")]
pub cloud_config: Option<Arc<CloudConfig>>,
Expand Down Expand Up @@ -331,7 +339,8 @@ impl SessionConfig {
fetch_schema_metadata: true,
keepalive_interval: Some(Duration::from_secs(30)),
keepalive_timeout: Some(Duration::from_secs(30)),
auto_await_schema_agreement_timeout: Some(std::time::Duration::from_secs(60)),
schema_agreement_timeout: Duration::from_secs(60),
schema_agreement_automatic_waiting: true,
address_translator: None,
host_filter: None,
refresh_metadata_on_auto_schema_agreement: true,
Expand Down Expand Up @@ -587,7 +596,8 @@ impl Session {
default_execution_profile_handle,
schema_agreement_interval: config.schema_agreement_interval,
metrics: Arc::new(Metrics::new()),
auto_await_schema_agreement_timeout: config.auto_await_schema_agreement_timeout,
schema_agreement_timeout: config.schema_agreement_timeout,
schema_agreement_automatic_waiting: config.schema_agreement_automatic_waiting,
refresh_metadata_on_auto_schema_agreement: config
.refresh_metadata_on_auto_schema_agreement,
keyspace_name: ArcSwapOption::default(), // will be set by use_keyspace
Expand Down Expand Up @@ -766,10 +776,8 @@ impl Session {
contents: &str,
response: &NonErrorQueryResponse,
) -> Result<(), QueryError> {
if let Some(timeout) = self.auto_await_schema_agreement_timeout {
if response.as_schema_change().is_some()
&& !self.await_timed_schema_agreement(timeout).await?
{
if self.schema_agreement_automatic_waiting {
if response.as_schema_change().is_some() && !self.await_schema_agreement().await? {
// TODO: The TimeoutError should allow to provide more context.
// For now, print an error to the logs
error!(
Expand Down Expand Up @@ -1840,20 +1848,20 @@ impl Session {
last_error.map(Result::Err)
}

pub async fn await_schema_agreement(&self) -> Result<(), QueryError> {
async fn await_schema_agreement_indefinitely(&self) -> Result<(), QueryError> {
while !self.check_schema_agreement().await? {
tokio::time::sleep(self.schema_agreement_interval).await
}
Ok(())
}

pub async fn await_timed_schema_agreement(
&self,
timeout_duration: Duration,
) -> Result<bool, QueryError> {
timeout(timeout_duration, self.await_schema_agreement())
.await
.map_or(Ok(false), |res| res.and(Ok(true)))
pub async fn await_schema_agreement(&self) -> Result<bool, QueryError> {
timeout(
self.schema_agreement_timeout,
self.await_schema_agreement_indefinitely(),
)
.await
.map_or(Ok(false), |res| res.and(Ok(true)))
}

pub async fn check_schema_agreement(&self) -> Result<bool, QueryError> {
Expand Down
18 changes: 9 additions & 9 deletions scylla/src/transport/session_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,28 +717,28 @@ impl<K: SessionBuilderKind> GenericSessionBuilder<K> {
self
}

/// Enables automatic wait for schema agreement and sets the timeout for it.
/// By default, it is enabled and the timeout is 60 seconds.
/// Sets the timeout for waiting for schema agreement.
/// By default, the timeout is 60 seconds.
///
/// # Example
/// ```
/// # use scylla::{Session, SessionBuilder};
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let session: Session = SessionBuilder::new()
/// .known_node("127.0.0.1:9042")
/// .auto_schema_agreement_timeout(std::time::Duration::from_secs(120))
/// .schema_agreement_timeout(std::time::Duration::from_secs(120))
/// .build()
/// .await?;
/// # Ok(())
/// # }
/// ```
pub fn auto_schema_agreement_timeout(mut self, timeout: Duration) -> Self {
self.config.auto_await_schema_agreement_timeout = Some(timeout);
pub fn schema_agreement_timeout(mut self, timeout: Duration) -> Self {
self.config.schema_agreement_timeout = timeout;
self
}

/// Disables automatic wait for schema agreement.
/// By default, it is enabled and the timeout is 60 seconds.
/// Controls automatic waiting for schema agreement after a schema-altering
/// statement is sent. By default, it is enabled.
///
/// # Example
/// ```
Expand All @@ -752,8 +752,8 @@ impl<K: SessionBuilderKind> GenericSessionBuilder<K> {
/// # Ok(())
/// # }
/// ```
pub fn no_auto_schema_agreement(mut self) -> Self {
self.config.auto_await_schema_agreement_timeout = None;
pub fn auto_await_schema_agreement(mut self, enabled: bool) -> Self {
self.config.schema_agreement_automatic_waiting = enabled;
self
}

Expand Down
6 changes: 1 addition & 5 deletions scylla/src/transport/session_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1117,12 +1117,8 @@ async fn test_await_schema_agreement() {

#[tokio::test]
async fn test_await_timed_schema_agreement() {
use std::time::Duration;
let session = create_new_session_builder().build().await.unwrap();
session
.await_timed_schema_agreement(Duration::from_millis(50))
.await
.unwrap();
session.await_schema_agreement().await.unwrap();
}

#[tokio::test]
Expand Down

0 comments on commit 7a846ea

Please sign in to comment.