Skip to content

Commit

Permalink
Recover from missed RPC events after WebSocket subscription is closed…
Browse files Browse the repository at this point in the history
… by Tendermint (informalsystems#1205)

After some investigation, the culprit for informalsystems#1196 seems to be that Tendermint is closing the WebSocket connection over which we listen for IBC events whenever more than 100 txs are included in a single block [0], as we are not able to pull the events fast enough over the WebSocket connection to avoid completely filling the event buffer in Tendermint (which currently has a hard-coded capacity of 100 events, hence the issue).

We never noticed this previously since this problem only appears in practice with a high-enough commit/propose timeout (to allow enough txs to be included in a single block), and we were testing with a lower value for the timeouts.

Now that we landed some changes in tendermint-rs [1] which allow us to notice the connection being closed, this PR makes use of this to resubscribe to the events and trigger a packet clear whenever we notice the connection being closed under our feet.

[0] tendermint/tendermint#6729
[1] informalsystems/tendermint-rs#929

---

* Propagate JSON-RPC errors through the Rust subscription

* Use tendermint-rs branch with both fixes

* Fix compilation issue in tests

* Clear pending packets when event subscription is cancelled

* Temp: Update one-chain script to use 10s commit timeout

* Use tendermint-rs master

* Update Cargo.lock

* Update changelog

* Update lockfile

* Increase delay before checking for relaying result in e2e tests

* Add comment explaining who the RPC error is propagated to

* Improve event monitor logs

* Reset `timeout_commit` and `timeout_propose` to 1s
  • Loading branch information
romac authored Jul 21, 2021
1 parent 77ba767 commit 1c578f1
Show file tree
Hide file tree
Showing 15 changed files with 368 additions and 172 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@

- [ibc-relayer]
- Fixed: Hermes does not clear packets on start ([#1200])
- Recover from missed RPC events after WebSocket subscription is closed by Tendermint ([#1196])


[#1094]: https://github.com/informalsystems/ibc-rs/issues/1094
[#1114]: https://github.com/informalsystems/ibc-rs/issues/1114
[#1192]: https://github.com/informalsystems/ibc-rs/issues/1192
[#1194]: https://github.com/informalsystems/ibc-rs/issues/1194
[#1196]: https://github.com/informalsystems/ibc-rs/issues/1196
[#1198]: https://github.com/informalsystems/ibc-rs/issues/1198
[#1200]: https://github.com/informalsystems/ibc-rs/issues/1200

Expand Down
50 changes: 25 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 37 additions & 27 deletions e2e/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@


def passive_packets(
c: Config,
ibc0: ChainId, ibc1: ChainId, port_id: PortId,
ibc0_channel_id: ChannelId, ibc1_channel_id: ChannelId):
c: Config,
ibc0: ChainId, ibc1: ChainId, port_id: PortId,
ibc0_channel_id: ChannelId, ibc1_channel_id: ChannelId):

# 1. create some unreceived acks

Expand All @@ -27,26 +27,26 @@ def passive_packets(
src_channel=ibc0_channel_id, amount=10000, height_offset=1000, number_msgs=2)

# hermes tx raw ft-transfer ibc-0 ibc-1 transfer channel-1 10000 1000 -n 2
packet.packet_send(c, src=ibc1, dst=ibc0 , src_port=port_id,
packet.packet_send(c, src=ibc1, dst=ibc0, src_port=port_id,
src_channel=ibc1_channel_id, amount=10000, height_offset=1000, number_msgs=2)
sleep(5.0)

# hermes tx raw packet-recv ibc-1 ibc-0 transfer channel-0
packet.packet_recv(c, src=ibc0 , dst=ibc1,
packet.packet_recv(c, src=ibc0, dst=ibc1,
src_port=port_id, src_channel=ibc0_channel_id)

# hermes tx raw packet-recv ibc-0 ibc-1 transfer channel-1
packet.packet_recv(c, src=ibc1, dst=ibc0 ,
packet.packet_recv(c, src=ibc1, dst=ibc0,
src_port=port_id, src_channel=ibc1_channel_id)

# 2. create some unreceived packets

# hermes tx raw ft-transfer ibc-0 ibc-1 transfer channel-1 10000 1000 -n 3
packet.packet_send(c, src=ibc1, dst=ibc0 , src_port=port_id,
packet.packet_send(c, src=ibc1, dst=ibc0, src_port=port_id,
src_channel=ibc1_channel_id, amount=10000, height_offset=1000, number_msgs=3)

# hermes tx raw ft-transfer ibc-1 ibc-0 transfer channel-0 10000 1000 -n 4
packet.packet_send(c, src=ibc0 , dst=ibc1, src_port=port_id,
packet.packet_send(c, src=ibc0, dst=ibc1, src_port=port_id,
src_channel=ibc0_channel_id, amount=10000, height_offset=1000, number_msgs=4)

sleep(10.0)
Expand All @@ -55,7 +55,7 @@ def passive_packets(

# hermes query packet unreceived-packets ibc-0 transfer channel-0
unreceived = packet.query_unreceived_packets(
c, chain=ibc0 , port=port_id, channel=ibc0_channel_id)
c, chain=ibc0, port=port_id, channel=ibc0_channel_id)

assert (len(unreceived) == 3), (unreceived, "unreceived packet mismatch")

Expand All @@ -73,7 +73,7 @@ def passive_packets(

# hermes query packet unreceived-acks ibc-0 transfer channel-0
unreceived = packet.query_unreceived_acks(
c, chain=ibc0 , port=port_id, channel=ibc0_channel_id)
c, chain=ibc0, port=port_id, channel=ibc0_channel_id)

assert (len(unreceived) == 2), (unreceived, "unreceived packet mismatch")

Expand All @@ -100,28 +100,29 @@ def passive_packets(

# hermes query packet unreceived-packets ibc-0 transfer channel-0
unreceived = packet.query_unreceived_packets(
c, chain=ibc0 , port=port_id, channel=ibc0_channel_id)
c, chain=ibc0, port=port_id, channel=ibc0_channel_id)

assert (len(unreceived) == 0), (unreceived,
"unreceived packets mismatch (expected 0)")

# hermes query packet unreceived-acks ibc-0 transfer channel-0
unreceived = packet.query_unreceived_acks(
c, chain=ibc0 , port=port_id, channel=ibc0_channel_id)
c, chain=ibc0, port=port_id, channel=ibc0_channel_id)

assert (len(unreceived) == 0), (unreceived,
"unreceived acks mismatch (expected 0)")

# 7. send some packets
# hermes tx raw ft-transfer ibc-0 ibc-1 transfer channel-1 10000 1000 -n 3
packet.packet_send(c, src=ibc1, dst=ibc0 , src_port=port_id,
packet.packet_send(c, src=ibc1, dst=ibc0, src_port=port_id,
src_channel=ibc1_channel_id, amount=10000, height_offset=1000, number_msgs=3)

# hermes tx raw ft-transfer ibc-1 ibc-0 transfer channel-0 10000 1000 -n 4
packet.packet_send(c, src=ibc0, dst=ibc1, src_port=port_id,
src_channel=ibc0_channel_id, amount=10000, height_offset=1000, number_msgs=4)

sleep(10.0)
sleep(20.0)

# 8. verify that there are no pending packets
# hermes query packet unreceived-packets ibc-1 transfer channel-1
unreceived = packet.query_unreceived_packets(
Expand All @@ -139,30 +140,30 @@ def passive_packets(

# hermes query packet unreceived-packets ibc-0 transfer channel-0
unreceived = packet.query_unreceived_packets(
c, chain=ibc0 , port=port_id, channel=ibc0_channel_id)
c, chain=ibc0, port=port_id, channel=ibc0_channel_id)

assert (len(unreceived) == 0), (unreceived,
"unreceived packets mismatch (expected 0)")

# hermes query packet unreceived-acks ibc-0 transfer channel-0
unreceived = packet.query_unreceived_acks(
c, chain=ibc0 , port=port_id, channel=ibc0_channel_id)
c, chain=ibc0, port=port_id, channel=ibc0_channel_id)

assert (len(unreceived) == 0), (unreceived,
"unreceived acks mismatch (expected 0)")
# 9.Stop the relayer
proc.kill()


def raw(c: Config, ibc0: ChainId, ibc1: ChainId, port_id: PortId) -> Tuple[ ClientId, ConnectionId, ChannelId, ClientId, ConnectionId, ChannelId]:
def raw(c: Config, ibc0: ChainId, ibc1: ChainId, port_id: PortId) -> Tuple[ClientId, ConnectionId, ChannelId, ClientId, ConnectionId, ChannelId]:
ibc0_client_id = client.create_update_query_client(c, ibc0, ibc1)

# Allocate first IDs on ibc-1
ibc1_client_id = client.create_update_query_client(c, ibc1, ibc0)
ibc1_conn_id = connection.conn_init(
c, ibc1, ibc0 , ibc1_client_id, ibc0_client_id)
c, ibc1, ibc0, ibc1_client_id, ibc0_client_id)
ibc1_chan_id = channel.chan_open_init(
c, dst=ibc1, src=ibc0 , dst_conn=ibc1_conn_id)
c, dst=ibc1, src=ibc0, dst_conn=ibc1_conn_id)

ibc1_client_id = client.create_update_query_client(c, ibc1, ibc0)

Expand Down Expand Up @@ -195,6 +196,7 @@ def raw(c: Config, ibc0: ChainId, ibc1: ChainId, port_id: PortId) -> Tuple[ Clie

return ibc0_client_id, ibc0_conn_id, ibc0_chan_id, ibc1_client_id, ibc1_conn_id, ibc1_chan_id


def main():
parser = argparse.ArgumentParser(
description='Test all relayer commands, end-to-end')
Expand Down Expand Up @@ -234,33 +236,41 @@ def main():

chains = toml.load(config.config_file)['chains']

ibc0 = chains[0]['id']
ibc0 = chains[0]['id']
ibc1 = chains[1]['id']
port_id = PortId('transfer')

ibc0_client_id, ibc0_conn_id, ibc0_chan_id, ibc1_client_id, ibc1_conn_id, ibc1_chan_id = raw(config, ibc0 , ibc1, port_id)
ibc0_client_id, ibc0_conn_id, ibc0_chan_id, ibc1_client_id, ibc1_conn_id, ibc1_chan_id = raw(
config, ibc0, ibc1, port_id)
sleep(2.0)

passive_packets(config, ibc0, ibc1, port_id, ibc0_chan_id, ibc1_chan_id)
sleep(2.0)

connection.passive_connection_init_then_start(config, ibc1, ibc0, ibc1_client_id, ibc0_client_id)
connection.passive_connection_init_then_start(
config, ibc1, ibc0, ibc1_client_id, ibc0_client_id)
sleep(2.0)

connection.passive_connection_start_then_init(config, ibc1, ibc0, ibc1_client_id, ibc0_client_id)
connection.passive_connection_start_then_init(
config, ibc1, ibc0, ibc1_client_id, ibc0_client_id)
sleep(2.0)

connection.passive_connection_try_then_start(config, ibc1, ibc0, ibc1_client_id, ibc0_client_id)
connection.passive_connection_try_then_start(
config, ibc1, ibc0, ibc1_client_id, ibc0_client_id)
sleep(2.0)

channel.passive_channel_start_then_init(config, ibc1, ibc0, ibc1_conn_id, port_id)
channel.passive_channel_start_then_init(
config, ibc1, ibc0, ibc1_conn_id, port_id)
sleep(2.0)

channel.passive_channel_init_then_start(config, ibc1, ibc0, ibc1_conn_id, port_id)
channel.passive_channel_init_then_start(
config, ibc1, ibc0, ibc1_conn_id, port_id)
sleep(2.0)

channel.passive_channel_try_then_start(config, ibc1, ibc0, ibc1_conn_id, ibc0_conn_id, port_id)
channel.passive_channel_try_then_start(
config, ibc1, ibc0, ibc1_conn_id, ibc0_conn_id, port_id)
sleep(2.0)


if __name__ == "__main__":
main()
13 changes: 7 additions & 6 deletions relayer-cli/src/commands/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{fmt, ops::Deref, str::FromStr, sync::Arc, thread};
use abscissa_core::{application::fatal_error, error::BoxError, Command, Options, Runnable};
use itertools::Itertools;
use tokio::runtime::Runtime as TokioRuntime;
use tracing::{error, info};

use ibc::{events::IbcEvent, ics24_host::identifier::ChainId};

Expand Down Expand Up @@ -92,8 +93,8 @@ impl Runnable for ListenCmd {

/// Listen to events
pub fn listen(config: &ChainConfig, filters: &[EventFilter]) -> Result<(), BoxError> {
println!(
"[info] Listening for events `{}` on '{}'...",
info!(
"listening for events `{}` on '{}'...",
filters.iter().format(", "),
config.id
);
Expand All @@ -116,15 +117,15 @@ pub fn listen(config: &ChainConfig, filters: &[EventFilter]) -> Result<(), BoxEr
continue;
}

println!("- Event batch at height {}", batch.height);
info!("- event batch at height {}", batch.height);

for event in matching_events {
println!("+ {:#?}", event);
info!("+ {:#?}", event);
}

println!();
info!("");
}
Err(e) => println!("- Error: {}", e),
Err(e) => error!("- error: {}", e),
}
}

Expand Down
Loading

0 comments on commit 1c578f1

Please sign in to comment.