From e40a70b3b0b4565c5b222173b970657e8636e6b9 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Wed, 3 Aug 2022 11:42:15 +0800 Subject: [PATCH 1/6] Add method_id to transaction --- ethetl/src/exporters/transactions.rs | 13 ++++++++++++- schemas/databend/transactios.sql | 2 ++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/ethetl/src/exporters/transactions.rs b/ethetl/src/exporters/transactions.rs index c270d27..f4aa957 100644 --- a/ethetl/src/exporters/transactions.rs +++ b/ethetl/src/exporters/transactions.rs @@ -57,6 +57,7 @@ impl TransactionExporter { let mut to_address_vec = vec![]; let mut value_vec = vec![]; let mut gas_vec = vec![]; + let mut method_id_vec = vec![]; let mut gas_price_vec = vec![]; let mut input_vec = vec![]; let mut max_fee_per_gas_vec = vec![]; @@ -75,8 +76,14 @@ impl TransactionExporter { to_address_vec.push(format!("{:#x}", tx.to.unwrap_or_else(Address::zero))); value_vec.push(format!("{:}", tx.value)); gas_vec.push(tx.gas.as_u64()); + let input = bytes_to_hex(&tx.input); + if input.len() > 7 { + method_id_vec.push(format!("0x{:}", &input[..8])); + } else { + method_id_vec.push(format!("0x{:0>8}", &input)); + } gas_price_vec.push(tx.gas_price.unwrap_or_else(U256::zero).as_u64()); - input_vec.push(format!("0x{}", bytes_to_hex(&tx.input))); + input_vec.push(bytes_to_hex(&tx.input)); max_fee_per_gas_vec.push(tx.max_fee_per_gas.unwrap_or_else(U256::zero).as_u64()); max_priority_fee_per_gas_vec.push( tx.max_priority_fee_per_gas @@ -98,6 +105,7 @@ impl TransactionExporter { let to_address_array = Utf8Array::::from_slice(to_address_vec); let value_array = Utf8Array::::from_slice(value_vec); let gas_array = UInt64Array::from_slice(gas_vec); + let method_id_array = Utf8Array::::from_slice(method_id_vec); let gas_price_array = UInt64Array::from_slice(gas_price_vec); let input_array = Utf8Array::::from_slice(input_vec); let max_fee_per_gas_array = UInt64Array::from_slice(max_fee_per_gas_vec); @@ -120,6 +128,7 @@ impl TransactionExporter { let to_address_field = Field::new("to_address", to_address_array.data_type().clone(), true); let value_field = Field::new("value", value_array.data_type().clone(), true); let gas_field = Field::new("gas", gas_array.data_type().clone(), true); + let method_id_field = Field::new("method_id", method_id_array.data_type().clone(), true); let gas_price_field = Field::new("gas_price", gas_price_array.data_type().clone(), true); let input_field = Field::new("input", input_array.data_type().clone(), true); let max_fee_per_gas_field = Field::new( @@ -154,6 +163,7 @@ impl TransactionExporter { to_address_field, value_field, gas_field, + method_id_field, gas_price_field, input_field, max_fee_per_gas_field, @@ -172,6 +182,7 @@ impl TransactionExporter { to_address_array.boxed(), value_array.boxed(), gas_array.boxed(), + method_id_array.boxed(), gas_price_array.boxed(), input_array.boxed(), max_fee_per_gas_array.boxed(), diff --git a/schemas/databend/transactios.sql b/schemas/databend/transactios.sql index 29ff44b..68f134b 100644 --- a/schemas/databend/transactios.sql +++ b/schemas/databend/transactios.sql @@ -6,6 +6,8 @@ CREATE TABLE IF NOT EXISTS transactions ( to_address STRING, value STRING, gas BIGINT UNSIGNED, + method_id STRING, + gas BIGINT UNSIGNED, gas_price BIGINT UNSIGNED, input STRING, max_fee_per_gas BIGINT UNSIGNED, From a99943a4498ed7420a45810492c16c9f54379f92 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Wed, 3 Aug 2022 14:25:28 +0800 Subject: [PATCH 2/6] Reorder status and root --- common/storages/src/csv.rs | 5 ++++- ethetl/src/exporters/receipts.rs | 8 ++++---- schemas/databend/receipts.sql | 2 +- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/common/storages/src/csv.rs b/common/storages/src/csv.rs index 4590d7f..6e8ae6f 100644 --- a/common/storages/src/csv.rs +++ b/common/storages/src/csv.rs @@ -35,7 +35,10 @@ pub async fn write_csv( .map(|f| f.name.clone()) .collect::>(); - let options = write::SerializeOptions::default(); + let options = write::SerializeOptions { + delimiter: u8::try_from('\t').unwrap(), + ..Default::default() + }; write::write_header(&mut cursor, headers.as_slice(), &options)?; write::write_chunk(&mut cursor, &columns, &options)?; diff --git a/ethetl/src/exporters/receipts.rs b/ethetl/src/exporters/receipts.rs index b797b43..0a1c629 100644 --- a/ethetl/src/exporters/receipts.rs +++ b/ethetl/src/exporters/receipts.rs @@ -71,8 +71,8 @@ impl ReceiptExporter { let mut cumulative_gas_used_vec = Vec::with_capacity(receipt_len); let mut gas_used_vec = Vec::with_capacity(receipt_len); let mut contract_address_vec = Vec::with_capacity(receipt_len); - let mut root_vec = Vec::with_capacity(receipt_len); let mut status_vec = Vec::with_capacity(receipt_len); + let mut root_vec = Vec::with_capacity(receipt_len); let mut effective_gas_price_vec = Vec::with_capacity(receipt_len); for receipt in receipts { @@ -89,8 +89,8 @@ impl ReceiptExporter { "{:#x}", receipt.contract_address.unwrap_or_else(Address::zero) )); - root_vec.push(format!("{:#x}", receipt.root.unwrap_or_else(H256::zero))); status_vec.push(receipt.status.unwrap_or_else(U64::zero).as_u64()); + root_vec.push(format!("{:#x}", receipt.root.unwrap_or_else(H256::zero))); effective_gas_price_vec.push( receipt .effective_gas_price @@ -105,8 +105,8 @@ impl ReceiptExporter { let cumulative_gas_used_array = UInt64Array::from_slice(cumulative_gas_used_vec); let gas_used_array = UInt64Array::from_slice(gas_used_vec); let contract_address_array = Utf8Array::::from_slice(contract_address_vec); - let root_array = Utf8Array::::from_slice(root_vec); let status_array = UInt64Array::from_slice(status_vec); + let root_array = Utf8Array::::from_slice(root_vec); let effective_gas_price_array = UInt64Array::from_slice(effective_gas_price_vec); let transaction_hash_field = Field::new( @@ -161,8 +161,8 @@ impl ReceiptExporter { cumulative_gas_used_array.boxed(), gas_used_array.boxed(), contract_address_array.boxed(), - root_array.boxed(), status_array.boxed(), + root_array.boxed(), effective_gas_price_array.boxed(), ])?; diff --git a/schemas/databend/receipts.sql b/schemas/databend/receipts.sql index bc92c68..d3fa9e2 100644 --- a/schemas/databend/receipts.sql +++ b/schemas/databend/receipts.sql @@ -6,7 +6,7 @@ CREATE TABLE IF NOT EXISTS receipts ( cumulative_gas_used BIGINT UNSIGNED, gas_used BIGINT UNSIGNED, contract_address STRING, - root STRING, status BIGINT UNSIGNED, + root STRING, effective_gas_price BIGINT UNSIGNED ); \ No newline at end of file From c5f066af9bcc489fa5fe5debaa42155def7a400b Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Wed, 3 Aug 2022 14:31:47 +0800 Subject: [PATCH 3/6] Add erc_standard to nft transfers table --- ethetl/src/exporters/nft_transfers.rs | 7 +++++++ schemas/databend/nft_transfers.sql | 1 + 2 files changed, 8 insertions(+) diff --git a/ethetl/src/exporters/nft_transfers.rs b/ethetl/src/exporters/nft_transfers.rs index 14a48f8..d678dee 100644 --- a/ethetl/src/exporters/nft_transfers.rs +++ b/ethetl/src/exporters/nft_transfers.rs @@ -48,6 +48,7 @@ impl NftTransferExporter { let mut from_address_vec = vec![]; let mut to_address_vec = vec![]; let mut token_id_vec = vec![]; + let mut erc_standard_vec = vec![]; let mut transaction_hash_vec = vec![]; let mut log_index_vec = vec![]; let mut block_number_vec = vec![]; @@ -66,6 +67,7 @@ impl NftTransferExporter { from_address_vec.push(format!("0x{}", h256_to_hex(&topics[1]))); to_address_vec.push(format!("0x{}", h256_to_hex(&topics[2]))); token_id_vec.push(format!("0x{}", h256_to_hex(&topics[3]))); + erc_standard_vec.push("ERC20"); transaction_hash_vec.push(format!( "{:#x}", logs.transaction_hash.unwrap_or_else(H256::zero) @@ -80,6 +82,7 @@ impl NftTransferExporter { let from_address_array = Utf8Array::::from_slice(from_address_vec); let to_address_array = Utf8Array::::from_slice(to_address_vec); let token_id_array = Utf8Array::::from_slice(token_id_vec); + let erc_standard_array = Utf8Array::::from_slice(erc_standard_vec); let transaction_hash_array = Utf8Array::::from_slice(transaction_hash_vec); let log_index_array = UInt64Array::from_slice(log_index_vec); let block_number_array = UInt64Array::from_slice(block_number_vec); @@ -93,6 +96,8 @@ impl NftTransferExporter { Field::new("from_address", from_address_array.data_type().clone(), true); let to_address_field = Field::new("to_address", to_address_array.data_type().clone(), true); let token_id_field = Field::new("token_id", token_id_array.data_type().clone(), true); + let erc_standard_field = + Field::new("erc_standard", erc_standard_array.data_type().clone(), true); let transaction_hash_field = Field::new( "transaction_hash", transaction_hash_array.data_type().clone(), @@ -106,6 +111,7 @@ impl NftTransferExporter { from_address_field, to_address_field, token_id_field, + erc_standard_field, transaction_hash_field, log_index_field, block_number_field, @@ -116,6 +122,7 @@ impl NftTransferExporter { from_address_array.boxed(), to_address_array.boxed(), token_id_array.boxed(), + erc_standard_array.boxed(), transaction_hash_array.boxed(), log_index_array.boxed(), block_number_array.boxed(), diff --git a/schemas/databend/nft_transfers.sql b/schemas/databend/nft_transfers.sql index 7572f08..3552359 100644 --- a/schemas/databend/nft_transfers.sql +++ b/schemas/databend/nft_transfers.sql @@ -3,6 +3,7 @@ CREATE TABLE IF NOT EXISTS nft_transfers( from_address STRING, to_address STRING, token_id STRING, + erc_standard STRING, transaction_hash STRING, log_index BIGINT UNSIGNED, block_number BIGINT UNSIGNED, From 992e18a29603992d34153290874dcfb1efb57531 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Wed, 3 Aug 2022 16:40:30 +0800 Subject: [PATCH 4/6] Add topics array --- ethetl/src/exporters/logs.rs | 120 ++++++++++++++++++++++++ ethetl/src/exporters/mod.rs | 2 + ethetl/src/exporters/receipts.rs | 9 +- ethetl/src/exporters/token_transfers.rs | 11 +-- 4 files changed, 131 insertions(+), 11 deletions(-) create mode 100644 ethetl/src/exporters/logs.rs diff --git a/ethetl/src/exporters/logs.rs b/ethetl/src/exporters/logs.rs new file mode 100644 index 0000000..d125dd7 --- /dev/null +++ b/ethetl/src/exporters/logs.rs @@ -0,0 +1,120 @@ +// Copyright 2022 BohuTANG. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use arrow2::array::Array; +use arrow2::array::ListArray; +use arrow2::array::UInt64Array; +use arrow2::array::Utf8Array; +use arrow2::buffer::Buffer; +use arrow2::chunk::Chunk; +use arrow2::datatypes::DataType; +use arrow2::datatypes::Field; +use arrow2::datatypes::Schema; +use common_exceptions::Result; +use web3::types::TransactionReceipt; + +use crate::contexts::ContextRef; +use crate::exporters::bytes_to_hex; +use crate::exporters::write_file; + +pub struct LogsExporter { + ctx: ContextRef, + dir: String, + receipts: Vec, +} + +impl LogsExporter { + pub fn create(ctx: &ContextRef, dir: &str, receipts: &[TransactionReceipt]) -> LogsExporter { + Self { + ctx: ctx.clone(), + dir: dir.to_string(), + receipts: receipts.to_vec(), + } + } + + pub async fn export(&self) -> Result<()> { + let receipts = &self.receipts; + + let mut log_index_vec = Vec::new(); + let mut transaction_hash_vec = Vec::new(); + let mut transaction_index_vec = Vec::new(); + let mut data_vec = Vec::new(); + let mut topics_vec = Vec::new(); + let mut topic_offsets_vec = Vec::new(); + let mut offset = 0i32; + topic_offsets_vec.push(offset); + + for (idx, receipt) in receipts.iter().enumerate() { + for log in &receipt.logs { + log_index_vec.push(idx as u64); + transaction_hash_vec.push(format!("{:#x}", receipt.transaction_hash)); + transaction_index_vec.push(receipt.transaction_index.as_u64()); + data_vec.push(format!("0x{:}", &bytes_to_hex(&log.data))); + + for topic in &log.topics { + topics_vec.push(format!("{:#x}", topic)); + } + offset += log.topics.len() as i32; + topic_offsets_vec.push(offset); + } + } + let log_index_array = UInt64Array::from_slice(log_index_vec); + let transaction_hash_array = Utf8Array::::from_slice(transaction_hash_vec); + let transaction_index_array = UInt64Array::from_slice(transaction_index_vec); + let data_array = Utf8Array::::from_slice(data_vec); + + // log topics. + let topics_values = Utf8Array::::from_slice(topics_vec); + let topics_array = ListArray::::from_data( + DataType::Utf8, + Buffer::from(topic_offsets_vec), + Arc::new(topics_values), + None, + ); + + let log_index_field = Field::new("log_index", log_index_array.data_type().clone(), true); + let transaction_hash_field = Field::new( + "transaction_hash", + transaction_hash_array.data_type().clone(), + true, + ); + let transaction_index_field = Field::new( + "transaction_index", + transaction_index_array.data_type().clone(), + true, + ); + let data_field = Field::new("data", data_array.data_type().clone(), true); + let topics_field = Field::new("topics", topics_array.data_type().clone(), true); + + let schema = Schema::from(vec![ + log_index_field, + transaction_hash_field, + transaction_index_field, + data_field, + topics_field, + ]); + let columns = Chunk::try_new(vec![ + log_index_array.boxed(), + transaction_hash_array.boxed(), + transaction_index_array.boxed(), + data_array.boxed(), + topics_array.boxed(), + ])?; + + let path = format!("{}/logs", self.dir); + write_file(&self.ctx, &path, schema, columns, "logs").await + } +} diff --git a/ethetl/src/exporters/mod.rs b/ethetl/src/exporters/mod.rs index d6f963a..75b50d7 100644 --- a/ethetl/src/exporters/mod.rs +++ b/ethetl/src/exporters/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. mod blocks; +mod logs; mod nft_transfers; mod pipeline; mod receipts; @@ -26,6 +27,7 @@ use arrow2::datatypes::Schema; pub use blocks::BlockExporter; use common_exceptions::ErrorCode; use common_exceptions::Result; +pub use logs::LogsExporter; pub use nft_transfers::NftTransferExporter; pub use pipeline::Pipeline; pub use receipts::ReceiptExporter; diff --git a/ethetl/src/exporters/receipts.rs b/ethetl/src/exporters/receipts.rs index 0a1c629..2488e7c 100644 --- a/ethetl/src/exporters/receipts.rs +++ b/ethetl/src/exporters/receipts.rs @@ -27,6 +27,7 @@ use web3::types::U64; use crate::contexts::ContextRef; use crate::eth::ReceiptFetcher; use crate::exporters::write_file; +use crate::exporters::LogsExporter; use crate::exporters::NftTransferExporter; use crate::exporters::TokenTransferExporter; @@ -53,6 +54,10 @@ impl ReceiptExporter { let receipts = fetcher.fetch().await?; self.export_receipts(&receipts).await?; + // Logs. + let logs_export = LogsExporter::create(&self.ctx, &self.dir, &receipts); + logs_export.export().await?; + // ERC20 Token transfers. let eth_transfer_export = TokenTransferExporter::create(&self.ctx, &self.dir, &receipts); eth_transfer_export.export().await?; @@ -166,7 +171,7 @@ impl ReceiptExporter { effective_gas_price_array.boxed(), ])?; - let receipt_path = format!("{}/receipts", self.dir); - write_file(&self.ctx, &receipt_path, schema, columns, "receipts").await + let path = format!("{}/receipts", self.dir); + write_file(&self.ctx, &path, schema, columns, "receipts").await } } diff --git a/ethetl/src/exporters/token_transfers.rs b/ethetl/src/exporters/token_transfers.rs index c3f048e..13d41b6 100644 --- a/ethetl/src/exporters/token_transfers.rs +++ b/ethetl/src/exporters/token_transfers.rs @@ -122,14 +122,7 @@ impl TokenTransferExporter { block_number_array.boxed(), ])?; - let receipt_path = format!("{}/eth_token_transfers", self.dir); - write_file( - &self.ctx, - &receipt_path, - schema, - columns, - "eth_token_transfer", - ) - .await + let path = format!("{}/eth_token_transfers", self.dir); + write_file(&self.ctx, &path, schema, columns, "eth_token_transfer").await } } From e41c1ebb5adf8c7aa74d6470c1182eefae618a06 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Wed, 3 Aug 2022 16:51:57 +0800 Subject: [PATCH 5/6] Add array type --- ethetl/src/exporters/logs.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ethetl/src/exporters/logs.rs b/ethetl/src/exporters/logs.rs index d125dd7..11d49b2 100644 --- a/ethetl/src/exporters/logs.rs +++ b/ethetl/src/exporters/logs.rs @@ -79,7 +79,11 @@ impl LogsExporter { // log topics. let topics_values = Utf8Array::::from_slice(topics_vec); let topics_array = ListArray::::from_data( - DataType::Utf8, + DataType::List(Box::from(Field::new( + "topics", + topics_values.data_type().clone(), + true, + ))), Buffer::from(topic_offsets_vec), Arc::new(topics_values), None, From f0fc2e6d50c54f693833c2c13694e7ecfb320950 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Wed, 3 Aug 2022 18:00:29 +0800 Subject: [PATCH 6/6] Add eth to common --- Cargo.lock | 9 ++++ Cargo.toml | 1 + common/eth/Cargo.toml | 15 ++++++ common/eth/src/erc.rs | 16 ++++++ common/eth/src/hex.rs | 29 ++++++++++ common/eth/src/lib.rs | 20 +++++++ common/storages/src/parquet.rs | 2 +- ethetl/Cargo.toml | 1 + ethetl/src/exporters/logs.rs | 70 +++++++++++++++---------- ethetl/src/exporters/mod.rs | 18 ------- ethetl/src/exporters/nft_transfers.rs | 8 +-- ethetl/src/exporters/token_transfers.rs | 10 ++-- ethetl/src/exporters/transactions.rs | 2 +- schemas/databend/logs.sql | 10 ++++ 14 files changed, 155 insertions(+), 56 deletions(-) create mode 100644 common/eth/Cargo.toml create mode 100644 common/eth/src/erc.rs create mode 100644 common/eth/src/hex.rs create mode 100644 common/eth/src/lib.rs create mode 100644 schemas/databend/logs.sql diff --git a/Cargo.lock b/Cargo.lock index a07e5f5..3d29ced 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -462,6 +462,14 @@ dependencies = [ "serfig", ] +[[package]] +name = "common-eth" +version = "0.1.0" +dependencies = [ + "common-exceptions", + "web3", +] + [[package]] name = "common-exceptions" version = "0.1.0" @@ -873,6 +881,7 @@ dependencies = [ "arrow2", "clap", "common-configs", + "common-eth", "common-exceptions", "common-storages", "deadqueue", diff --git a/Cargo.toml b/Cargo.toml index e0b7891..7516fd4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ members = [ "common/configs", "common/exceptions", + "common/eth", "common/storages", "common/tracings", "ethetl", diff --git a/common/eth/Cargo.toml b/common/eth/Cargo.toml new file mode 100644 index 0000000..a369fb4 --- /dev/null +++ b/common/eth/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "common-eth" +version = "0.1.0" +edition = "2021" + +[lib] +doctest = false +test = false + +[dependencies] +# Workspace dependencies +common-exceptions = { path = "../exceptions" } +web3 = "0.18.0" + +[dev-dependencies] diff --git a/common/eth/src/erc.rs b/common/eth/src/erc.rs new file mode 100644 index 0000000..980e6a6 --- /dev/null +++ b/common/eth/src/erc.rs @@ -0,0 +1,16 @@ +// Copyright 2022 BohuTANG. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub const ERC20_TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX: &str = + "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"; diff --git a/common/eth/src/hex.rs b/common/eth/src/hex.rs new file mode 100644 index 0000000..515729b --- /dev/null +++ b/common/eth/src/hex.rs @@ -0,0 +1,29 @@ +// Copyright 2022 BohuTANG. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use web3::types::Bytes; +use web3::types::H256; + +pub fn h256_to_hex(v: &H256) -> String { + let hex = v + .as_bytes() + .iter() + .map(|x| format!("{:02x}", x)) + .collect::(); + hex.trim_start_matches('0').to_string() +} + +pub fn bytes_to_hex(v: &Bytes) -> String { + v.0.iter().map(|x| format!("{:02x}", x)).collect::() +} diff --git a/common/eth/src/lib.rs b/common/eth/src/lib.rs new file mode 100644 index 0000000..0c7863f --- /dev/null +++ b/common/eth/src/lib.rs @@ -0,0 +1,20 @@ +// Copyright 2022 BohuTANG. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod erc; +mod hex; + +pub use erc::ERC20_TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX; +pub use hex::bytes_to_hex; +pub use hex::h256_to_hex; diff --git a/common/storages/src/parquet.rs b/common/storages/src/parquet.rs index f0a0fd7..00ef08f 100644 --- a/common/storages/src/parquet.rs +++ b/common/storages/src/parquet.rs @@ -36,7 +36,7 @@ pub async fn write_parquet( columns: Chunk>, ) -> Result<()> { let options = WriteOptions { - write_statistics: true, + write_statistics: false, compression: CompressionOptions::Snappy, version: Version::V2, }; diff --git a/ethetl/Cargo.toml b/ethetl/Cargo.toml index 11d535f..950e1d4 100644 --- a/ethetl/Cargo.toml +++ b/ethetl/Cargo.toml @@ -16,6 +16,7 @@ test = false [dependencies] # Workspace dependencies common-configs = { path = "../common/configs" } +common-eth = { path = "../common/eth" } common-exceptions = { path = "../common/exceptions" } common-storages = { path = "../common/storages" } diff --git a/ethetl/src/exporters/logs.rs b/ethetl/src/exporters/logs.rs index 11d49b2..af6b810 100644 --- a/ethetl/src/exporters/logs.rs +++ b/ethetl/src/exporters/logs.rs @@ -12,22 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - -use arrow2::array::Array; -use arrow2::array::ListArray; use arrow2::array::UInt64Array; use arrow2::array::Utf8Array; -use arrow2::buffer::Buffer; use arrow2::chunk::Chunk; -use arrow2::datatypes::DataType; use arrow2::datatypes::Field; use arrow2::datatypes::Schema; +use common_eth::bytes_to_hex; use common_exceptions::Result; +use web3::types::Address; use web3::types::TransactionReceipt; +use web3::types::H256; +use web3::types::U64; use crate::contexts::ContextRef; -use crate::exporters::bytes_to_hex; use crate::exporters::write_file; pub struct LogsExporter { @@ -51,43 +48,44 @@ impl LogsExporter { let mut log_index_vec = Vec::new(); let mut transaction_hash_vec = Vec::new(); let mut transaction_index_vec = Vec::new(); + let mut block_hash_vec = Vec::new(); + let mut block_number_vec = Vec::new(); + let mut contract_address_vec = Vec::new(); let mut data_vec = Vec::new(); let mut topics_vec = Vec::new(); - let mut topic_offsets_vec = Vec::new(); - let mut offset = 0i32; - topic_offsets_vec.push(offset); for (idx, receipt) in receipts.iter().enumerate() { for log in &receipt.logs { log_index_vec.push(idx as u64); transaction_hash_vec.push(format!("{:#x}", receipt.transaction_hash)); transaction_index_vec.push(receipt.transaction_index.as_u64()); + block_hash_vec.push(format!( + "{:#x}", + receipt.block_hash.unwrap_or_else(H256::zero) + )); + block_number_vec.push(receipt.block_number.unwrap_or_else(U64::zero).as_u64()); + contract_address_vec.push(format!( + "{:#x}", + receipt.contract_address.unwrap_or_else(Address::zero) + )); data_vec.push(format!("0x{:}", &bytes_to_hex(&log.data))); - - for topic in &log.topics { - topics_vec.push(format!("{:#x}", topic)); - } - offset += log.topics.len() as i32; - topic_offsets_vec.push(offset); + let topics = log + .topics + .iter() + .map(|x| format!("0x{:#x}", x)) + .collect::>() + .join("|"); + topics_vec.push(topics); } } let log_index_array = UInt64Array::from_slice(log_index_vec); let transaction_hash_array = Utf8Array::::from_slice(transaction_hash_vec); let transaction_index_array = UInt64Array::from_slice(transaction_index_vec); + let block_hash_array = Utf8Array::::from_slice(block_hash_vec); + let block_number_array = UInt64Array::from_slice(block_number_vec); + let contract_address_array = Utf8Array::::from_slice(contract_address_vec); let data_array = Utf8Array::::from_slice(data_vec); - - // log topics. - let topics_values = Utf8Array::::from_slice(topics_vec); - let topics_array = ListArray::::from_data( - DataType::List(Box::from(Field::new( - "topics", - topics_values.data_type().clone(), - true, - ))), - Buffer::from(topic_offsets_vec), - Arc::new(topics_values), - None, - ); + let topics_array = Utf8Array::::from_slice(topics_vec); let log_index_field = Field::new("log_index", log_index_array.data_type().clone(), true); let transaction_hash_field = Field::new( @@ -100,6 +98,14 @@ impl LogsExporter { transaction_index_array.data_type().clone(), true, ); + let block_hash_field = Field::new("block_hash", block_hash_array.data_type().clone(), true); + let block_number_field = + Field::new("block_number", block_number_array.data_type().clone(), true); + let contracet_address_field = Field::new( + "contract_address", + contract_address_array.data_type().clone(), + true, + ); let data_field = Field::new("data", data_array.data_type().clone(), true); let topics_field = Field::new("topics", topics_array.data_type().clone(), true); @@ -107,6 +113,9 @@ impl LogsExporter { log_index_field, transaction_hash_field, transaction_index_field, + block_hash_field, + block_number_field, + contracet_address_field, data_field, topics_field, ]); @@ -114,6 +123,9 @@ impl LogsExporter { log_index_array.boxed(), transaction_hash_array.boxed(), transaction_index_array.boxed(), + block_hash_array.boxed(), + block_number_array.boxed(), + contract_address_array.boxed(), data_array.boxed(), topics_array.boxed(), ])?; diff --git a/ethetl/src/exporters/mod.rs b/ethetl/src/exporters/mod.rs index 75b50d7..d837b73 100644 --- a/ethetl/src/exporters/mod.rs +++ b/ethetl/src/exporters/mod.rs @@ -33,28 +33,10 @@ pub use pipeline::Pipeline; pub use receipts::ReceiptExporter; pub use token_transfers::TokenTransferExporter; pub use transactions::TransactionExporter; -use web3::types::Bytes; -use web3::types::H256; pub use worker::Worker; use crate::contexts::ContextRef; -pub const TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX: &str = - "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"; - -pub fn h256_to_hex(v: &H256) -> String { - let hex = v - .as_bytes() - .iter() - .map(|x| format!("{:02x}", x)) - .collect::(); - hex.trim_start_matches('0').to_string() -} - -pub fn bytes_to_hex(v: &Bytes) -> String { - v.0.iter().map(|x| format!("{:02x}", x)).collect::() -} - pub async fn write_file( ctx: &ContextRef, path: &str, diff --git a/ethetl/src/exporters/nft_transfers.rs b/ethetl/src/exporters/nft_transfers.rs index d678dee..449d335 100644 --- a/ethetl/src/exporters/nft_transfers.rs +++ b/ethetl/src/exporters/nft_transfers.rs @@ -17,6 +17,8 @@ use arrow2::array::Utf8Array; use arrow2::chunk::Chunk; use arrow2::datatypes::Field; use arrow2::datatypes::Schema; +use common_eth::h256_to_hex; +use common_eth::ERC20_TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX; use common_exceptions::Result; use web3::types::TransactionReceipt; use web3::types::H256; @@ -24,9 +26,7 @@ use web3::types::U256; use web3::types::U64; use crate::contexts::ContextRef; -use crate::exporters::h256_to_hex; use crate::exporters::write_file; -use crate::exporters::TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX; pub struct NftTransferExporter { ctx: ContextRef, @@ -62,7 +62,9 @@ impl NftTransferExporter { // NFT token transfer contract address. let topic_0 = format!("{:#x}", topics[0]); - if topic_0.as_str() == TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX && topics.len() == 4 { + if topic_0.as_str() == ERC20_TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX + && topics.len() == 4 + { token_address_vec.push(format!("{:#x}", logs.address)); from_address_vec.push(format!("0x{}", h256_to_hex(&topics[1]))); to_address_vec.push(format!("0x{}", h256_to_hex(&topics[2]))); diff --git a/ethetl/src/exporters/token_transfers.rs b/ethetl/src/exporters/token_transfers.rs index 13d41b6..d674c9d 100644 --- a/ethetl/src/exporters/token_transfers.rs +++ b/ethetl/src/exporters/token_transfers.rs @@ -17,6 +17,9 @@ use arrow2::array::Utf8Array; use arrow2::chunk::Chunk; use arrow2::datatypes::Field; use arrow2::datatypes::Schema; +use common_eth::bytes_to_hex; +use common_eth::h256_to_hex; +use common_eth::ERC20_TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX; use common_exceptions::Result; use web3::types::TransactionReceipt; use web3::types::H256; @@ -24,10 +27,7 @@ use web3::types::U256; use web3::types::U64; use crate::contexts::ContextRef; -use crate::exporters::bytes_to_hex; -use crate::exporters::h256_to_hex; use crate::exporters::write_file; -use crate::exporters::TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX; pub struct TokenTransferExporter { ctx: ContextRef, @@ -62,7 +62,9 @@ impl TokenTransferExporter { // Token transfer contract address. let topic_0 = format!("{:#x}", topics[0]); - if topic_0.as_str() == TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX && topics.len() == 3 { + if topic_0.as_str() == ERC20_TOKEN_TRANSFER_CONTRACT_ADDRESS_HEX + && topics.len() == 3 + { token_address_vec.push(format!("{:#x}", logs.address)); from_address_vec.push(format!("0x{}", h256_to_hex(&topics[1]))); to_address_vec.push(format!("0x{}", h256_to_hex(&topics[2]))); diff --git a/ethetl/src/exporters/transactions.rs b/ethetl/src/exporters/transactions.rs index f4aa957..9404186 100644 --- a/ethetl/src/exporters/transactions.rs +++ b/ethetl/src/exporters/transactions.rs @@ -20,6 +20,7 @@ use arrow2::array::Utf8Array; use arrow2::chunk::Chunk; use arrow2::datatypes::Field; use arrow2::datatypes::Schema; +use common_eth::bytes_to_hex; use common_exceptions::Result; use web3::ethabi::Address; use web3::types::Block; @@ -29,7 +30,6 @@ use web3::types::U256; use web3::types::U64; use crate::contexts::ContextRef; -use crate::exporters::bytes_to_hex; use crate::exporters::write_file; pub struct TransactionExporter { diff --git a/schemas/databend/logs.sql b/schemas/databend/logs.sql new file mode 100644 index 0000000..7768c28 --- /dev/null +++ b/schemas/databend/logs.sql @@ -0,0 +1,10 @@ +CREATE TABLE IF NOT EXISTS logs ( + log_index BIGINT UNSIGNED, + transaction_hash STRING, + transaction_index BIGINT UNSIGNED, + block_hash STRING, + block_number STRING, + contract_adderss STRING, + data STRING, + topics STRING +); \ No newline at end of file