Skip to content

Commit

Permalink
Merge branch 'tiago/main/event-log' (#674)
Browse files Browse the repository at this point in the history
* tiago/main/event-log:
  changelog: add #674
  Small fixes
  Code review suggestions
  [ci] wasm checksums update
  Move namada_apps::node::ledger::events to the shared crate
  Update apps/src/lib/client/tendermint_rpc_types.rs
  Update apps/src/lib/client/tx.rs
  Implement event log
  • Loading branch information
tzemanovic committed Nov 16, 2022
2 parents 06911fa + 20ad42b commit 9e0dd9f
Show file tree
Hide file tree
Showing 27 changed files with 765 additions and 965 deletions.
3 changes: 3 additions & 0 deletions .changelog/unreleased/improvements/674-event-log.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- Added a custom events store and replaced WebSocket client for
transaction results with query endpoints to the events store.
([#674](https://github.com/anoma/namada/pull/674))
24 changes: 11 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion apps/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ flate2 = "1.0.22"
file-lock = "2.0.2"
futures = "0.3"
itertools = "0.10.1"
jsonpath_lib = "0.3.0"
libc = "0.2.97"
libloading = "0.7.2"
num-derive = "0.3.3"
Expand Down
1 change: 0 additions & 1 deletion apps/src/lib/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
pub mod rpc;
pub mod signing;
pub mod tendermint_rpc_types;
mod tendermint_websocket_client;
pub mod tx;
pub mod types;
pub mod utils;
115 changes: 98 additions & 17 deletions apps/src/lib/client/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ use async_std::path::PathBuf;
use async_std::prelude::*;
use borsh::{BorshDeserialize, BorshSerialize};
use data_encoding::HEXLOWER;
use eyre::{eyre, Context as EyreContext};
use itertools::Itertools;
use masp_primitives::asset_type::AssetType;
use masp_primitives::merkle_tree::MerklePath;
use masp_primitives::primitives::ViewingKey;
use masp_primitives::sapling::Node;
use masp_primitives::transaction::components::Amount;
use masp_primitives::zip32::ExtendedFullViewingKey;
use namada::ledger::events::Event;
use namada::ledger::governance::parameters::GovParams;
use namada::ledger::governance::storage as gov_storage;
use namada::ledger::governance::utils::Votes;
Expand All @@ -39,6 +41,7 @@ use namada::types::governance::{
OfflineProposal, OfflineVote, ProposalResult, ProposalVote, TallyResult,
VotePower,
};
use namada::types::hash::Hash;
use namada::types::key::*;
use namada::types::masp::{BalanceOwner, ExtendedViewingKey, PaymentAddress};
use namada::types::storage::{
Expand All @@ -50,6 +53,7 @@ use namada::types::transaction::{
WrapperTx,
};
use namada::types::{address, storage, token};
use tokio::time::{Duration, Instant};

use crate::cli::{self, args, Context};
use crate::client::tendermint_rpc_types::TxResponse;
Expand All @@ -64,6 +68,57 @@ use crate::facade::tendermint_rpc::{
Client, HttpClient, Order, SubscriptionClient, WebSocketClient,
};

/// Query the status of a given transaction.
///
/// If a response is not delivered until `deadline`, we exit the cli with an
/// error.
pub async fn query_tx_status(
status: TxEventQuery<'_>,
address: TendermintAddress,
deadline: Instant,
) -> Event {
const ONE_SECOND: Duration = Duration::from_secs(1);
// sleep for the duration of `backoff`,
// and update the underlying value
async fn sleep_update(query: TxEventQuery<'_>, backoff: &mut Duration) {
tracing::debug!(
?query,
duration = ?backoff,
"Retrying tx status query after timeout",
);
// simple linear backoff - if an event is not available,
// increase the backoff duration by one second
tokio::time::sleep(*backoff).await;
*backoff += ONE_SECOND;
}
tokio::time::timeout_at(deadline, async move {
let client = HttpClient::new(address).unwrap();
let mut backoff = ONE_SECOND;

loop {
tracing::debug!(query = ?status, "Querying tx status");
let maybe_event = match query_tx_events(&client, status).await {
Ok(response) => response,
Err(err) => {
tracing::debug!(%err, "ABCI query failed");
sleep_update(status, &mut backoff).await;
continue;
}
};
if let Some(e) = maybe_event {
break Ok(e);
}
sleep_update(status, &mut backoff).await;
}
})
.await
.map_err(|_| {
eprintln!("Transaction status query deadline of {deadline:?} exceeded");
})
.and_then(|result| result)
.unwrap_or_else(|_| cli::safe_exit(1))
}

/// Query the epoch of the last committed block
pub async fn query_epoch(args: args::Query) -> Epoch {
let client = HttpClient::new(args.ledger_address).unwrap();
Expand Down Expand Up @@ -2197,23 +2252,23 @@ pub async fn query_has_storage_key(
}

/// Represents a query for an event pertaining to the specified transaction
#[derive(Debug, Clone)]
pub enum TxEventQuery {
Accepted(String),
Applied(String),
#[derive(Debug, Copy, Clone)]
pub enum TxEventQuery<'a> {
Accepted(&'a str),
Applied(&'a str),
}

impl TxEventQuery {
impl<'a> TxEventQuery<'a> {
/// The event type to which this event query pertains
fn event_type(&self) -> &'static str {
fn event_type(self) -> &'static str {
match self {
TxEventQuery::Accepted(_tx_hash) => "accepted",
TxEventQuery::Applied(_tx_hash) => "applied",
TxEventQuery::Accepted(_) => "accepted",
TxEventQuery::Applied(_) => "applied",
}
}

/// The transaction to which this event query pertains
fn tx_hash(&self) -> &String {
fn tx_hash(self) -> &'a str {
match self {
TxEventQuery::Accepted(tx_hash) => tx_hash,
TxEventQuery::Applied(tx_hash) => tx_hash,
Expand All @@ -2222,8 +2277,8 @@ impl TxEventQuery {
}

/// Transaction event queries are semantically a subset of general queries
impl From<TxEventQuery> for Query {
fn from(tx_query: TxEventQuery) -> Self {
impl<'a> From<TxEventQuery<'a>> for Query {
fn from(tx_query: TxEventQuery<'a>) -> Self {
match tx_query {
TxEventQuery::Accepted(tx_hash) => {
Query::default().and_eq("accepted.hash", tx_hash)
Expand All @@ -2235,17 +2290,43 @@ impl From<TxEventQuery> for Query {
}
}

/// Call the corresponding `tx_event_query` RPC method, to fetch
/// the current status of a transation.
pub async fn query_tx_events(
client: &HttpClient,
tx_event_query: TxEventQuery<'_>,
) -> eyre::Result<Option<Event>> {
let tx_hash: Hash = tx_event_query.tx_hash().try_into()?;
match tx_event_query {
TxEventQuery::Accepted(_) => RPC
.shell()
.accepted(client, &tx_hash)
.await
.wrap_err_with(|| {
eyre!("Failed querying whether a transaction was accepted")
}),
TxEventQuery::Applied(_) => RPC
.shell()
.applied(client, &tx_hash)
.await
.wrap_err_with(|| {
eyre!("Error querying whether a transaction was applied")
}),
}
}

/// Lookup the full response accompanying the specified transaction event
// TODO: maybe remove this in favor of `query_tx_status`
pub async fn query_tx_response(
ledger_address: &TendermintAddress,
tx_query: TxEventQuery,
tx_query: TxEventQuery<'_>,
) -> Result<TxResponse, TError> {
// Connect to the Tendermint server holding the transactions
let (client, driver) = WebSocketClient::new(ledger_address.clone()).await?;
let driver_handle = tokio::spawn(async move { driver.run().await });
// Find all blocks that apply a transaction with the specified hash
let blocks = &client
.block_search(Query::from(tx_query.clone()), 1, 255, Order::Ascending)
.block_search(tx_query.into(), 1, 255, Order::Ascending)
.await
.expect("Unable to query for transaction with given hash")
.blocks;
Expand Down Expand Up @@ -2321,7 +2402,7 @@ pub async fn query_result(_ctx: Context, args: args::QueryResult) {
// First try looking up application event pertaining to given hash.
let tx_response = query_tx_response(
&args.query.ledger_address,
TxEventQuery::Applied(args.tx_hash.clone()),
TxEventQuery::Applied(&args.tx_hash),
)
.await;
match tx_response {
Expand All @@ -2335,7 +2416,7 @@ pub async fn query_result(_ctx: Context, args: args::QueryResult) {
// If this fails then instead look for an acceptance event.
let tx_response = query_tx_response(
&args.query.ledger_address,
TxEventQuery::Accepted(args.tx_hash),
TxEventQuery::Accepted(&args.tx_hash),
)
.await;
match tx_response {
Expand Down Expand Up @@ -2374,7 +2455,7 @@ pub async fn get_proposal_votes(
if let Some(vote_iter) = vote_iter {
for (key, vote) in vote_iter {
let voter_address = gov_storage::get_voter_address(&key)
.expect("Vote key should contains the voting address.")
.expect("Vote key should contain the voting address.")
.clone();
if vote.is_yay() && validators.contains(&voter_address) {
let amount: VotePower =
Expand All @@ -2386,7 +2467,7 @@ pub async fn get_proposal_votes(
let validator_address =
gov_storage::get_vote_delegation_address(&key)
.expect(
"Vote key should contains the delegation address.",
"Vote key should contain the delegation address.",
)
.clone();
let delegator_token_amount = get_bond_amount_at(
Expand Down
Loading

0 comments on commit 9e0dd9f

Please sign in to comment.