Skip to content

Commit

Permalink
changefeedccl: hook up kafka producer library logs to our logging
Browse files Browse the repository at this point in the history
I don't entirely love this, given that we don't have much control over
sarama's log volume, but sarama returns the same "kafka: client has run
out of available brokers to talk to (Is your cluster reachable?)" error
for any connection issue. Which means it's basically impossible to debug
kafka connection issues without something like this and I think it's
better than the alternative of nothing. Motivated by #35510 which makes
hard to debug connection issues even more likely.

Logs look something like this:

    I190312 18:56:53.535646 585 vendor/github.com/Shopify/sarama/client.go:123  [kafka-producer] Initializing new client
    I190312 18:56:53.535714 585 vendor/github.com/Shopify/sarama/client.go:724  [kafka-producer] client/metadata fetching metadata for all topics from broker localhost:9092
    I190312 18:56:53.536730 569 vendor/github.com/Shopify/sarama/broker.go:148  [kafka-producer] Connected to broker at localhost:9092 (unregistered)
    I190312 18:56:53.537661 585 vendor/github.com/Shopify/sarama/client.go:500  [kafka-producer] client/brokers registered new broker #0 at 172.16.94.87:9092
    I190312 18:56:53.537686 585 vendor/github.com/Shopify/sarama/client.go:170  [kafka-producer] Successfully initialized new client

Release note (enterprise change): `CHANGEFEED`s using kafka log
information to help debug connection issues
  • Loading branch information
danhhz committed Mar 18, 2019
1 parent 3fb5e6f commit 18f2f6f
Showing 1 changed file with 30 additions and 0 deletions.
30 changes: 30 additions & 0 deletions pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logtags"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/pkg/errors"
Expand Down Expand Up @@ -203,6 +204,30 @@ func getSink(
return s, nil
}

type kafkaLogAdapter struct {
ctx context.Context
}

var _ sarama.StdLogger = (*kafkaLogAdapter)(nil)

func (l *kafkaLogAdapter) Print(v ...interface{}) {
log.InfoDepth(l.ctx, 1, v...)
}
func (l *kafkaLogAdapter) Printf(format string, v ...interface{}) {
log.InfofDepth(l.ctx, 1, format, v...)
}
func (l *kafkaLogAdapter) Println(v ...interface{}) {
log.InfoDepth(l.ctx, 1, v...)
}

func init() {
// We'd much prefer to make one of these per sink, so we can use the real
// context, but quite unfortunately, sarama only has a global logger hook.
ctx := context.Background()
ctx = logtags.AddTag(ctx, "kafka-producer", nil)
sarama.Logger = &kafkaLogAdapter{ctx: ctx}
}

type kafkaSinkConfig struct {
kafkaTopicPrefix string
tlsEnabled bool
Expand Down Expand Up @@ -246,6 +271,7 @@ func makeKafkaSink(
}

config := sarama.NewConfig()
config.ClientID = `CockroachDB`
config.Producer.Return.Successes = true
config.Producer.Partitioner = newChangefeedPartitioner

Expand Down Expand Up @@ -301,6 +327,10 @@ func makeKafkaSink(
// to test this one more before changing it.
config.Producer.Flush.MaxMessages = 1000

// config.Producer.Flush.Messages is set to 1 so we don't need this, but
// sarama prints scary things to the logs if we don't.
config.Producer.Flush.Frequency = time.Hour

var err error
sink.client, err = sarama.NewClient(strings.Split(bootstrapServers, `,`), config)
if err != nil {
Expand Down

0 comments on commit 18f2f6f

Please sign in to comment.