Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add query_account to jetstream::Context #528

Merged
merged 4 commits into from
Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions async-nats/src/jetstream/account.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use serde::{de::Deserializer, Deserialize, Serialize};
use std::collections::HashMap;

fn negative_as_none<'de, D>(deserializer: D) -> Result<Option<i64>, D::Error>
where
D: Deserializer<'de>,
{
let n = i64::deserialize(deserializer)?;
if n.is_negative() {
Ok(None)
} else {
Ok(Some(n))
}
}

#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
pub struct Limits {
/// The maximum amount of Memory storage Stream Messages may consume
#[serde(deserialize_with = "negative_as_none")]
pub max_memory: Option<i64>,
/// The maximum amount of File storage Stream Messages may consume
#[serde(deserialize_with = "negative_as_none")]
pub max_storage: Option<i64>,
/// The maximum number of Streams an account can create
#[serde(deserialize_with = "negative_as_none")]
pub max_streams: Option<i64>,
/// The maximum number of Consumer an account can create
#[serde(deserialize_with = "negative_as_none")]
pub max_consumers: Option<i64>,
/// Indicates if Streams created in this account requires the max_bytes property set
pub max_bytes_required: bool,
/// The maximum number of outstanding ACKs any consumer may configure
pub max_ack_pending: i64,
/// The maximum size any single memory stream may be
#[serde(deserialize_with = "negative_as_none")]
pub memory_max_stream_bytes: Option<i64>,
/// The maximum size any single storage based stream may be
pub storage_max_stream_bytes: Option<i64>,
}

#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
pub struct Requests {
/// Total number of requests received for this account.
pub total: u64,
/// Total number of requests that resulted in an error response.
pub errors: u64,
}

#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
pub struct Tier {
/// Memory Storage being used for Stream Message storage
pub memory: u64,
/// File Storage being used for Stream Message storage
pub storage: u64,
/// Number of active Streams
pub streams: usize,
/// Number of active Consumers
pub consumers: usize,
/// Limits imposed on this tier.
pub limits: Limits,
/// Number of requests received.
#[serde(rename = "api")]
pub requests: Requests,
}

#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Account {
/// Memory stoage being used for Stream Message storage
pub memory: u64,
/// File Storage being used for Stream Message storage
pub storage: u64,
/// Number of active Streams
pub streams: usize,
/// Number of active Consumers
pub consumers: usize,
/// The JetStream domain this account is in
pub domain: Option<String>,
/// Limits imposed on this account.
pub limits: Limits,
/// Number of requests received.
#[serde(rename = "api")]
pub requests: Requests,
/// Tiers associated with this account.
#[serde(default)]
pub tiers: HashMap<String, Tier>,
}
22 changes: 19 additions & 3 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
//
//! Manage operations on [Context], create/delete/update [Stream][crate::jetstream::stream::Stream]

use std::borrow::Borrow;
use std::io::{self, ErrorKind};

use crate::jetstream::account::Account;
use crate::jetstream::publish::PublishAck;
use crate::jetstream::response::Response;
use crate::{Client, Error};
use bytes::Bytes;
use http::HeaderMap;
use serde::{de::DeserializeOwned, Serialize};
use serde_json::{self, json};
use std::borrow::Borrow;
use std::io::{self, ErrorKind};

use super::stream::{Config, DeleteStatus, Info, Stream};

Expand Down Expand Up @@ -133,6 +133,22 @@ impl Context {
}
}

/// Query the server for account information
pub async fn query_account(&self) -> Result<Account, Error> {
let response: Response<Account> = self.request("INFO".into(), b"").await?;

match response {
Response::Err { error } => Err(Box::new(std::io::Error::new(
ErrorKind::Other,
format!(
"nats: error while querying account information: {}, {}, {}",
error.code, error.status, error.description
),
))),
Response::Ok(account) => Ok(account),
}
}

/// Create a JetStream [Stream] with given config and return a handle to it.
/// That handle can be used to manage and use [Consumer][crate::jetstream::consumer::Consumer].
///
Expand Down
1 change: 1 addition & 0 deletions async-nats/src/jetstream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@

use crate::{Client, Error};

pub mod account;
pub mod consumer;
pub mod context;
pub mod publish;
Expand Down
13 changes: 13 additions & 0 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ mod jetstream {
use futures::stream::{StreamExt, TryStreamExt};
use time::OffsetDateTime;

#[tokio::test]
async fn query_account_requests() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
let context = async_nats::jetstream::new(client);

let account = context.query_account().await.unwrap();
assert_eq!(account.requests.total, 0);

let account = context.query_account().await.unwrap();
assert_eq!(account.requests.total, 1);
}

#[tokio::test]
async fn publish_with_headers() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down