From 096a1c439331be7ff8580c08ad148f256f8234e6 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Wed, 24 Aug 2022 15:13:42 +0800 Subject: [PATCH] feat: remove write s3 directly --- common/configs/src/lib.rs | 1 - common/configs/src/storage.rs | 21 --------------------- common/storages/src/lib.rs | 2 +- common/storages/src/storage.rs | 29 ++++++++++++++--------------- ethetl/src/contexts/context.rs | 4 ++-- 5 files changed, 17 insertions(+), 40 deletions(-) diff --git a/common/configs/src/lib.rs b/common/configs/src/lib.rs index cc56f9e..814f651 100644 --- a/common/configs/src/lib.rs +++ b/common/configs/src/lib.rs @@ -22,6 +22,5 @@ pub use eth::EthConfig; pub use eth::ExportConfig; pub use log::LogConfig; pub use storage::AzblobStorageConfig; -pub use storage::FsStorageConfig; pub use storage::S3StorageConfig; pub use storage::StorageConfig; diff --git a/common/configs/src/storage.rs b/common/configs/src/storage.rs index dc1030d..0a2fc0c 100644 --- a/common/configs/src/storage.rs +++ b/common/configs/src/storage.rs @@ -26,10 +26,6 @@ pub struct StorageConfig { #[serde(rename = "type", alias = "storage_type")] pub storage_type: String, - // Fs storage backend config. - #[clap(flatten)] - pub fs: FsStorageConfig, - // S3 storage backend config. #[clap(flatten)] pub s3: S3StorageConfig, @@ -43,29 +39,12 @@ impl Default for StorageConfig { fn default() -> Self { StorageConfig { storage_type: "fs".to_string(), - fs: Default::default(), s3: Default::default(), azblob: Default::default(), } } } -#[derive(Parser, Debug, Clone, Serialize, Deserialize)] -#[serde(default)] -pub struct FsStorageConfig { - /// fs storage backend data path - #[clap(long = "storage-fs-data-path", default_value = "_datas")] - pub data_path: String, -} - -impl Default for FsStorageConfig { - fn default() -> Self { - FsStorageConfig { - data_path: "_datas".to_string(), - } - } -} - #[derive(Parser, Clone, Serialize, Deserialize)] #[serde(default)] pub struct S3StorageConfig { diff --git a/common/storages/src/lib.rs b/common/storages/src/lib.rs index 03c5c38..1c7affe 100644 --- a/common/storages/src/lib.rs +++ b/common/storages/src/lib.rs @@ -21,5 +21,5 @@ mod txt; pub use csv::write_csv; pub use parquet::write_parquet; -pub use storage::init_storage; +pub use storage::*; pub use txt::write_txt; diff --git a/common/storages/src/storage.rs b/common/storages/src/storage.rs index 5500c29..2c3b8cf 100644 --- a/common/storages/src/storage.rs +++ b/common/storages/src/storage.rs @@ -17,7 +17,6 @@ use std::env; use backon::ExponentialBackoff; use common_configs::AzblobStorageConfig; use common_configs::EthConfig; -use common_configs::FsStorageConfig; use common_configs::S3StorageConfig; use common_exceptions::ErrorCode; use common_exceptions::Result; @@ -26,23 +25,11 @@ use opendal::services::fs; use opendal::services::s3; use opendal::Operator; -pub async fn init_storage(conf: &EthConfig) -> Result { - match conf.storage.storage_type.as_str() { - "fs" => init_fs_operator(&conf.storage.fs).await, - "s3" => init_s3_operator(&conf.storage.s3).await, - "azure" => init_azblob_operator(&conf.storage.azblob).await, - typ => Err(ErrorCode::Invalid(format!( - "Unsupported storage type:{}", - typ - ))), - } -} - /// init_fs_operator will init a opendal fs operator. -pub async fn init_fs_operator(cfg: &FsStorageConfig) -> Result { +pub async fn init_fs_storage(data_path: &str) -> Result { let mut builder = fs::Backend::build(); - let mut path = cfg.data_path.clone(); + let mut path = data_path.to_string(); if !path.starts_with('/') { path = env::current_dir().unwrap().join(path).display().to_string(); } @@ -51,6 +38,18 @@ pub async fn init_fs_operator(cfg: &FsStorageConfig) -> Result { Ok(Operator::new(builder.finish().await?).with_backoff(ExponentialBackoff::default())) } +/// init object storage +pub async fn init_object_storage(conf: &EthConfig) -> Result { + match conf.storage.storage_type.as_str() { + "s3" => init_s3_operator(&conf.storage.s3).await, + "azure" => init_azblob_operator(&conf.storage.azblob).await, + typ => Err(ErrorCode::Invalid(format!( + "Unsupported storage type:{}", + typ + ))), + } +} + /// init_s3_operator will init a opendal s3 operator with input s3 config. pub async fn init_s3_operator(cfg: &S3StorageConfig) -> Result { let mut builder = s3::Backend::build(); diff --git a/ethetl/src/contexts/context.rs b/ethetl/src/contexts/context.rs index 64e0573..4f1d57e 100644 --- a/ethetl/src/contexts/context.rs +++ b/ethetl/src/contexts/context.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use common_configs::EthConfig; -use common_storages::init_storage; +use common_storages::init_fs_storage; use opendal::Operator; use crate::contexts::Progress; @@ -36,7 +36,7 @@ pub type ContextRef = Arc; impl Context { pub async fn create(conf: &EthConfig) -> Arc { let all = conf.export.end_block - conf.export.start_block + 1; - let storage = Arc::new(init_storage(conf).await.unwrap()); + let storage = Arc::new(init_fs_storage(&conf.export.output_dir).await.unwrap()); Arc::new(Context { progress: Progress::create(all),