Skip to content

Commit

Permalink
Add support for mirrors and sources in Key Value Store
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarema committed Oct 31, 2022
1 parent 28cc3b2 commit 46e0f38
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 21 deletions.
2 changes: 2 additions & 0 deletions async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::io::{self, ErrorKind};
use tokio::sync::mpsc;
use tracing::trace;

lazy_static! {
static ref VERSION_RE: Regex = Regex::new(r#"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?"#).unwrap();
Expand Down Expand Up @@ -278,6 +279,7 @@ impl Client {
/// # }
/// ```
pub async fn request(&self, subject: String, payload: Bytes) -> Result<Message, Error> {
trace!("request sent to subject: {} ({})", subject, payload.len());
let request = Request::new().payload(payload);
self.send_request(subject, request).await
}
Expand Down
109 changes: 95 additions & 14 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use tracing::debug;

use super::kv::{Store, MAX_HISTORY};
use super::object_store::{is_valid_bucket_name, ObjectStore};
use super::stream::{self, Config, DeleteStatus, DiscardPolicy, Info, Stream};
use super::stream::{self, Config, DeleteStatus, DiscardPolicy, External, Info, Stream};

/// A context which can perform jetstream scoped requests.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -220,13 +220,44 @@ impl Context {
where
Config: From<S>,
{
let config: Config = stream_config.into();
let mut config: Config = stream_config.into();
if config.name.is_empty() {
return Err(Box::new(io::Error::new(
ErrorKind::InvalidInput,
"the stream name must not be empty",
)));
}
if let Some(ref mut mirror) = config.mirror {
if let Some(ref mut domain) = mirror.domain {
if mirror.external.is_some() {
return Err(Box::new(io::Error::new(
ErrorKind::Other,
"domain and external are both set",
)));
}
mirror.external = Some(External {
api_prefix: format!("$JS.{}.API", domain),
delivery_prefix: None,
})
}
}

if let Some(ref mut sources) = config.sources {
for source in sources {
if let Some(ref mut domain) = source.domain {
if source.external.is_some() {
return Err(Box::new(io::Error::new(
ErrorKind::Other,
"domain and external are both set",
)));
}
source.external = Some(External {
api_prefix: format!("$JS.{}.API", domain),
delivery_prefix: None,
})
}
}
}
let subject = format!("STREAM.CREATE.{}", config.name);
let response: Response<Info> = self.request(subject, &config).await?;

Expand Down Expand Up @@ -438,13 +469,28 @@ impl Context {
"not a valid key-value store",
)));
}

Ok(Store {
let mut store = Store {
prefix: format!("$KV.{}.", &bucket),
name: bucket,
stream_name,
stream,
})
stream: stream.clone(),
put_prefix: None,
use_jetstream_prefix: self.prefix != "$JS.API",
};
if let Some(ref mirror) = stream.info.config.mirror {
let bucket = mirror.name.trim_start_matches("KV_");
if let Some(ref external) = mirror.external {
if !external.api_prefix.is_empty() {
store.use_jetstream_prefix = false;
store.prefix = format!("$KV.{}.", bucket);
store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
} else {
store.put_prefix = Some(format!("$KV.{}.", bucket));
}
}
};

Ok(store)
}

/// Creates a new key-value bucket.
Expand All @@ -466,7 +512,7 @@ impl Context {
/// ```
pub async fn create_key_value(
&self,
config: crate::jetstream::kv::Config,
mut config: crate::jetstream::kv::Config,
) -> Result<Store, Error> {
if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
return Err(Box::new(std::io::Error::new(
Expand All @@ -493,11 +539,27 @@ impl Context {
config.num_replicas
};

let mut subjects = Vec::new();
if let Some(ref mut mirror) = config.mirror {
if !mirror.name.starts_with("KV_") {
mirror.name = format!("KV_{}", mirror.name);
}
config.mirror_direct = true;
} else if let Some(ref mut sources) = config.sources {
for source in sources {
if !source.name.starts_with("KV_") {
source.name = format!("KV_{}", source.name);
}
}
} else {
subjects = vec![format!("$KV.{}.>", config.bucket)];
}

let stream = self
.create_stream(stream::Config {
name: format!("KV_{}", config.bucket),
description: Some(config.description),
subjects: vec![format!("$KV.{}.>", config.bucket)],
subjects,
max_messages_per_subject: history,
max_bytes: config.max_bytes,
max_age: config.max_age,
Expand All @@ -508,18 +570,37 @@ impl Context {
deny_delete: true,
deny_purge: false,
allow_direct: true,
sources: config.sources,
mirror: config.mirror,
num_replicas,
discard: stream::DiscardPolicy::New,
mirror_direct: config.mirror_direct,
..Default::default()
})
.await?;

Ok(Store {
name: config.bucket.clone(),
stream_name: stream.info.config.name.clone(),
prefix: format!("$KV.{}.", config.bucket),
stream,
})
let mut store = Store {
prefix: format!("$KV.{}.", &config.bucket),
name: config.bucket,
stream: stream.clone(),
stream_name: stream.info.config.name,
put_prefix: None,
use_jetstream_prefix: self.prefix != "$JS.API",
};
if let Some(ref mirror) = stream.info.config.mirror {
let bucket = mirror.name.trim_start_matches("KV_");
if let Some(ref external) = mirror.external {
if !external.api_prefix.is_empty() {
store.use_jetstream_prefix = false;
store.prefix = format!("$KV.{}.", bucket);
store.put_prefix = Some(format!("{}.$KV.{}.", external.api_prefix, bucket));
} else {
store.put_prefix = Some(format!("$KV.{}.", bucket));
}
}
};

Ok(store)
}

/// Deletes given key-value bucket.
Expand Down
27 changes: 23 additions & 4 deletions async-nats/src/jetstream/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use self::bucket::Status;

use super::{
consumer::DeliverPolicy,
stream::{RawMessage, Republish, StorageType, Stream},
stream::{RawMessage, Republish, Source, StorageType, Stream},
};

// Helper to extract key value operation from message headers
Expand Down Expand Up @@ -100,6 +100,12 @@ pub struct Config {
pub num_replicas: usize,
/// Republish is for republishing messages once persistent in the Key Value Bucket.
pub republish: Option<Republish>,
/// Bucket mirror configuration.
pub mirror: Option<Source>,
/// Bucket sources configuration.
pub sources: Option<Vec<Source>>,
/// Allow mirrors using direct API.
pub mirror_direct: bool,
}

/// Describes what kind of operation and entry represents
Expand All @@ -119,6 +125,8 @@ pub struct Store {
pub name: String,
pub stream_name: String,
pub prefix: String,
pub put_prefix: Option<String>,
pub use_jetstream_prefix: bool,
pub stream: Stream,
}

Expand Down Expand Up @@ -179,8 +187,13 @@ impl Store {
"invalid key",
)));
}

let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
let mut subject = String::new();
if self.use_jetstream_prefix {
subject.push_str(&self.stream.context.prefix);
subject.push('.');
}
subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
subject.push_str(key.as_ref());

let publish_ack = self.stream.context.publish(subject, value).await?;
let ack = publish_ack.await?;
Expand Down Expand Up @@ -511,7 +524,13 @@ impl Store {
"invalid key",
)));
}
let subject = format!("{}{}", self.prefix.as_str(), key.as_ref());
let mut subject = String::new();
if self.use_jetstream_prefix {
subject.push_str(&self.stream.context.prefix);
subject.push('.');
}
subject.push_str(self.put_prefix.as_ref().unwrap_or(&self.prefix));
subject.push_str(key.as_ref());

let mut headers = crate::HeaderMap::default();
// TODO: figure out which headers k/v should be where.
Expand Down
8 changes: 6 additions & 2 deletions async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,10 @@ pub struct Config {
#[serde(default, skip_serializing_if = "is_default")]
pub allow_direct: bool,

/// Enable direct access also for mirrors.
#[serde(default, skip_serializing_if = "is_default")]
pub mirror_direct: bool,

/// Stream mirror configuration.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub mirror: Option<Source>,
Expand Down Expand Up @@ -1238,8 +1242,8 @@ pub struct Source {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub external: Option<External>,
/// Optional config to set a domain, if source is residing in different one.
#[serde(default, rename = "opt_start", skip_serializing_if = "is_default")]
pub doamin: Option<String>,
#[serde(default, skip_serializing_if = "is_default")]
pub domain: Option<String>,
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
Expand Down
3 changes: 3 additions & 0 deletions async-nats/tests/configs/jetstream_hub.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
server_name: HUB
jetstream: { domain: HUB }
leafnodes { listen: 127.0.0.1:7422 }
5 changes: 5 additions & 0 deletions async-nats/tests/configs/jetstream_leaf.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
server_name: LEAF
jetstream: { domain:LEAF }
leafnodes {
remotes = [ { url: "leaf://127.0.0.1:7422" } ]
}
65 changes: 64 additions & 1 deletion async-nats/tests/kv_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod kv {
use async_nats::{
jetstream::{
kv::Operation,
stream::{DiscardPolicy, Republish, StorageType},
stream::{DiscardPolicy, Republish, Source, StorageType},
},
ConnectOptions,
};
Expand Down Expand Up @@ -525,4 +525,67 @@ mod kv {
let message = subscribe.next().await.unwrap();
assert_eq!("bar.$KV.test.key", message.subject);
}

#[tokio::test]
async fn cross_account_mirrors() {
let hub_server = nats_server::run_server("tests/configs/jetstream_hub.conf");
let leaf_server = nats_server::run_server("tests/configs/jetstream_leaf.conf");

let hub = async_nats::connect(hub_server.client_url()).await.unwrap();
let leaf = async_nats::connect(leaf_server.client_url()).await.unwrap();

let hub_js = async_nats::jetstream::new(hub);
// create the bucket on the HUB.
let hub_kv = hub_js
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "TEST".to_string(),
..Default::default()
})
.await
.unwrap();
hub_kv.put("name", "derek".into()).await.unwrap();
hub_kv.put("age", "22".into()).await.unwrap();

let mirror_bucket = async_nats::jetstream::kv::Config {
bucket: "MIRROR".to_string(),
mirror: Some(Source {
name: "TEST".to_string(),
domain: Some("HUB".to_string()),
..Default::default()
}),
..Default::default()
};
let leaf_js = async_nats::jetstream::new(leaf.clone());
leaf_js.create_key_value(mirror_bucket).await.unwrap();

let mirror = leaf_js.get_stream("KV_MIRROR").await.unwrap();

// Make sure mirror direct set.
assert!(mirror.cached_info().config.mirror_direct);

// Make sure we sync.
tokio::time::sleep(Duration::from_secs(2)).await;

// Bind locally from leafnode and make sure both get and put work.
let local_kv = leaf_js.get_key_value("MIRROR").await.unwrap();

local_kv.put("name", "rip".into()).await.unwrap();

let name = local_kv.get("name").await.unwrap();
assert_eq!(from_utf8(&name.unwrap()).unwrap(), "rip".to_string());

// Bind through leafnode connection but to origin KV.
let leaf_hub_js = async_nats::jetstream::with_domain(leaf, "HUB");

let test = leaf_hub_js.get_key_value("TEST").await.unwrap();

test.put("name", "ivan".into()).await.unwrap();
let name = test.get("name").await.unwrap();
assert_eq!(from_utf8(&name.unwrap()).unwrap(), "ivan".to_string());

// Shutdown HUB and test get still work.
drop(hub_server);

local_kv.get("name").await.unwrap();
}
}

0 comments on commit 46e0f38

Please sign in to comment.