Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Water-Wasm internals refactor #8

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ toml = "0.5.9"
lazy_static = "1.4"
url = { version = "2.2.2", features = ["serde"] }
libc = "0.2.147"

[dev-dependencies]
tempfile = "3.8.0"
20 changes: 15 additions & 5 deletions crates/wasm/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use super::*;
use std::net::SocketAddr;

use anyhow::Result;
use serde::{Deserialize, Serialize};

// A Config currently contains the local + remote ip & port
#[derive(Debug, Deserialize, Clone)]
pub struct Config {
pub local_address: String,
pub local_port: u32,
pub local_port: u16,
pub remote_address: String,
pub remote_port: u32,
pub remote_port: u16,
}

impl Default for Config {
Expand All @@ -25,6 +28,13 @@ impl Config {
remote_port: 8082,
}
}

pub fn dst_addr(&self) -> Result<SocketAddr> {
Ok(SocketAddr::new(
self.remote_address.parse()?,
self.remote_port,
))
}
}

// ============ Some implementation for V1 ============
Expand All @@ -33,13 +43,13 @@ impl Config {
#[derive(Serialize, Deserialize)]
pub struct StreamConfigV1 {
pub addr: String,
pub port: u32,
pub port: u16,
pub name: String,
}

// #[cfg(feature = "v1")]
impl StreamConfigV1 {
pub fn init(addr: String, port: u32, name: String) -> Self {
pub fn init(addr: String, port: u16, name: String) -> Self {
StreamConfigV1 { addr, port, name }
}
}
294 changes: 39 additions & 255 deletions crates/wasm/src/connections.rs
Original file line number Diff line number Diff line change
@@ -1,282 +1,66 @@
use super::*;
use std::io::{self, Read, Write};

// ConnStream can store either a network stream Or a file stream
pub enum ConnStream {
pub enum ConnStream<'a> {
Uninitialized,
TcpStream(std::net::TcpStream),
File(std::fs::File),
FileRef(&'a std::fs::File),
}

impl ConnStream {
impl<'a> ConnStream<'a> {
pub fn as_read(&mut self) -> &mut dyn Read {
match self {
ConnStream::TcpStream(stream) => stream,
ConnStream::File(stream) => stream,
ConnStream::FileRef(stream) => stream,
_ => panic!("ConnStream is uninitialized"),
}
}
}

// ConnFile is the struct for a connection -- either for in / outbound
pub struct ConnFile {
pub fd: i32,
pub file: Option<ConnStream>,
}

impl Default for ConnFile {
fn default() -> Self {
Self::new()
}
}

impl ConnFile {
// A default constructor for ConnFile
pub fn new() -> Self {
ConnFile { fd: -1, file: None }
}

pub fn read(&mut self, buf: &mut [u8]) -> Result<i64, anyhow::Error> {
match &mut self.file {
Some(stream) => {
let bytes_read = match stream {
ConnStream::TcpStream(stream) => {
stream.read(buf).map_err(anyhow::Error::from)?
}
ConnStream::File(stream) => stream.read(buf).map_err(anyhow::Error::from)?,
};
Ok(bytes_read as i64)
}
None => {
eprintln!("[WASM] > ERROR: ConnFile's file is None");
Err(anyhow::anyhow!("ConnFile's file is None"))
impl<'a> Read for ConnStream<'a> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self {
ConnStream::TcpStream(stream) => stream.read(buf),
ConnStream::File(stream) => stream.read(buf),
ConnStream::FileRef(stream) => stream.read(buf),
_ => {
return Err(io::Error::new(
io::ErrorKind::Other,
"ConnStream is uninitialized",
))
}
}
}

pub fn write(&mut self, buf: &[u8]) -> Result<(), anyhow::Error> {
match &mut self.file {
Some(stream) => match stream {
ConnStream::TcpStream(stream) => stream.write_all(buf).map_err(anyhow::Error::from),
ConnStream::File(stream) => stream.write_all(buf).map_err(anyhow::Error::from),
},
None => Err(anyhow::anyhow!("[WASM] > ERROR: ConnFile's file is None")),
}
}
}

// A Connection normally contains both in & outbound streams + a config
pub struct Connection {
pub inbound_conn: ConnFile,
pub outbound_conn: ConnFile,

pub config: Config,
}

impl Default for Connection {
fn default() -> Self {
Self::new()
}
}

impl Connection {
// A default constructor
pub fn new() -> Self {
Connection {
inbound_conn: ConnFile::new(),
outbound_conn: ConnFile::new(),

config: Config::new(),
}
}

pub fn set_inbound(&mut self, fd: i32, stream: ConnStream) {
if fd < 0 {
eprintln!("[WASM] > ERROR: fd is negative");
return;
}

if self.inbound_conn.fd != -1 {
eprintln!("[WASM] > ERROR: inbound_conn.fd has been set");
return;
}

self.inbound_conn.fd = fd;
self.inbound_conn.file = Some(stream);
}

pub fn set_outbound(&mut self, fd: i32, stream: ConnStream) {
if fd < 0 {
eprintln!("[WASM] > ERROR: fd is negative");
return;
}

if self.outbound_conn.fd != -1 {
eprintln!("[WASM] > ERROR: outbound_conn.fd has been set");
return;
}

self.outbound_conn.fd = fd;
self.outbound_conn.file = Some(stream);
}

// pub fn decoder_read_from_outbound<D: AsyncDecodeReader>(&mut self, decoder: &mut D, buf: &mut [u8]) -> Result<i64, anyhow::Error> {
// debug!("[WASM] running in decoder_read_from_outbound");

// // match self.outbound_conn.file.as_mut().unwrap() {
// // ConnStream::TcpStream(stream) => {
// // decoder.read_decrypted(stream);
// // },
// // ConnStream::File(stream) => {
// // decoder.read_decrypted(stream);
// // },
// // }
// Ok(decoder.poll_read_decrypted(self.outbound_conn.file.as_mut().unwrap().as_read(), buf)? as i64)
// }

/// this _read function is triggered by the Host to read from the remote connection
pub fn _read_from_outbound<D: Decoder>(
&mut self,
decoder: &mut D,
) -> Result<i64, anyhow::Error> {
debug!("[WASM] running in _read_from_net");

let mut buf = vec![0u8; 4096];
let bytes_read: i64 = match self.outbound_conn.read(&mut buf) {
Ok(n) => n,
Err(e) => {
// eprintln!("[WASM] > ERROR in _read when reading from outbound: {:?}", e);
// return -1; // Or another sentinel value to indicate error}
return Err(anyhow::anyhow!(
"[WASM] > ERROR in _read when reading from outbound: {:?}",
e
));
}
};

// NOTE: decode logic here
let mut decoded = vec![0u8; 4096];
let len_after_decoding = match decoder.decode(&buf[..bytes_read as usize], &mut decoded) {
Ok(n) => n,
Err(e) => {
// eprintln!("[WASM] > ERROR in _write when encoding: {:?}", e);
// return -1; // Or another sentinel value to indicate error
return Err(anyhow::anyhow!(
"[WASM] > ERROR in _write when encoding: {:?}",
e
));
}
};

match self
.inbound_conn
.write(decoded[..len_after_decoding as usize].as_ref())
{
Ok(_) => {}
Err(e) => {
// eprintln!("[WASM] > ERROR in _read when writing to inbound: {:?}", e);
// return -1; // Or another sentinel value to indicate error
return Err(anyhow::anyhow!(
"[WASM] > ERROR in _read when writing to inbound: {:?}",
e
));
}
}

Ok(len_after_decoding as i64)
}

pub fn _write_2_outbound<E: Encoder>(
&mut self,
encoder: &mut E,
bytes_write: i64,
) -> Result<i64, anyhow::Error> {
debug!("[WASM] running in _write_2_net");

let mut bytes_read: i64 = 0;
let mut buf = vec![0u8; 4096];
loop {
let read = match self.inbound_conn.read(&mut buf) {
Ok(n) => n,
Err(e) => {
// eprintln!("[WASM] > ERROR in _read when reading from inbound: {:?}", e);
// return -1; // Or another sentinel value to indicate error
return Err(anyhow::anyhow!(
"[WASM] > ERROR in _read when reading from inbound: {:?}",
e
));
}
};

bytes_read += read;

if read == 0 || bytes_read == bytes_write {
break;
}
}

// NOTE: encode logic here
let mut encoded = vec![0u8; 4096];
let len_after_encoding = match encoder.encode(&buf[..bytes_read as usize], &mut encoded) {
Ok(n) => n,
Err(e) => {
// eprintln!("[WASM] > ERROR in _write when encoding: {:?}", e);
// return -1; // Or another sentinel value to indicate error
return Err(anyhow::anyhow!(
"[WASM] > ERROR in _write when encoding: {:?}",
e
));
}
};

match self
.outbound_conn
.write(encoded[..len_after_encoding as usize].as_ref())
{
Ok(_) => {}
Err(e) => {
// eprintln!("[WASM] > ERROR in _read when writing to outbound: {:?}", e);
// return -1; // Or another sentinel value to indicate error
return Err(anyhow::anyhow!(
"[WASM] > ERROR in _read when writing to outbound: {:?}",
e
));
}
}

Ok(len_after_encoding as i64)
}

pub fn close_inbound(&mut self) {
match &mut self.inbound_conn.file {
Some(stream) => match stream {
ConnStream::TcpStream(stream) => {
stream.shutdown(std::net::Shutdown::Both).unwrap();
}
ConnStream::File(stream) => {
stream.sync_all().unwrap();
}
},
None => {
eprintln!("[WASM] > ERROR: ConnFile's file is None");
impl<'a> Write for ConnStream<'a> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match self {
ConnStream::TcpStream(stream) => stream.write(buf),
ConnStream::File(stream) => stream.write(buf),
ConnStream::FileRef(stream) => stream.write(buf),
_ => {
return Err(io::Error::new(
io::ErrorKind::Other,
"ConnStream is uninitialized",
))
}
}

self.inbound_conn.fd = -1;
}

pub fn close_outbound(&mut self) {
match &mut self.outbound_conn.file {
Some(stream) => match stream {
ConnStream::TcpStream(stream) => {
stream.shutdown(std::net::Shutdown::Both).unwrap();
}
ConnStream::File(stream) => {
stream.sync_all().unwrap();
}
},
None => {
eprintln!("[WASM] > ERROR: ConnFile's file is None");
fn flush(&mut self) -> io::Result<()> {
match self {
ConnStream::TcpStream(stream) => stream.flush(),
ConnStream::File(stream) => stream.flush(),
ConnStream::FileRef(stream) => stream.flush(),
_ => {
return Err(io::Error::new(
io::ErrorKind::Other,
"ConnStream is uninitialized",
))
}
}

self.outbound_conn.fd = -1;
}
}
Loading
Loading