From 6f1f360dea37cac69a51db9bdd09549dda67b0cf Mon Sep 17 00:00:00 2001 From: qupeng Date: Sun, 30 Jun 2019 20:11:59 +0800 Subject: [PATCH 01/11] tikvclient: fix a bug that double close channels. Signed-off-by: qupeng --- metrics/tikvclient.go | 4 ++-- store/tikv/client.go | 53 ++++++++++++++++++++++++++----------------- 2 files changed, 34 insertions(+), 23 deletions(-) diff --git a/metrics/tikvclient.go b/metrics/tikvclient.go index 1f66c1986a15b..c52c8a33e0dde 100644 --- a/metrics/tikvclient.go +++ b/metrics/tikvclient.go @@ -180,13 +180,13 @@ var ( }) // TiKVPendingBatchRequests indicates the number of requests pending in the batch channel. - TiKVPendingBatchRequests = prometheus.NewGauge( + TiKVPendingBatchRequests = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "tidb", Subsystem: "tikvclient", Name: "pending_batch_requests", Help: "Pending batch requests", - }) + }, []string{"store"}) TiKVBatchWaitDuration = prometheus.NewHistogram( prometheus.HistogramOpts{ diff --git a/store/tikv/client.go b/store/tikv/client.go index 56ffbcf666a9f..d5ab29a0106ee 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -109,6 +109,7 @@ func (c *batchCommandsClient) isStopped() bool { return atomic.LoadInt32(&c.closed) != 0 } +// `failPendingRequests` must be called in locked contexts in order to avoid double closing channels. func (c *batchCommandsClient) failPendingRequests(err error) { c.batched.Range(func(key, value interface{}) bool { id, _ := key.(uint64) @@ -120,6 +121,32 @@ func (c *batchCommandsClient) failPendingRequests(err error) { }) } +func (c *batchCommandsClient) reCreateStreamingClient(err error) bool { + // Hold the lock to forbid batchSendLoop using the old client. + c.clientLock.Lock() + defer c.clientLock.Unlock() + + c.failPendingRequests(err) // fail all pending requests. + + // Re-establish a application layer stream. TCP layer is handled by gRPC. 
+ tikvClient := tikvpb.NewTikvClient(c.conn) + streamClient, err := tikvClient.BatchCommands(context.TODO()) + if err == nil { + logutil.BgLogger().Info( + "batchRecvLoop re-create streaming success", + zap.String("target", c.target), + ) + c.client = streamClient + return true + } + logutil.BgLogger().Error( + "batchRecvLoop re-create streaming fail", + zap.String("target", c.target), + zap.Error(err), + ) + return false +} + func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) { defer func() { if r := recover(); r != nil { @@ -147,28 +174,10 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) { zap.Error(err), ) - // Hold the lock to forbid batchSendLoop using the old client. - c.clientLock.Lock() - c.failPendingRequests(err) // fail all pending requests. - - // Re-establish a application layer stream. TCP layer is handled by gRPC. - tikvClient := tikvpb.NewTikvClient(c.conn) - streamClient, err := tikvClient.BatchCommands(context.TODO()) - c.clientLock.Unlock() - - if err == nil { - logutil.BgLogger().Info( - "batchRecvLoop re-create streaming success", - zap.String("target", c.target), - ) - c.client = streamClient + if c.reCreateStreamingClient(err) { break } - logutil.BgLogger().Error( - "batchRecvLoop re-create streaming fail", - zap.String("target", c.target), - zap.Error(err), - ) + // TODO: Use a more smart backoff strategy. 
time.Sleep(time.Second) } @@ -455,7 +464,7 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) { requests = requests[:0] requestIDs = requestIDs[:0] - metrics.TiKVPendingBatchRequests.Set(float64(len(a.batchCommandsCh))) + metrics.TiKVPendingBatchRequests.WithLabelValues(a.target).Set(float64(len(a.batchCommandsCh))) a.fetchAllPendingRequests(int(cfg.MaxBatchSize), &entries, &requests) if len(entries) < int(cfg.MaxBatchSize) && cfg.MaxBatchWaitTime > 0 { @@ -508,7 +517,9 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) { zap.String("target", a.target), zap.Error(err), ) + batchCommandsClient.clientLock.Lock() batchCommandsClient.failPendingRequests(err) + batchCommandsClient.clientLock.Unlock() } } } From 9b9c9df3cec993566038e6818f841c8a18a78070 Mon Sep 17 00:00:00 2001 From: qupeng Date: Wed, 3 Jul 2019 12:19:27 +0800 Subject: [PATCH 02/11] address comments --- store/tikv/client.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/store/tikv/client.go b/store/tikv/client.go index 64ef870874697..46603c783e4bb 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/util/logutil" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -83,6 +84,8 @@ type connArray struct { idleNotify *uint32 idle bool idleDetect *time.Timer + + pendingRequests prometheus.Gauge } type batchCommandsClient struct { @@ -229,6 +232,7 @@ func newConnArray(maxSize uint, addr string, security config.Security, idleNotif func (a *connArray) Init(addr string, security config.Security) error { a.target = addr + a.pendingRequests = metrics.TiKVPendingBatchRequests.WithLabelValues(a.target) opt := grpc.WithInsecure() if len(security.ClusterSSLCA) != 0 { @@ -459,7 +463,7 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) { requests = 
requests[:0] requestIDs = requestIDs[:0] - metrics.TiKVPendingBatchRequests.WithLabelValues(a.target).Set(float64(len(a.batchCommandsCh))) + a.pendingRequests.Set(float64(len(a.batchCommandsCh))) a.fetchAllPendingRequests(int(cfg.MaxBatchSize), &entries, &requests) if len(entries) < int(cfg.MaxBatchSize) && cfg.MaxBatchWaitTime > 0 { From 9de3d8d53d4f14a4f848491bb4e453dbdd05fbda Mon Sep 17 00:00:00 2001 From: qupeng Date: Wed, 3 Jul 2019 13:47:50 +0800 Subject: [PATCH 03/11] add test case --- store/tikv/client.go | 18 ++- store/tikv/client_fail_test.go | 50 ++++++++ store/tikv/mock_tikv_service.go | 205 ++++++++++++++++++++++++++++++++ 3 files changed, 271 insertions(+), 2 deletions(-) create mode 100644 store/tikv/client_fail_test.go create mode 100644 store/tikv/mock_tikv_service.go diff --git a/store/tikv/client.go b/store/tikv/client.go index 46603c783e4bb..ec663dcc44ed3 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -16,6 +16,7 @@ package tikv import ( "context" + "fmt" "io" "math" "strconv" @@ -25,6 +26,7 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/debugpb" "github.com/pingcap/kvproto/pkg/tikvpb" @@ -110,6 +112,9 @@ func (c *batchCommandsClient) isStopped() bool { // `failPendingRequests` must be called in locked contexts in order to avoid double closing channels. func (c *batchCommandsClient) failPendingRequests(err error) { + failpoint.Inject("panicInFailPendingRequests", func(_ failpoint.Value) { + panic("panic in fail pending requests") + }) c.batched.Range(func(key, value interface{}) bool { id, _ := key.(uint64) entry, _ := value.(*batchCommandsEntry) @@ -159,8 +164,17 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) { }() for { - // When `conn.Close()` is called, `client.Recv()` will return an error. 
- resp, err := c.client.Recv() + var err error = nil + var resp *tikvpb.BatchCommandsResponse = nil + failpoint.Inject("gotErrorInRecvLoop", func(_ failpoint.Value) { + fmt.Printf("gotErrorInRecvLoop is injected") + err = errors.New("injected error in batchRecvLoop") + }) + if err == nil { + // When `conn.Close()` is called, `client.Recv()` will return an error. + resp, err = c.client.Recv() + } + if err != nil { now := time.Now() for { // try to re-create the streaming in the loop. diff --git a/store/tikv/client_fail_test.go b/store/tikv/client_fail_test.go new file mode 100644 index 0000000000000..bad926a1ee71d --- /dev/null +++ b/store/tikv/client_fail_test.go @@ -0,0 +1,50 @@ +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tikv + +import ( + "context" + "fmt" + "time" + + . 
"github.com/pingcap/check" + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/store/tikv/tikvrpc" +) + +func (s *testClientSuite) TestPanicInRecvLoop(c *C) { + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/client/panicInFailPendingRequests", `return("0")`), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/client/gotErrorInRecvLoop", `return("0")`), IsNil) + + port := startMockTikvService() + c.Assert(port > 0, IsTrue) + + addr := fmt.Sprintf("%s:%d", "127.0.0.1", port) + rpcClient := newRPCClient(config.Security{}) + + _, err := rpcClient.getConnArray(addr) + c.Assert(err, IsNil) + + time.Sleep(time.Hour) + + req := &tikvrpc.Request{ + Type: tikvrpc.CmdGet, + Get: &kvrpcpb.GetRequest{}, + } + + _, err = rpcClient.SendRequest(context.Background(), addr, req, time.Second) + fmt.Printf("error--------------------------------: %v", err) +} diff --git a/store/tikv/mock_tikv_service.go b/store/tikv/mock_tikv_service.go new file mode 100644 index 0000000000000..f3abe36f11e3a --- /dev/null +++ b/store/tikv/mock_tikv_service.go @@ -0,0 +1,205 @@ +package tikv + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/coprocessor" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/tikvpb" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +type server struct{} + +func (s *server) KvGet(context.Context, *kvrpcpb.GetRequest) (*kvrpcpb.GetResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) KvScan(context.Context, *kvrpcpb.ScanRequest) (*kvrpcpb.ScanResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) KvPrewrite(context.Context, *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) { + return nil, errors.New("unimplemented in 
the mock service") +} + +func (s *server) KvPessimisticLock(context.Context, *kvrpcpb.PessimisticLockRequest) (*kvrpcpb.PessimisticLockResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) KVPessimisticRollback(context.Context, *kvrpcpb.PessimisticRollbackRequest) (*kvrpcpb.PessimisticRollbackResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) KvCommit(context.Context, *kvrpcpb.CommitRequest) (*kvrpcpb.CommitResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) KvImport(context.Context, *kvrpcpb.ImportRequest) (*kvrpcpb.ImportResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) KvCleanup(context.Context, *kvrpcpb.CleanupRequest) (*kvrpcpb.CleanupResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) KvBatchGet(context.Context, *kvrpcpb.BatchGetRequest) (*kvrpcpb.BatchGetResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) KvBatchRollback(context.Context, *kvrpcpb.BatchRollbackRequest) (*kvrpcpb.BatchRollbackResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) KvScanLock(context.Context, *kvrpcpb.ScanLockRequest) (*kvrpcpb.ScanLockResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) KvResolveLock(context.Context, *kvrpcpb.ResolveLockRequest) (*kvrpcpb.ResolveLockResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) KvGC(context.Context, *kvrpcpb.GCRequest) (*kvrpcpb.GCResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) KvDeleteRange(context.Context, *kvrpcpb.DeleteRangeRequest) (*kvrpcpb.DeleteRangeResponse, error) { + return nil, errors.New("unimplemented in the mock 
service") +} + +func (s *server) RawGet(context.Context, *kvrpcpb.RawGetRequest) (*kvrpcpb.RawGetResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) RawBatchGet(context.Context, *kvrpcpb.RawBatchGetRequest) (*kvrpcpb.RawBatchGetResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) RawPut(context.Context, *kvrpcpb.RawPutRequest) (*kvrpcpb.RawPutResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) RawBatchPut(context.Context, *kvrpcpb.RawBatchPutRequest) (*kvrpcpb.RawBatchPutResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) RawDelete(context.Context, *kvrpcpb.RawDeleteRequest) (*kvrpcpb.RawDeleteResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) RawBatchDelete(context.Context, *kvrpcpb.RawBatchDeleteRequest) (*kvrpcpb.RawBatchDeleteResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) RawScan(context.Context, *kvrpcpb.RawScanRequest) (*kvrpcpb.RawScanResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) RawDeleteRange(context.Context, *kvrpcpb.RawDeleteRangeRequest) (*kvrpcpb.RawDeleteRangeResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) RawBatchScan(context.Context, *kvrpcpb.RawBatchScanRequest) (*kvrpcpb.RawBatchScanResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) UnsafeDestroyRange(context.Context, *kvrpcpb.UnsafeDestroyRangeRequest) (*kvrpcpb.UnsafeDestroyRangeResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) Coprocessor(context.Context, *coprocessor.Request) (*coprocessor.Response, error) { + return nil, errors.New("unimplemented in the mock 
service") +} + +func (s *server) CoprocessorStream(*coprocessor.Request, tikvpb.Tikv_CoprocessorStreamServer) error { + return errors.New("unimplemented in the mock service") +} + +func (s *server) Raft(tikvpb.Tikv_RaftServer) error { + return errors.New("unimplemented in the mock service") +} + +func (s *server) BatchRaft(tikvpb.Tikv_BatchRaftServer) error { + return errors.New("unimplemented in the mock service") +} + +func (s *server) Snapshot(tikvpb.Tikv_SnapshotServer) error { + return errors.New("unimplemented in the mock service") +} + +func (s *server) SplitRegion(context.Context, *kvrpcpb.SplitRegionRequest) (*kvrpcpb.SplitRegionResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) ReadIndex(context.Context, *kvrpcpb.ReadIndexRequest) (*kvrpcpb.ReadIndexResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) MvccGetByKey(context.Context, *kvrpcpb.MvccGetByKeyRequest) (*kvrpcpb.MvccGetByKeyResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) MvccGetByStartTs(context.Context, *kvrpcpb.MvccGetByStartTsRequest) (*kvrpcpb.MvccGetByStartTsResponse, error) { + return nil, errors.New("unimplemented in the mock service") +} + +func (s *server) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error { + for { + req, err := ss.Recv() + if err != nil { + logutil.BgLogger().Error("batch commands receive fail", zap.Error(err)) + return err + } + + responses := make([]*tikvpb.BatchCommandsResponse_Response, 0, len(req.GetRequestIds())) + for i := 0; i < len(req.GetRequestIds()); i++ { + responses = append(responses, &tikvpb.BatchCommandsResponse_Response{ + Cmd: &tikvpb.BatchCommandsResponse_Response_Get{ + Get: &kvrpcpb.GetResponse{ + Value: []byte{'a', 'b', 'c'}, + }, + }, + }) + } + + err = ss.Send(&tikvpb.BatchCommandsResponse{ + Responses: responses, + RequestIds: req.GetRequestIds(), + }) + if err != nil { + 
logutil.BgLogger().Error("batch commands send fail", zap.Error(err)) + return err + } + } + return nil +} + +// Try to start a gRPC server and retrun the binded port. +func startMockTikvService() int { + for port := 40000; port < 50000; port++ { + lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", port)) + if err != nil { + logutil.BgLogger().Error("can't listen", zap.Error(err)) + continue + } + s := grpc.NewServer(grpc.ConnectionTimeout(time.Minute)) + tikvpb.RegisterTikvServer(s, &server{}) + go func() { + if err = s.Serve(lis); err != nil { + logutil.BgLogger().Error( + "can't serve gRPC requests", + zap.Error(err), + ) + } + }() + return port + } + logutil.BgLogger().Error("can't start mock tikv service because no available ports") + return -1 +} From 8b1d72bf8465868899c8931fdafe90dced48b53a Mon Sep 17 00:00:00 2001 From: qupeng Date: Wed, 3 Jul 2019 16:40:59 +0800 Subject: [PATCH 04/11] address comments --- store/tikv/client.go | 4 +- store/tikv/client_fail_test.go | 8 +- store/tikv/mock_tikv_service.go | 137 +------------------------------- 3 files changed, 9 insertions(+), 140 deletions(-) diff --git a/store/tikv/client.go b/store/tikv/client.go index ec663dcc44ed3..871bb1790bcf6 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -112,9 +112,7 @@ func (c *batchCommandsClient) isStopped() bool { // `failPendingRequests` must be called in locked contexts in order to avoid double closing channels. 
func (c *batchCommandsClient) failPendingRequests(err error) { - failpoint.Inject("panicInFailPendingRequests", func(_ failpoint.Value) { - panic("panic in fail pending requests") - }) + failpoint.Inject("panicInFailPendingRequests", nil) c.batched.Range(func(key, value interface{}) bool { id, _ := key.(uint64) entry, _ := value.(*batchCommandsEntry) diff --git a/store/tikv/client_fail_test.go b/store/tikv/client_fail_test.go index bad926a1ee71d..bea7a9c8d032d 100644 --- a/store/tikv/client_fail_test.go +++ b/store/tikv/client_fail_test.go @@ -26,8 +26,12 @@ import ( ) func (s *testClientSuite) TestPanicInRecvLoop(c *C) { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/client/panicInFailPendingRequests", `return("0")`), IsNil) - c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/client/gotErrorInRecvLoop", `return("0")`), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/panicInFailPendingRequests", `panic`), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gotErrorInRecvLoop", `return("0")`), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/panicInFailPendingRequests", `return("0")`), IsNil) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gotErrorInRecvLoop", `return("0")`), IsNil) + }() port := startMockTikvService() c.Assert(port > 0, IsTrue) diff --git a/store/tikv/mock_tikv_service.go b/store/tikv/mock_tikv_service.go index f3abe36f11e3a..d5cf3a811ef90 100644 --- a/store/tikv/mock_tikv_service.go +++ b/store/tikv/mock_tikv_service.go @@ -1,13 +1,10 @@ package tikv import ( - "context" "fmt" "net" "time" - "github.com/pingcap/errors" - "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pingcap/tidb/util/logutil" @@ -15,138 +12,8 @@ import ( "google.golang.org/grpc" ) -type server struct{} - -func (s *server) KvGet(context.Context, *kvrpcpb.GetRequest) 
(*kvrpcpb.GetResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) KvScan(context.Context, *kvrpcpb.ScanRequest) (*kvrpcpb.ScanResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) KvPrewrite(context.Context, *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) KvPessimisticLock(context.Context, *kvrpcpb.PessimisticLockRequest) (*kvrpcpb.PessimisticLockResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) KVPessimisticRollback(context.Context, *kvrpcpb.PessimisticRollbackRequest) (*kvrpcpb.PessimisticRollbackResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) KvCommit(context.Context, *kvrpcpb.CommitRequest) (*kvrpcpb.CommitResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) KvImport(context.Context, *kvrpcpb.ImportRequest) (*kvrpcpb.ImportResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) KvCleanup(context.Context, *kvrpcpb.CleanupRequest) (*kvrpcpb.CleanupResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) KvBatchGet(context.Context, *kvrpcpb.BatchGetRequest) (*kvrpcpb.BatchGetResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) KvBatchRollback(context.Context, *kvrpcpb.BatchRollbackRequest) (*kvrpcpb.BatchRollbackResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) KvScanLock(context.Context, *kvrpcpb.ScanLockRequest) (*kvrpcpb.ScanLockResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) KvResolveLock(context.Context, *kvrpcpb.ResolveLockRequest) 
(*kvrpcpb.ResolveLockResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) KvGC(context.Context, *kvrpcpb.GCRequest) (*kvrpcpb.GCResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) KvDeleteRange(context.Context, *kvrpcpb.DeleteRangeRequest) (*kvrpcpb.DeleteRangeResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) RawGet(context.Context, *kvrpcpb.RawGetRequest) (*kvrpcpb.RawGetResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) RawBatchGet(context.Context, *kvrpcpb.RawBatchGetRequest) (*kvrpcpb.RawBatchGetResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) RawPut(context.Context, *kvrpcpb.RawPutRequest) (*kvrpcpb.RawPutResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) RawBatchPut(context.Context, *kvrpcpb.RawBatchPutRequest) (*kvrpcpb.RawBatchPutResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) RawDelete(context.Context, *kvrpcpb.RawDeleteRequest) (*kvrpcpb.RawDeleteResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) RawBatchDelete(context.Context, *kvrpcpb.RawBatchDeleteRequest) (*kvrpcpb.RawBatchDeleteResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) RawScan(context.Context, *kvrpcpb.RawScanRequest) (*kvrpcpb.RawScanResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) RawDeleteRange(context.Context, *kvrpcpb.RawDeleteRangeRequest) (*kvrpcpb.RawDeleteRangeResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) RawBatchScan(context.Context, *kvrpcpb.RawBatchScanRequest) (*kvrpcpb.RawBatchScanResponse, 
error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) UnsafeDestroyRange(context.Context, *kvrpcpb.UnsafeDestroyRangeRequest) (*kvrpcpb.UnsafeDestroyRangeResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) Coprocessor(context.Context, *coprocessor.Request) (*coprocessor.Response, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) CoprocessorStream(*coprocessor.Request, tikvpb.Tikv_CoprocessorStreamServer) error { - return errors.New("unimplemented in the mock service") -} - -func (s *server) Raft(tikvpb.Tikv_RaftServer) error { - return errors.New("unimplemented in the mock service") -} - -func (s *server) BatchRaft(tikvpb.Tikv_BatchRaftServer) error { - return errors.New("unimplemented in the mock service") -} - -func (s *server) Snapshot(tikvpb.Tikv_SnapshotServer) error { - return errors.New("unimplemented in the mock service") -} - -func (s *server) SplitRegion(context.Context, *kvrpcpb.SplitRegionRequest) (*kvrpcpb.SplitRegionResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) ReadIndex(context.Context, *kvrpcpb.ReadIndexRequest) (*kvrpcpb.ReadIndexResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) MvccGetByKey(context.Context, *kvrpcpb.MvccGetByKeyRequest) (*kvrpcpb.MvccGetByKeyResponse, error) { - return nil, errors.New("unimplemented in the mock service") -} - -func (s *server) MvccGetByStartTs(context.Context, *kvrpcpb.MvccGetByStartTsRequest) (*kvrpcpb.MvccGetByStartTsResponse, error) { - return nil, errors.New("unimplemented in the mock service") +type server struct { + tikvpb.TikvServer } func (s *server) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error { From 4aa72479ef44cd698fd9b8619f93c22da18ba851 Mon Sep 17 00:00:00 2001 From: qupeng Date: Wed, 3 Jul 2019 17:11:34 +0800 Subject: [PATCH 05/11] address 
comments --- store/tikv/client.go | 42 ++++++++++++++++----------------- store/tikv/client_fail_test.go | 13 +++++----- store/tikv/mock_tikv_service.go | 1 - 3 files changed, 26 insertions(+), 30 deletions(-) diff --git a/store/tikv/client.go b/store/tikv/client.go index 871bb1790bcf6..f160748b13748 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -16,7 +16,6 @@ package tikv import ( "context" - "fmt" "io" "math" "strconv" @@ -110,6 +109,24 @@ func (c *batchCommandsClient) isStopped() bool { return atomic.LoadInt32(&c.closed) != 0 } +func (c *batchCommandsClient) send(request *tikvpb.BatchCommandsRequest, entries []*batchCommandsEntry) { + // Use the lock to protect the stream client won't be replaced by RecvLoop, + // and new added request won't be removed by `failPendingRequests`. + c.clientLock.Lock() + defer c.clientLock.Unlock() + for i, requestID := range request.RequestIds { + c.batched.Store(requestID, entries[i]) + } + if err := c.client.Send(request); err != nil { + logutil.BgLogger().Error( + "batch commands send error", + zap.String("target", c.target), + zap.Error(err), + ) + c.failPendingRequests(err) + } +} + // `failPendingRequests` must be called in locked contexts in order to avoid double closing channels. func (c *batchCommandsClient) failPendingRequests(err error) { failpoint.Inject("panicInFailPendingRequests", nil) @@ -127,7 +144,6 @@ func (c *batchCommandsClient) reCreateStreamingClient(err error) bool { // Hold the lock to forbid batchSendLoop using the old client. c.clientLock.Lock() defer c.clientLock.Unlock() - c.failPendingRequests(err) // fail all pending requests. // Re-establish a application layer stream. TCP layer is handled by gRPC. 
@@ -165,7 +181,6 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) { var err error = nil var resp *tikvpb.BatchCommandsResponse = nil failpoint.Inject("gotErrorInRecvLoop", func(_ failpoint.Value) { - fmt.Printf("gotErrorInRecvLoop is injected") err = errors.New("injected error in batchRecvLoop") }) if err == nil { @@ -509,29 +524,12 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) { requestIDs = append(requestIDs, requestID) } - request := &tikvpb.BatchCommandsRequest{ + req := &tikvpb.BatchCommandsRequest{ Requests: requests, RequestIds: requestIDs, } - // Use the lock to protect the stream client won't be replaced by RecvLoop, - // and new added request won't be removed by `failPendingRequests`. - batchCommandsClient.clientLock.Lock() - for i, requestID := range request.RequestIds { - batchCommandsClient.batched.Store(requestID, entries[i]) - } - err := batchCommandsClient.client.Send(request) - batchCommandsClient.clientLock.Unlock() - if err != nil { - logutil.BgLogger().Error( - "batch commands send error", - zap.String("target", a.target), - zap.Error(err), - ) - batchCommandsClient.clientLock.Lock() - batchCommandsClient.failPendingRequests(err) - batchCommandsClient.clientLock.Unlock() - } + batchCommandsClient.send(req, entries) } } diff --git a/store/tikv/client_fail_test.go b/store/tikv/client_fail_test.go index bea7a9c8d032d..2f9b8480ca144 100644 --- a/store/tikv/client_fail_test.go +++ b/store/tikv/client_fail_test.go @@ -28,27 +28,26 @@ import ( func (s *testClientSuite) TestPanicInRecvLoop(c *C) { c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/panicInFailPendingRequests", `panic`), IsNil) c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gotErrorInRecvLoop", `return("0")`), IsNil) - defer func() { - c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/panicInFailPendingRequests", `return("0")`), IsNil) - 
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gotErrorInRecvLoop", `return("0")`), IsNil) - }() port := startMockTikvService() c.Assert(port > 0, IsTrue) + config.GetGlobalConfig().TiKVClient.GrpcConnectionCount = 1 addr := fmt.Sprintf("%s:%d", "127.0.0.1", port) rpcClient := newRPCClient(config.Security{}) + // Start batchRecvLoop, and it should panic in `failPendingRequests`. _, err := rpcClient.getConnArray(addr) c.Assert(err, IsNil) - time.Sleep(time.Hour) + time.Sleep(time.Second) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/panicInFailPendingRequests"), IsNil) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gotErrorInRecvLoop"), IsNil) req := &tikvrpc.Request{ Type: tikvrpc.CmdGet, Get: &kvrpcpb.GetRequest{}, } - _, err = rpcClient.SendRequest(context.Background(), addr, req, time.Second) - fmt.Printf("error--------------------------------: %v", err) + c.Assert(err, IsNil) } diff --git a/store/tikv/mock_tikv_service.go b/store/tikv/mock_tikv_service.go index d5cf3a811ef90..68862766b4f60 100644 --- a/store/tikv/mock_tikv_service.go +++ b/store/tikv/mock_tikv_service.go @@ -44,7 +44,6 @@ func (s *server) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error { return err } } - return nil } // Try to start a gRPC server and retrun the binded port. From c4c4d02a351f8b8f23990b5210ed239daa1f1da4 Mon Sep 17 00:00:00 2001 From: qupeng Date: Wed, 3 Jul 2019 19:50:12 +0800 Subject: [PATCH 06/11] export `NewTestRPCClient` for schrodinger tests. --- store/tikv/client.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/store/tikv/client.go b/store/tikv/client.go index f160748b13748..922ea4e923a10 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -572,6 +572,11 @@ func newRPCClient(security config.Security) *rpcClient { } } +// NewTestRPCClient is for some external tests. 
+func NewTestRPCClient() Client { + return newRPCClient(config.Security{}) +} + func (c *rpcClient) getConnArray(addr string) (*connArray, error) { c.RLock() if c.isClosed { From e4c9654684290e3ca84e4c94a3ef1bbcfd17b502 Mon Sep 17 00:00:00 2001 From: qupeng Date: Thu, 4 Jul 2019 13:38:22 +0800 Subject: [PATCH 07/11] address comments --- store/tikv/client.go | 19 +++++++++---------- store/tikv/client_fail_test.go | 5 ++++- store/tikv/mock_tikv_service.go | 8 ++++---- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/store/tikv/client.go b/store/tikv/client.go index 922ea4e923a10..5e976b7c5775e 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -127,6 +127,14 @@ func (c *batchCommandsClient) send(request *tikvpb.BatchCommandsRequest, entries } } +func (c *batchCommandsClient) recv() (*tikvpb.BatchCommandsResponse, error) { + failpoint.Inject("gotErrorInRecvLoop", func(_ failpoint.Value) (*tikvpb.BatchCommandsResponse, error) { + return nil, errors.New("injected error in batchRecvLoop") + }) + // When `conn.Close()` is called, `client.Recv()` will return an error. + return c.client.Recv() +} + // `failPendingRequests` must be called in locked contexts in order to avoid double closing channels. func (c *batchCommandsClient) failPendingRequests(err error) { failpoint.Inject("panicInFailPendingRequests", nil) @@ -178,16 +186,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) { }() for { - var err error = nil - var resp *tikvpb.BatchCommandsResponse = nil - failpoint.Inject("gotErrorInRecvLoop", func(_ failpoint.Value) { - err = errors.New("injected error in batchRecvLoop") - }) - if err == nil { - // When `conn.Close()` is called, `client.Recv()` will return an error. - resp, err = c.client.Recv() - } - + resp, err := c.recv() if err != nil { now := time.Now() for { // try to re-create the streaming in the loop. 
diff --git a/store/tikv/client_fail_test.go b/store/tikv/client_fail_test.go index 2f9b8480ca144..96b64c7272bc6 100644 --- a/store/tikv/client_fail_test.go +++ b/store/tikv/client_fail_test.go @@ -29,9 +29,10 @@ func (s *testClientSuite) TestPanicInRecvLoop(c *C) { c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/panicInFailPendingRequests", `panic`), IsNil) c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gotErrorInRecvLoop", `return("0")`), IsNil) - port := startMockTikvService() + server, port := startMockTikvService() c.Assert(port > 0, IsTrue) + grpcConnectionCount := config.GetGlobalConfig().TiKVClient.GrpcConnectionCount config.GetGlobalConfig().TiKVClient.GrpcConnectionCount = 1 addr := fmt.Sprintf("%s:%d", "127.0.0.1", port) rpcClient := newRPCClient(config.Security{}) @@ -50,4 +51,6 @@ func (s *testClientSuite) TestPanicInRecvLoop(c *C) { } _, err = rpcClient.SendRequest(context.Background(), addr, req, time.Second) c.Assert(err, IsNil) + server.Stop() + config.GetGlobalConfig().TiKVClient.GrpcConnectionCount = grpcConnectionCount } diff --git a/store/tikv/mock_tikv_service.go b/store/tikv/mock_tikv_service.go index 68862766b4f60..a3541892038ef 100644 --- a/store/tikv/mock_tikv_service.go +++ b/store/tikv/mock_tikv_service.go @@ -46,8 +46,8 @@ func (s *server) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error { } } -// Try to start a gRPC server and retrun the binded port. -func startMockTikvService() int { +// Try to start a gRPC server and retrun the server instance and binded port. 
+func startMockTikvService() (*grpc.Server, int) { for port := 40000; port < 50000; port++ { lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", port)) if err != nil { @@ -64,8 +64,8 @@ func startMockTikvService() int { ) } }() - return port + return s, port } logutil.BgLogger().Error("can't start mock tikv service because no available ports") - return -1 + return nil, -1 } From a8c2f5f3d8e37eb7ad3fc861bf211f5da9258413 Mon Sep 17 00:00:00 2001 From: qupeng Date: Thu, 4 Jul 2019 15:36:18 +0800 Subject: [PATCH 08/11] address comments --- store/tikv/client_fail_test.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/store/tikv/client_fail_test.go b/store/tikv/client_fail_test.go index 96b64c7272bc6..bf44ab088a66d 100644 --- a/store/tikv/client_fail_test.go +++ b/store/tikv/client_fail_test.go @@ -1,4 +1,4 @@ -// Copyright 2016 PingCAP, Inc. +// Copyright 2019 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -25,6 +25,12 @@ import ( "github.com/pingcap/tidb/store/tikv/tikvrpc" ) +func setGrpcConnectionCount(count uint) { + newConf := config.NewConfig() + newConf.TiKVClient.GrpcConnectionCount = count + config.StoreGlobalConfig(newConf) +} + func (s *testClientSuite) TestPanicInRecvLoop(c *C) { c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/panicInFailPendingRequests", `panic`), IsNil) c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gotErrorInRecvLoop", `return("0")`), IsNil) @@ -33,7 +39,7 @@ func (s *testClientSuite) TestPanicInRecvLoop(c *C) { c.Assert(port > 0, IsTrue) grpcConnectionCount := config.GetGlobalConfig().TiKVClient.GrpcConnectionCount - config.GetGlobalConfig().TiKVClient.GrpcConnectionCount = 1 + setGrpcConnectionCount(1) addr := fmt.Sprintf("%s:%d", "127.0.0.1", port) rpcClient := newRPCClient(config.Security{}) @@ -52,5 +58,5 @@ func (s *testClientSuite) TestPanicInRecvLoop(c *C) { _, err = rpcClient.SendRequest(context.Background(), addr, req, time.Second) c.Assert(err, IsNil) server.Stop() - config.GetGlobalConfig().TiKVClient.GrpcConnectionCount = grpcConnectionCount + setGrpcConnectionCount(grpcConnectionCount) } From 2d2eeb71e172c98e112d9933800969d02414fd80 Mon Sep 17 00:00:00 2001 From: qupeng Date: Thu, 4 Jul 2019 18:42:46 +0800 Subject: [PATCH 09/11] fix test case --- go.mod | 4 +--- go.sum | 10 ++++------ store/tikv/client_fail_test.go | 6 +++--- store/tikv/mock_tikv_service.go | 7 ++----- store/tikv/tikvrpc/tikvrpc.go | 10 ++++++++++ 5 files changed, 20 insertions(+), 17 deletions(-) diff --git a/go.mod b/go.mod index 0acf514d25d5d..64917a182d71d 100644 --- a/go.mod +++ b/go.mod @@ -33,15 +33,13 @@ require ( github.com/myesui/uuid v1.0.0 // indirect github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef - github.com/onsi/ginkgo v1.7.0 // indirect - github.com/onsi/gomega v1.4.3 // indirect github.com/opentracing/basictracer-go v1.0.0 
github.com/opentracing/opentracing-go v1.0.2 github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8 github.com/pingcap/errors v0.11.4 github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e - github.com/pingcap/kvproto v0.0.0-20190619024611-a4759dfe3753 + github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531 github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 github.com/pingcap/parser v0.0.0-20190701123046-5768e68c1e65 github.com/pingcap/pd v0.0.0-20190617100349-293d4b5189bf diff --git a/go.sum b/go.sum index 84ba911a2e97c..675d987a4cd1b 100644 --- a/go.sum +++ b/go.sum @@ -137,12 +137,10 @@ github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3 github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo6LNmIvEWzsW1hbBQfpUO4JWnuQRmva8= github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= +github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= -github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I= github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= -github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= -github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/opentracing/basictracer-go v1.0.0 h1:YyUAhaEfjoWXclZVJ9sGoNct7j4TVk7lZWlQw5UXuoo= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= github.com/opentracing/opentracing-go v1.0.2 
h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg= @@ -161,8 +159,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= -github.com/pingcap/kvproto v0.0.0-20190619024611-a4759dfe3753 h1:92t0y430CJF0tN1lvUhP5fhnYTFmssATJqwxQtvixYU= -github.com/pingcap/kvproto v0.0.0-20190619024611-a4759dfe3753/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= +github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531 h1:8xk2HobDwClB5E3Hv9TEPiS7K7bv3ykWHLyZzuUYywI= +github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w= github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ= github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= diff --git a/store/tikv/client_fail_test.go b/store/tikv/client_fail_test.go index bf44ab088a66d..78f75bf43a9e6 100644 --- a/store/tikv/client_fail_test.go +++ b/store/tikv/client_fail_test.go @@ -20,7 +20,7 @@ import ( . 
"github.com/pingcap/check" "github.com/pingcap/failpoint" - "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/store/tikv/tikvrpc" ) @@ -52,8 +52,8 @@ func (s *testClientSuite) TestPanicInRecvLoop(c *C) { c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gotErrorInRecvLoop"), IsNil) req := &tikvrpc.Request{ - Type: tikvrpc.CmdGet, - Get: &kvrpcpb.GetRequest{}, + Type: tikvrpc.CmdEmpty, + Empty: &tikvpb.BatchCommandsEmptyRequest{}, } _, err = rpcClient.SendRequest(context.Background(), addr, req, time.Second) c.Assert(err, IsNil) diff --git a/store/tikv/mock_tikv_service.go b/store/tikv/mock_tikv_service.go index a3541892038ef..2e5db005c36d6 100644 --- a/store/tikv/mock_tikv_service.go +++ b/store/tikv/mock_tikv_service.go @@ -5,7 +5,6 @@ import ( "net" "time" - "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" @@ -27,10 +26,8 @@ func (s *server) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error { responses := make([]*tikvpb.BatchCommandsResponse_Response, 0, len(req.GetRequestIds())) for i := 0; i < len(req.GetRequestIds()); i++ { responses = append(responses, &tikvpb.BatchCommandsResponse_Response{ - Cmd: &tikvpb.BatchCommandsResponse_Response_Get{ - Get: &kvrpcpb.GetResponse{ - Value: []byte{'a', 'b', 'c'}, - }, + Cmd: &tikvpb.BatchCommandsResponse_Response_Empty{ + Empty: &tikvpb.BatchCommandsEmptyResponse{}, }, }) } diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go index 8ee060fdd49b4..809c12f553f82 100644 --- a/store/tikv/tikvrpc/tikvrpc.go +++ b/store/tikv/tikvrpc/tikvrpc.go @@ -66,6 +66,8 @@ const ( CmdSplitRegion CmdDebugGetRegionProperties CmdType = 2048 + iota + + CmdEmpty CmdType = 3072 + iota ) func (t CmdType) String() string { @@ -163,6 +165,8 @@ type Request struct { PessimisticRollback *kvrpcpb.PessimisticRollbackRequest 
DebugGetRegionProperties *debugpb.GetRegionPropertiesRequest + + Empty *tikvpb.BatchCommandsEmptyRequest } // ToBatchCommandsRequest converts the request to an entry in BatchCommands request. @@ -212,6 +216,8 @@ func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Reques return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_PessimisticLock{PessimisticLock: req.PessimisticLock}} case CmdPessimisticRollback: return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_PessimisticRollback{PessimisticRollback: req.PessimisticRollback}} + case CmdEmpty: + return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Empty{Empty: req.Empty}} } return nil } @@ -258,6 +264,8 @@ type Response struct { PessimisticRollback *kvrpcpb.PessimisticRollbackResponse DebugGetRegionProperties *debugpb.GetRegionPropertiesResponse + + Empty *tikvpb.BatchCommandsEmptyResponse } // FromBatchCommandsResponse converts a BatchCommands response to Response. 
@@ -307,6 +315,8 @@ func FromBatchCommandsResponse(res *tikvpb.BatchCommandsResponse_Response) *Resp return &Response{Type: CmdPessimisticLock, PessimisticLock: res.PessimisticLock} case *tikvpb.BatchCommandsResponse_Response_PessimisticRollback: return &Response{Type: CmdPessimisticRollback, PessimisticRollback: res.PessimisticRollback} + case *tikvpb.BatchCommandsResponse_Response_Empty: + return &Response{Type: CmdEmpty, Empty: res.Empty} } return nil } From f8d481b53d4e65db40e303b74496e889558ff25f Mon Sep 17 00:00:00 2001 From: qupeng Date: Thu, 4 Jul 2019 21:00:28 +0800 Subject: [PATCH 10/11] fix test --- store/tikv/client_test.go | 11 +++++++++-- store/tikv/tikvrpc/tikvrpc.go | 5 +++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/store/tikv/client_test.go b/store/tikv/client_test.go index 4d4d2fa1e7d38..895671bc76ad5 100644 --- a/store/tikv/client_test.go +++ b/store/tikv/client_test.go @@ -35,9 +35,15 @@ type testClientSuite struct { var _ = Suite(&testClientSuite{}) +func setMaxBatchSize(size uint) { + newConf := config.NewConfig() + newConf.TiKVClient.MaxBatchSize = size + config.StoreGlobalConfig(newConf) +} + func (s *testClientSuite) TestConn(c *C) { - globalConfig := config.GetGlobalConfig() - globalConfig.TiKVClient.MaxBatchSize = 0 // Disable batch. 
+ maxBatchSize := config.GetGlobalConfig().TiKVClient.MaxBatchSize + setMaxBatchSize(0) client := newRPCClient(config.Security{}) @@ -55,6 +61,7 @@ func (s *testClientSuite) TestConn(c *C) { conn3, err := client.getConnArray(addr) c.Assert(err, NotNil) c.Assert(conn3, IsNil) + setMaxBatchSize(maxBatchSize) } func (s *testClientSuite) TestRemoveCanceledRequests(c *C) { diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go index 809c12f553f82..b3cf78ee5e82e 100644 --- a/store/tikv/tikvrpc/tikvrpc.go +++ b/store/tikv/tikvrpc/tikvrpc.go @@ -393,6 +393,7 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error { req.MvccGetByStartTs.Context = ctx case CmdSplitRegion: req.SplitRegion.Context = ctx + case CmdEmpty: default: return fmt.Errorf("invalid request type %v", req.Type) } @@ -515,6 +516,7 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) { resp.SplitRegion = &kvrpcpb.SplitRegionResponse{ RegionError: e, } + case CmdEmpty: default: return nil, fmt.Errorf("invalid request type %v", req.Type) } @@ -579,6 +581,7 @@ func (resp *Response) GetRegionError() (*errorpb.Error, error) { e = resp.MvccGetByStartTS.GetRegionError() case CmdSplitRegion: e = resp.SplitRegion.GetRegionError() + case CmdEmpty: default: return nil, fmt.Errorf("invalid response type %v", resp.Type) } @@ -651,6 +654,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp resp.MvccGetByStartTS, err = client.MvccGetByStartTs(ctx, req.MvccGetByStartTs) case CmdSplitRegion: resp.SplitRegion, err = client.SplitRegion(ctx, req.SplitRegion) + case CmdEmpty: + resp.Empty, err = &tikvpb.BatchCommandsEmptyResponse{}, nil default: return nil, errors.Errorf("invalid request type: %v", req.Type) } From ac735592438c2d9494790ade2386a3c5cb3ec91c Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 5 Jul 2019 16:39:27 +0800 Subject: [PATCH 11/11] fix panic when sendBatchRequest on conn.Array and close rpcClient 
simultaneously --- store/tikv/client.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/store/tikv/client.go b/store/tikv/client.go index 5e976b7c5775e..9a87b45aaba0d 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -80,6 +80,7 @@ type connArray struct { batchCommandsCh chan *batchCommandsEntry batchCommandsClients []*batchCommandsClient tikvTransportLayerLoad uint64 + closed chan struct{} // Notify rpcClient to check the idle flag idleNotify *uint32 @@ -142,8 +143,8 @@ func (c *batchCommandsClient) failPendingRequests(err error) { id, _ := key.(uint64) entry, _ := value.(*batchCommandsEntry) entry.err = err - close(entry.res) c.batched.Delete(id) + close(entry.res) return true }) } @@ -246,6 +247,7 @@ func newConnArray(maxSize uint, addr string, security config.Security, idleNotif batchCommandsCh: make(chan *batchCommandsEntry, cfg.TiKVClient.MaxBatchSize), batchCommandsClients: make([]*batchCommandsClient, 0, maxSize), tikvTransportLayerLoad: 0, + closed: make(chan struct{}), idleNotify: idleNotify, idleDetect: time.NewTimer(idleTimeout), @@ -348,7 +350,10 @@ func (a *connArray) Close() { // After connections are closed, `batchRecvLoop`s will check the flag. atomic.StoreInt32(&c.closed, 1) } - close(a.batchCommandsCh) + // Don't close(batchCommandsCh) because when Close() is called, someone may be + // calling SendRequest and writing to batchCommandsCh; if we close it here the + // writing goroutine will panic. + close(a.closed) for i, c := range a.v { if c != nil { @@ -395,6 +400,8 @@ func (a *connArray) fetchAllPendingRequests( atomic.CompareAndSwapUint32(a.idleNotify, 0, 1) // This connArray to be recycled return + case <-a.closed: + return } if headEntry == nil { return