Skip to content

Commit

Permalink
Fix close() hang after js.subscribe() is called
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarema committed Feb 10, 2022
1 parent 3d35a49 commit db36688
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ impl Client {
let mut write = self.state.write.lock();
let mut read = self.state.read.lock();

write.flush_kicker.send(()).ok();
write.flush_kicker.try_send(()).ok();
// Initiate shutdown process.
if self.shutdown() {
// Clear all subscriptions.
Expand Down
19 changes: 19 additions & 0 deletions tests/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ fn shutdown_responsivness_regression_check() {
let s = util::run_basic_server();
let conn = nats::Options::new().connect(s.client_url()).unwrap();
conn.rtt().unwrap();
let sub = conn.subscribe("test").unwrap();
conn.publish("test", b"msg").unwrap();
sub.next().unwrap();

let now = Instant::now();
conn.close();
assert!(now.elapsed().le(&Duration::from_secs(5)));
Expand All @@ -111,3 +115,18 @@ fn drop_responsivness_regression_check() {
}
assert!(now.elapsed().le(&Duration::from_secs(5)));
}

#[test]
fn close_responsiveness_regression_jetstream() {
let (_s, nc, js) = run_basic_jetstream();

js.add_stream(nats::jetstream::StreamConfig {
name: "TEST".to_string(),
subjects: vec!["subject".to_string()],
..Default::default()
})
.unwrap();

js.subscribe("subject").expect("failed to subscribe");
nc.close();
}

0 comments on commit db36688

Please sign in to comment.