diff --git a/README.adoc b/README.adoc index c06b750..273ba0e 100644 --- a/README.adoc +++ b/README.adoc @@ -144,6 +144,10 @@ contexts: # optional: nodeSelector to add to the pod nodeSelector: key: value + # optional: affinity to add to the pod + affnity: '{"nodeAffinity": "requiredDuringSchedulingIgnoredDuringExecution": {nodeSelectorTerms: [{"matchExpressions":[{"key":"", "operator":"", "values":[""]}]}]}}' + # optional: tolerations to add to the pod + tolerations: '[{"effect":"","key":"","operator":"","value":""}]' # optional: clientID config (defaults to kafkactl-{username}) clientID: my-client-id diff --git a/internal/common-operation.go b/internal/common-operation.go index 9d264f9..82efa8e 100644 --- a/internal/common-operation.go +++ b/internal/common-operation.go @@ -3,6 +3,7 @@ package internal import ( "crypto/tls" "crypto/x509" + "encoding/json" "fmt" "net/http" "os" @@ -74,6 +75,8 @@ type K8sConfig struct { Labels map[string]string Annotations map[string]string NodeSelector map[string]string + Affinity map[string]any + Tolerations []map[string]any } type ConsumerConfig struct { @@ -174,6 +177,14 @@ func CreateClientContext() (ClientContext, error) { context.Kubernetes.Labels = viper.GetStringMapString("contexts." + context.Name + ".kubernetes.labels") context.Kubernetes.Annotations = viper.GetStringMapString("contexts." + context.Name + ".kubernetes.annotations") context.Kubernetes.NodeSelector = viper.GetStringMapString("contexts." + context.Name + ".kubernetes.nodeSelector") + context.Kubernetes.Affinity = viper.GetStringMap("contexts." + context.Name + ".kubernetes.affinity") + + var tolerations []map[string]any + err := json.Unmarshal([]byte(viper.GetString("contexts."+context.Name+".kubernetes.tolerations")), &tolerations) + if err != nil { + return context, err + } + context.Kubernetes.Tolerations = tolerations return context, nil } diff --git a/internal/k8s/executor.go b/internal/k8s/executor.go index 87aef8f..5e49490 100644 --- a/internal/k8s/executor.go +++ b/internal/k8s/executor.go @@ -37,6 +37,8 @@ type executor struct { labels map[string]string annotations map[string]string nodeSelector map[string]string + affinity map[string]any + tolerations []map[string]any } const letterBytes = "abcdefghijklmnpqrstuvwxyz123456789" @@ -111,6 +113,8 @@ func newExecutor(context internal.ClientContext, runner Runner) *executor { labels: context.Kubernetes.Labels, annotations: context.Kubernetes.Annotations, nodeSelector: context.Kubernetes.NodeSelector, + affinity: context.Kubernetes.Affinity, + tolerations: context.Kubernetes.Tolerations, runner: runner, } } diff --git a/internal/k8s/pod_overrides.go b/internal/k8s/pod_overrides.go index 425b30e..1ecce65 100644 --- a/internal/k8s/pod_overrides.go +++ b/internal/k8s/pod_overrides.go @@ -13,6 +13,8 @@ type specType struct { ImagePullSecrets []imagePullSecretType `json:"imagePullSecrets,omitempty"` ServiceAccountName *string `json:"serviceAccountName,omitempty"` NodeSelector *map[string]string `json:"nodeSelector,omitempty"` + Affinity *map[string]any `json:"affinity,omitempty"` + Tolerations *[]map[string]any `json:"tolerations,omitempty"` } type PodOverrideType struct { @@ -29,7 +31,7 @@ func (kubectl *executor) createPodOverride() PodOverrideType { var override PodOverrideType override.APIVersion = "v1" - if kubectl.serviceAccount != "" || kubectl.imagePullSecret != "" || len(kubectl.nodeSelector) > 0 { + if kubectl.serviceAccount != "" || kubectl.imagePullSecret != "" || len(kubectl.nodeSelector) > 0 || len(kubectl.affinity) > 0 || len(kubectl.tolerations) > 0 { override.Spec = &specType{} if kubectl.serviceAccount != "" { @@ -44,6 +46,14 @@ func (kubectl *executor) createPodOverride() PodOverrideType { if len(kubectl.nodeSelector) > 0 { override.Spec.NodeSelector = &kubectl.nodeSelector } + + if len(kubectl.affinity) > 0 { + override.Spec.Affinity = &kubectl.affinity + } + + if len(kubectl.tolerations) > 0 { + override.Spec.Tolerations = &kubectl.tolerations + } } if len(kubectl.labels) > 0 || len(kubectl.annotations) > 0 {