From 6d9fbeecefaaa27566e3cf3cc8bd4ae40cf8b38f Mon Sep 17 00:00:00 2001 From: lambda-0x <0xlambda@protonmail.com> Date: Thu, 28 Nov 2024 16:50:35 +0530 Subject: [PATCH] fix(torii): add erc20 patch for eth mainnet token commit-id:93263be6 --- crates/torii/core/src/engine.rs | 14 ++++++- crates/torii/core/src/executor/erc.rs | 41 +++++++++++++++++-- crates/torii/core/src/executor/mod.rs | 5 ++- .../src/processors/erc20_legacy_transfer.rs | 3 +- .../core/src/processors/erc20_transfer.rs | 3 +- .../src/processors/erc721_legacy_transfer.rs | 14 +++++-- .../core/src/processors/erc721_transfer.rs | 14 +++++-- crates/torii/core/src/sql/erc.rs | 11 +++-- 8 files changed, 89 insertions(+), 16 deletions(-) diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 86730665f3..b756645793 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -169,6 +169,17 @@ pub enum FetchDataResult { None, } +impl FetchDataResult { + pub fn block_id(&self) -> BlockId { + match self { + FetchDataResult::Range(range) => BlockId::Number(range.latest_block_number), + FetchDataResult::Pending(_pending) => BlockId::Tag(BlockTag::Pending), + // we dont require block_id when result is none, so this is a dummy value + FetchDataResult::None => BlockId::Number(0), + } + } +} + #[derive(Debug)] pub struct FetchRangeResult { // (block_number, transaction_hash) -> events @@ -263,10 +274,11 @@ impl Engine

{ info!(target: LOG_TARGET, "Syncing reestablished."); } + let block_id = fetch_result.block_id(); match self.process(fetch_result).await { Ok(_) => { self.db.flush().await?; - self.db.apply_cache_diff().await?; + self.db.apply_cache_diff(block_id).await?; self.db.execute().await?; }, Err(e) => { diff --git a/crates/torii/core/src/executor/erc.rs b/crates/torii/core/src/executor/erc.rs index a7d91e478a..cb995fbaff 100644 --- a/crates/torii/core/src/executor/erc.rs +++ b/crates/torii/core/src/executor/erc.rs @@ -10,10 +10,11 @@ use starknet::core::types::{BlockId, BlockTag, FunctionCall, U256}; use starknet::core::utils::{get_selector_from_name, parse_cairo_short_string}; use starknet::providers::Provider; use starknet_crypto::Felt; -use tracing::{debug, error, trace}; +use tracing::{debug, error, trace, warn}; use super::{ApplyBalanceDiffQuery, Executor}; use crate::constants::{IPFS_CLIENT_MAX_RETRY, SQL_FELT_DELIMITER, TOKEN_BALANCE_TABLE}; +use crate::executor::LOG_TARGET; use crate::sql::utils::{felt_to_sql_string, sql_string_to_u256, u256_to_sql_string, I256}; use crate::types::ContractType; use crate::utils::fetch_content_from_ipfs; @@ -46,6 +47,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { pub async fn apply_balance_diff( &mut self, apply_balance_diff: ApplyBalanceDiffQuery, + provider: Arc

, ) -> Result<()> { let erc_cache = apply_balance_diff.erc_cache; for ((contract_type, id_str), balance) in erc_cache.iter() { @@ -66,6 +68,8 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { contract_address, token_id, balance, + Arc::clone(&provider), + apply_balance_diff.block_id, ) .await .with_context(|| "Failed to apply balance diff in apply_cache_diff")?; @@ -83,6 +87,8 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { contract_address, token_id, balance, + Arc::clone(&provider), + apply_balance_diff.block_id, ) .await .with_context(|| "Failed to apply balance diff in apply_cache_diff")?; @@ -93,6 +99,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { Ok(()) } + #[allow(clippy::too_many_arguments)] pub async fn apply_balance_diff_helper( &mut self, id: &str, @@ -100,6 +107,8 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { contract_address: &str, token_id: &str, balance_diff: &I256, + provider: Arc

, + block_id: BlockId, ) -> Result<()> { let tx = &mut self.transaction; let balance: Option<(String,)> = @@ -116,9 +125,35 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { if balance_diff.is_negative { if balance < balance_diff.value { - dbg!(&balance_diff, balance, id); + // HACK: ideally we should never hit this case. But ETH on starknet mainnet didn't + // emit transfer events properly so they are broken. For those cases + // we manually fetch the balance of the address using RPC + + let current_balance = provider + .call( + FunctionCall { + contract_address: Felt::from_str(contract_address).unwrap(), + entry_point_selector: get_selector_from_name("balanceOf").unwrap(), + calldata: vec![Felt::from_str(account_address).unwrap()], + }, + block_id, + ) + .await + .with_context(|| format!("Failed to fetch balance for id: {}", id))?; + + let current_balance = + cainome::cairo_serde::U256::cairo_deserialize(¤t_balance, 0).unwrap(); + + warn!( + target: LOG_TARGET, + id = id, + "Invalid transfer event detected, overriding balance by querying RPC directly" + ); + // override the balance from onchain data + balance = U256::from_words(current_balance.low, current_balance.high); + } else { + balance -= balance_diff.value; } - balance -= balance_diff.value; } else { balance += balance_diff.value; } diff --git a/crates/torii/core/src/executor/mod.rs b/crates/torii/core/src/executor/mod.rs index 18c76f6d64..475f62a433 100644 --- a/crates/torii/core/src/executor/mod.rs +++ b/crates/torii/core/src/executor/mod.rs @@ -61,6 +61,7 @@ pub struct DeleteEntityQuery { #[derive(Debug, Clone)] pub struct ApplyBalanceDiffQuery { pub erc_cache: HashMap<(ContractType, String), I256>, + pub block_id: BlockId, } #[derive(Debug, Clone)] @@ -613,7 +614,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { QueryType::ApplyBalanceDiff(apply_balance_diff) => { debug!(target: LOG_TARGET, "Applying balance diff."); let instant = Instant::now(); - self.apply_balance_diff(apply_balance_diff).await?; + self.apply_balance_diff(apply_balance_diff, self.provider.clone()).await?; debug!(target: LOG_TARGET, duration = ?instant.elapsed(), "Applied balance diff."); } QueryType::RegisterErc721Token(register_erc721_token) => { @@ -685,6 +686,8 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { self.register_tasks.spawn(async move { let permit = semaphore.acquire().await.unwrap(); + let span = tracing::span!(tracing::Level::INFO, "contract_address_span", contract_address = %register_erc721_token.contract_address); + let _enter = span.enter(); let result = Self::process_register_erc721_token_query( register_erc721_token, diff --git a/crates/torii/core/src/processors/erc20_legacy_transfer.rs b/crates/torii/core/src/processors/erc20_legacy_transfer.rs index 4ed17416bc..903836c65d 100644 --- a/crates/torii/core/src/processors/erc20_legacy_transfer.rs +++ b/crates/torii/core/src/processors/erc20_legacy_transfer.rs @@ -38,7 +38,7 @@ where &self, world: &WorldContractReader

, db: &mut Sql, - _block_number: u64, + block_number: u64, block_timestamp: u64, event_id: &str, event: &Event, @@ -59,6 +59,7 @@ where world.provider(), block_timestamp, event_id, + block_number, ) .await?; debug!(target: LOG_TARGET,from = ?from, to = ?to, value = ?value, "Legacy ERC20 Transfer"); diff --git a/crates/torii/core/src/processors/erc20_transfer.rs b/crates/torii/core/src/processors/erc20_transfer.rs index 64f50d13a2..014e131a3b 100644 --- a/crates/torii/core/src/processors/erc20_transfer.rs +++ b/crates/torii/core/src/processors/erc20_transfer.rs @@ -38,7 +38,7 @@ where &self, world: &WorldContractReader

, db: &mut Sql, - _block_number: u64, + block_number: u64, block_timestamp: u64, event_id: &str, event: &Event, @@ -59,6 +59,7 @@ where world.provider(), block_timestamp, event_id, + block_number, ) .await?; debug!(target: LOG_TARGET,from = ?from, to = ?to, value = ?value, "ERC20 Transfer"); diff --git a/crates/torii/core/src/processors/erc721_legacy_transfer.rs b/crates/torii/core/src/processors/erc721_legacy_transfer.rs index ebbf4d6b53..34106c5eb5 100644 --- a/crates/torii/core/src/processors/erc721_legacy_transfer.rs +++ b/crates/torii/core/src/processors/erc721_legacy_transfer.rs @@ -38,7 +38,7 @@ where &self, _world: &WorldContractReader

, db: &mut Sql, - _block_number: u64, + block_number: u64, block_timestamp: u64, event_id: &str, event: &Event, @@ -51,8 +51,16 @@ where let token_id = U256Cainome::cairo_deserialize(&event.data, 2)?; let token_id = U256::from_words(token_id.low, token_id.high); - db.handle_erc721_transfer(token_address, from, to, token_id, block_timestamp, event_id) - .await?; + db.handle_erc721_transfer( + token_address, + from, + to, + token_id, + block_timestamp, + event_id, + block_number, + ) + .await?; debug!(target: LOG_TARGET, from = ?from, to = ?to, token_id = ?token_id, "ERC721 Transfer"); Ok(()) diff --git a/crates/torii/core/src/processors/erc721_transfer.rs b/crates/torii/core/src/processors/erc721_transfer.rs index a0f56479a9..47205d2ed9 100644 --- a/crates/torii/core/src/processors/erc721_transfer.rs +++ b/crates/torii/core/src/processors/erc721_transfer.rs @@ -38,7 +38,7 @@ where &self, _world: &WorldContractReader

, db: &mut Sql, - _block_number: u64, + block_number: u64, block_timestamp: u64, event_id: &str, event: &Event, @@ -51,8 +51,16 @@ where let token_id = U256Cainome::cairo_deserialize(&event.keys, 3)?; let token_id = U256::from_words(token_id.low, token_id.high); - db.handle_erc721_transfer(token_address, from, to, token_id, block_timestamp, event_id) - .await?; + db.handle_erc721_transfer( + token_address, + from, + to, + token_id, + block_timestamp, + event_id, + block_number, + ) + .await?; debug!(target: LOG_TARGET, from = ?from, to = ?to, token_id = ?token_id, "ERC721 Transfer"); Ok(()) diff --git a/crates/torii/core/src/sql/erc.rs b/crates/torii/core/src/sql/erc.rs index cf94e005f0..740c564b1a 100644 --- a/crates/torii/core/src/sql/erc.rs +++ b/crates/torii/core/src/sql/erc.rs @@ -29,6 +29,7 @@ impl Sql { provider: &P, block_timestamp: u64, event_id: &str, + block_number: u64, ) -> Result<()> { // contract_address let token_id = felt_to_sql_string(&contract_address); @@ -66,10 +67,11 @@ impl Sql { self.local_cache.erc_cache.entry((ContractType::ERC20, to_balance_id)).or_default(); *to_balance += I256::from(amount); } + let block_id = BlockId::Number(block_number); if self.local_cache.erc_cache.len() >= 100000 { self.flush().await.with_context(|| "Failed to flush in handle_erc20_transfer")?; - self.apply_cache_diff().await?; + self.apply_cache_diff(block_id).await?; } Ok(()) @@ -84,6 +86,7 @@ impl Sql { token_id: U256, block_timestamp: u64, event_id: &str, + block_number: u64, ) -> Result<()> { // contract_address:id let actual_token_id = token_id; @@ -127,10 +130,11 @@ impl Sql { .or_default(); *to_balance += I256::from(1u8); } + let block_id = BlockId::Number(block_number); if self.local_cache.erc_cache.len() >= 100000 { self.flush().await.with_context(|| "Failed to flush in handle_erc721_transfer")?; - self.apply_cache_diff().await?; + self.apply_cache_diff(block_id).await?; } Ok(()) @@ -272,7 +276,7 @@ impl Sql { Ok(()) } - pub async fn apply_cache_diff(&mut self) -> Result<()> { + pub async fn apply_cache_diff(&mut self, block_id: BlockId) -> Result<()> { if !self.local_cache.erc_cache.is_empty() { self.executor.send(QueryMessage::new( "".to_string(), @@ -282,6 +286,7 @@ impl Sql { &mut self.local_cache.erc_cache, HashMap::with_capacity(64), ), + block_id, }), ))?; }