Skip to content

Commit

Permalink
Merge branch 'tiago/retransmit-expired-eth-events' (#1899)
Browse files Browse the repository at this point in the history
* origin/tiago/retransmit-expired-eth-events:
  Changelog for #1899
  Change wording on Ethereum tally docstr
  Check if eth events to retransmit are empty
  Add test_commit_broadcasts_expired_eth_events() unit test
  Add test_delete_expired_tally() unit test
  Add deleted eth events with >1/3 voting power to expired queue
  Return deleted tally payload if its voting power is >1/3
  Broadcast expired txs
  Factor out signing eth events vote extensions in the shell
  Refactor broadcasting protocol txs
  Add expired txs queue to storage
  Implement queue of expired txs
  • Loading branch information
Fraccaman committed Sep 25, 2023
2 parents 3f0cbd7 + be4985a commit 76ffe09
Show file tree
Hide file tree
Showing 7 changed files with 251 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Retransmit timed out Ethereum events in case they have accumulated >1/3 voting
power ([\#1899](https://github.com/anoma/namada/pull/1899))
141 changes: 116 additions & 25 deletions apps/src/lib/node/ledger/shell/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::rc::Rc;

use borsh::{BorshDeserialize, BorshSerialize};
use masp_primitives::transaction::Transaction;
use namada::core::hints;
use namada::core::ledger::eth_bridge;
use namada::ledger::eth_bridge::{EthBridgeQueries, EthereumOracleConfig};
use namada::ledger::events::log::EventLog;
Expand All @@ -53,10 +54,11 @@ use namada::proto::{self, Section, Tx};
use namada::types::address::Address;
use namada::types::chain::ChainId;
use namada::types::ethereum_events::EthereumEvent;
use namada::types::internal::TxInQueue;
use namada::types::internal::{ExpiredTx, TxInQueue};
use namada::types::key::*;
use namada::types::storage::{BlockHeight, Key, TxIndex};
use namada::types::time::DateTimeUtc;
use namada::types::transaction::protocol::EthereumTxData;
use namada::types::transaction::{
hash_tx, verify_decrypted_correctly, AffineCurve, DecryptedTx,
EllipticCurve, PairingEngine, TxType, WrapperTx,
Expand Down Expand Up @@ -815,9 +817,6 @@ where
)
});

// NOTE: the oracle isn't started through governance votes, so we don't
// check to see if we need to start it after epoch transitions

let root = self.wl_storage.storage.merkle_root();
tracing::info!(
"Committed block hash: {}, height: {}",
Expand All @@ -826,11 +825,13 @@ where
);
response.data = root.0.to_vec();

// validator specific actions
if let ShellMode::Validator {
eth_oracle: Some(eth_oracle),
..
} = &self.mode
{
// update the oracle's last processed eth block
let last_processed_block = eth_oracle
.last_processed_block_receiver
.borrow()
Expand All @@ -850,32 +851,76 @@ where
blocks"
),
}

// broadcast any queued txs
self.broadcast_queued_txs();
}

#[cfg(not(feature = "abcipp"))]
{
use crate::node::ledger::shell::vote_extensions::iter_protocol_txs;
response
}

if let ShellMode::Validator { .. } = &self.mode {
let ext = self.craft_extension();
/// Empties all the ledger's queues of transactions to be broadcasted
/// via CometBFT's P2P network.
#[inline]
fn broadcast_queued_txs(&mut self) {
self.broadcast_protocol_txs();
self.broadcast_expired_txs();
}

let protocol_key = self
.mode
.get_protocol_key()
.expect("Validators should have protocol keys");
/// Broadcast any pending protocol transactions.
fn broadcast_protocol_txs(&mut self) {
use crate::node::ledger::shell::vote_extensions::iter_protocol_txs;

let protocol_txs = iter_protocol_txs(ext).map(|protocol_tx| {
protocol_tx
.sign(protocol_key, self.chain_id.clone())
.to_bytes()
});
let ext = self.craft_extension();

for tx in protocol_txs {
self.mode.broadcast(tx);
}
}
let protocol_key = self
.mode
.get_protocol_key()
.expect("Validators should have protocol keys");

let protocol_txs = iter_protocol_txs(ext).map(|protocol_tx| {
protocol_tx
.sign(protocol_key, self.chain_id.clone())
.to_bytes()
});

for tx in protocol_txs {
self.mode.broadcast(tx);
}
}

/// Broadcast any expired transactions.
fn broadcast_expired_txs(&mut self) {
let eth_events = {
let mut events: Vec<_> = self
.wl_storage
.storage
.expired_txs_queue
.drain()
.map(|expired_tx| match expired_tx {
ExpiredTx::EthereumEvent(event) => event,
})
.collect();
events.sort();
events
};
if hints::likely(eth_events.is_empty()) {
// more often than not, there won't by any expired
// Ethereum events to retransmit
return;
}
if let Some(vote_extension) = self.sign_ethereum_events(eth_events) {
let protocol_key = self
.mode
.get_protocol_key()
.expect("Validators should have protocol keys");

let signed_tx = EthereumTxData::EthEventsVext(vote_extension)
.sign(protocol_key, self.chain_id.clone())
.to_bytes();

self.mode.broadcast(signed_tx);
}
response
}

/// Checks that neither the wrapper nor the inner transaction have already
Expand Down Expand Up @@ -2089,14 +2134,60 @@ mod abciplus_mempool_tests {
use namada::types::key::RefTo;
use namada::types::storage::BlockHeight;
use namada::types::transaction::protocol::{
EthereumTxData, ProtocolTx, ProtocolTxType,
ethereum_tx_data_variants, ProtocolTx, ProtocolTxType,
};
use namada::types::vote_extensions::{bridge_pool_roots, ethereum_events};

use super::*;
use crate::node::ledger::shell::test_utils;
use crate::wallet;

/// Check that broadcasting expired Ethereum events works
/// as expected.
#[test]
fn test_commit_broadcasts_expired_eth_events() {
let (mut shell, mut broadcaster_rx, _, _) =
test_utils::setup_at_height(5);

// push expired events to queue
let ethereum_event_0 = EthereumEvent::TransfersToNamada {
nonce: 0u64.into(),
transfers: vec![],
valid_transfers_map: vec![],
};
let ethereum_event_1 = EthereumEvent::TransfersToNamada {
nonce: 1u64.into(),
transfers: vec![],
valid_transfers_map: vec![],
};
shell
.wl_storage
.storage
.expired_txs_queue
.push(ExpiredTx::EthereumEvent(ethereum_event_0.clone()));
shell
.wl_storage
.storage
.expired_txs_queue
.push(ExpiredTx::EthereumEvent(ethereum_event_1.clone()));

// broadcast them
shell.broadcast_expired_txs();

// attempt to receive vote extension tx aggregating
// all expired events
let serialized_tx = broadcaster_rx.blocking_recv().unwrap();
let tx = Tx::try_from(&serialized_tx[..]).unwrap();

// check data inside tx
let vote_extension =
ethereum_tx_data_variants::EthEventsVext::try_from(&tx).unwrap();
assert_eq!(
vote_extension.data.ethereum_events,
vec![ethereum_event_0, ethereum_event_1]
);
}

/// Test that we do not include protocol txs in the mempool,
/// voting on ethereum events or signing bridge pool roots
/// and nonces if the bridge is inactive.
Expand Down Expand Up @@ -2235,7 +2326,7 @@ mod abciplus_mempool_tests {
}

#[cfg(test)]
mod test_mempool_validate {
mod tests {
use namada::proof_of_stake::Epoch;
use namada::proto::{Code, Data, Section, Signature, Tx};
use namada::types::transaction::{Fee, WrapperTx};
Expand Down
13 changes: 12 additions & 1 deletion apps/src/lib/node/ledger/shell/vote_extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,19 @@ where
}

/// Extend PreCommit votes with [`ethereum_events::Vext`] instances.
#[inline]
pub fn extend_vote_with_ethereum_events(
&mut self,
) -> Option<Signed<ethereum_events::Vext>> {
let events = self.new_ethereum_events();
self.sign_ethereum_events(events)
}

/// Sign the given Ethereum events, and return the associated
/// vote extension protocol transaction.
pub fn sign_ethereum_events(
&mut self,
ethereum_events: Vec<EthereumEvent>,
) -> Option<Signed<ethereum_events::Vext>> {
if !self.wl_storage.ethbridge_queries().is_bridge_active() {
return None;
Expand All @@ -124,7 +135,7 @@ where
.get_current_decision_height(),
#[cfg(not(feature = "abcipp"))]
block_height: self.wl_storage.storage.get_last_block_height(),
ethereum_events: self.new_ethereum_events(),
ethereum_events,
validator_addr,
};
if !ext.ethereum_events.is_empty() {
Expand Down
9 changes: 9 additions & 0 deletions core/src/ledger/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use crate::types::address::{
};
use crate::types::chain::{ChainId, CHAIN_ID_LENGTH};
use crate::types::hash::{Error as HashError, Hash};
use crate::types::internal::ExpiredTxsQueue;
// TODO
#[cfg(feature = "ferveo-tpke")]
use crate::types::internal::TxQueue;
Expand Down Expand Up @@ -104,6 +105,12 @@ where
/// Wrapper txs to be decrypted in the next block proposal
#[cfg(feature = "ferveo-tpke")]
pub tx_queue: TxQueue,
/// Queue of expired transactions that need to be retransmitted.
///
/// These transactions do not need to be persisted, as they are
/// retransmitted at the **COMMIT** phase immediately following
/// the block when they were queued.
pub expired_txs_queue: ExpiredTxsQueue,
/// The latest block height on Ethereum processed, if
/// the bridge is enabled.
pub ethereum_height: Option<ethereum_structs::BlockHeight>,
Expand Down Expand Up @@ -412,6 +419,7 @@ where
conversion_state: ConversionState::default(),
#[cfg(feature = "ferveo-tpke")]
tx_queue: TxQueue::default(),
expired_txs_queue: ExpiredTxsQueue::default(),
native_token,
ethereum_height: None,
eth_events_queue: EthEventsQueue::default(),
Expand Down Expand Up @@ -1168,6 +1176,7 @@ pub mod testing {
conversion_state: ConversionState::default(),
#[cfg(feature = "ferveo-tpke")]
tx_queue: TxQueue::default(),
expired_txs_queue: ExpiredTxsQueue::default(),
native_token: address::nam(),
ethereum_height: None,
eth_events_queue: EthEventsQueue::default(),
Expand Down
29 changes: 29 additions & 0 deletions core/src/types/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
use borsh::{BorshDeserialize, BorshSerialize};

use crate::types::ethereum_events::EthereumEvent;

/// A result of a wasm call to host functions that may fail.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum HostEnvResult {
Expand Down Expand Up @@ -99,3 +101,30 @@ mod tx_queue {

#[cfg(feature = "ferveo-tpke")]
pub use tx_queue::{TxInQueue, TxQueue};

/// Expired transaction kinds.
#[derive(Clone, Debug, BorshSerialize, BorshDeserialize)]
pub enum ExpiredTx {
/// Broadcast the given Ethereum event.
EthereumEvent(EthereumEvent),
}

/// Queue of expired transactions that need to be retransmitted.
#[derive(Default, Clone, Debug, BorshSerialize, BorshDeserialize)]
pub struct ExpiredTxsQueue {
inner: Vec<ExpiredTx>,
}

impl ExpiredTxsQueue {
/// Push a new transaction to the back of the queue.
#[inline]
pub fn push(&mut self, tx: ExpiredTx) {
self.inner.push(tx);
}

/// Consume all the transactions in the queue.
#[inline]
pub fn drain(&mut self) -> impl Iterator<Item = ExpiredTx> + '_ {
self.inner.drain(..)
}
}
18 changes: 17 additions & 1 deletion ethereum_bridge/src/protocol/transactions/ethereum_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use namada_core::ledger::storage::traits::StorageHasher;
use namada_core::ledger::storage::{DBIter, WlStorage, DB};
use namada_core::types::address::Address;
use namada_core::types::ethereum_events::EthereumEvent;
use namada_core::types::internal::ExpiredTx;
use namada_core::types::storage::{BlockHeight, Epoch, Key};
use namada_core::types::token::Amount;
use namada_core::types::transaction::TxResult;
Expand Down Expand Up @@ -199,7 +200,22 @@ where
%keys.prefix,
"Ethereum event timed out",
);
votes::storage::delete(wl_storage, &keys)?;
if let Some(event) = votes::storage::delete(wl_storage, &keys)? {
tracing::debug!(
%keys.prefix,
"Queueing Ethereum event for retransmission",
);
// NOTE: if we error out in the `ethereum_bridge` crate,
// currently there is no way to reset the expired txs queue
// to its previous state. this shouldn't be a big deal, as
// replaying ethereum events has no effect on the ledger.
// however, we may need to revisit this code if we ever
// implement slashing on double voting of ethereum events.
wl_storage
.storage
.expired_txs_queue
.push(ExpiredTx::EthereumEvent(event));
}
changed.extend(keys.clone().into_iter());
}

Expand Down
Loading

0 comments on commit 76ffe09

Please sign in to comment.