Skip to content

Commit

Permalink
Merge periodic flush into connection handler loop
Browse files Browse the repository at this point in the history
  • Loading branch information
caspervonb committed Nov 8, 2022
1 parent d8b14da commit a700da7
Showing 1 changed file with 12 additions and 13 deletions.
25 changes: 12 additions & 13 deletions async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,11 @@ impl ConnectionHandler {
pub(crate) async fn process(
&mut self,
ping_period: Duration,
flush_period: Duration,
mut receiver: mpsc::Receiver<Command>,
) -> Result<(), io::Error> {
let mut ping_interval = interval(ping_period);
let mut flush_interval = interval(flush_period);

loop {
select! {
Expand All @@ -329,6 +331,11 @@ impl ConnectionHandler {

self.connection.flush().await?;
},
_ = flush_interval.tick().fuse() => {
if let Err(_err) = self.connection.flush().await {
self.handle_disconnect().await?;
}
},
maybe_command = receiver.recv().fuse() => {
match maybe_command {
Some(command) => if let Err(err) = self.handle_command(command).await {
Expand Down Expand Up @@ -637,7 +644,7 @@ pub async fn connect_with_options<A: ToServerAddrs>(
options: ConnectOptions,
) -> Result<Client, io::Error> {
let ping_period = options.ping_interval;
let flush_interval = options.flush_interval;
let flush_period = options.flush_interval;

let (events_tx, mut events_rx) = mpsc::channel(128);
let (state_tx, state_rx) = tokio::sync::watch::channel(State::Pending);
Expand Down Expand Up @@ -673,22 +680,12 @@ pub async fn connect_with_options<A: ToServerAddrs>(
let client = Client::new(
info_watcher,
state_rx,
sender.clone(),
sender,
options.subscription_capacity,
options.inbox_prefix,
options.request_timeout,
);

tokio::spawn(async move {
loop {
tokio::time::sleep(flush_interval).await;
match sender.send(Command::TryFlush).await {
Ok(()) => {}
Err(_) => return,
}
}
});

task::spawn(async move {
while let Some(event) = events_rx.recv().await {
options.event_callback.call(event).await
Expand All @@ -703,7 +700,9 @@ pub async fn connect_with_options<A: ToServerAddrs>(
}
let connection = connection.unwrap();
let mut connection_handler = ConnectionHandler::new(connection, connector, info_sender);
connection_handler.process(ping_period, receiver).await
connection_handler
.process(ping_period, flush_period, receiver)
.await
});

Ok(client)
Expand Down

0 comments on commit a700da7

Please sign in to comment.