Skip to content

Commit

Permalink
Improve on names
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarema committed Oct 3, 2022
1 parent 11097b7 commit 01a6c38
Showing 1 changed file with 4 additions and 19 deletions.
23 changes: 4 additions & 19 deletions async-nats/src/jetstream/consumer/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ impl<'a> futures::Stream for Ordered<'a> {
let config = self.consumer.config.clone();
let stream_name = self.consumer.info.stream_name.clone();
self.subscriber_future = Some(Box::pin(async move {
recreate_ephemeral_subscriber(
recreate_consumer_and_subscription(
context,
config,
stream_name,
Expand Down Expand Up @@ -611,33 +611,18 @@ impl<'a> futures::Stream for Ordered<'a> {
}
}

async fn recreate_ephemeral_subscriber(
async fn recreate_consumer_and_subscription(
context: Context,
config: OrderedConfig,
stream_name: String,
sequence: u64,
) -> Result<Subscriber, Error> {
let stream = context.get_stream(stream_name.clone()).await?;

let subscriber = context
.client
.subscribe(config.deliver_subject.clone())
.await?;
let deliver_policy = {
if sequence == 0 {
DeliverPolicy::All
} else {
DeliverPolicy::ByStartSequence {
start_sequence: sequence + 1,
}
}
};
stream
.create_consumer(jetstream::consumer::push::OrderedConfig {
deliver_policy,
..config
})
.await?;

recreate_ephemeral_consumer(context, config, stream_name, sequence).await?;
Ok(subscriber)
}
async fn recreate_ephemeral_consumer(
Expand Down

0 comments on commit 01a6c38

Please sign in to comment.