Skip to content

Commit

Permalink
feat: kafka multiple-broker support. Closes argoproj#892 (argoproj#894)
Browse files Browse the repository at this point in the history
  • Loading branch information
whynowy authored Oct 11, 2020
1 parent 73539ce commit 9d23043
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 8 deletions.
2 changes: 1 addition & 1 deletion api/event-source.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/event-source.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions eventsources/sources/kafka/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -106,7 +107,8 @@ func (listener *EventListener) consumerGroupConsumer(ctx context.Context, log *z
kafkaEventSource: kafkaEventSource,
}

client, err := sarama.NewConsumerGroup([]string{kafkaEventSource.URL}, kafkaEventSource.ConsumerGroup.GroupName, config)
urls := strings.Split(kafkaEventSource.URL, ",")
client, err := sarama.NewConsumerGroup(urls, kafkaEventSource.ConsumerGroup.GroupName, config)
if err != nil {
log.Errorf("Error creating consumer group client: %v", err)
return err
Expand Down Expand Up @@ -163,7 +165,8 @@ func (el *EventListener) partitionConsumer(ctx context.Context, log *zap.Sugared
return err
}

consumer, err = sarama.NewConsumer([]string{kafkaEventSource.URL}, config)
urls := strings.Split(kafkaEventSource.URL, ",")
consumer, err = sarama.NewConsumer(urls, config)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/eventsource/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/eventsource/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/eventsource/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ type AMQPEventSource struct {

// KafkaEventSource refers to event-source for Kafka related events
type KafkaEventSource struct {
// URL to kafka cluster
// URL to kafka cluster, multiple URLs separated by comma
URL string `json:"url" protobuf:"bytes,1,opt,name=url"`
// Partition name
Partition string `json:"partition" protobuf:"bytes,2,opt,name=partition"`
Expand Down

0 comments on commit 9d23043

Please sign in to comment.