Modify Relayer service to order Events from L1 by block index (#1779)

closes #1772

***

Had to modify `MockDB` to also maintain order.

This code only works if the Relayer DB maintains order. In production, events
are stored in a `Mappable` table whose value is `[Event]`:

```rust
impl Mappable for EventsHistory {
    /// The key is the height of the DA.
    type Key = Self::OwnedKey;
    type OwnedKey = DaBlockHeight;
    /// The value is the events that happened at the height.
    type Value = [Event];
    type OwnedValue = Vec<Event>;
}
```
So it should be good.
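
For illustration, here is a minimal sketch (not code from this PR) of the ordering assumption at play: the per-height collection is a `Vec`, so events read back for a DA height come out in the same order they were written. The `u64` height and string events are simplified stand-ins for `DaBlockHeight` and `Event`.

```rust
use std::collections::BTreeMap;

// Simplified stand-ins for `DaBlockHeight` and `Event`.
type DaBlockHeight = u64;
type Event = &'static str;

fn main() {
    let mut events_history: BTreeMap<DaBlockHeight, Vec<Event>> = BTreeMap::new();

    // Events are appended in the order the relayer observed them on L1.
    events_history.entry(42).or_default().push("message_a");
    events_history.entry(42).or_default().push("transaction_b");

    // Reading the height back yields the same order, because `Vec`
    // preserves insertion order (a `HashMap` would not guarantee it).
    assert_eq!(events_history[&42], vec!["message_a", "transaction_b"]);
}
```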

This approach is flakier than having the relayer reader in the executor
ensure the order, but putting the sorting on that side could add a lot of
extra execution cost to the executor code. We are already making
assumptions about what the writing code is doing, though. For example, if
an event is missing from L1, correct order doesn't matter. So why not
add more assumptions :P

***

We require that all `Logs` include the `log_index` so we can sort them.
So I added a test
`relayer__if_a_log_does_not_include_index_then_event_not_included` to
check the sad path. **I'm not positive this is the behavior we want.**
It errors in the `sort_events_by_log_index` method, but I'm not
exactly sure where that bubbles up to: `start_and_await` still returns
an `Ok(Starting)`, not an `Ok(StoppedWithError)`. That might be a bug in
our `Service` code, or it might be meant to be used differently. Not sure.
So I just check that the `Log` isn't included in the test...
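
For context, a minimal sketch of that sad-path check (not the actual test from this PR; it assumes the private `sort_events_by_log_index` helper added in `get_logs.rs` below is reachable, e.g. from a unit test in the same module, and uses `ethers-core`'s `Log` type):

```rust
use ethers_core::types::Log;

#[test]
fn sort_fails_when_log_index_is_missing() {
    // `Log::default()` leaves `log_index` as `None`, which is exactly the
    // case the relayer refuses to order.
    let log = Log::default();

    let result = sort_events_by_log_index(vec![log]);

    assert!(result.is_err());
}
```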

~I had to modify `MockMiddleware` to only let you take the batch once.
Before, it would error and then just try again, returning the same value
over and over and never exiting. Making some assumptions there too.~

---------

Co-authored-by: Green Baneling <[email protected]>
MitchTurner and xgreenx authored Mar 28, 2024
1 parent 33848a3 commit 951308e
Showing 10 changed files with 319 additions and 104 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -23,6 +23,7 @@ Description of the upcoming release here.
### Changed

#### Breaking
- [1779](https://github.com/FuelLabs/fuel-core/pull/1779): Modify Relayer service to order Events from L1 by block index
- [#1783](https://github.com/FuelLabs/fuel-core/pull/1783): The PR upgrades `fuel-vm` to the `0.48.0` release. Because of some breaking changes, we also adapted our codebase to follow them:
  - Implementation of `Default` for configs was moved under the `test-helpers` feature. The `fuel-core` binary uses testnet configuration instead of `Default::default` (for cases when `ChainConfig` was not provided by the user).
  - All parameter types are enums now and require corresponding modifications across the codebase (we need to use getters and setters). The GraphQL API remains the same for simplicity, but each parameter now has one more field, `version`, that can be used to decide how to deserialize.
50 changes: 39 additions & 11 deletions crates/services/relayer/src/mock_db.rs
@@ -16,10 +16,7 @@ use fuel_core_types::{
services::relayer::Event,
};
use std::{
collections::{
BTreeMap,
HashMap,
},
collections::BTreeMap,
sync::{
Arc,
Mutex,
@@ -28,9 +25,9 @@ use std::{

#[derive(Default)]
pub struct Data {
pub messages: BTreeMap<DaBlockHeight, HashMap<Nonce, Message>>,
pub messages: BTreeMap<DaBlockHeight, Vec<(Nonce, Message)>>,
pub transactions:
BTreeMap<DaBlockHeight, HashMap<RelayedTransactionId, RelayedTransaction>>,
BTreeMap<DaBlockHeight, Vec<(RelayedTransactionId, RelayedTransaction)>>,
pub finalized_da_height: Option<DaBlockHeight>,
}

@@ -44,13 +41,27 @@ pub struct MockDb {
}

impl MockDb {
pub fn get_message(&self, id: &Nonce) -> Option<Message> {
pub fn get_message(&self, nonce: &Nonce) -> Option<Message> {
self.data
.lock()
.unwrap()
.messages
.iter()
.find_map(|(_, map)| map.get(id).cloned())
.find_map(|(_, map)| {
map.iter()
.find(|(inner_nonce, _msg)| nonce == inner_nonce)
.map(|(_, msg)| msg.clone())
})
}

pub fn get_messages_for_block(&self, da_block_height: DaBlockHeight) -> Vec<Message> {
self.data
.lock()
.unwrap()
.messages
.get(&da_block_height)
.map(|map| map.iter().map(|(_, msg)| msg).cloned().collect())
.unwrap_or_default()
}

pub fn get_transaction(
@@ -62,7 +73,24 @@ impl MockDb {
.unwrap()
.transactions
.iter()
.find_map(|(_, map)| map.get(id).cloned())
.find_map(|(_, txs)| {
txs.iter()
.find(|(inner_id, _tx)| id == inner_id)
.map(|(_, tx)| tx.clone())
})
}

pub fn get_transactions_for_block(
&self,
da_block_height: DaBlockHeight,
) -> Vec<RelayedTransaction> {
self.data
.lock()
.unwrap()
.transactions
.get(&da_block_height)
.map(|map| map.iter().map(|(_, tx)| tx).cloned().collect())
.unwrap_or_default()
}
}

@@ -79,13 +107,13 @@ impl RelayerDb for MockDb {
m.messages
.entry(message.da_height())
.or_default()
.insert(*message.id(), message.clone());
.push((*message.id(), message.clone()));
}
Event::Transaction(transaction) => {
m.transactions
.entry(transaction.da_height())
.or_default()
.insert(transaction.id(), transaction.clone());
.push((transaction.id(), transaction.clone()));
}
}
}
38 changes: 32 additions & 6 deletions crates/services/relayer/src/service.rs
@@ -78,6 +78,8 @@ pub struct NotInitializedTask<P, D> {
database: D,
/// Configuration settings.
config: Config,
/// Retry on error
retry_on_error: bool,
}

/// The actual relayer background task that syncs with the DA layer.
@@ -93,17 +95,20 @@ pub struct Task<P, D> {
/// The watcher used to track the state of the service. If the service stops,
/// the task will stop synchronization.
shutdown: StateWatcher,
/// Retry on error
retry_on_error: bool,
}

impl<P, D> NotInitializedTask<P, D> {
/// Create a new relayer task.
fn new(eth_node: P, database: D, config: Config) -> Self {
fn new(eth_node: P, database: D, config: Config, retry_on_error: bool) -> Self {
let (synced, _) = watch::channel(None);
Self {
synced,
eth_node,
database,
config,
retry_on_error,
}
}
}
@@ -153,6 +158,7 @@ where
self.config.log_page_size,
);
let logs = logs.take_until(self.shutdown.while_started());

write_logs(&mut self.database, logs).await
}

@@ -193,13 +199,15 @@ where
eth_node,
database,
config,
retry_on_error,
} = self;
let mut task = Task {
synced,
eth_node,
database,
config,
shutdown,
retry_on_error,
};
task.set_deploy_height();

@@ -215,7 +223,6 @@ where
{
async fn run(&mut self, _: &mut StateWatcher) -> anyhow::Result<bool> {
let now = tokio::time::Instant::now();
let should_continue = true;

let result = run::run(self).await;

@@ -231,7 +238,18 @@ where
.await;
}

result.map(|_| should_continue)
if let Err(err) = result {
if !self.retry_on_error {
tracing::error!("Exiting due to Error in relayer task: {:?}", err);
let should_continue = false;
Ok(should_continue)
} else {
Err(err)
}
} else {
let should_continue = true;
Ok(should_continue)
}
}

async fn shutdown(self) -> anyhow::Result<()> {
@@ -331,7 +349,13 @@ where
// TODO: Does this handle https?
let http = Http::new(url);
let eth_node = Provider::new(http);
Ok(new_service_internal(eth_node, database, config))
let retry_on_error = true;
Ok(new_service_internal(
eth_node,
database,
config,
retry_on_error,
))
}

#[cfg(any(test, feature = "test-helpers"))]
@@ -345,19 +369,21 @@ where
P: Middleware<Error = ProviderError> + 'static,
D: RelayerDb + Clone + 'static,
{
new_service_internal(eth_node, database, config)
let retry_on_fail = false;
new_service_internal(eth_node, database, config, retry_on_fail)
}

fn new_service_internal<P, D>(
eth_node: P,
database: D,
config: Config,
retry_on_error: bool,
) -> CustomizableService<P, D>
where
P: Middleware<Error = ProviderError> + 'static,
D: RelayerDb + Clone + 'static,
{
let task = NotInitializedTask::new(eth_node, database, config);
let task = NotInitializedTask::new(eth_node, database, config, retry_on_error);

CustomizableService::new(task)
}
47 changes: 31 additions & 16 deletions crates/services/relayer/src/service/get_logs.rs
@@ -68,24 +68,24 @@ where
while let Some((last_height, events)) = logs.try_next().await? {
let last_height = last_height.into();
let mut ordered_events = BTreeMap::<DaBlockHeight, Vec<Event>>::new();
let fuel_events =
events
.into_iter()
.filter_map(|event| match EthEventLog::try_from(&event) {
Ok(event) => {
match event {
EthEventLog::Message(m) => {
Some(Ok(Event::Message(Message::from(&m))))
}
EthEventLog::Transaction(tx) => {
Some(Ok(Event::Transaction(RelayedTransaction::from(tx))))
}
// TODO: Log out ignored messages.
EthEventLog::Ignored => None,
let sorted_events = sort_events_by_log_index(events)?;
let fuel_events = sorted_events.into_iter().filter_map(|event| {
match EthEventLog::try_from(&event) {
Ok(event) => {
match event {
EthEventLog::Message(m) => {
Some(Ok(Event::Message(Message::from(&m))))
}
EthEventLog::Transaction(tx) => {
Some(Ok(Event::Transaction(RelayedTransaction::from(tx))))
}
// TODO: Log out ignored messages.
EthEventLog::Ignored => None,
}
Err(e) => Some(Err(e)),
});
}
Err(e) => Some(Err(e)),
}
});

for event in fuel_events {
let event = event?;
Expand All @@ -109,3 +109,18 @@ where
}
Ok(())
}

fn sort_events_by_log_index(events: Vec<Log>) -> anyhow::Result<Vec<Log>> {
let mut with_indexes = events
.into_iter()
.map(|e| {
let log_index = e
.log_index
.ok_or(anyhow::anyhow!("Log missing `log_index`: {e:?}"))?;
Ok((log_index, e))
})
.collect::<anyhow::Result<Vec<_>>>()?;
with_indexes.sort_by(|(index_a, _a), (index_b, _b)| index_a.cmp(index_b));
let new_events = with_indexes.into_iter().map(|(_, e)| e).collect();
Ok(new_events)
}
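
As a usage sketch for the helper above (not code from the PR; it assumes `ethers-core`'s `Log` with its `log_index: Option<U256>` field and module-level access to the private function), feeding logs out of order gets them back in on-chain order:

```rust
use ethers_core::types::Log;

fn example() -> anyhow::Result<()> {
    let mut first = Log::default();
    first.log_index = Some(0u64.into());
    let mut second = Log::default();
    second.log_index = Some(1u64.into());

    // Nothing guarantees the provider hands logs back sorted, so pass
    // them in reverse order on purpose.
    let sorted = sort_events_by_log_index(vec![second, first])?;

    // The helper restores the on-chain order.
    assert_eq!(sorted[0].log_index, Some(0u64.into()));
    assert_eq!(sorted[1].log_index, Some(1u64.into()));
    Ok(())
}
```
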
5 changes: 3 additions & 2 deletions crates/services/relayer/src/service/get_logs/test.rs
@@ -36,11 +36,11 @@ fn messages(
nonce
.zip(block_number)
.zip(contracts)
.map(|((n, b), c)| message(n, b, c))
.map(|((n, b), c)| message(n, b, c, 0))
.collect()
}

fn message(nonce: u64, block_number: u64, contract_address: u32) -> Log {
fn message(nonce: u64, block_number: u64, contract_address: u32, index: u64) -> Log {
let message = MessageSentFilter {
nonce: U256::from_dec_str(nonce.to_string().as_str())
.expect("Should convert into U256"),
@@ -49,6 +49,7 @@ fn message(nonce: u64, block_number: u64, contract_address: u32) -> Log {
let mut log = message.into_log();
log.address = u32_to_contract(contract_address);
log.block_number = Some(block_number.into());
log.log_index = Some(index.into());
log
}

4 changes: 2 additions & 2 deletions crates/services/relayer/src/service/test.rs
@@ -53,7 +53,7 @@ async fn deploy_height_does_not_override() {
..Default::default()
};
let eth_node = MockMiddleware::default();
let relayer = NotInitializedTask::new(eth_node, mock_db.clone(), config);
let relayer = NotInitializedTask::new(eth_node, mock_db.clone(), config, false);
let _ = relayer.into_task(&Default::default(), ()).await;

assert_eq!(*mock_db.get_finalized_da_height().unwrap(), 50);
@@ -70,7 +70,7 @@ async fn deploy_height_does_override() {
..Default::default()
};
let eth_node = MockMiddleware::default();
let relayer = NotInitializedTask::new(eth_node, mock_db.clone(), config);
let relayer = NotInitializedTask::new(eth_node, mock_db.clone(), config, false);
let _ = relayer.into_task(&Default::default(), ()).await;

assert_eq!(*mock_db.get_finalized_da_height().unwrap(), 52);
52 changes: 27 additions & 25 deletions crates/services/relayer/src/test_helpers/middleware.rs
@@ -162,6 +162,7 @@ impl Default for MockMiddleware {
s
}
}

#[derive(Error, Debug)]
/// Thrown when an error happens at the Nonce Manager
pub enum MockMiddlewareError {
@@ -261,31 +262,8 @@ impl Middleware for MockMiddleware {
async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>, Self::Error> {
tokio::task::yield_now().await;
self.before_event(TriggerType::GetLogs(filter));
let r = self.update_data(|data| {
data.logs_batch
.iter()
.flat_map(|logs| {
logs.iter().filter_map(|log| {
let r = match filter.address.as_ref()? {
ethers_core::types::ValueOrArray::Value(v) => {
log.address == *v
}
ethers_core::types::ValueOrArray::Array(v) => {
v.iter().any(|v| log.address == *v)
}
};
let log_block_num = log.block_number?;
let r = r
&& log_block_num
>= filter.block_option.get_from_block()?.as_number()?
&& log_block_num
<= filter.block_option.get_to_block()?.as_number()?;
r.then_some(log)
})
})
.cloned()
.collect()
});
let r =
self.update_data(|data| take_logs_based_on_filter(&data.logs_batch, filter));
self.after_event(TriggerType::GetLogs(filter));
Ok(r)
}
@@ -303,3 +281,27 @@ impl Middleware for MockMiddleware {
r
}
}

fn take_logs_based_on_filter(logs_batch: &[Vec<Log>], filter: &Filter) -> Vec<Log> {
logs_batch
.iter()
.flat_map(|logs| {
logs.iter().filter_map(|log| {
let r = match filter.address.as_ref()? {
ethers_core::types::ValueOrArray::Value(v) => log.address == *v,
ethers_core::types::ValueOrArray::Array(v) => {
v.iter().any(|v| log.address == *v)
}
};
let log_block_num = log.block_number?;
let r = r
&& log_block_num
>= filter.block_option.get_from_block()?.as_number()?
&& log_block_num
<= filter.block_option.get_to_block()?.as_number()?;
r.then_some(log)
})
})
.cloned()
.collect()
}
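
And a similar sketch for the extracted `take_logs_based_on_filter` helper above (again not PR code; it assumes `ethers-core`'s `Filter` and `Log` types and module-level access to the private function):

```rust
use ethers_core::types::{Filter, Log, H160};

fn example() {
    let contract = H160::repeat_byte(0x11);

    let mut log = Log::default();
    log.address = contract;
    log.block_number = Some(5u64.into());

    // Only logs emitted by the watched contract and inside the requested
    // block range survive the filter.
    let filter = Filter::new()
        .address(contract)
        .from_block(1u64)
        .to_block(10u64);

    let taken = take_logs_based_on_filter(&[vec![log]], &filter);
    assert_eq!(taken.len(), 1);
}
```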
