Skip to content

Commit

Permalink
WIP: refactor to HTTP transport.
Browse files Browse the repository at this point in the history
  • Loading branch information
tmpfs committed Dec 5, 2024
1 parent 3430784 commit 09b24eb
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ features = ["full"]
path = "../net"

[dev-dependencies.sos-ipc]
features = []
features = ["client", "server"]
path = "../ipc"

[dev-dependencies.sos-server]
Expand Down
7 changes: 5 additions & 2 deletions crates/ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ repository = "https://github.com/saveoursecrets/sdk"

[features]
default = []
native-bridge = ["open", "once_cell"]
client = []
server = ["hyper/http1", "hyper/server"]
native-bridge = ["client","open", "once_cell"]
native-send = ["tokio/process"]

[dependencies]
Expand All @@ -33,11 +35,12 @@ interprocess = { version = "2", features = ["tokio"] }

open = { version = "5", optional = true }
once_cell = { workspace = true, optional = true }
pin-project-lite = "0.2"

# HTTP dependencies for local server
http.workspace = true
tower = { version = "0.5", features = ["util"]}
hyper = { version = "1", features = ["http1", "server"] }
hyper = { version = "1" }
matchit = "0.7"

[build-dependencies]
Expand Down
103 changes: 103 additions & 0 deletions crates/ipc/src/io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
use pin_project_lite::pin_project;
use std::io::{Read, Write};
use std::pin::Pin;
use std::task::{Context, Poll};

pin_project! {
#[derive(Debug)]
pub struct TokioIo<T> {
#[pin]
inner: T,
}
}

impl<T> TokioIo<T> {
pub fn new(inner: T) -> Self {
Self { inner }
}

pub fn inner(self) -> T {
self.inner
}
}

impl<T> hyper::rt::Read for TokioIo<T>
where
T: Read,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
mut buf: hyper::rt::ReadBufCursor<'_>,
) -> Poll<Result<(), std::io::Error>> {
/*
let n = unsafe {
let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
match tokio::io::AsyncRead::poll_read(
self.project().inner,
cx,
&mut tbuf,
) {
Poll::Ready(Ok(())) => tbuf.filled().len(),
other => return other,
}
};
unsafe {
buf.advance(n);
}
Poll::Ready(Ok(()))
*/
todo!();
}
}

impl<T> hyper::rt::Write for TokioIo<T>
where
T: Write,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
// tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
todo!();
}

fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
// tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
todo!();
}

fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
// tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
todo!();
}

fn is_write_vectored(&self) -> bool {
// tokio::io::AsyncWrite::is_write_vectored(&self.inner)
todo!();
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> Poll<Result<usize, std::io::Error>> {
/*
tokio::io::AsyncWrite::poll_write_vectored(
self.project().inner,
cx,
bufs,
)
*/
todo!();
}
}
11 changes: 11 additions & 0 deletions crates/ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,18 @@
mod error;

mod bindings;
#[cfg(feature = "client")]
mod client;
pub(crate) mod codec;
#[cfg(feature = "native-bridge")]
pub mod native_bridge;

#[cfg(any(feature = "client", feature = "server"))]
pub(crate) mod io;

#[cfg(feature = "server")]
mod server;
#[cfg(feature = "server")]
mod service;

pub use error::Error;
Expand All @@ -30,8 +37,12 @@ pub(crate) use bindings::*;
/// Result type for the library.
pub type Result<T> = std::result::Result<T, Error>;

#[cfg(feature = "client")]
pub use client::{AppIntegration, SocketClient};

#[cfg(feature = "server")]
pub use server::SocketServer;
#[cfg(feature = "server")]
pub use service::{IpcService, IpcServiceOptions};

/// Information about the service.
Expand Down
36 changes: 28 additions & 8 deletions crates/ipc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use sos_protocol::local_transport::{LocalRequest, LocalResponse};
use std::{pin::Pin, sync::Arc};

use futures_util::sink::SinkExt;
use hyper::{
body::Incoming, server::conn::http1::Builder, service::HttpService,
};
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::RwLock,
Expand All @@ -18,8 +21,8 @@ use tokio_util::{
};

use crate::{
codec, decode_proto, encode_proto, io_err, IpcService, WireLocalRequest,
WireLocalResponse,
codec, decode_proto, encode_proto, io::TokioIo, io_err, IpcService,
WireLocalRequest, WireLocalResponse,
};

use crate::Result;
Expand All @@ -29,10 +32,11 @@ pub struct SocketServer;

impl SocketServer {
/// Listen on a bind address.
pub async fn listen(
socket_name: &str,
service: Arc<RwLock<IpcService>>,
) -> Result<()> {
pub async fn listen<S>(socket_name: &str, service: S) -> Result<()>
where
S: HttpService<Incoming> + Send + 'static,
// B: Incoming + Send + Sync + 'static,
{
let name = socket_name.to_ns_name::<GenericNamespaced>()?;
let opts = ListenerOptions::new().name(name);
let listener = match opts.create_tokio() {
Expand All @@ -47,15 +51,30 @@ impl SocketServer {

loop {
let socket = listener.accept().await?;
let service = service.clone();
// let service = service.clone();
//
// socket.foo();

tokio::spawn(async move {
handle_conn(service, socket).await;
let socket = TokioIo::new(socket);
// handle_conn(service, socket).await;

let http = Builder::new();
let conn = http.serve_connection(socket, service);

/*
if let Err(e) = conn.await {
eprintln!("server connection error: {}", e);
}
*/

todo!();
});
}
}
}

/*
async fn handle_conn<T>(service: Arc<RwLock<IpcService>>, socket: T)
where
T: AsyncRead + AsyncWrite + Sized,
Expand Down Expand Up @@ -144,3 +163,4 @@ where
channel.send(buffer.into()).await?;
Ok(())
}
*/

0 comments on commit 09b24eb

Please sign in to comment.