Skip to content
This repository has been archived by the owner on Jan 15, 2024. It is now read-only.

Persist EQS state after loop over events #1195

Merged
merged 2 commits into from
Jul 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions eqs/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use cap_rust_sandbox::{
};

pub async fn run(opt: &EQSOptions) -> std::io::Result<()> {
tracing::info!("Starting EQS");
tracing::warn!("Starting EQS");

if !opt.temp_test_run {
let provider = get_provider_from_url(opt.rpc_url());
Expand All @@ -49,7 +49,7 @@ pub async fn run(opt: &EQSOptions) -> std::io::Result<()> {
}),
));
let toc = std::time::Instant::now();
tracing::info!("Restored state in {:?}", toc - tic);
tracing::warn!("Restored state in {:?}", toc - tic);
(state_persistence, query_result_state)
};

Expand Down
39 changes: 19 additions & 20 deletions eqs/src/eth_polling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,6 @@ impl EthPolling {
// select cape events starting from the first block for which we do not have confirmed
// completion of processing

tracing::info!("Fetching events from block {} to {}", from_block, to_block);

let new_event_result = self
.connection
.contract
Expand All @@ -191,11 +189,20 @@ impl EthPolling {
}
};

// Track if we made changes to the state and need to persist it at the end of the loop.
let mut persist_state = false;

for (filter, meta) in new_event {
let current_block = meta.block_number.as_u64();
let current_log_index = meta.log_index.as_u64();
let current_index = (current_block, current_log_index);

tracing::warn!(
"Processing block {} event {}",
current_block,
current_log_index,
);

// We sometimes see repeat events in spite of the `from_block(next_block_to_query)`
// filter. This appears to be a hardhat bug. Skip this event if it does not come after
// the last reported event.
Expand All @@ -204,6 +211,7 @@ impl EthPolling {
continue;
}
}

match filter {
CAPEEvents::BlockCommittedFilter(filter_data) => {
let memos = fetch_cape_memos(&self.connection, meta.transaction_hash)
Expand Down Expand Up @@ -369,12 +377,7 @@ impl EthPolling {
updated_state.ledger_state.state_number += 1;
updated_state.last_reported_index = Some(current_index);
self.last_event_index = Some(current_index);

let state_for_write = updated_state.clone();
drop(updated_state);

// persist the state block updates (will be more fine grained in r3)
self.state_persistence.store_latest_state(&state_for_write);
persist_state = true;
}
CAPEEvents::Erc20TokensDepositedFilter(filter_data) => {
let ro_bytes = filter_data.ro_bytes.clone();
Expand Down Expand Up @@ -465,12 +468,7 @@ impl EthPolling {

updated_state.last_reported_index = Some(current_index);
self.last_event_index = Some(current_index);

let state_for_write = updated_state.clone();
drop(updated_state);

// persist the state block updates (will be more fine grained in r3)
self.state_persistence.store_latest_state(&state_for_write);
persist_state = true;
}

CAPEEvents::AssetSponsoredFilter(filter_data) => {
Expand All @@ -483,16 +481,17 @@ impl EthPolling {

updated_state.last_reported_index = Some(current_index);
self.last_event_index = Some(current_index);

let state_for_write = updated_state.clone();
drop(updated_state);

// persist the state block updates (will be more fine grained in r3)
self.state_persistence.store_latest_state(&state_for_write);
persist_state = true;
}
}
}

// persist the state block updates (will be more fine grained in r3)
if persist_state {
self.state_persistence
.store_latest_state(&*self.query_result_state.read().await);
}

// We won't ever get here if we haven't successfully processed all events up to and including any in `to_block` from the query range.
// This means the block we care about isn't the one in `current_index`, it's `to_block`, and if we fail partway, we're going to short circuit to the error return, not here.
self.next_block_to_query = to_block + 1;
Expand Down
2 changes: 1 addition & 1 deletion eqs/src/state_persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl StatePersistence {
self.atomic_store.commit_version().unwrap();
self.state_snapshot.prune_file_entries().unwrap();
let toc = std::time::Instant::now();
tracing::info!("Persisting state took {:?}", toc - tic);
tracing::warn!("Persisting state took {:?}", toc - tic);
}

pub fn load_latest_state(&self) -> Result<QueryResultState, PersistenceError> {
Expand Down