From 7749430eedfeb7c137c17ef4481363f6be73de0c Mon Sep 17 00:00:00 2001 From: lysu Date: Fri, 18 Oct 2019 14:24:14 +0800 Subject: [PATCH] tikv: non-blocking establish superbatch connection with timeout (#12733) (#12814) --- store/tikv/client.go | 21 +++++++-------------- store/tikv/client_batch.go | 28 +++++++++++++++++++++++++++- 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/store/tikv/client.go b/store/tikv/client.go index c263648903a64..b32dd0a9848e8 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -145,23 +145,16 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint a.v[i] = conn if allowBatch { - // Initialize batch streaming clients. - tikvClient := tikvpb.NewTikvClient(conn) - streamClient, err := tikvClient.BatchCommands(context.TODO()) - if err != nil { - a.Close() - return errors.Trace(err) - } batchClient := &batchCommandsClient{ - target: a.target, - conn: conn, - client: streamClient, - batched: sync.Map{}, - idAlloc: 0, - closed: 0, + target: a.target, + conn: conn, + batched: sync.Map{}, + idAlloc: 0, + closed: 0, + tikvClientCfg: cfg.TiKVClient, + tikvLoad: &a.tikvTransportLayerLoad, } a.batchCommandsClients = append(a.batchCommandsClients, batchClient) - go batchClient.batchRecvLoop(cfg.TiKVClient, &a.tikvTransportLayerLoad) } } go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout) diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go index 9aaba5da7b14b..86e535ee03cfb 100644 --- a/store/tikv/client_batch.go +++ b/store/tikv/client_batch.go @@ -192,6 +192,9 @@ type batchCommandsClient struct { batched sync.Map idAlloc uint64 + tikvClientCfg config.TiKVClient + tikvLoad *uint64 + // closed indicates the batch client is closed explicitly or not. closed int32 // tryLock protects client when re-create the streaming. @@ -239,7 +242,6 @@ func (c *batchCommandsClient) failPendingRequests(err error) { func (c *batchCommandsClient) reCreateStreamingClientOnce(err error) error { c.failPendingRequests(err) // fail all pending requests. - // Re-establish a application layer stream. TCP layer is handled by gRPC. tikvClient := tikvpb.NewTikvClient(c.conn) streamClient, err := tikvClient.BatchCommands(context.TODO()) @@ -443,10 +445,34 @@ func (a *batchConn) getClientAndSend(entries []*batchCommandsEntry, requests []* RequestIds: requestIDs, } + if err := cli.initBatchClient(); err != nil { + logutil.Logger(context.Background()).Warn( + "init create streaming fail", + zap.String("target", cli.target), + zap.Error(err), + ) + return + } + cli.send(req, entries) return } +func (c *batchCommandsClient) initBatchClient() error { + if c.client != nil { + return nil + } + // Initialize batch streaming clients. + tikvClient := tikvpb.NewTikvClient(c.conn) + streamClient, err := tikvClient.BatchCommands(context.TODO()) + if err != nil { + return errors.Trace(err) + } + c.client = streamClient + go c.batchRecvLoop(c.tikvClientCfg, c.tikvLoad) + return nil +} + func (a *batchConn) Close() { // Close all batchRecvLoop. for _, c := range a.batchCommandsClients {