diff --git a/Cargo.lock b/Cargo.lock index a07e5f5..3d29ced 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -462,6 +462,14 @@ dependencies = [ "serfig", ] +[[package]] +name = "common-eth" +version = "0.1.0" +dependencies = [ + "common-exceptions", + "web3", +] + [[package]] name = "common-exceptions" version = "0.1.0" @@ -873,6 +881,7 @@ dependencies = [ "arrow2", "clap", "common-configs", + "common-eth", "common-exceptions", "common-storages", "deadqueue", diff --git a/Cargo.toml b/Cargo.toml index e0b7891..7516fd4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ members = [ "common/configs", "common/exceptions", + "common/eth", "common/storages", "common/tracings", "ethetl", diff --git a/common/eth/Cargo.toml b/common/eth/Cargo.toml new file mode 100644 index 0000000..a369fb4 --- /dev/null +++ b/common/eth/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "common-eth" +version = "0.1.0" +edition = "2021" + +[lib] +doctest = false +test = false + +[dependencies] +# Workspace dependencies +common-exceptions = { path = "../exceptions" } +web3 = "0.18.0" + +[dev-dependencies] diff --git a/common/eth/src/erc.rs b/common/eth/src/erc.rs new file mode 100644 index 0000000..980e6a6 --- /dev/null +++ b/common/eth/src/erc.rs @@ -0,0 +1,16 @@ +// Copyright 2022 BohuTANG. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +pub const ERC20_TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX: &str = + "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"; diff --git a/common/eth/src/hex.rs b/common/eth/src/hex.rs new file mode 100644 index 0000000..515729b --- /dev/null +++ b/common/eth/src/hex.rs @@ -0,0 +1,31 @@ +// Copyright 2022 BohuTANG. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use web3::types::Bytes; +use web3::types::H256; + +/// Renders `v` as lowercase hex with no `0x` prefix and with every leading `0` digit stripped. +pub fn h256_to_hex(v: &H256) -> String { + let hex = v + .as_bytes() + .iter() + .map(|x| format!("{:02x}", x)) + .collect::<String>(); + hex.trim_start_matches('0').to_string() +} + +/// Renders `v` as lowercase hex, two digits per byte, with no `0x` prefix. +pub fn bytes_to_hex(v: &Bytes) -> String { + v.0.iter().map(|x| format!("{:02x}", x)).collect::<String>() +} diff --git a/common/eth/src/lib.rs b/common/eth/src/lib.rs new file mode 100644 index 0000000..0c7863f --- /dev/null +++ b/common/eth/src/lib.rs @@ -0,0 +1,20 @@ +// Copyright 2022 BohuTANG. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +mod erc; +mod hex; + +pub use erc::ERC20_TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX; +pub use hex::bytes_to_hex; +pub use hex::h256_to_hex; diff --git a/common/storages/src/parquet.rs b/common/storages/src/parquet.rs index f0a0fd7..00ef08f 100644 --- a/common/storages/src/parquet.rs +++ b/common/storages/src/parquet.rs @@ -36,7 +36,7 @@ pub async fn write_parquet( columns: Chunk>, ) -> Result<()> { let options = WriteOptions { - write_statistics: true, + write_statistics: false, compression: CompressionOptions::Snappy, version: Version::V2, }; diff --git a/ethetl/Cargo.toml b/ethetl/Cargo.toml index 11d535f..950e1d4 100644 --- a/ethetl/Cargo.toml +++ b/ethetl/Cargo.toml @@ -16,6 +16,7 @@ test = false [dependencies] # Workspace dependencies common-configs = { path = "../common/configs" } +common-eth = { path = "../common/eth" } common-exceptions = { path = "../common/exceptions" } common-storages = { path = "../common/storages" } diff --git a/ethetl/src/exporters/logs.rs b/ethetl/src/exporters/logs.rs index 11d49b2..af6b810 100644 --- a/ethetl/src/exporters/logs.rs +++ b/ethetl/src/exporters/logs.rs @@ -12,22 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::sync::Arc; - -use arrow2::array::Array; -use arrow2::array::ListArray; use arrow2::array::UInt64Array; use arrow2::array::Utf8Array; -use arrow2::buffer::Buffer; use arrow2::chunk::Chunk; -use arrow2::datatypes::DataType; use arrow2::datatypes::Field; use arrow2::datatypes::Schema; +use common_eth::bytes_to_hex; use common_exceptions::Result; +use web3::types::Address; use web3::types::TransactionReceipt; +use web3::types::H256; +use web3::types::U64; use crate::contexts::ContextRef; -use crate::exporters::bytes_to_hex; use crate::exporters::write_file; pub struct LogsExporter { @@ -51,43 +48,44 @@ impl LogsExporter { let mut log_index_vec = Vec::new(); let mut transaction_hash_vec = Vec::new(); let mut transaction_index_vec = Vec::new(); + let mut block_hash_vec = Vec::new(); + let mut block_number_vec = Vec::new(); + let mut contract_address_vec = Vec::new(); let mut data_vec = Vec::new(); let mut topics_vec = Vec::new(); - let mut topic_offsets_vec = Vec::new(); - let mut offset = 0i32; - topic_offsets_vec.push(offset); for (idx, receipt) in receipts.iter().enumerate() { for log in &receipt.logs { log_index_vec.push(idx as u64); transaction_hash_vec.push(format!("{:#x}", receipt.transaction_hash)); transaction_index_vec.push(receipt.transaction_index.as_u64()); + block_hash_vec.push(format!( + "{:#x}", + receipt.block_hash.unwrap_or_else(H256::zero) + )); + block_number_vec.push(receipt.block_number.unwrap_or_else(U64::zero).as_u64()); + contract_address_vec.push(format!( + "{:#x}", + receipt.contract_address.unwrap_or_else(Address::zero) + )); data_vec.push(format!("0x{:}", &bytes_to_hex(&log.data))); - - for topic in &log.topics { - topics_vec.push(format!("{:#x}", topic)); - } - offset += log.topics.len() as i32; - topic_offsets_vec.push(offset); + let topics = log + .topics + .iter() + .map(|x| format!("{:#x}", x)) + .collect::<Vec<_>>() + .join("|"); + topics_vec.push(topics); } } let log_index_array = UInt64Array::from_slice(log_index_vec); let 
transaction_hash_array = Utf8Array::::from_slice(transaction_hash_vec); let transaction_index_array = UInt64Array::from_slice(transaction_index_vec); + let block_hash_array = Utf8Array::::from_slice(block_hash_vec); + let block_number_array = UInt64Array::from_slice(block_number_vec); + let contract_address_array = Utf8Array::::from_slice(contract_address_vec); let data_array = Utf8Array::::from_slice(data_vec); - - // log topics. - let topics_values = Utf8Array::::from_slice(topics_vec); - let topics_array = ListArray::::from_data( - DataType::List(Box::from(Field::new( - "topics", - topics_values.data_type().clone(), - true, - ))), - Buffer::from(topic_offsets_vec), - Arc::new(topics_values), - None, - ); + let topics_array = Utf8Array::::from_slice(topics_vec); let log_index_field = Field::new("log_index", log_index_array.data_type().clone(), true); let transaction_hash_field = Field::new( @@ -100,6 +98,14 @@ impl LogsExporter { transaction_index_array.data_type().clone(), true, ); + let block_hash_field = Field::new("block_hash", block_hash_array.data_type().clone(), true); + let block_number_field = + Field::new("block_number", block_number_array.data_type().clone(), true); + let contract_address_field = Field::new( + "contract_address", + contract_address_array.data_type().clone(), + true, + ); let data_field = Field::new("data", data_array.data_type().clone(), true); let topics_field = Field::new("topics", topics_array.data_type().clone(), true); @@ -107,6 +113,9 @@ impl LogsExporter { log_index_field, transaction_hash_field, transaction_index_field, + block_hash_field, + block_number_field, + contract_address_field, data_field, topics_field, ]); @@ -114,6 +123,9 @@ impl LogsExporter { log_index_array.boxed(), transaction_hash_array.boxed(), transaction_index_array.boxed(), + block_hash_array.boxed(), + block_number_array.boxed(), + contract_address_array.boxed(), data_array.boxed(), topics_array.boxed(), ])?; diff --git a/ethetl/src/exporters/mod.rs 
b/ethetl/src/exporters/mod.rs index 75b50d7..d837b73 100644 --- a/ethetl/src/exporters/mod.rs +++ b/ethetl/src/exporters/mod.rs @@ -33,28 +33,10 @@ pub use pipeline::Pipeline; pub use receipts::ReceiptExporter; pub use token_transfers::TokenTransferExporter; pub use transactions::TransactionExporter; -use web3::types::Bytes; -use web3::types::H256; pub use worker::Worker; use crate::contexts::ContextRef; -pub const TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX: &str = - "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"; - -pub fn h256_to_hex(v: &H256) -> String { - let hex = v - .as_bytes() - .iter() - .map(|x| format!("{:02x}", x)) - .collect::(); - hex.trim_start_matches('0').to_string() -} - -pub fn bytes_to_hex(v: &Bytes) -> String { - v.0.iter().map(|x| format!("{:02x}", x)).collect::() -} - pub async fn write_file( ctx: &ContextRef, path: &str, diff --git a/ethetl/src/exporters/nft_transfers.rs b/ethetl/src/exporters/nft_transfers.rs index d678dee..449d335 100644 --- a/ethetl/src/exporters/nft_transfers.rs +++ b/ethetl/src/exporters/nft_transfers.rs @@ -17,6 +17,8 @@ use arrow2::array::Utf8Array; use arrow2::chunk::Chunk; use arrow2::datatypes::Field; use arrow2::datatypes::Schema; +use common_eth::h256_to_hex; +use common_eth::ERC20_TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX; use common_exceptions::Result; use web3::types::TransactionReceipt; use web3::types::H256; @@ -24,9 +26,7 @@ use web3::types::U256; use web3::types::U64; use crate::contexts::ContextRef; -use crate::exporters::h256_to_hex; use crate::exporters::write_file; -use crate::exporters::TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX; pub struct NftTransferExporter { ctx: ContextRef, @@ -62,7 +62,9 @@ impl NftTransferExporter { // NFT token transfer contract address. 
let topic_0 = format!("{:#x}", topics[0]); - if topic_0.as_str() == TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX && topics.len() == 4 { + if topic_0.as_str() == ERC20_TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX + && topics.len() == 4 + { token_address_vec.push(format!("{:#x}", logs.address)); from_address_vec.push(format!("0x{}", h256_to_hex(&topics[1]))); to_address_vec.push(format!("0x{}", h256_to_hex(&topics[2]))); diff --git a/ethetl/src/exporters/token_transfers.rs b/ethetl/src/exporters/token_transfers.rs index 13d41b6..d674c9d 100644 --- a/ethetl/src/exporters/token_transfers.rs +++ b/ethetl/src/exporters/token_transfers.rs @@ -17,6 +17,9 @@ use arrow2::array::Utf8Array; use arrow2::chunk::Chunk; use arrow2::datatypes::Field; use arrow2::datatypes::Schema; +use common_eth::bytes_to_hex; +use common_eth::h256_to_hex; +use common_eth::ERC20_TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX; use common_exceptions::Result; use web3::types::TransactionReceipt; use web3::types::H256; @@ -24,10 +27,7 @@ use web3::types::U256; use web3::types::U64; use crate::contexts::ContextRef; -use crate::exporters::bytes_to_hex; -use crate::exporters::h256_to_hex; use crate::exporters::write_file; -use crate::exporters::TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX; pub struct TokenTransferExporter { ctx: ContextRef, @@ -62,7 +62,9 @@ impl TokenTransferExporter { // Token transfer contract address. 
let topic_0 = format!("{:#x}", topics[0]); - if topic_0.as_str() == TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX && topics.len() == 3 { + if topic_0.as_str() == ERC20_TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX + && topics.len() == 3 + { token_address_vec.push(format!("{:#x}", logs.address)); from_address_vec.push(format!("0x{}", h256_to_hex(&topics[1]))); to_address_vec.push(format!("0x{}", h256_to_hex(&topics[2]))); diff --git a/ethetl/src/exporters/transactions.rs b/ethetl/src/exporters/transactions.rs index f4aa957..9404186 100644 --- a/ethetl/src/exporters/transactions.rs +++ b/ethetl/src/exporters/transactions.rs @@ -20,6 +20,7 @@ use arrow2::array::Utf8Array; use arrow2::chunk::Chunk; use arrow2::datatypes::Field; use arrow2::datatypes::Schema; +use common_eth::bytes_to_hex; use common_exceptions::Result; use web3::ethabi::Address; use web3::types::Block; @@ -29,7 +30,6 @@ use web3::types::U256; use web3::types::U64; use crate::contexts::ContextRef; -use crate::exporters::bytes_to_hex; use crate::exporters::write_file; pub struct TransactionExporter { diff --git a/schemas/databend/logs.sql b/schemas/databend/logs.sql new file mode 100644 index 0000000..7768c28 --- /dev/null +++ b/schemas/databend/logs.sql @@ -0,0 +1,11 @@ +-- One row per exported log; topics holds the log's topic hashes joined by '|'. +CREATE TABLE IF NOT EXISTS logs ( + log_index BIGINT UNSIGNED, + transaction_hash STRING, + transaction_index BIGINT UNSIGNED, + block_hash STRING, + block_number BIGINT UNSIGNED, + contract_address STRING, + data STRING, + topics STRING +); \ No newline at end of file