From 9d23043faf1f3b34ced51e0ce555cc28a87cf202 Mon Sep 17 00:00:00 2001 From: Derek Wang Date: Sat, 10 Oct 2020 20:59:32 -0700 Subject: [PATCH] feat: kafka multiple-broker support. Closes #892 (#894) --- api/event-source.html | 2 +- api/event-source.md | 2 +- api/openapi-spec/swagger.json | 2 +- eventsources/sources/kafka/start.go | 7 +++++-- pkg/apis/eventsource/v1alpha1/generated.proto | 2 +- pkg/apis/eventsource/v1alpha1/openapi_generated.go | 2 +- pkg/apis/eventsource/v1alpha1/types.go | 2 +- 7 files changed, 11 insertions(+), 8 deletions(-) diff --git a/api/event-source.html b/api/event-source.html index 7bd6915eccd7..ba6282cab40f 100644 --- a/api/event-source.html +++ b/api/event-source.html @@ -1895,7 +1895,7 @@

KafkaEventSource -

URL to kafka cluster

+

URL to kafka cluster, multiple URLs separated by comma

diff --git a/api/event-source.md b/api/event-source.md index 24bd29be2999..3496acaba9b4 100644 --- a/api/event-source.md +++ b/api/event-source.md @@ -3607,7 +3607,7 @@ Description

-URL to kafka cluster +URL to kafka cluster, multiple URLs separated by comma

diff --git a/api/openapi-spec/swagger.json b/api/openapi-spec/swagger.json index 6f00bdc16250..e7a22ad3464c 100644 --- a/api/openapi-spec/swagger.json +++ b/api/openapi-spec/swagger.json @@ -1152,7 +1152,7 @@ "type": "string" }, "url": { - "description": "URL to kafka cluster", + "description": "URL to kafka cluster, multiple URLs separated by comma", "type": "string" }, "version": { diff --git a/eventsources/sources/kafka/start.go b/eventsources/sources/kafka/start.go index 250ac3d6268b..01f9680f5a6e 100644 --- a/eventsources/sources/kafka/start.go +++ b/eventsources/sources/kafka/start.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "strconv" + "strings" "sync" "time" @@ -106,7 +107,8 @@ func (listener *EventListener) consumerGroupConsumer(ctx context.Context, log *z kafkaEventSource: kafkaEventSource, } - client, err := sarama.NewConsumerGroup([]string{kafkaEventSource.URL}, kafkaEventSource.ConsumerGroup.GroupName, config) + urls := strings.Split(kafkaEventSource.URL, ",") + client, err := sarama.NewConsumerGroup(urls, kafkaEventSource.ConsumerGroup.GroupName, config) if err != nil { log.Errorf("Error creating consumer group client: %v", err) return err @@ -163,7 +165,8 @@ func (el *EventListener) partitionConsumer(ctx context.Context, log *zap.Sugared return err } - consumer, err = sarama.NewConsumer([]string{kafkaEventSource.URL}, config) + urls := strings.Split(kafkaEventSource.URL, ",") + consumer, err = sarama.NewConsumer(urls, config) if err != nil { return err } diff --git a/pkg/apis/eventsource/v1alpha1/generated.proto b/pkg/apis/eventsource/v1alpha1/generated.proto index 8d0e9f02a39f..b2d9d33e2241 100644 --- a/pkg/apis/eventsource/v1alpha1/generated.proto +++ b/pkg/apis/eventsource/v1alpha1/generated.proto @@ -427,7 +427,7 @@ message KafkaConsumerGroup { // KafkaEventSource refers to event-source for Kafka related events message KafkaEventSource { - // URL to kafka cluster + // URL to kafka cluster, multiple URLs separated by comma optional string url = 1; // Partition name diff --git a/pkg/apis/eventsource/v1alpha1/openapi_generated.go b/pkg/apis/eventsource/v1alpha1/openapi_generated.go index 4b10a7aad33e..a779a0c7b06e 100644 --- a/pkg/apis/eventsource/v1alpha1/openapi_generated.go +++ b/pkg/apis/eventsource/v1alpha1/openapi_generated.go @@ -1307,7 +1307,7 @@ func schema_pkg_apis_eventsource_v1alpha1_KafkaEventSource(ref common.ReferenceC Properties: map[string]spec.Schema{ "url": { SchemaProps: spec.SchemaProps{ - Description: "URL to kafka cluster", + Description: "URL to kafka cluster, multiple URLs separated by comma", Type: []string{"string"}, Format: "", }, diff --git a/pkg/apis/eventsource/v1alpha1/types.go b/pkg/apis/eventsource/v1alpha1/types.go index 77104c85436a..8654de4643ce 100644 --- a/pkg/apis/eventsource/v1alpha1/types.go +++ b/pkg/apis/eventsource/v1alpha1/types.go @@ -288,7 +288,7 @@ type AMQPEventSource struct { // KafkaEventSource refers to event-source for Kafka related events type KafkaEventSource struct { - // URL to kafka cluster + // URL to kafka cluster, multiple URLs separated by comma URL string `json:"url" protobuf:"bytes,1,opt,name=url"` // Partition name Partition string `json:"partition" protobuf:"bytes,2,opt,name=partition"`