From efa32af7a7b2914e26cc030b76f8cb18c99c0995 Mon Sep 17 00:00:00 2001 From: zy Date: Thu, 9 Dec 2021 12:24:59 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 13 ++ Cargo.toml | 1 + fuso-api/Cargo.toml | 3 +- fuso-api/src/buffer.rs | 10 ++ fuso-api/src/core.rs | 59 +++++---- fuso-api/src/error.rs | 8 ++ fuso-api/src/lib.rs | 49 ++++++- fuso-api/src/stream.rs | 3 +- fuso-api/src/udp.rs | 117 ++++------------- fuso-core/src/bridge.rs | 5 + fuso-core/src/ciphe.rs | 41 +++--- fuso-core/src/client.rs | 201 ++++++++++++++-------------- fuso-core/src/core.rs | 84 ++++++++---- fuso-core/src/lib.rs | 8 +- fuso-core/src/packet.rs | 28 ++-- fuso-core/src/retain.rs | 49 ++++--- fuso-core/src/server.rs | 282 ---------------------------------------- fuso-core/src/udp.rs | 0 src/client.rs | 14 +- src/no-log-client.rs | 24 +++- src/server.rs | 18 +-- 21 files changed, 414 insertions(+), 603 deletions(-) delete mode 100644 fuso-core/src/server.rs create mode 100644 fuso-core/src/udp.rs diff --git a/Cargo.lock b/Cargo.lock index 87ff772..e942116 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -260,6 +260,7 @@ dependencies = [ name = "fuso" version = "0.1.0" dependencies = [ + "bytes", "clap", "env_logger", "fuso-core", @@ -277,6 +278,8 @@ dependencies = [ "env_logger", "futures", "log", + "num_cpus", + "once_cell", "smol", ] @@ -491,6 +494,16 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" +[[package]] +name = "num_cpus" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "once_cell" version = "1.8.0" diff --git a/Cargo.toml b/Cargo.toml index 54ab0f0..4e37b97 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ path = "src/no-log-client.rs" log = {version = "0.4.14"} clap = {version = "3.0.0-beta.5", features = ["yaml"]} smol = {version = "1.2.5"} +bytes = {version = "1.1.0"} env_logger = "0.9.0" fuso-core = {path = "./fuso-core"} fuso-socks = {path ="./fuso-socks"} \ No newline at end of file diff --git a/fuso-api/Cargo.toml b/fuso-api/Cargo.toml index f62b0f0..8a44991 100644 --- a/fuso-api/Cargo.toml +++ b/fuso-api/Cargo.toml @@ -15,4 +15,5 @@ smol = {version = "1.2.5"} bytes = {version = "1.1.0"} futures = "0.3" async-trait = {version = "0.1.51"} - +once_cell = "1.2.0" +num_cpus = "1.0" \ No newline at end of file diff --git a/fuso-api/src/buffer.rs b/fuso-api/src/buffer.rs index ff59b2b..c7a23f3 100644 --- a/fuso-api/src/buffer.rs +++ b/fuso-api/src/buffer.rs @@ -27,24 +27,29 @@ where } } + #[inline] pub fn is_empty(&self) -> bool { self.buf.lock().unwrap().is_empty() } + #[inline] pub fn len(&self) -> usize { self.len } + #[inline] pub fn clear(&mut self) { self.buf.lock().unwrap().clear(); self.len = 0; } + #[inline] pub fn push_back(&mut self, data: &[T]) { self.buf.lock().unwrap().push_back(data.to_vec()); self.len += data.len(); } + #[inline] pub fn push_front(&mut self, data: &[T]) { self.buf.lock().unwrap().push_front(data.to_vec()); self.len += data.len(); @@ -52,6 +57,7 @@ where } impl Buffer { + #[inline] pub fn read_to_buffer(&mut self, buf: &mut [u8]) -> std::io::Result { let mut remaining = buf.len(); let mut read_len = 0; @@ -98,6 +104,7 @@ impl Buffer { #[async_trait] impl AsyncRead for Buffer { + #[inline] fn poll_read( mut self: std::pin::Pin<&mut Self>, _: &mut std::task::Context<'_>, @@ -109,6 +116,7 @@ impl AsyncRead for Buffer { #[async_trait] impl AsyncWrite for Buffer { + #[inline] fn poll_write( mut self: std::pin::Pin<&mut Self>, _: &mut std::task::Context<'_>, @@ -118,6 +126,7 @@ impl AsyncWrite for Buffer { Poll::Ready(Ok(buf.len())) } + #[inline] fn poll_flush( self: std::pin::Pin<&mut Self>, _: &mut std::task::Context<'_>, @@ -125,6 +134,7 @@ impl AsyncWrite for Buffer { Poll::Ready(Ok(())) } + #[inline] fn poll_close( mut self: std::pin::Pin<&mut Self>, _: &mut std::task::Context<'_>, diff --git a/fuso-api/src/core.rs b/fuso-api/src/core.rs index 0ded837..1850372 100644 --- a/fuso-api/src/core.rs +++ b/fuso-api/src/core.rs @@ -12,6 +12,7 @@ use futures::{AsyncReadExt, Future}; use smol::{ io::{AsyncRead, AsyncWrite, AsyncWriteExt}, net::{TcpListener, TcpStream}, + Executor, }; use crate::{ @@ -88,7 +89,7 @@ pub trait AsyncTcpSocketEx { #[derive(Debug, Clone)] pub struct Rollback { - target: Arc>, + target: T, rollback: Arc>, store: Arc>, } @@ -284,7 +285,24 @@ where { #[inline] fn detach(self) { - smol::spawn(self).detach(); + static GLOBAL: once_cell::sync::Lazy> = once_cell::sync::Lazy::new(|| { + for n in 1..num_cpus::get() { + log::trace!("spwan executor thread fuso-{}", n); + std::thread::Builder::new() + .name(format!("fuso-{}", n)) + .spawn(|| loop { + std::panic::catch_unwind(|| { + smol::block_on(GLOBAL.run(smol::future::pending::<()>())) + }) + .ok(); + }) + .expect("cannot spawn executor thread"); + } + + Executor::new() + }); + + GLOBAL.spawn(self).detach(); } } @@ -298,10 +316,10 @@ where async fn forward(self, to: To) -> Result<()> { let (reader_s, writer_s) = self.split(); let (reader_t, writer_t) = to.split(); - + smol::future::race( - smol::io::copy(reader_s, writer_t), smol::io::copy(reader_t, writer_s), + smol::io::copy(reader_s, writer_t), ) .await .map_err(|e| error::Error::with_io(e))?; @@ -327,9 +345,10 @@ impl RollbackEx> for T where T: AsyncRead + AsyncWrite + Send + Sync + 'static, { + #[inline] fn roll(self) -> Rollback> { Rollback { - target: Arc::new(Mutex::new(self)), + target: self, rollback: Arc::new(RwLock::new(false)), store: Arc::new(Mutex::new(Buffer::new())), } @@ -369,31 +388,31 @@ impl Rollback> { impl Rollback> { #[inline] pub fn local_addr(&self) -> std::io::Result { - self.target.lock().unwrap().local_addr() + self.target.local_addr() } #[inline] pub fn peer_addr(&self) -> std::io::Result { - self.target.lock().unwrap().peer_addr() + self.target.peer_addr() } } impl Rollback> { #[inline] pub fn local_addr(&self) -> std::io::Result { - self.target.lock().unwrap().local_addr() + self.target.local_addr() } #[inline] pub fn peer_addr(&self) -> std::io::Result { - self.target.lock().unwrap().peer_addr() + self.target.peer_addr() } } impl From>> for TcpStream { #[inline] fn from(roll: Rollback>) -> Self { - roll.target.lock().unwrap().clone() + roll.target } } @@ -404,7 +423,7 @@ where { #[inline] fn poll_read( - self: std::pin::Pin<&mut Self>, + mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut [u8], ) -> std::task::Poll> { @@ -423,8 +442,7 @@ where if read_len >= buf.len() { std::task::Poll::Ready(Ok(read_len)) } else { - let mut io = self.target.lock().unwrap(); - match Pin::new(&mut *io).poll_read(cx, &mut buf[read_len..])? { + match Pin::new(&mut self.target).poll_read(cx, &mut buf[read_len..])? { std::task::Poll::Pending => Poll::Pending, std::task::Poll::Ready(0) => Poll::Ready(Ok(read_len)), std::task::Poll::Ready(n) => { @@ -454,29 +472,26 @@ where { #[inline] fn poll_write( - self: std::pin::Pin<&mut Self>, + mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8], ) -> std::task::Poll> { - let mut io = self.target.lock().unwrap(); - Pin::new(&mut *io).poll_write(cx, buf) + Pin::new(&mut self.target).poll_write(cx, buf) } #[inline] fn poll_flush( - self: std::pin::Pin<&mut Self>, + mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - let mut io = self.target.lock().unwrap(); - Pin::new(&mut *io).poll_flush(cx) + Pin::new(&mut self.target).poll_flush(cx) } #[inline] fn poll_close( - self: std::pin::Pin<&mut Self>, + mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - let mut io = self.target.lock().unwrap(); - Pin::new(&mut *io).poll_close(cx) + Pin::new(&mut self.target).poll_close(cx) } } diff --git a/fuso-api/src/error.rs b/fuso-api/src/error.rs index 24ec258..7802b77 100644 --- a/fuso-api/src/error.rs +++ b/fuso-api/src/error.rs @@ -33,12 +33,14 @@ impl Error { } impl Display for Error { + #[inline] fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { std::fmt::Debug::fmt(&self.repr, f) } } impl From for Error { + #[inline] fn from(error: std::io::Error) -> Self { Self { repr: Repr::IO(error), @@ -47,6 +49,7 @@ impl From for Error { } impl From for Error { + #[inline] fn from(kind: ErrorKind) -> Self { Self { repr: Repr::Fuso(kind), @@ -55,6 +58,7 @@ impl From for Error { } impl From for Error { + #[inline] fn from(kind: std::io::ErrorKind) -> Self { Self { repr: Repr::IO(kind.into()), @@ -63,6 +67,7 @@ impl From for Error { } impl From for Error { + #[inline] fn from(e: smol::channel::RecvError) -> Self { Self { repr: Repr::IO(std::io::Error::new( @@ -77,6 +82,7 @@ impl From> for Error where T: Into, { + #[inline] fn from(e: smol::channel::SendError) -> Self { Self { repr: Repr::IO(std::io::Error::new( @@ -88,6 +94,7 @@ where } impl From<&str> for Error { + #[inline] fn from(txt: &str) -> Self { Self { repr: Repr::Fuso(ErrorKind::Customer(txt.into())), @@ -96,6 +103,7 @@ impl From<&str> for Error { } impl From for Error { + #[inline] fn from(txt: String) -> Self { Self { repr: Repr::Fuso(ErrorKind::Customer(txt.into())), diff --git a/fuso-api/src/lib.rs b/fuso-api/src/lib.rs index c2e9846..2a19013 100644 --- a/fuso-api/src/lib.rs +++ b/fuso-api/src/lib.rs @@ -13,7 +13,11 @@ pub use async_trait::*; #[cfg(test)] mod tests { - use std::time::Duration; + use std::{ + sync::{Arc, Mutex}, + task::Poll, + time::Duration, + }; use futures::AsyncReadExt; use smol::{future::FutureExt, io::AsyncWriteExt, net::UdpSocket}; @@ -54,6 +58,49 @@ mod tests { log::debug!("data len: {}, {:?}", packet.get_len(), packet.get_data()); } + #[test] + fn test_waker() { + init_logger(); + + smol::block_on(async move { + let (sender, receiver) = std::sync::mpsc::channel(); + + let receiver = Arc::new(Mutex::new(receiver)); + + { + smol::spawn(async move { + loop { + let receiver = receiver.clone(); + let msg = smol::unblock(move || receiver.lock().unwrap().recv()).await; + + println!("msg = {}", msg.unwrap()) + } + }) + .detach(); + } + + println!("test"); + + let mut io = smol::Unblock::new(std::io::stdin()); + + loop { + let mut buf = Vec::new(); + buf.resize(1024, 0); + + match io.read(&mut buf).await { + Ok(n) => { + buf.truncate(n); + + sender.send(String::from_utf8(buf).unwrap()).unwrap(); + } + Err(e) => { + log::error!("{}", e); + } + } + } + }); + } + #[test] fn test_udp_stream() { init_logger(); diff --git a/fuso-api/src/stream.rs b/fuso-api/src/stream.rs index ad809a9..3404cea 100644 --- a/fuso-api/src/stream.rs +++ b/fuso-api/src/stream.rs @@ -67,10 +67,9 @@ where } #[inline] - pub async fn release(&self) -> crate::Result<()>{ + pub async fn release(&self) -> crate::Result<()> { self.inner.release().await } - } impl AsyncRead for SafeStream diff --git a/fuso-api/src/udp.rs b/fuso-api/src/udp.rs index 67d5084..2f46c2c 100644 --- a/fuso-api/src/udp.rs +++ b/fuso-api/src/udp.rs @@ -2,10 +2,7 @@ use std::{ collections::HashMap, net::SocketAddr, pin::Pin, - sync::{ - mpsc::{sync_channel, SyncSender}, - Arc, - }, + sync::Arc, task::{Poll, Waker}, }; @@ -23,6 +20,7 @@ use std::sync::Mutex; use crate::{Buffer, Spwan}; pub struct UdpListener { + addr: SocketAddr, accept_ax: Receiver, core: Arc>>>, } @@ -30,11 +28,9 @@ pub struct UdpListener { pub struct UdpStream { inner: Arc>, store: Arc>>, - sender: Option>>, closed: Arc>, core: Arc>>>, read_waker: Arc>>, - send_waker: Arc>>, addr: SocketAddr, } @@ -49,74 +45,20 @@ impl UdpStream { ) -> Self { let closed = Arc::new(Mutex::new(false)); let store = Arc::new(Mutex::new(Buffer::new())); - let (sender, send_receiver) = sync_channel(10); - - let send_receiver = Arc::new(Mutex::new(send_receiver)); - let (server_stream, addr, is_server) = server_stream + let (server_stream, addr, _) = server_stream .map(|(server, addr)| (Some(server), addr, true)) .unwrap_or((None, udp.local_addr().unwrap(), false)); let read_waker = Arc::new(Mutex::new(None)); - let send_waker = Arc::new(Mutex::new(None)); Self { inner: Arc::new(Mutex::new(udp.clone())), store: store.clone(), - sender: Some(sender), closed: closed.clone(), read_waker: read_waker.clone(), - send_waker: send_waker.clone(), addr: addr, core: { - let write_future = { - let closed = closed.clone(); - let writer = udp.clone(); - async move { - loop { - let receiver = send_receiver.clone(); - let send_waker = send_waker.clone(); - - let packet = smol::future::poll_fn(move |cx| { - receiver.lock().unwrap().try_recv().map_or_else( - |e| match e { - std::sync::mpsc::TryRecvError::Empty => { - *send_waker.lock().unwrap() = Some(cx.waker().clone()); - Poll::Pending - } - std::sync::mpsc::TryRecvError::Disconnected => Poll::Ready( - Err(std::sync::mpsc::TryRecvError::Disconnected), - ), - }, - |packet| Poll::Ready(Ok(packet)), - ) - }) - .await; - - if packet.is_err() { - *closed.lock().unwrap() = true; - break; - } - - let buf = packet.unwrap(); - - log::debug!("[udp] send total {}", buf.len()); - - for packet in buf.chunks(1350).into_iter() { - if let Err(e) = if is_server { - writer.send_to(&packet, addr).await - } else { - writer.send(&packet).await - } { - log::debug!("udp error {}", e); - *closed.lock().unwrap() = true; - return; - }; - } - } - } - }; - let read_future = async move { loop { let packet = if let Some(receiver) = server_stream.as_ref() { @@ -136,6 +78,9 @@ impl UdpStream { break; } + let n = packet.unwrap(); + buf.truncate(n); + buf }; @@ -148,9 +93,7 @@ impl UdpStream { } }; - Arc::new(Mutex::new(Some(smol::spawn( - read_future.race(write_future), - )))) + Arc::new(Mutex::new(Some(smol::spawn(read_future)))) }, } } @@ -164,6 +107,8 @@ impl UdpListener { let sessions: Arc>>>> = Arc::new(smol::lock::Mutex::new(HashMap::new())); + let addr = udp.local_addr().unwrap(); + let core_future = async move { loop { let mut buf = Vec::new(); @@ -201,11 +146,17 @@ impl UdpListener { }; Ok(Self { + addr, accept_ax, core: Arc::new(Mutex::new(Some(smol::spawn(core_future)))), }) } + #[inline] + pub fn local_addr(&self) -> SocketAddr { + self.addr + } + #[inline] pub async fn accept(&self) -> std::io::Result { self.accept_ax @@ -215,6 +166,7 @@ impl UdpListener { } } + impl Drop for UdpListener { #[inline] fn drop(&mut self) { @@ -273,7 +225,7 @@ impl AsyncRead for UdpStream { impl AsyncWrite for UdpStream { fn poll_write( self: Pin<&mut Self>, - _: &mut std::task::Context<'_>, + cx: &mut std::task::Context<'_>, buf: &[u8], ) -> Poll> { if self.closed.lock().unwrap().eq(&true) { @@ -282,28 +234,9 @@ impl AsyncWrite for UdpStream { "Connection has been closed", ))) } else { - let len = buf.len(); - - if let Some(sender) = self.sender.as_ref() { - if let Err(_) = sender.send(buf.to_vec()) { - *self.closed.lock().unwrap() = false; - Poll::Ready(Err(std::io::Error::new( - std::io::ErrorKind::ConnectionReset, - "Connection has been closed", - ))) - } else { - if let Some(waker) = self.send_waker.lock().unwrap().take() { - waker.wake(); - } - - Poll::Ready(Ok(len)) - } - } else { - Poll::Ready(Err(std::io::Error::new( - std::io::ErrorKind::ConnectionReset, - "Connection has been closed", - ))) - } + let udp = self.inner.clone(); + let addr = self.addr.clone(); + Box::pin(async move { udp.lock().unwrap().send_to(buf, addr).await }).poll(cx) } } @@ -317,15 +250,11 @@ impl AsyncWrite for UdpStream { #[inline] fn poll_close( - mut self: Pin<&mut Self>, - _: &mut std::task::Context<'_>, + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, ) -> Poll> { if let Some(core) = self.core.lock().unwrap().take() { - core.cancel().detach() - } - - if let Some(sender) = self.sender.take() { - drop(sender) + let _ = Box::pin(core.cancel()).poll(cx); } *self.closed.lock().unwrap() = true; diff --git a/fuso-core/src/bridge.rs b/fuso-core/src/bridge.rs index 9596ed5..0921242 100644 --- a/fuso-core/src/bridge.rs +++ b/fuso-core/src/bridge.rs @@ -16,6 +16,7 @@ pub struct Bridge { } impl Bridge { + #[inline] pub async fn bind(bind_addr: SocketAddr, server_addr: SocketAddr) -> Result { let listen = bind_addr.tcp_listen().await?; @@ -55,6 +56,7 @@ impl Bridge { } } .detach(); + Ok(accept_tx) }) .await; @@ -68,6 +70,7 @@ impl Bridge { } impl Drop for Bridge { + #[inline] fn drop(&mut self) { if let Some(task) = self.task.lock().unwrap().take() { async move { @@ -81,6 +84,7 @@ impl Drop for Bridge { #[async_trait] impl FusoListener<(TcpStream, TcpStream)> for Bridge { + #[inline] async fn accept(&mut self) -> Result<(TcpStream, TcpStream)> { Ok(self.accept_ax.recv().await.map_err(|e| { Error::with_io(std::io::Error::new( @@ -90,6 +94,7 @@ impl FusoListener<(TcpStream, TcpStream)> for Bridge { })?) } + #[inline] async fn close(&mut self) -> Result<()> { let _ = self.accept_ax.clone(); Ok(()) diff --git a/fuso-core/src/ciphe.rs b/fuso-core/src/ciphe.rs index 11eeee8..3e8b8d8 100644 --- a/fuso-core/src/ciphe.rs +++ b/fuso-core/src/ciphe.rs @@ -34,7 +34,7 @@ pub trait Cipher { #[derive(Clone)] pub struct Crypt { buf: Arc>>, - target: Arc>, + target: T, cipher: Arc>, } @@ -45,21 +45,21 @@ pub struct Xor { impl Crypt { pub fn local_addr(&self) -> std::io::Result { - self.target.lock().unwrap().local_addr() + self.target.local_addr() } pub fn peer_addr(&self) -> std::io::Result { - self.target.lock().unwrap().peer_addr() + self.target.peer_addr() } } impl Crypt, C> { pub fn local_addr(&self) -> std::io::Result { - self.target.lock().unwrap().local_addr() + self.target.local_addr() } pub fn peer_addr(&self) -> std::io::Result { - self.target.lock().unwrap().peer_addr() + self.target.peer_addr() } } @@ -72,8 +72,8 @@ where #[inline] async fn ciphe(self, c: C) -> Crypt { Crypt { + target: self, buf: Arc::new(Mutex::new(Buffer::new())), - target: Arc::new(Mutex::new(self)), cipher: Arc::new(Mutex::new(c)), } } @@ -86,18 +86,19 @@ where C: Cipher + Unpin + Send + Sync + 'static, { fn poll_read( - self: std::pin::Pin<&mut Self>, + mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut [u8], ) -> std::task::Poll> { - let mut io_buf = self.buf.lock().unwrap(); + let io_buf = self.buf.clone(); + + let mut io_buf = io_buf.lock().unwrap(); if !io_buf.is_empty() { + log::info!("read buffer"); Pin::new(&mut *io_buf).poll_read(cx, buf) } else { - let mut io = self.target.lock().unwrap(); - - match Pin::new(&mut *io).poll_read(cx, buf) { + match Pin::new(&mut self.target).poll_read(cx, buf) { Poll::Pending => Poll::Pending, Poll::Ready(Err(e)) => Poll::Ready(Err(e)), Poll::Ready(Ok(0)) => Poll::Ready(Ok(0)), @@ -137,18 +138,18 @@ where { #[inline] fn poll_write( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8], ) -> std::task::Poll> { - let mut cipher = self.cipher.lock().unwrap(); + let cipher = self.cipher.clone(); + let mut cipher = cipher.lock().unwrap(); match Pin::new(&mut *cipher).poll_encode(cx, buf) { Poll::Pending => Poll::Pending, Poll::Ready(Err(e)) => Poll::Ready(Err(e)), Poll::Ready(Ok(data)) => { - let mut io = self.target.lock().unwrap(); - let _ = Pin::new(&mut *io).poll_write(cx, &data)?; + let _ = Pin::new(&mut self.target).poll_write(cx, &data)?; Poll::Ready(Ok(buf.len())) } } @@ -156,20 +157,18 @@ where #[inline] fn poll_flush( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - let mut io = self.target.lock().unwrap(); - Pin::new(&mut *io).poll_flush(cx) + Pin::new(&mut self.target).poll_flush(cx) } #[inline] fn poll_close( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - let mut io = self.target.lock().unwrap(); - Pin::new(&mut *io).poll_close(cx) + Pin::new(&mut self.target).poll_close(cx) } } diff --git a/fuso-core/src/client.rs b/fuso-core/src/client.rs index b38243e..349da81 100644 --- a/fuso-core/src/client.rs +++ b/fuso-core/src/client.rs @@ -1,15 +1,12 @@ -use std::{ - net::{IpAddr, Ipv4Addr, SocketAddr}, - sync::Arc, -}; +use std::{net::SocketAddr, sync::Arc}; use fuso_api::{ async_trait, AsyncTcpSocketEx, Error, Forward, FusoListener, FusoPacket, Result, Spwan, }; -use futures::AsyncWriteExt; use smol::{ - channel::{unbounded, Receiver}, + channel::{unbounded, Receiver, Sender}, + future::FutureExt, net::TcpStream, }; @@ -25,12 +22,14 @@ pub struct Reactor { conv: u64, action: Action, addr: SocketAddr, + config: Arc, } pub struct Fuso { accept_ax: Receiver, } +#[derive(Debug)] pub struct Config { // 名称, 可选的 pub name: Option, @@ -40,9 +39,69 @@ pub struct Config { pub server_bind_port: u16, // 桥接监听的地址 pub bridge_addr: Option, + // 转发地址 + pub forward_addr: String, } impl Fuso { + async fn run_core( + conv: u64, + config: Arc, + core: TcpStream, + accept_tx: Sender, + ) -> Result<()> { + let mut core = core.guard(5000).await?; + + loop { + let action: Action = core.recv().await?.try_into()?; + + match action { + Action::Ping => {} + action => { + let reactor = Reactor { + conv, + action, + addr: core.peer_addr().unwrap(), + config: config.clone(), + }; + + accept_tx.send(reactor).await.map_err(|e| { + let err: fuso_api::Error = e.to_string().into(); + err + })?; + } + } + } + } + + async fn run_bridge( + bridge_bind_addr: SocketAddr, + server_connect_addr: SocketAddr, + ) -> Result<()> { + let mut core = Bridge::bind(bridge_bind_addr, server_connect_addr).await?; + + log::info!("[bridge] Bridge service opened successfully"); + + loop { + match core.accept().await { + Ok((from, to)) => { + log::info!( + "Bridge to {} -> {}", + from.peer_addr().unwrap(), + to.peer_addr().unwrap() + ); + + from.forward(to).detach(); + } + Err(_) => { + break; + } + } + } + + Ok(()) + } + pub async fn bind(config: Config) -> Result { let cfg = Arc::new(config); let server_addr = cfg.server_addr.clone(); @@ -52,21 +111,13 @@ impl Fuso { let mut stream = server_addr.tcp_connect().await?; - stream - .send( - Action::Bind(name, { - if cfg.server_bind_port == 0 { - None - } else { - Some(SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), - server_bind_port, - )) - } - }) - .into(), - ) - .await?; + log::debug!("{}", server_bind_port); + + let bind_addr = server_bind_port + .ne(&0) + .then(|| SocketAddr::from(([0, 0, 0, 0], server_bind_port))); + + stream.send(Action::Bind(name, bind_addr).into()).await?; let action: Action = stream.recv().await?.try_into()?; @@ -75,75 +126,21 @@ impl Fuso { match action { Action::Accept(conv) => { log::debug!("Service binding is successful {}", conv); - let mut stream = stream.guard(5000).await?; - async move { - if bridge_addr.is_none() { - return; - } - - let bridge_addr = bridge_addr.unwrap(); - - match Bridge::bind(bridge_addr, server_addr).await { - Ok(mut bridge) => { - log::info!("[bridge] Bridge service opened successfully"); - loop { - match bridge.accept().await { - Ok((from, to)) => { - log::info!( - "Bridge to {} -> {}", - from.peer_addr().unwrap(), - to.peer_addr().unwrap() - ); - from.forward(to).detach(); - } - Err(_) => { - break; - } - } - } - } - Err(e) => { - log::warn!("[bridge] Bridge service failed to open {}", e); - } - }; - } - .detach(); + let server_addr = stream.peer_addr()?; async move { - loop { - match stream.recv().await { - Err(e) => { - log::warn!("[fuc] Server disconnect {}", e); - break; - } - Ok(packet) => { - let action: Result = packet.try_into(); - match action { - Ok(Action::Ping) => {} - Ok(action) => { - match accept_tx - .send(Reactor { - conv, - action, - addr: server_addr, - }) - .await - { - Err(_) => { - let _ = stream.close().await; - break; - } - _ => {} - }; - } - Err(e) => { - log::debug!("{}", e); - } - } + let _ = Self::run_core(conv, cfg, stream, accept_tx) + .race(async move { + if bridge_addr.is_none() { + smol::future::pending::<()>().await; + } else { + let _ = Self::run_bridge(bridge_addr.unwrap(), server_addr).await; } - } - } + + Ok(()) + }) + .await; } .detach(); } @@ -161,28 +158,36 @@ impl Fuso { impl Reactor { #[inline] pub async fn join(self) -> Result<(TcpStream, TcpStream)> { - let to = TcpStream::connect({ + let (id, addr) = { match self.action { - Action::Forward(Addr::Domain(domain, port)) => { - log::info!("connect {}:{}", domain, port); - format!("{}:{}", domain, port) + Action::Forward(id, Addr::Domain(domain, port)) => { + log::info!("id={}, addr={}:{}", id, domain, port); + + (id, format!("{}:{}", domain, port)) } - Action::Forward(Addr::Socket(addr)) => { - log::info!("connect {}", addr); - addr.to_string() + Action::Forward(id, Addr::Socket(addr)) if addr.port() == 0 => { + log::info!("id={}, addr={}", id, addr); + + (id, self.config.forward_addr.clone()) + } + Action::Forward(id, Addr::Socket(addr)) => { + log::info!("id={}, addr={}", id, addr); + + (id, addr.to_string()) } - _ => { - return Err("Unsupported operation".into()); + action => { + return Err(format!("Unsupported operation {:?}", action).into()); } } - }) - .await?; + }; + + let to = TcpStream::connect(addr).await?; let mut stream = TcpStream::connect(self.addr) .await .map_err(|e| Error::with_io(e))?; - stream.send(Action::Connect(self.conv).into()).await?; + stream.send(Action::Connect(self.conv, id).into()).await?; Ok((stream, to)) } diff --git a/fuso-core/src/core.rs b/fuso-core/src/core.rs index 11adc22..ad155d4 100644 --- a/fuso-core/src/core.rs +++ b/fuso-core/src/core.rs @@ -1,8 +1,4 @@ -use std::{ - collections::{HashMap, VecDeque}, - net::SocketAddr, - sync::Arc, -}; +use std::{collections::HashMap, net::SocketAddr, sync::Arc}; use async_trait::async_trait; use futures::{lock::Mutex, AsyncWriteExt, TryStreamExt}; @@ -16,7 +12,7 @@ use smol::{ #[allow(unused)] use crate::ciphe::{Security, Xor}; -use fuso_api::{AsyncTcpSocketEx, FusoPacket, Result, SafeStreamEx, Spwan}; +use fuso_api::{AsyncTcpSocketEx, FusoPacket, Result, SafeStream, SafeStreamEx, Spwan}; use crate::retain::Heartbeat; use crate::{dispatch::DynHandler, packet::Action}; @@ -44,11 +40,12 @@ pub struct FusoStream { pub struct Channel { pub conv: u64, + pub alloc_id: Arc>, pub name: String, pub core: HeartGuard>, pub config: Arc, pub strategys: Arc, Action>>>>>, - pub wait_queue: Arc>>>, + pub wait_queue: Arc>>>, } #[derive(Clone)] @@ -57,7 +54,7 @@ pub struct Context { pub config: Arc, pub handlers: Arc, ()>>>>>, pub strategys: Arc, Action>>>>>, - pub sessions: Arc>>>>, + pub sessions: Arc)>>>>, pub accept_ax: Sender>>, } @@ -209,23 +206,40 @@ where impl Channel { #[inline] - pub async fn try_wake(&self) -> Result> { - match self.wait_queue.lock().await.pop_front() { + pub async fn try_wake(&self, id: &u64) -> Result> { + match self.wait_queue.lock().await.remove(id) { Some(tcp) => Ok(tcp), None => Err("No task operation required".into()), } } #[inline] - pub async fn suspend(&self, tcp: fuso_api::SafeStream) -> Result<()> { - self.wait_queue.lock().await.push_back(tcp); + pub async fn suspend(&self, id: u64, tcp: SafeStream) -> Result<()> { + self.wait_queue.lock().await.insert(id, tcp); Ok(()) } + + #[inline] + pub async fn gen_id(&self) -> u64 { + let sessions = self.wait_queue.lock().await; + + loop { + let (conv, _) = self.alloc_id.lock().await.overflowing_add(1); + + if sessions.get(&conv).is_some() { + continue; + } + + *self.alloc_id.lock().await = conv; + + break conv; + } + } } impl Context { #[inline] - pub async fn fork(&self) -> (u64, Receiver>) { + pub async fn fork(&self) -> (u64, Receiver<(u64, SafeStream)>) { let (accept_tx, accept_ax) = unbounded(); let mut sessions = self.sessions.write().await; @@ -240,21 +254,26 @@ impl Context { None => break conv, _ => {} } - - *self.alloc_conv.lock().await = conv; }; + *self.alloc_conv.lock().await = conv; + sessions.insert(conv, accept_tx); (conv, accept_ax) } #[inline] - pub async fn route(&self, conv: u64, tcp: fuso_api::SafeStream) -> Result<()> { + pub async fn route( + &self, + conv: u64, + id: u64, + tcp: fuso_api::SafeStream, + ) -> Result<()> { let sessions = self.sessions.read().await; if let Some(accept_tx) = sessions.get(&conv) { - if let Err(e) = accept_tx.send(tcp).await { + if let Err(e) = accept_tx.send((id, tcp)).await { Err(e.to_string().into()) } else { Ok(()) @@ -285,10 +304,11 @@ impl Context { let channel = Arc::new(Channel { conv, core, + alloc_id: Arc::new(Mutex::new(0)), strategys, name: name.clone(), config: self.config.clone(), - wait_queue: Arc::new(Mutex::new(VecDeque::new())), + wait_queue: Arc::new(Mutex::new(HashMap::new())), }); log::info!( @@ -311,9 +331,9 @@ impl Context { break; } - let mut from = from.unwrap(); + let (id, mut from) = from.unwrap(); - let to = channel.try_wake().await; + let to = channel.try_wake(&id).await; if to.is_err() { let _ = from.close().await; @@ -368,7 +388,7 @@ impl Context { .incoming() .try_fold(channel, |channel, tcp| async move { { - log::debug!("connected {}", tcp.peer_addr().unwrap()); + log::info!("connected {}", tcp.peer_addr().unwrap()); let channel = channel.clone(); async move { let mut tcp = tcp.as_safe_stream(); @@ -393,15 +413,23 @@ impl Context { log::debug!("action {:?}", action); match action { - Action::Nothing => { - let _ = tcp.close().await; + Action::Forward(id, addr) => { + let id = { + if id.eq(&0) { + channel.gen_id().await + } else { + id + } + }; + + let _ = channel.suspend(id, tcp).await; + + // 通知客户端需执行的方法 + let _ = + core.send(Action::Forward(id, addr).into()).await; } _ => { - // 通知客户端需执行的方法 - let _ = core.send(action.into()).await; - // 暂时休眠当前这个连接, 该连接可能会超时, - // 并且连接数达到一定数量时可能导致连接积累过多导致无法在建立连接,也就是fd用尽 - let _ = channel.suspend(tcp).await; + log::warn!("{:?}", action) } } } diff --git a/fuso-core/src/lib.rs b/fuso-core/src/lib.rs index 2b32ec5..da3efbe 100644 --- a/fuso-core/src/lib.rs +++ b/fuso-core/src/lib.rs @@ -7,7 +7,7 @@ pub mod dispatch; pub mod handler; pub mod packet; pub mod retain; -pub mod server; +pub mod udp; use std::sync::Arc; @@ -114,8 +114,8 @@ mod tests { ); Ok(State::Accept(())) } - Action::Connect(conv) => { - cx.route(conv, tcp.into()).await?; + Action::Connect(conv, id) => { + cx.route(conv, id, tcp.into()).await?; Ok(State::Accept(())) } _ => Ok(State::Next), @@ -137,7 +137,7 @@ mod tests { loop { match fuso.accept().await { Ok(fuso) => { - log::debug!("mut .... "); + log::debug!(".."); } Err(_) => {} } diff --git a/fuso-core/src/packet.rs b/fuso-core/src/packet.rs index 7ef945d..f4fd9cc 100644 --- a/fuso-core/src/packet.rs +++ b/fuso-core/src/packet.rs @@ -26,8 +26,10 @@ pub enum Action { Bind(Option, Option), Reset(Addr), Accept(u64), - Forward(Addr), - Connect(u64), + // id + Forward(u64, Addr), + // conv, id + Connect(u64, u64), Err(String), Nothing, } @@ -153,11 +155,13 @@ impl TryFrom for Action { Ok(Action::Accept(packet.get_mut_data().get_u64())) } CMD_RESET => Ok(Action::Reset(packet.get_data().as_ref().try_into()?)), - CMD_CONNECT if packet.get_len().ge(&8) => { - Ok(Action::Connect(packet.get_mut_data().get_u64())) + CMD_CONNECT if packet.get_len().ge(&16) => { + let data = packet.get_mut_data(); + Ok(Action::Connect(data.get_u64(), data.get_u64())) } - CMD_FORWARD if packet.get_len().ge(&0) => { - Ok(Action::Forward(packet.get_data().as_ref().try_into()?)) + CMD_FORWARD if packet.get_len().ge(&8) => { + let data = packet.get_mut_data(); + Ok(Action::Forward(data.get_u64(), data.as_ref().try_into()?)) } CMD_ERROR => Ok(Action::Err( String::from_utf8_lossy(packet.get_data()).into(), @@ -199,12 +203,18 @@ impl From for fuso_api::Packet { buf.put_u64(conv); buf.into() }), - Action::Connect(conv) => Packet::new(CMD_CONNECT, { + Action::Connect(conv, id) => Packet::new(CMD_CONNECT, { let mut buf = BytesMut::new(); - buf.put_u64(conv); + buf.put_u64(conv); // conv + buf.put_u64(id); // id + buf.into() + }), + Action::Forward(id, addr) => Packet::new(CMD_FORWARD, { + let mut buf = BytesMut::new(); + buf.put_u64(id); + buf.put_slice(&addr.to_bytes()); buf.into() }), - Action::Forward(addr) => Packet::new(CMD_FORWARD, addr.to_bytes()), Action::Err(e) => Packet::new(CMD_ERROR, e.into()), Action::Nothing => Packet::new(CMD_RESET, Bytes::new()), } diff --git a/fuso-core/src/retain.rs b/fuso-core/src/retain.rs index 17ab00a..eff0123 100644 --- a/fuso-core/src/retain.rs +++ b/fuso-core/src/retain.rs @@ -1,20 +1,21 @@ use async_trait::async_trait; -use bytes::Bytes; -use fuso_api::{now_mills, FusoPacket, Packet, Spwan}; +use fuso_api::{now_mills, FusoPacket, Spwan}; use futures::{AsyncRead, AsyncWrite, AsyncWriteExt}; -use smol::Task; -use std::io::Result; +use smol::{net::TcpStream, Task}; +use std::{io::Result, net::SocketAddr}; use std::{ ops::Sub, pin::Pin, - sync::{Arc, Mutex, RwLock}, + sync::{Arc, RwLock}, task::Poll, time::Duration, }; +use crate::packet::Action; + #[derive(Clone)] pub struct HeartGuard { - target: Arc>, + target: T, last: Arc>, guard: Arc>>>, } @@ -33,7 +34,7 @@ where Self { last: last.clone(), - target: Arc::new(Mutex::new(target.clone())), + target: target.clone(), guard: Arc::new(std::sync::Mutex::new(Some(smol::spawn({ let mut io = target.clone(); @@ -55,7 +56,7 @@ where smol::Timer::after(Duration::from_millis(interval)).await; - if let Err(_) = io.send(Packet::new(0x10, Bytes::new())).await { + if let Err(_) = io.send(Action::Ping.into()).await { let _ = io.close().await; break; } @@ -68,6 +69,7 @@ where } impl Drop for HeartGuard { + #[inline] fn drop(&mut self) { if let Some(guard) = self.guard.lock().unwrap().take() { async move { @@ -96,13 +98,11 @@ where { #[inline] fn poll_read( - self: std::pin::Pin<&mut Self>, + mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut [u8], ) -> std::task::Poll> { - let mut io = self.target.lock().unwrap(); - - match Pin::new(&mut *io).poll_read(cx, buf) { + match Pin::new(&mut self.target).poll_read(cx, buf) { std::task::Poll::Ready(result) => { if result.is_ok() { *self.last.write().unwrap() = now_mills(); @@ -121,29 +121,36 @@ where { #[inline] fn poll_write( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8], ) -> std::task::Poll> { - let mut io = self.target.lock().unwrap(); - Pin::new(&mut *io).poll_write(cx, buf) + Pin::new(&mut self.target).poll_write(cx, buf) } #[inline] fn poll_flush( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - let mut io = self.target.lock().unwrap(); - Pin::new(&mut *io).poll_flush(cx) + Pin::new(&mut self.target).poll_flush(cx) } #[inline] fn poll_close( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - let mut io = self.target.lock().unwrap(); - Pin::new(&mut *io).poll_close(cx) + Pin::new(&mut self.target).poll_close(cx) + } +} + +impl HeartGuard { + pub fn lock_addr(&self) -> std::io::Result { + self.target.local_addr() + } + + pub fn peer_addr(&self) -> std::io::Result { + self.target.peer_addr() } } diff --git a/fuso-core/src/server.rs b/fuso-core/src/server.rs deleted file mode 100644 index 7f40137..0000000 --- a/fuso-core/src/server.rs +++ /dev/null @@ -1,282 +0,0 @@ -use std::{collections::VecDeque, net::SocketAddr, sync::Arc}; - -use bytes::Bytes; -use fuso_api::{async_trait, Error, ErrorKind, FusoPacket, Packet, Result, Spwan}; - -use smol::{ - channel::{unbounded, Receiver, Sender}, - future::FutureExt, - io::AsyncWriteExt, - lock::Mutex, - net::{TcpListener, TcpStream}, -}; - -use crate::{cmd::{CMD_BIND, CMD_CREATE}, retain::{HeartGuard, Heartbeat}, split_mutex}; - -struct Chief { - io: IO, -} - -pub struct Fuso { - visit_addr: SocketAddr, - bind_addr: SocketAddr, - accept_ax: Receiver<(TcpStream, TcpStream)>, -} - -impl Fuso { - #[inline] - async fn handler_at_client(tcp: TcpListener, accept_tx: Sender) -> Result<()> { - log::info!("[fus] Actual access address {}", tcp.local_addr().unwrap()); - - loop { - let stream = tcp.accept().await; - - if stream.is_err() { - break Err(stream.unwrap_err().into()); - } - - let (stream, addr) = stream.unwrap(); - - log::debug!("[fus] Visitor connections from {}", addr); - - if let Err(e) = accept_tx.send(stream).await { - break Err(Error::with_io(std::io::Error::new( - std::io::ErrorKind::Other, - e.to_string(), - ))); - }; - } - } - - #[inline] - async fn handler_fuso_at_client( - tcp: TcpListener, - (accept_tx, accept_ax): (Sender<(TcpStream, TcpStream)>, Receiver), - ) -> Result<()> { - log::info!("[fus] Server started! {}", tcp.local_addr().unwrap()); - log::info!("[fus] Waiting for client connection"); - - let mutex_sync: Arc>>> = Arc::new(Mutex::new(None)); - - smol::future::race( - { - let mutex_sync = mutex_sync.clone(); - async move { - loop { - let stream = accept_ax.recv().await; - - if stream.is_err() { - break Err(std::io::Error::new( - std::io::ErrorKind::Other, - stream.unwrap_err().to_string(), - ) - .into()); - } - - let mut stream = stream.unwrap(); - - let mut mutex_sync = mutex_sync.lock().await; - if let Some(accept_ax) = mutex_sync.as_mut() { - if let Err(_) = accept_ax.send(stream.clone()).await { - accept_ax.close(); - let _ = mutex_sync.take(); - let _ = stream.close().await; - } - } else { - log::debug!("[fus] Client is not ready"); - let _ = stream.close().await; - } - } - } - }, - async move { - loop { - let stream = tcp.accept().await; - - if stream.is_err() { - break Err(stream.unwrap_err().into()); - } - - let (stream, addr) = stream.unwrap(); - - log::debug!("[fus] Client connection {}", addr); - - let chief = Chief::auth(stream).await; - - if chief.is_err() { - log::warn!("[fus] Client authentication failed"); - } else { - let (accept_ax, accept_fuso_tx) = unbounded(); - - *mutex_sync.lock().await = Some(accept_ax.clone()); - - chief - .unwrap() - .join(tcp.clone(), accept_fuso_tx, accept_tx.clone()) - .await; - } - } - }, - ) - .await - } - - pub async fn bind( - (bind_at_client_addr, bind_at_fuso_client_addr): (SocketAddr, SocketAddr), - ) -> Result { - let (at_tcp, at_fuso_tcp) = ( - TcpListener::bind(bind_at_client_addr) - .await - .map_err(|e| Error::with_io(e))?, - TcpListener::bind(bind_at_fuso_client_addr) - .await - .map_err(|e| Error::with_io(e))?, - ); - - let visit_addr = at_tcp.local_addr().unwrap(); - let bind_addr = at_fuso_tcp.local_addr().unwrap(); - - let (accept_tx, accept_ax) = unbounded(); - let (accept_fuso_tx, accept_fuso_ax) = unbounded(); - - smol::spawn(smol::future::race( - Self::handler_at_client(at_tcp, accept_fuso_tx.clone()), - Self::handler_fuso_at_client(at_fuso_tcp, (accept_tx.clone(), accept_fuso_ax)), - )) - .detach(); - - Ok(Self { - accept_ax, - visit_addr, - bind_addr, - }) - } - - pub fn visit_addr(&self) -> SocketAddr { - self.visit_addr - } - - pub fn bind_addr(&self) -> SocketAddr { - self.bind_addr - } -} - -impl Chief> { - #[inline] - pub async fn auth(mut stream: TcpStream) -> Result>> { - let packet = stream.recv().await?; - - if packet.get_cmd() != CMD_BIND { - return Err(ErrorKind::BadPacket.into()); - } - - let guard = stream.guard(5000).await?; - - Ok(Self { io: guard }) - } - - pub async fn join( - self, - tcp: TcpListener, - accept_tx: Receiver, - accept_ax: Sender<(TcpStream, TcpStream)>, - ) { - let (produce_que, consume_que) = split_mutex(VecDeque::new()); - - let client_future = { - let mut io = self.io.clone(); - async move { - loop { - match io.recv().await { - Ok(_) => { - // log::debug!("recv {:?}", packet); - } - Err(_) => { - log::warn!("[fus] Client disconnect"); - break; - } - } - } - } - }; - - let visit_future = { - let mut io = self.io.clone(); - async move { - let packet = Packet::new(CMD_CREATE, Bytes::new()); - loop { - if let Ok(mut stream) = accept_tx.recv().await { - if let Ok(()) = io.send(packet.clone()).await { - produce_que.lock().await.push_back(stream); - } else { - log::warn!("[fus] No response from client"); - - let _ = stream.close().await; - let _ = io.close().await; - }; - } - } - } - }; - - let fuso_future = async move { - loop { - let stream = tcp.accept().await; - - if stream.is_err() { - log::debug!("[fus] Server error {}", stream.unwrap_err()); - break; - } - - let (mut stream, addr) = stream.unwrap(); - - log::debug!("[fus] Client mapping connection received {}", addr); - - let consume_que = consume_que.clone(); - let accept_ax = accept_ax.clone(); - - let future = async move { - let packet = stream.recv().await; - - if packet.is_err() { - log::warn!("[fus] Client error {}", packet.unwrap_err()); - } else { - let from = consume_que.lock().await.pop_front(); - - if from.is_none() { - let _ = stream.close().await; - } else { - if let Err(e) = accept_ax.send((from.unwrap(), stream.clone())).await { - log::warn!("[fus] Server exception {}", e); - let _ = stream.close().await; - } - } - } - }; - - future.detach(); - } - }; - - client_future.race(visit_future.race(fuso_future)).await; - } -} - -#[async_trait] -impl fuso_api::FusoListener<(TcpStream, TcpStream)> for Fuso { - #[inline] - async fn accept(&mut self) -> Result<(TcpStream, TcpStream)> { - Ok(self.accept_ax.recv().await.map_err(|e| { - Error::with_io(std::io::Error::new( - std::io::ErrorKind::Other, - e.to_string(), - )) - })?) - } - - #[inline] - async fn close(&mut self) -> Result<()> { - self.accept_ax.close(); - Ok(()) - } -} diff --git a/fuso-core/src/udp.rs b/fuso-core/src/udp.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/client.rs b/src/client.rs index 5d421b3..9163726 100644 --- a/src/client.rs +++ b/src/client.rs @@ -27,8 +27,8 @@ where async move { match reactor.join().await { Ok((from, to)) => { - let stream = from.ciphe(ciphe).await; - if let Err(e) = stream.forward(to).await { + let from = from.ciphe(ciphe).await; + if let Err(e) = from.forward(to).await { log::debug!("[fuc] Forwarding failed {}", e); } } @@ -42,6 +42,7 @@ where } } + fn main() { let app = App::new("fuso") .version("v1.0.3") @@ -148,10 +149,10 @@ fn main() { matches.value_of("server-port").unwrap(), ); - let forward_addr = parse_addr( - matches.value_of("forward-host").unwrap(), - matches.value_of("forward-port").unwrap(), - ); + let forward_host = matches.value_of("forward-host").unwrap(); + let forward_port = matches.value_of("forward-port").unwrap(); + + let forward_addr = parse_addr(forward_host, forward_port); let name = matches .value_of("name") @@ -240,6 +241,7 @@ fn main() { server_addr, server_bind_port: service_bind_port, bridge_addr: bridge_addr, + forward_addr: format!("{}:{}", forward_host, forward_port), }) .await { diff --git a/src/no-log-client.rs b/src/no-log-client.rs index 66c2381..b8cca20 100644 --- a/src/no-log-client.rs +++ b/src/no-log-client.rs @@ -27,8 +27,8 @@ where async move { match reactor.join().await { Ok((from, to)) => { - let stream = from.ciphe(ciphe).await; - if let Err(e) = stream.forward(to).await { + let from = from.ciphe(ciphe).await; + if let Err(e) = from.forward(to).await { log::debug!("[fuc] Forwarding failed {}", e); } } @@ -141,10 +141,10 @@ fn main() { matches.value_of("server-port").unwrap(), ); - let forward_addr = parse_addr( - matches.value_of("forward-host").unwrap(), - matches.value_of("forward-port").unwrap(), - ); + let forward_host = matches.value_of("forward-host").unwrap(); + let forward_port = matches.value_of("forward-port").unwrap(); + + let forward_addr = parse_addr(forward_host, forward_port); let name = matches .value_of("name") @@ -178,6 +178,17 @@ fn main() { _ => Proxy::Port(forward_addr), }; + env_logger::builder() + .filter_level(match matches.value_of("log").unwrap() { + "debug" => log::LevelFilter::Debug, + "info" => log::LevelFilter::Info, + "warn" => log::LevelFilter::Warn, + "error" => log::LevelFilter::Error, + _ => log::LevelFilter::Info, + }) + .filter_module("fuso_socks", log::LevelFilter::Info) + .init(); + let bridge_addr = { let bridge_bind_host = matches.value_of("bridge-bind-host").unwrap_or("0.0.0.0"); let bridge_bind_port = matches.value_of("bridge-bind-port"); @@ -202,6 +213,7 @@ fn main() { server_addr, server_bind_port: service_bind_port, bridge_addr: bridge_addr, + forward_addr: format!("{}:{}", forward_host, forward_port), }) .await { diff --git a/src/server.rs b/src/server.rs index dd21db7..715e5a4 100644 --- a/src/server.rs +++ b/src/server.rs @@ -104,8 +104,8 @@ fn main() { Ok(State::Accept(())) } }, - Action::Connect(conv) => { - cx.route(conv, tcp.into()).await?; + Action::Connect(conv, id) => { + cx.route(conv, id, tcp.into()).await?; Ok(State::Accept(())) } _ => Ok(State::Next), @@ -121,8 +121,9 @@ fn main() { let _ = udp.reject().await; Ok(State::Release) } - Ok(Socks::Tcp(_, addr)) => Ok(State::Accept(Action::Forward({ - log::info!("[socks] {}", addr); + Ok(Socks::Tcp(tcp, addr)) => Ok(State::Accept(Action::Forward(0, { + log::debug!("[socks] {}", addr); + let _ = tcp.release().await; match addr { fuso_socks::Addr::Socket(addr) => { @@ -141,9 +142,10 @@ fn main() { } }) .next(|_, _| async move { - Ok(State::Accept(Action::Forward(Addr::Socket( - ([0, 0, 0, 0], 0).into(), - )))) + Ok(State::Accept(Action::Forward( + 0, + Addr::Socket(([0, 0, 0, 0], 0).into()), + ))) }) }) .build() @@ -157,7 +159,7 @@ fn main() { let to = to.ciphe(Xor::new(xor_num)).await; - to.forward(from).detach(); + from.forward(to).detach(); } Err(e) => { log::warn!("[fuso] Server error {}", e.to_string());