diff --git a/Cargo.lock b/Cargo.lock index e43a46353c..6aeb2c4a20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -824,6 +824,15 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" +[[package]] +name = "ansi_term" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" +dependencies = [ + "winapi", +] + [[package]] name = "anstream" version = "0.6.14" @@ -2173,7 +2182,7 @@ dependencies = [ "starknet-types-core", "thiserror", "tracing", - "tracing-subscriber", + "tracing-subscriber 0.3.18", "url", ] @@ -2198,7 +2207,7 @@ dependencies = [ "starknet-types-core", "thiserror", "tracing", - "tracing-subscriber", + "tracing-subscriber 0.3.18", "url", ] @@ -2498,7 +2507,7 @@ dependencies = [ "tower-lsp", "tracing", "tracing-chrome", - "tracing-subscriber", + "tracing-subscriber 0.3.18", ] [[package]] @@ -6716,7 +6725,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.7", "tokio", "tower-service", "tracing", @@ -7757,7 +7766,7 @@ dependencies = [ "shellexpand", "tokio", "tracing", - "tracing-subscriber", + "tracing-subscriber 0.3.18", "url", ] @@ -9004,6 +9013,15 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" +[[package]] +name = "matchers" +version = "0.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "matchers" version = "0.1.0" @@ -11949,7 +11967,7 @@ dependencies = [ "starknet-crypto 0.7.1", "tokio", "tracing", - "tracing-subscriber", + "tracing-subscriber 0.3.18", "url", ] @@ -12087,7 +12105,7 @@ dependencies = [ "toml 0.8.15", "toml_edit 0.22.16", "tracing", - "tracing-subscriber", + "tracing-subscriber 0.3.18", "typed-builder", "url", "walkdir", @@ -12943,7 +12961,7 @@ dependencies = [ "tokio", "tracing", "tracing-log 0.1.4", - "tracing-subscriber", + "tracing-subscriber 0.3.18", "url", ] @@ -14060,7 +14078,7 @@ checksum = "3dffced63c2b5c7be278154d76b479f9f9920ed34e7574201407f0b14e2bbb93" dependencies = [ "env_logger 0.11.3", "test-log-macros", - "tracing-subscriber", + "tracing-subscriber 0.3.18", ] [[package]] @@ -14581,7 +14599,7 @@ dependencies = [ "tower", "tower-http", "tracing", - "tracing-subscriber", + "tracing-subscriber 0.3.18", "url", "webbrowser", ] @@ -14654,6 +14672,7 @@ dependencies = [ "tokio-stream", "tokio-util", "tracing", + "tracing-test", ] [[package]] @@ -14779,7 +14798,7 @@ dependencies = [ "tokio", "torii-core", "tracing", - "tracing-subscriber", + "tracing-subscriber 0.3.18", "tracing-wasm", "wasm-bindgen-futures", "wasm-bindgen-test", @@ -14934,7 +14953,7 @@ checksum = "bf0a738ed5d6450a9fb96e86a23ad808de2b727fd1394585da5cdd6788ffe724" dependencies = [ "serde_json", "tracing-core", - "tracing-subscriber", + "tracing-subscriber 0.3.18", ] [[package]] @@ -14979,13 +14998,35 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-subscriber" +version = "0.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e0d2eaa99c3c2e41547cfa109e910a68ea03823cccad4a0525dcbc9b01e8c71" +dependencies = [ + "ansi_term", + "chrono", + "lazy_static", + "matchers 
0.0.1", + "regex", + "serde", + "serde_json", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log 0.1.4", + "tracing-serde", +] + [[package]] name = "tracing-subscriber" version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ - "matchers", + "matchers 0.1.0", "nu-ansi-term", "once_cell", "regex", @@ -15000,6 +15041,29 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracing-test" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3b48778c2d401c6a7fcf38a0e3c55dc8e8e753cbd381044a8cdb6fd69a29f53" +dependencies = [ + "lazy_static", + "tracing-core", + "tracing-subscriber 0.2.25", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c49adbab879d2e0dd7f75edace5f0ac2156939ecb7e6a1e8fa14e53728328c48" +dependencies = [ + "lazy_static", + "quote", + "syn 1.0.109", +] + [[package]] name = "tracing-wasm" version = "0.2.1" @@ -15007,7 +15071,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4575c663a174420fa2d78f4108ff68f65bf2fbb7dd89f33749b6e826b3626e07" dependencies = [ "tracing", - "tracing-subscriber", + "tracing-subscriber 0.3.18", "wasm-bindgen", ] diff --git a/Cargo.toml b/Cargo.toml index cabc141fed..5b25146ed4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -206,6 +206,7 @@ tower = "0.4.13" tower-http = "0.4.4" tracing = "0.1.34" tracing-subscriber = { version = "0.3.16", features = [ "env-filter", "json" ] } +tracing-test = "0.1" url = { version = "2.4.0", features = [ "serde" ] } # server diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index 40a3514fd9..1675482a8c 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -170,7 +170,7 @@ async fn main() -> anyhow::Result<()> { let provider: Arc<_> = JsonRpcClient::new(HttpTransport::new(args.rpc)).into(); // Get world address - let world = WorldContractReader::new(args.world_address, &provider); + let world = WorldContractReader::new(args.world_address, Arc::clone(&provider)); let db = Sql::new(pool.clone(), args.world_address).await?; @@ -193,7 +193,7 @@ async fn main() -> anyhow::Result<()> { let mut engine = Engine::new( world, db.clone(), - &provider, + Arc::clone(&provider), processors, EngineConfig { start_block: args.start_block, @@ -217,7 +217,7 @@ async fn main() -> anyhow::Result<()> { let mut libp2p_relay_server = torii_relay::server::Relay::new( db, - provider.clone(), + Arc::clone(&provider), args.relay_port, args.relay_webrtc_port, args.relay_websocket_port, diff --git a/crates/torii/core/Cargo.toml b/crates/torii/core/Cargo.toml index a22ccfcc9c..ae5d81b6a5 100644 --- a/crates/torii/core/Cargo.toml +++ b/crates/torii/core/Cargo.toml @@ -38,6 +38,7 @@ thiserror.workspace = true tokio = { version = "1.32.0", features = [ "sync" ], default-features = true } tokio-stream = "0.1.11" tokio-util = "0.7.7" +tracing-test.workspace = true tracing.workspace = true [dev-dependencies] diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index cac214c602..bc5c3d4f28 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -1,24 +1,30 @@ use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; +use std::sync::Arc; use std::time::Duration; -use anyhow::Result; +use 
anyhow::{Context, Result}; use dojo_world::contracts::world::WorldContractReader; use hashlink::LinkedHashMap; use starknet::core::types::{ BlockId, BlockTag, EmittedEvent, Event, EventFilter, Felt, MaybePendingBlockWithReceipts, MaybePendingBlockWithTxHashes, PendingBlockWithReceipts, ReceiptBlock, TransactionReceipt, - TransactionReceiptWithBlockInfo, TransactionWithReceipt, + TransactionReceiptWithBlockInfo, }; +use starknet::macros::selector; use starknet::providers::Provider; +use starknet_crypto::poseidon_hash_many; use tokio::sync::broadcast::Sender; use tokio::sync::mpsc::Sender as BoundedSender; +use tokio::task::{self, JoinHandle}; use tokio::time::sleep; use tracing::{debug, error, info, trace, warn}; use crate::processors::event_message::EventMessageProcessor; -use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor}; +use crate::processors::NUM_KEYS_INDEX; +use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor, ENTITY_ID_INDEX}; use crate::sql::Sql; +use num_traits::ToPrimitive; #[allow(missing_debug_implementations)] pub struct Processors { @@ -84,11 +90,11 @@ pub struct FetchPendingResult { } #[allow(missing_debug_implementations)] -pub struct Engine { +pub struct Engine { world: WorldContractReader
<P>, db: Sql, provider: Box<P>, - processors: Processors<P>, + processors: Arc<Processors<P>>, config: EngineConfig, shutdown_tx: Sender<()>, block_tx: Option<BoundedSender<u64>>, @@ -99,7 +105,7 @@ struct UnprocessedEvent { keys: Vec<String>, data: Vec<String>, } -impl<P: Provider + Send + Sync> Engine<P> { +impl<P: Provider + Send + Sync + Clone + 'static> Engine<P> { pub fn new( world: WorldContractReader<P>, db: Sql, @@ -109,6 +115,7 @@ impl<P: Provider + Send + Sync> Engine<P> { shutdown_tx: Sender<()>, block_tx: Option<BoundedSender<u64>>, ) -> Self { + let processors = Arc::new(processors); Self { world, db, provider: Box::new(provider), processors, config, shutdown_tx, block_tx } } @@ -149,6 +156,9 @@ impl<P: Provider + Send + Sync> Engine<P>
{ match self.process(fetch_result).await { Ok(()) => {} Err(e) => { + // TODO: we might not be able to properly handle this error case, + // since we are doing things in parallel we don't know exactly + // which task failed error!(target: LOG_TARGET, error = %e, "Processing fetched data."); erroring_out = true; sleep(backoff_delay).await; @@ -350,7 +360,14 @@ impl<P: Provider + Send + Sync> Engine<P>
{ let mut last_pending_block_tx = data.last_pending_block_tx; let mut last_pending_block_world_tx = None; - let timestamp = data.pending_block.timestamp; + let block_timestamp = data.pending_block.timestamp; + let block_number = data.block_number; + + // We use BTreeMap because we want to process events with key 0 first + // because they contain events like register_model which are required for other events + let mut map = BTreeMap::<u8, Vec<(Event, Felt, u64, u64)>>::new(); + let modulo = 8u64; + let other = 0u64; for t in data.pending_block.transactions { let transaction_hash = t.transaction.transaction_hash(); @@ -363,40 +380,124 @@ impl<P: Provider + Send + Sync> Engine<P>
{ continue; } - match self.process_transaction_with_receipt(&t, data.block_number, timestamp).await { - Err(e) => { - match e.to_string().as_str() { - "TransactionHashNotFound" => { - // We failed to fetch the transaction, which is because - // the transaction might not have been processed fast enough by the - // provider. So we can fail silently and try - // again in the next iteration. - warn!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Retrieving pending transaction receipt."); - self.db.set_head( - data.block_number - 1, - last_pending_block_world_tx, - last_pending_block_tx, - ); - return Ok(()); - } - _ => { - error!(target: LOG_TARGET, error = %e, transaction_hash = %format!("{:#x}", transaction_hash), "Processing pending transaction."); - return Err(e); - } - } + let events = match t.receipt { + TransactionReceipt::Invoke(receipt) => receipt.events, + TransactionReceipt::L1Handler(receipt) => receipt.events, + _ => { + continue; } - Ok(true) => { + }; + + for event in events { + if event.from_address == self.world.address { last_pending_block_world_tx = Some(*transaction_hash); - last_pending_block_tx = Some(*transaction_hash); - info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending world transaction."); } - Ok(_) => { - last_pending_block_tx = Some(*transaction_hash); - info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending transaction.") + last_pending_block_tx = Some(*transaction_hash); + let event_name = event_type_from_felt(event.keys[0]); + let entity_id = match event_name { + EventType::StoreSetRecord => { + // let selector = event.data[MODEL_INDEX]; + + let keys_start = NUM_KEYS_INDEX + 1; + let keys_end: usize = keys_start + + event.data[NUM_KEYS_INDEX].to_usize().context("invalid usize")?; + + let keys = event.data[keys_start..keys_end].to_vec(); + let entity_id = poseidon_hash_many(&keys); + entity_id.to_raw()[3] % modulo + 1 + } + EventType::StoreDeleteRecord => { + let entity_id = event.data[ENTITY_ID_INDEX]; + entity_id.to_raw()[3] % modulo + 1 + } + EventType::StoreUpdateMember => { + let entity_id = event.data[ENTITY_ID_INDEX]; + entity_id.to_raw()[3] % modulo + 1 + } + EventType::StoreUpdateRecord => { + let entity_id = event.data[ENTITY_ID_INDEX]; + entity_id.to_raw()[3] % modulo + 1 + } + EventType::Other => other, + }; + + map.entry(entity_id as u8).or_default().push(( + event, + *transaction_hash, + block_number, + block_timestamp, + )); + } + + // TODO: remove this after implementation + // match self.process_transaction_with_receipt(&t, data.block_number, timestamp).await { + // Ok(true) => { + // last_pending_block_world_tx = Some(*transaction_hash); + // last_pending_block_tx = Some(*transaction_hash); + // info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending world transaction."); + // } + // Ok(_) => { + // last_pending_block_tx = Some(*transaction_hash); + // info!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Processed pending transaction.") + // } + // } + } + + let mut tasks = Vec::new(); + + // loop over the collected events + for (id, events) in map.into_iter() { + let mut db = self.db.clone(); + let processors = Arc::clone(&self.processors); + let world = + WorldContractReader::new(self.world.address, self.provider.as_ref().clone()); + + let task: JoinHandle> = task::spawn(async move { + for (event_idx, (event, transaction_hash, block_number, 
block_timestamp)) in + events.iter().enumerate() + { + if db.query_queue.queue.len() >= QUERY_QUEUE_BATCH_SIZE { + db.execute().await?; + } + + let event_id = format!( + "{:#064x}:{:#x}:{:#04x}", + block_number, transaction_hash, event_idx + ); + let event = Event { + from_address: event.from_address, + keys: event.keys.clone(), + data: event.data.clone(), + }; + + process_event( + *block_number, + *block_timestamp, + &event_id, + &event, + *transaction_hash, + &mut db, + Arc::clone(&processors), + &world, + ) + .await? } + + db.execute().await?; + Ok(()) + }); + + if id as u64 == other { + task.await??; + } else { + tasks.push(task); } } + for task in tasks { + task.await??; + } + // Set the head to the last processed pending transaction // Head block number should still be latest block number self.db.set_head(data.block_number - 1, last_pending_block_world_tx, last_pending_block_tx); @@ -407,33 +508,129 @@ impl Engine
{ pub async fn process_range(&mut self, data: FetchRangeResult) -> Result<()> { // Process all transactions - let mut last_block = 0; - for ((block_number, transaction_hash), events) in data.transactions { - debug!("Processing transaction hash: {:#x}", transaction_hash); - // Process transaction - // let transaction = self.provider.get_transaction_by_hash(transaction_hash).await?; - - self.process_transaction_with_events( - transaction_hash, - events.as_slice(), - block_number, - data.blocks[&block_number], - ) - .await?; + // StoreSetRecord | StoreDeleteRecord | StoreUpdateMember | StoreUpdateRecord | Other + + let modulo = 8u64; + let other = 0u64; + let mut map = BTreeMap::>::new(); + + for ((block_number, _), events) in data.transactions { + let block_timestamp = data.blocks[&block_number]; + for event in events { + let event_name = event_type_from_felt(event.keys[0]); + let entity_id = match event_name { + EventType::StoreSetRecord => { + let keys_start = NUM_KEYS_INDEX + 1; + let keys_end: usize = keys_start + + event.data[NUM_KEYS_INDEX].to_usize().context("invalid usize")?; + + let keys = event.data[keys_start..keys_end].to_vec(); + let entity_id = poseidon_hash_many(&keys); + entity_id.to_raw()[3] % modulo + 1 + } + EventType::StoreDeleteRecord => { + let entity_id = event.data[ENTITY_ID_INDEX]; + entity_id.to_raw()[3] % modulo + 1 + } + EventType::StoreUpdateMember => { + let entity_id = event.data[ENTITY_ID_INDEX]; + entity_id.to_raw()[3] % modulo + 1 + } + EventType::StoreUpdateRecord => { + let entity_id = event.data[ENTITY_ID_INDEX]; + entity_id.to_raw()[3] % modulo + 1 + } + EventType::Other => other, + }; + + map.entry(entity_id as u8).or_default().push(( + event, + block_number, + block_timestamp, + )); + } + } + + let mut tasks = Vec::new(); - // Process block - if block_number > last_block { - if let Some(ref block_tx) = self.block_tx { - block_tx.send(block_number).await?; + // loop over the collected events + for (id, events) in map.into_iter() { + let mut db = self.db.clone(); + let processors = Arc::clone(&self.processors); + let world = + WorldContractReader::new(self.world.address, self.provider.as_ref().clone()); + + let task: JoinHandle> = task::spawn(async move { + for (event_idx, (event, block_number, block_timestamp)) in events.iter().enumerate() + { + if db.query_queue.queue.len() >= QUERY_QUEUE_BATCH_SIZE { + db.execute().await?; + } + + let transaction_hash = event.transaction_hash; + let event_id = format!( + "{:#064x}:{:#x}:{:#04x}", + block_number, transaction_hash, event_idx + ); + let event = Event { + from_address: event.from_address, + keys: event.keys.clone(), + data: event.data.clone(), + }; + + process_event( + *block_number, + *block_timestamp, + &event_id, + &event, + transaction_hash, + &mut db, + Arc::clone(&processors), + &world, + ) + .await? 
} + db.execute().await?; + Ok(()) + }); - self.process_block(block_number, data.blocks[&block_number]).await?; - last_block = block_number; + if id as u64 == other { + task.await??; + } else { + tasks.push(task); } + } + + for task in tasks { + task.await??; + } - if self.db.query_queue.queue.len() >= QUERY_QUEUE_BATCH_SIZE { - self.db.execute().await?; + // TODO: remove this after implementation + // for ((block_number, transaction_hash), events) in data.transactions { + // debug!("Processing transaction hash: {:#x}", transaction_hash); + // // Process transaction + // // let transaction = self.provider.get_transaction_by_hash(transaction_hash).await?; + + // self.process_transaction_with_events( + // transaction_hash, + // events.as_slice(), + // block_number, + // data.blocks[&block_number], + // ) + // .await?; + + // // Process block + // if self.db.query_queue.queue.len() >= QUERY_QUEUE_BATCH_SIZE { + // self.db.execute().await?; + // } + // } + + for (block_number, timestamp) in data.blocks.iter() { + if let Some(ref block_tx) = self.block_tx { + block_tx.send(*block_number).await?; } + + self.process_block(*block_number, *timestamp).await?; } // We return None for the pending_block_tx because our process_range @@ -455,97 +652,98 @@ impl Engine
{ } } - async fn process_transaction_with_events( - &mut self, - transaction_hash: Felt, - events: &[EmittedEvent], - block_number: u64, - block_timestamp: u64, - ) -> Result<()> { - for (event_idx, event) in events.iter().enumerate() { - let event_id = - format!("{:#064x}:{:#x}:{:#04x}", block_number, transaction_hash, event_idx); - - let event = Event { - from_address: event.from_address, - keys: event.keys.clone(), - data: event.data.clone(), - }; - Self::process_event( - self, - block_number, - block_timestamp, - &event_id, - &event, - transaction_hash, - ) - .await?; - } + // async fn process_transaction_with_events( + // &mut self, + // transaction_hash: Felt, + // events: &[EmittedEvent], + // block_number: u64, + // block_timestamp: u64, + // ) -> Result<()> { + // for (event_idx, event) in events.iter().enumerate() { + // let event_id = + // format!("{:#064x}:{:#x}:{:#04x}", block_number, transaction_hash, event_idx); + + // let event = Event { + // from_address: event.from_address, + // keys: event.keys.clone(), + // data: event.data.clone(), + // }; + // Self::process_event( + // self, + // block_number, + // block_timestamp, + // &event_id, + // &event, + // transaction_hash, + // ) + // .await?; + // } - // Commented out this transaction processor because it requires an RPC call for each - // transaction which is slowing down the sync process by alot. - // Self::process_transaction( - // self, - // block_number, - // block_timestamp, - // transaction_hash, - // transaction, - // ) - // .await?; + // // Commented out this transaction processor because it requires an RPC call for each + // // transaction which is slowing down the sync process by alot. + // // Self::process_transaction( + // // self, + // // block_number, + // // block_timestamp, + // // transaction_hash, + // // transaction, + // // ) + // // .await?; + + // Ok(()) + // } - Ok(()) - } // Process a transaction and events from its receipt. // Returns whether the transaction has a world event. 
- async fn process_transaction_with_receipt( - &mut self, - transaction_with_receipt: &TransactionWithReceipt, - block_number: u64, - block_timestamp: u64, - ) -> Result { - let transaction_hash = transaction_with_receipt.transaction.transaction_hash(); - let events = match &transaction_with_receipt.receipt { - TransactionReceipt::Invoke(receipt) => Some(&receipt.events), - TransactionReceipt::L1Handler(receipt) => Some(&receipt.events), - _ => None, - }; - - let mut world_event = false; - if let Some(events) = events { - for (event_idx, event) in events.iter().enumerate() { - if event.from_address != self.world.address { - continue; - } - - world_event = true; - let event_id = - format!("{:#064x}:{:#x}:{:#04x}", block_number, *transaction_hash, event_idx); - - Self::process_event( - self, - block_number, - block_timestamp, - &event_id, - event, - *transaction_hash, - ) - .await?; - } - - // if world_event { - // Self::process_transaction( - // self, - // block_number, - // block_timestamp, - // transaction_hash, - // transaction, - // ) - // .await?; - // } - } + // async fn process_transaction_with_receipt( + // &mut self, + // transaction_with_receipt: &TransactionWithReceipt, + // block_number: u64, + // block_timestamp: u64, + // ) -> Result { + // let transaction_hash = transaction_with_receipt.transaction.transaction_hash(); + // let events = match &transaction_with_receipt.receipt { + // TransactionReceipt::Invoke(receipt) => Some(&receipt.events), + // TransactionReceipt::L1Handler(receipt) => Some(&receipt.events), + // _ => None, + // }; + + // let mut world_event = false; + // if let Some(events) = events { + // for (event_idx, event) in events.iter().enumerate() { + // if event.from_address != self.world.address { + // continue; + // } + + // world_event = true; + // let event_id = + // format!("{:#064x}:{:#x}:{:#04x}", block_number, *transaction_hash, event_idx); + + // Self::process_event( + // self, + // block_number, + // block_timestamp, + // &event_id, + // event, + // *transaction_hash, + // ) + // .await?; + // } + + // // if world_event { + // // Self::process_transaction( + // // self, + // // block_number, + // // block_timestamp, + // // transaction_hash, + // // transaction, + // // ) + // // .await?; + // // } + // } - Ok(world_event) - } + // Ok(world_event) + // } async fn process_block(&mut self, block_number: u64, block_timestamp: u64) -> Result<()> { for processor in &self.processors.block { @@ -580,62 +778,79 @@ impl Engine
{ // Ok(()) // } +} - async fn process_event( - &mut self, - block_number: u64, - block_timestamp: u64, - event_id: &str, - event: &Event, - transaction_hash: Felt, - ) -> Result<()> { - self.db.store_event(event_id, event, transaction_hash, block_timestamp); - let event_key = event.keys[0]; - - let Some(processor) = self.processors.event.get(&event_key) else { - // if we dont have a processor for this event, we try the catch all processor - if self.processors.catch_all_event.validate(event) { - if let Err(e) = self - .processors - .catch_all_event - .process( - &self.world, - &mut self.db, - block_number, - block_timestamp, - event_id, - event, - ) - .await - { - error!(target: LOG_TARGET, error = %e, "Processing catch all event processor."); - } - } else { - let unprocessed_event = UnprocessedEvent { - keys: event.keys.iter().map(|k| format!("{:#x}", k)).collect(), - data: event.data.iter().map(|d| format!("{:#x}", d)).collect(), - }; +fn event_type_from_felt(input: Felt) -> EventType { + let store_set = selector!("StoreSetRecord"); + let store_delete = selector!("StoreDeleteRecord"); + let store_update_member = selector!("StoreUpdateMember"); + let store_update_record = selector!("StoreUpdateRecord"); + + match input { + x if x == store_set => EventType::StoreSetRecord, + x if x == store_delete => EventType::StoreDeleteRecord, + x if x == store_update_member => EventType::StoreUpdateMember, + x if x == store_update_record => EventType::StoreUpdateRecord, + _ => EventType::Other, + } +} - trace!( - target: LOG_TARGET, - keys = ?unprocessed_event.keys, - data = ?unprocessed_event.data, - "Unprocessed event.", - ); - } +enum EventType { + StoreSetRecord, + StoreDeleteRecord, + StoreUpdateMember, + StoreUpdateRecord, + Other, +} - return Ok(()); - }; +#[allow(clippy::too_many_arguments)] +async fn process_event( + block_number: u64, + block_timestamp: u64, + event_id: &str, + event: &Event, + transaction_hash: Felt, + db: &mut Sql, + processors: Arc>, + world: &WorldContractReader
<P>
, +) -> Result<()> { + db.store_event(event_id, event, transaction_hash, block_timestamp); + let event_key = event.keys[0]; + + let Some(processor) = processors.event.get(&event_key) else { + // if we dont have a processor for this event, we try the catch all processor + if processors.catch_all_event.validate(event) { + if let Err(e) = processors + .catch_all_event + .process(world, db, block_number, block_timestamp, event_id, event) + .await + { + error!(target: LOG_TARGET, error = %e, "Processing catch all event processor."); + } + } else { + let unprocessed_event = UnprocessedEvent { + keys: event.keys.iter().map(|k| format!("{:#x}", k)).collect(), + data: event.data.iter().map(|d| format!("{:#x}", d)).collect(), + }; - // if processor.validate(event) { - if let Err(e) = processor - .process(&self.world, &mut self.db, block_number, block_timestamp, event_id, event) - .await - { - error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event."); + trace!( + target: LOG_TARGET, + keys = ?unprocessed_event.keys, + data = ?unprocessed_event.data, + "Unprocessed event.", + ); } - // } - Ok(()) + return Ok(()); + }; + + // if processor.validate(event) { + if let Err(e) = + processor.process(world, db, block_number, block_timestamp, event_id, event).await + { + error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event."); } + // } + + Ok(()) } diff --git a/crates/torii/core/src/processors/mod.rs b/crates/torii/core/src/processors/mod.rs index a860364360..2f939e2dca 100644 --- a/crates/torii/core/src/processors/mod.rs +++ b/crates/torii/core/src/processors/mod.rs @@ -18,12 +18,12 @@ pub mod store_transaction; pub mod store_update_member; pub mod store_update_record; -const MODEL_INDEX: usize = 0; -const NUM_KEYS_INDEX: usize = 1; -const ENTITY_ID_INDEX: usize = 1; +pub(crate) const MODEL_INDEX: usize = 0; +pub(crate) const NUM_KEYS_INDEX: usize = 1; +pub(crate) const ENTITY_ID_INDEX: usize = 1; #[async_trait] -pub trait EventProcessor
<P>
+pub trait EventProcessor
<P>
: Send + Sync where P: Provider + Sync, { @@ -48,7 +48,7 @@ where } #[async_trait] -pub trait BlockProcessor { +pub trait BlockProcessor: Send + Sync { fn get_block_number(&self) -> String; async fn process( &self, @@ -60,7 +60,7 @@ pub trait BlockProcessor { } #[async_trait] -pub trait TransactionProcessor { +pub trait TransactionProcessor: Send + Sync { #[allow(clippy::too_many_arguments)] async fn process( &self, diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index dea2a7be92..687c100afc 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -33,7 +33,7 @@ pub const FELT_DELIMITER: &str = "/"; #[path = "sql_test.rs"] mod test; -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct Sql { world_address: Felt, pub pool: Pool, @@ -41,6 +41,17 @@ pub struct Sql { model_cache: Arc, } +impl Clone for Sql { + fn clone(&self) -> Self { + Sql { + world_address: self.world_address, + pool: self.pool.clone(), + query_queue: QueryQueue::new(self.pool.clone()), + model_cache: self.model_cache.clone(), + } + } +} + impl Sql { pub async fn new(pool: Pool, world_address: Felt) -> Result { let mut query_queue = QueryQueue::new(pool.clone()); @@ -193,6 +204,9 @@ impl Sql { ?, ?, ?) ON CONFLICT(id) DO UPDATE SET \ updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \ event_id=EXCLUDED.event_id RETURNING *"; + // if timeout doesn't work + // fetch to get entity + // if not available, insert into queue let mut entity_updated: EntityUpdated = sqlx::query_as(insert_entities) .bind(&entity_id) .bind(&keys_str) @@ -760,7 +774,11 @@ impl Sql { Ty::Enum(e) => { if e.options.iter().all( |o| { - if let Ty::Tuple(t) = &o.ty { t.is_empty() } else { false } + if let Ty::Tuple(t) = &o.ty { + t.is_empty() + } else { + false + } }, ) { return; diff --git a/crates/torii/core/src/sql_test.rs b/crates/torii/core/src/sql_test.rs index a52242cc6a..2ee52cea6a 100644 --- a/crates/torii/core/src/sql_test.rs +++ b/crates/torii/core/src/sql_test.rs @@ -1,4 +1,5 @@ use std::str::FromStr; +use std::sync::Arc; use cainome::cairo_serde::ContractAddress; use camino::Utf8PathBuf; @@ -10,12 +11,14 @@ use dojo_world::contracts::world::{WorldContract, WorldContractReader}; use katana_runner::{KatanaRunner, KatanaRunnerConfig}; use scarb::compiler::Profile; use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; -use starknet::accounts::{Account, Call, ConnectedAccount}; +use starknet::accounts::{Account, Call}; use starknet::core::types::Felt; use starknet::core::utils::{get_contract_address, get_selector_from_name}; -use starknet::providers::Provider; +use starknet::providers::jsonrpc::HttpTransport; +use starknet::providers::{JsonRpcClient, Provider}; use starknet_crypto::poseidon_hash_many; use tokio::sync::broadcast; +use tracing_test::traced_test; use crate::engine::{Engine, EngineConfig, Processors}; use crate::processors::generate_event_processors_map; @@ -30,7 +33,7 @@ pub async fn bootstrap_engine
<P>
( provider: P, ) -> Result, Box> where - P: Provider + Send + Sync + core::fmt::Debug, + P: Provider + Send + Sync + core::fmt::Debug + Clone + 'static, { let (shutdown_tx, _) = broadcast::channel(1); let to = provider.block_hash_and_number().await?.block_number; @@ -76,6 +79,7 @@ async fn test_load_from_remote() { let sequencer = KatanaRunner::new_with_config(seq_config).expect("Failed to start runner."); let account = sequencer.account(0); + let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(sequencer.url()))); let (strat, _) = prepare_migration_with_world_and_seed( manifest_path, @@ -102,7 +106,7 @@ async fn test_load_from_remote() { .await .unwrap(); - TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap(); + TransactionWaiter::new(res.transaction_hash, &provider).await.unwrap(); // spawn let tx = &account @@ -115,13 +119,13 @@ async fn test_load_from_remote() { .await .unwrap(); - TransactionWaiter::new(tx.transaction_hash, &account.provider()).await.unwrap(); + TransactionWaiter::new(tx.transaction_hash, &provider).await.unwrap(); - let world_reader = WorldContractReader::new(strat.world_address, account.provider()); + let world_reader = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); let mut db = Sql::new(pool.clone(), world_reader.address).await.unwrap(); - let _ = bootstrap_engine(world_reader, db.clone(), account.provider()).await.unwrap(); + let _ = bootstrap_engine(world_reader, db.clone(), provider).await.unwrap(); let _block_timestamp = 1710754478_u64; let models = sqlx::query("SELECT * FROM models").fetch_all(&pool).await.unwrap(); @@ -210,6 +214,7 @@ async fn test_load_from_remote_del() { let sequencer = KatanaRunner::new_with_config(seq_config).expect("Failed to start runner."); let account = sequencer.account(0); + let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(sequencer.url()))); let (strat, _) = prepare_migration_with_world_and_seed( manifest_path, @@ -235,7 +240,7 @@ async fn test_load_from_remote_del() { .await .unwrap(); - TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap(); + TransactionWaiter::new(res.transaction_hash, &provider).await.unwrap(); // spawn let res = account @@ -248,7 +253,7 @@ async fn test_load_from_remote_del() { .await .unwrap(); - TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap(); + TransactionWaiter::new(res.transaction_hash, &provider).await.unwrap(); // Set player config. 
let res = account @@ -262,7 +267,7 @@ async fn test_load_from_remote_del() { .await .unwrap(); - TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap(); + TransactionWaiter::new(res.transaction_hash, &provider).await.unwrap(); let res = account .execute_v1(vec![Call { @@ -274,13 +279,13 @@ async fn test_load_from_remote_del() { .await .unwrap(); - TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap(); + TransactionWaiter::new(res.transaction_hash, &provider).await.unwrap(); - let world_reader = WorldContractReader::new(strat.world_address, account.provider()); + let world_reader = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); let mut db = Sql::new(pool.clone(), world_reader.address).await.unwrap(); - let _ = bootstrap_engine(world_reader, db.clone(), account.provider()).await; + let _ = bootstrap_engine(world_reader, db.clone(), provider).await; assert_eq!(count_table("dojo_examples-PlayerConfig", &pool).await, 0); assert_eq!(count_table("dojo_examples-PlayerConfig$favorite_item", &pool).await, 0); @@ -293,6 +298,7 @@ async fn test_load_from_remote_del() { } #[tokio::test(flavor = "multi_thread")] +#[traced_test] async fn test_get_entity_keys() { let options = SqliteConnectOptions::from_str("sqlite::memory:").unwrap().create_if_missing(true); @@ -328,6 +334,7 @@ async fn test_get_entity_keys() { ); let account = sequencer.account(0); + let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(sequencer.url()))); let world = WorldContract::new(strat.world_address, &account); @@ -337,7 +344,7 @@ async fn test_get_entity_keys() { .await .unwrap(); - TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap(); + TransactionWaiter::new(res.transaction_hash, &provider).await.unwrap(); // spawn let res = account @@ -350,13 +357,13 @@ async fn test_get_entity_keys() { .await .unwrap(); - TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap(); + TransactionWaiter::new(res.transaction_hash, &provider).await.unwrap(); - let world_reader = WorldContractReader::new(strat.world_address, account.provider()); + let world_reader = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); let mut db = Sql::new(pool.clone(), world_reader.address).await.unwrap(); - let _ = bootstrap_engine(world_reader, db.clone(), account.provider()).await; + let _ = bootstrap_engine(world_reader, db.clone(), Arc::clone(&provider)).await.unwrap(); let keys = db.get_entity_keys_def("dojo_examples-Moves").await.unwrap(); assert_eq!(keys, vec![("player".to_string(), "ContractAddress".to_string()),]); diff --git a/crates/torii/graphql/src/tests/mod.rs b/crates/torii/graphql/src/tests/mod.rs index d535fa2acd..321466e09d 100644 --- a/crates/torii/graphql/src/tests/mod.rs +++ b/crates/torii/graphql/src/tests/mod.rs @@ -1,4 +1,5 @@ use std::str::FromStr; +use std::sync::Arc; use anyhow::Result; use async_graphql::dynamic::Schema; @@ -21,7 +22,8 @@ use sqlx::SqlitePool; use starknet::accounts::{Account, Call, ConnectedAccount}; use starknet::core::types::{Felt, InvokeTransactionResult}; use starknet::macros::selector; -use starknet::providers::Provider; +use starknet::providers::jsonrpc::HttpTransport; +use starknet::providers::{JsonRpcClient, Provider}; use tokio::sync::broadcast; use tokio_stream::StreamExt; use torii_core::engine::{Engine, EngineConfig, Processors}; @@ -290,6 +292,7 @@ pub async fn spinup_types_test() -> Result { let sequencer = 
KatanaRunner::new_with_config(seq_config).expect("Failed to start runner."); let account = sequencer.account(0); + let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(sequencer.url()))); let (strat, _) = prepare_migration_with_world_and_seed( manifest_path, @@ -328,7 +331,7 @@ pub async fn spinup_types_test() -> Result { .await .unwrap(); - TransactionWaiter::new(transaction_hash, &account.provider()).await?; + TransactionWaiter::new(transaction_hash, &provider).await?; // Execute `delete` and delete Record with id 20 let InvokeTransactionResult { transaction_hash } = account @@ -341,9 +344,9 @@ pub async fn spinup_types_test() -> Result { .await .unwrap(); - TransactionWaiter::new(transaction_hash, &account.provider()).await?; + TransactionWaiter::new(transaction_hash, &provider).await?; - let world = WorldContractReader::new(strat.world_address, account.provider()); + let world = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); let db = Sql::new(pool.clone(), strat.world_address).await.unwrap(); @@ -351,7 +354,7 @@ pub async fn spinup_types_test() -> Result { let mut engine = Engine::new( world, db, - account.provider(), + Arc::clone(&provider), Processors { event: generate_event_processors_map(vec![ Box::new(RegisterModelProcessor), diff --git a/spawn-and-move-db.tar.gz b/spawn-and-move-db.tar.gz index 65fa481ffd..1237a288f5 100644 Binary files a/spawn-and-move-db.tar.gz and b/spawn-and-move-db.tar.gz differ diff --git a/types-test-db.tar.gz b/types-test-db.tar.gz index 5bae29e25d..98942d96e3 100644 Binary files a/types-test-db.tar.gz and b/types-test-db.tar.gz differ
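
Reviewer note on the approach: both `process_pending` and `process_range` now share the same partitioning trick — world events are keyed into eight buckets by entity id (for `StoreSetRecord`, the poseidon hash of the event keys; otherwise the entity id from the event data; either way `to_raw()[3] % 8 + 1`), with bucket 0 reserved for `Other` events such as `register_model`. Bucket 0 is awaited first, since model registrations must land before the entity events that depend on them; the remaining buckets each get their own tokio task. Below is a minimal, self-contained sketch of that scheme — not this PR's actual code: `MockEvent`, `bucket_for`, `NUM_BUCKETS`, and `OTHER_BUCKET` are illustrative stand-ins for the real `Event`/`Felt` plumbing.

```rust
use std::collections::BTreeMap;

use tokio::task::JoinHandle;

// Illustrative stand-in for a decoded world event; the pending path in the PR
// buckets (Event, Felt, u64, u64) tuples and the range path (EmittedEvent, u64, u64).
#[derive(Debug)]
struct MockEvent {
    entity_id: u64, // in the PR: poseidon_hash_many(&keys) for StoreSetRecord
    payload: &'static str,
}

const NUM_BUCKETS: u64 = 8; // mirrors `let modulo = 8u64;`
const OTHER_BUCKET: u8 = 0; // mirrors `let other = 0u64;`

fn bucket_for(event: &MockEvent, is_other: bool) -> u8 {
    if is_other {
        OTHER_BUCKET
    } else {
        // The PR takes one limb of the entity-id felt: `to_raw()[3] % modulo + 1`.
        (event.entity_id % NUM_BUCKETS + 1) as u8
    }
}

async fn process_bucket(id: u8, events: Vec<MockEvent>) -> anyhow::Result<()> {
    // Events within a bucket keep their arrival order, so two writes to the
    // same entity can never be reordered relative to each other.
    for event in events {
        println!("bucket {id}: {event:?}");
    }
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let incoming = vec![
        (MockEvent { entity_id: 0, payload: "RegisterModel" }, true),
        (MockEvent { entity_id: 7, payload: "StoreSetRecord" }, false),
        (MockEvent { entity_id: 7, payload: "StoreUpdateRecord" }, false),
        (MockEvent { entity_id: 9, payload: "StoreDeleteRecord" }, false),
    ];

    // BTreeMap so iteration starts at bucket 0 ("Other" events first).
    let mut buckets = BTreeMap::<u8, Vec<MockEvent>>::new();
    for (event, is_other) in incoming {
        buckets.entry(bucket_for(&event, is_other)).or_default().push(event);
    }

    let mut tasks: Vec<JoinHandle<anyhow::Result<()>>> = Vec::new();
    for (id, events) in buckets {
        let task = tokio::spawn(process_bucket(id, events));
        if id == OTHER_BUCKET {
            // Model registrations act as a barrier: awaited inline before
            // the entity buckets are allowed to run in parallel.
            task.await??;
        } else {
            tasks.push(task);
        }
    }
    for task in tasks {
        task.await??;
    }
    Ok(())
}
```

Because the bucket is a pure function of the entity id, writes to one entity always serialize through the same task while unrelated entities fan out across up to eight workers; the cost is that every batch waits on the `Other` bucket before the parallel phase begins.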