Skip to content

Commit

Permalink
clean up ObjectAccessTrait (openzfs#868)
Browse files Browse the repository at this point in the history
Move the logic for `access_stats` and `outstanding_ops` from
ObjectAccessTrait implementors (s3, blob, oci) to the more general
ObjectAccess layer.  This reduces code duplication and ensures
uniformity of semantics on different clouds.

Change `ObjectAccess::inner` to `Box<dyn ObjectAccessTrait>`.

No behavior change, code cleanup only
  • Loading branch information
ahrens authored May 24, 2023
1 parent 708f379 commit 89a7c7f
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 161 deletions.
41 changes: 9 additions & 32 deletions cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::collections::HashMap;
use std::env;
use std::fs;
use std::ops::Range;
Expand Down Expand Up @@ -35,7 +34,6 @@ use bytes::Bytes;
use bytes::BytesMut;
use chrono::offset::TimeZone;
use chrono::Utc;
use enum_map::EnumMap;
use futures::stream;
use futures::Stream;
use futures::StreamExt;
Expand All @@ -57,8 +55,6 @@ use super::RequestError;
use super::OBJECT_DELETION_BATCH_SIZE;
use crate::access_stats::ObjectAccessOpType;
use crate::access_stats::ObjectAccessStats;
use crate::access_stats::OutstandingOps;
use crate::access_stats::StatMapValue;
use crate::object_access::OAError;
use crate::object_access::ObjectAccessTrait;
use crate::object_access::ObjectStat;
Expand Down Expand Up @@ -172,8 +168,6 @@ pub struct BlobObjectAccess {
container_client: ContainerClient,
endpoint: Option<String>,
credentials: BlobCredentials,
access_stats: ObjectAccessStats,
outstanding_ops: EnumMap<ObjectAccessOpType, OutstandingOps>,
rate_limit_errors: Arc<RelaxedCounter>,
}

Expand All @@ -195,8 +189,6 @@ impl BlobObjectAccess {

Ok(Self {
container_client,
access_stats: Default::default(),
outstanding_ops: Default::default(),
endpoint,
credentials,
rate_limit_errors,
Expand Down Expand Up @@ -249,15 +241,7 @@ impl ObjectAccessTrait for BlobObjectAccess {
}

#[async_backtrace::framed]
async fn get_object(
&self,
key: String,
stat_type: ObjectAccessOpType,
range: Option<Range<usize>>,
) -> Result<Bytes> {
let _permit = self.outstanding_ops[stat_type].acquire().await.unwrap();
let op = self.access_stats.begin(stat_type);

async fn get_object(&self, key: String, range: Option<Range<usize>>) -> Result<Bytes> {
let msg = format!("get {key}");
let blob_client = self.container_client.blob_client(key);
let vec = loop {
Expand Down Expand Up @@ -307,7 +291,6 @@ impl ObjectAccessTrait for BlobObjectAccess {
break result.context(msg)?;
};

op.end(vec.len() as u64);
Ok(vec.into())
}

Expand All @@ -333,10 +316,7 @@ impl ObjectAccessTrait for BlobObjectAccess {
&self,
key: String,
streamfunc: &(dyn Fn() -> (ByteStream, usize) + Send + Sync),
stat_type: ObjectAccessOpType,
) -> Result<(), OAError> {
let _permit = self.outstanding_ops[stat_type].acquire().await.unwrap();
let op = self.access_stats.begin(stat_type);
) -> Result<usize, OAError> {
let msg = format!("put {key}");
let blob_client = self.container_client.blob_client(key);

Expand All @@ -352,22 +332,24 @@ impl ObjectAccessTrait for BlobObjectAccess {
blob_client.put_block_blob(body.clone()).await
})
.await
.map(|_| {
op.end(len as u64);
})
.map(|_| len)
.map_err(|e| {
debug!("{msg}: {e:?}");
e.into()
})
}

#[async_backtrace::framed]
async fn delete_objects(&self, stream: &mut (dyn Stream<Item = String> + Send + Unpin)) {
async fn delete_objects(
&self,
access_stats: &ObjectAccessStats,
stream: &mut (dyn Stream<Item = String> + Send + Unpin),
) {
// azure-sdk-for-rust does not yet support batched deletion of objects. So we issue
// object deletion requests in parallel.
stream
.map(|key| async move {
let op = self.access_stats.begin(ObjectAccessOpType::ObjectDelete);
let op = access_stats.begin(ObjectAccessOpType::ObjectDelete);
let msg = format!("delete {key}");
let blob_client = self.container_client.blob_client(key);
let begin = Instant::now();
Expand Down Expand Up @@ -401,11 +383,6 @@ impl ObjectAccessTrait for BlobObjectAccess {
.await;
}

#[async_backtrace::framed]
fn collect_stats(&self) -> HashMap<String, StatMapValue> {
self.access_stats.collect_stats()
}

fn supports_list_after(&self) -> bool {
false
}
Expand Down
Loading

0 comments on commit 89a7c7f

Please sign in to comment.