diff --git a/cmd/ingester/app/flags.go b/cmd/ingester/app/flags.go index d775a20e842c..7896662ca148 100644 --- a/cmd/ingester/app/flags.go +++ b/cmd/ingester/app/flags.go @@ -55,7 +55,7 @@ const ( DefaultTopic = "jaeger-spans" // DefaultGroupID is the default consumer Group ID DefaultGroupID = "jaeger-ingester" - // DefaultGroupID is the default consumer Client ID + // DefaultClientID is the default consumer Client ID DefaultClientID = "jaeger-ingester" // DefaultParallelism is the default parallelism for the span processor DefaultParallelism = 1000 @@ -111,6 +111,7 @@ func (o *Options) InitFromViper(v *viper.Viper) { o.Brokers = strings.Split(stripWhiteSpace(v.GetString(KafkaConsumerConfigPrefix+SuffixBrokers)), ",") o.Topic = v.GetString(KafkaConsumerConfigPrefix + SuffixTopic) o.GroupID = v.GetString(KafkaConsumerConfigPrefix + SuffixGroupID) + o.ClientID = v.GetString(KafkaConsumerConfigPrefix + SuffixClientID) o.Encoding = v.GetString(KafkaConsumerConfigPrefix + SuffixEncoding) o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism) diff --git a/plugin/storage/integration/kafka_test.go b/plugin/storage/integration/kafka_test.go index a3895ef96caf..559dde2e2418 100644 --- a/plugin/storage/integration/kafka_test.go +++ b/plugin/storage/integration/kafka_test.go @@ -46,6 +46,7 @@ func (s *KafkaIntegrationTestSuite) initialize() error { s.logger, _ = testutils.NewLogger() const encoding = "json" const groupID = "kafka-integration-test" + const ClientID = "kafka-integration-test" // A new topic is generated per execution to avoid data overlap topic := "jaeger-kafka-integration-test-" + strconv.FormatInt(time.Now().UnixNano(), 10) @@ -81,6 +82,8 @@ func (s *KafkaIntegrationTestSuite) initialize() error { encoding, "--kafka.consumer.group-id", groupID, + "--kafka.consumer.client-id", + ClientID, "--ingester.parallelism", "1000", })