Skip to content
This repository has been archived by the owner on Jul 12, 2024. It is now read-only.

Commit

Permalink
[fire-stream-api] Publish v0.3.2
Browse files Browse the repository at this point in the history
- add testing api
  • Loading branch information
soerenmeier committed Dec 25, 2023
1 parent 0d9072a commit d791ad2
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 71 deletions.
7 changes: 7 additions & 0 deletions fire-stream-api/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 0.3.2 - 2023-12-25
- add new api to make a request to a server without going over any network stack.
Allowing to just pass bytes (useful for testing)

## 0.3.1 - 2023-12-23
- add tracing

## 0.3.0 - 2023-10-31
- Bump msrv to 1.67
- update fire-crypto 0.4
Expand Down
2 changes: 1 addition & 1 deletion fire-stream-api/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "fire-stream-api"
description = "A more or less simple communication protocol library."
version = "0.3.1"
version = "0.3.2"
authors = ["Sören meier <[email protected]>"]
repository = "https://github.com/fire-lib/fire-stream"
edition = "2021"
Expand Down
257 changes: 188 additions & 69 deletions fire-stream-api/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::message::{Action, Message};
use crate::request::RequestHandler;
use crate::error::{RequestError, ApiError};
use crate::message::{Action, Message, IntoMessage, FromMessage};
use crate::request::{RequestHandler, Request};

use stream::util::{SocketAddr, Listener, ListenerExt};
use stream::packet::{Packet, PacketBytes};
Expand Down Expand Up @@ -123,22 +124,170 @@ where
{
/// If this is set to true
/// errors which are returned in `#[api(*)]` functions are logged to tracing
#[deprecated]
pub fn set_log_errors(&mut self, log: bool) {
self.data.cfg.log_errors = log;
}

async fn run_raw<F>(self, new_connection: F) -> io::Result<()>
/// optionally or just use run
pub fn build(self) -> BuiltServer<A, B, L, More> {
let shared = Arc::new(Shared {
requests: self.requests,
data: self.data
});

BuiltServer {
inner: self.inner,
shared,
more: self.more
}
}
}

impl<A, L> Server<A, PlainBytes, L, ()>
where
A: Action,
L: Listener
{
pub fn new(listener: L, cfg: Config) -> Self {
Self {
inner: listener,
requests: Requests::new(),
data: Data::new(),
cfg,
more: ()
}
}

pub async fn run(self) -> io::Result<()>
where A: Send + Sync + 'static {
let cfg = self.cfg.clone();

self.build().run_raw(|_, stream| {
Connection::new(stream, cfg.clone())
}).await
}

}

#[cfg(feature = "encrypted")]
#[cfg_attr(docsrs, doc(cfg(feature = "encrypted")))]
impl<A, L> Server<A, EncryptedBytes, L, Keypair>
where
A: Action,
L: Listener
{
pub fn new_encrypted(listener: L, cfg: Config, key: Keypair) -> Self {
Self {
inner: listener,
requests: Requests::new(),
data: Data::new(),
cfg,
more: key
}
}

pub async fn run(self) -> io::Result<()>
where A: Send + Sync + 'static {
let cfg = self.cfg.clone();

self.build().run_raw(move |key, stream| {
Connection::new_encrypted(stream, cfg.clone(), key.clone())
}).await
}
}

// impl

struct Shared<A, B> {
requests: Requests<A, B>,
data: Data
}

pub struct BuiltServer<A, B, L, More> {
inner: L,
shared: Arc<Shared<A, B>>,
more: More
}

impl<A, B, L, More> BuiltServer<A, B, L, More>
where
A: Action,
L: Listener
{
pub async fn request<R>(
&self,
r: R,
session: &Arc<Session>
) -> Result<R::Response, R::Error>
where
R: Request<Action=A>,
R: IntoMessage<A, B>,
R::Response: FromMessage<A, B>,
R::Error: FromMessage<A, B>,
B: PacketBytes
{
let mut msg = r.into_message()
.map_err(R::Error::from_message_error)?;
msg.header_mut().set_action(R::ACTION);

// handle the request
let action = *msg.action().unwrap();

let handler = match self.shared.requests.get(&action) {
Some(handler) => handler,
// todo once we bump the version again
// we need to pass our own errors via packets
// not only those from the api users
None => {
tracing::error!("no handler for {:?}", action);
return Err(R::Error::from_request_error(
RequestError::NoResponse
))
}
};

let r = handler.handle(
msg,
&self.shared.data,
session
).await;

let res = match r {
Ok(mut msg) => {
msg.header_mut().set_action(action);
msg
},
Err(e) => {
// todo once we bump the version again
// we need to pass our own errors via packets
// not only those from the api users
tracing::error!(
"handler returned an error {:?}", e
);

return Err(R::Error::from_request_error(
RequestError::NoResponse
))
}
};

// now deserialize the response
if res.is_success() {
R::Response::from_message(res)
.map_err(R::Error::from_message_error)
} else {
R::Error::from_message(res)
.map(Err)
.map_err(R::Error::from_message_error)?
}
}

async fn run_raw<F>(&mut self, new_connection: F) -> io::Result<()>
where
A: Action + Send + Sync + 'static,
B: PacketBytes + Send + 'static,
F: Fn(&More, L::Stream) -> Connection<Message<A, B>>
{
let s = Arc::new(Shared {
requests: self.requests,
data: self.data
});

loop {

// should we fail here??
Expand All @@ -148,7 +297,7 @@ where
let session = Arc::new(Session::new(addr));
session.set(con.configurator());

let share = s.clone();
let share = self.shared.clone();
tokio::spawn(async move {
while let Some(req) = con.receive().await {
// todo replace with let else
Expand Down Expand Up @@ -209,7 +358,6 @@ where
});
}
}

}

pub struct Session {
Expand Down Expand Up @@ -253,74 +401,20 @@ impl Session {
}
}

impl<A, L> Server<A, PlainBytes, L, ()>
where
A: Action,
L: Listener
{
pub fn new(listener: L, cfg: Config) -> Self {
Self {
inner: listener,
requests: Requests::new(),
data: Data::new(),
cfg,
more: ()
}
}

pub async fn run(self) -> io::Result<()>
where A: Send + Sync + 'static {
let cfg = self.cfg.clone();
self.run_raw(|_, stream| {
Connection::new(stream, cfg.clone())
}).await
}

}

#[cfg(feature = "encrypted")]
#[cfg_attr(docsrs, doc(cfg(feature = "encrypted")))]
impl<A, L> Server<A, EncryptedBytes, L, Keypair>
where
A: Action,
L: Listener
{
pub fn new_encrypted(listener: L, cfg: Config, key: Keypair) -> Self {
Self {
inner: listener,
requests: Requests::new(),
data: Data::new(),
cfg,
more: key
}
}

pub async fn run(self) -> io::Result<()>
where A: Send + Sync + 'static {
let cfg = self.cfg.clone();
self.run_raw(move |key, stream| {
Connection::new_encrypted(stream, cfg.clone(), key.clone())
}).await
}
}

// impl

struct Shared<A, B> {
requests: Requests<A, B>,
data: Data
}


#[cfg(all(test, feature = "json"))]
mod json_tests {
use super::*;

use codegen::{IntoMessage, FromMessage, api};
use crate::request::Request;
use crate::message;
use crate::error;

use std::fmt;

use stream::util::testing::PanicListener;

use serde::{Serialize, Deserialize};


Expand Down Expand Up @@ -410,6 +504,31 @@ mod json_tests {
hi: req.hello
})
}

#[tokio::test]
async fn test_direct_request() {
let mut server = Server::new(PanicListener::new(), Config {
timeout: std::time::Duration::from_millis(10),
body_limit: 4096
});

server.register_request(test);
server.register_request(test_2);

let server = server.build();
let session = Arc::new(Session::new(
SocketAddr::V4("127.0.0.1:8080".parse().unwrap())
));

let r = server.request(TestReq { hello: 100 }, &session).await.unwrap();
assert_eq!(r.hi, 100);

let r = server.request(
TestReq2 { hello: 100 },
&session
).await.unwrap();
assert_eq!(r.hi, 100);
}
}


Expand Down
2 changes: 1 addition & 1 deletion fire-stream/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
## 0.4.2 - 2023-12-23
## 0.4.2 - 2023-12-25
- add `util::testing::PanicListener` to make it easier to test fire-stream-api

## 0.4.1 - 2023-12-23
Expand Down

0 comments on commit d791ad2

Please sign in to comment.