Skip to content

Commit

Permalink
rollback change
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv committed Nov 23, 2021
1 parent 7c10715 commit 58f035d
Showing 1 changed file with 27 additions and 39 deletions.
66 changes: 27 additions & 39 deletions br/pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"io"
"math"
"sort"
"sync"
"time"

"github.com/cockroachdb/pebble"
Expand Down Expand Up @@ -260,45 +259,34 @@ func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context,
}
tryTimes := 0
indexHandles := makePendingIndexHandlesWithCapacity(0)
eg, rpcctx := errgroup.WithContext(ctx)
unfinishedRegions := make([]*metapb.Region, 0)
var rgLock sync.Mutex
for len(regions) > 0 {
if tryTimes > maxRetryTimes {
return errors.Errorf("retry time exceed limit")
}
unfinishedRegions := make([]*restore.RegionInfo, 0)
waitingClients := make([]import_sstpb.ImportSST_DuplicateDetectClient, 0)
waitingRegions := make([]*restore.RegionInfo, 0)
watingRegions := make([]*restore.RegionInfo, 0)
for idx, region := range regions {
r := region
manager.remoteWorkerPool.ApplyOnErrorGroup(eg, func() error {
_, start, _ := codec.DecodeBytes(r.Region.StartKey, []byte{})
_, end, _ := codec.DecodeBytes(r.Region.EndKey, []byte{})
if bytes.Compare(startKey, r.Region.StartKey) > 0 {
start = req.start
}
if r.Region.EndKey == nil || len(r.Region.EndKey) == 0 || bytes.Compare(endKey, r.Region.EndKey) < 0 {
end = req.end
}

logger.Debug("[detect-dupe] get duplicate stream",
zap.Int("localStreamID", idx),
logutil.Region(region.Region),
logutil.Leader(region.Leader),
logutil.Key("regionStartKey", start),
logutil.Key("regionEndKey", end))
cli, err := manager.getDuplicateStream(ctx, region, start, end)
if err != nil {
r, err := manager.splitCli.GetRegionByID(ctx, region.Region.GetId())
if err != nil {
unfinishedRegions = append(unfinishedRegions, region)
} else {
unfinishedRegions = append(unfinishedRegions, r)
}
return nil
}
})
if len(waitingClients) > manager.regionConcurrency {
r := regions[idx:]
unfinishedRegions = append(unfinishedRegions, r...)
break
}
_, start, _ := codec.DecodeBytes(region.Region.StartKey, []byte{})
_, end, _ := codec.DecodeBytes(region.Region.EndKey, []byte{})
if bytes.Compare(startKey, region.Region.StartKey) > 0 {
start = req.start
}
if region.Region.EndKey == nil || len(region.Region.EndKey) == 0 || bytes.Compare(endKey, region.Region.EndKey) < 0 {
end = req.end
}

logger.Debug("[detect-dupe] get duplicate stream",
zap.Int("localStreamID", idx),
logutil.Region(region.Region),
logutil.Leader(region.Leader),
logutil.Key("regionStartKey", start),
logutil.Key("regionEndKey", end))
cli, err := manager.getDuplicateStream(ctx, region, start, end)
if err != nil {
r, err := manager.splitCli.GetRegionByID(ctx, region.Region.GetId())
Expand All @@ -309,7 +297,7 @@ func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context,
}
} else {
waitingClients = append(waitingClients, cli)
waitingRegions = append(waitingRegions, region)
watingRegions = append(watingRegions, region)
}
}

Expand All @@ -323,7 +311,7 @@ func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context,
}

for idx, cli := range waitingClients {
region := waitingRegions[idx]
region := watingRegions[idx]
cliLogger := logger.With(
zap.Int("localStreamID", idx),
logutil.Region(region.Region),
Expand Down Expand Up @@ -362,9 +350,9 @@ func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context,
cliLogger.Warn("[detect-dupe] meet key error in duplicate detect response from TiKV, retry again ",
zap.String("RegionError", resp.GetRegionError().GetMessage()))

r, err := restore.PaginateScanRegion(ctx, manager.splitCli, waitingRegions[idx].Region.GetStartKey(), waitingRegions[idx].Region.GetEndKey(), scanRegionLimit)
r, err := restore.PaginateScanRegion(ctx, manager.splitCli, watingRegions[idx].Region.GetStartKey(), watingRegions[idx].Region.GetEndKey(), scanRegionLimit)
if err != nil {
unfinishedRegions = append(unfinishedRegions, waitingRegions[idx])
unfinishedRegions = append(unfinishedRegions, watingRegions[idx])
} else {
unfinishedRegions = append(unfinishedRegions, r...)
}
Expand Down Expand Up @@ -723,15 +711,15 @@ func (manager *DuplicateManager) getDuplicateStream(ctx context.Context,
KeyOnly: false,
}
stream, err := cli.DuplicateDetect(ctx, req)
return stream, errors.Trace(err)
return stream, err
}

func (manager *DuplicateManager) getImportClient(ctx context.Context, peer *metapb.Peer) (import_sstpb.ImportSSTClient, error) {
conn, err := manager.connPool.GetGrpcConn(ctx, peer.GetStoreId(), 1, func(ctx context.Context) (*grpc.ClientConn, error) {
return manager.makeConn(ctx, peer.GetStoreId())
})
if err != nil {
return nil, errors.Trace(err)
return nil, err
}
return import_sstpb.NewImportSSTClient(conn), nil
}
Expand Down

0 comments on commit 58f035d

Please sign in to comment.