Skip to content

Commit

Permalink
Merge pull request #1103 from muzarski/retry-policy-behind-arc
Browse files Browse the repository at this point in the history
retry_policy: replace all usages of `Box<dyn RetryPolicy>` with `Arc<...>`
  • Loading branch information
Lorak-mmk authored Oct 24, 2024
2 parents aef0cf5 + c958c9b commit 9aaf25a
Show file tree
Hide file tree
Showing 13 changed files with 38 additions and 56 deletions.
2 changes: 1 addition & 1 deletion docs/source/execution-profiles/maximal-example.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ let profile = ExecutionProfile::builder()
.consistency(Consistency::All)
.serial_consistency(Some(SerialConsistency::Serial))
.request_timeout(Some(Duration::from_secs(30)))
.retry_policy(Box::new(FallthroughRetryPolicy::new()))
.retry_policy(Arc::new(FallthroughRetryPolicy::new()))
.load_balancing_policy(Arc::new(DefaultPolicy::default()))
.speculative_execution_policy(
Some(
Expand Down
7 changes: 4 additions & 3 deletions docs/source/retry-policy/default.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ To use in `Session`:
# extern crate scylla;
# use scylla::Session;
# use std::error::Error;
# use std::sync::Arc;
# async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
use scylla::{Session, SessionBuilder};
use scylla::transport::ExecutionProfile;
use scylla::transport::retry_policy::DefaultRetryPolicy;

let handle = ExecutionProfile::builder()
.retry_policy(Box::new(DefaultRetryPolicy::new()))
.retry_policy(Arc::new(DefaultRetryPolicy::new()))
.build()
.into_handle();

Expand Down Expand Up @@ -45,7 +46,7 @@ my_query.set_retry_policy(Some(Arc::new(DefaultRetryPolicy::new())));

// You can also set retry policy in an execution profile
let handle = ExecutionProfile::builder()
.retry_policy(Box::new(DefaultRetryPolicy::new()))
.retry_policy(Arc::new(DefaultRetryPolicy::new()))
.build()
.into_handle();
my_query.set_execution_profile_handle(Some(handle));
Expand Down Expand Up @@ -76,7 +77,7 @@ prepared.set_retry_policy(Some(Arc::new(DefaultRetryPolicy::new())));

// You can also set retry policy in an execution profile
let handle = ExecutionProfile::builder()
.retry_policy(Box::new(DefaultRetryPolicy::new()))
.retry_policy(Arc::new(DefaultRetryPolicy::new()))
.build()
.into_handle();
prepared.set_execution_profile_handle(Some(handle));
Expand Down
9 changes: 6 additions & 3 deletions docs/source/retry-policy/downgrading-consistency.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@ To use in `Session`:
# extern crate scylla;
# use scylla::Session;
# use std::error::Error;
# use std::sync::Arc;
# async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
use scylla::{Session, SessionBuilder};
use scylla::transport::ExecutionProfile;
use scylla::transport::downgrading_consistency_retry_policy::DowngradingConsistencyRetryPolicy;

let handle = ExecutionProfile::builder()
.retry_policy(Box::new(DowngradingConsistencyRetryPolicy::new()))
.retry_policy(Arc::new(DowngradingConsistencyRetryPolicy::new()))
.build()
.into_handle();

Expand All @@ -74,13 +75,14 @@ To use in a [simple query](../queries/simple.md):
# extern crate scylla;
# use scylla::Session;
# use std::error::Error;
# use std::sync::Arc;
# async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::query::Query;
use scylla::transport::ExecutionProfile;
use scylla::transport::downgrading_consistency_retry_policy::DowngradingConsistencyRetryPolicy;

let handle = ExecutionProfile::builder()
.retry_policy(Box::new(DowngradingConsistencyRetryPolicy::new()))
.retry_policy(Arc::new(DowngradingConsistencyRetryPolicy::new()))
.build()
.into_handle();

Expand All @@ -100,13 +102,14 @@ To use in a [prepared query](../queries/prepared.md):
# extern crate scylla;
# use scylla::Session;
# use std::error::Error;
# use std::sync::Arc;
# async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::ExecutionProfile;
use scylla::transport::downgrading_consistency_retry_policy::DowngradingConsistencyRetryPolicy;

let handle = ExecutionProfile::builder()
.retry_policy(Box::new(DowngradingConsistencyRetryPolicy::new()))
.retry_policy(Arc::new(DowngradingConsistencyRetryPolicy::new()))
.build()
.into_handle();

Expand Down
9 changes: 6 additions & 3 deletions docs/source/retry-policy/fallthrough.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ To use in `Session`:
# extern crate scylla;
# use scylla::Session;
# use std::error::Error;
# use std::sync::Arc;
# async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
use scylla::{Session, SessionBuilder};
use scylla::transport::ExecutionProfile;
use scylla::transport::retry_policy::FallthroughRetryPolicy;

let handle = ExecutionProfile::builder()
.retry_policy(Box::new(FallthroughRetryPolicy::new()))
.retry_policy(Arc::new(FallthroughRetryPolicy::new()))
.build()
.into_handle();

Expand All @@ -32,13 +33,14 @@ To use in a [simple query](../queries/simple.md):
# extern crate scylla;
# use scylla::Session;
# use std::error::Error;
# use std::sync::Arc;
# async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::query::Query;
use scylla::transport::ExecutionProfile;
use scylla::transport::retry_policy::FallthroughRetryPolicy;

let handle = ExecutionProfile::builder()
.retry_policy(Box::new(FallthroughRetryPolicy::new()))
.retry_policy(Arc::new(FallthroughRetryPolicy::new()))
.build()
.into_handle();

Expand All @@ -58,13 +60,14 @@ To use in a [prepared query](../queries/prepared.md):
# extern crate scylla;
# use scylla::Session;
# use std::error::Error;
# use std::sync::Arc;
# async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::ExecutionProfile;
use scylla::transport::retry_policy::FallthroughRetryPolicy;

let handle = ExecutionProfile::builder()
.retry_policy(Box::new(FallthroughRetryPolicy::new()))
.retry_policy(Arc::new(FallthroughRetryPolicy::new()))
.build()
.into_handle();

Expand Down
4 changes: 2 additions & 2 deletions examples/execution_profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async fn main() -> Result<()> {
.serial_consistency(Some(SerialConsistency::Serial))
.request_timeout(Some(Duration::from_secs(42)))
.load_balancing_policy(Arc::new(load_balancing::DefaultPolicy::default()))
.retry_policy(Box::new(FallthroughRetryPolicy::new()))
.retry_policy(Arc::new(FallthroughRetryPolicy::new()))
.speculative_execution_policy(Some(Arc::new(PercentileSpeculativeExecutionPolicy {
max_retry_count: 2,
percentile: 42.0,
Expand All @@ -34,7 +34,7 @@ async fn main() -> Result<()> {
.serial_consistency(None)
.request_timeout(Some(Duration::from_secs(3)))
.load_balancing_policy(Arc::new(load_balancing::DefaultPolicy::default()))
.retry_policy(Box::new(DefaultRetryPolicy::new()))
.retry_policy(Arc::new(DefaultRetryPolicy::new()))
.speculative_execution_policy(None)
.build();

Expand Down
4 changes: 0 additions & 4 deletions scylla/src/transport/downgrading_consistency_retry_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ impl RetryPolicy for DowngradingConsistencyRetryPolicy {
fn new_session(&self) -> Box<dyn RetrySession> {
Box::new(DowngradingConsistencyRetrySession::new())
}

fn clone_boxed(&self) -> Box<dyn RetryPolicy> {
Box::new(DowngradingConsistencyRetryPolicy)
}
}

pub struct DowngradingConsistencyRetrySession {
Expand Down
19 changes: 11 additions & 8 deletions scylla/src/transport/execution_profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ pub(crate) mod defaults {
pub(crate) fn load_balancing_policy() -> Arc<dyn LoadBalancingPolicy> {
Arc::new(load_balancing::DefaultPolicy::default())
}
pub(crate) fn retry_policy() -> Box<dyn RetryPolicy> {
Box::new(DefaultRetryPolicy::new())
pub(crate) fn retry_policy() -> Arc<dyn RetryPolicy> {
Arc::new(DefaultRetryPolicy::new())
}
pub(crate) fn speculative_execution_policy() -> Option<Arc<dyn SpeculativeExecutionPolicy>> {
None
Expand All @@ -218,10 +218,11 @@ pub(crate) mod defaults {
/// ```
/// # use scylla::transport::{ExecutionProfile, retry_policy::FallthroughRetryPolicy};
/// # use scylla::statement::Consistency;
/// # use std::sync::Arc;
/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let profile: ExecutionProfile = ExecutionProfile::builder()
/// .consistency(Consistency::Three) // as this is the number we shall count to
/// .retry_policy(Box::new(FallthroughRetryPolicy::new()))
/// .retry_policy(Arc::new(FallthroughRetryPolicy::new()))
/// .build();
/// # Ok(())
/// # }
Expand All @@ -232,7 +233,7 @@ pub struct ExecutionProfileBuilder {
consistency: Option<Consistency>,
serial_consistency: Option<Option<SerialConsistency>>,
load_balancing_policy: Option<Arc<dyn LoadBalancingPolicy>>,
retry_policy: Option<Box<dyn RetryPolicy>>,
retry_policy: Option<Arc<dyn RetryPolicy>>,
speculative_execution_policy: Option<Option<Arc<dyn SpeculativeExecutionPolicy>>>,
}

Expand Down Expand Up @@ -302,14 +303,15 @@ impl ExecutionProfileBuilder {
/// ```
/// use scylla::transport::retry_policy::DefaultRetryPolicy;
/// # use scylla::transport::ExecutionProfile;
/// # use std::sync::Arc;
/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let profile: ExecutionProfile = ExecutionProfile::builder()
/// .retry_policy(Box::new(DefaultRetryPolicy::new()))
/// .retry_policy(Arc::new(DefaultRetryPolicy::new()))
/// .build();
/// # Ok(())
/// # }
/// ```
pub fn retry_policy(mut self, retry_policy: Box<dyn RetryPolicy>) -> Self {
pub fn retry_policy(mut self, retry_policy: Arc<dyn RetryPolicy>) -> Self {
self.retry_policy = Some(retry_policy);
self
}
Expand Down Expand Up @@ -352,9 +354,10 @@ impl ExecutionProfileBuilder {
/// ```
/// use scylla::transport::retry_policy::DefaultRetryPolicy;
/// # use scylla::transport::ExecutionProfile;
/// # use std::sync::Arc;
/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let profile: ExecutionProfile = ExecutionProfile::builder()
/// .retry_policy(Box::new(DefaultRetryPolicy::new()))
/// .retry_policy(Arc::new(DefaultRetryPolicy::new()))
/// .build();
/// # Ok(())
/// # }
Expand Down Expand Up @@ -402,7 +405,7 @@ pub(crate) struct ExecutionProfileInner {
pub(crate) serial_consistency: Option<SerialConsistency>,

pub(crate) load_balancing_policy: Arc<dyn LoadBalancingPolicy>,
pub(crate) retry_policy: Box<dyn RetryPolicy>,
pub(crate) retry_policy: Arc<dyn RetryPolicy>,
pub(crate) speculative_execution_policy: Option<Arc<dyn SpeculativeExecutionPolicy>>,
}

Expand Down
17 changes: 0 additions & 17 deletions scylla/src/transport/retry_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,6 @@ pub enum RetryDecision {
pub trait RetryPolicy: std::fmt::Debug + Send + Sync {
/// Called for each new query, starts a session of deciding about retries
fn new_session(&self) -> Box<dyn RetrySession>;

/// Used to clone this RetryPolicy
fn clone_boxed(&self) -> Box<dyn RetryPolicy>;
}

impl Clone for Box<dyn RetryPolicy> {
fn clone(&self) -> Box<dyn RetryPolicy> {
self.clone_boxed()
}
}

/// Used throughout a single query to decide when to retry it
Expand Down Expand Up @@ -71,10 +62,6 @@ impl RetryPolicy for FallthroughRetryPolicy {
fn new_session(&self) -> Box<dyn RetrySession> {
Box::new(FallthroughRetrySession)
}

fn clone_boxed(&self) -> Box<dyn RetryPolicy> {
Box::new(FallthroughRetryPolicy)
}
}

impl RetrySession for FallthroughRetrySession {
Expand Down Expand Up @@ -106,10 +93,6 @@ impl RetryPolicy for DefaultRetryPolicy {
fn new_session(&self) -> Box<dyn RetrySession> {
Box::new(DefaultRetrySession::new())
}

fn clone_boxed(&self) -> Box<dyn RetryPolicy> {
Box::new(DefaultRetryPolicy)
}
}

pub struct DefaultRetrySession {
Expand Down
5 changes: 1 addition & 4 deletions scylla/src/transport/session_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2708,9 +2708,6 @@ async fn test_iter_works_when_retry_policy_returns_ignore_write_error() {
fn new_session(&self) -> Box<dyn RetrySession> {
Box::new(MyRetrySession(self.0.clone()))
}
fn clone_boxed(&self) -> Box<dyn RetryPolicy> {
Box::new(MyRetryPolicy(self.0.clone()))
}
}

struct MyRetrySession(Arc<AtomicBool>);
Expand All @@ -2724,7 +2721,7 @@ async fn test_iter_works_when_retry_policy_returns_ignore_write_error() {

let handle = ExecutionProfile::builder()
.consistency(Consistency::All)
.retry_policy(Box::new(MyRetryPolicy(retried_flag.clone())))
.retry_policy(Arc::new(MyRetryPolicy(retried_flag.clone())))
.build()
.into_handle();

Expand Down
4 changes: 2 additions & 2 deletions scylla/tests/integration/consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ async fn consistency_is_correctly_set_in_cql_requests() {
}

let fallthrough_exec_profile_builder =
ExecutionProfile::builder().retry_policy(Box::new(FallthroughRetryPolicy));
ExecutionProfile::builder().retry_policy(Arc::new(FallthroughRetryPolicy));

let translation_map = Arc::new(translation_map);

Expand Down Expand Up @@ -421,7 +421,7 @@ async fn consistency_is_correctly_set_in_routing_info() {
};

let exec_profile_builder = ExecutionProfile::builder()
.retry_policy(Box::new(FallthroughRetryPolicy))
.retry_policy(Arc::new(FallthroughRetryPolicy))
.load_balancing_policy(Arc::new(reporting_load_balancer));

// DB preparation phase
Expand Down
8 changes: 2 additions & 6 deletions scylla/tests/integration/execution_profiles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,6 @@ impl<const NODE: u8> RetryPolicy for BoundToPredefinedNodePolicy<NODE> {
self.report_node(Report::RetryPolicy);
Box::new(self.clone())
}

fn clone_boxed(&self) -> Box<dyn RetryPolicy> {
Box::new(self.clone())
}
}

impl<const NODE: u8> RetrySession for BoundToPredefinedNodePolicy<NODE> {
Expand Down Expand Up @@ -145,15 +141,15 @@ async fn test_execution_profiles() {

let profile1 = ExecutionProfile::builder()
.load_balancing_policy(policy1.clone())
.retry_policy(Box::new(policy1.deref().clone()))
.retry_policy(Arc::new(policy1.deref().clone()))
.consistency(Consistency::One)
.serial_consistency(None)
.speculative_execution_policy(None)
.build();

let profile2 = ExecutionProfile::builder()
.load_balancing_policy(policy2.clone())
.retry_policy(Box::new(policy2.deref().clone()))
.retry_policy(Arc::new(policy2.deref().clone()))
.consistency(Consistency::Two)
.serial_consistency(Some(SerialConsistency::LocalSerial))
.speculative_execution_policy(Some(policy2))
Expand Down
2 changes: 1 addition & 1 deletion scylla/tests/integration/lwt_optimisation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async fn if_lwt_optimisation_mark_offered_then_negotiatied_and_lwt_routed_optima
});

let handle = ExecutionProfile::builder()
.retry_policy(Box::new(FallthroughRetryPolicy))
.retry_policy(Arc::new(FallthroughRetryPolicy))
.build()
.into_handle();

Expand Down
4 changes: 2 additions & 2 deletions scylla/tests/integration/retries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async fn speculative_execution_is_fired() {
let simple_speculative_no_retry_profile = ExecutionProfile::builder().speculative_execution_policy(Some(Arc::new(SimpleSpeculativeExecutionPolicy {
max_retry_count: 2,
retry_interval: Duration::from_millis(10),
}))).retry_policy(Box::new(FallthroughRetryPolicy)).build();
}))).retry_policy(Arc::new(FallthroughRetryPolicy)).build();
let session: Session = SessionBuilder::new()
.known_node(proxy_uris[0].as_str())
.default_execution_profile_handle(simple_speculative_no_retry_profile.into_handle())
Expand Down Expand Up @@ -180,7 +180,7 @@ async fn speculative_execution_panic_regression_test() {
};
let profile = ExecutionProfile::builder()
.speculative_execution_policy(Some(Arc::new(se)))
.retry_policy(Box::new(FallthroughRetryPolicy))
.retry_policy(Arc::new(FallthroughRetryPolicy))
.build();
// DB preparation phase
let session: Session = SessionBuilder::new()
Expand Down

0 comments on commit 9aaf25a

Please sign in to comment.