Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify Relayer service to order Events from L1 by block index #1779

Merged
merged 23 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
02ec8cd
Change mock db to maintain order
MitchTurner Mar 25, 2024
d68bd4b
Add failing test
MitchTurner Mar 25, 2024
2120be1
Get ordered tx test passing, start message test
MitchTurner Mar 25, 2024
b200848
WIP fixing message test
MitchTurner Mar 26, 2024
c39961f
Fix broken test
MitchTurner Mar 26, 2024
307cf7c
Mark hung test to keep CI from running forever
MitchTurner Mar 26, 2024
65491a2
Fix sad path test, fix other test definition
MitchTurner Mar 26, 2024
fe2c0f0
Make iterator combinator easier to read
MitchTurner Mar 26, 2024
bb33d12
Merge branch 'master' into ensure-relayer-message-order
MitchTurner Mar 26, 2024
5d9f37a
Update CHANGELOG
MitchTurner Mar 26, 2024
a977697
Add missing Options
MitchTurner Mar 26, 2024
cfc9939
Add more missing `Option`s
MitchTurner Mar 26, 2024
3db8add
Appease Clippy-sama
MitchTurner Mar 26, 2024
3ceacf7
Redesign mock to fix pagination tests
MitchTurner Mar 27, 2024
2f82803
Fix more tests
MitchTurner Mar 27, 2024
8427203
Remove `Option`, ignore Clippy-sama
MitchTurner Mar 27, 2024
e831dd0
Remove all the formatting changes to CHANGELOG and move to `Breaking`
MitchTurner Mar 27, 2024
954607c
Refactor some service stuff
MitchTurner Mar 27, 2024
46bba1c
Reverted changes to `MockMiddleware` and use timeout in test instead
MitchTurner Mar 27, 2024
502a1f8
Configure task to allow single runs in test
MitchTurner Mar 28, 2024
f0a8297
Fix broken constructors
MitchTurner Mar 28, 2024
daabb9d
Merge branch 'master' into ensure-relayer-message-order
xgreenx Mar 28, 2024
b249ebb
Return `Err` instead of `Ok` when `retry_on_error`
MitchTurner Mar 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
623 changes: 420 additions & 203 deletions CHANGELOG.md

Large diffs are not rendered by default.

50 changes: 39 additions & 11 deletions crates/services/relayer/src/mock_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ use fuel_core_types::{
services::relayer::Event,
};
use std::{
collections::{
BTreeMap,
HashMap,
},
collections::BTreeMap,
sync::{
Arc,
Mutex,
Expand All @@ -28,9 +25,9 @@ use std::{

#[derive(Default)]
pub struct Data {
pub messages: BTreeMap<DaBlockHeight, HashMap<Nonce, Message>>,
pub messages: BTreeMap<DaBlockHeight, Vec<(Nonce, Message)>>,
pub transactions:
BTreeMap<DaBlockHeight, HashMap<RelayedTransactionId, RelayedTransaction>>,
BTreeMap<DaBlockHeight, Vec<(RelayedTransactionId, RelayedTransaction)>>,
pub finalized_da_height: Option<DaBlockHeight>,
}

Expand All @@ -44,13 +41,27 @@ pub struct MockDb {
}

impl MockDb {
pub fn get_message(&self, id: &Nonce) -> Option<Message> {
pub fn get_message(&self, nonce: &Nonce) -> Option<Message> {
self.data
.lock()
.unwrap()
.messages
.iter()
.find_map(|(_, map)| map.get(id).cloned())
.find_map(|(_, map)| {
map.iter()
.find(|(inner_nonce, _msg)| nonce == inner_nonce)
.map(|(_, msg)| msg.clone())
})
}

pub fn get_messages_for_block(&self, da_block_height: DaBlockHeight) -> Vec<Message> {
self.data
.lock()
.unwrap()
.messages
.get(&da_block_height)
.map(|map| map.iter().map(|(_, msg)| msg).cloned().collect())
.unwrap_or_default()
}

pub fn get_transaction(
Expand All @@ -62,7 +73,24 @@ impl MockDb {
.unwrap()
.transactions
.iter()
.find_map(|(_, map)| map.get(id).cloned())
.find_map(|(_, txs)| {
txs.iter()
.find(|(inner_id, _tx)| id == inner_id)
.map(|(_, tx)| tx.clone())
})
}

pub fn get_transactions_for_block(
&self,
da_block_height: DaBlockHeight,
) -> Vec<RelayedTransaction> {
self.data
.lock()
.unwrap()
.transactions
.get(&da_block_height)
.map(|map| map.iter().map(|(_, tx)| tx).cloned().collect())
.unwrap_or_default()
}
}

Expand All @@ -79,13 +107,13 @@ impl RelayerDb for MockDb {
m.messages
.entry(message.da_height())
.or_default()
.insert(*message.id(), message.clone());
.push((*message.id(), message.clone()));
}
Event::Transaction(transaction) => {
m.transactions
.entry(transaction.da_height())
.or_default()
.insert(transaction.id(), transaction.clone());
.push((transaction.id(), transaction.clone()));
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/services/relayer/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ where
self.config.log_page_size,
);
let logs = logs.take_until(self.shutdown.while_started());

write_logs(&mut self.database, logs).await
}

Expand Down
47 changes: 31 additions & 16 deletions crates/services/relayer/src/service/get_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,24 +68,24 @@ where
while let Some((last_height, events)) = logs.try_next().await? {
let last_height = last_height.into();
let mut ordered_events = BTreeMap::<DaBlockHeight, Vec<Event>>::new();
let fuel_events =
events
.into_iter()
.filter_map(|event| match EthEventLog::try_from(&event) {
Ok(event) => {
match event {
EthEventLog::Message(m) => {
Some(Ok(Event::Message(Message::from(&m))))
}
EthEventLog::Transaction(tx) => {
Some(Ok(Event::Transaction(RelayedTransaction::from(tx))))
}
// TODO: Log out ignored messages.
EthEventLog::Ignored => None,
let sorted_events = sort_events_by_log_index(events)?;
let fuel_events = sorted_events.into_iter().filter_map(|event| {
match EthEventLog::try_from(&event) {
Ok(event) => {
match event {
EthEventLog::Message(m) => {
Some(Ok(Event::Message(Message::from(&m))))
}
EthEventLog::Transaction(tx) => {
Some(Ok(Event::Transaction(RelayedTransaction::from(tx))))
}
// TODO: Log out ignored messages.
EthEventLog::Ignored => None,
}
Err(e) => Some(Err(e)),
});
}
Err(e) => Some(Err(e)),
}
});

for event in fuel_events {
let event = event?;
Expand All @@ -109,3 +109,18 @@ where
}
Ok(())
}

fn sort_events_by_log_index(events: Vec<Log>) -> anyhow::Result<Vec<Log>> {
let mut with_indexes = events
.into_iter()
.map(|e| {
let log_index = e
.log_index
.ok_or(anyhow::anyhow!("Log missing `log_index`: {e:?}"))?;
Copy link
Member Author

@MitchTurner MitchTurner Mar 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The alternative here would be set to 0 if None.

I think I prefer the error, but there are still ways for this to be non-deterministic, e.g. if there are more than one log with the same index.

So, maybe we need to have additional checks that there are no duplicates? And do we want to accept batches with missing log indices?

If we don't want to worry about all the edge cases, then we could just do .unwrap_or(0) instead of .ok_or?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Talked to the team about this offline. It sounds like we are okay with throwing an error here and not including the batch. In practice, this value should always be Some.

In a similar vein, we can probably assume they will always be in order and all unique.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are all events coming through this sort helper fn guaranteed to be grouped by da_height?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They get sorted into blocks after they are sorted by index. So it will maintain order once sorted by block.

Ok((log_index, e))
})
.collect::<anyhow::Result<Vec<_>>>()?;
with_indexes.sort_by(|(index_a, _a), (index_b, _b)| index_a.cmp(index_b));
let new_events = with_indexes.into_iter().map(|(_, e)| e).collect();
Ok(new_events)
}
7 changes: 4 additions & 3 deletions crates/services/relayer/src/service/get_logs/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ fn messages(
nonce
.zip(block_number)
.zip(contracts)
.map(|((n, b), c)| message(n, b, c))
.map(|((n, b), c)| message(n, b, c, 0))
.collect()
}

fn message(nonce: u64, block_number: u64, contract_address: u32) -> Log {
fn message(nonce: u64, block_number: u64, contract_address: u32, index: u64) -> Log {
let message = MessageSentFilter {
nonce: U256::from_dec_str(nonce.to_string().as_str())
.expect("Should convert into U256"),
Expand All @@ -49,6 +49,7 @@ fn message(nonce: u64, block_number: u64, contract_address: u32) -> Log {
let mut log = message.into_log();
log.address = u32_to_contract(contract_address);
log.block_number = Some(block_number.into());
log.log_index = Some(index.into());
log
}

Expand Down Expand Up @@ -119,7 +120,7 @@ async fn can_paginate_logs(input: Input) -> Expected {
let eth_node = MockMiddleware::default();

eth_node.update_data(|data| {
data.logs_batch = vec![logs];
data.logs_batch = vec![logs].into();
data.best_block.number = Some((*eth_gap.end()).into());
});
let count = std::sync::Arc::new(AtomicUsize::new(0));
Expand Down
2 changes: 1 addition & 1 deletion crates/services/relayer/src/service/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async fn can_download_logs() {
..Default::default()
},
];
eth_node.update_data(|data| data.logs_batch = vec![logs.clone()]);
eth_node.update_data(|data| data.logs_batch = vec![logs.clone()].into());

let eth_state = super::state::test_builder::TestDataSource {
eth_remote_finalized: 5,
Expand Down
94 changes: 69 additions & 25 deletions crates/services/relayer/src/test_helpers/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ struct InnerState {
pub struct MockData {
pub is_syncing: SyncingStatus,
pub best_block: Block<TxHash>,
pub logs_batch: Vec<Vec<Log>>,
pub logs_batch: Option<Vec<Vec<Log>>>,
MitchTurner marked this conversation as resolved.
Show resolved Hide resolved
pub logs_batch_index: usize,
}

Expand Down Expand Up @@ -141,7 +141,7 @@ impl Default for MockData {
MockData {
best_block,
is_syncing: SyncingStatus::IsFalse,
logs_batch: Vec::new(),
logs_batch: Some(Vec::new()),
logs_batch_index: 0,
}
}
Expand Down Expand Up @@ -262,29 +262,9 @@ impl Middleware for MockMiddleware {
tokio::task::yield_now().await;
self.before_event(TriggerType::GetLogs(filter));
let r = self.update_data(|data| {
data.logs_batch
.iter()
.flat_map(|logs| {
logs.iter().filter_map(|log| {
let r = match filter.address.as_ref()? {
ethers_core::types::ValueOrArray::Value(v) => {
log.address == *v
}
ethers_core::types::ValueOrArray::Array(v) => {
v.iter().any(|v| log.address == *v)
}
};
let log_block_num = log.block_number?;
let r = r
&& log_block_num
>= filter.block_option.get_from_block()?.as_number()?
&& log_block_num
<= filter.block_option.get_to_block()?.as_number()?;
r.then_some(log)
})
})
.cloned()
.collect()
let (leave, take) = take_logs_based_on_filter(&mut data.logs_batch, filter);
data.logs_batch = Some(leave);
take
});
self.after_event(TriggerType::GetLogs(filter));
Ok(r)
Expand All @@ -303,3 +283,67 @@ impl Middleware for MockMiddleware {
r
}
}

fn take_logs_based_on_filter(
logs_batch: &mut Option<Vec<Vec<Log>>>,
filter: &Filter,
) -> (Vec<Vec<Log>>, Vec<Log>) {
let (leave, take) = logs_batch
.take()
.unwrap_or_else(|| {
tracing::debug!("No logs batch found");
Vec::new()
})
.iter()
.map(|logs| {
logs.iter()
.fold((Vec::new(), Vec::new()), |(mut leave, mut take), log| {
let r = if let Some(address) = filter.address.as_ref() {
match address {
ethers_core::types::ValueOrArray::Value(v) => {
log.address == *v
}
ethers_core::types::ValueOrArray::Array(v) => {
v.iter().any(|v| log.address == *v)
}
}
} else {
false
};
let r = if let Some(log_block_num) = log.block_number {
if let Some(from_block) = filter
.block_option
.get_from_block()
.and_then(|b| b.as_number())
{
if let Some(to_block) = filter
.block_option
.get_to_block()
.and_then(|b| b.as_number())
{
r && log_block_num >= from_block
&& log_block_num <= to_block
} else {
false
}
} else {
true
}
} else {
false
};
if r {
take.push(log.clone());
} else {
leave.push(log.clone());
}
(leave, take)
})
})
.fold((Vec::new(), Vec::new()), |(mut leave, mut take), (l, t)| {
leave.push(l);
take.extend(t);
(leave, take)
});
(leave, take)
}
Loading
Loading