Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): set max-message-bytes default to 10m (#4036) #4063

Merged
Previous commit
Next commit
This is an automated cherry-pick of #4074
Signed-off-by: ti-chi-bot <[email protected]>
3AceShowHand authored and ti-chi-bot committed Dec 26, 2021
commit a01549d3af21b5a682f2e4b90aff5add104b5e7a
10 changes: 10 additions & 0 deletions cdc/sink/codec/craft.go
Original file line number Diff line number Diff line change
@@ -97,13 +97,23 @@ func (e *CraftEventBatchEncoder) Reset() {
// SetParams reads relevant parameters for craft protocol
func (e *CraftEventBatchEncoder) SetParams(params map[string]string) error {
var err error
maxMessageBytes, ok := params["max-message-bytes"]
if !ok {
return cerror.ErrSinkInvalidConfig.GenWithStack("max-message-bytes not found")
}

<<<<<<< HEAD
e.maxMessageSize = DefaultMaxMessageBytes
if maxMessageBytes, ok := params["max-message-bytes"]; ok {
e.maxMessageSize, err = strconv.Atoi(maxMessageBytes)
if err != nil {
return cerror.ErrSinkInvalidConfig.Wrap(err)
}
=======
e.maxMessageBytes, err = strconv.Atoi(maxMessageBytes)
if err != nil {
return cerror.WrapError(cerror.ErrSinkInvalidConfig, err)
>>>>>>> f097a1294 (codec(cdc): fix encoder `max-message-bytes` (#4074))
}
if e.maxMessageSize <= 0 || e.maxMessageSize > math.MaxInt32 {
return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", e.maxMessageSize))
14 changes: 7 additions & 7 deletions cdc/sink/codec/craft_test.go
Original file line number Diff line number Diff line change
@@ -138,7 +138,7 @@ func (s *craftBatchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatc
func (s *craftBatchSuite) TestParamsEdgeCases(c *check.C) {
defer testleak.AfterTest(c)()
encoder := NewCraftEventBatchEncoder().(*CraftEventBatchEncoder)
err := encoder.SetParams(map[string]string{})
err := encoder.SetParams(map[string]string{"max-message-bytes": "10485760"})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
c.Assert(encoder.maxMessageSize, check.Equals, DefaultMaxMessageBytes)
@@ -157,21 +157,21 @@ func (s *craftBatchSuite) TestParamsEdgeCases(c *check.C) {
err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxUint32)})
c.Assert(err, check.NotNil)

err = encoder.SetParams(map[string]string{"max-batch-size": "0"})
err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxUint32), "max-batch-size": "0"})
c.Assert(err, check.ErrorMatches, ".*invalid.*")

err = encoder.SetParams(map[string]string{"max-batch-size": "-1"})
err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxUint32), "max-batch-size": "-1"})
c.Assert(err, check.ErrorMatches, ".*invalid.*")

err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxUint16)})
err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxUint16)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, int(math.MaxUint16))
c.Assert(encoder.maxMessageSize, check.Equals, DefaultMaxMessageBytes)

err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxInt32)})
err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxInt32)})
c.Assert(err, check.NotNil)

err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxUint32)})
err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxUint32)})
c.Assert(err, check.NotNil)
}

@@ -202,7 +202,7 @@ func (s *craftBatchSuite) TestMaxMessageBytes(c *check.C) {
func (s *craftBatchSuite) TestMaxBatchSize(c *check.C) {
defer testleak.AfterTest(c)()
encoder := NewCraftEventBatchEncoder()
err := encoder.SetParams(map[string]string{"max-batch-size": "64"})
err := encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": "64"})
c.Check(err, check.IsNil)

testEvent := &model.RowChangedEvent{
17 changes: 17 additions & 0 deletions cdc/sink/codec/json.go
Original file line number Diff line number Diff line change
@@ -348,9 +348,15 @@ type JSONEventBatchEncoder struct {
maxBatchSize int
}

<<<<<<< HEAD
// GetMaxMessageSize is only for unit testing.
func (d *JSONEventBatchEncoder) GetMaxMessageSize() int {
return d.maxMessageSize
=======
// GetMaxMessageBytes is only for unit testing.
func (d *JSONEventBatchEncoder) GetMaxMessageBytes() int {
return d.maxMessageBytes
>>>>>>> f097a1294 (codec(cdc): fix encoder `max-message-bytes` (#4074))
}

// GetMaxBatchSize is only for unit testing.
@@ -577,12 +583,23 @@ func (d *JSONEventBatchEncoder) Reset() {
func (d *JSONEventBatchEncoder) SetParams(params map[string]string) error {
var err error

<<<<<<< HEAD
d.maxMessageSize = DefaultMaxMessageBytes
if maxMessageBytes, ok := params["max-message-bytes"]; ok {
d.maxMessageSize, err = strconv.Atoi(maxMessageBytes)
if err != nil {
return cerror.ErrSinkInvalidConfig.Wrap(err)
}
=======
maxMessageBytes, ok := params["max-message-bytes"]
if !ok {
return cerror.ErrSinkInvalidConfig.Wrap(errors.New("max-message-bytes not found"))
}

d.maxMessageBytes, err = strconv.Atoi(maxMessageBytes)
if err != nil {
return cerror.ErrSinkInvalidConfig.Wrap(err)
>>>>>>> f097a1294 (codec(cdc): fix encoder `max-message-bytes` (#4074))
}
if d.maxMessageSize <= 0 {
return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", d.maxMessageSize))
42 changes: 36 additions & 6 deletions cdc/sink/codec/json_test.go
Original file line number Diff line number Diff line change
@@ -14,12 +14,14 @@
package codec

import (
"context"
"math"
"sort"
"strconv"
"testing"

"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/util/testleak"
@@ -200,7 +202,7 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco
func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
defer testleak.AfterTest(c)()
encoder := NewJSONEventBatchEncoder().(*JSONEventBatchEncoder)
err := encoder.SetParams(map[string]string{})
err := encoder.SetParams(map[string]string{"max-message-bytes": "10485760"})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize)
c.Assert(encoder.maxMessageSize, check.Equals, DefaultMaxMessageBytes)
@@ -227,17 +229,43 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
err = encoder.SetParams(map[string]string{"max-batch-size": "-1"})
c.Assert(err, check.ErrorMatches, ".*invalid.*")

err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxInt32)})
err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxInt32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, math.MaxInt32)
c.Assert(encoder.maxMessageSize, check.Equals, DefaultMaxMessageBytes)

err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxUint32)})
err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxUint32)})
c.Assert(err, check.IsNil)
c.Assert(encoder.maxBatchSize, check.Equals, math.MaxUint32)
c.Assert(encoder.maxMessageSize, check.Equals, DefaultMaxMessageBytes)
}

func (s *batchSuite) TestSetParams(c *check.C) {
defer testleak.AfterTest(c)

opts := make(map[string]string)
encoderBuilder := newJSONEventBatchEncoderBuilder(opts)
c.Assert(encoderBuilder, check.NotNil)
encoder, err := encoderBuilder.Build(context.Background())
c.Assert(encoder, check.IsNil)
c.Assert(
errors.Cause(err),
check.ErrorMatches,
".*max-message-bytes not found.*",
)

opts["max-message-bytes"] = "1"
encoderBuilder = newJSONEventBatchEncoderBuilder(opts)
c.Assert(encoderBuilder, check.NotNil)
encoder, err = encoderBuilder.Build(context.Background())
c.Assert(err, check.IsNil)
c.Assert(encoder, check.NotNil)

jsonEncoder, ok := encoder.(*JSONEventBatchEncoder)
c.Assert(ok, check.IsTrue)
c.Assert(jsonEncoder.GetMaxMessageBytes(), check.Equals, 1)
}

func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
defer testleak.AfterTest(c)()
encoder := NewJSONEventBatchEncoder()
@@ -282,9 +310,11 @@ func (s *batchSuite) TestMaxMessageBytes(c *check.C) {

func (s *batchSuite) TestMaxBatchSize(c *check.C) {
defer testleak.AfterTest(c)()
encoder := NewJSONEventBatchEncoder()
err := encoder.SetParams(map[string]string{"max-batch-size": "64"})
c.Check(err, check.IsNil)
encoderBuilder := newJSONEventBatchEncoderBuilder(map[string]string{"max-message-bytes": "1048576", "max-batch-size": "64"})
c.Assert(encoderBuilder, check.NotNil)
encoder, err := encoderBuilder.Build(context.Background())
c.Assert(err, check.IsNil)
c.Assert(encoder, check.NotNil)

testEvent := &model.RowChangedEvent{
CommitTs: 1,
8 changes: 8 additions & 0 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
@@ -387,7 +387,15 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi
topic := strings.TrimFunc(sinkURI.Path, func(r rune) bool {
return r == '/'
})
<<<<<<< HEAD
producer, err := kafka.NewKafkaSaramaProducer(ctx, sinkURI.Host, topic, config, errCh)
=======
if topic == "" {
return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri")
}

sProducer, err := kafka.NewKafkaSaramaProducer(ctx, topic, producerConfig, opts, errCh)
>>>>>>> f097a1294 (codec(cdc): fix encoder `max-message-bytes` (#4074))
if err != nil {
return nil, errors.Trace(err)
}
6 changes: 5 additions & 1 deletion cdc/sink/mq_test.go
Original file line number Diff line number Diff line change
@@ -70,7 +70,11 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) {

c.Assert(encoder, check.FitsTypeOf, &codec.JSONEventBatchEncoder{})
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize(), check.Equals, 1)
<<<<<<< HEAD
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageSize(), check.Equals, 4194304)
=======
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageBytes(), check.Equals, 1048576)
>>>>>>> f097a1294 (codec(cdc): fix encoder `max-message-bytes` (#4074))

// mock kafka broker processes 1 row changed event
leader.Returns(prodSuccess)
@@ -219,5 +223,5 @@ func (s mqSinkSuite) TestPulsarSinkEncoderConfig(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(encoder, check.FitsTypeOf, &codec.JSONEventBatchEncoder{})
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize(), check.Equals, 1)
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageSize(), check.Equals, 4194304)
c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageBytes(), check.Equals, 4194304)
}
88 changes: 88 additions & 0 deletions cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
@@ -432,7 +432,11 @@ func kafkaTopicPreProcess(topic, address string, config *Config, cfg *sarama.Con
var newSaramaConfigImpl = newSaramaConfig

// NewKafkaSaramaProducer creates a kafka sarama producer
<<<<<<< HEAD
func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, config *Config, errCh chan error) (*kafkaSaramaProducer, error) {
=======
func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, opts map[string]string, errCh chan error) (*kafkaSaramaProducer, error) {
>>>>>>> f097a1294 (codec(cdc): fix encoder `max-message-bytes` (#4074))
log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config))
cfg, err := newSaramaConfigImpl(ctx, config)
if err != nil {
@@ -441,7 +445,21 @@ func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, c
if config.PartitionNum < 0 {
return nil, cerror.ErrKafkaInvalidPartitionNum.GenWithStackByArgs(config.PartitionNum)
}
<<<<<<< HEAD
asyncClient, err := sarama.NewAsyncProducer(strings.Split(address, ","), cfg)
=======
defer func() {
if err := admin.Close(); err != nil {
log.Warn("close kafka cluster admin failed", zap.Error(err))
}
}()

if err := validateMaxMessageBytesAndCreateTopic(admin, topic, config, cfg, opts); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

asyncClient, err := sarama.NewAsyncProducer(config.BrokerEndpoints, cfg)
>>>>>>> f097a1294 (codec(cdc): fix encoder `max-message-bytes` (#4074))
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
@@ -514,19 +532,89 @@ func kafkaClientID(role, captureAddr, changefeedID, configuredClientID string) (
return
}

<<<<<<< HEAD
// NewSaramaConfig return the default config and set the according version and metrics
func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
config := sarama.NewConfig()
=======
func validateMaxMessageBytesAndCreateTopic(admin kafka.ClusterAdminClient, topic string, config *Config, saramaConfig *sarama.Config, opts map[string]string) error {
topics, err := admin.ListTopics()
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

info, exists := topics[topic]
// once we have found the topic, no matter `auto-create-topic`, make sure user input parameters are valid.
if exists {
// make sure that producer's `MaxMessageBytes` smaller than topic's `max.message.bytes`
topicMaxMessageBytes, err := getTopicMaxMessageBytes(admin, info)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

if topicMaxMessageBytes < config.MaxMessageBytes {
log.Warn("topic's `max.message.bytes` less than the user set `max-message-bytes`,"+
"use topic's `max.message.bytes` to initialize the Kafka producer",
zap.Int("max.message.bytes", topicMaxMessageBytes),
zap.Int("max-message-bytes", config.MaxMessageBytes))
saramaConfig.Producer.MaxMessageBytes = topicMaxMessageBytes
}
opts["max-message-bytes"] = strconv.Itoa(saramaConfig.Producer.MaxMessageBytes)
>>>>>>> f097a1294 (codec(cdc): fix encoder `max-message-bytes` (#4074))

version, err := sarama.ParseKafkaVersion(c.Version)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidVersion, err)
}
<<<<<<< HEAD
var role string
if util.IsOwnerFromCtx(ctx) {
role = "owner"
} else {
role = "processor"
=======

// when create the topic, `max.message.bytes` is decided by the broker,
// it would use broker's `message.max.bytes` to set topic's `max.message.bytes`.
// TiCDC need to make sure that the producer's `MaxMessageBytes` won't larger than
// broker's `message.max.bytes`.
if brokerMessageMaxBytes < config.MaxMessageBytes {
log.Warn("broker's `message.max.bytes` less than the user set `max-message-bytes`,"+
"use broker's `message.max.bytes` to initialize the Kafka producer",
zap.Int("message.max.bytes", brokerMessageMaxBytes),
zap.Int("max-message-bytes", config.MaxMessageBytes))
saramaConfig.Producer.MaxMessageBytes = brokerMessageMaxBytes
}
opts["max-message-bytes"] = strconv.Itoa(saramaConfig.Producer.MaxMessageBytes)

// topic not exists yet, and user does not specify the `partition-num` in the sink uri.
if config.PartitionNum == 0 {
config.PartitionNum = defaultPartitionNum
log.Warn("partition-num is not set, use the default partition count",
zap.String("topic", topic), zap.Int32("partitions", config.PartitionNum))
}

err = admin.CreateTopic(topic, &sarama.TopicDetail{
NumPartitions: config.PartitionNum,
ReplicationFactor: config.ReplicationFactor,
}, false)
// TODO identify the cause of "Topic with this name already exists"
if err != nil && !strings.Contains(err.Error(), "already exists") {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

log.Info("TiCDC create the topic",
zap.Int32("partition-num", config.PartitionNum),
zap.Int16("replication-factor", config.ReplicationFactor))

return nil
}

func getBrokerMessageMaxBytes(admin kafka.ClusterAdminClient) (int, error) {
_, controllerID, err := admin.DescribeCluster()
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
>>>>>>> f097a1294 (codec(cdc): fix encoder `max-message-bytes` (#4074))
}
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
Loading