Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Commit

Permalink
fix: drop WS server if work complete (#399)
Browse files Browse the repository at this point in the history
* fix: drop WS server if work complete

* fix: remove closed subscriptions

* fix: handle error correctly

* style: better notification handling
  • Loading branch information
mattsse authored Aug 22, 2021
1 parent 8891ed3 commit bfbbee5
Showing 1 changed file with 21 additions and 4 deletions.
25 changes: 21 additions & 4 deletions ethers-providers/src/transports/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use futures_util::{
stream::{Fuse, Stream, StreamExt},
};
use serde::{de::DeserializeOwned, Serialize};
use std::collections::btree_map::Entry;
use std::{
collections::BTreeMap,
fmt::{self, Debug},
Expand Down Expand Up @@ -198,13 +199,25 @@ where
}
}

/// Returns whether the all work has been completed.
///
/// If this method returns `true`, then the `instructions` channel has been closed and all
/// pending requests and subscriptions have been completed.
fn is_done(&self) -> bool {
self.instructions.is_done() && self.pending.is_empty() && self.subscriptions.is_empty()
}

/// Spawns the event loop
fn spawn(mut self)
where
S: 'static,
{
let f = async move {
loop {
if self.is_done() {
tracing::info!("work complete");
break;
}
match self.tick().await {
Err(ClientError::UnexpectedClose) => {
tracing::error!("{}", ClientError::UnexpectedClose);
Expand Down Expand Up @@ -288,10 +301,14 @@ where
}
Ok(Incoming::Notification(notification)) => {
let id = notification.params.subscription;
if let Some(stream) = self.subscriptions.get(&id) {
stream
.unbounded_send(notification.params.result)
.map_err(to_client_error)?;
if let Entry::Occupied(stream) = self.subscriptions.entry(id) {
if let Err(err) = stream.get().unbounded_send(notification.params.result) {
if err.is_disconnected() {
// subscription channel was closed on the receiver end
stream.remove();
}
return Err(to_client_error(err));
}
}
}
}
Expand Down

0 comments on commit bfbbee5

Please sign in to comment.