Skip to content

Commit

Permalink
Use NLB on AWS (#657)
Browse files Browse the repository at this point in the history
* Use NLB on AWS

* made conditional on cloud value
  • Loading branch information
nacardin authored Jan 10, 2021
1 parent 57cbadf commit 51fb4b5
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 28 deletions.
34 changes: 17 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions k8-util/helm/fluvio-app/templates/sc-public.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ apiVersion: v1
kind: Service
metadata:
name: fluvio-sc-public
annotations:
{{ if eq .Values.cloud "aws" }}
service.beta.kubernetes.io/aws-load-balancer-type: "nlb"
{{ end }}
spec:
type: LoadBalancer
externalTrafficPolicy: Local
Expand Down
8 changes: 7 additions & 1 deletion k8-util/helm/fluvio-app/templates/spu-k8-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,10 @@ metadata:
name: spu-k8
data:
image: {{ .Values.image.registry }}/fluvio:{{ .Values.image.tag | default .Chart.Version }}
resources: {{ .Values.spuResources | toJson | quote }}
resources: {{ .Values.spuResources | toJson | quote }}
{{ if eq .Values.cloud "aws" }}
lbServiceAnnotations: |+
{
"service.beta.kubernetes.io/aws-load-balancer-type": "nlb"
}
{{ end }}
8 changes: 4 additions & 4 deletions src/sc/src/k8/operator/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub fn convert_cluster_to_statefulset(
group_name: &str,
group_svc_name: String,
namespace: &str,
spu_k8_config: SpuK8Config,
spu_k8_config: &SpuK8Config,
tls: Option<&TlsConfig>,
) -> InputK8Obj<StatefulSetSpec> {
let statefulset_name = format!("fluvio-spg-{}", group_name);
Expand Down Expand Up @@ -61,7 +61,7 @@ fn generate_stateful(
name: &str,
group_svc_name: String,
namespace: &str,
spu_k8_config: SpuK8Config,
spu_k8_config: &SpuK8Config,
tls_config: Option<&TlsConfig>,
) -> StatefulSetSpec {
let replicas = spg_spec.replicas;
Expand Down Expand Up @@ -179,8 +179,8 @@ fn generate_stateful(
termination_grace_period_seconds: Some(10),
containers: vec![ContainerSpec {
name: SPU_DEFAULT_NAME.to_owned(),
image: Some(spu_k8_config.image),
resources: Some(spu_k8_config.resources),
image: Some(spu_k8_config.image.clone()),
resources: Some(spu_k8_config.resources.clone()),
ports: vec![public_port, private_port],
volume_mounts,
env,
Expand Down
23 changes: 18 additions & 5 deletions src/sc/src/k8/operator/spg_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ impl SpgOperator {

let spg_spec = &spu_group.spec;

let spu_k8_config = SpuK8Config::load(&self.client, &self.namespace).await?;

// ensure we don't have conflict with existing spu group
if let Some(conflict_id) = spu_group.is_conflict_with(&self.spu_store).await {
warn!(conflict_id, "spg is in conflict with existing id");
Expand All @@ -148,7 +150,13 @@ impl SpgOperator {
{
Ok(svc_name) => {
if let Err(err) = self
.apply_stateful_set(&spu_group, spg_spec, &spg_name, svc_name)
.apply_stateful_set(
&spu_group,
spg_spec,
&spg_name,
svc_name,
&spu_k8_config,
)
.await
{
error!("error applying stateful sets: {}", err);
Expand All @@ -159,7 +167,10 @@ impl SpgOperator {
}
}

if let Err(err) = self.apply_spus(&spu_group, spg_spec, &spg_name).await {
if let Err(err) = self
.apply_spus(&spu_group, spg_spec, &spg_name, &spu_k8_config)
.await
{
error!("error applying spus: {}", err);
}
}
Expand All @@ -178,9 +189,8 @@ impl SpgOperator {
spg_spec: &K8SpuGroupSpec,
spg_name: &str,
spg_svc_name: String,
spu_k8_config: &SpuK8Config,
) -> Result<(), ClientError> {
let spu_k8_config = SpuK8Config::load(&self.client, &self.namespace).await?;

let input_stateful = convert_cluster_to_statefulset(
spg_spec,
&spu_group.metadata,
Expand Down Expand Up @@ -209,6 +219,7 @@ impl SpgOperator {
spg_obj: &SpuGroupObj,
spg_spec: &K8SpuGroupSpec,
spg_name: &str,
spu_k8_config: &SpuK8Config,
) -> Result<(), ClientError> {
let replicas = spg_spec.replicas;

Expand All @@ -228,7 +239,7 @@ impl SpgOperator {
.await;

if let Err(err) = self
.apply_spu_load_balancers(spg_obj, spg_spec, &spu_name)
.apply_spu_load_balancers(spg_obj, spg_spec, &spu_name, &spu_k8_config)
.await
{
error!("error trying to create load balancer for spu: {}", err);
Expand Down Expand Up @@ -318,6 +329,7 @@ impl SpgOperator {
spg_obj: &SpuGroupObj,
spg_spec: &K8SpuGroupSpec,
spu_name: &str,
spu_k8_config: &SpuK8Config,
) -> Result<ApplyResult<ServiceSpec>, ClientError> {
let metadata = &spg_obj.metadata;

Expand Down Expand Up @@ -354,6 +366,7 @@ impl SpgOperator {
name: svc_name,
namespace: metadata.namespace().to_string(),
owner_references: vec![owner_ref],
annotations: spu_k8_config.lb_service_annotations.clone(),
..Default::default()
}
.set_labels(vec![("fluvio.io/spu-name", spu_name)]),
Expand Down
16 changes: 15 additions & 1 deletion src/sc/src/k8/operator/spu_k8_config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use k8_client::{ClientError, SharedK8Client};
use k8_metadata_client::MetadataClient;
use tracing::debug;
Expand All @@ -12,6 +14,7 @@ const CONFIG_MAP_NAME: &str = "spu-k8";
pub struct SpuK8Config {
pub image: String,
pub resources: ResourceRequirements,
pub lb_service_annotations: HashMap<String, String>,
}

impl SpuK8Config {
Expand All @@ -32,6 +35,17 @@ impl SpuK8Config {

let resources = serde_json::from_str(&resources_string)?;

Ok(Self { image, resources })
let lb_service_annotations =
if let Some(lb_service_annotations) = data.remove("lbServiceAnnotations") {
serde_json::from_str(&lb_service_annotations)?
} else {
HashMap::new()
};

Ok(Self {
image,
resources,
lb_service_annotations,
})
}
}

0 comments on commit 51fb4b5

Please sign in to comment.