Skip to content

Commit

Permalink
Add watch_with_history for Object Store
Browse files Browse the repository at this point in the history
* feat(ObjectStore): Add watch_with_history method

* feat(ObjectStore): Add test for watch_with_history
  • Loading branch information
tinou98 authored Aug 11, 2023
1 parent 0a80d9f commit af0779f
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 2 deletions.
18 changes: 16 additions & 2 deletions async-nats/src/jetstream/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use serde::{Deserialize, Serialize};
use tracing::{debug, trace};

use super::consumer::push::OrderedError;
use super::consumer::{StreamError, StreamErrorKind};
use super::consumer::{DeliverPolicy, StreamError, StreamErrorKind};
use super::context::{PublishError, PublishErrorKind};
use super::stream::{ConsumerError, ConsumerErrorKind, PurgeError, PurgeErrorKind};
use super::{consumer::push::Ordered, stream::StorageType};
Expand Down Expand Up @@ -401,11 +401,25 @@ impl ObjectStore {
/// # }
/// ```
pub async fn watch(&self) -> Result<Watch<'_>, WatchError> {
self.watch_with_deliver_policy(DeliverPolicy::New).await
}

/// Creates a [Watch] stream over changes in the [ObjectStore] which yields values whenever
/// there are changes for that key with as well as last value.
pub async fn watch_with_history(&self) -> Result<Watch<'_>, WatchError> {
self.watch_with_deliver_policy(DeliverPolicy::LastPerSubject)
.await
}

async fn watch_with_deliver_policy(
&self,
deliver_policy: DeliverPolicy,
) -> Result<Watch<'_>, WatchError> {
let subject = format!("$O.{}.M.>", self.name);
let ordered = self
.stream
.create_consumer(crate::jetstream::consumer::push::OrderedConfig {
deliver_policy: super::consumer::DeliverPolicy::New,
deliver_policy,
deliver_subject: self.stream.context.client.new_inbox(),
description: Some("object store watcher".to_string()),
filter_subject: subject,
Expand Down
60 changes: 60 additions & 0 deletions async-nats/tests/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,66 @@ mod object_store {
assert!(object.deleted);
}

#[tokio::test]
async fn watch_with_history() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();

let jetstream = async_nats::jetstream::new(client);

let bucket = jetstream
.create_object_store(async_nats::jetstream::object_store::Config {
bucket: "bucket".to_string(),
..Default::default()
})
.await
.unwrap();

bucket
.put("FOO", &mut std::io::Cursor::new(vec![1, 2, 3, 4]))
.await
.unwrap();

bucket
.put("BAR", &mut std::io::Cursor::new(vec![5, 6, 7, 8]))
.await
.unwrap();

bucket
.put("FOO", &mut std::io::Cursor::new(vec![9, 0, 1, 2]))
.await
.unwrap();

let mut watcher = bucket.watch_with_history().await.unwrap();

tokio::task::spawn({
let bucket = bucket.clone();
async move {
tokio::time::sleep(Duration::from_millis(100)).await;
bucket
.put("BAR", &mut io::Cursor::new(vec![2, 3, 4, 5]))
.await
.unwrap();
bucket.delete("BAR").await.unwrap();
}
});

// check to see if we get the values in accordance to the LastPerSubject deliver policy
// we should get `BAR` and only one `FOO`
let object = watcher.next().await.unwrap().unwrap();
assert_eq!(object.name, "BAR".to_string());

let object = watcher.next().await.unwrap().unwrap();
assert_eq!(object.name, "FOO".to_string());

// make sure we get the rest correctly
let object = watcher.next().await.unwrap().unwrap();
assert_eq!(object.name, "BAR".to_string());
let object = watcher.next().await.unwrap().unwrap();
assert_eq!(object.name, "BAR".to_string());
assert!(object.deleted);
}

#[tokio::test]
async fn info() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down

0 comments on commit af0779f

Please sign in to comment.