Skip to content

Commit

Permalink
Merge pull request #10 from editso/dev
Browse files Browse the repository at this point in the history
优化代码
  • Loading branch information
editso authored Dec 9, 2021
2 parents 91b319c + efa32af commit 9862014
Show file tree
Hide file tree
Showing 21 changed files with 414 additions and 603 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ path = "src/no-log-client.rs"
log = {version = "0.4.14"}
clap = {version = "3.0.0-beta.5", features = ["yaml"]}
smol = {version = "1.2.5"}
bytes = {version = "1.1.0"}
env_logger = "0.9.0"
fuso-core = {path = "./fuso-core"}
fuso-socks = {path ="./fuso-socks"}
3 changes: 2 additions & 1 deletion fuso-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ smol = {version = "1.2.5"}
bytes = {version = "1.1.0"}
futures = "0.3"
async-trait = {version = "0.1.51"}

once_cell = "1.2.0"
num_cpus = "1.0"
10 changes: 10 additions & 0 deletions fuso-api/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,37 @@ where
}
}

#[inline]
pub fn is_empty(&self) -> bool {
self.buf.lock().unwrap().is_empty()
}

#[inline]
pub fn len(&self) -> usize {
self.len
}

#[inline]
pub fn clear(&mut self) {
self.buf.lock().unwrap().clear();
self.len = 0;
}

#[inline]
pub fn push_back(&mut self, data: &[T]) {
self.buf.lock().unwrap().push_back(data.to_vec());
self.len += data.len();
}

#[inline]
pub fn push_front(&mut self, data: &[T]) {
self.buf.lock().unwrap().push_front(data.to_vec());
self.len += data.len();
}
}

impl Buffer<u8> {
#[inline]
pub fn read_to_buffer(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let mut remaining = buf.len();
let mut read_len = 0;
Expand Down Expand Up @@ -98,6 +104,7 @@ impl Buffer<u8> {

#[async_trait]
impl AsyncRead for Buffer<u8> {
#[inline]
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
_: &mut std::task::Context<'_>,
Expand All @@ -109,6 +116,7 @@ impl AsyncRead for Buffer<u8> {

#[async_trait]
impl AsyncWrite for Buffer<u8> {
#[inline]
fn poll_write(
mut self: std::pin::Pin<&mut Self>,
_: &mut std::task::Context<'_>,
Expand All @@ -118,13 +126,15 @@ impl AsyncWrite for Buffer<u8> {
Poll::Ready(Ok(buf.len()))
}

#[inline]
fn poll_flush(
self: std::pin::Pin<&mut Self>,
_: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}

#[inline]
fn poll_close(
mut self: std::pin::Pin<&mut Self>,
_: &mut std::task::Context<'_>,
Expand Down
59 changes: 37 additions & 22 deletions fuso-api/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use futures::{AsyncReadExt, Future};
use smol::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
net::{TcpListener, TcpStream},
Executor,
};

use crate::{
Expand Down Expand Up @@ -88,7 +89,7 @@ pub trait AsyncTcpSocketEx<B, C> {

#[derive(Debug, Clone)]
pub struct Rollback<T, Store> {
target: Arc<Mutex<T>>,
target: T,
rollback: Arc<RwLock<bool>>,
store: Arc<Mutex<Store>>,
}
Expand Down Expand Up @@ -284,7 +285,24 @@ where
{
#[inline]
fn detach(self) {
smol::spawn(self).detach();
static GLOBAL: once_cell::sync::Lazy<Executor<'_>> = once_cell::sync::Lazy::new(|| {
for n in 1..num_cpus::get() {
log::trace!("spwan executor thread fuso-{}", n);
std::thread::Builder::new()
.name(format!("fuso-{}", n))
.spawn(|| loop {
std::panic::catch_unwind(|| {
smol::block_on(GLOBAL.run(smol::future::pending::<()>()))
})
.ok();
})
.expect("cannot spawn executor thread");
}

Executor::new()
});

GLOBAL.spawn(self).detach();
}
}

Expand All @@ -298,10 +316,10 @@ where
async fn forward(self, to: To) -> Result<()> {
let (reader_s, writer_s) = self.split();
let (reader_t, writer_t) = to.split();

smol::future::race(
smol::io::copy(reader_s, writer_t),
smol::io::copy(reader_t, writer_s),
smol::io::copy(reader_s, writer_t),
)
.await
.map_err(|e| error::Error::with_io(e))?;
Expand All @@ -327,9 +345,10 @@ impl<T> RollbackEx<T, Buffer<u8>> for T
where
T: AsyncRead + AsyncWrite + Send + Sync + 'static,
{
#[inline]
fn roll(self) -> Rollback<T, Buffer<u8>> {
Rollback {
target: Arc::new(Mutex::new(self)),
target: self,
rollback: Arc::new(RwLock::new(false)),
store: Arc::new(Mutex::new(Buffer::new())),
}
Expand Down Expand Up @@ -369,31 +388,31 @@ impl<T> Rollback<T, Buffer<u8>> {
impl Rollback<TcpStream, Buffer<u8>> {
#[inline]
pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
self.target.lock().unwrap().local_addr()
self.target.local_addr()
}

#[inline]
pub fn peer_addr(&self) -> std::io::Result<SocketAddr> {
self.target.lock().unwrap().peer_addr()
self.target.peer_addr()
}
}

impl Rollback<UdpStream, Buffer<u8>> {
#[inline]
pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
self.target.lock().unwrap().local_addr()
self.target.local_addr()
}

#[inline]
pub fn peer_addr(&self) -> std::io::Result<SocketAddr> {
self.target.lock().unwrap().peer_addr()
self.target.peer_addr()
}
}

impl From<Rollback<TcpStream, Buffer<u8>>> for TcpStream {
#[inline]
fn from(roll: Rollback<TcpStream, Buffer<u8>>) -> Self {
roll.target.lock().unwrap().clone()
roll.target
}
}

Expand All @@ -404,7 +423,7 @@ where
{
#[inline]
fn poll_read(
self: std::pin::Pin<&mut Self>,
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> std::task::Poll<std::io::Result<usize>> {
Expand All @@ -423,8 +442,7 @@ where
if read_len >= buf.len() {
std::task::Poll::Ready(Ok(read_len))
} else {
let mut io = self.target.lock().unwrap();
match Pin::new(&mut *io).poll_read(cx, &mut buf[read_len..])? {
match Pin::new(&mut self.target).poll_read(cx, &mut buf[read_len..])? {
std::task::Poll::Pending => Poll::Pending,
std::task::Poll::Ready(0) => Poll::Ready(Ok(read_len)),
std::task::Poll::Ready(n) => {
Expand Down Expand Up @@ -454,29 +472,26 @@ where
{
#[inline]
fn poll_write(
self: std::pin::Pin<&mut Self>,
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
let mut io = self.target.lock().unwrap();
Pin::new(&mut *io).poll_write(cx, buf)
Pin::new(&mut self.target).poll_write(cx, buf)
}

#[inline]
fn poll_flush(
self: std::pin::Pin<&mut Self>,
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
let mut io = self.target.lock().unwrap();
Pin::new(&mut *io).poll_flush(cx)
Pin::new(&mut self.target).poll_flush(cx)
}

#[inline]
fn poll_close(
self: std::pin::Pin<&mut Self>,
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
let mut io = self.target.lock().unwrap();
Pin::new(&mut *io).poll_close(cx)
Pin::new(&mut self.target).poll_close(cx)
}
}
8 changes: 8 additions & 0 deletions fuso-api/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ impl Error {
}

impl Display for Error {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Debug::fmt(&self.repr, f)
}
}

impl From<std::io::Error> for Error {
#[inline]
fn from(error: std::io::Error) -> Self {
Self {
repr: Repr::IO(error),
Expand All @@ -47,6 +49,7 @@ impl From<std::io::Error> for Error {
}

impl From<ErrorKind> for Error {
#[inline]
fn from(kind: ErrorKind) -> Self {
Self {
repr: Repr::Fuso(kind),
Expand All @@ -55,6 +58,7 @@ impl From<ErrorKind> for Error {
}

impl From<std::io::ErrorKind> for Error {
#[inline]
fn from(kind: std::io::ErrorKind) -> Self {
Self {
repr: Repr::IO(kind.into()),
Expand All @@ -63,6 +67,7 @@ impl From<std::io::ErrorKind> for Error {
}

impl From<smol::channel::RecvError> for Error {
#[inline]
fn from(e: smol::channel::RecvError) -> Self {
Self {
repr: Repr::IO(std::io::Error::new(
Expand All @@ -77,6 +82,7 @@ impl<T> From<smol::channel::SendError<T>> for Error
where
T: Into<String>,
{
#[inline]
fn from(e: smol::channel::SendError<T>) -> Self {
Self {
repr: Repr::IO(std::io::Error::new(
Expand All @@ -88,6 +94,7 @@ where
}

impl From<&str> for Error {
#[inline]
fn from(txt: &str) -> Self {
Self {
repr: Repr::Fuso(ErrorKind::Customer(txt.into())),
Expand All @@ -96,6 +103,7 @@ impl From<&str> for Error {
}

impl From<String> for Error {
#[inline]
fn from(txt: String) -> Self {
Self {
repr: Repr::Fuso(ErrorKind::Customer(txt.into())),
Expand Down
Loading

0 comments on commit 9862014

Please sign in to comment.