Skip to content

Commit

Permalink
Handle begin & end block channel events (informalsystems#1801)
Browse files Browse the repository at this point in the history
* Implement conversions for channel events

* Implement conversions for packets

* Resurrect code (from PR informalsystems#1172) to extract begin/end-block events from a tendermint NewBlock event

* Add channel events in the right order

* Remove redundant clones

* Minor refactoring

* Fix failing CI tests

* Extract block events without depending on message.action

* Cleanup

* Add .changelog entry

* Document event extraction
  • Loading branch information
hu55a1n1 authored Feb 3, 2022
1 parent 450f328 commit 5991532
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Handle channel events originating from Tendermint ABCI's BeginBlock and EndBlock methods
([#1793](https://github.com/informalsystems/ibc-rs/issues/1793))
202 changes: 177 additions & 25 deletions relayer/src/event/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use alloc::collections::BTreeMap as HashMap;
use core::convert::TryFrom;

use tendermint_rpc::{event::Event as RpcEvent, event::EventData as RpcEventData};

Expand All @@ -10,28 +11,123 @@ use ibc::events::{IbcEvent, RawObject};

use crate::event::monitor::queries;

/// Extract IBC events from Tendermint RPC events
///
/// Events originate from the following ABCI methods ->
/// 1. `DeliverTx` - these events are generated during the execution of transaction messages.
/// 2. `BeginBlock`
/// 3. `EndBlock`
///
/// Events originating from `DeliverTx` are currently extracted via the `RpcEvent::data` field as
/// the `EventData::Tx` variant.
/// Here's an example of what these events look like (i.e. `TxInfo::TxResult::events`) -
/// ```ron
/// [
/// Event {
/// type_str: "channel_open_init",
/// attributes: [
/// Tag {
/// key: Key(
/// "port_id",
/// ),
/// value: Value(
/// "transfer",
/// ),
/// },
/// Tag {
/// key: Key(
/// "channel_id",
/// ),
/// value: Value(
/// "channel-1",
/// ),
/// },
/// Tag {
/// key: Key(
/// "counterparty_port_id",
/// ),
/// value: Value(
/// "transfer",
/// ),
/// },
/// Tag {
/// key: Key(
/// "counterparty_channel_id",
/// ),
/// value: Value(
/// "",
/// ),
/// },
/// Tag {
/// key: Key(
/// "connection_id",
/// ),
/// value: Value(
/// "connection-1",
/// ),
/// },
/// ],
/// },
/// // ...
/// ]
/// ```
///
/// Events originating from `BeginBlock` and `EndBlock` methods are extracted via the
/// `RpcEvent::events` field. Here's an example of what these events look like ->
/// ```json
/// {
/// "channel_open_init.channel_id": [
/// "channel-0",
/// ],
/// "channel_open_init.connection_id": [
/// "connection-0",
/// ],
/// "channel_open_init.counterparty_channel_id": [
/// "channel-0",
/// ],
/// "channel_open_init.counterparty_port_id": [
/// "transfer",
/// ],
/// "channel_open_init.port_id": [
/// "transfer",
/// ],
/// // ...
/// }
/// ```
///
/// Note: Historically, all events were extracted from the `RpcEvent::events` field. This was
/// possible because these events had a `message.action` field that allowed us to infer the order in
/// which these events were triggered ->
/// ```json
/// "message.action": [
/// "update_client",
/// "channel_open_ack",
/// ],
/// "message.module": [
/// "ibc_client",
/// "ibc_channel",
/// ],
/// ```
/// {Begin,End}Block events however do not have any such `message.action` associated with them, so
/// this doesn't work. For this reason, we extract block events in the following order ->
/// OpenInit -> OpenTry -> OpenAck -> OpenConfirm -> SendPacket -> CloseInit -> CloseConfirm.
pub fn get_all_events(
chain_id: &ChainId,
result: RpcEvent,
) -> Result<Vec<(Height, IbcEvent)>, String> {
let mut vals: Vec<(Height, IbcEvent)> = vec![];
let RpcEvent { data, events, .. } = result;
let events = events.ok_or("missing events")?;

match &result.data {
match data {
RpcEventData::NewBlock { block, .. } => {
let height = Height::new(
ChainId::chain_version(chain_id.to_string().as_str()),
u64::from(block.as_ref().ok_or("tx.height")?.header.height),
);

vals.push((height, ClientEvents::NewBlock::new(height).into()));

if let Some(events) = &result.events {
let ibc_events =
send_packet_from_block_events(height, events.clone().into_iter().collect());
if !ibc_events.is_empty() {
vals.extend(ibc_events);
}
}
vals.append(&mut extract_block_events(height, &events));
}
RpcEventData::Tx { tx_result } => {
let height = Height::new(
Expand Down Expand Up @@ -62,11 +158,8 @@ pub fn get_all_events(
tracing::trace!("extracted {:?}", chan_event);
if matches!(chan_event, IbcEvent::SendPacket(_)) {
// Should be the same as the hash of tx_result.tx?
if let Some(hash) = result
.events
.as_ref()
.and_then(|events| events.get("tx.hash"))
.and_then(|values| values.get(0))
if let Some(hash) =
events.get("tx.hash").and_then(|values| values.get(0))
{
tracing::trace!(event = "SendPacket", "tx hash: {}", hash);
}
Expand All @@ -82,19 +175,78 @@ pub fn get_all_events(
Ok(vals)
}

fn send_packet_from_block_events(
fn extract_block_events(
height: Height,
events: HashMap<String, Vec<String>>,
block_events: &HashMap<String, Vec<String>>,
) -> Vec<(Height, IbcEvent)> {
let mut vals: Vec<(Height, IbcEvent)> = vec![];
if let Some(packets) = events.get("send_packet.packet_data") {
for i in 0..packets.len() {
let raw_obj = RawObject::new(height, "send_packet".to_string(), i, events.clone());
#[inline]
fn extract_events<'a, T: TryFrom<RawObject<'a>>>(
height: Height,
block_events: &'a HashMap<String, Vec<String>>,
event_type: &str,
event_field: &str,
) -> Vec<T> {
block_events
.get(&format!("{}.{}", event_type, event_field))
.unwrap_or(&vec![])
.iter()
.enumerate()
.filter_map(|(i, _)| {
let raw_obj = RawObject::new(height, event_type.to_owned(), i, block_events);
T::try_from(raw_obj).ok()
})
.collect()
}

if let Ok(pkg) = ChannelEvents::SendPacket::try_from(raw_obj) {
vals.push((height, IbcEvent::from(pkg)))
}
}
#[inline]
fn append_events<T: Into<IbcEvent>>(
events: &mut Vec<(Height, IbcEvent)>,
chan_events: Vec<T>,
height: Height,
) {
events.append(
&mut chan_events
.into_iter()
.map(|ev| (height, ev.into()))
.collect(),
);
}
vals

let mut events: Vec<(Height, IbcEvent)> = vec![];
append_events::<ChannelEvents::OpenInit>(
&mut events,
extract_events(height, block_events, "channel_open_init", "channel_id"),
height,
);
append_events::<ChannelEvents::OpenTry>(
&mut events,
extract_events(height, block_events, "channel_open_try", "channel_id"),
height,
);
append_events::<ChannelEvents::OpenAck>(
&mut events,
extract_events(height, block_events, "channel_open_ack", "channel_id"),
height,
);
append_events::<ChannelEvents::OpenConfirm>(
&mut events,
extract_events(height, block_events, "channel_open_confirm", "channel_id"),
height,
);
append_events::<ChannelEvents::SendPacket>(
&mut events,
extract_events(height, block_events, "send_packet", "packet_data"),
height,
);
append_events::<ChannelEvents::CloseInit>(
&mut events,
extract_events(height, block_events, "channel_close_init", "channel_id"),
height,
);
append_events::<ChannelEvents::CloseConfirm>(
&mut events,
extract_events(height, block_events, "channel_close_confirm", "channel_id"),
height,
);
events
}

0 comments on commit 5991532

Please sign in to comment.