Skip to content

Commit

Permalink
refactor object store operation timeout code (openzfs#837)
Browse files Browse the repository at this point in the history
There are only a few operations that use the caller-visible timeout
`OAError::TimeoutError`:

* The PoolOwnerPhys, during try_claim().  Timeout is <2s.
* PoolPhys::create(), during pool creation.  Timeout is 30s.
* The HeartbeatPhys, during start_heartbeat().  Timeout is 10s.
* create_object_test(), during test_connectivity.  Timeout is 30s.

This commit simplifies the timeout code by:
* Always applying the PER_REQUEST_TIMEOUT (2s), which is retried
  transparently and indefinitely.
* Optionally applying a user-visible timeout only to
  put_object_timeout().

There's a slight change in behavior: when the caller's timeout is >2s,
their request may now be timed out and retried after 2s, whereas before
we would wait until the caller-specified timeout (up to 30s) and then
fail.

This reduces the number of `None` arguments that are passed in many
places, and removes the tricky `xor` logic in the retry code.
  • Loading branch information
ahrens authored May 11, 2023
1 parent f1b5a4e commit 8d19597
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 74 deletions.
6 changes: 3 additions & 3 deletions cmd/zfs_object_agent/zettaobject/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ impl HeartbeatPhys {
pub async fn put_timeout(
&self,
object_access: &ObjectAccess,
timeout: Option<Duration>,
timeout: Duration,
) -> Result<(), OAError<PutError>> {
maybe_die_with(|| format!("before putting {self:#?}"));
trace!("putting {:#?}", self);
let buf = serde_json::to_vec(&self).unwrap();
object_access
.put_object_timed(
.put_object_timeout(
Self::key(self.id),
buf.into(),
// XXX should this be its own stat type so that it has its own queue in the
Expand Down Expand Up @@ -278,7 +278,7 @@ pub async fn start_heartbeat(object_access: Arc<ObjectAccess>, id: Uuid) -> Hear
};
let instant = Instant::now();
let result = heartbeat
.put_timeout(&object_access, Some(*HEARTBEAT_TIMEOUT))
.put_timeout(&object_access, *HEARTBEAT_TIMEOUT)
.await;
if lease_timed_out(last_heartbeat) {
if *HEARTBEAT_PANIC {
Expand Down
13 changes: 5 additions & 8 deletions cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::fs;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use anyhow::anyhow;
Expand Down Expand Up @@ -294,7 +293,7 @@ impl BucketAccessTrait for BlobBucketAccess {
let mut buckets = Vec::new();

loop {
let list_output = retry(msg, None, None, || async {
let list_output = retry(msg, None, || async {
let bucket_client = self.get_bucket_client().await;
let list_containers_builder = bucket_client.list_containers();
let list_containers_builder = if let Some(nm) = next_marker.as_ref() {
Expand Down Expand Up @@ -485,7 +484,7 @@ impl ObjectAccessTrait for BlobObjectAccess {
let op = self.access_stats.begin(stat_type);

let msg = format!("get {key}");
let bytes = retry(&msg, None, Some(&self.rate_limit_errors), || async {
let bytes = retry(&msg, Some(&self.rate_limit_errors), || async {
let begin = Instant::now();
let blob_client = self
.get_container_client()
Expand Down Expand Up @@ -530,14 +529,12 @@ impl ObjectAccessTrait for BlobObjectAccess {
key: String,
streamfunc: &(dyn Fn() -> (ByteStream, usize) + Send + Sync),
stat_type: ObjectAccessOpType,
timeout: Option<Duration>,
) -> Result<(), OAError<PutError>> {
let _permit = self.outstanding_ops[stat_type].acquire().await.unwrap();
let op = self.access_stats.begin(stat_type);

let result = retry(
&format!("put {key}"),
timeout,
Some(&self.rate_limit_errors),
|| async {
let blob_client = self
Expand Down Expand Up @@ -576,7 +573,7 @@ impl ObjectAccessTrait for BlobObjectAccess {
.map(|key| async move {
let op = self.access_stats.begin(ObjectAccessOpType::ObjectDelete);
let msg = format!("delete object {key}");
retry(&msg, None, Some(&self.rate_limit_errors), || async {
retry(&msg, Some(&self.rate_limit_errors), || async {
let begin = Instant::now();
let blob_client = self
.get_container_client()
Expand Down Expand Up @@ -614,7 +611,7 @@ impl ObjectAccessTrait for BlobObjectAccess {
#[async_backtrace::framed]
async fn stat_object(&self, key: String) -> Option<ObjectStat> {
let msg = format!("head {key}");
retry(&msg, None, Some(&self.rate_limit_errors), || async {
retry(&msg, Some(&self.rate_limit_errors), || async {
let blob_client = self
.get_container_client()
.await
Expand Down Expand Up @@ -648,7 +645,7 @@ impl ObjectAccessTrait for BlobObjectAccess {

Box::pin(try_stream! {
loop {
let output = retry(&msg, None, Some(&self.rate_limit_errors), || async {
let output = retry(&msg, Some(&self.rate_limit_errors), || async {
let container_client = self.get_container_client().await;
let list_builder = match use_delimiter {
true =>
Expand Down
65 changes: 20 additions & 45 deletions cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use core::time::Duration;
use std::collections::HashMap;
use std::convert;
use std::error::Error;
use std::fmt::Display;
use std::iter;
Expand Down Expand Up @@ -425,7 +426,7 @@ impl ObjectAccess {
{
assert!(!self.readonly);
self.as_trait()
.put_object_stream(key.clone(), &streamfunc, stat_type, None)
.put_object_stream(key, &streamfunc, stat_type)
.await
.unwrap();
}
Expand All @@ -436,7 +437,6 @@ impl ObjectAccess {
key: String,
bytes: Bytes,
stat_type: ObjectAccessOpType,
timeout: Option<Duration>,
) -> Result<(), OAError<PutError>> {
self.as_trait()
.put_object_stream(
Expand All @@ -457,7 +457,6 @@ impl ObjectAccess {
)
},
stat_type,
timeout,
)
.await
}
Expand All @@ -471,21 +470,24 @@ impl ObjectAccess {
// PutObject, a concurrent get_object() could retrieve the old value and
// add it to the cache, allowing the old value to be read (from the
// cache) after put_object() returns.
self.put_object_impl(key.clone(), data, stat_type, None)
.await
.unwrap();
self.put_object_impl(key, data, stat_type).await.unwrap();
}

/// Put an object to the object store. If the `timeout` is reached, the operation is aborted
/// and OAError::TimeoutError is returned. Note that the operation may be retried (including
/// via the PER_REQUEST_TIMEOUT) before the specified timeout is reached.
#[async_backtrace::framed]
pub async fn put_object_timed(
pub async fn put_object_timeout(
&self,
key: String,
data: Bytes,
stat_type: ObjectAccessOpType,
timeout: Option<Duration>,
timeout: Duration,
) -> Result<(), OAError<PutError>> {
self.put_object_impl(key.clone(), data, stat_type, timeout)
tokio::time::timeout(timeout, self.put_object_impl(key, data, stat_type))
.await
.map_err(OAError::TimeoutError)
.and_then(convert::identity)
}

#[async_backtrace::framed]
Expand Down Expand Up @@ -655,7 +657,6 @@ pub trait ObjectAccessTrait: Send + Sync {
// returns (stream, stream_len_bytes).
streamfunc: &(dyn Fn() -> (ByteStream, usize) + Send + Sync),
stat_type: ObjectAccessOpType,
timeout: Option<Duration>,
) -> Result<(), OAError<PutError>>;

async fn delete_objects(&self, stream: &mut (dyn Stream<Item = String> + Send + Unpin));
Expand Down Expand Up @@ -749,7 +750,6 @@ impl<E> Error for OAError<E> where E: std::error::Error + 'static {}
#[async_backtrace::framed]
async fn retry_impl<F, O, E>(
msg: &str,
timeout_opt_initial: Option<Duration>,
rate_limit_errors: Option<&RelaxedCounter>,
f: impl Fn() -> F,
) -> Result<O, OAError<E>>
Expand All @@ -760,16 +760,12 @@ where
let mut time_skew_retried = false;
let mut expired_token_retried = false;
let mut delay = Duration::from_secs_f64(thread_rng().gen_range(0.001..0.2));
let mut timeout_opt = timeout_opt_initial;
let mut timeout = PER_REQUEST_TIMEOUT.mul_f32(thread_rng().gen_range(0.9..1.1));
loop {
let begin = Instant::now();
let result = match timeout_opt {
Some(timeout) => match tokio::time::timeout(timeout, f()).await {
Err(e) => Err(OAError::TimeoutError(e)),
Ok(res2) => res2,
},
None => f().await,
};
let result = tokio::time::timeout(timeout, f())
.await
.unwrap_or_else(|e| Err(OAError::TimeoutError(e)));
let e = match result {
res @ Ok(_) => return res,
res @ Err(OAError::RequestError(RequestError::Service(_))) => return res,
Expand Down Expand Up @@ -825,8 +821,7 @@ where
}
Err(e @ OAError::RequestError(RequestError::ServiceTimeoutError)) => e,
Err(e @ OAError::TimeoutError(_)) => {
timeout_opt = timeout_opt
.map(|d| d.checked_mul(2).unwrap_or_else(|| Duration::from_secs(60)));
timeout *= 2;
e
}
Err(e @ OAError::RequestError(RequestError::RateLimited)) => {
Expand All @@ -852,16 +847,12 @@ where
}
}

/// `timeout_opt` controls whether the overall request will be cancelled after a certain amount
/// of time. This is useful for requests that have complex retry logic or need to complete
/// quickly for correctness reasons. If a timeout is not specified, a default per-request
/// timeout will be used. This helps avoid problems where the object store backend drops some
/// requests on the floor. This per-request timeout will be retried indefinitely, so
/// Err(TimeoutError) doesn't need to be handled gracefully unless `timeout_opt` is specified.
/// Run the future returned by `f()` with a timeout of `PER_REQUEST_TIMEOUT`. If the future times
/// out, retry it indefinitely with exponential backoff. If the future returns a retryable
/// OAError, also retry in that case.
#[async_backtrace::framed]
async fn retry<F, O, E>(
msg: &str,
timeout_opt: Option<Duration>,
rate_limit_errors: Option<&RelaxedCounter>,
f: impl Fn() -> F,
) -> Result<O, OAError<E>>
Expand All @@ -871,23 +862,7 @@ where
{
trace!("{}: begin", msg);
let begin = Instant::now();
// Because of the `xor` here, exactly one of timeout_opt and retry_timeout_opt will be None and
// the other will be Some.
let retry_timeout_opt = timeout_opt.xor(Some(*PER_REQUEST_TIMEOUT));
let result = match timeout_opt {
Some(timeout) => {
match tokio::time::timeout(
timeout,
retry_impl(msg, retry_timeout_opt, rate_limit_errors, f),
)
.await
{
Err(e) => Err(OAError::TimeoutError(e)),
Ok(res2) => res2,
}
}
None => retry_impl(msg, retry_timeout_opt, rate_limit_errors, f).await,
};
let result = retry_impl(msg, rate_limit_errors, f).await;
let elapsed = begin.elapsed();
trace!("{}: returned in {}ms", msg, elapsed.as_millis());

Expand Down
12 changes: 3 additions & 9 deletions cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use core::time::Duration;
use std::collections::HashMap;
use std::env::var as env_var;
use std::fmt::Display;
Expand Down Expand Up @@ -153,7 +152,7 @@ impl S3BucketAccess {
impl BucketAccessTrait for S3BucketAccess {
#[async_backtrace::framed]
async fn list_buckets(&self) -> Vec<String> {
let list_output = retry("list_buckets", None, None, || async {
let list_output = retry("list_buckets", None, || async {
Ok(self.client.list_buckets().await?)
})
.await;
Expand Down Expand Up @@ -433,7 +432,7 @@ impl ObjectAccessTrait for S3ObjectAccess {
let range_string = range
.as_ref()
.map(|r| format!("bytes={}-{}", r.start, r.end - 1));
let bytes = retry(&msg, None, Some(&self.rate_limit_errors), || async {
let bytes = retry(&msg, Some(&self.rate_limit_errors), || async {
let req = GetObjectRequest {
bucket: self.bucket.clone(),
key: key.clone(),
Expand Down Expand Up @@ -491,7 +490,6 @@ impl ObjectAccessTrait for S3ObjectAccess {
async fn stat_object(&self, key: String) -> Option<ObjectStat> {
let res = retry(
&format!("head {key}"),
None,
Some(&self.rate_limit_errors),
|| async {
let req = HeadObjectRequest {
Expand Down Expand Up @@ -521,14 +519,12 @@ impl ObjectAccessTrait for S3ObjectAccess {
key: String,
streamfunc: &(dyn Fn() -> (ByteStream, usize) + Send + Sync),
stat_type: ObjectAccessOpType,
timeout: Option<Duration>,
) -> Result<(), OAError<PutError>> {
let _permit = self.outstanding_ops[stat_type].acquire().await.unwrap();
let op = self.access_stats.begin(stat_type);

let result = retry(
&format!("put {key}"),
timeout,
Some(&self.rate_limit_errors),
|| async {
// We want to only call streamfunc() once in the common case,
Expand Down Expand Up @@ -567,7 +563,7 @@ impl ObjectAccessTrait for S3ObjectAccess {
.map(|key| async move {
let op = self.access_stats.begin(ObjectAccessOpType::ObjectDelete);
let msg = format!("delete object {key}");
retry(&msg, None, Some(&self.rate_limit_errors), || async {
retry(&msg, Some(&self.rate_limit_errors), || async {
let req = DeleteObjectRequest {
bucket: self.bucket.clone(),
key: key.clone(),
Expand Down Expand Up @@ -604,7 +600,6 @@ impl ObjectAccessTrait for S3ObjectAccess {
loop {
let output = retry(
&format!("list {prefix} (after {start_after:?})"),
None,
Some(&self.rate_limit_errors),
|| async {
let req = ListObjectsV2Request {
Expand Down Expand Up @@ -653,7 +648,6 @@ impl ObjectAccessTrait for S3ObjectAccess {
OAError<GetBucketLifecycleConfigurationError>,
> = retry(
&format!("has_retention {}", self.bucket),
None,
Some(&self.rate_limit_errors),
|| async {
let config = GetBucketLifecycleConfigurationRequest {
Expand Down
14 changes: 7 additions & 7 deletions cmd/zfs_object_agent/zettaobject/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,13 @@ impl PoolOwnerPhys {
async fn put_timeout(
&self,
object_access: &ObjectAccess,
timeout: Option<Duration>,
timeout: Duration,
) -> Result<(), OAError<PutError>> {
maybe_die_with(|| format!("before putting {self:#?}"));
debug!("putting {:#?}", self);
let buf = serde_json::to_vec(&self).unwrap();
object_access
.put_object_timed(
.put_object_timeout(
Self::key(self.id),
buf.into(),
ObjectAccessOpType::MetadataPut,
Expand Down Expand Up @@ -258,16 +258,16 @@ impl PoolPhys {
}

#[async_backtrace::framed]
pub async fn put_timed(
pub async fn put_timeout(
&self,
object_access: &ObjectAccess,
timeout: Option<Duration>,
timeout: Duration,
) -> Result<(), OAError<PutError>> {
maybe_die_with(|| format!("before putting {self:#?}"));
debug!("putting {:#?}", self);
let buf = serde_json::to_vec(&self).unwrap();
object_access
.put_object_timed(
.put_object_timeout(
Self::key(self.guid),
buf.into(),
ObjectAccessOpType::MetadataPut,
Expand Down Expand Up @@ -318,7 +318,7 @@ impl PoolPhys {
sentinel_creation: None,
};
// XXX make sure it doesn't already exist
phys.put_timed(object_access, Some(*CREATE_WAIT_DURATION))
phys.put_timeout(object_access, *CREATE_WAIT_DURATION)
.await?;
Ok(phys)
}
Expand Down Expand Up @@ -1736,7 +1736,7 @@ impl Pool {
};

let put_result = owner
.put_timeout(object_access, Some(*CLAIM_DURATION - duration))
.put_timeout(object_access, *CLAIM_DURATION - duration)
.await;

if let Err(OAError::TimeoutError(_)) = put_result {
Expand Down
4 changes: 2 additions & 2 deletions cmd/zfs_object_agent/zettaobject/src/test_connectivity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ async fn create_object_test(object_access: &ObjectAccess, key: String) -> Result
let content = "test connectivity to object storage".as_bytes().to_vec();

match object_access
.put_object_timed(
.put_object_timeout(
key.clone(),
content.into(),
ObjectAccessOpType::MetadataPut,
Some(Duration::from_secs(30)),
Duration::from_secs(30),
)
.await
{
Expand Down

0 comments on commit 8d19597

Please sign in to comment.