From cffeac85f9a40fb32f00eaae218c964118d3e717 Mon Sep 17 00:00:00 2001 From: Sanskar Jaiswal Date: Fri, 1 Dec 2023 14:23:20 +0000 Subject: [PATCH 1/3] tcproute: add support for round-robin load balancing Add support for round-robin load balancing for `TCPRoute`. This enables multiple backend references in a TCPRoute object, with traffic being distributed to each backend in a round-robin fashion. A new BPF map `TCP_CONNECTIONS` was introduced to help keep track of active TCP connections, with its key storing the client identifier and the value storing the backend along with the state of the connection. Signed-off-by: Sanskar Jaiswal --- dataplane/api-server/src/backends.rs | 136 ++++++++++++++++++--- dataplane/api-server/src/lib.rs | 5 +- dataplane/api-server/src/server.rs | 41 +++++++- dataplane/common/src/lib.rs | 40 +++++++- dataplane/ebpf/src/egress/tcp.rs | 45 ++++++--- dataplane/ebpf/src/ingress/tcp.rs | 112 +++++++++++++++++----- dataplane/ebpf/src/main.rs | 6 +- dataplane/ebpf/src/utils.rs | 90 +++++++++++++++++- dataplane/loader/src/main.rs | 29 +++++- internal/dataplane/client/utils.go | 84 ++++++++--------- 10 files changed, 449 insertions(+), 139 deletions(-) diff --git a/dataplane/api-server/src/backends.rs b/dataplane/api-server/src/backends.rs index 25b70150..5571964e 100644 --- a/dataplane/api-server/src/backends.rs +++ b/dataplane/api-server/src/backends.rs @@ -45,8 +45,8 @@ pub struct InterfaceIndexConfirmation { /// Generated client implementations. 
pub mod backends_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::http::Uri; use tonic::codegen::*; + use tonic::codegen::http::Uri; #[derive(Debug, Clone)] pub struct BackendsClient { inner: tonic::client::Grpc, @@ -90,8 +90,9 @@ pub mod backends_client { >::ResponseBody, >, >, - >>::Error: - Into + Send + Sync, + , + >>::Error: Into + Send + Sync, { BackendsClient::new(InterceptedService::new(inner, interceptor)) } @@ -129,16 +130,23 @@ pub mod backends_client { pub async fn get_interface_index( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> - { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/backends.backends/GetInterfaceIndex"); + let path = http::uri::PathAndQuery::from_static( + "/backends.backends/GetInterfaceIndex", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("backends.backends", "GetInterfaceIndex")); @@ -148,34 +156,38 @@ pub mod backends_client { &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/backends.backends/Update"); let mut req = request.into_request(); 
- req.extensions_mut() - .insert(GrpcMethod::new("backends.backends", "Update")); + req.extensions_mut().insert(GrpcMethod::new("backends.backends", "Update")); self.inner.unary(req, path, codec).await } pub async fn delete( &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/backends.backends/Delete"); let mut req = request.into_request(); - req.extensions_mut() - .insert(GrpcMethod::new("backends.backends", "Delete")); + req.extensions_mut().insert(GrpcMethod::new("backends.backends", "Delete")); self.inner.unary(req, path, codec).await } } @@ -190,7 +202,10 @@ pub mod backends_server { async fn get_interface_index( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn update( &self, request: tonic::Request, @@ -223,7 +238,10 @@ pub mod backends_server { max_encoding_message_size: None, } } - pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService where F: tonic::service::Interceptor, { @@ -279,12 +297,21 @@ pub mod backends_server { "/backends.backends/GetInterfaceIndex" => { #[allow(non_camel_case_types)] struct GetInterfaceIndexSvc(pub Arc); - impl tonic::server::UnaryService for GetInterfaceIndexSvc { + impl tonic::server::UnaryService + for GetInterfaceIndexSvc { type Response = super::InterfaceIndexConfirmation; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future 
= BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { (*inner).get_interface_index(request).await }; + let fut = async move { + (*inner).get_interface_index(request).await + }; Box::pin(fut) } } @@ -314,9 +341,13 @@ pub mod backends_server { "/backends.backends/Update" => { #[allow(non_camel_case_types)] struct UpdateSvc(pub Arc); - impl tonic::server::UnaryService for UpdateSvc { + impl tonic::server::UnaryService + for UpdateSvc { type Response = super::Confirmation; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, @@ -352,10 +383,17 @@ pub mod backends_server { "/backends.backends/Delete" => { #[allow(non_camel_case_types)] struct DeleteSvc(pub Arc); - impl tonic::server::UnaryService for DeleteSvc { + impl tonic::server::UnaryService + for DeleteSvc { type Response = super::Confirmation; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { (*inner).delete(request).await }; Box::pin(fut) @@ -384,14 +422,18 @@ pub mod backends_server { }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } } diff --git a/dataplane/api-server/src/lib.rs b/dataplane/api-server/src/lib.rs index 579913df..bcd51e86 100644 --- 
a/dataplane/api-server/src/lib.rs +++ b/dataplane/api-server/src/lib.rs @@ -15,15 +15,16 @@ use aya::maps::{HashMap, MapData}; use tonic::transport::Server; use backends::backends_server::BackendsServer; -use common::{BackendKey, BackendList}; +use common::{BackendKey, BackendList, ClientKey, TCPBackend}; pub async fn start( addr: Ipv4Addr, port: u16, backends_map: HashMap, gateway_indexes_map: HashMap, + tcp_conns_map: HashMap, ) -> Result<(), Error> { - let server = server::BackendService::new(backends_map, gateway_indexes_map); + let server = server::BackendService::new(backends_map, gateway_indexes_map, tcp_conns_map); // TODO: mTLS https://github.com/Kong/blixt/issues/50 Server::builder() .add_service(BackendsServer::new(server)) diff --git a/dataplane/api-server/src/server.rs b/dataplane/api-server/src/server.rs index 91d0bf53..3d898944 100644 --- a/dataplane/api-server/src/server.rs +++ b/dataplane/api-server/src/server.rs @@ -8,28 +8,31 @@ use std::net::Ipv4Addr; use std::sync::Arc; use anyhow::Error; -use aya::maps::{HashMap, MapData}; +use aya::maps::{HashMap, MapData, MapError}; use tokio::sync::Mutex; use tonic::{Request, Response, Status}; use crate::backends::backends_server::Backends; use crate::backends::{Confirmation, InterfaceIndexConfirmation, PodIp, Targets, Vip}; use crate::netutils::{if_name_for_routing_ip, if_nametoindex}; -use common::{Backend, BackendKey, BackendList, BACKENDS_ARRAY_CAPACITY}; +use common::{Backend, BackendKey, BackendList, ClientKey, TCPBackend, BACKENDS_ARRAY_CAPACITY}; pub struct BackendService { backends_map: Arc>>, gateway_indexes_map: Arc>>, + tcp_conns_map: Arc>>, } impl BackendService { pub fn new( backends_map: HashMap, gateway_indexes_map: HashMap, + tcp_conns_map: HashMap, ) -> BackendService { BackendService { backends_map: Arc::new(Mutex::new(backends_map)), gateway_indexes_map: Arc::new(Mutex::new(gateway_indexes_map)), + tcp_conns_map: Arc::new(Mutex::new(tcp_conns_map)), } } @@ -51,6 +54,35 @@ impl 
BackendService { backends_map.remove(&key)?; let mut gateway_indexes_map = self.gateway_indexes_map.lock().await; gateway_indexes_map.remove(&key)?; + + // Delete all entries in our tcp connection tracking map that this backend + // key was related to. This is needed because the TCPRoute might have been + // deleted with TCP connection(s) still open, so without the below logic + // they'll hang around forever. + // Its better to do this rather than maintain a reverse index because the index + // would need to be updated with each new connection. With remove being a less + // frequently used operation, the performance cost is less visible. + let mut tcp_conns_map = self.tcp_conns_map.lock().await; + for item in tcp_conns_map + .iter() + .collect::>>() + { + match item { + Ok(( + client_key, + TCPBackend { + backend: _, + backend_key, + state: _, + }, + )) => { + if backend_key == key { + tcp_conns_map.remove(&client_key)?; + }; + } + Err(err) => return Err(err.into()), + }; + } Ok(()) } } @@ -144,9 +176,10 @@ impl Backends for BackendService { match self.insert_and_reset_index(key, backend_list).await { Ok(_) => Ok(Response::new(Confirmation { confirmation: format!( - "success, vip {}:{} was updated", + "success, vip {}:{} was updated with {} backends", Ipv4Addr::from(vip.ip), - vip.port + vip.port, + count, ), })), Err(err) => Err(Status::internal(format!("failure: {}", err))), diff --git a/dataplane/common/src/lib.rs b/dataplane/common/src/lib.rs index 10fd7b7e..1da5155d 100644 --- a/dataplane/common/src/lib.rs +++ b/dataplane/common/src/lib.rs @@ -20,7 +20,7 @@ pub struct Backend { #[cfg(feature = "user")] unsafe impl aya::Pod for Backend {} -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, PartialEq, Eq)] #[repr(C)] pub struct BackendKey { pub ip: u32, @@ -40,3 +40,41 @@ pub struct BackendList { #[cfg(feature = "user")] unsafe impl aya::Pod for BackendList {} + +#[derive(Copy, Clone, Debug)] +#[repr(C)] +pub struct ClientKey { + pub ip: u32, + pub 
port: u32, +} + +#[cfg(feature = "user")] +unsafe impl aya::Pod for ClientKey {} + +// TCPState contains variants that represent the current phase of the TCP connection at a point in +// time during the connection's termination. +#[derive(Copy, Clone, Debug, Default)] +#[repr(C)] +pub enum TCPState { + #[default] + Established, + FinWait1, + FinWait2, + Closing, + TimeWait, + Closed, +} + +#[cfg(feature = "user")] +unsafe impl aya::Pod for TCPState {} + +#[derive(Copy, Clone, Debug)] +#[repr(C)] +pub struct TCPBackend { + pub backend: Backend, + pub backend_key: BackendKey, + pub state: TCPState, +} + +#[cfg(feature = "user")] +unsafe impl aya::Pod for TCPBackend {} diff --git a/dataplane/ebpf/src/egress/tcp.rs b/dataplane/ebpf/src/egress/tcp.rs index 89829329..48358ab8 100644 --- a/dataplane/ebpf/src/egress/tcp.rs +++ b/dataplane/ebpf/src/egress/tcp.rs @@ -12,11 +12,12 @@ use aya_bpf::{ programs::TcContext, }; use aya_log_ebpf::info; +use common::ClientKey; use network_types::{eth::EthHdr, ip::Ipv4Hdr, tcp::TcpHdr}; use crate::{ - utils::{csum_fold_helper, ptr_at}, - BLIXT_CONNTRACK, + utils::{csum_fold_helper, ptr_at, update_tcp_conns}, + TCP_CONNECTIONS, }; pub fn handle_tcp_egress(ctx: TcContext) -> Result { @@ -29,25 +30,30 @@ pub fn handle_tcp_egress(ctx: TcContext) -> Result { // capture some IP and port information let client_addr = unsafe { (*ip_hdr).dst_addr }; - let dest_port = unsafe { (*tcp_hdr).dest.to_be() }; - let ip_port_tuple = unsafe { BLIXT_CONNTRACK.get(&client_addr) }.ok_or(TC_ACT_PIPE)?; - - // verify traffic destination - if ip_port_tuple.1 as u16 != dest_port { - return Ok(TC_ACT_PIPE); - } + let dest_port = unsafe { (*tcp_hdr).dest }; + // The source identifier + let client_key = ClientKey { + ip: u32::from_be(client_addr), + port: u16::from_be(dest_port) as u32, + }; + let tcp_backend = unsafe { TCP_CONNECTIONS.get(&client_key) }.ok_or(TC_ACT_PIPE)?; info!( &ctx, - "Received TCP packet destined for tracked IP {:i}:{} setting source IP to 
VIP {:i}", + "Received TCP packet destined for tracked IP {:i}:{} setting source IP to VIP {:i}:{}", u32::from_be(client_addr), - ip_port_tuple.1 as u16, - u32::from_be(ip_port_tuple.0), + u16::from_be(dest_port), + tcp_backend.backend_key.ip, + tcp_backend.backend_key.port, ); + // TODO: connection tracking cleanup https://github.com/kubernetes-sigs/blixt/issues/85 + // SNAT the ip address unsafe { - (*ip_hdr).src_addr = ip_port_tuple.0; + (*ip_hdr).src_addr = tcp_backend.backend_key.ip.to_be(); }; + // SNAT the port + unsafe { (*tcp_hdr).source = u16::from_be(tcp_backend.backend_key.port as u16) }; if (ctx.data() + EthHdr::LEN + Ipv4Hdr::LEN) > ctx.data_end() { info!(&ctx, "Iphdr is out of bounds"); @@ -67,7 +73,18 @@ pub fn handle_tcp_egress(ctx: TcContext) -> Result { unsafe { (*ip_hdr).check = csum_fold_helper(full_cksum) }; unsafe { (*tcp_hdr).check = 0 }; - // TODO: connection tracking cleanup https://github.com/kubernetes-sigs/blixt/issues/85 + let tcp_hdr_ref = unsafe { tcp_hdr.as_ref().ok_or(TC_ACT_OK)? }; + + // If the packet has the RST flag set, it means the connection is being terminated, so remove it + // from our map. 
+ if tcp_hdr_ref.rst() == 1 { + unsafe { + TCP_CONNECTIONS.remove(&client_key)?; + } + } + + let mut tcp_bk = *tcp_backend; + update_tcp_conns(tcp_hdr_ref, &client_key, &mut tcp_bk)?; Ok(TC_ACT_PIPE) } diff --git a/dataplane/ebpf/src/ingress/tcp.rs b/dataplane/ebpf/src/ingress/tcp.rs index 3fb56c5b..c6464d97 100644 --- a/dataplane/ebpf/src/ingress/tcp.rs +++ b/dataplane/ebpf/src/ingress/tcp.rs @@ -11,14 +11,14 @@ use aya_bpf::{ helpers::{bpf_csum_diff, bpf_redirect_neigh}, programs::TcContext, }; -use aya_log_ebpf::info; +use aya_log_ebpf::{debug, info}; use network_types::{eth::EthHdr, ip::Ipv4Hdr, tcp::TcpHdr}; use crate::{ - utils::{csum_fold_helper, ptr_at}, - BACKENDS, BLIXT_CONNTRACK, + utils::{csum_fold_helper, ptr_at, update_tcp_conns}, + BACKENDS, GATEWAY_INDEXES, TCP_CONNECTIONS, }; -use common::BackendKey; +use common::{Backend, BackendKey, ClientKey, TCPBackend, TCPState, BACKENDS_ARRAY_CAPACITY}; pub fn handle_tcp_ingress(ctx: TcContext) -> Result { let ip_hdr: *mut Ipv4Hdr = unsafe { ptr_at(&ctx, EthHdr::LEN)? }; @@ -29,14 +29,64 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result { let original_daddr = unsafe { (*ip_hdr).dst_addr }; - let key = BackendKey { - ip: u32::from_be(original_daddr), - port: (u16::from_be(unsafe { (*tcp_hdr).dest })) as u32, + // The source identifier + let client_key = ClientKey { + ip: u32::from_be(unsafe { (*ip_hdr).src_addr }), + port: (u16::from_be(unsafe { (*tcp_hdr).source })) as u32, }; - let backend_list = unsafe { BACKENDS.get(&key) }.ok_or(TC_ACT_OK)?; - // Only a single backend is supported for TCP connections. - // TODO(aryan9600): Add support for multiple backends (https://github.com/kubernetes-sigs/blixt/issues/119) - let backend = backend_list.backends[0]; + // The backend that is responsible for handling this TCP connection. + let mut backend: Backend; + // The Gateway that the TCP connections is forwarded from. + let backend_key: BackendKey; + // Flag to check whether this is a new connection. 
+ let mut new_conn = false; + // The state of this TCP connection. + let mut tcp_state = TCPState::default(); + + // Try to find the backend previously used for this connection. If not found, it means that + // this is a new connection, so assign it the next backend in line. + if let Some(val) = unsafe { TCP_CONNECTIONS.get(&client_key) } { + backend = val.backend; + tcp_state = val.state; + backend_key = val.backend_key; + } else { + new_conn = true; + + backend_key = BackendKey { + ip: u32::from_be(original_daddr), + port: (u16::from_be(unsafe { (*tcp_hdr).dest })) as u32, + }; + let backend_list = unsafe { BACKENDS.get(&backend_key) }.ok_or(TC_ACT_OK)?; + let backend_index = unsafe { GATEWAY_INDEXES.get(&backend_key) }.ok_or(TC_ACT_OK)?; + + debug!(&ctx, "Destination backend index: {}", *backend_index); + debug!(&ctx, "Backends length: {}", backend_list.backends_len); + + // this check asserts that we don't use a "zero-value" Backend + if backend_list.backends_len <= *backend_index { + return Ok(TC_ACT_OK); + } + // the bpf verifier is aware of variables that are used as an index for + // an array and requires that we check the array boundaries against + // the index to ensure our access is in-bounds. 
+ if *backend_index as usize >= BACKENDS_ARRAY_CAPACITY { + return Ok(TC_ACT_OK); + } + + backend = backend_list.backends[0]; + if let Some(val) = backend_list.backends.get(*backend_index as usize) { + backend = *val; + } + + // move the index to the next backend in our list + let mut next = *backend_index + 1; + if next >= backend_list.backends_len { + next = 0; + } + unsafe { + GATEWAY_INDEXES.insert(&backend_key, &next, 0_u64)?; + } + } info!( &ctx, @@ -45,9 +95,12 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result { u16::from_be(unsafe { (*tcp_hdr).dest }) ); + // DNAT the ip address unsafe { (*ip_hdr).dst_addr = backend.daddr.to_be(); } + // DNAT the port + unsafe { (*tcp_hdr).dest = (backend.dport as u16).to_be() }; if (ctx.data() + EthHdr::LEN + Ipv4Hdr::LEN) > ctx.data_end() { info!(&ctx, "Iphdr is out of bounds"); @@ -67,9 +120,6 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result { ) } as u64; unsafe { (*ip_hdr).check = csum_fold_helper(full_cksum) }; - - // Update destination port - unsafe { (*tcp_hdr).dest = (backend.dport as u16).to_be() }; // FIXME unsafe { (*tcp_hdr).check = 0 }; @@ -82,15 +132,35 @@ pub fn handle_tcp_ingress(ctx: TcContext) -> Result { ) }; - unsafe { - BLIXT_CONNTRACK.insert( - &(*ip_hdr).src_addr, - &(original_daddr, (*tcp_hdr).source.to_be() as u32), - 0 as u64, - )?; + let mut tcp_backend = TCPBackend { + backend, + backend_key, + state: tcp_state, }; - info!(&ctx, "redirect action: {}", action); + // If the connection is new, then record it in our map for future tracking. + if new_conn { + unsafe { + TCP_CONNECTIONS.insert(&client_key, &tcp_backend, 0_u64)?; + } + // since this is a new connection, there is nothing else to do, so exit early + info!(&ctx, "redirect action: {}", action); + return Ok(action as i32); + } + + let tcp_hdr_ref = unsafe { tcp_hdr.as_ref().ok_or(TC_ACT_OK)? }; + + // If the packet has the RST flag set, it means the connection is being terminated, so remove it + // from our map. 
+ if tcp_hdr_ref.rst() == 1 { + unsafe { + TCP_CONNECTIONS.remove(&client_key)?; + } + } + + update_tcp_conns(tcp_hdr_ref, &client_key, &mut tcp_backend)?; + + info!(&ctx, "redirect action: {}", action); Ok(action as i32) } diff --git a/dataplane/ebpf/src/main.rs b/dataplane/ebpf/src/main.rs index 4e050d9b..cbcdfea2 100644 --- a/dataplane/ebpf/src/main.rs +++ b/dataplane/ebpf/src/main.rs @@ -22,7 +22,7 @@ use aya_bpf::{ programs::TcContext, }; -use common::{BackendKey, BackendList, BPF_MAPS_CAPACITY}; +use common::{BackendKey, BackendList, ClientKey, TCPBackend, BPF_MAPS_CAPACITY}; use egress::{icmp::handle_icmp_egress, tcp::handle_tcp_egress}; use ingress::{tcp::handle_tcp_ingress, udp::handle_udp_ingress}; @@ -48,6 +48,10 @@ static mut GATEWAY_INDEXES: HashMap = static mut BLIXT_CONNTRACK: HashMap = HashMap::::with_max_entries(BPF_MAPS_CAPACITY, 0); +#[map(name = "TCP_CONNECTIONS")] +static mut TCP_CONNECTIONS: HashMap = + HashMap::::with_max_entries(128, 0); + // ----------------------------------------------------------------------------- // Ingress // ----------------------------------------------------------------------------- diff --git a/dataplane/ebpf/src/utils.rs b/dataplane/ebpf/src/utils.rs index 4cc8aaf0..47e9af82 100644 --- a/dataplane/ebpf/src/utils.rs +++ b/dataplane/ebpf/src/utils.rs @@ -4,8 +4,12 @@ Copyright 2023 The Kubernetes Authors. SPDX-License-Identifier: (GPL-2.0-only OR BSD-2-Clause) */ -use core::mem; use aya_bpf::{bindings::TC_ACT_OK, programs::TcContext}; +use core::mem; +use network_types::tcp::TcpHdr; + +use crate::TCP_CONNECTIONS; +use common::{ClientKey, TCPBackend, TCPState}; // ----------------------------------------------------------------------------- // Helper Functions @@ -34,3 +38,87 @@ pub fn csum_fold_helper(mut csum: u64) -> u16 { } return !(csum as u16); } + +// Updates the TCP connection's state based on the current phase and the incoming packet's header. 
+// It returns true if the state transitioned to a different phase. +// Ref: https://en.wikipedia.org/wiki/File:Tcp_state_diagram.png and +// http://www.tcpipguide.com/free/t_TCPConnectionTermination-2.htm +#[inline(always)] +pub fn process_tcp_state_transition(hdr: &TcpHdr, state: &mut TCPState) -> bool { + let fin = hdr.fin() == 1; + let ack = hdr.ack() == 1; + match state { + TCPState::Established => { + // At the Established state, a FIN packet moves the state to FinWait1. + if fin { + *state = TCPState::FinWait1; + return true; + } + } + TCPState::FinWait1 => { + // At the FinWait1 state, a packet with both the FIN and ACK bits set + // moves the state to TimeWait. + if fin && ack { + *state = TCPState::TimeWait; + return true; + } + // At the FinWait1 state, a FIN packet moves the state to Closing. + if fin { + *state = TCPState::Closing; + return true; + } + // At the FinWait1 state, an ACK packet moves the state to FinWait2. + if ack { + *state = TCPState::FinWait2; + return true; + } + } + TCPState::FinWait2 => { + // At the FinWait2 state, an ACK packet moves the state to TimeWait. + if ack { + *state = TCPState::TimeWait; + return true; + } + } + TCPState::Closing => { + // At the Closing state, an ACK packet moves the state to TimeWait. + if ack { + *state = TCPState::TimeWait; + return true; + } + } + TCPState::TimeWait => { + if ack { + *state = TCPState::Closed; + return true; + } + } + TCPState::Closed => {} + } + return false; +} + +// Modifies the map tracking TCP connections based on the current state +// of the TCP connection and the incoming TCP packet's header. 
+#[inline(always)] +pub fn update_tcp_conns( + hdr: &TcpHdr, + client_key: &ClientKey, + tcp_backend: &mut TCPBackend, +) -> Result<(), i64> { + let transitioned = process_tcp_state_transition(hdr, &mut tcp_backend.state); + if let TCPState::Closed = tcp_backend.state { + unsafe { + return TCP_CONNECTIONS.remove(&client_key); + } + } + // If the connection has not reached the Closed state yet, but it did transition to a new state, + // then record the new state. + if transitioned { + unsafe { + return TCP_CONNECTIONS.insert(&client_key, &tcp_backend, 0_u64); + } + } + + Ok(()) +} diff --git a/dataplane/loader/src/main.rs b/dataplane/loader/src/main.rs index 88db3e08..f62fc4be 100644 --- a/dataplane/loader/src/main.rs +++ b/dataplane/loader/src/main.rs @@ -13,7 +13,7 @@ use aya::programs::{tc, SchedClassifier, TcAttachType}; use aya::{include_bytes_aligned, Bpf}; use aya_log::BpfLogger; use clap::Parser; -use common::{BackendKey, BackendList}; +use common::{BackendKey, BackendList, ClientKey, TCPBackend}; use log::{info, warn}; #[derive(Debug, Parser)] @@ -46,9 +46,21 @@ async fn main() -> Result<(), anyhow::Error> { .expect("no maps named GATEWAY_INDEXES"), ) .try_into()?; + let tcp_conns: HashMap<_, ClientKey, TCPBackend> = Map::HashMap( + MapData::from_pin(bpfd_maps.join("TCP_CONNECTIONS")) + .expect("no maps named TCP_CONNECTIONS"), + ) + .try_into()?; info!("starting api server"); - start_api_server(Ipv4Addr::new(0, 0, 0, 0), 9874, backends, gateway_indexes).await?; + start_api_server( + Ipv4Addr::new(0, 0, 0, 0), + 9874, + backends, + gateway_indexes, + tcp_conns, + ) + .await?; } else { info!("loading ebpf programs"); @@ -90,8 +102,19 @@ async fn main() -> Result<(), anyhow::Error> { bpf.take_map("GATEWAY_INDEXES") .expect("no maps named GATEWAY_INDEXES"), )?; + let tcp_conns: HashMap<_, ClientKey, TCPBackend> = HashMap::try_from( + bpf.take_map("TCP_CONNECTIONS") + .expect("no maps named TCP_CONNECTIONS"), + )?; - start_api_server(Ipv4Addr::new(0, 0, 0, 0), 
9874, backends, gateway_indexes).await?; + start_api_server( + Ipv4Addr::new(0, 0, 0, 0), + 9874, + backends, + gateway_indexes, + tcp_conns, + ) + .await?; } info!("Exiting..."); diff --git a/internal/dataplane/client/utils.go b/internal/dataplane/client/utils.go index 8977beeb..8eef5c1a 100644 --- a/internal/dataplane/client/utils.go +++ b/internal/dataplane/client/utils.go @@ -31,7 +31,6 @@ import ( // CompileUDPRouteToDataPlaneBackend takes a UDPRoute and the Gateway it is // attached to and produces Backend Targets for the DataPlane to configure. func CompileUDPRouteToDataPlaneBackend(ctx context.Context, c client.Client, udproute *gatewayv1alpha2.UDPRoute, gateway *gatewayv1beta1.Gateway) (*Targets, error) { - gatewayIP, err := GetGatewayIP(gateway) if gatewayIP == nil { return nil, err @@ -98,19 +97,8 @@ func CompileUDPRouteToDataPlaneBackend(ctx context.Context, c client.Client, udp // CompileTCPRouteToDataPlaneBackend takes a TCPRoute and the Gateway it is // attached to and produces Backend Targets for the DataPlane to configure. 
-func CompileTCPRouteToDataPlaneBackend(ctx context.Context, c client.Client, tcproute *gatewayv1alpha2.TCPRoute, gateway *gatewayv1beta1.Gateway) (*Targets, error) { - // TODO: add support for multiple rules https://github.com/Kong/blixt/issues/10 - if len(tcproute.Spec.Rules) != 1 { - return nil, fmt.Errorf("currently can only support 1 TCPRoute rule, received %d", len(tcproute.Spec.Rules)) - } - rule := tcproute.Spec.Rules[0] - - // TODO: add support for multiple rules https://github.com/Kong/blixt/issues/10 - if len(rule.BackendRefs) != 1 { - return nil, fmt.Errorf("expect 1 backendRef received %d", len(rule.BackendRefs)) - } - backendRef := rule.BackendRefs[0] - +func CompileTCPRouteToDataPlaneBackend(ctx context.Context, c client.Client, + tcproute *gatewayv1alpha2.TCPRoute, gateway *gatewayv1beta1.Gateway) (*Targets, error) { gatewayIP, err := GetGatewayIP(gateway) if gatewayIP == nil { return nil, err @@ -120,42 +108,49 @@ func CompileTCPRouteToDataPlaneBackend(ctx context.Context, c client.Client, tcp if err != nil { return nil, err } - - // TODO only using one endpoint for now until https://github.com/Kong/blixt/issues/10 - var target *Target - if tcproute.DeletionTimestamp == nil { - endpoints, err := endpointsFromBackendRef(ctx, c, tcproute.Namespace, backendRef) - if err != nil { - return nil, err - } - - for _, subset := range endpoints.Subsets { - if len(subset.Addresses) < 1 { - return nil, fmt.Errorf("addresses not ready for endpoints") - } - if len(subset.Ports) < 1 { - return nil, fmt.Errorf("ports not ready for endpoints") + var backendTargets []*Target + for _, rule := range tcproute.Spec.Rules { + for _, backendRef := range rule.BackendRefs { + endpoints, err := endpointsFromBackendRef(ctx, c, tcproute.Namespace, backendRef) + if err != nil { + return nil, err } - if subset.Addresses[0].IP == "" { - return nil, fmt.Errorf("empty IP for endpoint subset") + if len(endpoints.Subsets) < 1 { + return nil, 
fmt.Errorf("endpoint has no subsets") } + for _, subset := range endpoints.Subsets { + if len(subset.Addresses) < 1 { + return nil, fmt.Errorf("addresses not ready for endpoints") + } + if len(subset.Ports) < 1 { + return nil, fmt.Errorf("ports not ready for endpoints") + } - ip := net.ParseIP(subset.Addresses[0].IP) - podip := binary.BigEndian.Uint32(ip.To4()) - podPort, err := getBackendPort(ctx, c, tcproute.Namespace, backendRef, subset.Ports) - if err != nil { - return nil, err - } + for _, addr := range subset.Addresses { + if addr.IP == "" { + return nil, fmt.Errorf("empty IP for endpoint subset") + } - target = &Target{ - Daddr: podip, - Dport: uint32(podPort), + ip := net.ParseIP(addr.IP) + podip := binary.BigEndian.Uint32(ip.To4()) + podPort, err := getBackendPort(ctx, c, tcproute.Namespace, backendRef, subset.Ports) + if err != nil { + return nil, err + } + + target := &Target{ + Daddr: podip, + Dport: uint32(podPort), + } + backendTargets = append(backendTargets, target) + } } } - if target == nil { - return nil, fmt.Errorf("endpoints not ready") - } + } + + if len(backendTargets) == 0 { + return nil, fmt.Errorf("no healthy backends") } ipint := binary.BigEndian.Uint32(gatewayIP.To4()) @@ -165,8 +160,7 @@ func CompileTCPRouteToDataPlaneBackend(ctx context.Context, c client.Client, tcp Ip: ipint, Port: gatewayPort, }, - // TODO(aryan9600): Add support for multiple targets (https://github.com/kubernetes-sigs/blixt/issues/119) - Targets: []*Target{target}, + Targets: backendTargets, } return targets, nil From 7065fe44ddc5b9bac46214403e5ba53c1918adef Mon Sep 17 00:00:00 2001 From: Sanskar Jaiswal Date: Fri, 1 Dec 2023 15:01:46 +0000 Subject: [PATCH 2/3] tcproute: add tests for round-robin load balancing Signed-off-by: Sanskar Jaiswal --- config/samples/tcproute/server.yaml | 6 +- config/tests/tcproute-rr/kustomization.yaml | 7 + config/tests/tcproute-rr/patch.yaml | 17 ++ config/tests/tcproute-rr/server.yaml | 76 +++++++++ 
test/integration/tcproute_test.go | 176 ++++++++++++++++++-- 5 files changed, 265 insertions(+), 17 deletions(-) create mode 100644 config/tests/tcproute-rr/kustomization.yaml create mode 100644 config/tests/tcproute-rr/patch.yaml create mode 100644 config/tests/tcproute-rr/server.yaml diff --git a/config/samples/tcproute/server.yaml b/config/samples/tcproute/server.yaml index cc0942f2..cebc923b 100644 --- a/config/samples/tcproute/server.yaml +++ b/config/samples/tcproute/server.yaml @@ -16,11 +16,9 @@ spec: spec: containers: - name: server - image: ghcr.io/shaneutt/malutki + image: istio/tcp-echo-server:1.1 imagePullPolicy: IfNotPresent - env: - - name: LISTEN_PORT - value: "8080" + args: [ "8080", "blixt-tcproute-sample:" ] ports: - containerPort: 8080 protocol: TCP diff --git a/config/tests/tcproute-rr/kustomization.yaml b/config/tests/tcproute-rr/kustomization.yaml new file mode 100644 index 00000000..7a75e9ec --- /dev/null +++ b/config/tests/tcproute-rr/kustomization.yaml @@ -0,0 +1,7 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization +resources: +- ../../samples/tcproute +- server.yaml +patches: +- path: patch.yaml diff --git a/config/tests/tcproute-rr/patch.yaml b/config/tests/tcproute-rr/patch.yaml new file mode 100644 index 00000000..5dce38a4 --- /dev/null +++ b/config/tests/tcproute-rr/patch.yaml @@ -0,0 +1,17 @@ +apiVersion: gateway.networking.k8s.io/v1alpha2 +kind: TCPRoute +metadata: + name: blixt-tcproute-sample +spec: + parentRefs: + - name: blixt-tcproute-sample + port: 8080 + rules: + - backendRefs: + - name: blixt-tcproute-sample + port: 8080 + - backendRefs: + - name: tcproute-rr-v1 + port: 8080 + - name: tcproute-rr-v2 + port: 8080 diff --git a/config/tests/tcproute-rr/server.yaml b/config/tests/tcproute-rr/server.yaml new file mode 100644 index 00000000..5c323dd5 --- /dev/null +++ b/config/tests/tcproute-rr/server.yaml @@ -0,0 +1,76 @@ +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: tcproute-rr-v1 + labels: + 
app: tcproute-rr-v1 +spec: + selector: + matchLabels: + app: tcproute-rr-v1 + template: + metadata: + labels: + app: tcproute-rr-v1 + spec: + containers: + - name: tcp-echo + image: istio/tcp-echo-server:1.1 + imagePullPolicy: IfNotPresent + args: [ "8080", "tcproute-rr-v1:" ] + ports: + - containerPort: 8080 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: tcproute-rr-v2 + labels: + app: tcproute-rr-v2 +spec: + selector: + matchLabels: + app: tcproute-rr-v2 + template: + metadata: + labels: + app: tcproute-rr-v2 + spec: + containers: + - name: tcp-echo + image: istio/tcp-echo-server:1.1 + imagePullPolicy: IfNotPresent + args: [ "8080", "tcproute-rr-v2:" ] + ports: + - containerPort: 8080 +--- +apiVersion: v1 +kind: Service +metadata: + labels: + app: tcproute-rr-v1 + name: tcproute-rr-v1 +spec: + ports: + - name: tcp + port: 8080 + protocol: TCP + selector: + app: tcproute-rr-v1 + type: ClusterIP +--- +apiVersion: v1 +kind: Service +metadata: + labels: + app: tcproute-rr-v2 + name: tcproute-rr-v2 +spec: + ports: + - name: tcp + port: 8080 + protocol: TCP + selector: + app: tcproute-rr-v2 + type: ClusterIP diff --git a/test/integration/tcproute_test.go b/test/integration/tcproute_test.go index 0ac88143..3e05fc91 100644 --- a/test/integration/tcproute_test.go +++ b/test/integration/tcproute_test.go @@ -20,9 +20,10 @@ limitations under the License. 
package integration import ( + "bufio" "context" "fmt" - "net/http" + "net" "strings" "testing" "time" @@ -38,9 +39,12 @@ import ( const ( tcprouteSampleKustomize = "../../config/tests/tcproute" + tcprouteRRKustomize = "../../config/tests/tcproute-rr" tcprouteSampleName = "blixt-tcproute-sample" ) +var tcpServerNames = []string{"blixt-tcproute-sample", "tcproute-rr-v1", "tcproute-rr-v2"} + func TestTCPRouteBasics(t *testing.T) { tcpRouteBasicsCleanupKey := "tcproutebasics" defer func() { @@ -69,38 +73,184 @@ func TestTCPRouteBasics(t *testing.T) { require.Equal(t, gatewayv1beta1.IPAddressType, *gw.Status.Addresses[0].Type) gwaddr := fmt.Sprintf("%s:8080", gw.Status.Addresses[0].Value) - t.Log("waiting for HTTP server to be available") + t.Log("waiting for TCP server to be available") require.Eventually(t, func() bool { server, err := env.Cluster().Client().AppsV1().Deployments(corev1.NamespaceDefault).Get(ctx, tcprouteSampleName, metav1.GetOptions{}) require.NoError(t, err) return server.Status.AvailableReplicas > 0 }, time.Minute, time.Second) - t.Log("verifying HTTP connectivity to the server") - httpc := http.Client{Timeout: time.Second * 30} + t.Log("verifying TCP connectivity to the server") + var conn net.Conn require.Eventually(t, func() bool { - resp, err := httpc.Get(fmt.Sprintf("http://%s/status/%d", gwaddr, http.StatusTeapot)) + var err error + conn, err = net.Dial("tcp", gwaddr) if err != nil { - t.Logf("received error checking HTTP server: [%s], retrying...", err) + t.Logf("received error connecting to TCP server: [%s], retrying...", err) return false } - defer resp.Body.Close() - return resp.StatusCode == http.StatusTeapot + return true }, time.Minute*5, time.Second) - t.Log("deleting the TCPRoute and verifying that HTTP traffic stops") + response := writeAndReadTCP(t, conn) + require.Contains(t, response, tcpServerNames[0]) + + t.Log("deleting the TCPRoute and verifying that TCP connection is closed") require.NoError(t, 
gwclient.GatewayV1alpha2().TCPRoutes(corev1.NamespaceDefault).Delete(ctx, tcprouteSampleName, metav1.DeleteOptions{})) - httpc = http.Client{Timeout: time.Second * 3} require.Eventually(t, func() bool { - resp, err := httpc.Get(fmt.Sprintf("http://%s/status/%d", gwaddr, http.StatusTeapot)) + _, err := conn.Write([]byte("blahhh\n")) + require.NoError(t, err) + + err = conn.SetReadDeadline(time.Now().Add(time.Second * 3)) + require.NoError(t, err) + reader := bufio.NewReader(conn) + _, err = reader.ReadBytes(byte('\n')) if err != nil { - if strings.Contains(err.Error(), "context deadline exceeded") { + if strings.Contains(err.Error(), "i/o timeout") { return true } t.Logf("received unexpected error waiting for TCPRoute to decomission: %s", err) return false } - defer resp.Body.Close() return false }, time.Minute, time.Second) } + +func TestTCPRouteRoundRobin(t *testing.T) { + tcpRouteRRCleanupKey := "tcprouterr" + defer func() { + testutils.DumpDiagnosticsIfFailed(ctx, t, env.Cluster()) + if err := runCleanup(tcpRouteRRCleanupKey); err != nil { + t.Errorf("cleanup failed: %s", err) + } + }() + + t.Log("deploying config/samples/tcproute-rr kustomize") + require.NoError(t, clusters.KustomizeDeployForCluster(ctx, env.Cluster(), tcprouteRRKustomize)) + addCleanup(tcpRouteRRCleanupKey, func(ctx context.Context) error { + cleanupLog("cleaning up config/samples/tcproute-rr kustomize") + return clusters.KustomizeDeleteForCluster(ctx, env.Cluster(), tcprouteRRKustomize, "--ignore-not-found=true") + }) + + t.Log("waiting for Gateway to have an address") + var gw *gatewayv1beta1.Gateway + require.Eventually(t, func() bool { + var err error + gw, err = gwclient.GatewayV1beta1().Gateways(corev1.NamespaceDefault).Get(ctx, tcprouteSampleName, metav1.GetOptions{}) + require.NoError(t, err) + return len(gw.Status.Addresses) > 0 + }, time.Minute, time.Second) + require.NotNil(t, gw.Status.Addresses[0].Type) + require.Equal(t, gatewayv1beta1.IPAddressType, *gw.Status.Addresses[0].Type) 
+ gwaddr := fmt.Sprintf("%s:8080", gw.Status.Addresses[0].Value) + + t.Log("waiting for TCP servers to be available") + labelSelector := metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "app", + Operator: metav1.LabelSelectorOpIn, + Values: tcpServerNames, + }, + }, + } + require.Eventually(t, func() bool { + servers, err := env.Cluster().Client().AppsV1().Deployments(corev1.NamespaceDefault).List(ctx, metav1.ListOptions{ + LabelSelector: metav1.FormatLabelSelector(&labelSelector), + }) + require.NoError(t, err) + for _, server := range servers.Items { + if server.Status.AvailableReplicas <= 0 { + return false + } + } + return true + }, time.Minute, time.Second) + + t.Log("verifying TCP connectivity to the servers") + // We create three TCP connections, one for each backend. + var conn1 net.Conn + require.Eventually(t, func() bool { + var err error + conn1, err = net.Dial("tcp", gwaddr) + if err != nil { + t.Logf("received error connecting to TCP server: [%s], retrying...", err) + return false + } + return true + }, time.Minute*5, time.Second) + conn2, err := net.Dial("tcp", gwaddr) + require.NoError(t, err) + conn3, err := net.Dial("tcp", gwaddr) + require.NoError(t, err) + conns := []net.Conn{conn1, conn2, conn3} + + // Run it twice to verify that we load balance in a round-robin fashion. + for c := 0; c < 2; c++ { + // We can't do names := tcpServerNames because we overwrite this in the loop later. + var names []string + names = append(names, tcpServerNames...) + + for _, conn := range conns { + response := writeAndReadTCP(t, conn) + split := strings.Split(response, ":") + require.Len(t, split, 2) + name := split[0] + var removed bool + names, removed = removeName(names, name) + // If no name was removed from the list, it means that the response + // does not contain the name of a known server. 
+			if !removed {
+				t.Fatalf("received unexpected response from backend: %s", name)
+			}
+		}
+		require.Len(t, names, 0)
+	}
+
+	t.Log("deleting the TCPRoute and verifying that all TCP connections are closed")
+	require.NoError(t, gwclient.GatewayV1alpha2().TCPRoutes(corev1.NamespaceDefault).Delete(ctx, tcprouteSampleName, metav1.DeleteOptions{}))
+	require.Eventually(t, func() bool {
+		for _, conn := range conns {
+			_, err := conn.Write([]byte("blahhh\n"))
+			require.NoError(t, err)
+			err = conn.SetReadDeadline(time.Now().Add(time.Second * 3))
+			require.NoError(t, err)
+
+			reader := bufio.NewReader(conn)
+			_, err = reader.ReadBytes(byte('\n'))
+			if err != nil {
+				if strings.Contains(err.Error(), "i/o timeout") {
+					continue
+				}
+				t.Logf("received unexpected error waiting for TCPRoute to decommission: %s", err)
+			}
+			return false
+		}
+		return true
+	}, time.Minute, time.Second)
+}
+
+func removeName(names []string, name string) ([]string, bool) {
+	for i, v := range names {
+		if v == name {
+			names = append(names[:i], names[i+1:]...)
+ return names, true + } + } + return nil, false +} + +func writeAndReadTCP(t *testing.T, conn net.Conn) string { + t.Helper() + + t.Logf("writing data to TCP connection with backend %s", conn.RemoteAddr().String()) + request := "wazzzaaaa" + _, err := conn.Write([]byte(request + "\n")) + require.NoError(t, err) + + t.Logf("reading data from TCP connection with backend %s", conn.RemoteAddr().String()) + reader := bufio.NewReader(conn) + response, err := reader.ReadBytes(byte('\n')) + require.NoError(t, err) + return string(response) +} From 4acd90b0f452b52d12a96ce8d435ebac04627cf0 Mon Sep 17 00:00:00 2001 From: Sanskar Jaiswal Date: Mon, 4 Dec 2023 16:59:08 +0000 Subject: [PATCH 3/3] ci: disable bpfd integration tests due to change upstream Signed-off-by: Sanskar Jaiswal --- .github/workflows/test.yaml | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index a9bd396a..75550ee9 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -78,14 +78,16 @@ jobs: BLIXT_UDP_SERVER_IMAGE: "ghcr.io/kubernetes-sigs/blixt-udp-test-server" TAG: "integration-tests" - - name: run integration tests with bpfd - run: make test.integration - env: - BLIXT_CONTROLPLANE_IMAGE: "ghcr.io/kubernetes-sigs/blixt-controlplane" - BLIXT_DATAPLANE_IMAGE: "ghcr.io/kubernetes-sigs/blixt-dataplane" - BLIXT_UDP_SERVER_IMAGE: "ghcr.io/kubernetes-sigs/blixt-udp-test-server" - BLIXT_USE_BPFD: true - TAG: "integration-tests" + # temporarily disabled due to upstream changes in bpfman (previously bpfd) + # ref: https://github.com/kubernetes-sigs/blixt/issues/152 + # - name: run integration tests with bpfd + # run: make test.integration + # env: + # BLIXT_CONTROLPLANE_IMAGE: "ghcr.io/kubernetes-sigs/blixt-controlplane" + # BLIXT_DATAPLANE_IMAGE: "ghcr.io/kubernetes-sigs/blixt-dataplane" + # BLIXT_UDP_SERVER_IMAGE: "ghcr.io/kubernetes-sigs/blixt-udp-test-server" + # BLIXT_USE_BPFD: true + 
# TAG: "integration-tests" ## Upload diagnostics if integration test step failed. - name: upload diagnostics