From 970ec4b07b21073a13bf21f8341a16cf14248eef Mon Sep 17 00:00:00 2001 From: Sean Chen Date: Mon, 28 Feb 2022 16:31:23 -0600 Subject: [PATCH 01/18] Sketching out refactor --- relayer/src/link/pending.rs | 31 ++++++++++++++++++++----------- relayer/src/link/relay_path.rs | 2 +- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/relayer/src/link/pending.rs b/relayer/src/link/pending.rs index 4aeea71823..bb99c1d6fa 100644 --- a/relayer/src/link/pending.rs +++ b/relayer/src/link/pending.rs @@ -13,7 +13,10 @@ use crate::link::error::LinkError; use crate::util::queue::Queue; use crate::{ chain::handle::ChainHandle, - link::{operational_data::OperationalData, relay_sender::AsyncReply, RelaySummary, TxHashes}, + link::{ + operational_data::OperationalData, relay_sender::AsyncReply, RelayPath, RelaySummary, + TxHashes, + }, }; pub const TIMEOUT: Duration = Duration::from_secs(300); @@ -177,16 +180,22 @@ impl PendingTxs { // relayer to resubmit the transaction to the chain again. error!("timed out while confirming {}", tx_hashes); - let resubmit_res = resubmit(pending.original_od.clone()); - - match resubmit_res { - Ok(reply) => { - self.insert_new_pending_tx(reply, pending.original_od); - Ok(None) - } - Err(e) => { - self.pending_queue.push_back(pending); - Err(e) + // if `clear_interval` > 0, then the relayer will periodically clear pending + // packets such that resubmitting would be duplicating work + if clear_interval == 0 { + let new_od = + RelayPath::regenerate_operational_data(pending.original_od.clone()); + let resubmit_res = resubmit(new_od); + + match resubmit_res { + Ok(reply) => { + self.insert_new_pending_tx(reply, pending.original_od); + Ok(None) + } + Err(e) => { + self.pending_queue.push_back(pending); + Err(e) + } } } } else { diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index b61544f870..c844b65eb7 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -527,7 +527,7 @@ impl RelayPath 
{ /// /// Side effects: may schedule a new operational data targeting the source chain, comprising /// new timeout messages. - fn regenerate_operational_data( + pub(crate) fn regenerate_operational_data( &self, initial_odata: OperationalData, ) -> Option { From 4b14829ddc63044427b62cf4e4ca2328f823e5dc Mon Sep 17 00:00:00 2001 From: Sean Chen Date: Tue, 1 Mar 2022 11:14:00 -0600 Subject: [PATCH 02/18] Only relay operational data if `clear_interval` == 0 --- relayer/src/link/relay_path.rs | 67 ++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index c844b65eb7..878682217b 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -471,45 +471,50 @@ impl RelayPath { /// sender, which implements [`relay_sender::Submit`]. pub(crate) fn relay_from_operational_data( &self, + clear_interval: u64, initial_od: OperationalData, ) -> Result { - // We will operate on potentially different operational data if the initial one fails. - let _span = span!(Level::INFO, "relay", odata = %initial_od.info()).entered(); - let mut odata = initial_od; - for i in 0..MAX_RETRIES { - debug!( - "delayed by: {:?} [try {}/{}]", - odata.scheduled_time.elapsed(), - i + 1, - MAX_RETRIES - ); + // if `clear_interval` > 0, then the relayer will periodically clear pending + // packets such that resubmitting would be duplicating work + if clear_interval == 0 { + // We will operate on potentially different operational data if the initial one fails. + let _span = span!(Level::INFO, "relay", odata = %initial_od.info()).entered(); + + for i in 0..MAX_RETRIES { + debug!( + "delayed by: {:?} [try {}/{}]", + odata.scheduled_time.elapsed(), + i + 1, + MAX_RETRIES + ); - // Consume the operational data by attempting to send its messages - match self.send_from_operational_data::(odata.clone()) { - Ok(reply) => { - // Done with this op. 
data - info!("success"); + // Consume the operational data by attempting to send its messages + match self.send_from_operational_data::(odata.clone()) { + Ok(reply) => { + // Done with this op. data + info!("success"); - return Ok(reply); - } - Err(LinkError(error::LinkErrorDetail::Send(e), _)) => { - // This error means we could retry - error!("error {}", e.event); - if i + 1 == MAX_RETRIES { - error!("{}/{} retries exhausted. giving up", i + 1, MAX_RETRIES) - } else { - // If we haven't exhausted all retries, regenerate the op. data & retry - match self.regenerate_operational_data(odata.clone()) { - None => return Ok(S::Reply::empty()), // Nothing to retry - Some(new_od) => odata = new_od, + return Ok(reply); + } + Err(LinkError(error::LinkErrorDetail::Send(e), _)) => { + // This error means we could retry + error!("error {}", e.event); + if i + 1 == MAX_RETRIES { + error!("{}/{} retries exhausted. giving up", i + 1, MAX_RETRIES) + } else { + // If we haven't exhausted all retries, regenerate the op. 
data & retry + match self.regenerate_operational_data(odata.clone()) { + None => return Ok(S::Reply::empty()), // Nothing to retry + Some(new_od) => odata = new_od, + } } } - } - Err(e) => { - // Unrecoverable error, propagate up the stack - return Err(e); + Err(e) => { + // Unrecoverable error, propagate up the stack + return Err(e); + } } } } From 3cd85bbf8b8e49d385b2a4773675357c5815aec0 Mon Sep 17 00:00:00 2001 From: Sean Chen Date: Wed, 2 Mar 2022 08:42:28 -0600 Subject: [PATCH 03/18] Back to making changes to `process_pending` --- relayer/src/link/pending.rs | 17 +++------ relayer/src/link/relay_path.rs | 67 ++++++++++++++++------------------ 2 files changed, 36 insertions(+), 48 deletions(-) diff --git a/relayer/src/link/pending.rs b/relayer/src/link/pending.rs index bb99c1d6fa..ea619999ff 100644 --- a/relayer/src/link/pending.rs +++ b/relayer/src/link/pending.rs @@ -13,10 +13,7 @@ use crate::link::error::LinkError; use crate::util::queue::Queue; use crate::{ chain::handle::ChainHandle, - link::{ - operational_data::OperationalData, relay_sender::AsyncReply, RelayPath, RelaySummary, - TxHashes, - }, + link::{operational_data::OperationalData, relay_sender::AsyncReply, RelaySummary, TxHashes}, }; pub const TIMEOUT: Duration = Duration::from_secs(300); @@ -141,6 +138,7 @@ impl PendingTxs { &self, timeout: Duration, resubmit: impl FnOnce(OperationalData) -> Result, + should_resubmit: bool, ) -> Result, LinkError> { // We process pending transactions in a FIFO manner, so take from // the front of the queue. @@ -180,14 +178,9 @@ impl PendingTxs { // relayer to resubmit the transaction to the chain again. 
error!("timed out while confirming {}", tx_hashes); - // if `clear_interval` > 0, then the relayer will periodically clear pending - // packets such that resubmitting would be duplicating work - if clear_interval == 0 { - let new_od = - RelayPath::regenerate_operational_data(pending.original_od.clone()); - let resubmit_res = resubmit(new_od); - - match resubmit_res { + if should_resubmit { + let new_od = regenerate_operational_data(pending.original_od.clone()); + match resubmit(new_od) { Ok(reply) => { self.insert_new_pending_tx(reply, pending.original_od); Ok(None) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 878682217b..c844b65eb7 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -471,50 +471,45 @@ impl RelayPath { /// sender, which implements [`relay_sender::Submit`]. pub(crate) fn relay_from_operational_data( &self, - clear_interval: u64, initial_od: OperationalData, ) -> Result { + // We will operate on potentially different operational data if the initial one fails. + let _span = span!(Level::INFO, "relay", odata = %initial_od.info()).entered(); + let mut odata = initial_od; - // if `clear_interval` > 0, then the relayer will periodically clear pending - // packets such that resubmitting would be duplicating work - if clear_interval == 0 { - // We will operate on potentially different operational data if the initial one fails. - let _span = span!(Level::INFO, "relay", odata = %initial_od.info()).entered(); - - for i in 0..MAX_RETRIES { - debug!( - "delayed by: {:?} [try {}/{}]", - odata.scheduled_time.elapsed(), - i + 1, - MAX_RETRIES - ); + for i in 0..MAX_RETRIES { + debug!( + "delayed by: {:?} [try {}/{}]", + odata.scheduled_time.elapsed(), + i + 1, + MAX_RETRIES + ); - // Consume the operational data by attempting to send its messages - match self.send_from_operational_data::(odata.clone()) { - Ok(reply) => { - // Done with this op. 
data - info!("success"); + // Consume the operational data by attempting to send its messages + match self.send_from_operational_data::(odata.clone()) { + Ok(reply) => { + // Done with this op. data + info!("success"); - return Ok(reply); - } - Err(LinkError(error::LinkErrorDetail::Send(e), _)) => { - // This error means we could retry - error!("error {}", e.event); - if i + 1 == MAX_RETRIES { - error!("{}/{} retries exhausted. giving up", i + 1, MAX_RETRIES) - } else { - // If we haven't exhausted all retries, regenerate the op. data & retry - match self.regenerate_operational_data(odata.clone()) { - None => return Ok(S::Reply::empty()), // Nothing to retry - Some(new_od) => odata = new_od, - } + return Ok(reply); + } + Err(LinkError(error::LinkErrorDetail::Send(e), _)) => { + // This error means we could retry + error!("error {}", e.event); + if i + 1 == MAX_RETRIES { + error!("{}/{} retries exhausted. giving up", i + 1, MAX_RETRIES) + } else { + // If we haven't exhausted all retries, regenerate the op. 
data & retry + match self.regenerate_operational_data(odata.clone()) { + None => return Ok(S::Reply::empty()), // Nothing to retry + Some(new_od) => odata = new_od, } } - Err(e) => { - // Unrecoverable error, propagate up the stack - return Err(e); - } + } + Err(e) => { + // Unrecoverable error, propagate up the stack + return Err(e); } } } From 57db06206aaa249a8a9d1138ffd7b211bce9db75 Mon Sep 17 00:00:00 2001 From: Sean Chen Date: Tue, 8 Mar 2022 12:22:14 -0600 Subject: [PATCH 04/18] Pass `clear_interval` parameter to `process_pending` fn --- relayer/src/link/pending.rs | 2 ++ relayer/src/link/relay_path.rs | 50 ++++++++++++++++++++-------------- relayer/src/worker.rs | 6 +++- relayer/src/worker/packet.rs | 5 ++-- 4 files changed, 40 insertions(+), 23 deletions(-) diff --git a/relayer/src/link/pending.rs b/relayer/src/link/pending.rs index ea619999ff..7261e6d9cb 100644 --- a/relayer/src/link/pending.rs +++ b/relayer/src/link/pending.rs @@ -190,6 +190,8 @@ impl PendingTxs { Err(e) } } + } else { + Ok(None) } } else { // Reinsert the pending transaction, this time diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index c844b65eb7..46b36263bb 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -1227,45 +1227,55 @@ impl RelayPath { Ok(()) } - pub fn process_pending_txs(&self) -> RelaySummary { + pub fn process_pending_txs(&self, clear_interval: u64) -> RelaySummary { if !self.confirm_txes { return RelaySummary::empty(); } - let mut summary_src = self.process_pending_txs_src().unwrap_or_else(|e| { - error!("error processing pending events in source chain: {}", e); - RelaySummary::empty() - }); + let mut summary_src = self + .process_pending_txs_src(clear_interval) + .unwrap_or_else(|e| { + error!("error processing pending events in source chain: {}", e); + RelaySummary::empty() + }); - let summary_dst = self.process_pending_txs_dst().unwrap_or_else(|e| { - error!( - "error processing pending events in 
destination chain: {}", - e - ); - RelaySummary::empty() - }); + let summary_dst = self + .process_pending_txs_dst(clear_interval) + .unwrap_or_else(|e| { + error!( + "error processing pending events in destination chain: {}", + e + ); + RelaySummary::empty() + }); summary_src.extend(summary_dst); summary_src } - fn process_pending_txs_src(&self) -> Result { + fn process_pending_txs_src(&self, clear_interval: u64) -> Result { + let should_resubmit = clear_interval == 0; let res = self .pending_txs_src - .process_pending(pending::TIMEOUT, |odata| { - self.relay_from_operational_data::(odata) - })? + .process_pending( + pending::TIMEOUT, + |odata| self.relay_from_operational_data::(odata), + should_resubmit, + )? .unwrap_or_else(RelaySummary::empty); Ok(res) } - fn process_pending_txs_dst(&self) -> Result { + fn process_pending_txs_dst(&self, clear_interval: u64) -> Result { + let should_resubmit = clear_interval == 0; let res = self .pending_txs_dst - .process_pending(pending::TIMEOUT, |odata| { - self.relay_from_operational_data::(odata) - })? + .process_pending( + pending::TIMEOUT, + |odata| self.relay_from_operational_data::(odata), + should_resubmit, + )? 
.unwrap_or_else(RelaySummary::empty); Ok(res) diff --git a/relayer/src/worker.rs b/relayer/src/worker.rs index cb91350cfa..0bd7175c4a 100644 --- a/relayer/src/worker.rs +++ b/relayer/src/worker.rs @@ -120,7 +120,11 @@ pub fn spawn_worker_tasks( ); task_handles.push(packet_task); - let link_task = packet::spawn_packet_worker(path.clone(), link); + let link_task = packet::spawn_packet_worker( + path.clone(), + link, + packets_config.clear_interval, + ); task_handles.push(link_task); Some(cmd_tx) } diff --git a/relayer/src/worker/packet.rs b/relayer/src/worker/packet.rs index 01fdb1d648..9a433c2a27 100644 --- a/relayer/src/worker/packet.rs +++ b/relayer/src/worker/packet.rs @@ -46,6 +46,7 @@ pub fn spawn_packet_worker( path: Packet, // Mutex is used to prevent race condition between the packet workers link: Arc>>, + clear_interval: u64, ) -> TaskHandle { let span = { let relay_path = &link.lock().unwrap().a_to_b; @@ -69,7 +70,7 @@ pub fn spawn_packet_worker( .execute_schedule() .map_err(handle_link_error_in_task)?; - let summary = relay_path.process_pending_txs(); + let summary = relay_path.process_pending_txs(clear_interval); if !summary.is_empty() { trace!("Packet worker produced relay summary: {:?}", summary); @@ -202,7 +203,7 @@ fn handle_packet_cmd( } } - let summary = link.a_to_b.process_pending_txs(); + let summary = link.a_to_b.process_pending_txs(clear_interval); if !summary.is_empty() { trace!("produced relay summary: {:?}", summary); From 64e7c223c7f692a3c8757a1b7a898e1af9697bab Mon Sep 17 00:00:00 2001 From: Sean Chen Date: Wed, 9 Mar 2022 08:52:59 -0600 Subject: [PATCH 05/18] Add RelayPath parameter to `process_pending` fn --- relayer/src/link/pending.rs | 34 +++++++++++++++++--------------- relayer/src/link/relay_path.rs | 36 +++++++++++++++++++++------------- 2 files changed, 40 insertions(+), 30 deletions(-) diff --git a/relayer/src/link/pending.rs b/relayer/src/link/pending.rs index 7261e6d9cb..c350f4526b 100644 --- a/relayer/src/link/pending.rs +++ 
b/relayer/src/link/pending.rs @@ -9,7 +9,7 @@ use ibc::events::IbcEvent; use ibc::query::{QueryTxHash, QueryTxRequest}; use crate::error::Error as RelayerError; -use crate::link::error::LinkError; +use crate::link::{error::LinkError, RelayPath}; use crate::util::queue::Queue; use crate::{ chain::handle::ChainHandle, @@ -134,11 +134,11 @@ impl PendingTxs { } /// Try and process one pending transaction if available. - pub fn process_pending( + pub fn process_pending( &self, timeout: Duration, - resubmit: impl FnOnce(OperationalData) -> Result, - should_resubmit: bool, + relay_path: &RelayPath, + resubmit: Option Result>, ) -> Result, LinkError> { // We process pending transactions in a FIFO manner, so take from // the front of the queue. @@ -178,20 +178,22 @@ impl PendingTxs { // relayer to resubmit the transaction to the chain again. error!("timed out while confirming {}", tx_hashes); - if should_resubmit { - let new_od = regenerate_operational_data(pending.original_od.clone()); - match resubmit(new_od) { - Ok(reply) => { - self.insert_new_pending_tx(reply, pending.original_od); - Ok(None) - } - Err(e) => { - self.pending_queue.push_back(pending); - Err(e) + match resubmit { + Some(f) => { + let new_od = relay_path + .regenerate_operational_data(pending.original_od.clone()); + match f(new_od) { + Ok(reply) => { + self.insert_new_pending_tx(reply, pending.original_od); + Ok(None) + } + Err(e) => { + self.pending_queue.push_back(pending); + Err(e) + } } } - } else { - Ok(None) + None => Ok(None), } } else { // Reinsert the pending transaction, this time diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 46b36263bb..c976494ce2 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -1253,29 +1253,37 @@ impl RelayPath { summary_src } - fn process_pending_txs_src(&self, clear_interval: u64) -> Result { - let should_resubmit = clear_interval == 0; + fn process_pending_txs_src( + &self, + clear_interval: u64, + 
relay_path: &RelayPath, + ) -> Result { + let resubmit = if clear_interval == 0 { + Some(|odata| self.relay_from_operational_data::(odata)) + } else { + None + }; let res = self .pending_txs_src - .process_pending( - pending::TIMEOUT, - |odata| self.relay_from_operational_data::(odata), - should_resubmit, - )? + .process_pending(pending::TIMEOUT, relay_path, resubmit)? .unwrap_or_else(RelaySummary::empty); Ok(res) } - fn process_pending_txs_dst(&self, clear_interval: u64) -> Result { - let should_resubmit = clear_interval == 0; + fn process_pending_txs_dst( + &self, + clear_interval: u64, + relay_path: &RelayPath, + ) -> Result { + let resubmit = if clear_interval == 0 { + Some(|odata| self.relay_from_operational_data::(odata)) + } else { + None + }; let res = self .pending_txs_dst - .process_pending( - pending::TIMEOUT, - |odata| self.relay_from_operational_data::(odata), - should_resubmit, - )? + .process_pending(pending::TIMEOUT, relay_path, resubmit)? .unwrap_or_else(RelaySummary::empty); Ok(res) From 65eff98c9a9a3dc6249553a5372d96cbf1a6c8c5 Mon Sep 17 00:00:00 2001 From: Sean Chen Date: Wed, 9 Mar 2022 12:47:09 -0600 Subject: [PATCH 06/18] Make `regenerate_operational_data` private. --- relayer/src/link/relay_path.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index c976494ce2..8a0cbd0510 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -527,7 +527,7 @@ impl RelayPath { /// /// Side effects: may schedule a new operational data targeting the source chain, comprising /// new timeout messages. 
- pub(crate) fn regenerate_operational_data( + fn regenerate_operational_data( &self, initial_odata: OperationalData, ) -> Option { From 7cc1373f723e9dce4609499444583eab0b7d5e37 Mon Sep 17 00:00:00 2001 From: Sean Chen Date: Thu, 31 Mar 2022 09:52:24 -0500 Subject: [PATCH 07/18] Call `process_pending` such that operational data can now be regenerated --- relayer/src/link/pending.rs | 18 ++++++++++++++---- relayer/src/link/relay_path.rs | 18 +++++------------- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/relayer/src/link/pending.rs b/relayer/src/link/pending.rs index c350f4526b..3db6ac062d 100644 --- a/relayer/src/link/pending.rs +++ b/relayer/src/link/pending.rs @@ -172,6 +172,9 @@ impl PendingTxs { trace!("transaction is not yet committed: {} ", tx_hashes); + // As a hacky test, set this timeout to a very small value + // (or just make this an `if true` check) + // do an ft transfer with a small timeout with 1 or 2 blocks if submit_time.elapsed() > timeout { // The submission time for the transaction has exceeded the // timeout threshold. 
Returning Outcome::Timeout for the @@ -182,18 +185,25 @@ impl PendingTxs { Some(f) => { let new_od = relay_path .regenerate_operational_data(pending.original_od.clone()); - match f(new_od) { - Ok(reply) => { + match new_od.map(f) { + Some(Ok(reply)) => { self.insert_new_pending_tx(reply, pending.original_od); Ok(None) } - Err(e) => { + Some(Err(e)) => { self.pending_queue.push_back(pending); Err(e) } + None => { + // No operational data was regenerated; nothing to resubmit + Ok(None) + } } } - None => Ok(None), + None => { + // The clear packet interval is > 0, so no resubmit closure was provided + Ok(None) + } } } else { // Reinsert the pending transaction, this time diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 8a0cbd0510..e986b7e877 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -527,7 +527,7 @@ impl RelayPath { /// /// Side effects: may schedule a new operational data targeting the source chain, comprising /// new timeout messages. - fn regenerate_operational_data( + pub(crate) fn regenerate_operational_data( &self, initial_odata: OperationalData, ) -> Option { @@ -1253,11 +1253,7 @@ impl RelayPath { summary_src } - fn process_pending_txs_src( - &self, - clear_interval: u64, - relay_path: &RelayPath, - ) -> Result { + fn process_pending_txs_src(&self, clear_interval: u64) -> Result { let resubmit = if clear_interval == 0 { Some(|odata| self.relay_from_operational_data::(odata)) } else { @@ -1265,17 +1261,13 @@ impl RelayPath { }; let res = self .pending_txs_src - .process_pending(pending::TIMEOUT, relay_path, resubmit)? + .process_pending(pending::TIMEOUT, &self, resubmit)?
.unwrap_or_else(RelaySummary::empty); Ok(res) } - fn process_pending_txs_dst( - &self, - clear_interval: u64, - relay_path: &RelayPath, - ) -> Result { + fn process_pending_txs_dst(&self, clear_interval: u64) -> Result { let resubmit = if clear_interval == 0 { Some(|odata| self.relay_from_operational_data::(odata)) } else { @@ -1283,7 +1275,7 @@ impl RelayPath { }; let res = self .pending_txs_dst - .process_pending(pending::TIMEOUT, relay_path, resubmit)? + .process_pending(pending::TIMEOUT, &self, resubmit)? .unwrap_or_else(RelaySummary::empty); Ok(res) From 17939923a82dd4c166226d69f987bd6204e62a43 Mon Sep 17 00:00:00 2001 From: Sean Chen Date: Thu, 31 Mar 2022 10:13:47 -0500 Subject: [PATCH 08/18] Fix clippy warnings --- relayer/src/link/relay_path.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index e986b7e877..e62decfca4 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -1261,7 +1261,7 @@ impl RelayPath { }; let res = self .pending_txs_src - .process_pending(pending::TIMEOUT, &self, resubmit)? + .process_pending(pending::TIMEOUT, self, resubmit)? .unwrap_or_else(RelaySummary::empty); Ok(res) @@ -1275,7 +1275,7 @@ impl RelayPath { }; let res = self .pending_txs_dst - .process_pending(pending::TIMEOUT, &self, resubmit)? + .process_pending(pending::TIMEOUT, self, resubmit)? 
.unwrap_or_else(RelaySummary::empty); Ok(res) From 9215d0c90cd5a464ef5a6b33fd28b8ff4a430b77 Mon Sep 17 00:00:00 2001 From: Sean Chen Date: Mon, 4 Apr 2022 14:04:19 -0500 Subject: [PATCH 09/18] Remove unnecessary comment --- relayer/src/link/pending.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/relayer/src/link/pending.rs b/relayer/src/link/pending.rs index 3db6ac062d..5d57d4a6b5 100644 --- a/relayer/src/link/pending.rs +++ b/relayer/src/link/pending.rs @@ -172,9 +172,6 @@ impl PendingTxs { trace!("transaction is not yet committed: {} ", tx_hashes); - // As a hacky test, set this timeout to a very small value - // (or just make this an `if true` check) - // do an ft transfer with a small timeout with 1 or 2 blocks if submit_time.elapsed() > timeout { // The submission time for the transaction has exceeded the // timeout threshold. Returning Outcome::Timeout for the From 653326b7626b2d412ed8491403ab087dd226958e Mon Sep 17 00:00:00 2001 From: Sean Chen Date: Tue, 5 Apr 2022 13:56:58 -0500 Subject: [PATCH 10/18] Add changelog entry --- .../1792-fix-hermes-retrying-not-regenerating-msgs.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 .changelog/unreleased/bug-fixes/ibc-relayer/1792-fix-hermes-retrying-not-regenerating-msgs.md diff --git a/.changelog/unreleased/bug-fixes/ibc-relayer/1792-fix-hermes-retrying-not-regenerating-msgs.md b/.changelog/unreleased/bug-fixes/ibc-relayer/1792-fix-hermes-retrying-not-regenerating-msgs.md new file mode 100644 index 0000000000..7e01a0fb21 --- /dev/null +++ b/.changelog/unreleased/bug-fixes/ibc-relayer/1792-fix-hermes-retrying-not-regenerating-msgs.md @@ -0,0 +1 @@ +- Fixed Hermes retrying mechanism not regenerating operational data for messages ([#1792](https://github.com/informalsystems/ibc-rs/pull/1951)) From 52855f53b6bf0f8cffdb19a3b8db3338801bc9c8 Mon Sep 17 00:00:00 2001 From: Sean Chen Date: Thu, 7 Apr 2022 09:46:49 -0500 Subject: [PATCH 11/18] Update doc comment for `regenerate_operational_data` method --- 
relayer/src/link/relay_path.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index e62decfca4..937e994de3 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -517,8 +517,8 @@ impl RelayPath { Ok(S::Reply::empty()) } - /// Helper for managing retries of the `relay_from_operational_data` method. - /// Expects as input the initial operational data that failed to send. + /// Generates fresh operational data for a tx given the initial operational data + /// that failed to send. /// /// Return value: /// - `Some(..)`: a new operational data from which to retry sending, From 9c5ec2952416567092e0b16bdcc55dfae3451e59 Mon Sep 17 00:00:00 2001 From: Sean Chen Date: Thu, 7 Apr 2022 10:25:09 -0500 Subject: [PATCH 12/18] Replace `clear_interval` param with `do_resubmit` in fn signatures --- relayer/src/link/relay_path.rs | 19 ++++++++++++------- relayer/src/worker.rs | 7 ++----- relayer/src/worker/packet.rs | 9 ++++++--- 3 files changed, 20 insertions(+), 15 deletions(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 937e994de3..3064e97ea7 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -1227,20 +1227,25 @@ impl RelayPath { Ok(()) } - pub fn process_pending_txs(&self, clear_interval: u64) -> RelaySummary { + /// Kicks off the process of relaying pending txs to the source and destination chains. + /// + /// Packet resubmission is enabled when the clear interval for packets is 0. Otherwise, + /// when the packet clear interval is > 0, the relayer will periodically clear unsent packets + /// such that resubmitting packets is not necessary. 
+ pub fn process_pending_txs(&self, do_resubmit: bool) -> RelaySummary { if !self.confirm_txes { return RelaySummary::empty(); } let mut summary_src = self - .process_pending_txs_src(clear_interval) + .process_pending_txs_src(do_resubmit) .unwrap_or_else(|e| { error!("error processing pending events in source chain: {}", e); RelaySummary::empty() }); let summary_dst = self - .process_pending_txs_dst(clear_interval) + .process_pending_txs_dst(do_resubmit) .unwrap_or_else(|e| { error!( "error processing pending events in destination chain: {}", @@ -1253,8 +1258,8 @@ impl RelayPath { summary_src } - fn process_pending_txs_src(&self, clear_interval: u64) -> Result { - let resubmit = if clear_interval == 0 { + fn process_pending_txs_src(&self, do_resubmit: bool) -> Result { + let resubmit = if do_resubmit { Some(|odata| self.relay_from_operational_data::(odata)) } else { None @@ -1267,8 +1272,8 @@ impl RelayPath { Ok(res) } - fn process_pending_txs_dst(&self, clear_interval: u64) -> Result { - let resubmit = if clear_interval == 0 { + fn process_pending_txs_dst(&self, do_resubmit: bool) -> Result { + let resubmit = if do_resubmit { Some(|odata| self.relay_from_operational_data::(odata)) } else { None diff --git a/relayer/src/worker.rs b/relayer/src/worker.rs index 0bd7175c4a..a3c1ecbd1d 100644 --- a/relayer/src/worker.rs +++ b/relayer/src/worker.rs @@ -111,6 +111,7 @@ pub fn spawn_worker_tasks( Ok(link) => { let (cmd_tx, cmd_rx) = crossbeam_channel::unbounded(); let link = Arc::new(Mutex::new(link)); + let do_resubmit = packets_config.clear_interval == 0; let packet_task = packet::spawn_packet_cmd_worker( cmd_rx, link.clone(), @@ -120,11 +121,7 @@ pub fn spawn_worker_tasks( ); task_handles.push(packet_task); - let link_task = packet::spawn_packet_worker( - path.clone(), - link, - packets_config.clear_interval, - ); + let link_task = packet::spawn_packet_worker(path.clone(), link, do_resubmit); task_handles.push(link_task); Some(cmd_tx) } diff --git 
a/relayer/src/worker/packet.rs b/relayer/src/worker/packet.rs index 9a433c2a27..7101662a85 100644 --- a/relayer/src/worker/packet.rs +++ b/relayer/src/worker/packet.rs @@ -42,11 +42,13 @@ fn handle_link_error_in_task(e: LinkError) -> TaskError { } } +/// Spawns a packet worker task in the background that handles the work of +/// processing pending txs between `ChainA` and `ChainB`. pub fn spawn_packet_worker( path: Packet, // Mutex is used to prevent race condition between the packet workers link: Arc>>, - clear_interval: u64, + do_resubmit: bool, ) -> TaskHandle { let span = { let relay_path = &link.lock().unwrap().a_to_b; @@ -70,7 +72,7 @@ pub fn spawn_packet_worker( .execute_schedule() .map_err(handle_link_error_in_task)?; - let summary = relay_path.process_pending_txs(clear_interval); + let summary = relay_path.process_pending_txs(do_resubmit); if !summary.is_empty() { trace!("Packet worker produced relay summary: {:?}", summary); @@ -203,7 +205,8 @@ fn handle_packet_cmd( } } - let summary = link.a_to_b.process_pending_txs(clear_interval); + let do_resubmit = clear_interval == 0; + let summary = link.a_to_b.process_pending_txs(do_resubmit); if !summary.is_empty() { trace!("produced relay summary: {:?}", summary); From 846d4048ff9025698f5c01adba09e15b235cfdc3 Mon Sep 17 00:00:00 2001 From: Sean Chen Date: Thu, 7 Apr 2022 10:49:51 -0500 Subject: [PATCH 13/18] Improve doc comments for the `process_pending` fn --- relayer/src/link/pending.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/relayer/src/link/pending.rs b/relayer/src/link/pending.rs index 5d57d4a6b5..2079ec2448 100644 --- a/relayer/src/link/pending.rs +++ b/relayer/src/link/pending.rs @@ -133,7 +133,12 @@ impl PendingTxs { Ok(Some(all_events)) } - /// Try and process one pending transaction if available. + /// Try and process one pending transaction within the given timeout duration if one + /// is available. 
+ /// + /// A `resubmit` closure is provided when the clear interval for packets is 0. If this closure + /// is provided, the pending transactions that fail to process within the given timeout duration + /// are resubmitted following the logic specified by the closure. pub fn process_pending( &self, timeout: Duration, @@ -180,6 +185,9 @@ impl PendingTxs { match resubmit { Some(f) => { + // The pending tx needs to be resubmitted. This involves replacing the tx's + // stale operational data with a fresh copy and then applying the `resubmit` + // closure to it. let new_od = relay_path .regenerate_operational_data(pending.original_od.clone()); match new_od.map(f) { From b563c6aac88612e65cd6020164dca5c94035bc5c Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 12 Apr 2022 13:31:44 +0200 Subject: [PATCH 14/18] Introduce `Resubmit` type instead of boolean --- relayer/src/link.rs | 4 +- relayer/src/link/relay_path.rs | 67 +++++++++++++++++++++------------- relayer/src/worker.rs | 6 +-- relayer/src/worker/packet.rs | 9 +++-- 4 files changed, 53 insertions(+), 33 deletions(-) diff --git a/relayer/src/link.rs b/relayer/src/link.rs index dd0530fbe0..436e00a287 100644 --- a/relayer/src/link.rs +++ b/relayer/src/link.rs @@ -11,11 +11,11 @@ use crate::chain::counterparty::check_channel_counterparty; use crate::chain::handle::ChainHandle; use crate::channel::{Channel, ChannelSide}; use crate::link::error::LinkError; -use crate::link::relay_path::RelayPath; pub mod cli; pub mod error; pub mod operational_data; + mod pending; mod relay_path; mod relay_sender; @@ -26,6 +26,8 @@ use tx_hashes::TxHashes; // Re-export the telemetries summary pub use relay_summary::RelaySummary; +pub use relay_path::{RelayPath, Resubmit}; + #[derive(Clone, Debug)] pub struct LinkParameters { pub src_port_id: PortId, diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 46d5e26ffd..cd9bc9f47c 100644 --- a/relayer/src/link/relay_path.rs +++ 
b/relayer/src/link/relay_path.rs @@ -59,6 +59,27 @@ use crate::util::queue::Queue; const MAX_RETRIES: usize = 5; +/// Whether or not to resubmit packets when pending transactions +/// fail to process within the given timeout duration. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum Resubmit { + Yes, + No, +} + +impl Resubmit { + /// Packet resubmission is enabled when the clear interval for packets is 0. Otherwise, + /// when the packet clear interval is > 0, the relayer will periodically clear unsent packets + /// such that resubmitting packets is not necessary. + pub fn from_clear_interval(clear_interval: u64) -> Self { + if clear_interval == 0 { + Self::Yes + } else { + Self::No + } + } +} + pub struct RelayPath { channel: Channel, @@ -1373,58 +1394,54 @@ impl RelayPath { /// Kicks off the process of relaying pending txs to the source and destination chains. /// - /// Packet resubmission is enabled when the clear interval for packets is 0. Otherwise, - /// when the packet clear interval is > 0, the relayer will periodically clear unsent packets - /// such that resubmitting packets is not necessary. - pub fn process_pending_txs(&self, do_resubmit: bool) -> RelaySummary { + /// See [`Resubmit::from_clear_interval`] for more info about the `resubmit` parameter. 
+ pub fn process_pending_txs(&self, resubmit: Resubmit) -> RelaySummary { if !self.confirm_txes { return RelaySummary::empty(); } - let mut summary_src = self - .process_pending_txs_src(do_resubmit) - .unwrap_or_else(|e| { - error!("error processing pending events in source chain: {}", e); - RelaySummary::empty() - }); + let mut summary_src = self.process_pending_txs_src(resubmit).unwrap_or_else(|e| { + error!("error processing pending events in source chain: {}", e); + RelaySummary::empty() + }); - let summary_dst = self - .process_pending_txs_dst(do_resubmit) - .unwrap_or_else(|e| { - error!( - "error processing pending events in destination chain: {}", - e - ); - RelaySummary::empty() - }); + let summary_dst = self.process_pending_txs_dst(resubmit).unwrap_or_else(|e| { + error!( + "error processing pending events in destination chain: {}", + e + ); + RelaySummary::empty() + }); summary_src.extend(summary_dst); summary_src } - fn process_pending_txs_src(&self, do_resubmit: bool) -> Result { - let resubmit = if do_resubmit { + fn process_pending_txs_src(&self, resubmit: Resubmit) -> Result { + let do_resubmit = if resubmit == Resubmit::Yes { Some(|odata| self.relay_from_operational_data::(odata)) } else { None }; + let res = self .pending_txs_src - .process_pending(pending::TIMEOUT, self, resubmit)? + .process_pending(pending::TIMEOUT, self, do_resubmit)? .unwrap_or_else(RelaySummary::empty); Ok(res) } - fn process_pending_txs_dst(&self, do_resubmit: bool) -> Result { - let resubmit = if do_resubmit { + fn process_pending_txs_dst(&self, resubmit: Resubmit) -> Result { + let do_resubmit = if resubmit == Resubmit::Yes { Some(|odata| self.relay_from_operational_data::(odata)) } else { None }; + let res = self .pending_txs_dst - .process_pending(pending::TIMEOUT, self, resubmit)? + .process_pending(pending::TIMEOUT, self, do_resubmit)? 
.unwrap_or_else(RelaySummary::empty); Ok(res) diff --git a/relayer/src/worker.rs b/relayer/src/worker.rs index 69bf4668d4..6681d58627 100644 --- a/relayer/src/worker.rs +++ b/relayer/src/worker.rs @@ -5,7 +5,7 @@ use std::sync::Mutex; use tracing::error; use crate::foreign_client::ForeignClient; -use crate::link::{Link, LinkParameters}; +use crate::link::{Link, LinkParameters, Resubmit}; use crate::{ chain::handle::{ChainHandle, ChainHandlePair}, config::Config, @@ -125,7 +125,7 @@ pub fn spawn_worker_tasks( Ok(link) => { let (cmd_tx, cmd_rx) = crossbeam_channel::unbounded(); let link = Arc::new(Mutex::new(link)); - let do_resubmit = packets_config.clear_interval == 0; + let resubmit = Resubmit::from_clear_interval(packets_config.clear_interval); let packet_task = packet::spawn_packet_cmd_worker( cmd_rx, link.clone(), @@ -135,7 +135,7 @@ pub fn spawn_worker_tasks( ); task_handles.push(packet_task); - let link_task = packet::spawn_packet_worker(path.clone(), link, do_resubmit); + let link_task = packet::spawn_packet_worker(path.clone(), link, resubmit); task_handles.push(link_task); (Some(cmd_tx), None) diff --git a/relayer/src/worker/packet.rs b/relayer/src/worker/packet.rs index c93489d6a5..1a0f1144d4 100644 --- a/relayer/src/worker/packet.rs +++ b/relayer/src/worker/packet.rs @@ -9,6 +9,7 @@ use ibc::Height; use crate::chain::handle::ChainHandle; use crate::foreign_client::HasExpiredOrFrozenError; +use crate::link::Resubmit; use crate::link::{error::LinkError, Link}; use crate::object::Packet; use crate::telemetry; @@ -58,7 +59,7 @@ pub fn spawn_packet_worker( path: Packet, // Mutex is used to prevent race condition between the packet workers link: Arc>>, - do_resubmit: bool, + resubmit: Resubmit, ) -> TaskHandle { let span = { let relay_path = &link.lock().unwrap().a_to_b; @@ -82,7 +83,7 @@ pub fn spawn_packet_worker( .execute_schedule() .map_err(handle_link_error_in_task)?; - let summary = relay_path.process_pending_txs(do_resubmit); + let summary = 
relay_path.process_pending_txs(resubmit); if !summary.is_empty() { trace!("Packet worker produced relay summary: {:?}", summary); @@ -221,8 +222,8 @@ fn handle_packet_cmd( } } - let do_resubmit = clear_interval == 0; - let summary = link.a_to_b.process_pending_txs(do_resubmit); + let resubmit = Resubmit::from_clear_interval(clear_interval); + let summary = link.a_to_b.process_pending_txs(resubmit); if !summary.is_empty() { trace!("produced relay summary: {:?}", summary); From f9846f13d9b7979782398bc3b107aeeb1e768ec9 Mon Sep 17 00:00:00 2001 From: Sean Chen Date: Thu, 14 Apr 2022 09:58:59 -0500 Subject: [PATCH 15/18] Document the interaction between `clear_interval` and `tx_confirmation` parameters --- config.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/config.toml b/config.toml index a0fa87f0e6..9319b1cdc2 100644 --- a/config.toml +++ b/config.toml @@ -56,7 +56,9 @@ clear_on_start = true # Toggle the transaction confirmation mechanism. # The tx confirmation mechanism periodically queries the `/tx_search` RPC # endpoint to check that previously-submitted transactions -# (to any chain in this config file) have delivered successfully. +# (to any chain in this config file) have been successfully delivered. +# If they have not been, and `clear_interval = 0`, then those packets are +# queued up for re-submission. # Experimental feature. Affects telemetry if set to false. 
# [Default: true] tx_confirmation = true From 16bdde7bc9dd94282390e631c4204b92a221f10f Mon Sep 17 00:00:00 2001 From: Sean Chen Date: Thu, 14 Apr 2022 10:06:18 -0500 Subject: [PATCH 16/18] Fix incorrect comment --- relayer/src/link/pending.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/src/link/pending.rs b/relayer/src/link/pending.rs index 2079ec2448..0b528f660e 100644 --- a/relayer/src/link/pending.rs +++ b/relayer/src/link/pending.rs @@ -206,7 +206,7 @@ impl PendingTxs { } } None => { - // The clear packet interval is 0 such that no resubmit logic was received + // `clear_interval != 0` such that resubmission has been disabled Ok(None) } } From a6f951c123ec13061409ea13f07c6307fe716849 Mon Sep 17 00:00:00 2001 From: Sean Chen Date: Thu, 14 Apr 2022 12:18:17 -0500 Subject: [PATCH 17/18] Switch from if on `Resubmit` to match --- relayer/src/link/relay_path.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index cd9bc9f47c..17cd295cb6 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -1418,10 +1418,11 @@ impl RelayPath { } fn process_pending_txs_src(&self, resubmit: Resubmit) -> Result { - let do_resubmit = if resubmit == Resubmit::Yes { - Some(|odata| self.relay_from_operational_data::(odata)) - } else { - None + let do_resubmit = match resubmit { + Resubmit::Yes => { + Some(|odata| self.relay_from_operational_data::(odata)) + } + Resubmit::No => None, }; let res = self @@ -1433,10 +1434,11 @@ impl RelayPath { } fn process_pending_txs_dst(&self, resubmit: Resubmit) -> Result { - let do_resubmit = if resubmit == Resubmit::Yes { - Some(|odata| self.relay_from_operational_data::(odata)) - } else { - None + let do_resubmit = match resubmit { + Resubmit::Yes => { + Some(|odata| self.relay_from_operational_data::(odata)) + } + Resubmit::No => None, }; let res = self From 
74e768923391f226b4d8fe62e620a75bde556377 Mon Sep 17 00:00:00 2001 From: Sean Chen Date: Fri, 15 Apr 2022 11:15:33 -0500 Subject: [PATCH 18/18] Fix Queue::push_back method --- relayer/src/link/pending.rs | 5 ++++- relayer/src/util/queue.rs | 12 ++++++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/relayer/src/link/pending.rs b/relayer/src/link/pending.rs index 0b528f660e..3df931608a 100644 --- a/relayer/src/link/pending.rs +++ b/relayer/src/link/pending.rs @@ -65,7 +65,7 @@ impl PendingTxs { self.chain.id() } - // Insert new pending transaction to the back of the queue. + /// Insert a new pending transaction to the back of the queue. pub fn insert_new_pending_tx(&self, r: AsyncReply, od: OperationalData) { let mut tx_hashes = Vec::new(); let mut error_events = Vec::new(); @@ -190,6 +190,9 @@ impl PendingTxs { // closure to it. let new_od = relay_path .regenerate_operational_data(pending.original_od.clone()); + + trace!("regenerated operational data for {}", tx_hashes); + match new_od.map(f) { Some(Ok(reply)) => { self.insert_new_pending_tx(reply, pending.original_od); diff --git a/relayer/src/util/queue.rs b/relayer/src/util/queue.rs index 2fe5ab144a..27d501eb33 100644 --- a/relayer/src/util/queue.rs +++ b/relayer/src/util/queue.rs @@ -3,11 +3,11 @@ use std::sync::{Arc, RwLock}; use crate::util::lock::LockExt; -// A lightweight wrapper type to RefCell> so that -// we can safely mutate it with regular reference instead of -// mutable reference. We only expose subset of VecDeque methods -// that does not return any inner reference, so that the RefCell -// can never panic caused by simultaneous borrow and borrow_mut. +/// A lightweight wrapper type to RefCell> so that +/// we can safely mutate it with regular reference instead of +/// mutable reference. We only expose subset of VecDeque methods +/// that does not return any inner reference, so that the RefCell +/// can never panic caused by simultaneous borrow and borrow_mut. 
pub struct Queue(Arc>>); impl Queue { @@ -24,7 +24,7 @@ impl Queue { } pub fn push_back(&self, val: T) { - self.0.acquire_write().push_front(val) + self.0.acquire_write().push_back(val) } pub fn push_front(&self, val: T) {