From 21d2590ac37bf53e5e687b8a341d50105ddbffe8 Mon Sep 17 00:00:00 2001 From: qupeng Date: Fri, 5 Jul 2019 16:53:40 +0800 Subject: [PATCH] tikvclient: fix a bug that double close channels. (#10991) --- go.mod | 4 +- go.sum | 10 ++- metrics/tikvclient.go | 4 +- store/tikv/client.go | 120 +++++++++++++++++++++----------- store/tikv/client_fail_test.go | 62 +++++++++++++++++ store/tikv/client_test.go | 11 ++- store/tikv/mock_tikv_service.go | 68 ++++++++++++++++++ store/tikv/tikvrpc/tikvrpc.go | 15 ++++ 8 files changed, 239 insertions(+), 55 deletions(-) create mode 100644 store/tikv/client_fail_test.go create mode 100644 store/tikv/mock_tikv_service.go diff --git a/go.mod b/go.mod index 0acf514d25d5d..64917a182d71d 100644 --- a/go.mod +++ b/go.mod @@ -33,15 +33,13 @@ require ( github.com/myesui/uuid v1.0.0 // indirect github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef - github.com/onsi/ginkgo v1.7.0 // indirect - github.com/onsi/gomega v1.4.3 // indirect github.com/opentracing/basictracer-go v1.0.0 github.com/opentracing/opentracing-go v1.0.2 github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8 github.com/pingcap/errors v0.11.4 github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e - github.com/pingcap/kvproto v0.0.0-20190619024611-a4759dfe3753 + github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531 github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 github.com/pingcap/parser v0.0.0-20190701123046-5768e68c1e65 github.com/pingcap/pd v0.0.0-20190617100349-293d4b5189bf diff --git a/go.sum b/go.sum index 84ba911a2e97c..675d987a4cd1b 100644 --- a/go.sum +++ b/go.sum @@ -137,12 +137,10 @@ github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3 github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo6LNmIvEWzsW1hbBQfpUO4JWnuQRmva8= github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= +github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= -github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I= github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= -github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= -github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/opentracing/basictracer-go v1.0.0 h1:YyUAhaEfjoWXclZVJ9sGoNct7j4TVk7lZWlQw5UXuoo= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg= @@ -161,8 +159,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e 
h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
-github.com/pingcap/kvproto v0.0.0-20190619024611-a4759dfe3753 h1:92t0y430CJF0tN1lvUhP5fhnYTFmssATJqwxQtvixYU=
-github.com/pingcap/kvproto v0.0.0-20190619024611-a4759dfe3753/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
+github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531 h1:8xk2HobDwClB5E3Hv9TEPiS7K7bv3ykWHLyZzuUYywI=
+github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
diff --git a/metrics/tikvclient.go b/metrics/tikvclient.go
index 1f66c1986a15b..c52c8a33e0dde 100644
--- a/metrics/tikvclient.go
+++ b/metrics/tikvclient.go
@@ -180,13 +180,13 @@ var (
 	})
 
 	// TiKVPendingBatchRequests indicates the number of requests pending in the batch channel.
-	TiKVPendingBatchRequests = prometheus.NewGauge(
+	TiKVPendingBatchRequests = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: "tidb",
 			Subsystem: "tikvclient",
 			Name:      "pending_batch_requests",
 			Help:      "Pending batch requests",
-		})
+		}, []string{"store"})
 
 	TiKVBatchWaitDuration = prometheus.NewHistogram(
 		prometheus.HistogramOpts{
diff --git a/store/tikv/client.go b/store/tikv/client.go
index e7e952d23f11e..9a87b45aaba0d 100644
--- a/store/tikv/client.go
+++ b/store/tikv/client.go
@@ -25,6 +25,7 @@ import (
 	"github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/coprocessor"
 	"github.com/pingcap/kvproto/pkg/debugpb"
 	"github.com/pingcap/kvproto/pkg/tikvpb"
@@ -33,6 +34,7 @@ import (
 	"github.com/pingcap/tidb/metrics"
 	"github.com/pingcap/tidb/store/tikv/tikvrpc"
 	"github.com/pingcap/tidb/util/logutil"
+	"github.com/prometheus/client_golang/prometheus"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"
@@ -78,11 +80,14 @@ type connArray struct {
 	batchCommandsCh        chan *batchCommandsEntry
 	batchCommandsClients   []*batchCommandsClient
 	tikvTransportLayerLoad uint64
+	closed                 chan struct{}
 
 	// Notify rpcClient to check the idle flag
 	idleNotify *uint32
 	idle       bool
 	idleDetect *time.Timer
+
+	pendingRequests prometheus.Gauge
}

type batchCommandsClient struct {
@@ -105,17 +110,70 @@ func (c *batchCommandsClient) isStopped() bool {
 	return atomic.LoadInt32(&c.closed) != 0
}

+func (c *batchCommandsClient) send(request *tikvpb.BatchCommandsRequest, entries []*batchCommandsEntry) {
+	// Hold the lock so that the stream client won't be replaced by batchRecvLoop,
+	// and newly added requests won't be removed by `failPendingRequests`.
+	c.clientLock.Lock()
+	defer c.clientLock.Unlock()
+	for i, requestID := range request.RequestIds {
+		c.batched.Store(requestID, entries[i])
+	}
+	if err := c.client.Send(request); err != nil {
+		logutil.BgLogger().Error(
+			"batch commands send error",
+			zap.String("target", c.target),
+			zap.Error(err),
+		)
+		c.failPendingRequests(err)
+	}
+}
+
+func (c *batchCommandsClient) recv() (*tikvpb.BatchCommandsResponse, error) {
+	failpoint.Inject("gotErrorInRecvLoop", func(_ failpoint.Value) (*tikvpb.BatchCommandsResponse, error) {
+		return nil, errors.New("injected error in batchRecvLoop")
+	})
+	// When `conn.Close()` is called, `client.Recv()` will return an error.
+	return c.client.Recv()
+}
+
+// `failPendingRequests` must be called while holding the client lock, to avoid closing channels twice.
 func (c *batchCommandsClient) failPendingRequests(err error) {
+	failpoint.Inject("panicInFailPendingRequests", nil)
 	c.batched.Range(func(key, value interface{}) bool {
 		id, _ := key.(uint64)
 		entry, _ := value.(*batchCommandsEntry)
 		entry.err = err
-		close(entry.res)
 		c.batched.Delete(id)
+		close(entry.res)
 		return true
 	})
}

+func (c *batchCommandsClient) reCreateStreamingClient(err error) bool {
+	// Hold the lock to forbid batchSendLoop using the old client.
+	c.clientLock.Lock()
+	defer c.clientLock.Unlock()
+	c.failPendingRequests(err) // fail all pending requests.
+
+	// Re-establish an application-layer stream. The TCP layer is handled by gRPC.
+	tikvClient := tikvpb.NewTikvClient(c.conn)
+	streamClient, err := tikvClient.BatchCommands(context.TODO())
+	if err == nil {
+		logutil.BgLogger().Info(
+			"batchRecvLoop re-create streaming success",
+			zap.String("target", c.target),
+		)
+		c.client = streamClient
+		return true
+	}
+	logutil.BgLogger().Error(
+		"batchRecvLoop re-create streaming fail",
+		zap.String("target", c.target),
+		zap.Error(err),
+	)
+	return false
+}
+
 func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
 	defer func() {
 		if r := recover(); r != nil {
@@ -129,8 +187,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
 	}()
 
 	for {
-		// When `conn.Close()` is called, `client.Recv()` will return an error.
-		resp, err := c.client.Recv()
+		resp, err := c.recv()
 		if err != nil {
 			now := time.Now()
 			for { // try to re-create the streaming in the loop.
@@ -143,28 +200,10 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
 					zap.Error(err),
 				)
 
-				// Hold the lock to forbid batchSendLoop using the old client.
-				c.clientLock.Lock()
-				c.failPendingRequests(err) // fail all pending requests.
-
-				// Re-establish a application layer stream. TCP layer is handled by gRPC.
-				tikvClient := tikvpb.NewTikvClient(c.conn)
-				streamClient, err := tikvClient.BatchCommands(context.TODO())
-				c.clientLock.Unlock()
-
-				if err == nil {
-					logutil.BgLogger().Info(
-						"batchRecvLoop re-create streaming success",
-						zap.String("target", c.target),
-					)
-					c.client = streamClient
+				if c.reCreateStreamingClient(err) {
 					break
 				}
-				logutil.BgLogger().Error(
-					"batchRecvLoop re-create streaming fail",
-					zap.String("target", c.target),
-					zap.Error(err),
-				)
+				// TODO: use a smarter backoff strategy.
 				time.Sleep(time.Second)
 			}
 
@@ -208,6 +247,7 @@ func newConnArray(maxSize uint, addr string, security config.Security, idleNotif
 		batchCommandsCh:        make(chan *batchCommandsEntry, cfg.TiKVClient.MaxBatchSize),
 		batchCommandsClients:   make([]*batchCommandsClient, 0, maxSize),
 		tikvTransportLayerLoad: 0,
+		closed:                 make(chan struct{}),
 
 		idleNotify: idleNotify,
 		idleDetect: time.NewTimer(idleTimeout),
@@ -220,6 +260,7 @@
 
 func (a *connArray) Init(addr string, security config.Security) error {
 	a.target = addr
+	a.pendingRequests = metrics.TiKVPendingBatchRequests.WithLabelValues(a.target)
 
 	opt := grpc.WithInsecure()
 	if len(security.ClusterSSLCA) != 0 {
@@ -309,7 +350,10 @@ func (a *connArray) Close() {
 		// After connections are closed, `batchRecvLoop`s will check the flag.
 		atomic.StoreInt32(&c.closed, 1)
 	}
-	close(a.batchCommandsCh)
+	// Don't close(batchCommandsCh): when Close() is called, someone may still be
+	// calling SendRequest and writing to batchCommandsCh; closing the channel
+	// here would make that writing goroutine panic.
+	close(a.closed)
 
 	for i, c := range a.v {
 		if c != nil {
@@ -356,6 +400,8 @@ func (a *connArray) fetchAllPendingRequests(
 		atomic.CompareAndSwapUint32(a.idleNotify, 0, 1) // This connArray to be recycled
 		return
+	case <-a.closed:
+		return
 	}
 	if headEntry == nil {
 		return
@@ -450,7 +496,7 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) {
 		requests = requests[:0]
 		requestIDs = requestIDs[:0]
 
-		metrics.TiKVPendingBatchRequests.Set(float64(len(a.batchCommandsCh)))
+		a.pendingRequests.Set(float64(len(a.batchCommandsCh)))
 		a.fetchAllPendingRequests(int(cfg.MaxBatchSize), &entries, &requests)
 
 		if len(entries) < int(cfg.MaxBatchSize) && cfg.MaxBatchWaitTime > 0 {
@@ -484,27 +530,12 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) {
 			requestIDs = append(requestIDs, requestID)
 		}
 
-		request := &tikvpb.BatchCommandsRequest{
+		req := &tikvpb.BatchCommandsRequest{
 			Requests:   requests,
 			RequestIds: requestIDs,
 		}
 
-		// Use the lock to protect the stream client won't be replaced by RecvLoop,
-		// and new added request won't be removed by `failPendingRequests`.
-		batchCommandsClient.clientLock.Lock()
-		for i, requestID := range request.RequestIds {
-			batchCommandsClient.batched.Store(requestID, entries[i])
-		}
-		err := batchCommandsClient.client.Send(request)
-		batchCommandsClient.clientLock.Unlock()
-		if err != nil {
-			logutil.BgLogger().Error(
-				"batch commands send error",
-				zap.String("target", a.target),
-				zap.Error(err),
-			)
-			batchCommandsClient.failPendingRequests(err)
-		}
+		batchCommandsClient.send(req, entries)
 	}
}

@@ -547,6 +578,11 @@ func newRPCClient(security config.Security) *rpcClient {
 	}
}

+// NewTestRPCClient is for some external tests.
+func NewTestRPCClient() Client {
+	return newRPCClient(config.Security{})
+}
+
 func (c *rpcClient) getConnArray(addr string) (*connArray, error) {
 	c.RLock()
 	if c.isClosed {
diff --git a/store/tikv/client_fail_test.go b/store/tikv/client_fail_test.go
new file mode 100644
index 0000000000000..78f75bf43a9e6
--- /dev/null
+++ b/store/tikv/client_fail_test.go
@@ -0,0 +1,62 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tikv + +import ( + "context" + "fmt" + "time" + + . "github.com/pingcap/check" + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/tikvpb" + "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/store/tikv/tikvrpc" +) + +func setGrpcConnectionCount(count uint) { + newConf := config.NewConfig() + newConf.TiKVClient.GrpcConnectionCount = count + config.StoreGlobalConfig(newConf) +} + +func (s *testClientSuite) TestPanicInRecvLoop(c *C) { + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/panicInFailPendingRequests", `panic`), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gotErrorInRecvLoop", `return("0")`), IsNil) + + server, port := startMockTikvService() + c.Assert(port > 0, IsTrue) + + grpcConnectionCount := config.GetGlobalConfig().TiKVClient.GrpcConnectionCount + setGrpcConnectionCount(1) + addr := fmt.Sprintf("%s:%d", "127.0.0.1", port) + rpcClient := newRPCClient(config.Security{}) + + // Start batchRecvLoop, and it should panic in `failPendingRequests`. + _, err := rpcClient.getConnArray(addr) + c.Assert(err, IsNil) + + time.Sleep(time.Second) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/panicInFailPendingRequests"), IsNil) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gotErrorInRecvLoop"), IsNil) + + req := &tikvrpc.Request{ + Type: tikvrpc.CmdEmpty, + Empty: &tikvpb.BatchCommandsEmptyRequest{}, + } + _, err = rpcClient.SendRequest(context.Background(), addr, req, time.Second) + c.Assert(err, IsNil) + server.Stop() + setGrpcConnectionCount(grpcConnectionCount) +} diff --git a/store/tikv/client_test.go b/store/tikv/client_test.go index 4d4d2fa1e7d38..895671bc76ad5 100644 --- a/store/tikv/client_test.go +++ b/store/tikv/client_test.go @@ -35,9 +35,15 @@ type testClientSuite struct { var _ = Suite(&testClientSuite{}) +func setMaxBatchSize(size uint) { + newConf := config.NewConfig() + newConf.TiKVClient.MaxBatchSize = size + config.StoreGlobalConfig(newConf) +} + func (s *testClientSuite) TestConn(c *C) { - globalConfig := config.GetGlobalConfig() - globalConfig.TiKVClient.MaxBatchSize = 0 // Disable batch. 
+	maxBatchSize := config.GetGlobalConfig().TiKVClient.MaxBatchSize
+	setMaxBatchSize(0)
 
 	client := newRPCClient(config.Security{})
 
@@ -55,6 +61,7 @@ func (s *testClientSuite) TestConn(c *C) {
 	conn3, err := client.getConnArray(addr)
 	c.Assert(err, NotNil)
 	c.Assert(conn3, IsNil)
+	setMaxBatchSize(maxBatchSize)
}

 func (s *testClientSuite) TestRemoveCanceledRequests(c *C) {
diff --git a/store/tikv/mock_tikv_service.go b/store/tikv/mock_tikv_service.go
new file mode 100644
index 0000000000000..2e5db005c36d6
--- /dev/null
+++ b/store/tikv/mock_tikv_service.go
@@ -0,0 +1,68 @@
+package tikv
+
+import (
+	"fmt"
+	"net"
+	"time"
+
+	"github.com/pingcap/kvproto/pkg/tikvpb"
+	"github.com/pingcap/tidb/util/logutil"
+	"go.uber.org/zap"
+	"google.golang.org/grpc"
+)
+
+type server struct {
+	tikvpb.TikvServer
+}
+
+func (s *server) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error {
+	for {
+		req, err := ss.Recv()
+		if err != nil {
+			logutil.BgLogger().Error("batch commands receive fail", zap.Error(err))
+			return err
+		}
+
+		responses := make([]*tikvpb.BatchCommandsResponse_Response, 0, len(req.GetRequestIds()))
+		for i := 0; i < len(req.GetRequestIds()); i++ {
+			responses = append(responses, &tikvpb.BatchCommandsResponse_Response{
+				Cmd: &tikvpb.BatchCommandsResponse_Response_Empty{
+					Empty: &tikvpb.BatchCommandsEmptyResponse{},
+				},
+			})
+		}
+
+		err = ss.Send(&tikvpb.BatchCommandsResponse{
+			Responses:  responses,
+			RequestIds: req.GetRequestIds(),
+		})
+		if err != nil {
+			logutil.BgLogger().Error("batch commands send fail", zap.Error(err))
+			return err
+		}
+	}
+}
+
+// Try to start a gRPC server, and return the server instance and the bound port.
+func startMockTikvService() (*grpc.Server, int) {
+	for port := 40000; port < 50000; port++ {
+		lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", port))
+		if err != nil {
+			logutil.BgLogger().Error("can't listen", zap.Error(err))
+			continue
+		}
+		s := grpc.NewServer(grpc.ConnectionTimeout(time.Minute))
+		tikvpb.RegisterTikvServer(s, &server{})
+		go func() {
+			if err = s.Serve(lis); err != nil {
+				logutil.BgLogger().Error(
+					"can't serve gRPC requests",
+					zap.Error(err),
+				)
+			}
+		}()
+		return s, port
+	}
+	logutil.BgLogger().Error("can't start mock tikv service because no available ports")
+	return nil, -1
+}
diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go
index 8ee060fdd49b4..b3cf78ee5e82e 100644
--- a/store/tikv/tikvrpc/tikvrpc.go
+++ b/store/tikv/tikvrpc/tikvrpc.go
@@ -66,6 +66,8 @@ const (
 	CmdSplitRegion
 
 	CmdDebugGetRegionProperties CmdType = 2048 + iota
+
+	CmdEmpty CmdType = 3072 + iota
)
 
 func (t CmdType) String() string {
@@ -163,6 +165,8 @@ type Request struct {
 	PessimisticRollback *kvrpcpb.PessimisticRollbackRequest
 
 	DebugGetRegionProperties *debugpb.GetRegionPropertiesRequest
+
+	Empty *tikvpb.BatchCommandsEmptyRequest
}

// ToBatchCommandsRequest converts the request to an entry in BatchCommands request.
@@ -212,6 +216,8 @@ func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Reques return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_PessimisticLock{PessimisticLock: req.PessimisticLock}} case CmdPessimisticRollback: return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_PessimisticRollback{PessimisticRollback: req.PessimisticRollback}} + case CmdEmpty: + return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Empty{Empty: req.Empty}} } return nil } @@ -258,6 +264,8 @@ type Response struct { PessimisticRollback *kvrpcpb.PessimisticRollbackResponse DebugGetRegionProperties *debugpb.GetRegionPropertiesResponse + + Empty *tikvpb.BatchCommandsEmptyResponse } // FromBatchCommandsResponse converts a BatchCommands response to Response. @@ -307,6 +315,8 @@ func FromBatchCommandsResponse(res *tikvpb.BatchCommandsResponse_Response) *Resp return &Response{Type: CmdPessimisticLock, PessimisticLock: res.PessimisticLock} case *tikvpb.BatchCommandsResponse_Response_PessimisticRollback: return &Response{Type: CmdPessimisticRollback, PessimisticRollback: res.PessimisticRollback} + case *tikvpb.BatchCommandsResponse_Response_Empty: + return &Response{Type: CmdEmpty, Empty: res.Empty} } return nil } @@ -383,6 +393,7 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error { req.MvccGetByStartTs.Context = ctx case CmdSplitRegion: req.SplitRegion.Context = ctx + case CmdEmpty: default: return fmt.Errorf("invalid request type %v", req.Type) } @@ -505,6 +516,7 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) { resp.SplitRegion = &kvrpcpb.SplitRegionResponse{ RegionError: e, } + case CmdEmpty: default: return nil, fmt.Errorf("invalid request type %v", req.Type) } @@ -569,6 +581,7 @@ func (resp *Response) GetRegionError() (*errorpb.Error, error) { e = resp.MvccGetByStartTS.GetRegionError() case CmdSplitRegion: e = resp.SplitRegion.GetRegionError() + case CmdEmpty: default: return nil, fmt.Errorf("invalid response type %v", resp.Type) } @@ -641,6 +654,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp resp.MvccGetByStartTS, err = client.MvccGetByStartTs(ctx, req.MvccGetByStartTs) case CmdSplitRegion: resp.SplitRegion, err = client.SplitRegion(ctx, req.SplitRegion) + case CmdEmpty: + resp.Empty, err = &tikvpb.BatchCommandsEmptyResponse{}, nil default: return nil, errors.Errorf("invalid request type: %v", req.Type) }
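
--
Why the fix works: in Go, sending on a closed channel panics, and closing an
already-closed channel panics as well (the "double close" in the subject line).
Before this patch, connArray.Close() closed batchCommandsCh while SendRequest
goroutines could still be writing to it, and failPendingRequests could close an
entry.res channel a second time when the send and recv paths raced. The patch
therefore never closes the data channel: Close() instead closes a dedicated
`closed` signal channel that fetchAllPendingRequests selects on, and
failPendingRequests now runs only while clientLock is held, deleting each entry
from the map before closing its channel. The standalone sketch below shows the
signal-channel half of the pattern; the type and names are illustrative rather
than TiDB code, and the sync.Once guard is an added assumption (connArray
relies on Close() being called exactly once instead).

package main

import (
	"fmt"
	"sync"
	"time"
)

// conn mimics connArray: producers write to reqCh and never see it closed;
// shutdown is signalled through a separate `closed` channel.
type conn struct {
	reqCh  chan int      // data channel: intentionally never closed
	closed chan struct{} // signal channel: closed once on shutdown
	once   sync.Once     // illustrative guard; not taken from the patch
}

// send delivers a request unless the connection is shutting down. Because
// reqCh is never closed, this cannot hit a "send on closed channel" panic,
// no matter how it races with close().
func (c *conn) send(v int) bool {
	select {
	case c.reqCh <- v:
		return true
	case <-c.closed:
		return false // shutting down: reject instead of panicking
	}
}

func (c *conn) close() {
	c.once.Do(func() { close(c.closed) }) // idempotent shutdown signal
}

// recvLoop plays the role of fetchAllPendingRequests: it drains the data
// channel until the shutdown signal fires.
func (c *conn) recvLoop(done chan<- struct{}) {
	defer close(done)
	for {
		select {
		case v := <-c.reqCh:
			fmt.Println("handled request", v)
		case <-c.closed:
			return
		}
	}
}

func main() {
	c := &conn{reqCh: make(chan int), closed: make(chan struct{})}
	done := make(chan struct{})
	go c.recvLoop(done)

	c.send(1) // delivered to recvLoop
	time.Sleep(10 * time.Millisecond)
	c.close()
	c.close() // a second close is harmless thanks to sync.Once
	<-done
	fmt.Println("accepted after close:", c.send(2)) // false, and no panic
}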