Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retransmit some timed out Ethereum events #1899

Merged
merged 21 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .changelog/unreleased/improvements/1865-eth-voting-power.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Rework voting on Ethereum tallies across epoch boundaries
([\#1865](https://github.com/anoma/namada/pull/1865))
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Retransmit timed out Ethereum events in case they have accumulated >1/3 voting
power ([\#1899](https://github.com/anoma/namada/pull/1899))
141 changes: 116 additions & 25 deletions apps/src/lib/node/ledger/shell/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::rc::Rc;

use borsh::{BorshDeserialize, BorshSerialize};
use masp_primitives::transaction::Transaction;
use namada::core::hints;
use namada::core::ledger::eth_bridge;
use namada::ledger::eth_bridge::{EthBridgeQueries, EthereumOracleConfig};
use namada::ledger::events::log::EventLog;
Expand All @@ -52,10 +53,11 @@ use namada::proto::{self, Section, Tx};
use namada::types::address::Address;
use namada::types::chain::ChainId;
use namada::types::ethereum_events::EthereumEvent;
use namada::types::internal::TxInQueue;
use namada::types::internal::{ExpiredTx, TxInQueue};
use namada::types::key::*;
use namada::types::storage::{BlockHeight, Key, TxIndex};
use namada::types::time::DateTimeUtc;
use namada::types::transaction::protocol::EthereumTxData;
use namada::types::transaction::{
hash_tx, verify_decrypted_correctly, AffineCurve, DecryptedTx,
EllipticCurve, PairingEngine, TxType, WrapperTx,
Expand Down Expand Up @@ -814,9 +816,6 @@ where
)
});

// NOTE: the oracle isn't started through governance votes, so we don't
// check to see if we need to start it after epoch transitions

let root = self.wl_storage.storage.merkle_root();
tracing::info!(
"Committed block hash: {}, height: {}",
Expand All @@ -825,11 +824,13 @@ where
);
response.data = root.0.to_vec();

// validator specific actions
if let ShellMode::Validator {
eth_oracle: Some(eth_oracle),
..
} = &self.mode
{
// update the oracle's last processed eth block
let last_processed_block = eth_oracle
.last_processed_block_receiver
.borrow()
Expand All @@ -849,32 +850,76 @@ where
blocks"
),
}

// broadcast any queued txs
self.broadcast_queued_txs();
}

#[cfg(not(feature = "abcipp"))]
{
use crate::node::ledger::shell::vote_extensions::iter_protocol_txs;
response
}

if let ShellMode::Validator { .. } = &self.mode {
let ext = self.craft_extension();
/// Empties all the ledger's queues of transactions to be broadcasted
/// via CometBFT's P2P network.
#[inline]
fn broadcast_queued_txs(&mut self) {
self.broadcast_protocol_txs();
self.broadcast_expired_txs();
}

let protocol_key = self
.mode
.get_protocol_key()
.expect("Validators should have protocol keys");
/// Broadcast any pending protocol transactions.
fn broadcast_protocol_txs(&mut self) {
use crate::node::ledger::shell::vote_extensions::iter_protocol_txs;

let protocol_txs = iter_protocol_txs(ext).map(|protocol_tx| {
protocol_tx
.sign(protocol_key, self.chain_id.clone())
.to_bytes()
});
let ext = self.craft_extension();

for tx in protocol_txs {
self.mode.broadcast(tx);
}
}
let protocol_key = self
.mode
.get_protocol_key()
.expect("Validators should have protocol keys");

let protocol_txs = iter_protocol_txs(ext).map(|protocol_tx| {
protocol_tx
.sign(protocol_key, self.chain_id.clone())
.to_bytes()
});

for tx in protocol_txs {
self.mode.broadcast(tx);
}
}

/// Broadcast any expired transactions.
fn broadcast_expired_txs(&mut self) {
let eth_events = {
let mut events: Vec<_> = self
.wl_storage
.storage
.expired_txs_queue
.drain()
.map(|expired_tx| match expired_tx {
ExpiredTx::EthereumEvent(event) => event,
})
.collect();
events.sort();
batconjurer marked this conversation as resolved.
Show resolved Hide resolved
events
};
if hints::likely(eth_events.is_empty()) {
// more often than not, there won't by any expired
// Ethereum events to retransmit
return;
}
if let Some(vote_extension) = self.sign_ethereum_events(eth_events) {
let protocol_key = self
.mode
.get_protocol_key()
.expect("Validators should have protocol keys");

let signed_tx = EthereumTxData::EthEventsVext(vote_extension)
.sign(protocol_key, self.chain_id.clone())
.to_bytes();

self.mode.broadcast(signed_tx);
}
response
}

/// Checks that neither the wrapper nor the inner transaction have already
Expand Down Expand Up @@ -2165,14 +2210,60 @@ mod abciplus_mempool_tests {
use namada::types::key::RefTo;
use namada::types::storage::BlockHeight;
use namada::types::transaction::protocol::{
EthereumTxData, ProtocolTx, ProtocolTxType,
ethereum_tx_data_variants, ProtocolTx, ProtocolTxType,
};
use namada::types::vote_extensions::{bridge_pool_roots, ethereum_events};

use super::*;
use crate::node::ledger::shell::test_utils;
use crate::wallet;

/// Check that broadcasting expired Ethereum events works
/// as expected.
#[test]
fn test_commit_broadcasts_expired_eth_events() {
let (mut shell, mut broadcaster_rx, _, _) =
test_utils::setup_at_height(5);

// push expired events to queue
let ethereum_event_0 = EthereumEvent::TransfersToNamada {
nonce: 0u64.into(),
transfers: vec![],
valid_transfers_map: vec![],
};
let ethereum_event_1 = EthereumEvent::TransfersToNamada {
nonce: 1u64.into(),
transfers: vec![],
valid_transfers_map: vec![],
};
shell
.wl_storage
.storage
.expired_txs_queue
.push(ExpiredTx::EthereumEvent(ethereum_event_0.clone()));
shell
.wl_storage
.storage
.expired_txs_queue
.push(ExpiredTx::EthereumEvent(ethereum_event_1.clone()));

// broadcast them
shell.broadcast_expired_txs();

// attempt to receive vote extension tx aggregating
// all expired events
let serialized_tx = broadcaster_rx.blocking_recv().unwrap();
let tx = Tx::try_from(&serialized_tx[..]).unwrap();

// check data inside tx
let vote_extension =
ethereum_tx_data_variants::EthEventsVext::try_from(&tx).unwrap();
assert_eq!(
vote_extension.data.ethereum_events,
vec![ethereum_event_0, ethereum_event_1]
);
}

/// Test that we do not include protocol txs in the mempool,
/// voting on ethereum events or signing bridge pool roots
/// and nonces if the bridge is inactive.
Expand Down Expand Up @@ -2310,7 +2401,7 @@ mod abciplus_mempool_tests {
}

#[cfg(test)]
mod test_mempool_validate {
mod tests {
use namada::proof_of_stake::Epoch;
use namada::proto::{Code, Data, Section, Signature, Tx};
use namada::types::transaction::{Fee, WrapperTx};
Expand Down
13 changes: 12 additions & 1 deletion apps/src/lib/node/ledger/shell/vote_extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,19 @@ where
}

/// Extend PreCommit votes with [`ethereum_events::Vext`] instances.
#[inline]
pub fn extend_vote_with_ethereum_events(
&mut self,
) -> Option<Signed<ethereum_events::Vext>> {
let events = self.new_ethereum_events();
self.sign_ethereum_events(events)
}

/// Sign the given Ethereum events, and return the associated
/// vote extension protocol transaction.
pub fn sign_ethereum_events(
&mut self,
ethereum_events: Vec<EthereumEvent>,
) -> Option<Signed<ethereum_events::Vext>> {
if !self.wl_storage.ethbridge_queries().is_bridge_active() {
return None;
Expand All @@ -124,7 +135,7 @@ where
.get_current_decision_height(),
#[cfg(not(feature = "abcipp"))]
block_height: self.wl_storage.storage.get_last_block_height(),
ethereum_events: self.new_ethereum_events(),
ethereum_events,
sug0 marked this conversation as resolved.
Show resolved Hide resolved
validator_addr,
};
if !ext.ethereum_events.is_empty() {
Expand Down
9 changes: 9 additions & 0 deletions core/src/ledger/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use crate::types::address::{
};
use crate::types::chain::{ChainId, CHAIN_ID_LENGTH};
use crate::types::hash::{Error as HashError, Hash};
use crate::types::internal::ExpiredTxsQueue;
// TODO
#[cfg(feature = "ferveo-tpke")]
use crate::types::internal::TxQueue;
Expand Down Expand Up @@ -104,6 +105,12 @@ where
/// Wrapper txs to be decrypted in the next block proposal
#[cfg(feature = "ferveo-tpke")]
pub tx_queue: TxQueue,
/// Queue of expired transactions that need to be retransmitted.
///
/// These transactions do not need to be persisted, as they are
/// retransmitted at the **COMMIT** phase immediately following
/// the block when they were queued.
pub expired_txs_queue: ExpiredTxsQueue,
/// The latest block height on Ethereum processed, if
/// the bridge is enabled.
pub ethereum_height: Option<ethereum_structs::BlockHeight>,
Expand Down Expand Up @@ -412,6 +419,7 @@ where
conversion_state: ConversionState::default(),
#[cfg(feature = "ferveo-tpke")]
tx_queue: TxQueue::default(),
expired_txs_queue: ExpiredTxsQueue::default(),
native_token,
ethereum_height: None,
eth_events_queue: EthEventsQueue::default(),
Expand Down Expand Up @@ -1168,6 +1176,7 @@ pub mod testing {
conversion_state: ConversionState::default(),
#[cfg(feature = "ferveo-tpke")]
tx_queue: TxQueue::default(),
expired_txs_queue: ExpiredTxsQueue::default(),
native_token: address::nam(),
ethereum_height: None,
eth_events_queue: EthEventsQueue::default(),
Expand Down
29 changes: 29 additions & 0 deletions core/src/types/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

use borsh::{BorshDeserialize, BorshSerialize};

use crate::types::ethereum_events::EthereumEvent;

/// A result of a wasm call to host functions that may fail.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum HostEnvResult {
Expand Down Expand Up @@ -105,3 +107,30 @@ mod tx_queue {

#[cfg(feature = "ferveo-tpke")]
pub use tx_queue::{TxInQueue, TxQueue};

/// Expired transaction kinds.
#[derive(Clone, Debug, BorshSerialize, BorshDeserialize)]
pub enum ExpiredTx {
/// Broadcast the given Ethereum event.
EthereumEvent(EthereumEvent),
}

/// Queue of expired transactions that need to be retransmitted.
#[derive(Default, Clone, Debug, BorshSerialize, BorshDeserialize)]
pub struct ExpiredTxsQueue {
inner: Vec<ExpiredTx>,
}

impl ExpiredTxsQueue {
/// Push a new transaction to the back of the queue.
#[inline]
pub fn push(&mut self, tx: ExpiredTx) {
self.inner.push(tx);
}

/// Consume all the transactions in the queue.
#[inline]
pub fn drain(&mut self) -> impl Iterator<Item = ExpiredTx> + '_ {
self.inner.drain(..)
}
}
6 changes: 5 additions & 1 deletion core/src/types/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,13 @@ impl Amount {
denom: impl Into<u8>,
) -> Result<Self, AmountParseError> {
let denom = denom.into();
let uint = uint.into();
if denom == 0 {
return Ok(Self { raw: uint });
}
match Uint::from(10)
.checked_pow(Uint::from(denom))
.and_then(|scaling| scaling.checked_mul(uint.into()))
.and_then(|scaling| scaling.checked_mul(uint))
{
Some(amount) => Ok(Self { raw: amount }),
None => Err(AmountParseError::ConvertToDecimal),
Expand Down
19 changes: 19 additions & 0 deletions core/src/types/voting_power.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use num_traits::ops::checked::CheckedAdd;
use serde::de::Visitor;
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};

use crate::types::token::Amount;
use crate::types::uint::Uint;

/// Namada voting power, normalized to the range `0 - 2^32`.
Expand Down Expand Up @@ -170,6 +171,24 @@ impl Mul<&FractionalVotingPower> for FractionalVotingPower {
}
}

impl Mul<Amount> for FractionalVotingPower {
type Output = Amount;

fn mul(self, rhs: Amount) -> Self::Output {
self * &rhs
}
}

impl Mul<&Amount> for FractionalVotingPower {
type Output = Amount;

fn mul(self, &rhs: &Amount) -> Self::Output {
let whole: Uint = rhs.into();
let fraction = (self.0 * whole).to_integer();
Amount::from_uint(fraction, 0u8).unwrap()
}
}

impl Add<FractionalVotingPower> for FractionalVotingPower {
type Output = Self;

Expand Down
Loading