From d791ad2619fd8f64755dc22e27846eddc832210e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Meier?= Date: Mon, 25 Dec 2023 23:14:17 +0100 Subject: [PATCH] [fire-stream-api] Publish v0.3.2 - add testing api --- fire-stream-api/CHANGELOG.md | 7 + fire-stream-api/Cargo.toml | 2 +- fire-stream-api/src/server.rs | 257 +++++++++++++++++++++++++--------- fire-stream/CHANGELOG.md | 2 +- 4 files changed, 197 insertions(+), 71 deletions(-) diff --git a/fire-stream-api/CHANGELOG.md b/fire-stream-api/CHANGELOG.md index ef29161..0da649a 100644 --- a/fire-stream-api/CHANGELOG.md +++ b/fire-stream-api/CHANGELOG.md @@ -1,3 +1,10 @@ +## 0.3.2 - 2023-12-25 +- add new api to make a request to a server without going over any network stack. +Allowing to just pass bytes (useful for testing) + +## 0.3.1 - 2023-12-23 +- add tracing + ## 0.3.0 - 2023-10-31 - Bump msrv to 1.67 - update fire-crypto 0.4 diff --git a/fire-stream-api/Cargo.toml b/fire-stream-api/Cargo.toml index f206baf..1710ce5 100644 --- a/fire-stream-api/Cargo.toml +++ b/fire-stream-api/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "fire-stream-api" description = "A more or less simple communication protocol library." -version = "0.3.1" +version = "0.3.2" authors = ["Sören meier "] repository = "https://github.com/fire-lib/fire-stream" edition = "2021" diff --git a/fire-stream-api/src/server.rs b/fire-stream-api/src/server.rs index d49a841..f9b0660 100644 --- a/fire-stream-api/src/server.rs +++ b/fire-stream-api/src/server.rs @@ -1,5 +1,6 @@ -use crate::message::{Action, Message}; -use crate::request::RequestHandler; +use crate::error::{RequestError, ApiError}; +use crate::message::{Action, Message, IntoMessage, FromMessage}; +use crate::request::{RequestHandler, Request}; use stream::util::{SocketAddr, Listener, ListenerExt}; use stream::packet::{Packet, PacketBytes}; @@ -123,22 +124,170 @@ where { /// If this is set to true /// errors which are returned in `#[api(*)]` functions are logged to tracing - #[deprecated] pub fn set_log_errors(&mut self, log: bool) { self.data.cfg.log_errors = log; } - async fn run_raw(self, new_connection: F) -> io::Result<()> + /// optionally or just use run + pub fn build(self) -> BuiltServer { + let shared = Arc::new(Shared { + requests: self.requests, + data: self.data + }); + + BuiltServer { + inner: self.inner, + shared, + more: self.more + } + } +} + +impl Server +where + A: Action, + L: Listener +{ + pub fn new(listener: L, cfg: Config) -> Self { + Self { + inner: listener, + requests: Requests::new(), + data: Data::new(), + cfg, + more: () + } + } + + pub async fn run(self) -> io::Result<()> + where A: Send + Sync + 'static { + let cfg = self.cfg.clone(); + + self.build().run_raw(|_, stream| { + Connection::new(stream, cfg.clone()) + }).await + } + +} + +#[cfg(feature = "encrypted")] +#[cfg_attr(docsrs, doc(cfg(feature = "encrypted")))] +impl Server +where + A: Action, + L: Listener +{ + pub fn new_encrypted(listener: L, cfg: Config, key: Keypair) -> Self { + Self { + inner: listener, + requests: Requests::new(), + data: Data::new(), + cfg, + more: key + } + } + + pub async fn run(self) -> io::Result<()> + where A: Send + Sync + 'static { + let cfg = self.cfg.clone(); + + self.build().run_raw(move |key, stream| { + Connection::new_encrypted(stream, cfg.clone(), key.clone()) + }).await + } +} + +// impl + +struct Shared { + requests: Requests, + data: Data +} + +pub struct BuiltServer { + inner: L, + shared: Arc>, + more: More +} + +impl BuiltServer +where + A: Action, + L: Listener +{ + pub async fn request( + &self, + r: R, + session: &Arc + ) -> Result + where + R: Request, + R: IntoMessage, + R::Response: FromMessage, + R::Error: FromMessage, + B: PacketBytes + { + let mut msg = r.into_message() + .map_err(R::Error::from_message_error)?; + msg.header_mut().set_action(R::ACTION); + + // handle the request + let action = *msg.action().unwrap(); + + let handler = match self.shared.requests.get(&action) { + Some(handler) => handler, + // todo once we bump the version again + // we need to pass our own errors via packets + // not only those from the api users + None => { + tracing::error!("no handler for {:?}", action); + return Err(R::Error::from_request_error( + RequestError::NoResponse + )) + } + }; + + let r = handler.handle( + msg, + &self.shared.data, + session + ).await; + + let res = match r { + Ok(mut msg) => { + msg.header_mut().set_action(action); + msg + }, + Err(e) => { + // todo once we bump the version again + // we need to pass our own errors via packets + // not only those from the api users + tracing::error!( + "handler returned an error {:?}", e + ); + + return Err(R::Error::from_request_error( + RequestError::NoResponse + )) + } + }; + + // now deserialize the response + if res.is_success() { + R::Response::from_message(res) + .map_err(R::Error::from_message_error) + } else { + R::Error::from_message(res) + .map(Err) + .map_err(R::Error::from_message_error)? + } + } + + async fn run_raw(&mut self, new_connection: F) -> io::Result<()> where A: Action + Send + Sync + 'static, B: PacketBytes + Send + 'static, F: Fn(&More, L::Stream) -> Connection> { - let s = Arc::new(Shared { - requests: self.requests, - data: self.data - }); - loop { // should we fail here?? @@ -148,7 +297,7 @@ where let session = Arc::new(Session::new(addr)); session.set(con.configurator()); - let share = s.clone(); + let share = self.shared.clone(); tokio::spawn(async move { while let Some(req) = con.receive().await { // todo replace with let else @@ -209,7 +358,6 @@ where }); } } - } pub struct Session { @@ -253,67 +401,11 @@ impl Session { } } -impl Server -where - A: Action, - L: Listener -{ - pub fn new(listener: L, cfg: Config) -> Self { - Self { - inner: listener, - requests: Requests::new(), - data: Data::new(), - cfg, - more: () - } - } - - pub async fn run(self) -> io::Result<()> - where A: Send + Sync + 'static { - let cfg = self.cfg.clone(); - self.run_raw(|_, stream| { - Connection::new(stream, cfg.clone()) - }).await - } - -} - -#[cfg(feature = "encrypted")] -#[cfg_attr(docsrs, doc(cfg(feature = "encrypted")))] -impl Server -where - A: Action, - L: Listener -{ - pub fn new_encrypted(listener: L, cfg: Config, key: Keypair) -> Self { - Self { - inner: listener, - requests: Requests::new(), - data: Data::new(), - cfg, - more: key - } - } - - pub async fn run(self) -> io::Result<()> - where A: Send + Sync + 'static { - let cfg = self.cfg.clone(); - self.run_raw(move |key, stream| { - Connection::new_encrypted(stream, cfg.clone(), key.clone()) - }).await - } -} - -// impl - -struct Shared { - requests: Requests, - data: Data -} - #[cfg(all(test, feature = "json"))] mod json_tests { + use super::*; + use codegen::{IntoMessage, FromMessage, api}; use crate::request::Request; use crate::message; @@ -321,6 +413,8 @@ mod json_tests { use std::fmt; + use stream::util::testing::PanicListener; + use serde::{Serialize, Deserialize}; @@ -410,6 +504,31 @@ mod json_tests { hi: req.hello }) } + + #[tokio::test] + async fn test_direct_request() { + let mut server = Server::new(PanicListener::new(), Config { + timeout: std::time::Duration::from_millis(10), + body_limit: 4096 + }); + + server.register_request(test); + server.register_request(test_2); + + let server = server.build(); + let session = Arc::new(Session::new( + SocketAddr::V4("127.0.0.1:8080".parse().unwrap()) + )); + + let r = server.request(TestReq { hello: 100 }, &session).await.unwrap(); + assert_eq!(r.hi, 100); + + let r = server.request( + TestReq2 { hello: 100 }, + &session + ).await.unwrap(); + assert_eq!(r.hi, 100); + } } diff --git a/fire-stream/CHANGELOG.md b/fire-stream/CHANGELOG.md index 67b04ea..28282d2 100644 --- a/fire-stream/CHANGELOG.md +++ b/fire-stream/CHANGELOG.md @@ -1,4 +1,4 @@ -## 0.4.2 - 2023-12-23 +## 0.4.2 - 2023-12-25 - add `util::testing::PanicListener` to make it easier to test fire-stream-api ## 0.4.1 - 2023-12-23