Skip to content

Commit

Permalink
Update server to expect HTTP on named pipe.
Browse files Browse the repository at this point in the history
  • Loading branch information
tmpfs committed Dec 6, 2024
1 parent 09b24eb commit dbe5fb3
Show file tree
Hide file tree
Showing 13 changed files with 146 additions and 273 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 15 additions & 11 deletions crates/integration_tests/tests/ipc/app_info.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Result;
use sos_ipc::{
remove_socket_file, AppIntegration, Error, IpcService, IpcServiceOptions,
ServiceAppInfo, SocketClient, SocketServer,
remove_socket_file, AppIntegration, Error, ServiceAppInfo,
ServiceOptions, SocketClient, SocketServer,
};
use sos_net::sdk::{prelude::LocalAccountSwitcher, Paths};
use sos_test_utils::teardown;
Expand Down Expand Up @@ -35,21 +35,25 @@ async fn integration_ipc_app_info() -> Result<()> {
let build_number = 1u32;

// Start the IPC service
/*
let service = Arc::new(RwLock::new(IpcService::new(
ipc_accounts,
IpcServiceOptions {
app_info: Some(ServiceAppInfo {
name: name.to_string(),
version: version.to_string(),
build_number,
}),
..Default::default()
},
)));
*/

let options = ServiceOptions {
app_info: Some(ServiceAppInfo {
name: name.to_string(),
version: version.to_string(),
build_number,
}),
..Default::default()
};

let server_socket_name = socket_name.clone();
tokio::task::spawn(async move {
SocketServer::listen(&server_socket_name, service).await?;
SocketServer::listen(&server_socket_name, ipc_accounts, options)
.await?;
Ok::<(), Error>(())
});

Expand Down
16 changes: 8 additions & 8 deletions crates/integration_tests/tests/ipc/list_accounts.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use anyhow::Result;
use sos_ipc::{
remove_socket_file, AppIntegration, Error, IpcService, SocketClient,
SocketServer,
remove_socket_file, AppIntegration, Error, SocketClient, SocketServer,
};
use sos_net::sdk::{
crypto::AccessKey,
Expand Down Expand Up @@ -64,15 +63,16 @@ async fn integration_ipc_list_accounts() -> Result<()> {
accounts.add_account(auth_account);
accounts.add_account(unauth_account);

// Start the IPC service
let service = Arc::new(RwLock::new(IpcService::new(
Arc::new(RwLock::new(accounts)),
Default::default(),
)));
let ipc_accounts = Arc::new(RwLock::new(accounts));

let server_socket_name = socket_name.clone();
tokio::task::spawn(async move {
SocketServer::listen(&server_socket_name, service).await?;
SocketServer::listen(
&server_socket_name,
ipc_accounts,
Default::default(),
)
.await?;
Ok::<(), Error>(())
});

Expand Down
15 changes: 8 additions & 7 deletions crates/integration_tests/tests/ipc/local_sync.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::Result;
use sos_ipc::{remove_socket_file, Error, IpcService, SocketServer};
use sos_ipc::{remove_socket_file, Error, SocketServer};
use sos_net::{
protocol::{
integration::{LinkedAccount, LocalClient, LocalIntegration},
Expand Down Expand Up @@ -64,15 +64,16 @@ async fn integration_ipc_local_sync() -> Result<()> {
local_accounts.switch_account(&address);
let local_accounts = Arc::new(RwLock::new(local_accounts));

// Start the IPC service
let service = Arc::new(RwLock::new(IpcService::new(
local_accounts.clone(),
Default::default(),
)));
let ipc_accounts = local_accounts.clone();

let server_socket_name = socket_name.clone();
tokio::task::spawn(async move {
SocketServer::listen(&server_socket_name, service).await?;
SocketServer::listen(
&server_socket_name,
ipc_accounts,
Default::default(),
)
.await?;
Ok::<(), Error>(())
});

Expand Down
2 changes: 2 additions & 0 deletions crates/ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ http.workspace = true
tower = { version = "0.5", features = ["util"]}
hyper = { version = "1" }
matchit = "0.7"
bytes.workspace = true
http-body-util = "0.1"

[build-dependencies]
rustc_version = "0.4.1"
Expand Down
36 changes: 11 additions & 25 deletions crates/ipc/src/io.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,31 @@
use pin_project_lite::pin_project;
use std::io::{Read, Write};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};

pin_project! {
#[derive(Debug)]
pub struct TokioIo<T> {
pub struct TokioAdapter<T> {
#[pin]
inner: T,
}
}

impl<T> TokioIo<T> {
impl<T> TokioAdapter<T> {
pub fn new(inner: T) -> Self {
Self { inner }
}

pub fn inner(self) -> T {
self.inner
}
}

impl<T> hyper::rt::Read for TokioIo<T>
impl<T> hyper::rt::Read for TokioAdapter<T>
where
T: Read,
T: AsyncRead,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
mut buf: hyper::rt::ReadBufCursor<'_>,
) -> Poll<Result<(), std::io::Error>> {
/*
let n = unsafe {
let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
match tokio::io::AsyncRead::poll_read(
Expand All @@ -47,57 +42,48 @@ where
buf.advance(n);
}
Poll::Ready(Ok(()))
*/
todo!();
}
}

impl<T> hyper::rt::Write for TokioIo<T>
impl<T> hyper::rt::Write for TokioAdapter<T>
where
T: Write,
T: AsyncWrite,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
// tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
todo!();
tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
}

fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
// tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
todo!();
tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
}

fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
// tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
todo!();
tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
}

fn is_write_vectored(&self) -> bool {
// tokio::io::AsyncWrite::is_write_vectored(&self.inner)
todo!();
tokio::io::AsyncWrite::is_write_vectored(&self.inner)
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> Poll<Result<usize, std::io::Error>> {
/*
tokio::io::AsyncWrite::poll_write_vectored(
self.project().inner,
cx,
bufs,
)
*/
todo!();
}
}
9 changes: 4 additions & 5 deletions crates/ipc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![deny(missing_docs)]
#![forbid(unsafe_code)]
#![cfg_attr(all(doc, CHANNEL_NIGHTLY), feature(doc_auto_cfg))]
//! Inter-process communcation library for [Save Our Secrets](https://saveoursecrets.com/).
//!
Expand All @@ -25,10 +24,12 @@ pub mod native_bridge;
#[cfg(any(feature = "client", feature = "server"))]
pub(crate) mod io;

#[cfg(feature = "server")]
mod local_server;
#[cfg(feature = "server")]
mod server;
#[cfg(feature = "server")]
mod service;
pub(crate) use local_server::LocalServer;

pub use error::Error;

Expand All @@ -41,9 +42,7 @@ pub type Result<T> = std::result::Result<T, Error>;
pub use client::{AppIntegration, SocketClient};

#[cfg(feature = "server")]
pub use server::SocketServer;
#[cfg(feature = "server")]
pub use service::{IpcService, IpcServiceOptions};
pub use server::{ServiceOptions, SocketServer};

/// Information about the service.
#[typeshare::typeshare]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use http::{Request, Response, StatusCode};
use http_body_util::Full;
use hyper::body::Bytes;
use sos_protocol::{
server_helpers, Merge, SyncPacket, SyncStorage, WireEncodeDecode,
Expand All @@ -9,7 +10,8 @@ use tokio::sync::RwLock;

use super::{
bad_request, forbidden, internal_server_error, json, not_found, ok,
parse_account_id, protobuf, protobuf_compress, Body, Incoming,
parse_account_id, protobuf, protobuf_compress, read_bytes, Body,
Incoming,
};

/// List of account public identities.
Expand Down Expand Up @@ -133,7 +135,7 @@ where
return internal_server_error("fetch_account::encode");
};

protobuf_compress(buffer)
protobuf_compress(Full::new(Bytes::from(buffer)))
}
Err(e) => internal_server_error(e),
}
Expand Down Expand Up @@ -190,7 +192,7 @@ where
let Ok(buffer) = status.encode().await else {
return internal_server_error("sync_status::encode");
};
protobuf(buffer)
protobuf(Full::new(Bytes::from(buffer)))
}
Err(e) => internal_server_error(e),
}
Expand Down Expand Up @@ -224,7 +226,7 @@ where
if let Some(account) =
accounts.iter_mut().find(|a| a.address() == &account_id)
{
let buf: Bytes = req.into_body().into();
let buf: Bytes = read_bytes(req).await?;
let Ok(packet) = SyncPacket::decode(buf).await else {
return bad_request();
};
Expand All @@ -235,7 +237,7 @@ where
return internal_server_error("sync_account::encode");
};

protobuf(response)
protobuf(Full::new(Bytes::from(response)))
}
Err(e) => internal_server_error(e),
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use bytes::Bytes;
use http::{
header::{CONTENT_ENCODING, CONTENT_TYPE},
Request, Response, StatusCode,
};
use http_body_util::{BodyExt, Full};
use serde::Serialize;
use sos_protocol::constants::{
ENCODING_ZLIB, ENCODING_ZSTD, MIME_TYPE_JSON, MIME_TYPE_PROTOBUF,
Expand All @@ -17,6 +19,10 @@ struct ErrorReply {
message: String,
}

pub async fn read_bytes(req: Request<Incoming>) -> hyper::Result<Bytes> {
Ok(req.collect().await?.to_bytes())
}

pub fn parse_account_id(req: &Request<Incoming>) -> Option<Address> {
let Some(Ok(account_id)) =
req.headers().get(X_SOS_ACCOUNT_ID).map(|v| v.to_str())
Expand Down Expand Up @@ -78,7 +84,7 @@ pub fn json<S: Serialize>(
let response = Response::builder()
.status(status)
.header(CONTENT_TYPE, MIME_TYPE_JSON)
.body(body)
.body(Full::new(Bytes::from(body)))
.unwrap();
Ok(response)
}
Expand All @@ -93,15 +99,17 @@ pub fn protobuf(body: Body) -> hyper::Result<Response<Body>> {
}

pub fn protobuf_compress(body: Body) -> hyper::Result<Response<Body>> {
/*
use sos_protocol::compression::zlib;
let Ok(buf) = zlib::encode_all(body.as_slice()) else {
return internal_server_error("zlib::compress");
};
*/
Ok(Response::builder()
.status(StatusCode::OK)
.header(CONTENT_ENCODING, ENCODING_ZLIB)
// .header(CONTENT_ENCODING, ENCODING_ZLIB)
.header(CONTENT_TYPE, MIME_TYPE_PROTOBUF)
.body(buf)
.body(body)
.unwrap())
}

Expand Down
Loading

0 comments on commit dbe5fb3

Please sign in to comment.