Skip to content

Commit

Permalink
Merge pull request #75 from BohuTANG/export-directory
Browse files Browse the repository at this point in the history
Move to path/blocks_start_end
  • Loading branch information
BohuTANG authored Aug 19, 2022
2 parents ab854da + cd98b26 commit 57e2bb5
Show file tree
Hide file tree
Showing 24 changed files with 129 additions and 139 deletions.
7 changes: 0 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,20 +126,13 @@ $ make build
[2022-08-15T08:25:47Z INFO ] Config: EthConfig { log: LogConfig { level: "INFO", dir: "_logs" }, export: ExportConfig { provider_uri: "https://mainnet.infura.io/v3/6e83aaa316ef4a8c947b949364f81619", start_block: 15340159, end_block: 15340160, batch_size: 1000, max_worker: 16, web3_batch_size: 100, output_dir: "/tmp/eth", output_format: "csv" }, storage: StorageConfig { storage_type: "fs", fs: FsStorageConfig { data_path: "_datas" }, s3: S3StorageConfig { endpoint_url: "https://s3.amazonaws.com", region: "", bucket: "", root: "", access_key_id: "", secret_access_key: "" }, azblob: AzureStorageBlobConfig { endpoint_url: "", container: "", root: "", account_name: "", account_key: "" } }, config_file: "" }
[2022-08-15T08:25:47Z INFO ] backend build started: Builder { root: Some("/home/bohu/github/deepeth/mars/_datas") }
[2022-08-15T08:25:47Z INFO ] backend build finished: Builder { root: Some("/home/bohu/github/deepeth/mars/_datas") }
[2022-08-15T08:25:52Z INFO ] Write blocks to /tmp/eth/15340159_15340160/blocks.csv
[2022-08-15T08:25:52Z INFO ] Write transactions to /tmp/eth/15340159_15340160/transactions.csv
[2022-08-15T08:25:52Z INFO ] Write /tmp/eth/15340159_15340160/_transaction_hashes.txt
[2022-08-15T08:25:53Z INFO ] 2 blocks processed, 703 transactions processed, 0 receipts processed, 0 logs processed, 0 token_transfers processed, 0 ens processed. Progress is 100%
[2022-08-15T08:25:55Z INFO ] 2 blocks processed, 703 transactions processed, 0 receipts processed, 0 logs processed, 0 token_transfers processed, 0 ens processed. Progress is 100%
[2022-08-15T08:25:57Z INFO ] 2 blocks processed, 703 transactions processed, 100 receipts processed, 0 logs processed, 0 token_transfers processed, 0 ens processed. Progress is 100%
[2022-08-15T08:25:59Z INFO ] 2 blocks processed, 703 transactions processed, 200 receipts processed, 0 logs processed, 0 token_transfers processed, 0 ens processed. Progress is 100%
[2022-08-15T08:26:01Z INFO ] 2 blocks processed, 703 transactions processed, 300 receipts processed, 0 logs processed, 0 token_transfers processed, 0 ens processed. Progress is 100%
[2022-08-15T08:26:03Z INFO ] 2 blocks processed, 703 transactions processed, 500 receipts processed, 0 logs processed, 0 token_transfers processed, 0 ens processed. Progress is 100%
[2022-08-15T08:26:05Z INFO ] 2 blocks processed, 703 transactions processed, 600 receipts processed, 0 logs processed, 0 token_transfers processed, 0 ens processed. Progress is 100%
[2022-08-15T08:26:06Z INFO ] Write receipts to /tmp/eth/15340159_15340160/receipts.csv
[2022-08-15T08:26:06Z INFO ] Write logs to /tmp/eth/15340159_15340160/logs.csv
[2022-08-15T08:26:07Z INFO ] Write token_transfer to /tmp/eth/15340159_15340160/token_transfers.csv
[2022-08-15T08:26:07Z INFO ] Write ens to /tmp/eth/15340159_15340160/ens.csv
[2022-08-15T08:26:07Z INFO ] 2 blocks processed, 703 transactions processed, 703 receipts processed, 1542 logs processed, 777 token_transfers processed, 3 ens processed. Progress is 100%
... ...
```
Expand Down
45 changes: 19 additions & 26 deletions ethetl/src/exporters/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::io::BufRead;
use std::io::BufReader;
use std::io::Cursor;
use std::io::Write;
use std::str::FromStr;

use arrow2::array::UInt64Array;
Expand Down Expand Up @@ -45,15 +44,22 @@ use crate::exporters::TransactionExporter;

pub struct BlockExporter {
ctx: ContextRef,
dir: String,
output_dir: String,
range_path: String,
numbers: Vec<usize>,
}

impl BlockExporter {
pub fn create(ctx: &ContextRef, dir: &str, numbers: Vec<usize>) -> BlockExporter {
pub fn create(
ctx: &ContextRef,
output_dir: &str,
range_path: &str,
numbers: Vec<usize>,
) -> BlockExporter {
Self {
ctx: ctx.clone(),
dir: dir.to_string(),
output_dir: output_dir.to_string(),
range_path: range_path.to_string(),
numbers,
}
}
Expand All @@ -63,13 +69,11 @@ impl BlockExporter {
fetcher.push_batch(self.numbers.to_vec())?;
let blocks = fetcher.fetch().await?;

self.write_begin_file().await?;
{
self.export_blocks(&blocks).await?;
self.export_txs(&blocks).await?;
self.export_tx_receipts().await?;
}
self.write_commit_file().await?;

Ok(())
}
Expand Down Expand Up @@ -218,25 +222,30 @@ impl BlockExporter {
base_fee_per_gas_array.boxed(),
])?;

let block_path = format!("{}/blocks", self.dir);
let block_path = format!("{}/blocks/blocks_{}", self.output_dir, self.range_path);
write_file(&self.ctx, &block_path, schema, columns, "blocks").await
}

pub async fn export_txs(&self, blocks: &[Block<Transaction>]) -> Result<()> {
let exporter = TransactionExporter::create(&self.ctx, &self.dir, blocks);
let exporter =
TransactionExporter::create(&self.ctx, &self.output_dir, &self.range_path, blocks);
exporter.export().await
}

pub async fn export_tx_receipts(&self) -> Result<()> {
let tx_hashes = self.read_tx_hash_file().await?;
let exporter = ReceiptExporter::create(&self.ctx, &self.dir, tx_hashes);
let exporter =
ReceiptExporter::create(&self.ctx, &self.output_dir, &self.range_path, tx_hashes);
exporter.export().await?;
Ok(())
}

pub async fn read_tx_hash_file(&self) -> Result<Vec<H256>> {
let mut tx_hashes = vec![];
let path = format!("{}/_transaction_hashes.txt", self.dir);
let path = format!(
"{}/transactions/_transactions_hash_{}.txt",
self.output_dir, self.range_path
);

let meta = self.ctx.get_storage().object(&path).metadata().await?;
if meta.content_length() > 0 {
Expand All @@ -251,20 +260,4 @@ impl BlockExporter {
}
Ok(tx_hashes)
}

pub async fn write_begin_file(&self) -> Result<()> {
let path = format!("{}/_begin.txt", self.dir);
let mut cursor = Cursor::new(Vec::new());
writeln!(cursor, "begin")?;
cursor.flush()?;
common_storages::write_txt(self.ctx.get_storage(), &path, cursor.get_ref().as_slice()).await
}

pub async fn write_commit_file(&self) -> Result<()> {
let path = format!("{}/_commit.txt", self.dir);
let mut cursor = Cursor::new(Vec::new());
writeln!(cursor, "commit")?;
cursor.flush()?;
common_storages::write_txt(self.ctx.get_storage(), &path, cursor.get_ref().as_slice()).await
}
}
15 changes: 11 additions & 4 deletions ethetl/src/exporters/ens.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,22 @@ struct Ens {

pub struct EnsExporter {
ctx: ContextRef,
dir: String,
output_dir: String,
range_path: String,
receipts: Vec<TransactionReceipt>,
}

impl EnsExporter {
pub fn create(ctx: &ContextRef, dir: &str, receipts: &[TransactionReceipt]) -> Self {
pub fn create(
ctx: &ContextRef,
dir: &str,
range_path: &str,
receipts: &[TransactionReceipt],
) -> Self {
Self {
ctx: ctx.clone(),
dir: dir.to_string(),
output_dir: dir.to_string(),
range_path: range_path.to_string(),
receipts: receipts.to_vec(),
}
}
Expand Down Expand Up @@ -135,7 +142,7 @@ impl EnsExporter {
block_number_array.boxed(),
])?;

let path = format!("{}/ens", self.dir);
let path = format!("{}/ens/ens_{}", self.output_dir, self.range_path);
write_file(&self.ctx, &path, schema, columns, "ens").await
}
}
15 changes: 11 additions & 4 deletions ethetl/src/exporters/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,22 @@ use crate::exporters::write_file;

pub struct LogsExporter {
ctx: ContextRef,
dir: String,
output_dir: String,
range_path: String,
receipts: Vec<TransactionReceipt>,
}

impl LogsExporter {
pub fn create(ctx: &ContextRef, dir: &str, receipts: &[TransactionReceipt]) -> LogsExporter {
pub fn create(
ctx: &ContextRef,
dir: &str,
range_path: &str,
receipts: &[TransactionReceipt],
) -> LogsExporter {
Self {
ctx: ctx.clone(),
dir: dir.to_string(),
output_dir: dir.to_string(),
range_path: range_path.to_string(),
receipts: receipts.to_vec(),
}
}
Expand Down Expand Up @@ -130,7 +137,7 @@ impl LogsExporter {
topics_array.boxed(),
])?;

let path = format!("{}/logs", self.dir);
let path = format!("{}/logs/logs_{}", self.output_dir, self.range_path);
write_file(&self.ctx, &path, schema, columns, "logs").await
}
}
15 changes: 11 additions & 4 deletions ethetl/src/exporters/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,27 @@ use crate::exporters::BlockExporter;
pub struct Pipeline {
ctx: ContextRef,
block_numbers: Vec<usize>,
dir: String,
output_dir: String,
range_path: String,
}

impl Pipeline {
pub fn create(ctx: &ContextRef, dir: &str, block_numbers: Vec<usize>) -> Self {
pub fn create(ctx: &ContextRef, range_path: &str, block_numbers: Vec<usize>) -> Self {
Self {
ctx: ctx.clone(),
dir: dir.to_string(),
output_dir: ctx.get_output_dir().to_string(),
range_path: range_path.to_string(),
block_numbers,
}
}

pub async fn execute(&self) -> Result<()> {
let export = BlockExporter::create(&self.ctx, &self.dir, self.block_numbers.to_vec());
let export = BlockExporter::create(
&self.ctx,
&self.output_dir,
&self.range_path,
self.block_numbers.to_vec(),
);
let res = export.export().await;
res
}
Expand Down
24 changes: 17 additions & 7 deletions ethetl/src/exporters/receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,22 @@ use crate::exporters::TokenTransferExporter;

pub struct ReceiptExporter {
ctx: ContextRef,
dir: String,
output_dir: String,
range_path: String,
hashes: Vec<H256>,
}

impl ReceiptExporter {
pub fn create(ctx: &ContextRef, dir: &str, hashes: Vec<H256>) -> ReceiptExporter {
pub fn create(
ctx: &ContextRef,
dir: &str,
range_path: &str,
hashes: Vec<H256>,
) -> ReceiptExporter {
Self {
ctx: ctx.clone(),
dir: dir.to_string(),
output_dir: dir.to_string(),
range_path: range_path.to_string(),
hashes,
}
}
Expand All @@ -55,15 +62,18 @@ impl ReceiptExporter {
self.export_receipts(&receipts).await?;

// Logs.
let logs_export = LogsExporter::create(&self.ctx, &self.dir, &receipts);
let logs_export =
LogsExporter::create(&self.ctx, &self.output_dir, &self.range_path, &receipts);
logs_export.export().await?;

// Token transfers.
let token_transfer_export = TokenTransferExporter::create(&self.ctx, &self.dir, &receipts);
let token_transfer_export =
TokenTransferExporter::create(&self.ctx, &self.output_dir, &self.range_path, &receipts);
token_transfer_export.export().await?;

// Ens.
let ens_export = EnsExporter::create(&self.ctx, &self.dir, &receipts);
let ens_export =
EnsExporter::create(&self.ctx, &self.output_dir, &self.range_path, &receipts);
ens_export.export().await
}

Expand Down Expand Up @@ -171,7 +181,7 @@ impl ReceiptExporter {
effective_gas_price_array.boxed(),
])?;

let path = format!("{}/receipts", self.dir);
let path = format!("{}/receipts/receipts_{}", self.output_dir, self.range_path);
write_file(&self.ctx, &path, schema, columns, "receipts").await
}
}
18 changes: 14 additions & 4 deletions ethetl/src/exporters/token_transfers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,22 @@ struct Transfer {

pub struct TokenTransferExporter {
ctx: ContextRef,
dir: String,
output_dir: String,
range_path: String,
receipts: Vec<TransactionReceipt>,
}

impl TokenTransferExporter {
pub fn create(ctx: &ContextRef, dir: &str, receipts: &[TransactionReceipt]) -> Self {
pub fn create(
ctx: &ContextRef,
dir: &str,
range_path: &str,
receipts: &[TransactionReceipt],
) -> Self {
Self {
ctx: ctx.clone(),
dir: dir.to_string(),
output_dir: dir.to_string(),
range_path: range_path.to_string(),
receipts: receipts.to_vec(),
}
}
Expand Down Expand Up @@ -218,7 +225,10 @@ impl TokenTransferExporter {
block_number_array.boxed(),
])?;

let path = format!("{}/token_transfers", self.dir);
let path = format!(
"{}/token_transfers/token_transfers_{}",
self.output_dir, self.range_path
);
write_file(&self.ctx, &path, schema, columns, "token_transfer").await
}
}
23 changes: 18 additions & 5 deletions ethetl/src/exporters/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,22 @@ use crate::exporters::write_file;

pub struct TransactionExporter {
ctx: ContextRef,
dir: String,
output_dir: String,
range_path: String,
blocks: Vec<Block<Transaction>>,
}

impl TransactionExporter {
pub fn create(ctx: &ContextRef, dir: &str, blocks: &[Block<Transaction>]) -> Self {
pub fn create(
ctx: &ContextRef,
dir: &str,
range_path: &str,
blocks: &[Block<Transaction>],
) -> Self {
Self {
ctx: ctx.clone(),
dir: dir.to_string(),
output_dir: dir.to_string(),
range_path: range_path.to_string(),
blocks: blocks.to_vec(),
}
}
Expand Down Expand Up @@ -200,13 +207,19 @@ impl TransactionExporter {
block_timestamp_array.boxed(),
])?;

let tx_path = format!("{}/transactions", self.dir);
let tx_path = format!(
"{}/transactions/transactions_{}",
self.output_dir, self.range_path
);
write_file(&self.ctx, &tx_path, schema, columns, "transactions").await?;
self.write_tx_hash_file(&hash_vec).await
}

pub async fn write_tx_hash_file(&self, tx_hashes: &[String]) -> Result<()> {
let path = format!("{}/_transaction_hashes.txt", self.dir);
let path = format!(
"{}/transactions/_transactions_hash_{}.txt",
self.output_dir, self.range_path
);
let mut cursor = Cursor::new(Vec::new());
for hash in tx_hashes {
writeln!(cursor, "{}", hash)?;
Expand Down
14 changes: 3 additions & 11 deletions ethetl/src/exporters/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,18 @@ impl Worker {
while !queue.is_empty() {
let range = queue.pop().await;
let (start, end) = (range[0], range[range.len() - 1]);
let dir = format!("{}/{}_{}", ctx.get_output_dir(), start, end);
let range_path = format!("{}_{}", start, end);

let pipeline = Pipeline::create(&ctx, &dir, range);
let pipeline = Pipeline::create(&ctx, &range_path, range);
let res = pipeline.execute().await;
match res {
Ok(_) => {}
Err(e) => {
log::error!(
"Pipeline execute error will remove {:?}, error: {:?}",
dir,
range_path,
e
);

// Remove dir to keep atomic.
let storage = ctx.get_storage();
storage.batch().remove_all(&dir).await.unwrap_or_else(|x| {
log::error!("Remove {:?} error:{:?}", dir, x)
});

// TODO(Write the failure dir to file)
}
}
}
Expand Down
Loading

0 comments on commit 57e2bb5

Please sign in to comment.