Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add basic SASL and TLS support for Kafka cloud events
Browse files Browse the repository at this point in the history
Sovietaced committed Oct 5, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 9abfbda commit 6c3cfe6
Showing 10 changed files with 78 additions and 13 deletions.
1 change: 1 addition & 0 deletions charts/flyte-core/README.md
Original file line number Diff line number Diff line change
@@ -59,6 +59,7 @@ helm install gateway bitnami/contour -n flyte
| cloud_events.enable | bool | `false` | |
| cloud_events.eventsPublisher.eventTypes[0] | string | `"all"` | |
| cloud_events.eventsPublisher.topicName | string | `"arn:aws:sns:us-east-2:123456:123-my-topic"` | |
| cloud_events.secretName | string | `""` | The name of the secret to use to alternatively load in cloud events configuration via a secret. Useful when the configuration contains secrets. |
| cloud_events.type | string | `"aws"` | |
| cluster_resource_manager | object | `{"config":{"cluster_resources":{"customData":[{"production":[{"projectQuotaCpu":{"value":"5"}},{"projectQuotaMemory":{"value":"4000Mi"}}]},{"staging":[{"projectQuotaCpu":{"value":"2"}},{"projectQuotaMemory":{"value":"3000Mi"}}]},{"development":[{"projectQuotaCpu":{"value":"4"}},{"projectQuotaMemory":{"value":"3000Mi"}}]}],"refreshInterval":"5m","standaloneDeployment":false,"templatePath":"/etc/flyte/clusterresource/templates"}},"enabled":true,"nodeSelector":{},"podAnnotations":{},"podEnv":{},"podLabels":{},"prometheus":{"enabled":false,"path":"/metrics","port":10254},"resources":{},"service_account_name":"flyteadmin","standaloneDeployment":false,"templates":[{"key":"aa_namespace","value":"apiVersion: v1\nkind: Namespace\nmetadata:\n name: {{ namespace }}\nspec:\n finalizers:\n - kubernetes\n"},{"key":"ab_project_resource_quota","value":"apiVersion: v1\nkind: ResourceQuota\nmetadata:\n name: project-quota\n namespace: {{ namespace }}\nspec:\n hard:\n limits.cpu: {{ projectQuotaCpu }}\n limits.memory: {{ projectQuotaMemory }}\n"}]}` | Configuration for the Cluster resource manager component. This is an optional component, that enables automatic cluster configuration. This is useful to set default quotas, manage namespaces etc that map to a project/domain |
| cluster_resource_manager.config | object | `{"cluster_resources":{"customData":[{"production":[{"projectQuotaCpu":{"value":"5"}},{"projectQuotaMemory":{"value":"4000Mi"}}]},{"staging":[{"projectQuotaCpu":{"value":"2"}},{"projectQuotaMemory":{"value":"3000Mi"}}]},{"development":[{"projectQuotaCpu":{"value":"4"}},{"projectQuotaMemory":{"value":"3000Mi"}}]}],"refreshInterval":"5m","standaloneDeployment":false,"templatePath":"/etc/flyte/clusterresource/templates"}}` | Configmap for ClusterResource parameters |
2 changes: 1 addition & 1 deletion charts/flyte-core/templates/admin/configmap.yaml
Original file line number Diff line number Diff line change
@@ -79,7 +79,7 @@ data:
externalEvents: {{ tpl (toYaml .) $ | nindent 6 }}
{{- end }}
{{- end }}
{{- if .Values.cloud_events.enable }}
{{- if and .Values.cloud_events.enable (not .Values.cloud_events.secretName) }}
{{- with .Values.cloud_events }}
cloud_events.yaml: |
cloudEvents: {{ tpl (toYaml .) $ | nindent 6 }}
4 changes: 4 additions & 0 deletions charts/flyte-core/templates/admin/deployment.yaml
Original file line number Diff line number Diff line change
@@ -196,6 +196,10 @@ spec:
name: flyte-admin-base-config
- configMap:
name: flyte-admin-clusters-config
{{- if .Values.cloud_events.secretName }}
- secret:
name: {{ .Values.cloud_events.secretName }}
{{- end }}
name: clusters-config-volume
{{- if .Values.cluster_resource_manager.enabled }}
- configMap:
3 changes: 3 additions & 0 deletions charts/flyte-core/values.yaml
Original file line number Diff line number Diff line change
@@ -943,6 +943,9 @@ external_events:
# Cloud events are used to send events (unprocessed, as Admin see them) in cloud event format to
# an SNS topic (or gcp equivalent)
cloud_events:
# -- The name of the secret to use to alternatively load in cloud events configuration via a secret. Useful when the
# configuration contains secrets.
secretName: ""
enable: false
type: aws
aws:
4 changes: 2 additions & 2 deletions docker/sandbox-bundled/manifests/complete-agent.yaml
Original file line number Diff line number Diff line change
@@ -816,7 +816,7 @@ type: Opaque
---
apiVersion: v1
data:
haSharedSecret: cmRzbzQ4N3RQaWhuMk00OA==
haSharedSecret: ak5wVTFQVjRHMm5ZanVNUQ==
proxyPassword: ""
proxyUsername: ""
kind: Secret
@@ -1413,7 +1413,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: 51528951e92c2bf712bbde990941593aae1fcf72144a1fe944c312ddad86e161
checksum/secret: db4b259a37cc362add2a4fd4c52954eabc67e69e2c399a292605415c70da4a2b
labels:
app: docker-registry
release: flyte-sandbox
4 changes: 2 additions & 2 deletions docker/sandbox-bundled/manifests/complete.yaml
Original file line number Diff line number Diff line change
@@ -798,7 +798,7 @@ type: Opaque
---
apiVersion: v1
data:
haSharedSecret: T1I2Q2tTcmREVG15MldGUQ==
haSharedSecret: RGZkSmNtV3k4dDZYd0pHVw==
proxyPassword: ""
proxyUsername: ""
kind: Secret
@@ -1362,7 +1362,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: d723e395edc0fd2f221b9088efffe0d1f4dfabdef9892065fdabe12233362cf5
checksum/secret: d0a1f670be47a94b928141eae8a50733a775d074aee3a78db555b0728c90718e
labels:
app: docker-registry
release: flyte-sandbox
4 changes: 2 additions & 2 deletions docker/sandbox-bundled/manifests/dev.yaml
Original file line number Diff line number Diff line change
@@ -499,7 +499,7 @@ metadata:
---
apiVersion: v1
data:
haSharedSecret: ZnltNHNiZ01NRFNkb1RlMA==
haSharedSecret: amliZ0l4QXczU0ZjUUloWQ==
proxyPassword: ""
proxyUsername: ""
kind: Secret
@@ -934,7 +934,7 @@ spec:
metadata:
annotations:
checksum/config: 8f50e768255a87f078ba8b9879a0c174c3e045ffb46ac8723d2eedbe293c8d81
checksum/secret: eeab364c20a0e8ad5a1526ccd7ddbd1d5a442087e7267c4d761279102b81be21
checksum/secret: 4c93d218f3a1654f7eb3a238a4b1f57fbfda80cd8e5c4aaf5a9286a93f4a94f2
labels:
app: docker-registry
release: flyte-sandbox
2 changes: 2 additions & 0 deletions flyteadmin/.golangci.yml
Original file line number Diff line number Diff line change
@@ -39,3 +39,5 @@ issues:
exclude-rules:
- path: pkg/workflowengine/impl/prepare_execution.go
text: "copies lock"
- path: pkg/runtime/interfaces/application_configuration.go
text: "G402: TLS InsecureSkipVerify may be true."
7 changes: 1 addition & 6 deletions flyteadmin/pkg/async/cloudevent/factory.go
Original file line number Diff line number Diff line change
@@ -73,12 +73,7 @@ func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Reposi

case cloudEventImplementations.Kafka:
saramaConfig := sarama.NewConfig()
var err error
saramaConfig.Version, err = sarama.ParseKafkaVersion(cloudEventsConfig.KafkaConfig.Version)
if err != nil {
logger.Fatalf(ctx, "failed to parse kafka version, %v", err)
panic(err)
}
cloudEventsConfig.KafkaConfig.UpdateSaramaConfig(ctx, saramaConfig)
kafkaSender, err := kafka_sarama.NewSender(cloudEventsConfig.KafkaConfig.Brokers, saramaConfig, cloudEventsConfig.EventsPublisherConfig.TopicName)
if err != nil {
panic(err)
60 changes: 60 additions & 0 deletions flyteadmin/pkg/runtime/interfaces/application_configuration.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package interfaces

import (
"context"
"crypto/tls"

"github.com/Shopify/sarama"
"github.com/golang/protobuf/ptypes/wrappers"
"golang.org/x/time/rate"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/config"
"github.com/flyteorg/flyte/flytestdlib/database"
"github.com/flyteorg/flyte/flytestdlib/logger"
)

// DbConfig is used to for initiating the database connection with the store that holds registered
@@ -231,11 +236,66 @@ type GCPConfig struct {
ProjectID string `json:"projectId"`
}

type SASLConfig struct {
Enabled bool `json:"enabled"`
User string `json:"user"`
Password string `json:"password"`
Handshake bool `json:"handshake"`
Mechanism sarama.SASLMechanism `json:"mechanism"`
}

type TLSConfig struct {
Enabled bool `json:"enabled"`
InsecureSkipVerify bool `json:"insecureSkipVerify"`
CertPath string `json:"certPath"`
KeyPath string `json:"keyPath"`
}

type KafkaConfig struct {
// The version of Kafka, e.g. 2.1.0, 0.8.2.0
Version string `json:"version"`
// kafka broker addresses
Brokers []string `json:"brokers"`
// sasl config
SASLConfig SASLConfig `json:"sasl_config"`
// tls config
TLSConfig TLSConfig `json:"tls_config"`
}

func (k KafkaConfig) UpdateSaramaConfig(ctx context.Context, s *sarama.Config) {
var err error
s.Version, err = sarama.ParseKafkaVersion(k.Version)
if err != nil {
logger.Fatalf(ctx, "failed to parse kafka version, %v", err)
panic(err)

Check warning on line 270 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L269-L270

Added lines #L269 - L270 were not covered by tests
}

if k.SASLConfig.Enabled {
s.Net.SASL.Enable = true
s.Net.SASL.User = k.SASLConfig.User
s.Net.SASL.Password = k.SASLConfig.Password
s.Net.SASL.Handshake = k.SASLConfig.Handshake

Check warning on line 277 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L274-L277

Added lines #L274 - L277 were not covered by tests

if k.SASLConfig.Mechanism == "" {
k.SASLConfig.Mechanism = sarama.SASLTypePlaintext

Check warning on line 280 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L279-L280

Added lines #L279 - L280 were not covered by tests
}
s.Net.SASL.Mechanism = k.SASLConfig.Mechanism

Check warning on line 282 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L282

Added line #L282 was not covered by tests
}

if k.TLSConfig.Enabled {
s.Net.TLS.Enable = true
s.Net.TLS.Config = &tls.Config{
InsecureSkipVerify: k.TLSConfig.InsecureSkipVerify,

Check warning on line 288 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L286-L288

Added lines #L286 - L288 were not covered by tests
}
if k.TLSConfig.KeyPath != "" && k.TLSConfig.CertPath != "" {
cert, err := tls.LoadX509KeyPair(k.TLSConfig.CertPath, k.TLSConfig.KeyPath)
if err != nil {
logger.Fatalf(ctx, "failed to load kafka client keypair: %v", err)
panic(err)

Check warning on line 294 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L290-L294

Added lines #L290 - L294 were not covered by tests
}
s.Net.TLS.Config.Certificates = []tls.Certificate{cert}

Check warning on line 296 in flyteadmin/pkg/runtime/interfaces/application_configuration.go

Codecov / codecov/patch

flyteadmin/pkg/runtime/interfaces/application_configuration.go#L296

Added line #L296 was not covered by tests
}
}
}

// This section holds configuration for the event scheduler used to schedule workflow executions.

0 comments on commit 6c3cfe6

Please sign in to comment.