Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple endpoints service #791

Merged
merged 8 commits into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 162 additions & 57 deletions async-nats/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ use std::{
fmt::Display,
pin::Pin,
sync::{Arc, Mutex},
task::Poll,
time::{Duration, Instant},
};

use bytes::Bytes;
use futures::{
stream::{self, SelectAll},
Future, Stream, StreamExt, TryFutureExt,
Future, FutureExt, Stream, StreamExt,
};
use lazy_static::lazy_static;
use regex::Regex;
Expand All @@ -33,7 +34,7 @@ use serde_json::json;
use time::serde::rfc3339;
use time::OffsetDateTime;
use tokio::task::JoinHandle;
use tracing::debug;
use tracing::{debug, trace};

use crate::{Client, Error, HeaderMap, Message, PublishError, Subscriber};

Expand Down Expand Up @@ -66,7 +67,7 @@ pub struct StatsResponse {
pub version: String,
#[serde(with = "rfc3339")]
pub started: OffsetDateTime,
pub stats: Vec<EndpointStats>,
pub endpoints: Vec<EndpointStats>,
}

/// Stats of a single endpoint.
Expand Down Expand Up @@ -94,7 +95,6 @@ pub struct Info {
pub id: String,
pub description: Option<String>,
pub version: String,
pub subject: String,
}

/// Schema of requests and responses.
Expand All @@ -107,13 +107,6 @@ pub struct Schema {
pub response: String,
}

/// Endpoint definition.
/// In this iteration, it's just a subject.
#[derive(Clone, Debug)]
pub struct Endpoint {
pub subject: String,
}

/// Configuration of the [Service].
#[derive(Debug)]
pub struct Config {
Expand All @@ -126,8 +119,6 @@ pub struct Config {
pub version: String,
// Request / Response schemas
pub schema: Option<Schema>,
/// A subject to which service will subscribe.
pub endpoint: String,
}

/// Verbs that can be used to acquire information from the services.
Expand Down Expand Up @@ -165,12 +156,13 @@ pub trait ServiceExt {
/// let mut service = client.add_service( async_nats::service::Config {
/// name: "generator".to_string(),
/// version: "1.0.0".to_string(),
/// endpoint: "events.>".to_string(),
/// schema: None,
/// description: None,
/// }).await?;
///
/// if let Some(request) = service.next().await {
/// let mut endpoint = service.endpoint("get").await?;
///
/// if let Some(request) = endpoint.next().await {
/// request.respond(Ok("hello".into())).await?;
/// }
///
Expand Down Expand Up @@ -202,12 +194,13 @@ impl ServiceExt for crate::Client {
/// let mut service = client.add_service( async_nats::service::Config {
/// name: "generator".to_string(),
/// version: "1.0.0".to_string(),
/// endpoint: "events.>".to_string(),
/// schema: None,
/// description: None,
/// }).await?;
///
/// if let Some(request) = service.next().await {
/// let mut endpoint = service.endpoint("get").await?;
///
/// if let Some(request) = endpoint.next().await {
/// request.respond(Ok("hello".into())).await?;
/// }
///
Expand All @@ -219,9 +212,57 @@ pub struct Service {
stats: Arc<Mutex<Stats>>,
info: Info,
client: Client,
requests: Subscriber,
handle: JoinHandle<Result<(), Error>>,
shutdown_tx: tokio::sync::broadcast::Sender<()>,
}

pub struct Group {
prefix: String,
stats: Arc<Mutex<Stats>>,
client: Client,
shutdown_tx: tokio::sync::broadcast::Sender<()>,
}

impl Group {
pub fn group<S: ToString>(&self, prefix: S) -> Group {
Group {
prefix: prefix.to_string(),
stats: self.stats.clone(),
client: self.client.clone(),
shutdown_tx: self.shutdown_tx.clone(),
}
}
pub async fn endpoint<S: ToString>(&self, subject: S) -> Result<Endpoint, Error> {
let subject = subject.to_string();
let requests = self
.client
.queue_subscribe(
format!("{}.{subject}", self.prefix),
QUEUE_GROUP.to_string(),
)
.await?;
debug!("created service for endpoint {}.{subject}", self.prefix);

let mut shutdown_rx = self.shutdown_tx.subscribe();

let mut stats = self.stats.lock().unwrap();
stats
.endpoints
.entry(subject.clone())
.or_insert(EndpointStats {
name: subject.clone(),
..Default::default()
});
Ok(Endpoint {
requests,
stats: self.stats.clone(),
client: self.client.clone(),
endpoint: subject,
shutdown: Box::pin(async move { shutdown_rx.recv().fuse().await }),
})
}
}

impl Service {
async fn add(client: Client, config: Config) -> Result<Service, Error> {
// validate service version semver string.
Expand All @@ -246,28 +287,12 @@ impl Service {
id: id.clone(),
description: config.description.clone(),
version: config.version.clone(),
subject: config.endpoint.clone(),
};
let request_stats = EndpointStats {
// FIXME: what should be the name?
name: "requests".to_string(),
..Default::default()
};

let mut endpoints = HashMap::new();
endpoints.insert("requests".to_string(), request_stats);
let (shutdown_tx, _) = tokio::sync::broadcast::channel(1);

let endpoints = HashMap::new();
let endpoint_stats = Arc::new(Mutex::new(Stats { endpoints }));
let requests = client
.queue_subscribe(
format!("{SERVICE_API_PREFIX}.{}", config.endpoint.clone()),
QUEUE_GROUP.to_string(),
)
.await?;
debug!(
"crerated service for endpoint {}.{}",
SERVICE_API_PREFIX,
config.endpoint.clone()
);

// create subscriptions for all verbs.
let mut pings =
Expand Down Expand Up @@ -328,7 +353,7 @@ impl Service {
id: info.id.clone(),
version: info.version.clone(),
started,
stats: endpoint_stats.lock().unwrap().endpoints.values().cloned().collect(),
endpoints: endpoint_stats.lock().unwrap().endpoints.values().cloned().collect(),
})?;
client.publish(stats_request.reply.unwrap(), stats.into()).await?;
},
Expand All @@ -342,11 +367,12 @@ impl Service {
stats: endpoint_stats,
info,
client,
requests,
handle,
shutdown_tx,
})
}
}

async fn verb_subscription(
client: Client,
verb: Verb,
Expand All @@ -365,36 +391,78 @@ async fn verb_subscription(
Ok(stream::select_all([verb_all, verb_id, verb_name]).fuse())
}

impl Stream for Service {
pub struct Endpoint {
requests: Subscriber,
stats: Arc<Mutex<Stats>>,
client: Client,
endpoint: String,
shutdown: Pin<Box<dyn Future<Output = Result<(), tokio::sync::broadcast::error::RecvError>>>>,
Copy link
Collaborator

@caspervonb caspervonb Jan 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not take the broadcast receiver here? avoid the pin box indirection.

Suggested change
shutdown: Pin<Box<dyn Future<Output = Result<(), tokio::sync::broadcast::error::RecvError>>>>,
shutdown: Pin<Box<dyn Future<Output = Result<(), tokio::sync::broadcast::error::RecvError>>>>,

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would need to either call recv() and store the future in the same way, or call sync try_recv() which would not poll any future, hence when shutdown happens, iterators without messages would not be closed.

}

impl Stream for Endpoint {
type Item = Request;

// FIXME: how we implement stats (durations) if it's a stream?
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
trace!("polling for next request");
match self.shutdown.as_mut().poll(cx) {
Poll::Ready(_result) => {
debug!("got stop broadcast");
self.requests
.sender
.try_send(crate::Command::Unsubscribe {
sid: self.requests.sid,
max: None,
})
.ok();
}
Poll::Pending => {
trace!("stop broadcast still pending");
}
}
trace!("checking for new messages");
match self.requests.poll_next_unpin(cx) {
std::task::Poll::Ready(message) => match message {
Some(message) => std::task::Poll::Ready(Some(Request {
issued: Instant::now(),
stats: self.stats.clone(),
client: self.client.clone(),
message,
})),
None => std::task::Poll::Ready(None),
},
std::task::Poll::Pending => std::task::Poll::Pending,
Poll::Ready(message) => {
debug!("got next message");
match message {
Some(message) => Poll::Ready(Some(Request {
issued: Instant::now(),
stats: self.stats.clone(),
client: self.client.clone(),
message,
endpoint: self.endpoint.clone(),
})),
None => Poll::Ready(None),
}
}

Poll::Pending => {
trace!("still pending for messages");
Poll::Pending
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}

impl Endpoint {
pub async fn stop(&mut self) -> Result<(), std::io::Error> {
self.requests.unsubscribe().await
}
}

impl Service {
/// Stops this instance of the [Service].
/// If there are more instances of [Services][Service] with the same name, the [Service] will
/// be scaled down by one instance. If it was the only running instance, it will effectively
/// remove the service entirely.
pub async fn stop(mut self) -> Result<(), Error> {
self.requests.unsubscribe().map_err(Box::new).await?;
pub async fn stop(self) -> Result<(), Error> {
self.shutdown_tx.send(())?;
self.handle.abort();
Ok(())
}
Expand All @@ -418,6 +486,41 @@ impl Service {
pub async fn info(&self) -> Info {
self.info.clone()
}

pub fn group<S: ToString>(&self, prefix: S) -> Group {
Group {
prefix: prefix.to_string(),
stats: self.stats.clone(),
client: self.client.clone(),
shutdown_tx: self.shutdown_tx.clone(),
}
}
pub async fn endpoint<S: ToString>(&self, subject: S) -> Result<Endpoint, Error> {
let subject = subject.to_string();
let requests = self
.client
.queue_subscribe(subject.clone(), QUEUE_GROUP.to_string())
.await?;
debug!("created service for endpoint {subject}");

let mut shutdown_rx = self.shutdown_tx.subscribe();

let mut stats = self.stats.lock().unwrap();
stats
.endpoints
.entry(subject.clone())
.or_insert(EndpointStats {
name: subject.clone(),
..Default::default()
});
Ok(Endpoint {
requests,
stats: self.stats.clone(),
client: self.client.clone(),
endpoint: subject,
shutdown: Box::pin(async move { shutdown_rx.recv().fuse().await }),
})
}
}

/// Request returned by [Service] [Stream][futures::Stream].
Expand All @@ -426,6 +529,7 @@ pub struct Request {
issued: Instant,
client: Client,
pub message: Message,
endpoint: String,
stats: Arc<Mutex<Stats>>,
}

Expand All @@ -443,12 +547,12 @@ impl Request {
/// # let mut service = client.add_service(async_nats::service::Config {
/// # name: "generator".to_string(),
/// # version: "1.0.0".to_string(),
/// # endpoint: "events.>".to_string(),
/// # schema: None,
/// # description: None,
/// # }).await?;
///
/// let request = service.next().await.unwrap();
/// let mut endpoint = service.endpoint("endpoint").await?;
/// let request = endpoint.next().await.unwrap();
/// request.respond(Ok("hello".into())).await?;
/// # Ok(())
/// # }
Expand All @@ -462,7 +566,7 @@ impl Request {
.lock()
.unwrap()
.endpoints
.entry("requests".to_string())
.entry(self.endpoint.clone())
.and_modify(|stats| {
stats.last_error = Some(err.clone());
stats.errors += 1
Expand All @@ -478,7 +582,8 @@ impl Request {
};
let elapsed = self.issued.elapsed();
let mut stats = self.stats.lock().unwrap();
let mut stats = stats.endpoints.get_mut("requests").unwrap();
// let mut stats = stats.endpoints.entry(key)
let mut stats = stats.endpoints.get_mut(self.endpoint.as_str()).unwrap();
stats.requests += 1;
stats.processing_time += elapsed;
stats.average_processing_time = stats.processing_time.checked_div(2).unwrap();
Expand Down
Loading