Skip to content

Commit

Permalink
Merge branch 'release-3.0' into automated-cherry-pick-of-pingcap#10837-…
Browse files Browse the repository at this point in the history
…upstream-release-3.0
  • Loading branch information
sre-bot authored Oct 18, 2019
2 parents 0c6c6aa + 7749430 commit fd7e80e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 15 deletions.
21 changes: 7 additions & 14 deletions store/tikv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,23 +145,16 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
a.v[i] = conn

if allowBatch {
// Initialize batch streaming clients.
tikvClient := tikvpb.NewTikvClient(conn)
streamClient, err := tikvClient.BatchCommands(context.TODO())
if err != nil {
a.Close()
return errors.Trace(err)
}
batchClient := &batchCommandsClient{
target: a.target,
conn: conn,
client: streamClient,
batched: sync.Map{},
idAlloc: 0,
closed: 0,
target: a.target,
conn: conn,
batched: sync.Map{},
idAlloc: 0,
closed: 0,
tikvClientCfg: cfg.TiKVClient,
tikvLoad: &a.tikvTransportLayerLoad,
}
a.batchCommandsClients = append(a.batchCommandsClients, batchClient)
go batchClient.batchRecvLoop(cfg.TiKVClient, &a.tikvTransportLayerLoad)
}
}
go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout)
Expand Down
28 changes: 27 additions & 1 deletion store/tikv/client_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ type batchCommandsClient struct {
batched sync.Map
idAlloc uint64

tikvClientCfg config.TiKVClient
tikvLoad *uint64

// closed indicates the batch client is closed explicitly or not.
closed int32
// tryLock protects client when re-create the streaming.
Expand Down Expand Up @@ -239,7 +242,6 @@ func (c *batchCommandsClient) failPendingRequests(err error) {

func (c *batchCommandsClient) reCreateStreamingClientOnce(err error) error {
c.failPendingRequests(err) // fail all pending requests.

// Re-establish a application layer stream. TCP layer is handled by gRPC.
tikvClient := tikvpb.NewTikvClient(c.conn)
streamClient, err := tikvClient.BatchCommands(context.TODO())
Expand Down Expand Up @@ -443,10 +445,34 @@ func (a *batchConn) getClientAndSend(entries []*batchCommandsEntry, requests []*
RequestIds: requestIDs,
}

if err := cli.initBatchClient(); err != nil {
logutil.Logger(context.Background()).Warn(
"init create streaming fail",
zap.String("target", cli.target),
zap.Error(err),
)
return
}

cli.send(req, entries)
return
}

func (c *batchCommandsClient) initBatchClient() error {
if c.client != nil {
return nil
}
// Initialize batch streaming clients.
tikvClient := tikvpb.NewTikvClient(c.conn)
streamClient, err := tikvClient.BatchCommands(context.TODO())
if err != nil {
return errors.Trace(err)
}
c.client = streamClient
go c.batchRecvLoop(c.tikvClientCfg, c.tikvLoad)
return nil
}

func (a *batchConn) Close() {
// Close all batchRecvLoop.
for _, c := range a.batchCommandsClients {
Expand Down

0 comments on commit fd7e80e

Please sign in to comment.