From 51fb4b5611d6d8fd04c5f231441466499d2b6b31 Mon Sep 17 00:00:00 2001 From: Nick Cardin Date: Sat, 9 Jan 2021 20:38:44 -0500 Subject: [PATCH] Use NLB on AWS (#657) * Use NLB on AWS * made conditional on cloud value --- Cargo.lock | 34 +++++++++---------- .../helm/fluvio-app/templates/sc-public.yaml | 4 +++ .../fluvio-app/templates/spu-k8-config.yaml | 8 ++++- src/sc/src/k8/operator/conversion.rs | 8 ++--- src/sc/src/k8/operator/spg_operator.rs | 23 ++++++++++--- src/sc/src/k8/operator/spu_k8_config.rs | 16 ++++++++- 6 files changed, 65 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2f1c4c742d..a2368546c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -188,7 +188,7 @@ dependencies = [ "httparse", "lazy_static", "log", - "pin-project 1.0.3", + "pin-project 1.0.4", ] [[package]] @@ -311,7 +311,7 @@ dependencies = [ "memchr", "num_cpus", "once_cell", - "pin-project-lite 0.2.1", + "pin-project-lite 0.2.3", "pin-utils", "slab", "wasm-bindgen-futures", @@ -1115,7 +1115,7 @@ dependencies = [ "nix 0.17.0", "openssl", "openssl-sys", - "pin-project 1.0.3", + "pin-project 1.0.4", "pin-utils", "thiserror", "tracing", @@ -1315,7 +1315,7 @@ dependencies = [ "futures-util", "log", "once_cell", - "pin-project 1.0.3", + "pin-project 1.0.4", "thiserror", "tokio", "tokio-util", @@ -1573,7 +1573,7 @@ dependencies = [ "futures-io", "memchr", "parking", - "pin-project-lite 0.2.1", + "pin-project-lite 0.2.3", "waker-fn", ] @@ -1616,7 +1616,7 @@ dependencies = [ "futures-sink", "futures-task", "memchr", - "pin-project-lite 0.2.1", + "pin-project-lite 0.2.3", "pin-utils", "proc-macro-hack", "proc-macro-nested", @@ -1840,7 +1840,7 @@ dependencies = [ "httparse", "httpdate", "itoa", - "pin-project 1.0.3", + "pin-project 1.0.4", "tokio", "tower-service", "tracing", @@ -2017,9 +2017,9 @@ dependencies = [ [[package]] name = "k8-obj-metadata" -version = "2.0.1" +version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dca07bd260ad1526a6606e4a6a5de2d26644780adc17951fed0fe2b96a5cbeb1" +checksum = "0eade50cc4869cba545e3d289895d3891a68dd33e2c484b7f124bba6931744d1" dependencies = [ "serde", "serde_json", @@ -2363,11 +2363,11 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a83804639aad6ba65345661744708855f9fbcb71176ea8d28d05aeb11d975e7" +checksum = "95b70b68509f17aa2857863b6fa00bf21fc93674c7a8893de2f469f6aa7ca2f2" dependencies = [ - "pin-project-internal 1.0.3", + "pin-project-internal 1.0.4", ] [[package]] @@ -2383,9 +2383,9 @@ dependencies = [ [[package]] name = "pin-project-internal" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7bcc46b8f73443d15bc1c5fecbb315718491fa9187fa483f0e359323cde8b3a" +checksum = "caa25a6393f22ce819b0f50e0be89287292fda8d425be38ee0ca14c4931d9e71" dependencies = [ "proc-macro2", "quote", @@ -2400,9 +2400,9 @@ checksum = "c917123afa01924fc84bb20c4c03f004d9c38e5127e3c039bbf7f4b9c76a2f6b" [[package]] name = "pin-project-lite" -version = "0.2.1" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e36743d754ccdf9954c2e352ce2d4b106e024c814f6499c2dadff80da9a442d8" +checksum = "ba36e0a6cc5a4c645073f4984f1ed55d09f5857d4de7c14550baa81a39ef5a17" [[package]] name = "pin-utils" @@ -3266,7 +3266,7 @@ checksum = "9f47026cdc4080c07e49b37087de021820269d996f581aac150ef9e5583eefe3" dependencies = [ "cfg-if 1.0.0", "log", - "pin-project-lite 0.2.1", + "pin-project-lite 0.2.3", "tracing-attributes", "tracing-core", ] diff --git a/k8-util/helm/fluvio-app/templates/sc-public.yaml b/k8-util/helm/fluvio-app/templates/sc-public.yaml index a0c2c6de24..6c524676b0 100644 --- a/k8-util/helm/fluvio-app/templates/sc-public.yaml +++ b/k8-util/helm/fluvio-app/templates/sc-public.yaml @@ -2,6 +2,10 @@ apiVersion: v1 kind: Service metadata: name: fluvio-sc-public + annotations: + {{ if eq .Values.cloud "aws" }} + service.beta.kubernetes.io/aws-load-balancer-type: "nlb" + {{ end }} spec: type: LoadBalancer externalTrafficPolicy: Local diff --git a/k8-util/helm/fluvio-app/templates/spu-k8-config.yaml b/k8-util/helm/fluvio-app/templates/spu-k8-config.yaml index e0aed9da4c..6fcdf459f4 100644 --- a/k8-util/helm/fluvio-app/templates/spu-k8-config.yaml +++ b/k8-util/helm/fluvio-app/templates/spu-k8-config.yaml @@ -4,4 +4,10 @@ metadata: name: spu-k8 data: image: {{ .Values.image.registry }}/fluvio:{{ .Values.image.tag | default .Chart.Version }} - resources: {{ .Values.spuResources | toJson | quote }} \ No newline at end of file + resources: {{ .Values.spuResources | toJson | quote }} + {{ if eq .Values.cloud "aws" }} + lbServiceAnnotations: |+ + { + "service.beta.kubernetes.io/aws-load-balancer-type": "nlb" + } + {{ end }} diff --git a/src/sc/src/k8/operator/conversion.rs b/src/sc/src/k8/operator/conversion.rs index 4b1ad4fc55..6a30c96e64 100644 --- a/src/sc/src/k8/operator/conversion.rs +++ b/src/sc/src/k8/operator/conversion.rs @@ -27,7 +27,7 @@ pub fn convert_cluster_to_statefulset( group_name: &str, group_svc_name: String, namespace: &str, - spu_k8_config: SpuK8Config, + spu_k8_config: &SpuK8Config, tls: Option<&TlsConfig>, ) -> InputK8Obj { let statefulset_name = format!("fluvio-spg-{}", group_name); @@ -61,7 +61,7 @@ fn generate_stateful( name: &str, group_svc_name: String, namespace: &str, - spu_k8_config: SpuK8Config, + spu_k8_config: &SpuK8Config, tls_config: Option<&TlsConfig>, ) -> StatefulSetSpec { let replicas = spg_spec.replicas; @@ -179,8 +179,8 @@ fn generate_stateful( termination_grace_period_seconds: Some(10), containers: vec![ContainerSpec { name: SPU_DEFAULT_NAME.to_owned(), - image: Some(spu_k8_config.image), - resources: Some(spu_k8_config.resources), + image: Some(spu_k8_config.image.clone()), + resources: Some(spu_k8_config.resources.clone()), ports: vec![public_port, private_port], volume_mounts, env, diff --git a/src/sc/src/k8/operator/spg_operator.rs b/src/sc/src/k8/operator/spg_operator.rs index f5c1c206d7..062f223348 100644 --- a/src/sc/src/k8/operator/spg_operator.rs +++ b/src/sc/src/k8/operator/spg_operator.rs @@ -124,6 +124,8 @@ impl SpgOperator { let spg_spec = &spu_group.spec; + let spu_k8_config = SpuK8Config::load(&self.client, &self.namespace).await?; + // ensure we don't have conflict with existing spu group if let Some(conflict_id) = spu_group.is_conflict_with(&self.spu_store).await { warn!(conflict_id, "spg is in conflict with existing id"); @@ -148,7 +150,13 @@ impl SpgOperator { { Ok(svc_name) => { if let Err(err) = self - .apply_stateful_set(&spu_group, spg_spec, &spg_name, svc_name) + .apply_stateful_set( + &spu_group, + spg_spec, + &spg_name, + svc_name, + &spu_k8_config, + ) .await { error!("error applying stateful sets: {}", err); @@ -159,7 +167,10 @@ impl SpgOperator { } } - if let Err(err) = self.apply_spus(&spu_group, spg_spec, &spg_name).await { + if let Err(err) = self + .apply_spus(&spu_group, spg_spec, &spg_name, &spu_k8_config) + .await + { error!("error applying spus: {}", err); } } @@ -178,9 +189,8 @@ impl SpgOperator { spg_spec: &K8SpuGroupSpec, spg_name: &str, spg_svc_name: String, + spu_k8_config: &SpuK8Config, ) -> Result<(), ClientError> { - let spu_k8_config = SpuK8Config::load(&self.client, &self.namespace).await?; - let input_stateful = convert_cluster_to_statefulset( spg_spec, &spu_group.metadata, @@ -209,6 +219,7 @@ impl SpgOperator { spg_obj: &SpuGroupObj, spg_spec: &K8SpuGroupSpec, spg_name: &str, + spu_k8_config: &SpuK8Config, ) -> Result<(), ClientError> { let replicas = spg_spec.replicas; @@ -228,7 +239,7 @@ impl SpgOperator { .await; if let Err(err) = self - .apply_spu_load_balancers(spg_obj, spg_spec, &spu_name) + .apply_spu_load_balancers(spg_obj, spg_spec, &spu_name, &spu_k8_config) .await { error!("error trying to create load balancer for spu: {}", err); @@ -318,6 +329,7 @@ impl SpgOperator { spg_obj: &SpuGroupObj, spg_spec: &K8SpuGroupSpec, spu_name: &str, + spu_k8_config: &SpuK8Config, ) -> Result, ClientError> { let metadata = &spg_obj.metadata; @@ -354,6 +366,7 @@ impl SpgOperator { name: svc_name, namespace: metadata.namespace().to_string(), owner_references: vec![owner_ref], + annotations: spu_k8_config.lb_service_annotations.clone(), ..Default::default() } .set_labels(vec![("fluvio.io/spu-name", spu_name)]), diff --git a/src/sc/src/k8/operator/spu_k8_config.rs b/src/sc/src/k8/operator/spu_k8_config.rs index a69a983202..2fae0d419f 100644 --- a/src/sc/src/k8/operator/spu_k8_config.rs +++ b/src/sc/src/k8/operator/spu_k8_config.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use k8_client::{ClientError, SharedK8Client}; use k8_metadata_client::MetadataClient; use tracing::debug; @@ -12,6 +14,7 @@ const CONFIG_MAP_NAME: &str = "spu-k8"; pub struct SpuK8Config { pub image: String, pub resources: ResourceRequirements, + pub lb_service_annotations: HashMap, } impl SpuK8Config { @@ -32,6 +35,17 @@ impl SpuK8Config { let resources = serde_json::from_str(&resources_string)?; - Ok(Self { image, resources }) + let lb_service_annotations = + if let Some(lb_service_annotations) = data.remove("lbServiceAnnotations") { + serde_json::from_str(&lb_service_annotations)? + } else { + HashMap::new() + }; + + Ok(Self { + image, + resources, + lb_service_annotations, + }) } }