Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify Relayer service to order Events from L1 by block index #1779

Merged
merged 23 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
02ec8cd
Change mock db to maintain order
MitchTurner Mar 25, 2024
d68bd4b
Add failing test
MitchTurner Mar 25, 2024
2120be1
Get ordered tx test passing, start message test
MitchTurner Mar 25, 2024
b200848
WIP fixing message test
MitchTurner Mar 26, 2024
c39961f
Fix broken test
MitchTurner Mar 26, 2024
307cf7c
Mark hung test to keep CI from running forever
MitchTurner Mar 26, 2024
65491a2
Fix sad path test, fix other test definition
MitchTurner Mar 26, 2024
fe2c0f0
Make iterator combinator easier to read
MitchTurner Mar 26, 2024
bb33d12
Merge branch 'master' into ensure-relayer-message-order
MitchTurner Mar 26, 2024
5d9f37a
Update CHANGELOG
MitchTurner Mar 26, 2024
a977697
Add missing Options
MitchTurner Mar 26, 2024
cfc9939
Add more missing `Option`s
MitchTurner Mar 26, 2024
3db8add
Appease Clippy-sama
MitchTurner Mar 26, 2024
3ceacf7
Redesign mock to fix pagination tests
MitchTurner Mar 27, 2024
2f82803
Fix more tests
MitchTurner Mar 27, 2024
8427203
Remove `Option`, ignore Clippy-sama
MitchTurner Mar 27, 2024
e831dd0
Remove all the formatting changes to CHANGELOG and move to `Breaking`
MitchTurner Mar 27, 2024
954607c
Refactor some service stuff
MitchTurner Mar 27, 2024
46bba1c
Reverted changes to `MockMiddleware` and use timeout in test instead
MitchTurner Mar 27, 2024
502a1f8
Configure task to allow single runs in test
MitchTurner Mar 28, 2024
f0a8297
Fix broken constructors
MitchTurner Mar 28, 2024
daabb9d
Merge branch 'master' into ensure-relayer-message-order
xgreenx Mar 28, 2024
b249ebb
Return `Err` instead of `Ok` when `retry_on_error`
MitchTurner Mar 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Description of the upcoming release here.
### Changed

#### Breaking
- [1779](https://github.com/FuelLabs/fuel-core/pull/1779): Modify Relayer service to order Events from L1 by block index
- [#1783](https://github.com/FuelLabs/fuel-core/pull/1783): The PR upgrade `fuel-vm` to `0.48.0` release. Because of some breaking changes, we also adapted our codebase to follow them:
- Implementation of `Default` for configs was moved under the `test-helpers` feature. The `fuel-core` binary uses testnet configuration instead of `Default::default`(for cases when `ChainConfig` was not provided by the user).
- All parameter types are enums now and require corresponding modifications across the codebase(we need to use getters and setters). The GraphQL API remains the same for simplicity, but each parameter now has one more field - `version`, that can be used to decide how to deserialize.
Expand Down
50 changes: 39 additions & 11 deletions crates/services/relayer/src/mock_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ use fuel_core_types::{
services::relayer::Event,
};
use std::{
collections::{
BTreeMap,
HashMap,
},
collections::BTreeMap,
sync::{
Arc,
Mutex,
Expand All @@ -28,9 +25,9 @@ use std::{

#[derive(Default)]
pub struct Data {
pub messages: BTreeMap<DaBlockHeight, HashMap<Nonce, Message>>,
pub messages: BTreeMap<DaBlockHeight, Vec<(Nonce, Message)>>,
pub transactions:
BTreeMap<DaBlockHeight, HashMap<RelayedTransactionId, RelayedTransaction>>,
BTreeMap<DaBlockHeight, Vec<(RelayedTransactionId, RelayedTransaction)>>,
pub finalized_da_height: Option<DaBlockHeight>,
}

Expand All @@ -44,13 +41,27 @@ pub struct MockDb {
}

impl MockDb {
pub fn get_message(&self, id: &Nonce) -> Option<Message> {
pub fn get_message(&self, nonce: &Nonce) -> Option<Message> {
self.data
.lock()
.unwrap()
.messages
.iter()
.find_map(|(_, map)| map.get(id).cloned())
.find_map(|(_, map)| {
map.iter()
.find(|(inner_nonce, _msg)| nonce == inner_nonce)
.map(|(_, msg)| msg.clone())
})
}

pub fn get_messages_for_block(&self, da_block_height: DaBlockHeight) -> Vec<Message> {
self.data
.lock()
.unwrap()
.messages
.get(&da_block_height)
.map(|map| map.iter().map(|(_, msg)| msg).cloned().collect())
.unwrap_or_default()
}

pub fn get_transaction(
Expand All @@ -62,7 +73,24 @@ impl MockDb {
.unwrap()
.transactions
.iter()
.find_map(|(_, map)| map.get(id).cloned())
.find_map(|(_, txs)| {
txs.iter()
.find(|(inner_id, _tx)| id == inner_id)
.map(|(_, tx)| tx.clone())
})
}

pub fn get_transactions_for_block(
&self,
da_block_height: DaBlockHeight,
) -> Vec<RelayedTransaction> {
self.data
.lock()
.unwrap()
.transactions
.get(&da_block_height)
.map(|map| map.iter().map(|(_, tx)| tx).cloned().collect())
.unwrap_or_default()
}
}

Expand All @@ -79,13 +107,13 @@ impl RelayerDb for MockDb {
m.messages
.entry(message.da_height())
.or_default()
.insert(*message.id(), message.clone());
.push((*message.id(), message.clone()));
}
Event::Transaction(transaction) => {
m.transactions
.entry(transaction.da_height())
.or_default()
.insert(transaction.id(), transaction.clone());
.push((transaction.id(), transaction.clone()));
}
}
}
Expand Down
38 changes: 32 additions & 6 deletions crates/services/relayer/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ pub struct NotInitializedTask<P, D> {
database: D,
/// Configuration settings.
config: Config,
/// Retry on error
retry_on_error: bool,
}

/// The actual relayer background task that syncs with the DA layer.
Expand All @@ -93,17 +95,20 @@ pub struct Task<P, D> {
/// The watcher used to track the state of the service. If the service stops,
/// the task will stop synchronization.
shutdown: StateWatcher,
/// Retry on error
retry_on_error: bool,
}

impl<P, D> NotInitializedTask<P, D> {
/// Create a new relayer task.
fn new(eth_node: P, database: D, config: Config) -> Self {
fn new(eth_node: P, database: D, config: Config, retry_on_error: bool) -> Self {
let (synced, _) = watch::channel(None);
Self {
synced,
eth_node,
database,
config,
retry_on_error,
}
}
}
Expand Down Expand Up @@ -153,6 +158,7 @@ where
self.config.log_page_size,
);
let logs = logs.take_until(self.shutdown.while_started());

write_logs(&mut self.database, logs).await
}

Expand Down Expand Up @@ -193,13 +199,15 @@ where
eth_node,
database,
config,
retry_on_error,
} = self;
let mut task = Task {
synced,
eth_node,
database,
config,
shutdown,
retry_on_error,
};
task.set_deploy_height();

Expand All @@ -215,7 +223,6 @@ where
{
async fn run(&mut self, _: &mut StateWatcher) -> anyhow::Result<bool> {
let now = tokio::time::Instant::now();
let should_continue = true;

let result = run::run(self).await;

Expand All @@ -231,7 +238,18 @@ where
.await;
}

result.map(|_| should_continue)
if let Err(err) = result {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to return an error if retry_on_error == true

if !self.retry_on_error {
tracing::error!("Exiting due to Error in relayer task: {:?}", err);
let should_continue = false;
Ok(should_continue)
} else {
Err(err)
}
} else {
let should_continue = true;
Ok(should_continue)
}
}

async fn shutdown(self) -> anyhow::Result<()> {
Expand Down Expand Up @@ -331,7 +349,13 @@ where
// TODO: Does this handle https?
let http = Http::new(url);
let eth_node = Provider::new(http);
Ok(new_service_internal(eth_node, database, config))
let retry_on_error = true;
Ok(new_service_internal(
eth_node,
database,
config,
retry_on_error,
))
}

#[cfg(any(test, feature = "test-helpers"))]
Expand All @@ -345,19 +369,21 @@ where
P: Middleware<Error = ProviderError> + 'static,
D: RelayerDb + Clone + 'static,
{
new_service_internal(eth_node, database, config)
let retry_on_fail = false;
new_service_internal(eth_node, database, config, retry_on_fail)
}

fn new_service_internal<P, D>(
eth_node: P,
database: D,
config: Config,
retry_on_error: bool,
) -> CustomizableService<P, D>
where
P: Middleware<Error = ProviderError> + 'static,
D: RelayerDb + Clone + 'static,
{
let task = NotInitializedTask::new(eth_node, database, config);
let task = NotInitializedTask::new(eth_node, database, config, retry_on_error);

CustomizableService::new(task)
}
47 changes: 31 additions & 16 deletions crates/services/relayer/src/service/get_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,24 +68,24 @@ where
while let Some((last_height, events)) = logs.try_next().await? {
let last_height = last_height.into();
let mut ordered_events = BTreeMap::<DaBlockHeight, Vec<Event>>::new();
let fuel_events =
events
.into_iter()
.filter_map(|event| match EthEventLog::try_from(&event) {
Ok(event) => {
match event {
EthEventLog::Message(m) => {
Some(Ok(Event::Message(Message::from(&m))))
}
EthEventLog::Transaction(tx) => {
Some(Ok(Event::Transaction(RelayedTransaction::from(tx))))
}
// TODO: Log out ignored messages.
EthEventLog::Ignored => None,
let sorted_events = sort_events_by_log_index(events)?;
let fuel_events = sorted_events.into_iter().filter_map(|event| {
match EthEventLog::try_from(&event) {
Ok(event) => {
match event {
EthEventLog::Message(m) => {
Some(Ok(Event::Message(Message::from(&m))))
}
EthEventLog::Transaction(tx) => {
Some(Ok(Event::Transaction(RelayedTransaction::from(tx))))
}
// TODO: Log out ignored messages.
EthEventLog::Ignored => None,
}
Err(e) => Some(Err(e)),
});
}
Err(e) => Some(Err(e)),
}
});

for event in fuel_events {
let event = event?;
Expand All @@ -109,3 +109,18 @@ where
}
Ok(())
}

fn sort_events_by_log_index(events: Vec<Log>) -> anyhow::Result<Vec<Log>> {
let mut with_indexes = events
.into_iter()
.map(|e| {
let log_index = e
.log_index
.ok_or(anyhow::anyhow!("Log missing `log_index`: {e:?}"))?;
Copy link
Member Author

@MitchTurner MitchTurner Mar 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The alternative here would be set to 0 if None.

I think I prefer the error, but there are still ways for this to be non-deterministic, e.g. if there are more than one log with the same index.

So, maybe we need to have additional checks that there are no duplicates? And do we want to accept batches with missing log indices?

If we don't want to worry about all the edge cases, then we could just do .unwrap_or(0) instead of .ok_or?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Talked to the team about this offline. It sounds like we are okay with throwing an error here and not including the batch. In practice, this value should always be Some.

In a similar vein, we can probably assume they will always be in order and all unique.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are all events coming through this sort helper fn guaranteed to be grouped by da_height?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They get sorted into blocks after they are sorted by index. So it will maintain order once sorted by block.

Ok((log_index, e))
})
.collect::<anyhow::Result<Vec<_>>>()?;
with_indexes.sort_by(|(index_a, _a), (index_b, _b)| index_a.cmp(index_b));
let new_events = with_indexes.into_iter().map(|(_, e)| e).collect();
Ok(new_events)
}
5 changes: 3 additions & 2 deletions crates/services/relayer/src/service/get_logs/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ fn messages(
nonce
.zip(block_number)
.zip(contracts)
.map(|((n, b), c)| message(n, b, c))
.map(|((n, b), c)| message(n, b, c, 0))
.collect()
}

fn message(nonce: u64, block_number: u64, contract_address: u32) -> Log {
fn message(nonce: u64, block_number: u64, contract_address: u32, index: u64) -> Log {
let message = MessageSentFilter {
nonce: U256::from_dec_str(nonce.to_string().as_str())
.expect("Should convert into U256"),
Expand All @@ -49,6 +49,7 @@ fn message(nonce: u64, block_number: u64, contract_address: u32) -> Log {
let mut log = message.into_log();
log.address = u32_to_contract(contract_address);
log.block_number = Some(block_number.into());
log.log_index = Some(index.into());
log
}

Expand Down
4 changes: 2 additions & 2 deletions crates/services/relayer/src/service/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async fn deploy_height_does_not_override() {
..Default::default()
};
let eth_node = MockMiddleware::default();
let relayer = NotInitializedTask::new(eth_node, mock_db.clone(), config);
let relayer = NotInitializedTask::new(eth_node, mock_db.clone(), config, false);
let _ = relayer.into_task(&Default::default(), ()).await;

assert_eq!(*mock_db.get_finalized_da_height().unwrap(), 50);
Expand All @@ -70,7 +70,7 @@ async fn deploy_height_does_override() {
..Default::default()
};
let eth_node = MockMiddleware::default();
let relayer = NotInitializedTask::new(eth_node, mock_db.clone(), config);
let relayer = NotInitializedTask::new(eth_node, mock_db.clone(), config, false);
let _ = relayer.into_task(&Default::default(), ()).await;

assert_eq!(*mock_db.get_finalized_da_height().unwrap(), 52);
Expand Down
52 changes: 27 additions & 25 deletions crates/services/relayer/src/test_helpers/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ impl Default for MockMiddleware {
s
}
}

#[derive(Error, Debug)]
/// Thrown when an error happens at the Nonce Manager
pub enum MockMiddlewareError {
Expand Down Expand Up @@ -261,31 +262,8 @@ impl Middleware for MockMiddleware {
async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>, Self::Error> {
tokio::task::yield_now().await;
self.before_event(TriggerType::GetLogs(filter));
let r = self.update_data(|data| {
data.logs_batch
.iter()
.flat_map(|logs| {
logs.iter().filter_map(|log| {
let r = match filter.address.as_ref()? {
ethers_core::types::ValueOrArray::Value(v) => {
log.address == *v
}
ethers_core::types::ValueOrArray::Array(v) => {
v.iter().any(|v| log.address == *v)
}
};
let log_block_num = log.block_number?;
let r = r
&& log_block_num
>= filter.block_option.get_from_block()?.as_number()?
&& log_block_num
<= filter.block_option.get_to_block()?.as_number()?;
r.then_some(log)
})
})
.cloned()
.collect()
});
let r =
self.update_data(|data| take_logs_based_on_filter(&data.logs_batch, filter));
self.after_event(TriggerType::GetLogs(filter));
Ok(r)
}
Expand All @@ -303,3 +281,27 @@ impl Middleware for MockMiddleware {
r
}
}

fn take_logs_based_on_filter(logs_batch: &[Vec<Log>], filter: &Filter) -> Vec<Log> {
logs_batch
.iter()
.flat_map(|logs| {
logs.iter().filter_map(|log| {
let r = match filter.address.as_ref()? {
ethers_core::types::ValueOrArray::Value(v) => log.address == *v,
ethers_core::types::ValueOrArray::Array(v) => {
v.iter().any(|v| log.address == *v)
}
};
let log_block_num = log.block_number?;
let r = r
&& log_block_num
>= filter.block_option.get_from_block()?.as_number()?
&& log_block_num
<= filter.block_option.get_to_block()?.as_number()?;
r.then_some(log)
})
})
.cloned()
.collect()
}
Loading
Loading