Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tcproute: add support for round-robin load balancing #148

Merged
merged 3 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,16 @@ jobs:
BLIXT_UDP_SERVER_IMAGE: "ghcr.io/kubernetes-sigs/blixt-udp-test-server"
TAG: "integration-tests"

- name: run integration tests with bpfd
run: make test.integration
env:
BLIXT_CONTROLPLANE_IMAGE: "ghcr.io/kubernetes-sigs/blixt-controlplane"
BLIXT_DATAPLANE_IMAGE: "ghcr.io/kubernetes-sigs/blixt-dataplane"
BLIXT_UDP_SERVER_IMAGE: "ghcr.io/kubernetes-sigs/blixt-udp-test-server"
BLIXT_USE_BPFD: true
TAG: "integration-tests"
# temporarily disabled due to upstream changes in bpfman (previously bpfd)
# ref: https://github.com/kubernetes-sigs/blixt/issues/152
# - name: run integration tests with bpfd
# run: make test.integration
# env:
# BLIXT_CONTROLPLANE_IMAGE: "ghcr.io/kubernetes-sigs/blixt-controlplane"
# BLIXT_DATAPLANE_IMAGE: "ghcr.io/kubernetes-sigs/blixt-dataplane"
# BLIXT_UDP_SERVER_IMAGE: "ghcr.io/kubernetes-sigs/blixt-udp-test-server"
# BLIXT_USE_BPFD: true
# TAG: "integration-tests"

## Upload diagnostics if integration test step failed.
- name: upload diagnostics
Expand Down
6 changes: 2 additions & 4 deletions config/samples/tcproute/server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ spec:
spec:
containers:
- name: server
image: ghcr.io/shaneutt/malutki
image: istio/tcp-echo-server:1.1
imagePullPolicy: IfNotPresent
env:
- name: LISTEN_PORT
value: "8080"
args: [ "8080", "blixt-tcproute-sample:" ]
ports:
- containerPort: 8080
protocol: TCP
Expand Down
7 changes: 7 additions & 0 deletions config/tests/tcproute-rr/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- ../../samples/tcproute
- server.yaml
patches:
- path: patch.yaml
17 changes: 17 additions & 0 deletions config/tests/tcproute-rr/patch.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
apiVersion: gateway.networking.k8s.io/v1alpha2
kind: TCPRoute
metadata:
name: blixt-tcproute-sample
spec:
parentRefs:
- name: blixt-tcproute-sample
port: 8080
rules:
- backendRefs:
- name: blixt-tcproute-sample
port: 8080
- backendRefs:
- name: tcproute-rr-v1
port: 8080
- name: tcproute-rr-v2
port: 8080
76 changes: 76 additions & 0 deletions config/tests/tcproute-rr/server.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: tcproute-rr-v1
labels:
app: tcproute-rr-v1
spec:
selector:
matchLabels:
app: tcproute-rr-v1
template:
metadata:
labels:
app: tcproute-rr-v1
spec:
containers:
- name: tcp-echo
image: istio/tcp-echo-server:1.1
imagePullPolicy: IfNotPresent
args: [ "8080", "tcproute-rr-v1:" ]
ports:
- containerPort: 8080
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: tcproute-rr-v2
labels:
app: tcproute-rr-v2
spec:
selector:
matchLabels:
app: tcproute-rr-v2
template:
metadata:
labels:
app: tcproute-rr-v2
spec:
containers:
- name: tcp-echo
image: istio/tcp-echo-server:1.1
imagePullPolicy: IfNotPresent
args: [ "8080", "tcproute-rr-v2:" ]
ports:
- containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
labels:
app: tcproute-rr-v1
name: tcproute-rr-v1
spec:
ports:
- name: tcp
port: 8080
protocol: TCP
selector:
app: tcproute-rr-v1
type: ClusterIP
---
apiVersion: v1
kind: Service
metadata:
labels:
app: tcproute-rr-v2
name: tcproute-rr-v2
spec:
ports:
- name: tcp
port: 8080
protocol: TCP
selector:
app: tcproute-rr-v2
type: ClusterIP
136 changes: 89 additions & 47 deletions dataplane/api-server/src/backends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ pub struct InterfaceIndexConfirmation {
/// Generated client implementations.
pub mod backends_client {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
use tonic::codegen::http::Uri;
use tonic::codegen::*;
use tonic::codegen::http::Uri;
#[derive(Debug, Clone)]
pub struct BackendsClient<T> {
inner: tonic::client::Grpc<T>,
Expand Down Expand Up @@ -90,8 +90,9 @@ pub mod backends_client {
<T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
>,
>,
<T as tonic::codegen::Service<http::Request<tonic::body::BoxBody>>>::Error:
Into<StdError> + Send + Sync,
<T as tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
>>::Error: Into<StdError> + Send + Sync,
{
BackendsClient::new(InterceptedService::new(inner, interceptor))
}
Expand Down Expand Up @@ -129,16 +130,23 @@ pub mod backends_client {
pub async fn get_interface_index(
&mut self,
request: impl tonic::IntoRequest<super::PodIp>,
) -> std::result::Result<tonic::Response<super::InterfaceIndexConfirmation>, tonic::Status>
{
self.inner.ready().await.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
) -> std::result::Result<
tonic::Response<super::InterfaceIndexConfirmation>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/backends.backends/GetInterfaceIndex");
let path = http::uri::PathAndQuery::from_static(
"/backends.backends/GetInterfaceIndex",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("backends.backends", "GetInterfaceIndex"));
Expand All @@ -148,34 +156,38 @@ pub mod backends_client {
&mut self,
request: impl tonic::IntoRequest<super::Targets>,
) -> std::result::Result<tonic::Response<super::Confirmation>, tonic::Status> {
self.inner.ready().await.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/backends.backends/Update");
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("backends.backends", "Update"));
req.extensions_mut().insert(GrpcMethod::new("backends.backends", "Update"));
self.inner.unary(req, path, codec).await
}
pub async fn delete(
&mut self,
request: impl tonic::IntoRequest<super::Vip>,
) -> std::result::Result<tonic::Response<super::Confirmation>, tonic::Status> {
self.inner.ready().await.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/backends.backends/Delete");
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("backends.backends", "Delete"));
req.extensions_mut().insert(GrpcMethod::new("backends.backends", "Delete"));
self.inner.unary(req, path, codec).await
}
}
Expand All @@ -190,7 +202,10 @@ pub mod backends_server {
async fn get_interface_index(
&self,
request: tonic::Request<super::PodIp>,
) -> std::result::Result<tonic::Response<super::InterfaceIndexConfirmation>, tonic::Status>;
) -> std::result::Result<
tonic::Response<super::InterfaceIndexConfirmation>,
tonic::Status,
>;
async fn update(
&self,
request: tonic::Request<super::Targets>,
Expand Down Expand Up @@ -223,7 +238,10 @@ pub mod backends_server {
max_encoding_message_size: None,
}
}
pub fn with_interceptor<F>(inner: T, interceptor: F) -> InterceptedService<Self, F>
pub fn with_interceptor<F>(
inner: T,
interceptor: F,
) -> InterceptedService<Self, F>
where
F: tonic::service::Interceptor,
{
Expand Down Expand Up @@ -279,12 +297,21 @@ pub mod backends_server {
"/backends.backends/GetInterfaceIndex" => {
#[allow(non_camel_case_types)]
struct GetInterfaceIndexSvc<T: Backends>(pub Arc<T>);
impl<T: Backends> tonic::server::UnaryService<super::PodIp> for GetInterfaceIndexSvc<T> {
impl<T: Backends> tonic::server::UnaryService<super::PodIp>
for GetInterfaceIndexSvc<T> {
type Response = super::InterfaceIndexConfirmation;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
fn call(&mut self, request: tonic::Request<super::PodIp>) -> Self::Future {
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::PodIp>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move { (*inner).get_interface_index(request).await };
let fut = async move {
(*inner).get_interface_index(request).await
};
Box::pin(fut)
}
}
Expand Down Expand Up @@ -314,9 +341,13 @@ pub mod backends_server {
"/backends.backends/Update" => {
#[allow(non_camel_case_types)]
struct UpdateSvc<T: Backends>(pub Arc<T>);
impl<T: Backends> tonic::server::UnaryService<super::Targets> for UpdateSvc<T> {
impl<T: Backends> tonic::server::UnaryService<super::Targets>
for UpdateSvc<T> {
type Response = super::Confirmation;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::Targets>,
Expand Down Expand Up @@ -352,10 +383,17 @@ pub mod backends_server {
"/backends.backends/Delete" => {
#[allow(non_camel_case_types)]
struct DeleteSvc<T: Backends>(pub Arc<T>);
impl<T: Backends> tonic::server::UnaryService<super::Vip> for DeleteSvc<T> {
impl<T: Backends> tonic::server::UnaryService<super::Vip>
for DeleteSvc<T> {
type Response = super::Confirmation;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
fn call(&mut self, request: tonic::Request<super::Vip>) -> Self::Future {
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::Vip>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move { (*inner).delete(request).await };
Box::pin(fut)
Expand Down Expand Up @@ -384,14 +422,18 @@ pub mod backends_server {
};
Box::pin(fut)
}
_ => Box::pin(async move {
Ok(http::Response::builder()
.status(200)
.header("grpc-status", "12")
.header("content-type", "application/grpc")
.body(empty_body())
.unwrap())
}),
_ => {
Box::pin(async move {
Ok(
http::Response::builder()
.status(200)
.header("grpc-status", "12")
.header("content-type", "application/grpc")
.body(empty_body())
.unwrap(),
)
})
}
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions dataplane/api-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ use aya::maps::{HashMap, MapData};
use tonic::transport::Server;

use backends::backends_server::BackendsServer;
use common::{BackendKey, BackendList};
use common::{BackendKey, BackendList, ClientKey, TCPBackend};

pub async fn start(
addr: Ipv4Addr,
port: u16,
backends_map: HashMap<MapData, BackendKey, BackendList>,
gateway_indexes_map: HashMap<MapData, BackendKey, u16>,
tcp_conns_map: HashMap<MapData, ClientKey, TCPBackend>,
) -> Result<(), Error> {
let server = server::BackendService::new(backends_map, gateway_indexes_map);
let server = server::BackendService::new(backends_map, gateway_indexes_map, tcp_conns_map);
// TODO: mTLS https://github.com/Kong/blixt/issues/50
Server::builder()
.add_service(BackendsServer::new(server))
Expand Down
Loading
Loading