Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deps: Bump to OpenDAL v0.4 #4678

Merged
merged 6 commits into from
Apr 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 45 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/contexts/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ test = false
common-base = { path = "../base" }

async-trait = "0.1.52"
opendal = "0.3.0"
opendal = "0.4.2"
time = "0.3.7"
57 changes: 38 additions & 19 deletions common/contexts/src/dal/dal_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,26 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io::Result;
use std::sync::Arc;
use std::time::Instant;

use async_trait::async_trait;
use opendal::error::Result as DalResult;
use opendal::io_util::observe_read;
use opendal::io_util::observe_write;
use opendal::io_util::ReadEvent;
use opendal::io_util::WriteEvent;
use opendal::ops::OpDelete;
use opendal::ops::OpList;
use opendal::ops::OpRead;
use opendal::ops::OpStat;
use opendal::ops::OpWrite;
use opendal::readers::ObserveReader;
use opendal::readers::ReadEvent;
use opendal::Accessor;
use opendal::BoxedAsyncReader;
use opendal::BoxedObjectStream;
use opendal::BytesReader;
use opendal::BytesWriter;
use opendal::Layer;
use opendal::Metadata;
use opendal::ObjectStreamer;

use crate::DalMetrics;

Expand Down Expand Up @@ -61,12 +64,12 @@ impl Layer for DalContext {

#[async_trait]
impl Accessor for DalContext {
async fn read(&self, args: &OpRead) -> DalResult<BoxedAsyncReader> {
async fn read(&self, args: &OpRead) -> Result<BytesReader> {
let metric = self.metrics.clone();

self.inner.as_ref().unwrap().read(args).await.map(|reader| {
self.inner.as_ref().unwrap().read(args).await.map(|r| {
let mut last_pending = None;
let r = ObserveReader::new(reader, move |e| {
let r = observe_read(r, move |e| {
let start = match last_pending {
None => Instant::now(),
Some(t) => t,
Expand All @@ -83,29 +86,45 @@ impl Accessor for DalContext {
metric.inc_read_bytes_cost(start.elapsed().as_millis() as u64);
});

Box::new(r) as BoxedAsyncReader
Box::new(r) as BytesReader
})
}

async fn write(&self, r: BoxedAsyncReader, args: &OpWrite) -> DalResult<usize> {
let now = Instant::now();
self.inner.as_ref().unwrap().write(r, args).await.map(|n| {
self.metrics.inc_write_bytes(n);
self.metrics
.inc_write_bytes_cost(now.elapsed().as_millis() as u64);
n
async fn write(&self, args: &OpWrite) -> Result<BytesWriter> {
let metric = self.metrics.clone();

self.inner.as_ref().unwrap().write(args).await.map(|w| {
let mut last_pending = None;
let w = observe_write(w, move |e| {
let start = match last_pending {
None => Instant::now(),
Some(t) => t,
};
match e {
WriteEvent::Pending => last_pending = Some(start),
WriteEvent::Written(n) => {
last_pending = None;
metric.inc_write_bytes(n);
}
WriteEvent::Error(_) => last_pending = None,
_ => {}
}
metric.inc_write_bytes_cost(start.elapsed().as_millis() as u64);
});

Box::new(w) as BytesWriter
})
}

async fn stat(&self, args: &OpStat) -> DalResult<Metadata> {
async fn stat(&self, args: &OpStat) -> Result<Metadata> {
self.inner.as_ref().unwrap().stat(args).await
}

async fn delete(&self, args: &OpDelete) -> DalResult<()> {
async fn delete(&self, args: &OpDelete) -> Result<()> {
self.inner.as_ref().unwrap().delete(args).await
}

async fn list(&self, args: &OpList) -> DalResult<BoxedObjectStream> {
async fn list(&self, args: &OpList) -> Result<ObjectStreamer> {
self.inner.as_ref().unwrap().list(args).await
}
}
19 changes: 10 additions & 9 deletions common/contexts/src/dal/dal_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,22 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io::Result;
use std::sync::Arc;

use async_trait::async_trait;
use common_base::tokio::runtime::Handle;
use opendal::error::Result as DalResult;
use opendal::ops::OpDelete;
use opendal::ops::OpList;
use opendal::ops::OpRead;
use opendal::ops::OpStat;
use opendal::ops::OpWrite;
use opendal::Accessor;
use opendal::BoxedAsyncReader;
use opendal::BoxedObjectStream;
use opendal::BytesReader;
use opendal::BytesWriter;
use opendal::Layer;
use opendal::Metadata;
use opendal::ObjectStreamer;

/// # TODO
///
Expand Down Expand Up @@ -61,7 +62,7 @@ impl Layer for DalRuntime {

#[async_trait]
impl Accessor for DalRuntime {
async fn read(&self, args: &OpRead) -> DalResult<BoxedAsyncReader> {
async fn read(&self, args: &OpRead) -> Result<BytesReader> {
let op = self.inner.as_ref().unwrap().clone();
let args = args.clone();
self.runtime
Expand All @@ -70,16 +71,16 @@ impl Accessor for DalRuntime {
.expect("join must success")
}

async fn write(&self, r: BoxedAsyncReader, args: &OpWrite) -> DalResult<usize> {
async fn write(&self, args: &OpWrite) -> Result<BytesWriter> {
let op = self.inner.as_ref().unwrap().clone();
let args = args.clone();
self.runtime
.spawn(async move { op.write(r, &args).await })
.spawn(async move { op.write(&args).await })
.await
.expect("join must success")
}

async fn stat(&self, args: &OpStat) -> DalResult<Metadata> {
async fn stat(&self, args: &OpStat) -> Result<Metadata> {
let op = self.inner.as_ref().unwrap().clone();
let args = args.clone();
self.runtime
Expand All @@ -88,7 +89,7 @@ impl Accessor for DalRuntime {
.expect("join must success")
}

async fn delete(&self, args: &OpDelete) -> DalResult<()> {
async fn delete(&self, args: &OpDelete) -> Result<()> {
let op = self.inner.as_ref().unwrap().clone();
let args = args.clone();
self.runtime
Expand All @@ -97,7 +98,7 @@ impl Accessor for DalRuntime {
.expect("join must success")
}

async fn list(&self, args: &OpList) -> DalResult<BoxedObjectStream> {
async fn list(&self, args: &OpList) -> Result<ObjectStreamer> {
let op = self.inner.as_ref().unwrap().clone();
let args = args.clone();
self.runtime
Expand Down
4 changes: 2 additions & 2 deletions common/datavalues/benches/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ fn add_benchmark(c: &mut Criterion) {
let values = array.values();

c.bench_function("from_iter", |b| {
b.iter(|| criterion::black_box(from_iter(&values)))
b.iter(|| criterion::black_box(from_iter(values)))
});

c.bench_function("from_builder", |b| {
b.iter(|| criterion::black_box(from_builder(&values)))
b.iter(|| criterion::black_box(from_builder(values)))
});
}

Expand Down
1 change: 0 additions & 1 deletion common/exception/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ common-arrow = { path = "../arrow" }
anyhow = "1.0.55"
backtrace = "0.3.64"
octocrab = "0.15.4"
opendal = "0.3.0"
paste = "1.0.6"
prost = "0.9.0"
serde = { version = "1.0.136", features = ["derive"] }
Expand Down
10 changes: 3 additions & 7 deletions common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,9 @@ build_exceptions! {

// Storage errors [3001, 4000].
build_exceptions! {
UnknownStorageSchemeName(3001),
SecretKeyNotSet(3002),
DalTransportError(3003),
DalPathNotFound(3004),
SerdeError(3005),
DalError(3006),
DalStatError(3007),
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
StorageNotFound(3001),
StoragePermissionDenied(3002),
StorageOther(4000)
}

// Cache errors [4001, 5000].
Expand Down
17 changes: 9 additions & 8 deletions common/exception/src/exception_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,15 @@ impl From<sqlparser::parser::ParserError> for ErrorCode {

impl From<std::io::Error> for ErrorCode {
fn from(error: std::io::Error) -> Self {
ErrorCode::from_std_error(error)
use std::io::ErrorKind;

let msg = format!("{} ({})", error.kind(), &error);

match error.kind() {
ErrorKind::NotFound => ErrorCode::StorageNotFound(msg),
ErrorKind::PermissionDenied => ErrorCode::StoragePermissionDenied(msg),
_ => ErrorCode::StorageOther(msg),
}
}
}

Expand Down Expand Up @@ -228,10 +236,3 @@ impl From<ErrorCode> for tonic::Status {
}
}
}

// OpenDAL error.
impl From<opendal::error::Error> for ErrorCode {
fn from(error: opendal::error::Error) -> Self {
ErrorCode::DalError(format!("{:?}", error))
}
}
2 changes: 1 addition & 1 deletion common/io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ bincode = { git = "https://github.com/datafuse-extras/bincode", rev = "fd3f9ff"
# Crates.io dependencies
bytes = "1.1.0"
futures = "0.3.21"
opendal = "0.3.0"
opendal = "0.4.2"
serde = { version = "1.0.136", features = ["derive"] }
time = "0.3.7"

Expand Down
Loading