From a700da741621751cdc9d09e5f5d287a6a00c7a89 Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Fri, 4 Nov 2022 21:27:00 -0500 Subject: [PATCH] Merge periodic flush into connection handler loop --- async-nats/src/lib.rs | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 141fd8c55..92c76fb57 100644 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -315,9 +315,11 @@ impl ConnectionHandler { pub(crate) async fn process( &mut self, ping_period: Duration, + flush_period: Duration, mut receiver: mpsc::Receiver, ) -> Result<(), io::Error> { let mut ping_interval = interval(ping_period); + let mut flush_interval = interval(flush_period); loop { select! { @@ -329,6 +331,11 @@ impl ConnectionHandler { self.connection.flush().await?; }, + _ = flush_interval.tick().fuse() => { + if let Err(_err) = self.connection.flush().await { + self.handle_disconnect().await?; + } + }, maybe_command = receiver.recv().fuse() => { match maybe_command { Some(command) => if let Err(err) = self.handle_command(command).await { @@ -637,7 +644,7 @@ pub async fn connect_with_options( options: ConnectOptions, ) -> Result { let ping_period = options.ping_interval; - let flush_interval = options.flush_interval; + let flush_period = options.flush_interval; let (events_tx, mut events_rx) = mpsc::channel(128); let (state_tx, state_rx) = tokio::sync::watch::channel(State::Pending); @@ -673,22 +680,12 @@ pub async fn connect_with_options( let client = Client::new( info_watcher, state_rx, - sender.clone(), + sender, options.subscription_capacity, options.inbox_prefix, options.request_timeout, ); - tokio::spawn(async move { - loop { - tokio::time::sleep(flush_interval).await; - match sender.send(Command::TryFlush).await { - Ok(()) => {} - Err(_) => return, - } - } - }); - task::spawn(async move { while let Some(event) = events_rx.recv().await { options.event_callback.call(event).await @@ -703,7 +700,9 @@ pub async fn connect_with_options( } let connection = connection.unwrap(); let mut connection_handler = ConnectionHandler::new(connection, connector, info_sender); - connection_handler.process(ping_period, receiver).await + connection_handler + .process(ping_period, flush_period, receiver) + .await }); Ok(client)