Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt(torii): parallalize event processing #2385

Closed
wants to merge 1 commit into from
Closed

Conversation

lambda-0x
Copy link
Contributor

@lambda-0x lambda-0x commented Sep 5, 2024

Stack:

⚠️ Part of a stack created by spr. Do not merge manually using the UI - doing so may have unexpected results.

Copy link

coderabbitai bot commented Sep 5, 2024

Walkthrough

Ohayo, sensei! The changes involve adding the tracing-test dependency to enhance testing capabilities. The provider variable in the main function has been modified to use Arc::clone, ensuring safe shared ownership. The Engine struct has been updated to utilize Arc<Processors<P>>, improving thread safety. Additionally, various constants and traits have been made more accessible and thread-safe, while the Sql struct's Clone implementation has been refined.

Changes

File Change Summary
Cargo.toml, crates/torii/core/Cargo.toml Added tracing-test dependency to enhance testing capabilities.
bin/torii/src/main.rs Modified provider handling to use Arc::clone, ensuring safe shared ownership.
crates/torii/core/src/engine.rs Updated Engine struct to use Arc<Processors<P>>, improving concurrency and thread safety.
crates/torii/core/src/processors/mod.rs Changed visibility of constants to pub(crate) and added Send + Sync bounds to traits.
crates/torii/core/src/sql.rs Implemented Clone for Sql struct explicitly, refining cloning behavior.
crates/torii/core/src/sql_test.rs Updated provider handling for async functions, ensuring it implements Clone.
crates/torii/graphql/src/tests/mod.rs Enhanced spinup_types_test function to use a new provider variable for transaction management.

Sequence Diagram(s)

sequenceDiagram
    participant Main as Main Function
    participant Provider as JsonRpcClient
    participant Engine as Engine Struct
    participant Processors as Processors<P>

    Main->>Provider: Arc::clone(provider)
    Main->>Engine: new(Arc::clone(provider))
    Main->>Processors: new(Arc::clone(provider))
    Engine->>Processors: process_event()
Loading

Recent review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

Commits

Files that changed from the base of the PR and between e4c88f6 and 8f096a4.

Files ignored due to path filters (3)
  • Cargo.lock is excluded by !**/*.lock
  • spawn-and-move-db.tar.gz is excluded by !**/*.gz
  • types-test-db.tar.gz is excluded by !**/*.gz
Files selected for processing (10)
  • Cargo.toml (1 hunks)
  • bin/torii/src/main.rs (3 hunks)
  • crates/torii/core/Cargo.toml (1 hunks)
  • crates/torii/core/src/engine.rs (10 hunks)
  • crates/torii/core/src/processors/mod.rs (3 hunks)
  • crates/torii/core/src/sql.rs (2 hunks)
  • crates/torii/core/src/sql_test.rs (14 hunks)
  • crates/torii/graphql/src/tests/entities_test.rs (1 hunks)
  • crates/torii/graphql/src/tests/mod.rs (5 hunks)
  • crates/torii/graphql/src/tests/models_test.rs (11 hunks)
Files skipped from review due to trivial changes (1)
  • crates/torii/core/Cargo.toml
Files skipped from review as they are similar to previous changes (7)
  • Cargo.toml
  • bin/torii/src/main.rs
  • crates/torii/core/src/processors/mod.rs
  • crates/torii/core/src/sql_test.rs
  • crates/torii/graphql/src/tests/entities_test.rs
  • crates/torii/graphql/src/tests/mod.rs
  • crates/torii/graphql/src/tests/models_test.rs
Additional comments not posted (8)
crates/torii/core/src/engine.rs (5)

94-98: Refinement in struct definition to support concurrency.

The addition of Clone to the trait bounds of the Engine struct is a crucial update for enabling shared ownership across threads, which is essential for the parallel processing of events. This change aligns well with the PR's objectives of enhancing performance through concurrency.


Line range hint 109-119: Proper use of Arc for shared ownership in constructors.

Wrapping the processors in an Arc within the new method is a key adjustment for thread safety in a concurrent processing environment. This modification is well-executed and crucial for the parallel processing capabilities introduced in this PR.


160-162: Consider enhancing error isolation in parallel tasks.

The TODO comment highlights a potential issue with error handling in the context of parallel processing. It's crucial to identify which specific task failed when errors occur. This could be addressed by improving the logging of errors or by redesigning the task handling to better isolate and report errors.

Consider implementing more granular error handling or logging mechanisms to trace errors back to their origin tasks more effectively.


Line range hint 364-501: Effective use of BTreeMap for ordered parallel processing.

The process_pending method's use of BTreeMap to maintain order while processing events in parallel is a well-thought-out approach. This ensures that dependencies among events are respected, which is crucial for the integrity of event processing. The method's implementation aligns well with the PR's goal of enhancing efficiency through parallel processing.


509-607: Consistent and efficient parallel processing in process_range.

The process_range method effectively mirrors the parallel processing strategy seen in process_pending, using BTreeMap to organize events and handling them in parallel tasks. This consistency in approach is commendable and well-suited to the PR's objectives of improving performance through parallel processing.

crates/torii/core/src/sql.rs (3)

36-36: Refinement of Sql struct's Debug trait.

The removal of the Clone derive and the manual implementation of the Clone trait suggests a move towards more controlled cloning behavior, which is crucial for managing resources like database connections and caches effectively.


44-52: Custom implementation of the Clone trait for Sql.

This new implementation ensures that while the world_address is copied directly, the pool and model_cache are cloned in a way that respects their internal state and concurrency requirements. This is a critical change for ensuring thread safety and consistent state across clones of the Sql struct.

+impl Clone for Sql {
+    fn clone(&self) -> Self {
+        Sql {
+            world_address: self.world_address,
+            pool: self.pool.clone(),
+            query_queue: QueryQueue::new(self.pool.clone()),
+            model_cache: self.model_cache.clone(),
+        }
+    }
+}

207-209: Consider handling timeouts and entity fetching more robustly.

The comments added in lines 207-209 suggest areas for potential future enhancements, specifically around handling timeouts and entity fetching. It's great to see proactive comments that guide future development and maintenance efforts. However, it would be beneficial to start creating more concrete plans or tickets for these enhancements to ensure they are not overlooked. Consider using issue trackers or TODOs with specific tags that can be easily searched and prioritized.


Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Outside diff range, codebase verification and nitpick comments (1)
crates/torii/core/src/engine.rs (1)

Line range hint 363-500: Refactor processing of pending transactions to enhance clarity and maintainability.

The method for processing pending transactions uses a complex logic involving multiple nested loops and conditionals. This could be refactored to improve readability and maintainability. Additionally, the use of BTreeMap for ordering events by entity ID is a thoughtful choice that ensures events are processed in a predictable order.

Consider breaking down this method into smaller, more focused functions or methods. This can help isolate functionality and make the code easier to understand and maintain.

Comment on lines +159 to +162
// TODO: we might not able able to properly handle this error case
// since we are trying to do things in parallel so we don't exactly
// know which task failed
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider improving error handling in parallel tasks.

The TODO comment highlights a potential issue with error handling in the context of parallel processing. It's crucial to identify which specific task failed when errors occur. This could be addressed by improving the logging of errors or by redesigning the task handling to better isolate and report errors.

Consider implementing more granular error handling or logging mechanisms to trace errors back to their origin tasks more effectively.

Comment on lines 511 to 607
let modulo = 8u64;
let other = 0u64;
let mut map = BTreeMap::<u8, Vec<(EmittedEvent, u64, u64)>>::new();

for ((block_number, _), events) in data.transactions {
let block_timestamp = data.blocks[&block_number];
for event in events {
let event_name = event_type_from_felt(event.keys[0]);
let entity_id = match event_name {
EventType::StoreSetRecord => {
let keys_start = NUM_KEYS_INDEX + 1;
let keys_end: usize = keys_start
+ event.data[NUM_KEYS_INDEX].to_usize().context("invalid usize")?;

let keys = event.data[keys_start..keys_end].to_vec();
let entity_id = poseidon_hash_many(&keys);
entity_id.to_raw()[3] % modulo + 1
}
EventType::StoreDeleteRecord => {
let entity_id = event.data[ENTITY_ID_INDEX];
entity_id.to_raw()[3] % modulo + 1
}
EventType::StoreUpdateMember => {
let entity_id = event.data[ENTITY_ID_INDEX];
entity_id.to_raw()[3] % modulo + 1
}
EventType::StoreUpdateRecord => {
let entity_id = event.data[ENTITY_ID_INDEX];
entity_id.to_raw()[3] % modulo + 1
}
EventType::Other => other,
};

map.entry(entity_id as u8).or_default().push((
event,
block_number,
block_timestamp,
));
}
}

let mut tasks = Vec::new();

// Process block
if block_number > last_block {
if let Some(ref block_tx) = self.block_tx {
block_tx.send(block_number).await?;
// loop over the collected events
for (id, events) in map.into_iter() {
let mut db = self.db.clone();
let processors = Arc::clone(&self.processors);
let world =
WorldContractReader::new(self.world.address, self.provider.as_ref().clone());

let task: JoinHandle<Result<()>> = task::spawn(async move {
for (event_idx, (event, block_number, block_timestamp)) in events.iter().enumerate()
{
if db.query_queue.queue.len() >= QUERY_QUEUE_BATCH_SIZE {
db.execute().await?;
}

let transaction_hash = event.transaction_hash;
let event_id = format!(
"{:#064x}:{:#x}:{:#04x}",
block_number, transaction_hash, event_idx
);
let event = Event {
from_address: event.from_address,
keys: event.keys.clone(),
data: event.data.clone(),
};

process_event(
*block_number,
*block_timestamp,
&event_id,
&event,
transaction_hash,
&mut db,
Arc::clone(&processors),
&world,
)
.await?
}
db.execute().await?;
Ok(())
});

self.process_block(block_number, data.blocks[&block_number]).await?;
last_block = block_number;
if id as u64 == other {
task.await??;
} else {
tasks.push(task);
}
}

for task in tasks {
task.await??;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optimization of event processing in range data.

The method for processing range data efficiently organizes events into a BTreeMap based on entity IDs, which is a good strategy for ensuring that events are processed in a structured manner. However, the complex nesting and extensive use of asynchronous tasks within loops could be streamlined for better performance and readability.

Refactor this method to reduce complexity and improve the efficiency of task management. Consider using more of Rust's functional programming features to handle data transformations and concurrency more cleanly.

Comment on lines +207 to +209
// if timeout doesn't work
// fetch to get entity
// if not available, insert into queue
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider handling timeouts and entity fetching more robustly.

The comments added in lines 207-209 suggest areas for potential future enhancements, specifically around handling timeouts and entity fetching. It's great to see proactive comments that guide future development and maintenance efforts. However, it would be beneficial to start creating more concrete plans or tickets for these enhancements to ensure they are not overlooked. Consider using issue trackers or TODOs with specific tags that can be easily searched and prioritized.

Comment on lines 777 to 781
if let Ty::Tuple(t) = &o.ty {
t.is_empty()
} else {
false
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor the conditional logic within the Enum handling.

The conditional logic in lines 777-781 for handling empty tuples in Enum types is a bit cumbersome and could be simplified. Using a method or a helper function to determine if an Enum option is empty could improve readability and maintainability. Here's a suggested refactor:

- if e.options.iter().all(|o| if let Ty::Tuple(t) = &o.ty { t.is_empty() } else { false }) {
+ if e.options.iter().all(|o| o.ty.is_empty_tuple()) {

You would need to implement is_empty_tuple in an appropriate place in your type definitions. This change would make the code cleaner and easier to understand at a glance.

Committable suggestion was skipped due to low confidence.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Outside diff range, codebase verification and nitpick comments (1)
crates/torii/core/src/engine.rs (1)

Line range hint 363-500: Refactor processing of pending data to enhance clarity and maintainability.

The method process_pending is quite lengthy and handles multiple responsibilities, which could impact the maintainability and readability of the code. Consider breaking down this method into smaller, more focused methods, each handling a specific part of the processing logic.

For instance, handling of events and transactions could be separated into distinct methods, and the logic for updating the database state could be encapsulated in another method. This would not only improve readability but also make the code easier to test and maintain.

Comment on lines +806 to +857
#[allow(clippy::too_many_arguments)]
async fn process_event<P: Provider + Send + Sync + std::fmt::Debug>(
block_number: u64,
block_timestamp: u64,
event_id: &str,
event: &Event,
transaction_hash: Felt,
db: &mut Sql,
processors: Arc<Processors<P>>,
world: &WorldContractReader<P>,
) -> Result<()> {
db.store_event(event_id, event, transaction_hash, block_timestamp);
let event_key = event.keys[0];

let Some(processor) = processors.event.get(&event_key) else {
// if we dont have a processor for this event, we try the catch all processor
if processors.catch_all_event.validate(event) {
if let Err(e) = processors
.catch_all_event
.process(world, db, block_number, block_timestamp, event_id, event)
.await
{
error!(target: LOG_TARGET, error = %e, "Processing catch all event processor.");
}
} else {
let unprocessed_event = UnprocessedEvent {
keys: event.keys.iter().map(|k| format!("{:#x}", k)).collect(),
data: event.data.iter().map(|d| format!("{:#x}", d)).collect(),
};

// if processor.validate(event) {
if let Err(e) = processor
.process(&self.world, &mut self.db, block_number, block_timestamp, event_id, event)
.await
{
error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event.");
trace!(
target: LOG_TARGET,
keys = ?unprocessed_event.keys,
data = ?unprocessed_event.data,
"Unprocessed event.",
);
}
// }

Ok(())
return Ok(());
};

// if processor.validate(event) {
if let Err(e) =
processor.process(world, db, block_number, block_timestamp, event_id, event).await
{
error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event.");
}
// }

Ok(())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enhance error handling and logging in process_event.

The process_event function could benefit from improved error handling. Currently, errors in event processing are logged but not rethrown, potentially swallowing exceptions that should be visible to callers. Consider modifying the error handling strategy to rethrow exceptions after logging, ensuring that errors are properly propagated and can be handled at a higher level.

-        error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event.");
+        let err = error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event.");
+        return Err(err);

Committable suggestion was skipped due to low confidence.

Copy link

codecov bot commented Sep 5, 2024

Codecov Report

Attention: Patch coverage is 60.35242% with 90 lines in your changes missing coverage. Please review.

Project coverage is 68.28%. Comparing base (72002a2) to head (8f096a4).
Report is 33 commits behind head on main.

Files with missing lines Patch % Lines
crates/torii/core/src/engine.rs 56.06% 87 Missing ⚠️
bin/torii/src/main.rs 0.00% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2385      +/-   ##
==========================================
- Coverage   68.30%   68.28%   -0.02%     
==========================================
  Files         357      357              
  Lines       47181    47257      +76     
==========================================
+ Hits        32225    32270      +45     
- Misses      14956    14987      +31     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@@ -193,6 +204,9 @@ impl Sql {
?, ?, ?) ON CONFLICT(id) DO UPDATE SET \
updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \
event_id=EXCLUDED.event_id RETURNING *";
// if timeout doesn't work
// fetch to get entity
// if not available, insert into queue
let mut entity_updated: EntityUpdated = sqlx::query_as(insert_entities)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason as to why we push our broker message to the queue? Because the insert_entities query is executed without the queue. Are we going at some point insrt it into queue in some cases?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i have updated it to use queue in the next commit on stack

@@ -33,14 +33,25 @@ pub const FELT_DELIMITER: &str = "/";
#[path = "sql_test.rs"]
mod test;

#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct Sql {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also update the set_event_message and other functions. Like set_model_member. Or dont they need changes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for now we only run store_set, store_del, store_update_member and store_update_record in parallel

@lambda-0x
Copy link
Contributor Author

supersede by #2423

@lambda-0x lambda-0x closed this Sep 17, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants