From 4d7609cf5f0e8c2fdf138b8fd0ff0f8edf1f8a12 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 24 Feb 2022 23:49:42 +0800 Subject: [PATCH] cdc/sink: Kafka support user set configuration (#4512) (#4525) close pingcap/tiflow#4385 --- cdc/model/schema_storage.go | 6 +- cdc/owner/ddl_sink.go | 13 +- cdc/processor/processor.go | 24 +- cdc/sink/mq.go | 65 +++-- cdc/sink/producer/kafka/config.go | 325 ++++++++++++++++++++++ cdc/sink/producer/kafka/config_test.go | 195 +++++++++++++ cdc/sink/producer/kafka/kafka.go | 362 +++++-------------------- cdc/sink/producer/kafka/kafka_test.go | 127 +-------- cdc/sink/sink.go | 2 + cmd/kafka-consumer/main.go | 1 + pkg/util/ctx.go | 16 ++ pkg/util/identity.go | 54 ++++ 12 files changed, 741 insertions(+), 449 deletions(-) create mode 100644 cdc/sink/producer/kafka/config.go create mode 100644 cdc/sink/producer/kafka/config_test.go create mode 100644 pkg/util/identity.go diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index 2238dc3b331..909adac9739 100644 --- a/cdc/model/schema_storage.go +++ b/cdc/model/schema_storage.go @@ -17,14 +17,12 @@ import ( "fmt" "github.com/pingcap/log" - - "go.uber.org/zap" - "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/types" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/util/rowcodec" + "go.uber.org/zap" ) const ( @@ -139,7 +137,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode ti.findHandleIndex() ti.initColumnsFlag() - log.Debug("warpped table info", zap.Reflect("tableInfo", ti)) + log.Debug("warped table info", zap.Reflect("tableInfo", ti)) return ti } diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go index 00646f19823..e62bbdd823e 100644 --- a/cdc/owner/ddl_sink.go +++ b/cdc/owner/ddl_sink.go @@ -121,11 +121,13 @@ func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *m start := time.Now() if err := s.sinkInitHandler(ctx, s, id, info); err != nil { - log.Warn("ddl sink initialize failed", zap.Duration("elapsed", time.Since(start))) + log.Warn("ddl sink initialize failed", + zap.Duration("duration", time.Since(start))) ctx.Throw(err) return } - log.Info("ddl sink initialized, start processing...", zap.Duration("elapsed", time.Since(start))) + log.Info("ddl sink initialized, start processing...", + zap.Duration("duration", time.Since(start))) // TODO make the tick duration configurable ticker := time.NewTicker(time.Second) @@ -154,13 +156,16 @@ func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *m err = cerror.ErrExecDDLFailed.GenWithStackByArgs() }) if err == nil || cerror.ErrDDLEventIgnored.Equal(errors.Cause(err)) { - log.Info("Execute DDL succeeded", zap.String("changefeed", ctx.ChangefeedVars().ID), zap.Bool("ignored", err != nil), zap.Reflect("ddl", ddl)) + log.Info("Execute DDL succeeded", + zap.String("changefeed", ctx.ChangefeedVars().ID), + zap.Bool("ignored", err != nil), + zap.Reflect("ddl", ddl)) atomic.StoreUint64(&s.ddlFinishedTs, ddl.CommitTs) continue } // If DDL executing failed, and the error can not be ignored, throw an error and pause the changefeed log.Error("Execute DDL failed", - zap.String("ChangeFeedID", ctx.ChangefeedVars().ID), + zap.String("changefeed", ctx.ChangefeedVars().ID), zap.Error(err), zap.Reflect("ddl", ddl)) ctx.Throw(errors.Trace(err)) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 00fc6b9443c..4f7eb09fc37 100644 --- 
a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -265,6 +265,8 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error { } stdCtx := util.PutChangefeedIDInCtx(ctx, p.changefeed.ID) + stdCtx = util.PutCaptureAddrInCtx(stdCtx, p.captureInfo.AdvertiseAddr) + stdCtx = util.PutRoleInCtx(stdCtx, util.RoleProcessor) p.mounter = entry.NewMounter(p.schemaStorage, p.changefeed.Info.Config.Mounter.WorkerNum, p.changefeed.Info.Config.EnableOldValue) p.wg.Add(1) @@ -804,15 +806,8 @@ func (p *processor) Close() error { } p.cancel() p.wg.Wait() - // mark tables share the same cdcContext with its original table, don't need to cancel - failpoint.Inject("processorStopDelay", nil) - resolvedTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) - resolvedTsLagGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) - checkpointTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) - checkpointTsLagGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) - syncTableNumGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) - processorErrorCounter.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) - processorSchemaStorageGcTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) + + // sink close might be time-consuming, do it the last. if p.sinkManager != nil { // pass a canceled context is ok here, since we don't need to wait Close ctx, cancel := context.WithCancel(context.Background()) @@ -830,6 +825,17 @@ func (p *processor) Close() error { zap.String("changefeed", p.changefeedID), zap.Duration("duration", time.Since(start))) } + + // mark tables share the same cdcContext with its original table, don't need to cancel + failpoint.Inject("processorStopDelay", nil) + resolvedTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) + resolvedTsLagGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) + checkpointTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) + checkpointTsLagGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) + syncTableNumGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) + processorErrorCounter.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) + processorSchemaStorageGcTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) + return nil } diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index 389c2b3338a..35732438455 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -57,11 +57,15 @@ type mqSink struct { resolvedReceiver *notify.Receiver statistics *Statistics + + role util.Role + id model.ChangeFeedID } func newMqSink( ctx context.Context, credential *security.Credential, mqProducer producer.Producer, - filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error, + filter *filter.Filter, replicaConfig *config.ReplicaConfig, opts map[string]string, + errCh chan error, ) (*mqSink, error) { partitionNum := mqProducer.GetPartitionNum() partitionInput := make([]chan struct { @@ -74,13 +78,13 @@ func newMqSink( resolvedTs uint64 }, 12800) } - d, err := dispatcher.NewDispatcher(config, mqProducer.GetPartitionNum()) + d, err := dispatcher.NewDispatcher(replicaConfig, mqProducer.GetPartitionNum()) if err != nil { return nil, errors.Trace(err) } notifier := new(notify.Notifier) var protocol codec.Protocol - protocol.FromString(config.Sink.Protocol) + protocol.FromString(replicaConfig.Sink.Protocol) newEncoder := 
codec.NewEventBatchEncoder(protocol) if protocol == codec.ProtocolAvro { @@ -108,7 +112,7 @@ func newMqSink( avroEncoder.SetTimeZone(util.TimezoneFromCtx(ctx)) return avroEncoder } - } else if (protocol == codec.ProtocolCanal || protocol == codec.ProtocolCanalJSON || protocol == codec.ProtocolMaxwell) && !config.EnableOldValue { + } else if (protocol == codec.ProtocolCanal || protocol == codec.ProtocolCanalJSON || protocol == codec.ProtocolMaxwell) && !replicaConfig.EnableOldValue { log.Error(fmt.Sprintf("Old value is not enabled when using `%s` protocol. "+ "Please update changefeed config", protocol.String())) return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New(fmt.Sprintf("%s protocol requires old value to be enabled", protocol.String()))) @@ -133,7 +137,11 @@ func newMqSink( if err != nil { return nil, err } - k := &mqSink{ + + changefeedID := util.ChangefeedIDFromCtx(ctx) + role := util.RoleFromCtx(ctx) + + s := &mqSink{ mqProducer: mqProducer, dispatcher: d, newEncoder: newEncoder, @@ -148,27 +156,34 @@ func newMqSink( resolvedReceiver: resolvedReceiver, statistics: NewStatistics(ctx, "MQ"), + + role: role, + id: changefeedID, } go func() { - if err := k.run(ctx); err != nil && errors.Cause(err) != context.Canceled { + if err := s.run(ctx); err != nil && errors.Cause(err) != context.Canceled { select { case <-ctx.Done(): return case errCh <- err: default: - log.Error("error channel is full", zap.Error(err)) + log.Error("error channel is full", zap.Error(err), + zap.String("changefeed", changefeedID), zap.Any("role", s.role)) } } }() - return k, nil + return s, nil } func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { rowsCount := 0 for _, row := range rows { if k.filter.ShouldIgnoreDMLEvent(row.StartTs, row.Table.Schema, row.Table.Table) { - log.Info("Row changed event ignored", zap.Uint64("start-ts", row.StartTs)) + log.Info("Row changed event ignored", + zap.Uint64("start-ts", row.StartTs), + zap.String("changefeed", k.id), + zap.Any("role", k.role)) continue } partition := k.dispatcher.Dispatch(row) @@ -246,6 +261,8 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { zap.String("query", ddl.Query), zap.Uint64("startTs", ddl.StartTs), zap.Uint64("commitTs", ddl.CommitTs), + zap.String("changefeed", k.id), + zap.Any("role", k.role), ) return cerror.ErrDDLEventIgnored.GenWithStackByArgs() } @@ -260,7 +277,8 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { } k.statistics.AddDDLCount() - log.Debug("emit ddl event", zap.String("query", ddl.Query), zap.Uint64("commit-ts", ddl.CommitTs)) + log.Debug("emit ddl event", zap.String("query", ddl.Query), + zap.Uint64("commitTs", ddl.CommitTs), zap.String("changefeed", k.id), zap.Any("role", k.role)) err = k.writeToProducer(ctx, msg, codec.EncoderNeedSyncWrite, -1) return errors.Trace(err) } @@ -318,7 +336,8 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error { return 0, err } } - log.Debug("MQSink flushed", zap.Int("thisBatchSize", thisBatchSize)) + log.Debug("MQSink flushed", zap.Int("thisBatchSize", thisBatchSize), + zap.String("changefeed", k.id), zap.Any("role", k.role)) return thisBatchSize, nil }) } @@ -391,18 +410,17 @@ func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage, log.Warn("writeToProducer called with no-op", zap.ByteString("key", message.Key), zap.ByteString("value", message.Value), - zap.Int32("partition", partition)) + zap.Int32("partition", 
partition), + zap.String("changefeed", k.id), + zap.Any("role", k.role)) return nil } -func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error) (*mqSink, error) { - scheme := strings.ToLower(sinkURI.Scheme) - if scheme != "kafka" && scheme != "kafka+ssl" { - return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("can't create MQ sink with unsupported scheme: %s", scheme) - } - - config := kafka.NewConfig() - if err := config.Initialize(sinkURI, replicaConfig, opts); err != nil { +func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, + filter *filter.Filter, replicaConfig *config.ReplicaConfig, + opts map[string]string, errCh chan error) (*mqSink, error) { + producerConfig := kafka.NewConfig() + if err := kafka.CompleteConfigsAndOpts(sinkURI, producerConfig, replicaConfig, opts); err != nil { return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) } @@ -412,18 +430,19 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi if topic == "" { return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri") } - producer, err := kafka.NewKafkaSaramaProducer(ctx, topic, config, opts, errCh) + producer, err := kafka.NewKafkaSaramaProducer(ctx, topic, producerConfig, opts, errCh) if err != nil { return nil, errors.Trace(err) } - sink, err := newMqSink(ctx, config.Credential, producer, filter, replicaConfig, opts, errCh) + sink, err := newMqSink(ctx, producerConfig.Credential, producer, filter, replicaConfig, opts, errCh) if err != nil { return nil, errors.Trace(err) } return sink, nil } -func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error) (*mqSink, error) { +func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, + replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error) (*mqSink, error) { producer, err := pulsar.NewProducer(sinkURI, errCh) if err != nil { return nil, errors.Trace(err) diff --git a/cdc/sink/producer/kafka/config.go b/cdc/sink/producer/kafka/config.go new file mode 100644 index 00000000000..9e9a947f576 --- /dev/null +++ b/cdc/sink/producer/kafka/config.go @@ -0,0 +1,325 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "context" + "net/url" + "strconv" + "strings" + "time" + + "github.com/Shopify/sarama" + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/security" + "github.com/pingcap/tiflow/pkg/util" + "go.uber.org/zap" +) + +// Config stores user specified Kafka producer configuration +type Config struct { + BrokerEndpoints []string + PartitionNum int32 + + // User should make sure that `replication-factor` not greater than the number of kafka brokers. 
+ ReplicationFactor int16
+
+ Version string
+ MaxMessageBytes int
+ Compression string
+ ClientID string
+ Credential *security.Credential
+ SaslScram *security.SaslScram
+ // control whether to create topic
+ AutoCreate bool
+
+ // Timeout for sarama `config.Net` configurations, default to `10s`
+ DialTimeout time.Duration
+ WriteTimeout time.Duration
+ ReadTimeout time.Duration
+}
+
+// NewConfig returns a default Kafka configuration
+func NewConfig() *Config {
+ return &Config{
+ Version: "2.4.0",
+ // MaxMessageBytes will be used to initialize producer
+ MaxMessageBytes: config.DefaultMaxMessageBytes,
+ ReplicationFactor: 1,
+ Compression: "none",
+ Credential: &security.Credential{},
+ SaslScram: &security.SaslScram{},
+ AutoCreate: true,
+ DialTimeout: 10 * time.Second,
+ WriteTimeout: 10 * time.Second,
+ ReadTimeout: 10 * time.Second,
+ }
+}
+
+// set the partition-num by the topic's partition count.
+func (c *Config) setPartitionNum(realPartitionCount int32) error {
+ // user does not specify the `partition-num` in the sink-uri
+ if c.PartitionNum == 0 {
+ c.PartitionNum = realPartitionCount
+ return nil
+ }
+
+ if c.PartitionNum < realPartitionCount {
+ log.Warn("number of partition specified in sink-uri is less than that of the actual topic. "+
+ "Some partitions will not have messages dispatched to",
+ zap.Int32("sink-uri partitions", c.PartitionNum),
+ zap.Int32("topic partitions", realPartitionCount))
+ return nil
+ }
+
+ // Make sure that the user-specified `partition-num` is not greater than
+ // the real partition count, since messages would be dispatched to different
+ // partitions, this could prevent potential correctness problems.
+ if c.PartitionNum > realPartitionCount {
+ return cerror.ErrKafkaInvalidPartitionNum.GenWithStack(
+ "the number of partition (%d) specified in sink-uri is more than that of actual topic (%d)",
+ c.PartitionNum, realPartitionCount)
+ }
+ return nil
+}
+
+// CompleteConfigsAndOpts completes the kafka producer configuration, the replica configuration and opts.
+func CompleteConfigsAndOpts(sinkURI *url.URL, producerConfig *Config, replicaConfig *config.ReplicaConfig, opts map[string]string) error { + producerConfig.BrokerEndpoints = strings.Split(sinkURI.Host, ",") + params := sinkURI.Query() + s := params.Get("partition-num") + if s != "" { + a, err := strconv.ParseInt(s, 10, 32) + if err != nil { + return err + } + producerConfig.PartitionNum = int32(a) + if producerConfig.PartitionNum <= 0 { + return cerror.ErrKafkaInvalidPartitionNum.GenWithStackByArgs(producerConfig.PartitionNum) + } + } + + s = params.Get("replication-factor") + if s != "" { + a, err := strconv.ParseInt(s, 10, 16) + if err != nil { + return err + } + producerConfig.ReplicationFactor = int16(a) + } + + s = params.Get("kafka-version") + if s != "" { + producerConfig.Version = s + } + + s = params.Get("max-message-bytes") + if s != "" { + a, err := strconv.Atoi(s) + if err != nil { + return err + } + producerConfig.MaxMessageBytes = a + opts["max-message-bytes"] = s + } + + s = params.Get("max-batch-size") + if s != "" { + opts["max-batch-size"] = s + } + + s = params.Get("compression") + if s != "" { + producerConfig.Compression = s + } + + producerConfig.ClientID = params.Get("kafka-client-id") + + s = sinkURI.Query().Get("protocol") + if s != "" { + replicaConfig.Sink.Protocol = s + } + + s = params.Get("ca") + if s != "" { + producerConfig.Credential.CAPath = s + } + + s = params.Get("cert") + if s != "" { + producerConfig.Credential.CertPath = s + } + + s = params.Get("key") + if s != "" { + producerConfig.Credential.KeyPath = s + } + + s = params.Get("sasl-user") + if s != "" { + producerConfig.SaslScram.SaslUser = s + } + + s = params.Get("sasl-password") + if s != "" { + producerConfig.SaslScram.SaslPassword = s + } + + s = params.Get("sasl-mechanism") + if s != "" { + producerConfig.SaslScram.SaslMechanism = s + } + + s = params.Get("auto-create-topic") + if s != "" { + autoCreate, err := strconv.ParseBool(s) + if err != nil { + return err + } + producerConfig.AutoCreate = autoCreate + } + + s = params.Get("dial-timeout") + if s != "" { + a, err := time.ParseDuration(s) + if err != nil { + return err + } + producerConfig.DialTimeout = a + } + + s = params.Get("write-timeout") + if s != "" { + a, err := time.ParseDuration(s) + if err != nil { + return err + } + producerConfig.WriteTimeout = a + } + + s = params.Get("read-timeout") + if s != "" { + a, err := time.ParseDuration(s) + if err != nil { + return err + } + producerConfig.ReadTimeout = a + } + + return nil +} + +// newSaramaConfig return the default config and set the according version and metrics +func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) { + config := sarama.NewConfig() + + version, err := sarama.ParseKafkaVersion(c.Version) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidVersion, err) + } + var role string + if util.IsOwnerFromCtx(ctx) { + role = "owner" + } else { + role = "processor" + } + captureAddr := util.CaptureAddrFromCtx(ctx) + changefeedID := util.ChangefeedIDFromCtx(ctx) + + config.ClientID, err = kafkaClientID(role, captureAddr, changefeedID, c.ClientID) + if err != nil { + return nil, errors.Trace(err) + } + config.Version = version + + // Producer fetch metadata from brokers frequently, if metadata cannot be + // refreshed easily, this would indicate the network condition between the + // capture server and kafka broker is not good. 
+ // In the scenario where the Kafka server does not respond, these default
+ // settings help to get a response (success or failure) more quickly.
+ config.Metadata.Retry.Max = 1
+ config.Metadata.Retry.Backoff = 100 * time.Millisecond
+ // This Timeout is useless if the `RefreshMetadata` time cost is less than it.
+ config.Metadata.Timeout = 1 * time.Minute
+
+ // Admin.Retry takes effect on `ClusterAdmin` related operations,
+ // only `CreateTopic` for cdc now. Set the `Timeout` to `1m` to make CI stable.
+ config.Admin.Retry.Max = 5
+ config.Admin.Retry.Backoff = 100 * time.Millisecond
+ config.Admin.Timeout = 1 * time.Minute
+
+ // Producer.Retry takes effect when the producer tries to send messages to kafka
+ // brokers. If the kafka cluster is healthy, the default value should be enough.
+ // For a kafka cluster with a bad network condition, the producer should not
+ // waste too much time on sending a message; getting a response, no matter
+ // success or failure, as soon as possible is preferred.
+ config.Producer.Retry.Max = 3
+ config.Producer.Retry.Backoff = 100 * time.Millisecond
+
+ // make sure the sarama producer flushes messages as soon as possible.
+ config.Producer.Flush.Bytes = 0
+ config.Producer.Flush.Messages = 0
+ config.Producer.Flush.Frequency = time.Duration(0)
+
+ config.Net.DialTimeout = c.DialTimeout
+ config.Net.WriteTimeout = c.WriteTimeout
+ config.Net.ReadTimeout = c.ReadTimeout
+
+ config.Producer.Partitioner = sarama.NewManualPartitioner
+ config.Producer.MaxMessageBytes = c.MaxMessageBytes
+ config.Producer.Return.Successes = true
+ config.Producer.Return.Errors = true
+ config.Producer.RequiredAcks = sarama.WaitForAll
+ switch strings.ToLower(strings.TrimSpace(c.Compression)) {
+ case "none":
+ config.Producer.Compression = sarama.CompressionNone
+ case "gzip":
+ config.Producer.Compression = sarama.CompressionGZIP
+ case "snappy":
+ config.Producer.Compression = sarama.CompressionSnappy
+ case "lz4":
+ config.Producer.Compression = sarama.CompressionLZ4
+ case "zstd":
+ config.Producer.Compression = sarama.CompressionZSTD
+ default:
+ log.Warn("Unsupported compression algorithm", zap.String("compression", c.Compression))
+ config.Producer.Compression = sarama.CompressionNone
+ }
+
+ if c.Credential != nil && len(c.Credential.CAPath) != 0 {
+ config.Net.TLS.Enable = true
+ config.Net.TLS.Config, err = c.Credential.ToTLSConfig()
+ if err != nil {
+ return nil, errors.Trace(err)
+ }
+ }
+ if c.SaslScram != nil && len(c.SaslScram.SaslUser) != 0 {
+ config.Net.SASL.Enable = true
+ config.Net.SASL.User = c.SaslScram.SaslUser
+ config.Net.SASL.Password = c.SaslScram.SaslPassword
+ config.Net.SASL.Mechanism = sarama.SASLMechanism(c.SaslScram.SaslMechanism)
+ if strings.EqualFold(c.SaslScram.SaslMechanism, "SCRAM-SHA-256") {
+ config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA256} }
+ } else if strings.EqualFold(c.SaslScram.SaslMechanism, "SCRAM-SHA-512") {
+ config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA512} }
+ } else {
+ return nil, errors.New("Unsupported sasl-mechanism, should be SCRAM-SHA-256 or SCRAM-SHA-512")
+ }
+ }
+
+ return config, err
+}
diff --git a/cdc/sink/producer/kafka/config_test.go b/cdc/sink/producer/kafka/config_test.go
new file mode 100644
index 00000000000..84b7b87c7d4
--- /dev/null
+++ b/cdc/sink/producer/kafka/config_test.go
@@ -0,0 +1,195 @@
+// Copyright 2021 PingCAP, Inc.
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "context" + "fmt" + "net/url" + "time" + + "github.com/Shopify/sarama" + "github.com/pingcap/check" + "github.com/pingcap/errors" + "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/security" + "github.com/pingcap/tiflow/pkg/util" + "github.com/pingcap/tiflow/pkg/util/testleak" +) + +func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) { + defer testleak.AfterTest(c)() + ctx := context.Background() + config := NewConfig() + config.Version = "invalid" + _, err := newSaramaConfigImpl(ctx, config) + c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*") + ctx = util.SetOwnerInCtx(ctx) + config.Version = "2.6.0" + config.ClientID = "^invalid$" + _, err = newSaramaConfigImpl(ctx, config) + c.Assert(cerror.ErrKafkaInvalidClientID.Equal(err), check.IsTrue) + + config.ClientID = "test-kafka-client" + compressionCases := []struct { + algorithm string + expected sarama.CompressionCodec + }{ + {"none", sarama.CompressionNone}, + {"gzip", sarama.CompressionGZIP}, + {"snappy", sarama.CompressionSnappy}, + {"lz4", sarama.CompressionLZ4}, + {"zstd", sarama.CompressionZSTD}, + {"others", sarama.CompressionNone}, + } + for _, cc := range compressionCases { + config.Compression = cc.algorithm + cfg, err := newSaramaConfigImpl(ctx, config) + c.Assert(err, check.IsNil) + c.Assert(cfg.Producer.Compression, check.Equals, cc.expected) + } + + config.Credential = &security.Credential{ + CAPath: "/invalid/ca/path", + } + _, err = newSaramaConfigImpl(ctx, config) + c.Assert(errors.Cause(err), check.ErrorMatches, ".*no such file or directory") + + saslConfig := NewConfig() + saslConfig.Version = "2.6.0" + saslConfig.ClientID = "test-sasl-scram" + saslConfig.SaslScram = &security.SaslScram{ + SaslUser: "user", + SaslPassword: "password", + SaslMechanism: sarama.SASLTypeSCRAMSHA256, + } + + cfg, err := newSaramaConfigImpl(ctx, saslConfig) + c.Assert(err, check.IsNil) + c.Assert(cfg, check.NotNil) + c.Assert(cfg.Net.SASL.User, check.Equals, "user") + c.Assert(cfg.Net.SASL.Password, check.Equals, "password") + c.Assert(cfg.Net.SASL.Mechanism, check.Equals, sarama.SASLMechanism("SCRAM-SHA-256")) +} + +func (s *kafkaSuite) TestConfigTimeouts(c *check.C) { + defer testleak.AfterTest(c)() + + cfg := NewConfig() + c.Assert(cfg.DialTimeout, check.Equals, 10*time.Second) + c.Assert(cfg.ReadTimeout, check.Equals, 10*time.Second) + c.Assert(cfg.WriteTimeout, check.Equals, 10*time.Second) + + saramaConfig, err := newSaramaConfig(context.Background(), cfg) + c.Assert(err, check.IsNil) + c.Assert(saramaConfig.Net.DialTimeout, check.Equals, cfg.DialTimeout) + c.Assert(saramaConfig.Net.WriteTimeout, check.Equals, cfg.WriteTimeout) + c.Assert(saramaConfig.Net.ReadTimeout, check.Equals, cfg.ReadTimeout) + + uri := "kafka://127.0.0.1:9092/kafka-test?dial-timeout=5s&read-timeout=1000ms" + + "&write-timeout=2m" + sinkURI, err := url.Parse(uri) + c.Assert(err, check.IsNil) + opts := 
make(map[string]string) + err = CompleteConfigsAndOpts(sinkURI, cfg, config.GetDefaultReplicaConfig(), opts) + c.Assert(err, check.IsNil) + + c.Assert(cfg.DialTimeout, check.Equals, 5*time.Second) + c.Assert(cfg.ReadTimeout, check.Equals, 1000*time.Millisecond) + c.Assert(cfg.WriteTimeout, check.Equals, 2*time.Minute) + + saramaConfig, err = newSaramaConfig(context.Background(), cfg) + c.Assert(err, check.IsNil) + c.Assert(saramaConfig.Net.DialTimeout, check.Equals, 5*time.Second) + c.Assert(saramaConfig.Net.ReadTimeout, check.Equals, 1000*time.Millisecond) + c.Assert(saramaConfig.Net.WriteTimeout, check.Equals, 2*time.Minute) +} + +func (s *kafkaSuite) TestCompleteConfigByOpts(c *check.C) { + defer testleak.AfterTest(c)() + cfg := NewConfig() + + // Normal config. + uriTemplate := "kafka://127.0.0.1:9092/kafka-test?kafka-version=2.6.0&max-batch-size=5" + + "&max-message-bytes=%s&partition-num=1&replication-factor=3" + + "&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip" + maxMessageSize := "4096" // 4kb + uri := fmt.Sprintf(uriTemplate, maxMessageSize) + sinkURI, err := url.Parse(uri) + c.Assert(err, check.IsNil) + opts := make(map[string]string) + err = CompleteConfigsAndOpts(sinkURI, cfg, config.GetDefaultReplicaConfig(), opts) + c.Assert(err, check.IsNil) + c.Assert(cfg.PartitionNum, check.Equals, int32(1)) + c.Assert(cfg.ReplicationFactor, check.Equals, int16(3)) + c.Assert(cfg.Version, check.Equals, "2.6.0") + c.Assert(cfg.MaxMessageBytes, check.Equals, 4096) + expectedOpts := map[string]string{ + "max-message-bytes": maxMessageSize, + "max-batch-size": "5", + } + for k, v := range opts { + c.Assert(v, check.Equals, expectedOpts[k]) + } + + // Illegal replication-factor. + uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&replication-factor=a" + sinkURI, err = url.Parse(uri) + c.Assert(err, check.IsNil) + cfg = NewConfig() + err = CompleteConfigsAndOpts(sinkURI, cfg, config.GetDefaultReplicaConfig(), opts) + c.Assert(errors.Cause(err), check.ErrorMatches, ".*invalid syntax.*") + + // Illegal max-message-bytes. + uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&max-message-bytes=a" + sinkURI, err = url.Parse(uri) + c.Assert(err, check.IsNil) + cfg = NewConfig() + err = CompleteConfigsAndOpts(sinkURI, cfg, config.GetDefaultReplicaConfig(), opts) + c.Assert(errors.Cause(err), check.ErrorMatches, ".*invalid syntax.*") + + // Illegal partition-num. + uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=a" + sinkURI, err = url.Parse(uri) + c.Assert(err, check.IsNil) + cfg = NewConfig() + err = CompleteConfigsAndOpts(sinkURI, cfg, config.GetDefaultReplicaConfig(), opts) + c.Assert(errors.Cause(err), check.ErrorMatches, ".*invalid syntax.*") + + // Out of range partition-num. 
+ uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" + sinkURI, err = url.Parse(uri) + c.Assert(err, check.IsNil) + cfg = NewConfig() + err = CompleteConfigsAndOpts(sinkURI, cfg, config.GetDefaultReplicaConfig(), opts) + c.Assert(errors.Cause(err), check.ErrorMatches, ".*invalid partition num.*") +} + +func (s *kafkaSuite) TestSetPartitionNum(c *check.C) { + defer testleak.AfterTest(c)() + cfg := NewConfig() + err := cfg.setPartitionNum(2) + c.Assert(err, check.IsNil) + c.Assert(cfg.PartitionNum, check.Equals, int32(2)) + + cfg.PartitionNum = 1 + err = cfg.setPartitionNum(2) + c.Assert(err, check.IsNil) + c.Assert(cfg.PartitionNum, check.Equals, int32(1)) + + cfg.PartitionNum = 3 + err = cfg.setPartitionNum(2) + c.Assert(cerror.ErrKafkaInvalidPartitionNum.Equal(err), check.IsTrue) +} diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go index 3a1a10ea9aa..603904fefa5 100644 --- a/cdc/sink/producer/kafka/kafka.go +++ b/cdc/sink/producer/kafka/kafka.go @@ -16,7 +16,6 @@ package kafka import ( "context" "fmt" - "net/url" "regexp" "strconv" "strings" @@ -28,160 +27,27 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/codec" - "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/kafka" "github.com/pingcap/tiflow/pkg/notify" - "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) const defaultPartitionNum = 3 -// Config stores user specified Kafka producer configuration -type Config struct { - BrokerEndpoints []string - PartitionNum int32 - - // User should make sure that `replication-factor` not greater than the number of kafka brokers. - ReplicationFactor int16 - - Version string - MaxMessageBytes int - Compression string - ClientID string - Credential *security.Credential - SaslScram *security.SaslScram - // control whether to create topic - AutoCreate bool -} - -// NewConfig returns a default Kafka configuration -func NewConfig() *Config { - return &Config{ - Version: "2.4.0", - // MaxMessageBytes will be used to initialize producer, we set the default value (1M) identical to kafka broker. 
- MaxMessageBytes: config.DefaultMaxMessageBytes, - ReplicationFactor: 1, - Compression: "none", - Credential: &security.Credential{}, - SaslScram: &security.SaslScram{}, - AutoCreate: true, - } -} - -// Initialize the kafka configuration -func (c *Config) Initialize(sinkURI *url.URL, replicaConfig *config.ReplicaConfig, opts map[string]string) error { - c.BrokerEndpoints = strings.Split(sinkURI.Host, ",") - params := sinkURI.Query() - s := params.Get("partition-num") - if s != "" { - a, err := strconv.Atoi(s) - if err != nil { - return err - } - if a <= 0 { - return cerror.ErrKafkaInvalidPartitionNum.GenWithStackByArgs(a) - } - c.PartitionNum = int32(a) - if c.PartitionNum <= 0 { - return cerror.ErrKafkaInvalidPartitionNum.GenWithStackByArgs(c.PartitionNum) - } - } - - s = sinkURI.Query().Get("replication-factor") - if s != "" { - a, err := strconv.Atoi(s) - if err != nil { - return err - } - c.ReplicationFactor = int16(a) - } - - s = sinkURI.Query().Get("kafka-version") - if s != "" { - c.Version = s - } - - s = sinkURI.Query().Get("max-message-bytes") - if s != "" { - a, err := strconv.Atoi(s) - if err != nil { - return err - } - c.MaxMessageBytes = a - opts["max-message-bytes"] = s - } - - s = sinkURI.Query().Get("max-batch-size") - if s != "" { - opts["max-batch-size"] = s - } - - s = sinkURI.Query().Get("compression") - if s != "" { - c.Compression = s - } - - c.ClientID = sinkURI.Query().Get("kafka-client-id") - - s = sinkURI.Query().Get("protocol") - if s != "" { - replicaConfig.Sink.Protocol = s - } - - s = sinkURI.Query().Get("ca") - if s != "" { - c.Credential.CAPath = s - } - - s = sinkURI.Query().Get("cert") - if s != "" { - c.Credential.CertPath = s - } - - s = sinkURI.Query().Get("key") - if s != "" { - c.Credential.KeyPath = s - } - - s = sinkURI.Query().Get("sasl-user") - if s != "" { - c.SaslScram.SaslUser = s - } - - s = sinkURI.Query().Get("sasl-password") - if s != "" { - c.SaslScram.SaslPassword = s - } - - s = sinkURI.Query().Get("sasl-mechanism") - if s != "" { - c.SaslScram.SaslMechanism = s - } - - s = sinkURI.Query().Get("auto-create-topic") - if s != "" { - autoCreate, err := strconv.ParseBool(s) - if err != nil { - return err - } - c.AutoCreate = autoCreate - } - - return nil -} - type kafkaSaramaProducer struct { - // clientLock is used to protect concurrent access of asyncClient and syncClient. + // clientLock is used to protect concurrent access of asyncProducer and syncProducer. // Since we don't close these two clients (which have an input chan) from the // sender routine, data race or send on closed chan could happen. 
- clientLock sync.RWMutex
- asyncClient sarama.AsyncProducer
- syncClient sarama.SyncProducer
- // producersReleased records whether asyncClient and syncClient have been closed properly
+ clientLock sync.RWMutex
+ client sarama.Client
+ asyncProducer sarama.AsyncProducer
+ syncProducer sarama.SyncProducer
+
+ // producersReleased records whether asyncProducer and syncProducer have been closed properly
 producersReleased bool
 topic string
 partitionNum int32
@@ -198,6 +64,9 @@ type kafkaSaramaProducer struct {
 closeCh chan struct{}
 // atomic flag indicating whether the producer is closing
 closing kafkaProducerClosingFlag
+
+ role util.Role
+ id model.ChangeFeedID
 }

 type kafkaProducerClosingFlag = int32
@@ -228,14 +97,15 @@ func (k *kafkaSaramaProducer) SendMessage(ctx context.Context, message *codec.MQ
 failpoint.Inject("KafkaSinkAsyncSendError", func() {
 // simulate sending message to input channel successfully but flushing
 // message to Kafka meets error
- log.Info("failpoint error injected")
+ log.Info("failpoint error injected", zap.String("changefeed", k.id), zap.Any("role", k.role))
 k.failpointCh <- errors.New("kafka sink injected error")
 failpoint.Return(nil)
 })

 failpoint.Inject("SinkFlushDMLPanic", func() {
 time.Sleep(time.Second)
- log.Panic("SinkFlushDMLPanic")
+ log.Panic("SinkFlushDMLPanic",
+ zap.String("changefeed", k.id), zap.Any("role", k.role))
 })

 select {
@@ -243,7 +113,7 @@ func (k *kafkaSaramaProducer) SendMessage(ctx context.Context, message *codec.MQ
 return ctx.Err()
 case <-k.closeCh:
 return nil
- case k.asyncClient.Input() <- msg:
+ case k.asyncProducer.Input() <- msg:
 }
 return nil
}
@@ -266,7 +136,7 @@ func (k *kafkaSaramaProducer) SyncBroadcastMessage(ctx context.Context, message
 case <-k.closeCh:
 return nil
 default:
- err := k.syncClient.SendMessages(msgs)
+ err := k.syncProducer.SendMessages(msgs)
 return cerror.WrapError(cerror.ErrKafkaSendMessage, err)
 }
}
@@ -328,12 +198,13 @@ func (k *kafkaSaramaProducer) stop() {
 if atomic.SwapInt32(&k.closing, kafkaProducerClosing) == kafkaProducerClosing {
 return
 }
- log.Info("kafka producer closing...")
+ log.Info("kafka producer closing...", zap.String("changefeed", k.id), zap.Any("role", k.role))
 close(k.closeCh)
}

// Close closes the sync and async clients.
func (k *kafkaSaramaProducer) Close() error {
+ log.Info("stop the kafka producer", zap.String("changefeed", k.id), zap.Any("role", k.role))
 k.stop()

 k.clientLock.Lock()
@@ -345,22 +216,42 @@ func (k *kafkaSaramaProducer) Close() error {
 return nil
 }
 k.producersReleased = true
- // In fact close sarama sync client doesn't return any error.
- // But close async client returns error if error channel is not empty, we
- // don't populate this error to the upper caller, just add a log here.
+
+ // `client` is mainly used by `asyncProducer` to fetch metadata and other related
+ // operations. When we close the `kafkaSaramaProducer`, TiCDC does not need to make
+ // sure that buffered messages are flushed.
+ // Consider the situation that the broker does not respond: if the client is not
+ // closed, `asyncProducer.Close()` would waste a lot of time trying to flush all messages.
+ // To prevent the scenario mentioned above, close the client first.
start := time.Now() - err := k.asyncClient.Close() + if err := k.client.Close(); err != nil { + log.Error("close sarama client with error", zap.Error(err), + zap.Duration("duration", time.Since(start)), + zap.String("changefeed", k.id), zap.Any("role", k.role)) + } else { + log.Info("sarama client closed", zap.Duration("duration", time.Since(start)), + zap.String("changefeed", k.id), zap.Any("role", k.role)) + } + + start = time.Now() + err := k.asyncProducer.Close() if err != nil { - log.Error("close async client with error", zap.Error(err), zap.Duration("duration", time.Since(start))) + log.Error("close async client with error", zap.Error(err), + zap.Duration("duration", time.Since(start)), + zap.String("changefeed", k.id), zap.Any("role", k.role)) } else { - log.Info("async client closed", zap.Duration("duration", time.Since(start))) + log.Info("async client closed", zap.Duration("duration", time.Since(start)), + zap.String("changefeed", k.id), zap.Any("role", k.role)) } start = time.Now() - err = k.syncClient.Close() + err = k.syncProducer.Close() if err != nil { - log.Error("close sync client with error", zap.Error(err), zap.Duration("duration", time.Since(start))) + log.Error("close sync client with error", zap.Error(err), + zap.Duration("duration", time.Since(start)), + zap.String("changefeed", k.id), zap.Any("role", k.role)) } else { - log.Info("sync client closed", zap.Duration("duration", time.Since(start))) + log.Info("sync client closed", zap.Duration("duration", time.Since(start)), + zap.String("changefeed", k.id), zap.Any("role", k.role)) } return nil } @@ -368,6 +259,8 @@ func (k *kafkaSaramaProducer) Close() error { func (k *kafkaSaramaProducer) run(ctx context.Context) error { defer func() { k.flushedReceiver.Stop() + log.Info("stop the kafka producer", + zap.String("changefeed", k.id), zap.Any("role", k.role)) k.stop() }() for { @@ -377,16 +270,17 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error { case <-k.closeCh: return nil case err := <-k.failpointCh: - log.Warn("receive from failpoint chan", zap.Error(err)) + log.Warn("receive from failpoint chan", zap.Error(err), + zap.String("changefeed", k.id), zap.Any("role", k.role)) return err - case msg := <-k.asyncClient.Successes(): + case msg := <-k.asyncProducer.Successes(): if msg == nil || msg.Metadata == nil { continue } flushedOffset := msg.Metadata.(uint64) atomic.StoreUint64(&k.partitionOffset[msg.Partition].flushed, flushedOffset) k.flushedNotifier.Notify() - case err := <-k.asyncClient.Errors(): + case err := <-k.asyncProducer.Errors(): // We should not wrap a nil pointer if the pointer is of a subtype of `error` // because Go would store the type info and the resulted `error` variable would not be nil, // which will cause the pkg/error library to malfunction. 
@@ -406,6 +300,8 @@ var ( // NewKafkaSaramaProducer creates a kafka sarama producer func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, opts map[string]string, errCh chan error) (*kafkaSaramaProducer, error) { + changefeedID := util.ChangefeedIDFromCtx(ctx) + role := util.RoleFromCtx(ctx) log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config)) cfg, err := newSaramaConfigImpl(ctx, config) if err != nil { @@ -427,11 +323,17 @@ func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, o } opts["max-message-bytes"] = strconv.Itoa(cfg.Producer.MaxMessageBytes) - asyncClient, err := sarama.NewAsyncProducer(config.BrokerEndpoints, cfg) + client, err := sarama.NewClient(config.BrokerEndpoints, cfg) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + asyncProducer, err := sarama.NewAsyncProducerFromClient(client) if err != nil { return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) } - syncClient, err := sarama.NewSyncProducer(config.BrokerEndpoints, cfg) + + syncProducer, err := sarama.NewSyncProducerFromClient(client) if err != nil { return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) } @@ -442,10 +344,11 @@ func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, o return nil, err } k := &kafkaSaramaProducer{ - asyncClient: asyncClient, - syncClient: syncClient, - topic: topic, - partitionNum: config.PartitionNum, + client: client, + asyncProducer: asyncProducer, + syncProducer: syncProducer, + topic: topic, + partitionNum: config.PartitionNum, partitionOffset: make([]struct { flushed uint64 sent uint64 @@ -455,6 +358,9 @@ func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, o closeCh: make(chan struct{}), failpointCh: make(chan error, 1), closing: kafkaProducerRunning, + + id: changefeedID, + role: role, } go func() { if err := k.run(ctx); err != nil && errors.Cause(err) != context.Canceled { @@ -463,17 +369,14 @@ func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, o return case errCh <- err: default: - log.Error("error channel is full", zap.Error(err)) + log.Error("error channel is full", zap.Error(err), + zap.String("changefeed", k.id), zap.Any("role", role)) } } }() return k, nil } -func init() { - sarama.MaxRequestSize = 1024 * 1024 * 1024 // 1GB -} - var ( validClientID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`) commonInvalidChar = regexp.MustCompile(`[\?:,"]`) @@ -582,99 +485,6 @@ func validateAndCreateTopic(admin kafka.ClusterAdminClient, topic string, config return nil } -func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) { - config := sarama.NewConfig() - - version, err := sarama.ParseKafkaVersion(c.Version) - if err != nil { - return nil, cerror.WrapError(cerror.ErrKafkaInvalidVersion, err) - } - var role string - if util.IsOwnerFromCtx(ctx) { - role = "owner" - } else { - role = "processor" - } - captureAddr := util.CaptureAddrFromCtx(ctx) - changefeedID := util.ChangefeedIDFromCtx(ctx) - - config.ClientID, err = kafkaClientID(role, captureAddr, changefeedID, c.ClientID) - if err != nil { - return nil, errors.Trace(err) - } - config.Version = version - // See: https://kafka.apache.org/documentation/#replication - // When one of the brokers in a Kafka cluster is down, the partition leaders - // in this broker is broken, Kafka will election a new partition leader and - // replication logs, this process will last from a few seconds to a few 
minutes. - // Kafka cluster will not provide a writing service in this process. - // Time out in one minute. - config.Metadata.Retry.Max = 120 - config.Metadata.Retry.Backoff = 500 * time.Millisecond - // If it is not set, this means a metadata request against an unreachable - // cluster (all brokers are unreachable or unresponsive) can take up to - // `Net.[Dial|Read]Timeout * BrokerCount * (Metadata.Retry.Max + 1) + - // Metadata.Retry.Backoff * Metadata.Retry.Max` - // to fail. - // See: https://github.com/Shopify/sarama/issues/765 - // and https://github.com/pingcap/tiflow/issues/3352. - config.Metadata.Timeout = 1 * time.Minute - - config.Producer.Partitioner = sarama.NewManualPartitioner - config.Producer.MaxMessageBytes = c.MaxMessageBytes - config.Producer.Return.Successes = true - config.Producer.Return.Errors = true - config.Producer.RequiredAcks = sarama.WaitForAll - - // Time out in five minutes(600 * 500ms). - config.Producer.Retry.Max = 600 - config.Producer.Retry.Backoff = 500 * time.Millisecond - - switch strings.ToLower(strings.TrimSpace(c.Compression)) { - case "none": - config.Producer.Compression = sarama.CompressionNone - case "gzip": - config.Producer.Compression = sarama.CompressionGZIP - case "snappy": - config.Producer.Compression = sarama.CompressionSnappy - case "lz4": - config.Producer.Compression = sarama.CompressionLZ4 - case "zstd": - config.Producer.Compression = sarama.CompressionZSTD - default: - log.Warn("Unsupported compression algorithm", zap.String("compression", c.Compression)) - config.Producer.Compression = sarama.CompressionNone - } - - // Time out in one minute(120 * 500ms). - config.Admin.Retry.Max = 120 - config.Admin.Retry.Backoff = 500 * time.Millisecond - config.Admin.Timeout = 20 * time.Second - - if c.Credential != nil && len(c.Credential.CAPath) != 0 { - config.Net.TLS.Enable = true - config.Net.TLS.Config, err = c.Credential.ToTLSConfig() - if err != nil { - return nil, errors.Trace(err) - } - } - if c.SaslScram != nil && len(c.SaslScram.SaslUser) != 0 { - config.Net.SASL.Enable = true - config.Net.SASL.User = c.SaslScram.SaslUser - config.Net.SASL.Password = c.SaslScram.SaslPassword - config.Net.SASL.Mechanism = sarama.SASLMechanism(c.SaslScram.SaslMechanism) - if strings.EqualFold(c.SaslScram.SaslMechanism, "SCRAM-SHA-256") { - config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA256} } - } else if strings.EqualFold(c.SaslScram.SaslMechanism, "SCRAM-SHA-512") { - config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA512} } - } else { - return nil, errors.New("Unsupported sasl-mechanism, should be SCRAM-SHA-256 or SCRAM-SHA-512") - } - } - - return config, err -} - // getBrokerConfig gets broker config by name. func getBrokerConfig(admin kafka.ClusterAdminClient, brokerConfigName string) (string, error) { _, controllerID, err := admin.DescribeCluster() @@ -709,29 +519,3 @@ func getTopicConfig(admin kafka.ClusterAdminClient, detail sarama.TopicDetail, t return getBrokerConfig(admin, brokerConfigName) } - -func (c *Config) setPartitionNum(realPartitionCount int32) error { - // user does not specify the `partition-num` in the sink-uri - if c.PartitionNum == 0 { - c.PartitionNum = realPartitionCount - return nil - } - - if c.PartitionNum < realPartitionCount { - log.Warn("number of partition specified in sink-uri is less than that of the actual topic. 
"+ - "Some partitions will not have messages dispatched to", - zap.Int32("sink-uri partitions", c.PartitionNum), - zap.Int32("topic partitions", realPartitionCount)) - return nil - } - - // Make sure that the user-specified `partition-num` is not greater than - // the real partition count, since messages would be dispatched to different - // partitions, this could prevent potential correctness problems. - if c.PartitionNum > realPartitionCount { - return cerror.ErrKafkaInvalidPartitionNum.GenWithStack( - "the number of partition (%d) specified in sink-uri is more than that of actual topic (%d)", - c.PartitionNum, realPartitionCount) - } - return nil -} diff --git a/cdc/sink/producer/kafka/kafka_test.go b/cdc/sink/producer/kafka/kafka_test.go index 5bcd41118a0..4c67d395a60 100644 --- a/cdc/sink/producer/kafka/kafka_test.go +++ b/cdc/sink/producer/kafka/kafka_test.go @@ -15,8 +15,6 @@ package kafka import ( "context" - "fmt" - "net/url" "strings" "sync" "testing" @@ -26,10 +24,7 @@ import ( "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/tiflow/cdc/sink/codec" - "github.com/pingcap/tiflow/pkg/config" - cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/kafka" - "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/util/testleak" ) @@ -67,45 +62,6 @@ func (s *kafkaSuite) TestClientID(c *check.C) { } } -func (s *kafkaSuite) TestInitializeConfig(c *check.C) { - defer testleak.AfterTest(c) - cfg := NewConfig() - - uriTemplate := "kafka://127.0.0.1:9092/kafka-test?kafka-version=2.6.0&max-batch-size=5" + - "&max-message-bytes=%s&partition-num=1&replication-factor=3" + - "&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip" - maxMessageSize := "4096" // 4kb - uri := fmt.Sprintf(uriTemplate, maxMessageSize) - - sinkURI, err := url.Parse(uri) - c.Assert(err, check.IsNil) - - replicaConfig := config.GetDefaultReplicaConfig() - - opts := make(map[string]string) - err = cfg.Initialize(sinkURI, replicaConfig, opts) - c.Assert(err, check.IsNil) - - c.Assert(cfg.PartitionNum, check.Equals, int32(1)) - c.Assert(cfg.ReplicationFactor, check.Equals, int16(3)) - c.Assert(cfg.Version, check.Equals, "2.6.0") - c.Assert(cfg.MaxMessageBytes, check.Equals, 4096) - - expectedOpts := map[string]string{ - "max-message-bytes": maxMessageSize, - "max-batch-size": "5", - } - for k, v := range opts { - c.Assert(v, check.Equals, expectedOpts[k]) - } - - uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" - sinkURI, err = url.Parse(uri) - c.Assert(err, check.IsNil) - err = cfg.Initialize(sinkURI, replicaConfig, opts) - c.Assert(errors.Cause(err), check.ErrorMatches, ".*invalid partition num.*") -} - func (s *kafkaSuite) TestSaramaProducer(c *check.C) { defer testleak.AfterTest(c)() ctx, cancel := context.WithCancel(context.Background()) @@ -117,7 +73,7 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) { metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) metadataResponse.AddTopicPartition(topic, 1, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) - leader.Returns(metadataResponse) + // Response for `sarama.NewClient` leader.Returns(metadataResponse) prodSuccess := new(sarama.ProduceResponse) @@ -152,6 +108,7 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) { }() opts := make(map[string]string) 
+ ctx = util.PutRoleInCtx(ctx, util.RoleTester) producer, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh) c.Assert(err, check.IsNil) c.Assert(producer.GetPartitionNum(), check.Equals, int32(2)) @@ -235,23 +192,6 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) { } } -func (s *kafkaSuite) TestSetPartitionNum(c *check.C) { - defer testleak.AfterTest(c)() - config := NewConfig() - err := config.setPartitionNum(2) - c.Assert(err, check.IsNil) - c.Assert(config.PartitionNum, check.Equals, int32(2)) - - config.PartitionNum = 1 - err = config.setPartitionNum(2) - c.Assert(err, check.IsNil) - c.Assert(config.PartitionNum, check.Equals, int32(1)) - - config.PartitionNum = 3 - err = config.setPartitionNum(2) - c.Assert(cerror.ErrKafkaInvalidPartitionNum.Equal(err), check.IsTrue) -} - func (s *kafkaSuite) TestValidateAndCreateTopic(c *check.C) { defer testleak.AfterTest(c) config := NewConfig() @@ -343,62 +283,6 @@ func (s *kafkaSuite) TestValidateAndCreateTopic(c *check.C) { c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, defaultMaxMessageBytes) } -func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) { - defer testleak.AfterTest(c)() - ctx := context.Background() - config := NewConfig() - config.Version = "invalid" - _, err := newSaramaConfigImpl(ctx, config) - c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*") - - ctx = util.SetOwnerInCtx(ctx) - config.Version = "2.6.0" - config.ClientID = "^invalid$" - _, err = newSaramaConfigImpl(ctx, config) - c.Assert(cerror.ErrKafkaInvalidClientID.Equal(err), check.IsTrue) - - config.ClientID = "test-kafka-client" - compressionCases := []struct { - algorithm string - expected sarama.CompressionCodec - }{ - {"none", sarama.CompressionNone}, - {"gzip", sarama.CompressionGZIP}, - {"snappy", sarama.CompressionSnappy}, - {"lz4", sarama.CompressionLZ4}, - {"zstd", sarama.CompressionZSTD}, - {"others", sarama.CompressionNone}, - } - for _, cc := range compressionCases { - config.Compression = cc.algorithm - cfg, err := newSaramaConfigImpl(ctx, config) - c.Assert(err, check.IsNil) - c.Assert(cfg.Producer.Compression, check.Equals, cc.expected) - } - - config.Credential = &security.Credential{ - CAPath: "/invalid/ca/path", - } - _, err = newSaramaConfigImpl(ctx, config) - c.Assert(errors.Cause(err), check.ErrorMatches, ".*no such file or directory") - - saslConfig := NewConfig() - saslConfig.Version = "2.6.0" - saslConfig.ClientID = "test-sasl-scram" - saslConfig.SaslScram = &security.SaslScram{ - SaslUser: "user", - SaslPassword: "password", - SaslMechanism: sarama.SASLTypeSCRAMSHA256, - } - - cfg, err := newSaramaConfigImpl(ctx, saslConfig) - c.Assert(err, check.IsNil) - c.Assert(cfg, check.NotNil) - c.Assert(cfg.Net.SASL.User, check.Equals, "user") - c.Assert(cfg.Net.SASL.Password, check.Equals, "password") - c.Assert(cfg.Net.SASL.Mechanism, check.Equals, sarama.SASLMechanism("SCRAM-SHA-256")) -} - func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) { defer testleak.AfterTest(c)() ctx := context.Background() @@ -412,6 +296,7 @@ func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) { NewAdminClientImpl = kafka.NewSaramaAdminClient }() opts := make(map[string]string) + ctx = util.PutRoleInCtx(ctx, util.RoleTester) _, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh) c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*") } @@ -428,7 +313,7 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) 
metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) metadataResponse.AddTopicPartition(topic, 1, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) - leader.Returns(metadataResponse) + // Response for `sarama.NewClient` leader.Returns(metadataResponse) config := NewConfig() @@ -460,6 +345,7 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { errCh := make(chan error, 1) opts := make(map[string]string) + ctx = util.PutRoleInCtx(ctx, util.RoleTester) producer, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh) c.Assert(opts, check.HasKey, "max-message-bytes") defer func() { @@ -510,7 +396,7 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) { metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) metadataResponse.AddTopicPartition(topic, 1, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) - leader.Returns(metadataResponse) + // Response for `sarama.NewClient` leader.Returns(metadataResponse) config := NewConfig() @@ -529,6 +415,7 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) { errCh := make(chan error, 1) opts := make(map[string]string) + ctx = util.PutRoleInCtx(ctx, util.RoleTester) producer, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh) c.Assert(opts, check.HasKey, "max-message-bytes") defer func() { diff --git a/cdc/sink/sink.go b/cdc/sink/sink.go index 67ddb9005cd..5fd1366cec7 100644 --- a/cdc/sink/sink.go +++ b/cdc/sink/sink.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/filter" + "github.com/pingcap/tiflow/pkg/util" ) // Sink options keys @@ -124,6 +125,7 @@ func Validate(ctx context.Context, sinkURI string, cfg *config.ReplicaConfig, op return err } errCh := make(chan error) + ctx = util.PutRoleInCtx(ctx, util.RoleClient) // TODO: find a better way to verify a sinkURI is valid s, err := NewSink(ctx, "sink-verify", sinkURI, sinkFilter, cfg, opts, errCh) if err != nil { diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index a7a9d7d2c02..45676e6e815 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -333,6 +333,7 @@ func NewConsumer(ctx context.Context) (*Consumer, error) { } c.sinks = make([]*partitionSink, kafkaPartitionNum) ctx, cancel := context.WithCancel(ctx) + ctx = util.PutRoleInCtx(ctx, util.RoleKafkaConsumer) errCh := make(chan error, 1) opts := map[string]string{} for i := 0; i < int(kafkaPartitionNum); i++ { diff --git a/pkg/util/ctx.go b/pkg/util/ctx.go index 492b00f8b4e..4f68af2cf33 100644 --- a/pkg/util/ctx.go +++ b/pkg/util/ctx.go @@ -31,6 +31,7 @@ const ( ctxKeyIsOwner = ctxKey("isOwner") ctxKeyTimezone = ctxKey("timezone") ctxKeyKVStorage = ctxKey("kvStorage") + ctxKeyRole = ctxKey("role") ) // CaptureAddrFromCtx returns a capture ID stored in the specified context. @@ -121,6 +122,21 @@ func PutChangefeedIDInCtx(ctx context.Context, changefeedID string) context.Cont return context.WithValue(ctx, ctxKeyChangefeedID, changefeedID) } +// RoleFromCtx returns a role stored in the specified context. +// It returns RoleUnknown if there's no valid role found +func RoleFromCtx(ctx context.Context) Role { + role, ok := ctx.Value(ctxKeyRole).(Role) + if !ok { + return RoleUnknown + } + return role +} + +// PutRoleInCtx return a new child context with the specified role stored. 
+func PutRoleInCtx(ctx context.Context, role Role) context.Context { + return context.WithValue(ctx, ctxKeyRole, role) +} + // ZapFieldCapture returns a zap field containing capture address // TODO: log redact for capture address func ZapFieldCapture(ctx context.Context) zap.Field { diff --git a/pkg/util/identity.go b/pkg/util/identity.go new file mode 100644 index 00000000000..b9fa894e0cd --- /dev/null +++ b/pkg/util/identity.go @@ -0,0 +1,54 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +// Role is the operator role, mainly used for logging at the moment. +type Role int + +const ( + // RoleOwner is the owner of the cluster. + RoleOwner Role = iota + // RoleProcessor is the processor of the cluster. + RoleProcessor + // RoleClient is the client. + RoleClient + // RoleRedoLogApplier is the redo log applier. + RoleRedoLogApplier + // RoleKafkaConsumer is the kafka consumer. + RoleKafkaConsumer + // RoleTester for test. + RoleTester + // RoleUnknown is the unknown role. + RoleUnknown +) + +func (r Role) String() string { + switch r { + case RoleOwner: + return "owner" + case RoleProcessor: + return "processor" + case RoleClient: + return "cdc-client" + case RoleKafkaConsumer: + return "kafka-consumer" + case RoleRedoLogApplier: + return "redo-applier" + case RoleTester: + return "tester" + case RoleUnknown: + return "unknown" + } + return "unknown" +}