Skip to content

Commit

Permalink
feat: remove write s3 directly
Browse files Browse the repository at this point in the history
  • Loading branch information
BohuTANG committed Aug 24, 2022
1 parent 57e2bb5 commit 096a1c4
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 40 deletions.
1 change: 0 additions & 1 deletion common/configs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,5 @@ pub use eth::EthConfig;
pub use eth::ExportConfig;
pub use log::LogConfig;
pub use storage::AzblobStorageConfig;
pub use storage::FsStorageConfig;
pub use storage::S3StorageConfig;
pub use storage::StorageConfig;
21 changes: 0 additions & 21 deletions common/configs/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ pub struct StorageConfig {
#[serde(rename = "type", alias = "storage_type")]
pub storage_type: String,

// Fs storage backend config.
#[clap(flatten)]
pub fs: FsStorageConfig,

// S3 storage backend config.
#[clap(flatten)]
pub s3: S3StorageConfig,
Expand All @@ -43,29 +39,12 @@ impl Default for StorageConfig {
fn default() -> Self {
StorageConfig {
storage_type: "fs".to_string(),
fs: Default::default(),
s3: Default::default(),
azblob: Default::default(),
}
}
}

#[derive(Parser, Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct FsStorageConfig {
/// fs storage backend data path
#[clap(long = "storage-fs-data-path", default_value = "_datas")]
pub data_path: String,
}

impl Default for FsStorageConfig {
fn default() -> Self {
FsStorageConfig {
data_path: "_datas".to_string(),
}
}
}

#[derive(Parser, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct S3StorageConfig {
Expand Down
2 changes: 1 addition & 1 deletion common/storages/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ mod txt;

pub use csv::write_csv;
pub use parquet::write_parquet;
pub use storage::init_storage;
pub use storage::*;
pub use txt::write_txt;
29 changes: 14 additions & 15 deletions common/storages/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::env;
use backon::ExponentialBackoff;
use common_configs::AzblobStorageConfig;
use common_configs::EthConfig;
use common_configs::FsStorageConfig;
use common_configs::S3StorageConfig;
use common_exceptions::ErrorCode;
use common_exceptions::Result;
Expand All @@ -26,23 +25,11 @@ use opendal::services::fs;
use opendal::services::s3;
use opendal::Operator;

pub async fn init_storage(conf: &EthConfig) -> Result<Operator> {
match conf.storage.storage_type.as_str() {
"fs" => init_fs_operator(&conf.storage.fs).await,
"s3" => init_s3_operator(&conf.storage.s3).await,
"azure" => init_azblob_operator(&conf.storage.azblob).await,
typ => Err(ErrorCode::Invalid(format!(
"Unsupported storage type:{}",
typ
))),
}
}

/// init_fs_operator will init a opendal fs operator.
pub async fn init_fs_operator(cfg: &FsStorageConfig) -> Result<Operator> {
pub async fn init_fs_storage(data_path: &str) -> Result<Operator> {
let mut builder = fs::Backend::build();

let mut path = cfg.data_path.clone();
let mut path = data_path.to_string();
if !path.starts_with('/') {
path = env::current_dir().unwrap().join(path).display().to_string();
}
Expand All @@ -51,6 +38,18 @@ pub async fn init_fs_operator(cfg: &FsStorageConfig) -> Result<Operator> {
Ok(Operator::new(builder.finish().await?).with_backoff(ExponentialBackoff::default()))
}

/// init object storage
pub async fn init_object_storage(conf: &EthConfig) -> Result<Operator> {
match conf.storage.storage_type.as_str() {
"s3" => init_s3_operator(&conf.storage.s3).await,
"azure" => init_azblob_operator(&conf.storage.azblob).await,
typ => Err(ErrorCode::Invalid(format!(
"Unsupported storage type:{}",
typ
))),
}
}

/// init_s3_operator will init a opendal s3 operator with input s3 config.
pub async fn init_s3_operator(cfg: &S3StorageConfig) -> Result<Operator> {
let mut builder = s3::Backend::build();
Expand Down
4 changes: 2 additions & 2 deletions ethetl/src/contexts/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::sync::Arc;

use common_configs::EthConfig;
use common_storages::init_storage;
use common_storages::init_fs_storage;
use opendal::Operator;

use crate::contexts::Progress;
Expand All @@ -36,7 +36,7 @@ pub type ContextRef = Arc<Context>;
impl Context {
pub async fn create(conf: &EthConfig) -> Arc<Context> {
let all = conf.export.end_block - conf.export.start_block + 1;
let storage = Arc::new(init_storage(conf).await.unwrap());
let storage = Arc::new(init_fs_storage(&conf.export.output_dir).await.unwrap());

Arc::new(Context {
progress: Progress::create(all),
Expand Down

0 comments on commit 096a1c4

Please sign in to comment.