Skip to content

Commit

Permalink
feat: Read error response for better debugging (#170)
Browse files Browse the repository at this point in the history
* feat: Read error response for better debugging

Signed-off-by: Xuanwo <[email protected]>

* Format code

Signed-off-by: Xuanwo <[email protected]>

* Simplify logic

Signed-off-by: Xuanwo <[email protected]>

* Refactor error handle logic

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Mar 21, 2022
1 parent a7728c9 commit bd45177
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 74 deletions.
137 changes: 66 additions & 71 deletions src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::min;
use std::collections::HashMap;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::Context;
use std::task::Poll;

use anyhow::anyhow;
use async_trait::async_trait;
use bytes::BufMut;
use futures::TryStreamExt;
use http::header::HeaderName;
use http::HeaderValue;
use http::Response;
use http::StatusCode;
use hyper::body::HttpBody;
use hyper::Body;
use log::debug;
use log::error;
use log::info;
Expand Down Expand Up @@ -156,6 +162,11 @@ impl Builder {
source: anyhow::Error::new(e),
})?;

debug!(
"auto detect region got response: status {:?}, header: {:?}",
res.status(),
res.headers()
);
match res.status() {
// The endpoint works, return with not changed endpoint and
// default region.
Expand Down Expand Up @@ -380,32 +391,15 @@ impl Accessor for Backend {
let resp = self.get_object(&p, args.offset, args.size).await?;

match resp.status() {
http::StatusCode::OK | http::StatusCode::PARTIAL_CONTENT => {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
info!(
"object {} reader created: offset {:?}, size {:?}",
&p, args.offset, args.size
);

Ok(Box::new(ByteStream(resp).into_async_read()))
}
http::StatusCode::NOT_FOUND => Err(Error::Object {
kind: Kind::ObjectNotExist,
op: "read",
path: p.clone(),
source: anyhow!("object not found: {:?}", resp),
}),
http::StatusCode::FORBIDDEN => Err(Error::Object {
kind: Kind::ObjectPermissionDenied,
op: "read",
path: p.clone(),
source: anyhow!("object permission denied: {:?}", resp),
}),
_ => Err(Error::Object {
kind: Kind::Unexpected,
op: "read",
path: p.clone(),
source: anyhow!("object unexpected: {:?}", resp),
}),
_ => Err(parse_error_response(resp, "read", &p).await),
}
}

Expand All @@ -415,22 +409,11 @@ impl Accessor for Backend {

let resp = self.put_object(&p, r, args.size).await?;
match resp.status() {
http::StatusCode::CREATED | http::StatusCode::OK => {
StatusCode::CREATED | StatusCode::OK => {
info!("object {} write finished: size {:?}", &p, args.size);
Ok(args.size as usize)
}
http::StatusCode::FORBIDDEN => Err(Error::Object {
kind: Kind::ObjectPermissionDenied,
op: "write",
path: p.clone(),
source: anyhow!("object permission denied: {:?}", resp),
}),
_ => Err(Error::Object {
kind: Kind::Unexpected,
op: "write",
path: p.to_string(),
source: anyhow!("object unexpected: {:?}", resp),
}),
_ => Err(parse_error_response(resp, "write", &p).await),
}
}

Expand All @@ -455,7 +438,7 @@ impl Accessor for Backend {
let resp = self.head_object(&p).await?;

match resp.status() {
http::StatusCode::OK => {
StatusCode::OK => {
let mut m = Metadata::default();
m.set_path(&args.path);

Expand Down Expand Up @@ -493,32 +476,17 @@ impl Accessor for Backend {
info!("object {} stat finished: {:?}", &p, m);
Ok(m)
}
http::StatusCode::NOT_FOUND => {
// Always returns empty dir object if path is endswith "/"
if p.ends_with('/') {
let mut m = Metadata::default();
m.set_path(&args.path);
m.set_content_length(0);
m.set_mode(ObjectMode::DIR);
m.set_complete();
StatusCode::NOT_FOUND if p.ends_with('/') => {
let mut m = Metadata::default();
m.set_path(&args.path);
m.set_content_length(0);
m.set_mode(ObjectMode::DIR);
m.set_complete();

info!("object {} stat finished", &p);
Ok(m)
} else {
Err(Error::Object {
kind: Kind::ObjectNotExist,
op: "stat",
path: p.to_string(),
source: anyhow!("object not exist: {:?}", resp),
})
}
info!("object {} stat finished", &p);
Ok(m)
}
_ => Err(Error::Object {
kind: Kind::Unexpected,
op: "stat",
path: p.to_string(),
source: anyhow!("object unexpected: {:?}", resp),
}),
_ => Err(parse_error_response(resp, "stat", &p).await),
}
}

Expand All @@ -531,22 +499,11 @@ impl Accessor for Backend {
let resp = self.delete_object(&p).await?;

match resp.status() {
http::StatusCode::NO_CONTENT => {
StatusCode::NO_CONTENT => {
info!("object {} delete finished", &p);
Ok(())
}
http::StatusCode::FORBIDDEN => Err(Error::Object {
kind: Kind::ObjectPermissionDenied,
op: "delete",
path: p.to_string(),
source: anyhow!("object permission denied: {:?}", resp),
}),
_ => Err(Error::Object {
kind: Kind::Unexpected,
op: "delete",
path: p.to_string(),
source: anyhow!("object unexpected: {:?}", resp),
}),
_ => Err(parse_error_response(resp, "delete", &p).await),
}
}

Expand Down Expand Up @@ -704,3 +661,41 @@ impl futures::Stream for ByteStream {
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))
}
}

// Read and decode whole error response.
async fn parse_error_response(resp: Response<Body>, op: &'static str, path: &str) -> Error {
let (part, mut body) = resp.into_parts();
let kind = match part.status {
StatusCode::NOT_FOUND => Kind::ObjectNotExist,
StatusCode::FORBIDDEN => Kind::ObjectPermissionDenied,
_ => Kind::Unexpected,
};

// Only read 4KiB from the response to avoid broken services.
let mut bs = Vec::new();
let mut limit = 4 * 1024;

while let Some(b) = body.data().await {
match b {
Ok(b) => {
bs.put_slice(&b[..min(b.len(), limit)]);
limit -= b.len();
if limit == 0 {
break;
}
}
Err(e) => return Error::Unexpected(anyhow!("parse error response parse: {:?}", e)),
}
}

Error::Object {
kind,
op,
path: path.to_string(),
source: anyhow!(
"response part: {:?}, body: {:?}",
part,
String::from_utf8_lossy(&bs)
),
}
}
7 changes: 5 additions & 2 deletions src/services/s3/object_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use std::task::Context;
use std::task::Poll;

use anyhow::anyhow;
use bytes::{Buf, BufMut};
use bytes::Buf;
use bytes::BufMut;
use futures::future::BoxFuture;
use futures::ready;
use futures::StreamExt;
Expand All @@ -28,7 +29,9 @@ use quick_xml::de;
use serde::Deserialize;

use super::Backend;
use crate::error::{Error, Kind, Result};
use crate::error::Error;
use crate::error::Kind;
use crate::error::Result;
use crate::Object;
use crate::ObjectMode;

Expand Down
1 change: 0 additions & 1 deletion tests/behavior/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use anyhow::Result;
use futures::AsyncReadExt;
use futures::AsyncSeekExt;
use futures::StreamExt;

use opendal::error::Kind;
use opendal::ObjectMode;
use opendal::Operator;
Expand Down

0 comments on commit bd45177

Please sign in to comment.