From fe9773223302d10b0c47468005451cf7e23046d3 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 16 Aug 2022 17:13:45 +0800 Subject: [PATCH 1/3] add compress test --- tests/cache_table/run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/cache_table/run.sh b/tests/cache_table/run.sh index 0c2696df0..48ae47eda 100644 --- a/tests/cache_table/run.sh +++ b/tests/cache_table/run.sh @@ -4,7 +4,7 @@ set -e cd "$(dirname "$0")" -run_drainer & +run_drainer --compressor gzip & sleep 3 From 10ee044762f3a82de490549d1b95eca09bd54c02 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 16 Aug 2022 17:27:20 +0800 Subject: [PATCH 2/3] fix compress --- drainer/util.go | 2 +- pump/config.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/drainer/util.go b/drainer/util.go index 5e6420dec..bd8d9397d 100644 --- a/drainer/util.go +++ b/drainer/util.go @@ -43,7 +43,7 @@ import ( const ( maxKafkaMsgSize = 1 << 30 - maxGrpcMsgSize = int(^uint(0) >> 1) + maxGrpcMsgSize = int(^uint(0)>>1) - 4*1024*1024 // max grpc message size, leave 4MB as buffer, see https://github.com/pingcap/tidb-binlog/issues/1152 ) // taskGroup is a wrapper of `sync.WaitGroup`. diff --git a/pump/config.go b/pump/config.go index ced2cfdd6..edb705735 100644 --- a/pump/config.go +++ b/pump/config.go @@ -35,7 +35,7 @@ const ( defaultEtcdDialTimeout = 5 * time.Second defaultEtcdURLs = "http://127.0.0.1:2379" defaultListenAddr = "127.0.0.1:8250" - defaultMaxMsgSize = int(^uint(0) >> 1) // max grpc message size + defaultMaxMsgSize = int(^uint(0)>>1) - 4*1024*1024 // max grpc message size, leave 4MB as buffer, see https://github.com/pingcap/tidb-binlog/issues/1152 defaultHeartbeatInterval = 2 defaultGC = "7" defaultDataDir = "data.pump" From 0f34746f1acda729c365d76cce8975465f7badbe Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 16 Aug 2022 22:01:37 +0800 Subject: [PATCH 3/3] address comment --- drainer/util.go | 4 +++- pump/config.go | 10 ++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/drainer/util.go b/drainer/util.go index bd8d9397d..9d603cd56 100644 --- a/drainer/util.go +++ b/drainer/util.go @@ -43,7 +43,9 @@ import ( const ( maxKafkaMsgSize = 1 << 30 - maxGrpcMsgSize = int(^uint(0)>>1) - 4*1024*1024 // max grpc message size, leave 4MB as buffer, see https://github.com/pingcap/tidb-binlog/issues/1152 + // max grpc message size, leave 4MB as buffer. Because when grpc decompresses messages, it will leave a few buffer + // for this, which overflows the int64: https://github.com/grpc/grpc-go/blob/v1.44.0/rpc_util.go#L742 + maxGrpcMsgSize = int(^uint(0)>>1) - 4*1024*1024 ) // taskGroup is a wrapper of `sync.WaitGroup`. diff --git a/pump/config.go b/pump/config.go index edb705735..313cc6a9b 100644 --- a/pump/config.go +++ b/pump/config.go @@ -32,10 +32,12 @@ import ( ) const ( - defaultEtcdDialTimeout = 5 * time.Second - defaultEtcdURLs = "http://127.0.0.1:2379" - defaultListenAddr = "127.0.0.1:8250" - defaultMaxMsgSize = int(^uint(0)>>1) - 4*1024*1024 // max grpc message size, leave 4MB as buffer, see https://github.com/pingcap/tidb-binlog/issues/1152 + defaultEtcdDialTimeout = 5 * time.Second + defaultEtcdURLs = "http://127.0.0.1:2379" + defaultListenAddr = "127.0.0.1:8250" + // max grpc message size, leave 4MB as buffer. Because when grpc decompresses messages, it will leave a few buffer + // for this, which overflows the int64: https://github.com/grpc/grpc-go/blob/v1.44.0/rpc_util.go#L742 + defaultMaxMsgSize = int(^uint(0)>>1) - 4*1024*1024 defaultHeartbeatInterval = 2 defaultGC = "7" defaultDataDir = "data.pump"