Skip to content

Commit

Permalink
Encapsulate ProxyAssociation management logic
Browse files Browse the repository at this point in the history
  • Loading branch information
zonyitoo committed May 23, 2020
1 parent 520018d commit b305219
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 206 deletions.
88 changes: 86 additions & 2 deletions src/relay/udprelay/association.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,26 @@
#![allow(dead_code)]

use std::{
future::Future,
io::{self, Cursor, Read},
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::Arc,
};

use async_trait::async_trait;
use bytes::BytesMut;
use futures::future::{self, AbortHandle};
use log::{debug, error, warn};
use lru_time_cache::{Entry, LruCache};
use tokio::{
self,
net::udp::{RecvHalf, SendHalf},
sync::mpsc,
sync::{mpsc, Mutex},
time,
};

use crate::{
config::{ServerAddr, ServerConfig},
config::{Config, ServerAddr, ServerConfig},
context::Context,
relay::{
loadbalancing::server::{ServerData, SharedServerStatistic},
Expand All @@ -31,6 +35,7 @@ use crate::{

use super::{
crypto_io::{decrypt_payload, encrypt_payload},
DEFAULT_TIMEOUT,
MAXIMUM_UDP_PAYLOAD_SIZE,
};

Expand Down Expand Up @@ -421,3 +426,82 @@ impl ProxyAssociation {
Ok((addr, payload))
}
}

#[derive(Clone)]
pub struct ProxyAssociationManager<K> {
map: Arc<Mutex<LruCache<K, ProxyAssociation>>>,
watcher: AbortHandle,
}

impl<K> Drop for ProxyAssociationManager<K> {
fn drop(&mut self) {
self.watcher.abort()
}
}

impl<K> ProxyAssociationManager<K>
where
K: Ord + Clone + Send + 'static,
{
/// Create a new ProxyAssociationManager based on Config
pub fn new(config: &Config) -> ProxyAssociationManager<K> {
let timeout = config.udp_timeout.unwrap_or(DEFAULT_TIMEOUT);

// TODO: Set default capacity by getrlimit #262
// Associations are only eliminated by expire time by default
// So it may exhaust all available file descriptors
let assoc_map = if let Some(max_assoc) = config.udp_max_associations {
LruCache::with_expiry_duration_and_capacity(timeout, max_assoc)
} else {
LruCache::with_expiry_duration(timeout)
};

let map = Arc::new(Mutex::new(assoc_map));

// Create a task for releasing timed out association
let map2 = map.clone();
let (release_task, watcher) = future::abortable(async move {
let mut interval = time::interval(timeout);
loop {
interval.tick().await;

let mut m = map2.lock().await;
// Cleanup expired association
// Do not consume this iterator, it will updates expire time of items that traversed
let _ = m.iter();
}
});

tokio::spawn(release_task);

ProxyAssociationManager { map, watcher }
}

/// Try to reset ProxyAssociation's last used time by key
///
/// Return true if ProxyAssociation is still exist
pub async fn keep_alive(&self, key: &K) -> bool {
let mut assoc = self.map.lock().await;
assoc.get(key).is_some()
}

/// Send a packet to target address
///
/// Create a new association by `create_fut` if association doesn't exist
pub async fn send_packet<F>(&self, key: K, target: Address, pkt: Vec<u8>, create_fut: F) -> io::Result<()>
where
F: Future<Output = io::Result<ProxyAssociation>>,
{
let mut assoc_map = self.map.lock().await;
let assoc = match assoc_map.entry(key) {
Entry::Occupied(oc) => oc.into_mut(),
Entry::Vacant(vc) => vc.insert(create_fut.await?),
};

// FIXME: Lock is still kept for a mutable reference
// Send to local -> remote task
assoc.send(target, pkt).await;

Ok(())
}
}
95 changes: 29 additions & 66 deletions src/relay/udprelay/redir_local.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
//! UDP relay local server
use std::{io, net::SocketAddr, sync::Arc};
use std::{io, net::SocketAddr};

use async_trait::async_trait;
use log::{error, info, trace};
use lru_time_cache::{Entry, LruCache};
use tokio::{self, sync::Mutex, time};

use crate::{
config::RedirType,
Expand All @@ -18,28 +16,24 @@ use crate::{
};

use super::{
association::{ProxyAssociation, ProxySend},
association::{ProxyAssociation, ProxyAssociationManager, ProxySend},
redir::sys::UdpRedirSocket,
DEFAULT_TIMEOUT,
MAXIMUM_UDP_PAYLOAD_SIZE,
};

type AssocMap = LruCache<String, ProxyAssociation>;
type SharedAssocMap = Arc<Mutex<AssocMap>>;

struct ProxyHandler {
ty: RedirType,
src_addr: SocketAddr,
cache_key: String,
assoc_map: SharedAssocMap,
assoc_map: ProxyAssociationManager<String>,
}

impl ProxyHandler {
pub fn new(
ty: RedirType,
src_addr: SocketAddr,
cache_key: String,
assoc_map: SharedAssocMap,
assoc_map: ProxyAssociationManager<String>,
) -> io::Result<ProxyHandler> {
Ok(ProxyHandler {
ty,
Expand All @@ -62,12 +56,7 @@ impl ProxySend for ProxyHandler {
local_udp.send_to(&data, &self.src_addr).await?;

// Update LRU
{
let mut amap = self.assoc_map.lock().await;

// Check or update expire time
let _ = amap.get(&self.cache_key);
}
self.assoc_map.keep_alive(&self.cache_key).await;

return Ok(());
}
Expand All @@ -94,29 +83,12 @@ pub async fn run(context: SharedContext) -> io::Result<()> {

info!("shadowsocks UDP redirect listening on {}", local_addr);

// NOTE: Associations are only eliminated by expire time by default
// So it may exhaust all available file descriptors
let timeout = context.config().udp_timeout.unwrap_or(DEFAULT_TIMEOUT);
let assoc_map = if let Some(max_assoc) = context.config().udp_max_associations {
LruCache::with_expiry_duration_and_capacity(timeout, max_assoc)
} else {
LruCache::with_expiry_duration(timeout)
};
let assoc_map = Arc::new(Mutex::new(assoc_map));
let assoc_manager = ProxyAssociationManager::new(context.config());

let mut pkt_buf = vec![0u8; MAXIMUM_UDP_PAYLOAD_SIZE];

loop {
let (recv_len, src, dst) = match time::timeout(timeout, l.recv_from_redir(&mut pkt_buf)).await {
Ok(r) => r?,
Err(..) => {
// Cleanup expired association
// Do not consume this iterator, it will updates expire time of items that traversed
let mut assoc_map = assoc_map.lock().await;
let _ = assoc_map.iter();
continue;
}
};
let (recv_len, src, dst) = l.recv_from_redir(&mut pkt_buf).await?;

// Packet length is limited by MAXIMUM_UDP_PAYLOAD_SIZE, excess bytes will be discarded.
// Copy bytes, because udp_associate runs in another tokio Task
Expand Down Expand Up @@ -145,41 +117,32 @@ pub async fn run(context: SharedContext) -> io::Result<()> {
let is_bypassed = context.check_target_bypassed(&target).await;

// Check or (re)create an association
{
// Locks the whole association map
let mut ref_assoc_map = assoc_map.lock().await;

let cache_key = format!("{}-{}", src, dst);

// Get or create an association
let assoc = match ref_assoc_map.entry(cache_key.clone()) {
Entry::Occupied(oc) => oc.into_mut(),
Entry::Vacant(vc) => {
// Pick a server
let server = balancer.pick_server();

let sender = match ProxyHandler::new(ty, src, cache_key, assoc_map.clone()) {
Ok(s) => s,
Err(err) => {
error!("create UDP association for {} <-> {}, error: {}", src, dst, err);
continue;
}
};

let assoc = if is_bypassed {
ProxyAssociation::associate_bypassed(src, server, sender).await
} else {
ProxyAssociation::associate_proxied(src, server, sender).await
let cache_key = format!("{}-{}", src, dst);
let cache_key_cloned = cache_key.clone();
let res = assoc_manager
.send_packet(cache_key, target, pkt.to_vec(), async {
// Pick a server
let server = balancer.pick_server();

let sender = match ProxyHandler::new(ty, src, cache_key_cloned, assoc_manager.clone()) {
Ok(s) => s,
Err(err) => {
error!("create UDP association for {} <-> {}, error: {}", src, dst, err);
return Err(err);
}
.expect("create UDP association");
};

vc.insert(assoc)
if is_bypassed {
ProxyAssociation::associate_bypassed(src, server, sender).await
} else {
ProxyAssociation::associate_proxied(src, server, sender).await
}
};
})
.await;

// FIXME: Lock is still kept for a mutable reference
// Send to local -> remote task
assoc.send(target, pkt.to_vec()).await;
if let Err(err) = res {
error!("failed to create UDP association, {}", err);
return Err(err);
}
}
}
Loading

0 comments on commit b305219

Please sign in to comment.