Skip to content

Commit

Permalink
Restart the event monitor event loop to reinitialize the subscription…
Browse files Browse the repository at this point in the history
…s `Stream` after a restart (#1027)

* Add chain id to tracing messages in event monitor

* Restart the event loop after restarting the WebSocket client

* Restart the event loop in a stack-safe fashion

* Remove trampoline, and just use `return` instead
  • Loading branch information
romac authored Jun 2, 2021
1 parent fef5cd3 commit a1b40ae
Showing 1 changed file with 43 additions and 19 deletions.
62 changes: 43 additions & 19 deletions relayer/src/event/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl EventMonitor {
let mut subscriptions = vec![];

for query in &self.event_queries {
trace!("subscribing to query: {}", query);
trace!(chain.id = %self.chain_id, "subscribing to query: {}", query);

let subscription = self
.rt
Expand All @@ -204,14 +204,14 @@ impl EventMonitor {

self.subscriptions = Box::new(select_all(subscriptions));

trace!("subscribed to all queries");
trace!(chain.id = %self.chain_id, "subscribed to all queries");

Ok(())
}

fn try_reconnect(&mut self) -> Result<()> {
trace!(
"trying to reconnect to WebSocket endpoint: {}",
trace!(chain.id = %self.chain_id,
"trying to reconnect to WebSocket endpoint {}",
self.node_addr
);

Expand All @@ -228,27 +228,28 @@ impl EventMonitor {
std::mem::swap(&mut self.client, &mut client);
std::mem::swap(&mut self.driver_handle, &mut driver_handle);

trace!("reconnected to WebSocket endpoint: {}", self.node_addr);
trace!(
chain.id = %self.chain_id,
"reconnected to WebSocket endpoint {}",
self.node_addr,
);

// Shut down previous client
trace!("gracefully shutting down previous client");
if let Err(e) = client.close() {
trace!("previous websocket client closing failure: {}", e);
}
trace!(chain.id = %self.chain_id, "gracefully shutting down previous client");
let _ = client.close();

self.rt
.block_on(driver_handle)
.map_err(|e| Error::ClientTerminationFailed(Arc::new(e)))?;

trace!("previous client successfully shutdown");
trace!(chain.id = %self.chain_id, "previous client successfully shutdown");

Ok(())
}

/// Try to resubscribe to events
fn try_resubscribe(&mut self) -> Result<()> {
trace!("trying to resubscribe to events");

trace!(chain.id = %self.chain_id, "trying to resubscribe to events");
self.subscribe()
}

Expand All @@ -260,29 +261,45 @@ impl EventMonitor {
let result = retry_with_index(retry_strategy::default(), |index| {
// Try to reconnect
if let Err(e) = self.try_reconnect() {
trace!("error when reconnecting: {}", e);
trace!(chain.id = %self.chain_id, "error when reconnecting: {}", e);
return RetryResult::Retry(index);
}

// Try to resubscribe
if let Err(e) = self.try_resubscribe() {
trace!("error when reconnecting: {}", e);
trace!(chain.id = %self.chain_id, "error when reconnecting: {}", e);
return RetryResult::Retry(index);
}

RetryResult::Ok(())
});

match result {
Ok(()) => info!("successfully reconnected to WebSocket endpoint"),
Err(retries) => error!("failed to reconnect after {} retries", retries),
Ok(()) => info!(
chain.id = %self.chain_id,
"successfully reconnected to WebSocket endpoint {}",
self.node_addr
),
Err(retries) => error!(
chain.id = %self.chain_id,
"failed to reconnect to {} after {} retries",
self.node_addr, retries
),
}
}

/// Event monitor loop
pub fn run(mut self) {
debug!(chain.id = %self.chain_id, "starting event monitor");

// Continuously run the event loop, so that when it aborts
// because of WebSocket client restart, we pick up the work again.
loop {
self.run_loop();
}
}

fn run_loop(&mut self) {
// Take ownership of the subscriptions
let subscriptions =
std::mem::replace(&mut self.subscriptions, Box::new(futures::stream::empty()));
Expand All @@ -306,13 +323,20 @@ impl EventMonitor {

match result {
Ok(batch) => self.process_batch(batch).unwrap_or_else(|e| {
error!("failed to process event batch: {}", e);
error!(chain.id = %self.chain_id, "failed to process event batch: {}", e);
}),
Err(e) => {
error!("failed to collect events: {}", e);
error!(chain.id = %self.chain_id, "failed to collect events: {}", e);

// Restart the event monitor
// Restart the event monitor, reconnect to the WebSocket endpoint,
// and subscribe again to the queries.
self.restart();

// Abort this event loop, the `run` method will start a new one.
// We can't just write `return self.run()` here because Rust
// does not perform tail call optimization, and we would
// thus potentially blow up the stack after many restarts.
return;
}
}
}
Expand Down

0 comments on commit a1b40ae

Please sign in to comment.