Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sanskar Jaiswal <[email protected]>
  • Loading branch information
aryan9600 committed Dec 11, 2023
1 parent 60c7800 commit 277f483
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 72 deletions.
7 changes: 1 addition & 6 deletions dataplane/api-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,7 @@ impl BackendService {
state: _,
},
)) => {
// Our BPF maps contain everything in big endianness.
let bk = BackendKey {
ip: u32::from_be(key.ip),
port: u16::from_be(key.port as u16) as u32,
};
if backend_key == bk {
if backend_key == key {
tcp_conns_map.remove(&client_key)?;
};
}
Expand Down
2 changes: 2 additions & 0 deletions dataplane/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ pub struct ClientKey {
#[cfg(feature = "user")]
unsafe impl aya::Pod for ClientKey {}

// TCPState contains variants that represent the current phase of the TCP connection at a point in
// time during the connection's termination.
#[derive(Copy, Clone, Debug, Default)]
#[repr(C)]
pub enum TCPState {
Expand Down
35 changes: 8 additions & 27 deletions dataplane/ebpf/src/egress/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ use aya_bpf::{
programs::TcContext,
};
use aya_log_ebpf::info;
use common::{ClientKey, TCPBackend, TCPState};
use common::ClientKey;
use network_types::{eth::EthHdr, ip::Ipv4Hdr, tcp::TcpHdr};

use crate::{
utils::{csum_fold_helper, handle_tcp_conn_next_state, ptr_at},
utils::{csum_fold_helper, ptr_at, update_tcp_conns},
TCP_CONNECTIONS,
};

Expand Down Expand Up @@ -47,10 +47,13 @@ pub fn handle_tcp_egress(ctx: TcContext) -> Result<i32, i64> {
tcp_backend.backend_key.port,
);

// TODO: connection tracking cleanup https://github.com/kubernetes-sigs/blixt/issues/85
// SNAT the ip address
unsafe {
(*ip_hdr).src_addr = tcp_backend.backend_key.ip;
(*ip_hdr).src_addr = tcp_backend.backend_key.ip.to_be();
};
// SNAT the port
unsafe { (*tcp_hdr).source = u16::from_be(tcp_backend.backend_key.port as u16) };

if (ctx.data() + EthHdr::LEN + Ipv4Hdr::LEN) > ctx.data_end() {
info!(&ctx, "Iphdr is out of bounds");
Expand All @@ -70,10 +73,6 @@ pub fn handle_tcp_egress(ctx: TcContext) -> Result<i32, i64> {
unsafe { (*ip_hdr).check = csum_fold_helper(full_cksum) };
unsafe { (*tcp_hdr).check = 0 };

// TODO: connection tracking cleanup https://github.com/kubernetes-sigs/blixt/issues/85
// SNAT the port
unsafe { (*tcp_hdr).source = tcp_backend.backend_key.port as u16 };

let tcp_hdr_ref = unsafe { tcp_hdr.as_ref().ok_or(TC_ACT_OK)? };

// If the packet has the RST flag set, it means the connection is being terminated, so remove it
Expand All @@ -84,26 +83,8 @@ pub fn handle_tcp_egress(ctx: TcContext) -> Result<i32, i64> {
}
}

let mut tcp_state = tcp_backend.state;
let moved = handle_tcp_conn_next_state(tcp_hdr_ref, &mut tcp_state);
// If the connection has moved to the Closed state, stop tracking it.
if let TCPState::Closed = tcp_state {
unsafe {
TCP_CONNECTIONS.remove(&client_key)?;
}
// If the connection has not reached the Closed state yet, but it did advance to a new state,
// then record the new state.
} else if moved {
let bk = *tcp_backend;
let new_tcp_backend = TCPBackend {
backend: bk.backend,
backend_key: bk.backend_key,
state: tcp_state,
};
unsafe {
TCP_CONNECTIONS.insert(&client_key, &new_tcp_backend, 0_u64)?;
}
}
let mut tcp_bk = *tcp_backend;
update_tcp_conns(tcp_hdr_ref, &client_key, &mut tcp_bk)?;

Ok(TC_ACT_PIPE)
}
51 changes: 15 additions & 36 deletions dataplane/ebpf/src/ingress/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use aya_log_ebpf::{debug, info};
use network_types::{eth::EthHdr, ip::Ipv4Hdr, tcp::TcpHdr};

use crate::{
utils::{csum_fold_helper, handle_tcp_conn_next_state, ptr_at},
utils::{csum_fold_helper, ptr_at, update_tcp_conns},
BACKENDS, GATEWAY_INDEXES, TCP_CONNECTIONS,
};
use common::{Backend, BackendKey, ClientKey, TCPBackend, TCPState, BACKENDS_ARRAY_CAPACITY};
Expand All @@ -36,6 +36,8 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result<i32, i64> {
};
// The backend that is responsible for handling this TCP connection.
let mut backend: Backend;
// The Gateway that the TCP connection is forwarded from.
let backend_key: BackendKey;
// Flag to check whether this is a new connection.
let mut new_conn = false;
// The state of this TCP connection.
Expand All @@ -46,10 +48,11 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result<i32, i64> {
if let Some(val) = unsafe { TCP_CONNECTIONS.get(&client_key) } {
backend = val.backend;
tcp_state = val.state;
backend_key = val.backend_key;
} else {
new_conn = true;

let backend_key = BackendKey {
backend_key = BackendKey {
ip: u32::from_be(original_daddr),
port: (u16::from_be(unsafe { (*tcp_hdr).dest })) as u32,
};
Expand Down Expand Up @@ -96,6 +99,8 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result<i32, i64> {
unsafe {
(*ip_hdr).dst_addr = backend.daddr.to_be();
}
// DNAT the port
unsafe { (*tcp_hdr).dest = (backend.dport as u16).to_be() };

if (ctx.data() + EthHdr::LEN + Ipv4Hdr::LEN) > ctx.data_end() {
info!(&ctx, "Iphdr is out of bounds");
Expand All @@ -118,10 +123,6 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result<i32, i64> {
// FIXME
unsafe { (*tcp_hdr).check = 0 };

let original_dport = unsafe { (*tcp_hdr).dest };
// DNAT the port
unsafe { (*tcp_hdr).dest = (backend.dport as u16).to_be() };

let action = unsafe {
bpf_redirect_neigh(
backend.ifindex as u32,
Expand All @@ -131,18 +132,14 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result<i32, i64> {
)
};

let tcp_hdr_ref = unsafe { tcp_hdr.as_ref().ok_or(TC_ACT_OK)? };
let mut tcp_backend = TCPBackend {
backend,
backend_key,
state: tcp_state,
};

// If the connection is new, then record it in our map for future tracking.
if new_conn {
let tcp_backend = TCPBackend {
backend,
backend_key: BackendKey {
ip: original_daddr,
port: original_dport as u32,
},
state: tcp_state,
};
unsafe {
TCP_CONNECTIONS.insert(&client_key, &tcp_backend, 0_u64)?;
}
Expand All @@ -152,6 +149,8 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result<i32, i64> {
return Ok(action as i32);
}

let tcp_hdr_ref = unsafe { tcp_hdr.as_ref().ok_or(TC_ACT_OK)? };

// If the packet has the RST flag set, it means the connection is being terminated, so remove it
// from our map.
if tcp_hdr_ref.rst() == 1 {
Expand All @@ -160,27 +159,7 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result<i32, i64> {
}
}

let moved = handle_tcp_conn_next_state(tcp_hdr_ref, &mut tcp_state);
// If the connection has moved to the Closed state, stop tracking it.
if let TCPState::Closed = tcp_state {
unsafe {
TCP_CONNECTIONS.remove(&client_key)?;
}
// If the connection has not reached the Closed state yet, but it did advance to a new state,
// then record the new state.
} else if moved {
let tcp_backend = TCPBackend {
backend,
backend_key: BackendKey {
ip: original_daddr,
port: original_dport as u32,
},
state: tcp_state,
};
unsafe {
TCP_CONNECTIONS.insert(&client_key, &tcp_backend, 0_u64)?;
}
}
update_tcp_conns(tcp_hdr_ref, &client_key, &mut tcp_backend)?;

info!(&ctx, "redirect action: {}", action);
Ok(action as i32)
Expand Down
33 changes: 30 additions & 3 deletions dataplane/ebpf/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use aya_bpf::{bindings::TC_ACT_OK, programs::TcContext};
use core::mem;
use network_types::tcp::TcpHdr;

use common::TCPState;
use crate::TCP_CONNECTIONS;
use common::{ClientKey, TCPBackend, TCPState};

// -----------------------------------------------------------------------------
// Helper Functions
Expand Down Expand Up @@ -38,11 +39,12 @@ pub fn csum_fold_helper(mut csum: u64) -> u16 {
return !(csum as u16);
}

// Handles the termination of connection.
// Updates the TCP connection's state based on the current phase and the incoming packet's header.
// It returns true if the state transitioned to a different phase.
// Ref: https://en.wikipedia.org/wiki/File:Tcp_state_diagram.png and
// http://www.tcpipguide.com/free/t_TCPConnectionTermination-2.htm
#[inline(always)]
pub fn handle_tcp_conn_next_state(hdr: &TcpHdr, state: &mut TCPState) -> bool {
pub fn process_tcp_state_transition(hdr: &TcpHdr, state: &mut TCPState) -> bool {
let fin = hdr.fin() == 1;
let ack = hdr.ack() == 1;
match state {
Expand Down Expand Up @@ -95,3 +97,28 @@ pub fn handle_tcp_conn_next_state(hdr: &TcpHdr, state: &mut TCPState) -> bool {
}
return false;
}

// Advances the tracked state of a TCP connection and syncs the result to the
// TCP_CONNECTIONS map.
//
// The state stored in `tcp_backend` is first updated in place by
// `process_tcp_state_transition`, using the flags of the packet header `hdr`.
// Then:
//   - if the state is now `TCPState::Closed`, the entry for `client_key` is
//     removed from the map, so the connection is no longer tracked;
//   - otherwise, if the state changed, the updated `tcp_backend` is written
//     back to the map under `client_key`;
//   - otherwise the map is left untouched.
//
// Returns the result of the map operation that was performed, or `Ok(())` if
// no map operation was needed.
#[inline(always)]
pub fn update_tcp_conns(
    hdr: &TcpHdr,
    client_key: &ClientKey,
    tcp_backend: &mut TCPBackend,
) -> Result<(), i64> {
    let transitioned = process_tcp_state_transition(hdr, &mut tcp_backend.state);

    // NOTE(review): the `unsafe` blocks below are presumably required because
    // TCP_CONNECTIONS is a `static mut` map -- confirm against its declaration.

    // The connection has reached the Closed state, so stop tracking it.
    if matches!(tcp_backend.state, TCPState::Closed) {
        return unsafe { TCP_CONNECTIONS.remove(client_key) };
    }

    // The connection has not reached the Closed state yet, but it did
    // transition to a new state, so record the new state.
    if transitioned {
        return unsafe { TCP_CONNECTIONS.insert(client_key, &*tcp_backend, 0_u64) };
    }

    Ok(())
}

0 comments on commit 277f483

Please sign in to comment.