Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): actively checks the size of min.insync.replicas and replication-factor (#4263) #4268

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 141 additions & 6 deletions cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,21 @@ func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, o
return nil, err
}

<<<<<<< HEAD
if err := topicPreProcess(topic, config, cfg); err != nil {
=======
admin, err := NewAdminClientImpl(config.BrokerEndpoints, cfg)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
defer func() {
if err := admin.Close(); err != nil {
log.Warn("close kafka cluster admin failed", zap.Error(err))
}
}()

if err := validateAndCreateTopic(admin, topic, config, cfg, opts); err != nil {
>>>>>>> afa9a2bf5 (sink(ticdc): actively checks the size of min.insync.replicas and replication-factor (#4263))
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
opts["max-message-bytes"] = strconv.Itoa(cfg.Producer.MaxMessageBytes)
Expand Down Expand Up @@ -562,9 +576,54 @@ func kafkaClientID(role, captureAddr, changefeedID, configuredClientID string) (
return
}

<<<<<<< HEAD
// NewSaramaConfig return the default config and set the according version and metrics
func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
config := sarama.NewConfig()
=======
func validateAndCreateTopic(admin kafka.ClusterAdminClient, topic string, config *Config, saramaConfig *sarama.Config,
opts map[string]string) error {
topics, err := admin.ListTopics()
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

err = validateMinInsyncReplicas(admin, topics, topic, int(config.ReplicationFactor))
if err != nil {
return cerror.ErrKafkaInvalidConfig.Wrap(err).GenWithStack(
"because TiCDC Kafka producer's `request.required.acks` defaults to -1, " +
"TiCDC cannot deliver messages when the `replication-factor` is less than `min.insync.replicas`")
}

info, exists := topics[topic]
// once we have found the topic, no matter `auto-create-topic`, make sure user input parameters are valid.
if exists {
// make sure that producer's `MaxMessageBytes` smaller than topic's `max.message.bytes`
topicMaxMessageBytesStr, err := getTopicConfig(admin, info, kafka.TopicMaxMessageBytesConfigName,
kafka.BrokerMessageMaxBytesConfigName)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
topicMaxMessageBytes, err := strconv.Atoi(topicMaxMessageBytesStr)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

if topicMaxMessageBytes < config.MaxMessageBytes {
log.Warn("topic's `max.message.bytes` less than the user set `max-message-bytes`,"+
"use topic's `max.message.bytes` to initialize the Kafka producer",
zap.Int("max.message.bytes", topicMaxMessageBytes),
zap.Int("max-message-bytes", config.MaxMessageBytes))
saramaConfig.Producer.MaxMessageBytes = topicMaxMessageBytes
}
opts["max-message-bytes"] = strconv.Itoa(saramaConfig.Producer.MaxMessageBytes)

// no need to create the topic, but we would have to log user if they found enter wrong topic name later
if config.AutoCreate {
log.Warn("topic already exist, TiCDC will not create the topic",
zap.String("topic", topic), zap.Any("detail", info))
}
>>>>>>> afa9a2bf5 (sink(ticdc): actively checks the size of min.insync.replicas and replication-factor (#4263))

version, err := sarama.ParseKafkaVersion(c.Version)
if err != nil {
Expand All @@ -579,7 +638,11 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)

<<<<<<< HEAD
config.ClientID, err = kafkaClientID(role, captureAddr, changefeedID, c.ClientID)
=======
brokerMessageMaxBytesStr, err := getBrokerConfig(admin, kafka.BrokerMessageMaxBytesConfigName)
>>>>>>> afa9a2bf5 (sink(ticdc): actively checks the size of min.insync.replicas and replication-factor (#4263))
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -626,6 +689,10 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
log.Warn("Unsupported compression algorithm", zap.String("compression", c.Compression))
config.Producer.Compression = sarama.CompressionNone
}
brokerMessageMaxBytes, err := strconv.Atoi(brokerMessageMaxBytesStr)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

// Time out in one minute(120 * 500ms).
config.Admin.Retry.Max = 120
Expand Down Expand Up @@ -656,16 +723,69 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
return config, err
}

<<<<<<< HEAD
func getBrokerMessageMaxBytes(admin sarama.ClusterAdmin) (int, error) {
target := "message.max.bytes"
=======
func validateMinInsyncReplicas(admin kafka.ClusterAdminClient,
topics map[string]sarama.TopicDetail, topic string, replicationFactor int) error {
minInsyncReplicasConfigGetter := func() (string, bool, error) {
info, exists := topics[topic]
if exists {
minInsyncReplicasStr, err := getTopicConfig(admin, info,
kafka.MinInsyncReplicasConfigName,
kafka.MinInsyncReplicasConfigName)
if err != nil {
return "", true, err
}
return minInsyncReplicasStr, true, nil
}

minInsyncReplicasStr, err := getBrokerConfig(admin, kafka.MinInsyncReplicasConfigName)
if err != nil {
return "", false, err
}

return minInsyncReplicasStr, false, nil
}

minInsyncReplicasStr, exists, err := minInsyncReplicasConfigGetter()
if err != nil {
return err
}
minInsyncReplicas, err := strconv.Atoi(minInsyncReplicasStr)
if err != nil {
return err
}

configFrom := "topic"
if !exists {
configFrom = "broker"
}

if replicationFactor < minInsyncReplicas {
msg := fmt.Sprintf("`replication-factor` cannot be smaller than the `%s` of %s",
kafka.MinInsyncReplicasConfigName, configFrom)
log.Error(msg, zap.Int("replicationFactor", replicationFactor),
zap.Int("minInsyncReplicas", minInsyncReplicas))
return errors.New(msg)
}

return nil
}

// getBrokerConfig gets broker config by name.
func getBrokerConfig(admin kafka.ClusterAdminClient, brokerConfigName string) (string, error) {
>>>>>>> afa9a2bf5 (sink(ticdc): actively checks the size of min.insync.replicas and replication-factor (#4263))
_, controllerID, err := admin.DescribeCluster()
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
return "", err
}

configEntries, err := admin.DescribeConfig(sarama.ConfigResource{
Type: sarama.BrokerResource,
Name: strconv.Itoa(int(controllerID)),
<<<<<<< HEAD
ConfigNames: []string{target},
})
if err != nil {
Expand All @@ -676,26 +796,41 @@ func getBrokerMessageMaxBytes(admin sarama.ClusterAdmin) (int, error) {
return 0, cerror.ErrKafkaNewSaramaProducer.GenWithStack(
"since cannot find the `message.max.bytes` from the broker's configuration, " +
"ticdc decline to create the topic and changefeed to prevent potential error")
=======
ConfigNames: []string{brokerConfigName},
})
if err != nil {
return "", err
>>>>>>> afa9a2bf5 (sink(ticdc): actively checks the size of min.insync.replicas and replication-factor (#4263))
}

result, err := strconv.Atoi(configEntries[0].Value)
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
if len(configEntries) == 0 || configEntries[0].Name != brokerConfigName {
return "", errors.New(fmt.Sprintf(
"cannot find the `%s` from the broker's configuration", brokerConfigName))
}

return result, nil
return configEntries[0].Value, nil
}

<<<<<<< HEAD
func getTopicMaxMessageBytes(admin sarama.ClusterAdmin, info sarama.TopicDetail) (int, error) {
if a, ok := info.ConfigEntries["max.message.bytes"]; ok {
result, err := strconv.Atoi(*a)
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
return result, nil
=======
// getTopicConfig gets topic config by name.
// If the topic does not have this configuration, we will try to get it from the broker's configuration.
// NOTICE: The configuration names of topic and broker may be different for the same configuration.
func getTopicConfig(admin kafka.ClusterAdminClient, detail sarama.TopicDetail, topicConfigName string, brokerConfigName string) (string, error) {
if a, ok := detail.ConfigEntries[topicConfigName]; ok {
return *a, nil
>>>>>>> afa9a2bf5 (sink(ticdc): actively checks the size of min.insync.replicas and replication-factor (#4263))
}

return getBrokerMessageMaxBytes(admin)
return getBrokerConfig(admin, brokerConfigName)
}

// adjust the partition-num by the topic's partition count
Expand Down
121 changes: 121 additions & 0 deletions cdc/sink/producer/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,22 +234,51 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) {
}
}

<<<<<<< HEAD
func (s *kafkaSuite) TestAdjustPartitionNum(c *check.C) {
defer testleak.AfterTest(c)()
=======
func (s *kafkaSuite) TestValidateAndCreateTopic(c *check.C) {
defer testleak.AfterTest(c)
>>>>>>> afa9a2bf5 (sink(ticdc): actively checks the size of min.insync.replicas and replication-factor (#4263))
config := NewConfig()
err := config.adjustPartitionNum(2)
c.Assert(err, check.IsNil)
<<<<<<< HEAD
c.Assert(config.PartitionNum, check.Equals, int32(2))
=======
opts := make(map[string]string)
err = validateAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg, opts)
c.Assert(err, check.IsNil)
c.Assert(opts["max-message-bytes"], check.Equals, strconv.Itoa(cfg.Producer.MaxMessageBytes))
>>>>>>> afa9a2bf5 (sink(ticdc): actively checks the size of min.insync.replicas and replication-factor (#4263))

config.PartitionNum = 1
err = config.adjustPartitionNum(2)
c.Assert(err, check.IsNil)
<<<<<<< HEAD
c.Assert(config.PartitionNum, check.Equals, int32(1))

config.PartitionNum = 3
err = config.adjustPartitionNum(2)
c.Assert(cerror.ErrKafkaInvalidPartitionNum.Equal(err), check.IsTrue)
}
=======
opts = make(map[string]string)
err = validateAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg, opts)
c.Assert(err, check.IsNil)
c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, defaultMaxMessageBytes)
c.Assert(opts["max-message-bytes"], check.Equals, strconv.Itoa(cfg.Producer.MaxMessageBytes))

config.MaxMessageBytes = defaultMaxMessageBytes - 1
cfg, err = newSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
opts = make(map[string]string)
err = validateAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg, opts)
c.Assert(err, check.IsNil)
c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, config.MaxMessageBytes)
c.Assert(opts["max-message-bytes"], check.Equals, strconv.Itoa(cfg.Producer.MaxMessageBytes))
>>>>>>> afa9a2bf5 (sink(ticdc): actively checks the size of min.insync.replicas and replication-factor (#4263))

func (s *kafkaSuite) TestTopicPreProcess(c *check.C) {
defer testleak.AfterTest(c)
Expand All @@ -272,11 +301,33 @@ func (s *kafkaSuite) TestTopicPreProcess(c *check.C) {
config.PartitionNum = int32(0)
config.BrokerEndpoints = strings.Split(broker.Addr(), ",")
config.AutoCreate = false
<<<<<<< HEAD

cfg, err := newSaramaConfigImpl(ctx, config)
=======
cfg, err = newSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
opts = make(map[string]string)
err = validateAndCreateTopic(adminClient, "non-exist", config, cfg, opts)
c.Assert(
errors.Cause(err),
check.ErrorMatches,
".*auto-create-topic` is false, and topic not found.*",
)

// When the topic does not exist, use the broker's configuration to create the topic.
// It is less than the value of broker.
config.AutoCreate = true
config.MaxMessageBytes = defaultMaxMessageBytes - 1
cfg, err = newSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
opts = make(map[string]string)
err = validateAndCreateTopic(adminClient, "create-random1", config, cfg, opts)
>>>>>>> afa9a2bf5 (sink(ticdc): actively checks the size of min.insync.replicas and replication-factor (#4263))
c.Assert(err, check.IsNil)
c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, config.MaxMessageBytes)

<<<<<<< HEAD
config.BrokerEndpoints = []string{""}
cfg.Metadata.Retry.Max = 1

Expand Down Expand Up @@ -345,6 +396,76 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
c.Assert(cfg.Net.SASL.User, check.Equals, "user")
c.Assert(cfg.Net.SASL.Password, check.Equals, "password")
c.Assert(cfg.Net.SASL.Mechanism, check.Equals, sarama.SASLMechanism("SCRAM-SHA-256"))
=======
// When the topic does not exist, use the broker's configuration to create the topic.
// It is larger than the value of broker.
config.MaxMessageBytes = defaultMaxMessageBytes + 1
config.AutoCreate = true
cfg, err = newSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
opts = make(map[string]string)
err = validateAndCreateTopic(adminClient, "create-random2", config, cfg, opts)
c.Assert(err, check.IsNil)
c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, defaultMaxMessageBytes)
c.Assert(opts["max-message-bytes"], check.Equals, strconv.Itoa(cfg.Producer.MaxMessageBytes))

// When the topic exists, but the topic does not store max message bytes info,
// the check of parameter succeeds.
// It is less than the value of broker.
config.MaxMessageBytes = defaultMaxMessageBytes - 1
cfg, err = newSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
detail := &sarama.TopicDetail{
NumPartitions: 3,
// Does not contain max message bytes information.
ConfigEntries: make(map[string]*string),
}
err = adminClient.CreateTopic("test-topic", detail, false)
c.Assert(err, check.IsNil)
opts = make(map[string]string)
err = validateAndCreateTopic(adminClient, "test-topic", config, cfg, opts)
c.Assert(err, check.IsNil)
c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, config.MaxMessageBytes)
c.Assert(opts["max-message-bytes"], check.Equals, strconv.Itoa(cfg.Producer.MaxMessageBytes))

// When the topic exists, but the topic does not store max message bytes info,
// the check of parameter fails.
// It is larger than the value of broker.
config.MaxMessageBytes = defaultMaxMessageBytes + 1
cfg, err = newSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
opts = make(map[string]string)
err = validateAndCreateTopic(adminClient, "test-topic", config, cfg, opts)
c.Assert(err, check.IsNil)
c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, defaultMaxMessageBytes)
c.Assert(opts["max-message-bytes"], check.Equals, strconv.Itoa(cfg.Producer.MaxMessageBytes))

// Report an error if the replication-factor is less than min.insync.replicas
// when the topic does not exist.
cfg, err = newSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
adminClient.SetMinInsyncReplicas("2")
opts = make(map[string]string)
err = validateAndCreateTopic(adminClient, "create-new-fail-invalid-min-insync-replicas", config, cfg, opts)
c.Assert(
errors.Cause(err),
check.ErrorMatches,
".*`replication-factor` cannot be smaller than the `min.insync.replicas` of broker.*",
)

// Report an error if the replication-factor is less than min.insync.replicas
// when the topic does exist.
cfg, err = newSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
adminClient.SetMinInsyncReplicas("2")
opts = make(map[string]string)
err = validateAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg, opts)
c.Assert(
errors.Cause(err),
check.ErrorMatches,
".*`replication-factor` cannot be smaller than the `min.insync.replicas` of topic.*",
)
>>>>>>> afa9a2bf5 (sink(ticdc): actively checks the size of min.insync.replicas and replication-factor (#4263))
}

func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) {
Expand Down
Loading