Skip to content

Commit

Permalink
Improve ordered consumer to use mem_storage and R1
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarema authored Oct 14, 2022
1 parent 471cbb4 commit 8b31346
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 38 deletions.
1 change: 1 addition & 0 deletions async-nats/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ tracing = "0.1"
itertools = "0.10"
lazy_static = "1.4"
base64 = "0.13"
tokio-retry = "0.3"
ring = "0.16"

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion async-nats/src/jetstream/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ pub struct Config {
#[serde(default, skip_serializing_if = "is_default")]
pub num_replicas: usize,
/// Force consumer to use memory storage.
#[serde(default, skip_serializing_if = "is_default")]
#[serde(default, skip_serializing_if = "is_default", rename = "mem_storage")]
pub memory_storage: bool,
}

Expand Down
196 changes: 159 additions & 37 deletions async-nats/src/jetstream/consumer/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,28 @@

use super::{AckPolicy, Consumer, DeliverPolicy, FromConsumer, IntoConsumerConfig, ReplayPolicy};
use crate::{
connection::State,
jetstream::{self, Context, Message},
Error, StatusCode, Subscriber,
};

use bytes::Bytes;
use futures::future::BoxFuture;
use serde::{Deserialize, Serialize};
use std::pin::Pin;
use std::task::{self, Poll};
use std::time::Duration;
use std::{
io::{self, ErrorKind},
pin::Pin,
sync::{Arc, Mutex},
time::Instant,
};
use std::{
sync::atomic::AtomicU64,
task::{self, Poll},
};
use std::{sync::atomic::Ordering, time::Duration};
use tokio::sync::oneshot::error::TryRecvError;
use tokio_retry::{strategy::ExponentialBackoff, Retry};
use tracing::{debug, trace};

impl Consumer<Config> {
/// Returns a stream of messages for Push Consumer.
Expand Down Expand Up @@ -202,8 +214,8 @@ pub struct Config {
impl FromConsumer for Config {
fn try_from_consumer_config(config: super::Config) -> Result<Self, Error> {
if config.deliver_subject.is_none() {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
return Err(Box::new(io::Error::new(
ErrorKind::Other,
"push consumer must have delivery subject",
)));
}
Expand Down Expand Up @@ -305,9 +317,6 @@ pub struct OrderedConfig {
/// The maximum number of waiting consumers.
#[serde(default, skip_serializing_if = "is_default")]
pub max_waiting: i64,
/// Number of consumer replicas
#[serde(default, skip_serializing_if = "is_default")]
pub num_replicas: usize,
}

impl FromConsumer for OrderedConfig {
Expand All @@ -316,8 +325,8 @@ impl FromConsumer for OrderedConfig {
Self: Sized,
{
if config.deliver_subject.is_none() {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
return Err(Box::new(io::Error::new(
ErrorKind::Other,
"push consumer must have delivery subject",
)));
}
Expand All @@ -332,7 +341,6 @@ impl FromConsumer for OrderedConfig {
headers_only: config.headers_only,
deliver_policy: config.deliver_policy,
max_waiting: config.max_waiting,
num_replicas: config.num_replicas,
})
}
}
Expand Down Expand Up @@ -361,7 +369,7 @@ impl IntoConsumerConfig for OrderedConfig {
max_batch: 0,
max_expires: Duration::default(),
inactive_threshold: Duration::from_secs(30),
num_replicas: self.num_replicas,
num_replicas: 1,
memory_storage: true,
}
}
Expand All @@ -375,13 +383,76 @@ impl Consumer<OrderedConfig> {
.subscribe(self.info.config.deliver_subject.clone().unwrap())
.await?;

let last_seen = Arc::new(Mutex::new(Instant::now()));
let last_sequence = Arc::new(AtomicU64::new(0));
let consumer_sequence = Arc::new(AtomicU64::new(0));
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
tokio::task::spawn({
let last_seen = last_seen.clone();
let stream_name = self.info.stream_name.clone();
let config = self.config.clone();
let mut context = self.context.clone();
let last_sequence = last_sequence.clone();
let consumer_sequence = consumer_sequence.clone();
let state = self.context.client.state.clone();
async move {
loop {
let current_state = state.borrow().to_owned();
tokio::select! {
_ = context.client.state.changed() => {
if state.borrow().to_owned() != State::Connected || current_state == State::Connected {
continue;
}
debug!("reconnected. trigger consumer recreation");
},
_ = tokio::time::sleep(Duration::from_secs(5)) => {
debug!("hearbeat check");

if !last_seen
.lock()
.unwrap()
.elapsed()
.gt(&Duration::from_secs(10)) {
println!("last seen ok. wait");
continue;
}
println!("last seen not ok");
}
}
debug!(
"idle hearbeats expired. recreating consumer s: {}, {:?}",
stream_name, config
);
let retry_strategy = ExponentialBackoff::from_millis(500).take(5);
let consumer = Retry::spawn(retry_strategy, || {
recreate_ephemeral_consumer(
context.clone(),
config.clone(),
stream_name.clone(),
last_sequence.load(Ordering::Relaxed),
)
})
.await;
if let Err(err) = consumer {
shutdown_tx.send(err).unwrap();
break;
}
*last_seen.lock().unwrap() = Instant::now();
debug!("resseting consume sequence to 0");
consumer_sequence.store(0, Ordering::Relaxed);
}
}
});

Ok(Ordered {
context: self.context.clone(),
consumer: self,
subscriber: Some(subscriber),
subscriber_future: None,
stream_sequence: 0,
consumer_sequence: 0,
stream_sequence: last_sequence,
consumer_sequence,
last_seen,
shutdown: shutdown_rx,
})
}
}
Expand All @@ -391,25 +462,42 @@ pub struct Ordered<'a> {
consumer: Consumer<OrderedConfig>,
subscriber: Option<Subscriber>,
subscriber_future: Option<BoxFuture<'a, Result<Subscriber, Error>>>,
stream_sequence: u64,
consumer_sequence: u64,
stream_sequence: Arc<AtomicU64>,
consumer_sequence: Arc<AtomicU64>,
last_seen: Arc<Mutex<Instant>>,
shutdown: tokio::sync::oneshot::Receiver<Error>,
}

impl<'a> futures::Stream for Ordered<'a> {
type Item = Result<Message, Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match self.shutdown.try_recv() {
Ok(err) => return Poll::Ready(Some(Err(err))),
Err(TryRecvError::Closed) => {
return Poll::Ready(Some(Err(Box::from(io::Error::new(
ErrorKind::Other,
"push consumer task closed",
)))))
}
Err(TryRecvError::Empty) => {}
}
if self.subscriber.is_none() {
match self.subscriber_future.as_mut() {
None => {
let context = self.context.clone();
let sequence = self.stream_sequence;
let sequence = self.stream_sequence.clone();
let config = self.consumer.config.clone();
let stream_name = self.consumer.info.stream_name.clone();
self.subscriber_future = Some(Box::pin(async move {
recreate_ephemeral_subscriber(context, config, stream_name, sequence)
.await
recreate_consumer_and_subscription(
context,
config,
stream_name,
sequence.load(Ordering::Relaxed),
)
.await
}));
match self.subscriber_future.as_mut().unwrap().as_mut().poll(cx) {
Poll::Ready(subscriber) => {
Expand All @@ -424,7 +512,7 @@ impl<'a> futures::Stream for Ordered<'a> {
Some(subscriber) => match subscriber.as_mut().poll(cx) {
Poll::Ready(subscriber) => {
self.subscriber_future = None;
self.consumer_sequence = 0;
self.consumer_sequence.store(0, Ordering::Relaxed);
self.subscriber = Some(subscriber?);
}
Poll::Pending => {
Expand All @@ -438,8 +526,10 @@ impl<'a> futures::Stream for Ordered<'a> {
Poll::Ready(maybe_message) => {
match maybe_message {
Some(message) => {
*self.last_seen.lock().unwrap() = Instant::now();
match message.status {
Some(StatusCode::IDLE_HEARTBEAT) => {
debug!("received idle hearbeats");
if let Some(headers) = message.headers.as_ref() {
if let Some(sequence) =
headers.get(crate::header::NATS_LAST_STREAM)
Expand All @@ -448,12 +538,14 @@ impl<'a> futures::Stream for Ordered<'a> {
.iter().next().unwrap()
.parse()
.map_err(|err|
Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
Box::new(io::Error::new(
ErrorKind::Other,
format!("could not parse header into u64: {}", err))
))?;

if sequence != self.stream_sequence {
if sequence
!= self.stream_sequence.load(Ordering::Relaxed)
{
self.subscriber = None;
}
}
Expand All @@ -480,19 +572,36 @@ impl<'a> futures::Stream for Ordered<'a> {
};

let info = jetstream_message.info()?;
if info.consumer_sequence != self.consumer_sequence + 1
&& info.stream_sequence != self.stream_sequence + 1
trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
self.consumer_sequence,
self.stream_sequence,
info.consumer_sequence,
info.stream_sequence);
if info.consumer_sequence
!= self.consumer_sequence.load(Ordering::Relaxed) + 1
&& info.stream_sequence
!= self.stream_sequence.load(Ordering::Relaxed) + 1
{
debug!(
"ordered consumer mismatch. current {}, info: {}",
self.consumer_sequence.load(Ordering::Relaxed),
info.consumer_sequence
);
self.subscriber = None;
continue;
}
self.stream_sequence = info.stream_sequence;
self.consumer_sequence = info.consumer_sequence;
self.stream_sequence
.store(info.stream_sequence, Ordering::Relaxed);
self.consumer_sequence
.store(info.consumer_sequence, Ordering::Relaxed);
return Poll::Ready(Some(Ok(jetstream_message)));
}
}
}
None => return Poll::Ready(None),
None => {
debug!("received None from subscription");
return Poll::Ready(None);
}
}
}
Poll::Pending => return Poll::Pending,
Expand All @@ -502,32 +611,45 @@ impl<'a> futures::Stream for Ordered<'a> {
}
}

async fn recreate_ephemeral_subscriber(
async fn recreate_consumer_and_subscription(
context: Context,
config: OrderedConfig,
stream_name: String,
sequence: u64,
) -> Result<Subscriber, Error> {
let stream = context.get_stream(stream_name.clone()).await?;

let subscriber = context
.client
.subscribe(config.deliver_subject.clone())
.await?;

recreate_ephemeral_consumer(context, config, stream_name, sequence).await?;
Ok(subscriber)
}
async fn recreate_ephemeral_consumer(
context: Context,
config: OrderedConfig,
stream_name: String,
sequence: u64,
) -> Result<(), Error> {
let stream = context.get_stream(stream_name.clone()).await?;

let deliver_policy = {
if sequence == 0 {
DeliverPolicy::All
} else {
DeliverPolicy::ByStartSequence {
start_sequence: sequence,
start_sequence: sequence + 1,
}
}
};
stream
.create_consumer(jetstream::consumer::push::OrderedConfig {
tokio::time::timeout(
Duration::from_secs(5),
stream.create_consumer(jetstream::consumer::push::OrderedConfig {
deliver_policy,
..config
})
.await?;
Ok(subscriber)
}),
)
.await
.map_err(|_| io::Error::new(ErrorKind::TimedOut, "timed out"))??;
Ok(())
}
Loading

0 comments on commit 8b31346

Please sign in to comment.