Skip to content

Commit

Permalink
electrum: Don't deadlock when shutting down
Browse files Browse the repository at this point in the history
Resolves #66
  • Loading branch information
shesek committed Nov 12, 2020
1 parent 81639fa commit 7f74d70
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 15 deletions.
4 changes: 2 additions & 2 deletions scripts/setup-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ runbwt () {
cleanup() {
trap - SIGTERM SIGINT
set +eo pipefail
kill -9 `jobs -rp` 2> /dev/null
kill `jobs -rp` 2> /dev/null
wait `jobs -rp` 2> /dev/null
ele daemon stop &> /dev/null
[ -n "$KEEP_DIR" ] || rm -rf $DIR
kill -9 -- -$$ 2> /dev/null
kill -- -$$ 2> /dev/null
}
trap cleanup SIGINT SIGTERM EXIT

Expand Down
37 changes: 24 additions & 13 deletions src/electrum/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,30 +539,41 @@ impl ElectrumServer {
query: query.clone(),
}));
Self::start_notifier(notification, subman.clone(), acceptor.sender());
let mut children = vec![];

let threads = Arc::new(Mutex::new(HashMap::new()));

while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() {
let query = query.clone();
let subman = subman.clone();
children.push(spawn_thread("peer", move || {
let threads_ = threads.clone();

let thandle = spawn_thread("peer", move || {
info!(target: LT, "[{}] connected peer", addr);
let conn = Connection::new(query, skip_merkle, stream, addr, subman);
conn.run();
info!(target: LT, "[{}] disconnected peer", addr);
}));
threads_.lock().unwrap().remove(&thread::current().id());
});

let thread_id = thandle.thread().id();
threads.lock().unwrap().insert(thread_id, thandle);
}

let subman = subman.lock().unwrap();
trace!(
target: LT,
"closing {} RPC connections",
subman.subscribers.len()
);
for (_, subscriber) in subman.subscribers.iter() {
let subscribers = &subman.subscribers;
trace!(target: LT, "closing {} RPC connections", subscribers.len());
for (_, subscriber) in subscribers.iter() {
let _ = subscriber.sender.send(Message::Done);
}
// FIXME this can deadlock
trace!(target: LT, "waiting for {} RPC threads", children.len());
for child in children {
let _ = child.join();
drop(subman); // Needed because the threads unsubscribe themselves during shutdown.

// Collect the threads JoinHandles, free the `threads` mutex and only then join them.
// Needed because the threads access the mutex to attempt removing themselves during shutdown.
let handles: Vec<_> = threads.lock().unwrap().drain().map(|(_, t)| t).collect();

trace!(target: LT, "waiting for {} RPC threads", handles.len());
for thandle in handles {
let _ = thandle.join();
}
trace!(target: LT, "RPC connections are closed");
})),
Expand Down

0 comments on commit 7f74d70

Please sign in to comment.