-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(eth-watch): redesign to support multiple chains (#2867)
This PR contains code from sync-layer-stable + I removed old upgrades processor and updated unit-tests to use the new one --------- Signed-off-by: tomg10 <[email protected]>
- Loading branch information
Showing
19 changed files
with
665 additions
and
348 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
33 changes: 33 additions & 0 deletions
33
...lib/dal/.sqlx/query-26c80e9bafcf7989e7d40c6e424240b34ce29aad3ac6571116e084d45574c448.json
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
26 changes: 26 additions & 0 deletions
26
...lib/dal/.sqlx/query-afdeecb78e3af802c2b8ffb0f5ab9863082ccd1ce45b2d20e1119f1e78171d66.json
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
26 changes: 26 additions & 0 deletions
26
...lib/dal/.sqlx/query-c61682ed92c1a43855a991598d593639a63047f46dc8e605ff37b78f43f5cef5.json
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
4 changes: 4 additions & 0 deletions
4
core/lib/dal/migrations/20240912085550_add_eth_watcher_progress_table.down.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
DROP TABLE IF EXISTS processed_events; | ||
|
||
DROP TYPE IF EXISTS event_type; | ||
|
9 changes: 9 additions & 0 deletions
9
core/lib/dal/migrations/20240912085550_add_eth_watcher_progress_table.up.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
CREATE TYPE event_type AS ENUM ('ProtocolUpgrades', 'PriorityTransactions'); | ||
|
||
CREATE TABLE processed_events | ||
( | ||
type event_type NOT NULL, | ||
chain_id BIGINT NOT NULL, | ||
next_block_to_process BIGINT NOT NULL, | ||
PRIMARY KEY (chain_id, type) | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
use zksync_db_connection::{connection::Connection, error::DalResult, instrument::InstrumentExt}; | ||
use zksync_types::SLChainId; | ||
|
||
use crate::Core; | ||
|
||
pub struct EthWatcherDal<'a, 'c> { | ||
pub(crate) storage: &'a mut Connection<'c, Core>, | ||
} | ||
|
||
#[derive(Debug, Copy, Clone, sqlx::Type)] | ||
#[sqlx(type_name = "event_type")] | ||
pub enum EventType { | ||
ProtocolUpgrades, | ||
PriorityTransactions, | ||
} | ||
|
||
impl EthWatcherDal<'_, '_> { | ||
// Returns last set value of next_block_to_process for given event_type and chain_id. | ||
// If the value was missing, initializes it with provided next_block_to_process value | ||
pub async fn get_or_set_next_block_to_process( | ||
&mut self, | ||
event_type: EventType, | ||
chain_id: SLChainId, | ||
next_block_to_process: u64, | ||
) -> DalResult<u64> { | ||
let result = sqlx::query!( | ||
r#" | ||
SELECT | ||
next_block_to_process | ||
FROM | ||
processed_events | ||
WHERE | ||
TYPE = $1 | ||
AND chain_id = $2 | ||
"#, | ||
event_type as EventType, | ||
chain_id.0 as i64 | ||
) | ||
.instrument("get_or_set_next_block_to_process") | ||
.with_arg("event_type", &event_type) | ||
.with_arg("chain_id", &chain_id) | ||
.fetch_optional(self.storage) | ||
.await?; | ||
|
||
if let Some(row) = result { | ||
Ok(row.next_block_to_process as u64) | ||
} else { | ||
sqlx::query!( | ||
r#" | ||
INSERT INTO | ||
processed_events ( | ||
TYPE, | ||
chain_id, | ||
next_block_to_process | ||
) | ||
VALUES | ||
($1, $2, $3) | ||
"#, | ||
event_type as EventType, | ||
chain_id.0 as i64, | ||
next_block_to_process as i64 | ||
) | ||
.instrument("get_or_set_next_block_to_process - insert") | ||
.with_arg("event_type", &event_type) | ||
.with_arg("chain_id", &chain_id) | ||
.execute(self.storage) | ||
.await?; | ||
|
||
Ok(next_block_to_process) | ||
} | ||
} | ||
|
||
pub async fn update_next_block_to_process( | ||
&mut self, | ||
event_type: EventType, | ||
chain_id: SLChainId, | ||
next_block_to_process: u64, | ||
) -> DalResult<()> { | ||
sqlx::query!( | ||
r#" | ||
UPDATE processed_events | ||
SET | ||
next_block_to_process = $3 | ||
WHERE | ||
TYPE = $1 | ||
AND chain_id = $2 | ||
"#, | ||
event_type as EventType, | ||
chain_id.0 as i64, | ||
next_block_to_process as i64 | ||
) | ||
.instrument("update_next_block_to_process") | ||
.with_arg("event_type", &event_type) | ||
.with_arg("chain_id", &chain_id) | ||
.execute(self.storage) | ||
.await?; | ||
Ok(()) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
use crate::{ConnectionPool, Core, CoreDal}; | ||
|
||
#[tokio::test] | ||
async fn test_get_or_set_next_block_to_process_with_different_event_types() { | ||
let pool = ConnectionPool::<Core>::test_pool().await; | ||
let mut conn = pool.connection().await.unwrap(); | ||
let mut dal = conn.processed_events_dal(); | ||
|
||
// Test with ProtocolUpgrades | ||
let next_block = dal | ||
.get_or_set_next_block_to_process(EventType::ProtocolUpgrades, SLChainId(1), 100) | ||
.await | ||
.expect("Failed to get or set next block to process"); | ||
assert_eq!(next_block, 100); | ||
|
||
// Test with PriorityTransactions | ||
let next_block = dal | ||
.get_or_set_next_block_to_process(EventType::PriorityTransactions, SLChainId(1), 200) | ||
.await | ||
.expect("Failed to get or set next block to process"); | ||
assert_eq!(next_block, 200); | ||
|
||
// Test with PriorityTransactions | ||
let next_block = dal | ||
.get_or_set_next_block_to_process(EventType::PriorityTransactions, SLChainId(2), 300) | ||
.await | ||
.expect("Failed to get or set next block to process"); | ||
assert_eq!(next_block, 300); | ||
|
||
// Verify that the initial block is not updated for ProtocolUpgrades | ||
let next_block = dal | ||
.get_or_set_next_block_to_process(EventType::ProtocolUpgrades, SLChainId(1), 150) | ||
.await | ||
.expect("Failed to get or set next block to process"); | ||
assert_eq!(next_block, 100); | ||
|
||
// Verify that the initial block is not updated for PriorityTransactions | ||
let next_block = dal | ||
.get_or_set_next_block_to_process(EventType::PriorityTransactions, SLChainId(1), 250) | ||
.await | ||
.expect("Failed to get or set next block to process"); | ||
assert_eq!(next_block, 200); | ||
|
||
// Verify that the initial block is not updated for PriorityTransactions | ||
let next_block = dal | ||
.get_or_set_next_block_to_process(EventType::PriorityTransactions, SLChainId(2), 350) | ||
.await | ||
.expect("Failed to get or set next block to process"); | ||
assert_eq!(next_block, 300); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.