Skip to content

Commit

Permalink
Prepare logic to handle chunks in native bridge.
Browse files Browse the repository at this point in the history
  • Loading branch information
tmpfs committed Dec 6, 2024
1 parent eefa2ff commit 5f96d11
Show file tree
Hide file tree
Showing 7 changed files with 281 additions and 69 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use anyhow::Result;
use http::Method;
use http::StatusCode;
use sos_ipc::{
local_transport::{HttpMessage, LocalRequest},
Expand Down Expand Up @@ -83,16 +82,12 @@ async fn integration_ipc_native_bridge_list_accounts() -> Result<()> {

tokio::time::sleep(Duration::from_millis(50)).await;

let mut request = LocalRequest {
method: Method::GET,
uri: ACCOUNTS_LIST.parse().unwrap(),
..Default::default()
};
let mut request = LocalRequest::get(ACCOUNTS_LIST.parse().unwrap());
request.set_request_id(1);

let (command, arguments) = super::native_bridge_cmd(SOCKET_NAME);
let mut client = NativeBridgeClient::new(command, arguments).await?;
let response = client.send(&request).await?;
let response = client.send(request).await?;
assert_eq!(StatusCode::OK, response.status().unwrap());

client.kill().await?;
Expand Down
9 changes: 2 additions & 7 deletions crates/integration_tests/tests/ipc/native_bridge_probe.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use anyhow::Result;
use http::Method;
use http::StatusCode;
use sos_ipc::{
local_transport::{HttpMessage, LocalRequest},
Expand All @@ -15,16 +14,12 @@ const SOCKET_NAME: &str = "ipc_native_bridge_probe.sock";
async fn integration_ipc_native_bridge_probe() -> Result<()> {
// crate::test_utils::init_tracing();

let mut request = LocalRequest {
method: Method::GET,
uri: "/probe".parse().unwrap(),
..Default::default()
};
let mut request = LocalRequest::get("/probe".parse().unwrap());
request.set_request_id(1);

let (command, arguments) = super::native_bridge_cmd(SOCKET_NAME);
let mut client = NativeBridgeClient::new(command, arguments).await?;
let response = client.send(&request).await?;
let response = client.send(request).await?;
assert_eq!(StatusCode::OK, response.status().unwrap());

client.kill().await?;
Expand Down
5 changes: 1 addition & 4 deletions crates/ipc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,7 @@ impl LocalSocketClient {

/// List accounts.
pub async fn list_accounts(&mut self) -> Result<Vec<PublicIdentity>> {
let request = LocalRequest {
uri: ACCOUNTS_LIST.parse()?,
..Default::default()
};
let request = LocalRequest::get(ACCOUNTS_LIST.parse()?);

let response = self.send_request(request).await?;
let status = response.status()?;
Expand Down
168 changes: 167 additions & 1 deletion crates/ipc/src/local_transport.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
//! Types used for communicating between apps on the same device.
//! Types used for HTTP communication between apps
//! on the same device.
//!
//! Wraps the `http` request and response types so we can
//! serialize and deserialize from JSON for transfer via
//! the browser native messaging API.
use crate::{Error, Result};

Expand Down Expand Up @@ -39,9 +44,32 @@ pub trait HttpMessage {
/// Message body.
fn body(&self) -> &[u8];

/// Mutable message body.
fn body_mut(&mut self) -> &mut Vec<u8>;

/// Consume the message body.
fn into_body(self) -> Vec<u8>;

/// Number of chunks.
fn chunks_len(&self) -> u32;

/// Zero-based chunk index of this message.
fn chunk_index(&self) -> u32;

/// Convert this message into a collection of chunks.
///
/// If the size of the body is less than limit then
/// only this message is included.
///
/// Conversion is performed on the number of bytes in the
/// body but the native messaging API restricts the serialized
/// JSON to 1MB so it's wise to choose a value smaller
/// than the 1MB limit so there is some headroom for the JSON
/// serialization overhead.
fn into_chunks(self, limit: usize, chunk_size: usize) -> Vec<Self>
where
Self: Sized;

/// Extract a request id.
///
/// If no header is present or the value is invalid zero
Expand Down Expand Up @@ -125,13 +153,42 @@ pub trait HttpMessage {
}
*/

/// Convert the message into parts.
fn into_parts(mut self) -> (Headers, Vec<u8>)
where
Self: Sized,
{
let headers =
std::mem::replace(self.headers_mut(), Default::default());
(headers, self.into_body())
}

/// Convert the body to bytes.
fn bytes(self) -> Bytes
where
Self: Sized,
{
self.into_body().into()
}

/// Convert from a collection of chunks into a response.
///
/// # Panics
///
/// If chunks is empty.
fn from_chunks(mut chunks: Vec<Self>) -> Self
where
Self: Sized,
{
chunks.sort_by(|a, b| a.chunk_index().cmp(&b.chunk_index()));
let mut it = chunks.into_iter();
let mut message = it.next().expect("to have one chunk");
for chunk in it {
let mut body = chunk.into_body();
message.body_mut().append(&mut body);
}
message
}
}

/// Request that can be sent to a local data source.
Expand All @@ -158,6 +215,8 @@ pub struct LocalRequest {
/// Request body.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub body: Vec<u8>,
/// Chunk information; length and then index.
chunks: (u32, u32),
}

impl Default for LocalRequest {
Expand All @@ -167,11 +226,23 @@ impl Default for LocalRequest {
uri: Uri::builder().path_and_query("/").build().unwrap(),
headers: Default::default(),
body: Default::default(),
chunks: (1, 0),
}
}
}

impl LocalRequest {
/// Create a GET request from a URI.
pub fn get(uri: Uri) -> Self {
Self {
method: Method::GET,
uri,
headers: Default::default(),
body: Default::default(),
chunks: (1, 0),
}
}

/// Duration allowed for a request.
pub fn timeout_duration(&self) -> Duration {
Duration::from_secs(15)
Expand All @@ -191,9 +262,54 @@ impl HttpMessage for LocalRequest {
self.body.as_slice()
}

fn body_mut(&mut self) -> &mut Vec<u8> {
&mut self.body
}

fn into_body(self) -> Vec<u8> {
self.body
}

fn chunks_len(&self) -> u32 {
self.chunks.0
}

fn chunk_index(&self) -> u32 {
self.chunks.1
}

fn into_chunks(self, limit: usize, chunk_size: usize) -> Vec<Self> {
if self.body.len() < limit {
vec![self]
} else {
let mut messages = Vec::new();
let uri = self.uri.clone();
let method = self.method.clone();
let (headers, body) = self.into_parts();
let len = if body.len() > chunk_size {
let mut len = body.len() / chunk_size;
if body.len() % chunk_size != 0 {
len += 1;
}
len
} else {
1
};
for (index, window) in
body.as_slice().chunks(chunk_size).enumerate()
{
let message = Self {
uri: uri.clone(),
method: method.clone(),
body: window.to_owned(),
headers: headers.clone(),
chunks: (len as u32, index as u32),
};
messages.push(message);
}
messages
}
}
}

impl fmt::Debug for LocalRequest {
Expand Down Expand Up @@ -222,6 +338,7 @@ impl From<Request<Vec<u8>>> for LocalRequest {
uri: parts.uri,
headers,
body,
chunks: (1, 0),
}
}
}
Expand Down Expand Up @@ -261,6 +378,8 @@ pub struct LocalResponse {
/// Response body.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub body: Vec<u8>,
/// Chunk information; length and then index.
chunks: (u32, u32),
}

impl Default for LocalResponse {
Expand All @@ -269,6 +388,7 @@ impl Default for LocalResponse {
status: StatusCode::OK.into(),
headers: Default::default(),
body: Default::default(),
chunks: (1, 0),
}
}
}
Expand Down Expand Up @@ -297,6 +417,7 @@ impl From<Response<Vec<u8>>> for LocalResponse {
status: parts.status.into(),
headers,
body,
chunks: (1, 0),
}
}
}
Expand All @@ -317,6 +438,7 @@ impl LocalResponse {
status: status.into(),
headers: Default::default(),
body: Default::default(),
chunks: (1, 0),
};
res.set_request_id(id);
res
Expand Down Expand Up @@ -350,7 +472,51 @@ impl HttpMessage for LocalResponse {
self.body.as_slice()
}

fn body_mut(&mut self) -> &mut Vec<u8> {
&mut self.body
}

fn into_body(self) -> Vec<u8> {
self.body
}

fn chunks_len(&self) -> u32 {
self.chunks.0
}

fn chunk_index(&self) -> u32 {
self.chunks.1
}

fn into_chunks(self, limit: usize, chunk_size: usize) -> Vec<Self> {
if self.body.len() < limit {
vec![self]
} else {
let mut messages = Vec::new();
let status = self.status.clone();
let (headers, body) = self.into_parts();
let len = if body.len() > chunk_size {
let mut len = body.len() / chunk_size;
if body.len() % chunk_size != 0 {
len += 1;
}
len
} else {
1
};
for (index, window) in
body.as_slice().chunks(chunk_size).enumerate()
{
let message = Self {
status,
headers: headers.clone(),
body: window.to_owned(),
chunks: (len as u32, index as u32),
};
messages.push(message);
}
println!("split into chunks: {}", messages.len());
messages
}
}
}
Loading

0 comments on commit 5f96d11

Please sign in to comment.