Skip to content

Commit

Permalink
Merge pull request #49 from deepeth/dev-tokens
Browse files Browse the repository at this point in the history
Add logs exporter
  • Loading branch information
BohuTANG authored Aug 3, 2022
2 parents 90533db + f0fc2e6 commit 88b3825
Show file tree
Hide file tree
Showing 19 changed files with 292 additions and 45 deletions.
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
members = [
"common/configs",
"common/exceptions",
"common/eth",
"common/storages",
"common/tracings",
"ethetl",
Expand Down
15 changes: 15 additions & 0 deletions common/eth/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "common-eth"
version = "0.1.0"
edition = "2021"

[lib]
doctest = false
test = false

[dependencies]
# Workspace dependencies
common-exceptions = { path = "../exceptions" }
web3 = "0.18.0"

[dev-dependencies]
16 changes: 16 additions & 0 deletions common/eth/src/erc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2022 BohuTANG.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub const ERC20_TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX: &str =
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef";
29 changes: 29 additions & 0 deletions common/eth/src/hex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2022 BohuTANG.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use web3::types::Bytes;
use web3::types::H256;

pub fn h256_to_hex(v: &H256) -> String {
let hex = v
.as_bytes()
.iter()
.map(|x| format!("{:02x}", x))
.collect::<String>();
hex.trim_start_matches('0').to_string()
}

pub fn bytes_to_hex(v: &Bytes) -> String {
v.0.iter().map(|x| format!("{:02x}", x)).collect::<String>()
}
20 changes: 20 additions & 0 deletions common/eth/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2022 BohuTANG.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod erc;
mod hex;

pub use erc::ERC20_TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX;
pub use hex::bytes_to_hex;
pub use hex::h256_to_hex;
5 changes: 4 additions & 1 deletion common/storages/src/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ pub async fn write_csv(
.map(|f| f.name.clone())
.collect::<Vec<String>>();

let options = write::SerializeOptions::default();
let options = write::SerializeOptions {
delimiter: u8::try_from('\t').unwrap(),
..Default::default()
};
write::write_header(&mut cursor, headers.as_slice(), &options)?;
write::write_chunk(&mut cursor, &columns, &options)?;

Expand Down
2 changes: 1 addition & 1 deletion common/storages/src/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub async fn write_parquet(
columns: Chunk<Box<dyn Array>>,
) -> Result<()> {
let options = WriteOptions {
write_statistics: true,
write_statistics: false,
compression: CompressionOptions::Snappy,
version: Version::V2,
};
Expand Down
1 change: 1 addition & 0 deletions ethetl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ test = false
[dependencies]
# Workspace dependencies
common-configs = { path = "../common/configs" }
common-eth = { path = "../common/eth" }
common-exceptions = { path = "../common/exceptions" }
common-storages = { path = "../common/storages" }

Expand Down
136 changes: 136 additions & 0 deletions ethetl/src/exporters/logs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright 2022 BohuTANG.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use arrow2::array::UInt64Array;
use arrow2::array::Utf8Array;
use arrow2::chunk::Chunk;
use arrow2::datatypes::Field;
use arrow2::datatypes::Schema;
use common_eth::bytes_to_hex;
use common_exceptions::Result;
use web3::types::Address;
use web3::types::TransactionReceipt;
use web3::types::H256;
use web3::types::U64;

use crate::contexts::ContextRef;
use crate::exporters::write_file;

pub struct LogsExporter {
ctx: ContextRef,
dir: String,
receipts: Vec<TransactionReceipt>,
}

impl LogsExporter {
pub fn create(ctx: &ContextRef, dir: &str, receipts: &[TransactionReceipt]) -> LogsExporter {
Self {
ctx: ctx.clone(),
dir: dir.to_string(),
receipts: receipts.to_vec(),
}
}

pub async fn export(&self) -> Result<()> {
let receipts = &self.receipts;

let mut log_index_vec = Vec::new();
let mut transaction_hash_vec = Vec::new();
let mut transaction_index_vec = Vec::new();
let mut block_hash_vec = Vec::new();
let mut block_number_vec = Vec::new();
let mut contract_address_vec = Vec::new();
let mut data_vec = Vec::new();
let mut topics_vec = Vec::new();

for (idx, receipt) in receipts.iter().enumerate() {
for log in &receipt.logs {
log_index_vec.push(idx as u64);
transaction_hash_vec.push(format!("{:#x}", receipt.transaction_hash));
transaction_index_vec.push(receipt.transaction_index.as_u64());
block_hash_vec.push(format!(
"{:#x}",
receipt.block_hash.unwrap_or_else(H256::zero)
));
block_number_vec.push(receipt.block_number.unwrap_or_else(U64::zero).as_u64());
contract_address_vec.push(format!(
"{:#x}",
receipt.contract_address.unwrap_or_else(Address::zero)
));
data_vec.push(format!("0x{:}", &bytes_to_hex(&log.data)));
let topics = log
.topics
.iter()
.map(|x| format!("0x{:#x}", x))
.collect::<Vec<String>>()
.join("|");
topics_vec.push(topics);
}
}
let log_index_array = UInt64Array::from_slice(log_index_vec);
let transaction_hash_array = Utf8Array::<i32>::from_slice(transaction_hash_vec);
let transaction_index_array = UInt64Array::from_slice(transaction_index_vec);
let block_hash_array = Utf8Array::<i32>::from_slice(block_hash_vec);
let block_number_array = UInt64Array::from_slice(block_number_vec);
let contract_address_array = Utf8Array::<i32>::from_slice(contract_address_vec);
let data_array = Utf8Array::<i32>::from_slice(data_vec);
let topics_array = Utf8Array::<i32>::from_slice(topics_vec);

let log_index_field = Field::new("log_index", log_index_array.data_type().clone(), true);
let transaction_hash_field = Field::new(
"transaction_hash",
transaction_hash_array.data_type().clone(),
true,
);
let transaction_index_field = Field::new(
"transaction_index",
transaction_index_array.data_type().clone(),
true,
);
let block_hash_field = Field::new("block_hash", block_hash_array.data_type().clone(), true);
let block_number_field =
Field::new("block_number", block_number_array.data_type().clone(), true);
let contracet_address_field = Field::new(
"contract_address",
contract_address_array.data_type().clone(),
true,
);
let data_field = Field::new("data", data_array.data_type().clone(), true);
let topics_field = Field::new("topics", topics_array.data_type().clone(), true);

let schema = Schema::from(vec![
log_index_field,
transaction_hash_field,
transaction_index_field,
block_hash_field,
block_number_field,
contracet_address_field,
data_field,
topics_field,
]);
let columns = Chunk::try_new(vec![
log_index_array.boxed(),
transaction_hash_array.boxed(),
transaction_index_array.boxed(),
block_hash_array.boxed(),
block_number_array.boxed(),
contract_address_array.boxed(),
data_array.boxed(),
topics_array.boxed(),
])?;

let path = format!("{}/logs", self.dir);
write_file(&self.ctx, &path, schema, columns, "logs").await
}
}
20 changes: 2 additions & 18 deletions ethetl/src/exporters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

mod blocks;
mod logs;
mod nft_transfers;
mod pipeline;
mod receipts;
Expand All @@ -26,33 +27,16 @@ use arrow2::datatypes::Schema;
pub use blocks::BlockExporter;
use common_exceptions::ErrorCode;
use common_exceptions::Result;
pub use logs::LogsExporter;
pub use nft_transfers::NftTransferExporter;
pub use pipeline::Pipeline;
pub use receipts::ReceiptExporter;
pub use token_transfers::TokenTransferExporter;
pub use transactions::TransactionExporter;
use web3::types::Bytes;
use web3::types::H256;
pub use worker::Worker;

use crate::contexts::ContextRef;

pub const TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX: &str =
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef";

pub fn h256_to_hex(v: &H256) -> String {
let hex = v
.as_bytes()
.iter()
.map(|x| format!("{:02x}", x))
.collect::<String>();
hex.trim_start_matches('0').to_string()
}

pub fn bytes_to_hex(v: &Bytes) -> String {
v.0.iter().map(|x| format!("{:02x}", x)).collect::<String>()
}

pub async fn write_file(
ctx: &ContextRef,
path: &str,
Expand Down
15 changes: 12 additions & 3 deletions ethetl/src/exporters/nft_transfers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ use arrow2::array::Utf8Array;
use arrow2::chunk::Chunk;
use arrow2::datatypes::Field;
use arrow2::datatypes::Schema;
use common_eth::h256_to_hex;
use common_eth::ERC20_TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX;
use common_exceptions::Result;
use web3::types::TransactionReceipt;
use web3::types::H256;
use web3::types::U256;
use web3::types::U64;

use crate::contexts::ContextRef;
use crate::exporters::h256_to_hex;
use crate::exporters::write_file;
use crate::exporters::TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX;

pub struct NftTransferExporter {
ctx: ContextRef,
Expand All @@ -48,6 +48,7 @@ impl NftTransferExporter {
let mut from_address_vec = vec![];
let mut to_address_vec = vec![];
let mut token_id_vec = vec![];
let mut erc_standard_vec = vec![];
let mut transaction_hash_vec = vec![];
let mut log_index_vec = vec![];
let mut block_number_vec = vec![];
Expand All @@ -61,11 +62,14 @@ impl NftTransferExporter {

// NFT token transfer contract address.
let topic_0 = format!("{:#x}", topics[0]);
if topic_0.as_str() == TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX && topics.len() == 4 {
if topic_0.as_str() == ERC20_TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX
&& topics.len() == 4
{
token_address_vec.push(format!("{:#x}", logs.address));
from_address_vec.push(format!("0x{}", h256_to_hex(&topics[1])));
to_address_vec.push(format!("0x{}", h256_to_hex(&topics[2])));
token_id_vec.push(format!("0x{}", h256_to_hex(&topics[3])));
erc_standard_vec.push("ERC20");
transaction_hash_vec.push(format!(
"{:#x}",
logs.transaction_hash.unwrap_or_else(H256::zero)
Expand All @@ -80,6 +84,7 @@ impl NftTransferExporter {
let from_address_array = Utf8Array::<i32>::from_slice(from_address_vec);
let to_address_array = Utf8Array::<i32>::from_slice(to_address_vec);
let token_id_array = Utf8Array::<i32>::from_slice(token_id_vec);
let erc_standard_array = Utf8Array::<i32>::from_slice(erc_standard_vec);
let transaction_hash_array = Utf8Array::<i32>::from_slice(transaction_hash_vec);
let log_index_array = UInt64Array::from_slice(log_index_vec);
let block_number_array = UInt64Array::from_slice(block_number_vec);
Expand All @@ -93,6 +98,8 @@ impl NftTransferExporter {
Field::new("from_address", from_address_array.data_type().clone(), true);
let to_address_field = Field::new("to_address", to_address_array.data_type().clone(), true);
let token_id_field = Field::new("token_id", token_id_array.data_type().clone(), true);
let erc_standard_field =
Field::new("erc_standard", erc_standard_array.data_type().clone(), true);
let transaction_hash_field = Field::new(
"transaction_hash",
transaction_hash_array.data_type().clone(),
Expand All @@ -106,6 +113,7 @@ impl NftTransferExporter {
from_address_field,
to_address_field,
token_id_field,
erc_standard_field,
transaction_hash_field,
log_index_field,
block_number_field,
Expand All @@ -116,6 +124,7 @@ impl NftTransferExporter {
from_address_array.boxed(),
to_address_array.boxed(),
token_id_array.boxed(),
erc_standard_array.boxed(),
transaction_hash_array.boxed(),
log_index_array.boxed(),
block_number_array.boxed(),
Expand Down
Loading

0 comments on commit 88b3825

Please sign in to comment.