Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

drainer: fix kafka message limit problem #1039

Merged
merged 14 commits into from
Oct 11, 2021
Merged
1 change: 1 addition & 0 deletions cmd/drainer/drainer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ port = 3306
# kafka-addrs = "127.0.0.1:9092"
# kafka-version = "0.8.2.0"
# kafka-max-messages = 1024
# kafka-max-message-size = 1073741824 # configure max kafka **client** message size
# kafka-client-id = "tidb_binlog"
#
# the topic name drainer will push msg, the default name is <cluster-id>_obinlog
Expand Down
9 changes: 6 additions & 3 deletions drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,9 @@ func (cfg *Config) Parse(args []string) error {
return errors.Trace(err)
}

initializeSaramaGlobalConfig()
if cfg.SyncerCfg.DestDBType == "kafka" {
initializeSaramaGlobalConfig(cfg.SyncerCfg.To.KafkaMaxMessageSize)
}
return cfg.validate()
}

Expand Down Expand Up @@ -439,8 +441,9 @@ func (cfg *Config) adjustConfig() error {
}

if cfg.SyncerCfg.DestDBType == "kafka" {
maxMsgSize = maxKafkaMsgSize

if cfg.SyncerCfg.To.KafkaMaxMessageSize <= 0 {
cfg.SyncerCfg.To.KafkaMaxMessageSize = maxKafkaMsgSize
}
// get KafkaAddrs from zookeeper if ZkAddrs is setted
if cfg.SyncerCfg.To.ZKAddrs != "" {
zkClient, err := newZKFromConnectionString(cfg.SyncerCfg.To.ZKAddrs, time.Second*5, time.Second*60)
Expand Down
13 changes: 10 additions & 3 deletions drainer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ package drainer
import (
"bytes"
"fmt"
"math"
"os"
"path"
"testing"
"time"

"github.com/BurntSushi/toml"
"github.com/Shopify/sarama"
"github.com/pingcap/check"
. "github.com/pingcap/check"
"github.com/pingcap/parser/mysql"
Expand Down Expand Up @@ -222,7 +224,6 @@ func (t *testDrainerSuite) TestAdjustConfig(c *C) {
c.Assert(err, IsNil)
c.Assert(cfg.SyncerCfg.DestDBType, Equals, "file")
c.Assert(cfg.SyncerCfg.WorkerCount, Equals, 1)
c.Assert(maxMsgSize, Equals, maxGrpcMsgSize)

cfg = NewConfig()
err = cfg.adjustConfig()
Expand Down Expand Up @@ -352,12 +353,15 @@ func (t *testKafkaSuite) TestConfigDestDBTypeKafka(c *C) {
c.Assert(cfg.SyncerCfg.To.KafkaAddrs, Matches, defaultKafkaAddrs)
c.Assert(cfg.SyncerCfg.To.KafkaVersion, Equals, defaultKafkaVersion)
c.Assert(cfg.SyncerCfg.To.KafkaMaxMessages, Equals, 1024)
c.Assert(maxMsgSize, Equals, maxKafkaMsgSize)
c.Assert(sarama.MaxResponseSize, Equals, int32(maxKafkaMsgSize))
c.Assert(sarama.MaxRequestSize, Equals, int32(maxKafkaMsgSize)+1)

// With Zookeeper address
// With Zookeeper address and maxKafkaMsgSize
maxInt32 := math.MaxInt32
cfg = NewConfig()
cfg.SyncerCfg.To = new(dsync.DBConfig)
cfg.SyncerCfg.To.ZKAddrs = "host1:2181"
cfg.SyncerCfg.To.KafkaMaxMessageSize = int32(maxInt32)
err = cfg.Parse(args)
c.Assert(err, IsNil)
c.Assert(cfg.MetricsAddr, Equals, "192.168.15.10:9091")
Expand All @@ -371,4 +375,7 @@ func (t *testKafkaSuite) TestConfigDestDBTypeKafka(c *C) {
c.Assert(cfg.SyncerCfg.To.KafkaAddrs, Matches, `(192\.0\.2\.1:9092,192\.0\.2\.2:9092|192\.0\.2\.2:9092,192\.0\.2\.1:9092)`)
c.Assert(cfg.SyncerCfg.To.KafkaVersion, Equals, defaultKafkaVersion)
c.Assert(cfg.SyncerCfg.To.KafkaMaxMessages, Equals, 1024)
c.Assert(sarama.MaxResponseSize, Equals, int32(maxInt32))
c.Assert(sarama.MaxRequestSize, Equals, int32(maxInt32))
initializeSaramaGlobalConfig(maxKafkaMsgSize)
}
2 changes: 1 addition & 1 deletion drainer/pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (p *Pump) createPullBinlogsClient(ctx context.Context, last int64) error {
p.grpcConn.Close()
}

callOpts := []grpc.CallOption{grpc.MaxCallRecvMsgSize(maxMsgSize)}
callOpts := []grpc.CallOption{grpc.MaxCallRecvMsgSize(maxGrpcMsgSize)}

if compressor, ok := getCompressorName(ctx); ok {
p.logger.Info("pump grpc compression enabled")
Expand Down
14 changes: 8 additions & 6 deletions drainer/sync/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ type DBConfig struct {

Merge bool `toml:"merge" json:"merge"`

ZKAddrs string `toml:"zookeeper-addrs" json:"zookeeper-addrs"`
KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"`
KafkaVersion string `toml:"kafka-version" json:"kafka-version"`
KafkaMaxMessages int `toml:"kafka-max-messages" json:"kafka-max-messages"`
KafkaClientID string `toml:"kafka-client-id" json:"kafka-client-id"`
TopicName string `toml:"topic-name" json:"topic-name"`
ZKAddrs string `toml:"zookeeper-addrs" json:"zookeeper-addrs"`
KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"`
KafkaVersion string `toml:"kafka-version" json:"kafka-version"`
KafkaMaxMessages int `toml:"kafka-max-messages" json:"kafka-max-messages"`
KafkaClientID string `toml:"kafka-client-id" json:"kafka-client-id"`
KafkaMaxMessageSize int32 `toml:"kafka-max-message-size" json:"kafka-max-message-size"`
TopicName string `toml:"topic-name" json:"topic-name"`

// get it from pd
ClusterID uint64 `toml:"-" json:"-"`
}
Expand Down
18 changes: 9 additions & 9 deletions drainer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,8 @@ import (
)

const (
maxKafkaMsgSize = 1024 * 1024 * 1024
maxGrpcMsgSize = math.MaxInt32
)

var (
maxMsgSize = maxGrpcMsgSize
maxKafkaMsgSize = 1 << 30
maxGrpcMsgSize = int(^uint(0) >> 1)
)

// taskGroup is a wrapper of `sync.WaitGroup`.
Expand Down Expand Up @@ -132,10 +128,14 @@ func GenCheckPointCfg(cfg *Config, id uint64) (*checkpoint.Config, error) {
return checkpointCfg, nil
}

func initializeSaramaGlobalConfig() {
sarama.MaxResponseSize = int32(maxKafkaMsgSize)
func initializeSaramaGlobalConfig(kafkaMsgSize int32) {
sarama.MaxResponseSize = kafkaMsgSize
// add 1 to avoid confused log: Producer.MaxMessageBytes must be smaller than MaxRequestSize; it will be ignored
sarama.MaxRequestSize = int32(maxKafkaMsgSize) + 1
if kafkaMsgSize < math.MaxInt32 {
sarama.MaxRequestSize = kafkaMsgSize + 1
} else {
sarama.MaxRequestSize = kafkaMsgSize
}
}

func getDDLJob(tiStore kv.Storage, id int64) (*model.Job, error) {
Expand Down
5 changes: 2 additions & 3 deletions pump/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"crypto/tls"
"flag"
"fmt"
"math"
"net"
"net/url"
"os"
Expand All @@ -36,7 +35,7 @@ const (
defaultEtcdDialTimeout = 5 * time.Second
defaultEtcdURLs = "http://127.0.0.1:2379"
defaultListenAddr = "127.0.0.1:8250"
defautMaxMsgSize = math.MaxInt32 // max grpc message size
defaultMaxMsgSize = int(^uint(0) >> 1) // max grpc message size
defaultHeartbeatInterval = 2
defaultGC = "7"
defaultDataDir = "data.pump"
Expand Down Expand Up @@ -111,7 +110,7 @@ func NewConfig() *Config {

// global config
fs.BoolVar(&GlobalConfig.enableDebug, "enable-debug", false, "enable print debug log")
fs.IntVar(&GlobalConfig.maxMsgSize, "max-message-size", defautMaxMsgSize, "max message size tidb produce into pump")
fs.IntVar(&GlobalConfig.maxMsgSize, "max-message-size", defaultMaxMsgSize, "max message size tidb produce into pump")
fs.Int64Var(new(int64), "binlog-file-size", 0, "DEPRECATED")
fs.BoolVar(new(bool), "enable-binlog-slice", false, "DEPRECATED")
fs.IntVar(new(int), "binlog-slice-size", 0, "DEPRECATED")
Expand Down
2 changes: 1 addition & 1 deletion pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func init() {
// it must be set before any real grpc operation.
grpc.EnableTracing = false
GlobalConfig = &globalConfig{
maxMsgSize: defautMaxMsgSize,
maxMsgSize: defaultMaxMsgSize,
}
}

Expand Down