Skip to content

Commit

Permalink
Merge branch 'master' into replace_datumcompare_13
Browse files Browse the repository at this point in the history
  • Loading branch information
wjhuang2016 authored Dec 16, 2021
2 parents a659379 + 83af272 commit 39e0fda
Show file tree
Hide file tree
Showing 62 changed files with 3,445 additions and 2,205 deletions.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ func (e *Engine) loadEngineMeta() error {
jsonBytes, closer, err := e.db.Get(engineMetaKey)
if err != nil {
if err == pebble.ErrNotFound {
log.L().Debug("local db missing engine meta", zap.Stringer("uuid", e.UUID), zap.Error(err))
log.L().Debug("local db missing engine meta", zap.Stringer("uuid", e.UUID), log.ShortError(err))
return nil
}
return err
Expand Down
12 changes: 6 additions & 6 deletions br/pkg/lightning/log/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ type FilterCore struct {
filters []string
}

// NewFilterCore returns a FilterCore.
// NewFilterCore returns a FilterCore, only logs under allowPackages will be written.
//
// Example, filter TiDB's log, `NewFilterCore(core, "github.com/pingcap/tidb/")`.
// Example, only write br's log and ignore any other, `NewFilterCore(core, "github.com/pingcap/tidb/br/")`.
// Note, must set AddCaller() to the logger.
func NewFilterCore(core zapcore.Core, filteredPackages ...string) *FilterCore {
func NewFilterCore(core zapcore.Core, allowPackages ...string) *FilterCore {
return &FilterCore{
Core: core,
filters: filteredPackages,
filters: allowPackages,
}
}

Expand All @@ -50,8 +50,8 @@ func (f *FilterCore) Write(entry zapcore.Entry, fields []zapcore.Field) error {
for i := range f.filters {
// Caller.Function is a package path-qualified function name.
if strings.Contains(entry.Caller.Function, f.filters[i]) {
return nil
return f.Core.Write(entry, fields)
}
}
return f.Core.Write(entry, fields)
return nil
}
13 changes: 5 additions & 8 deletions br/pkg/lightning/log/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ func (s *testFilterSuite) TestFilter(c *C) {
)

logger, buffer = log.MakeTestLogger(zap.WrapCore(func(c zapcore.Core) zapcore.Core {
return log.NewFilterCore(c, "github.com/pingcap/tidb/br/")
return log.NewFilterCore(c, "github.com/pingcap/br/")
}), zap.AddCaller())
logger.Warn("the message", zap.Int("number", 123456), zap.Ints("array", []int{7, 8, 9}))
c.Assert(buffer.Stripped(), HasLen, 0)

logger, buffer = log.MakeTestLogger(zap.WrapCore(func(c zapcore.Core) zapcore.Core {
return log.NewFilterCore(c, "github.com/pingcap/br/").With([]zap.Field{zap.String("a", "b")})
return log.NewFilterCore(c, "github.com/pingcap/tidb/br/").With([]zap.Field{zap.String("a", "b")})
}), zap.AddCaller())
logger.Warn("the message", zap.Int("number", 123456), zap.Ints("array", []int{7, 8, 9}))
c.Assert(
Expand All @@ -40,7 +40,7 @@ func (s *testFilterSuite) TestFilter(c *C) {
)

logger, buffer = log.MakeTestLogger(zap.WrapCore(func(c zapcore.Core) zapcore.Core {
return log.NewFilterCore(c, "github.com/pingcap/tidb/br/").With([]zap.Field{zap.String("a", "b")})
return log.NewFilterCore(c, "github.com/pingcap/br/").With([]zap.Field{zap.String("a", "b")})
}), zap.AddCaller())
logger.Warn("the message", zap.Int("number", 123456), zap.Ints("array", []int{7, 8, 9}))
c.Assert(buffer.Stripped(), HasLen, 0)
Expand All @@ -49,11 +49,8 @@ func (s *testFilterSuite) TestFilter(c *C) {
logger, buffer = log.MakeTestLogger(zap.WrapCore(func(c zapcore.Core) zapcore.Core {
return log.NewFilterCore(c, "github.com/pingcap/check/").With([]zap.Field{zap.String("a", "b")})
}), zap.AddCaller())
logger.Warn("the message", zap.String("stack", "github.com/pingcap/check/"))
c.Assert(
buffer.Stripped(), Equals,
`{"$lvl":"WARN","$msg":"the message","a":"b","stack":"github.com/pingcap/check/"}`,
)
logger.Warn("the message", zap.String("stack", "github.com/pingcap/tidb/br/"))
c.Assert(buffer.Stripped(), HasLen, 0)
}

// PASS: filter_test.go:82: testFilterSuite.BenchmarkFilterRegexMatchString 1000000 1163 ns/op
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func InitLogger(cfg *Config, tidbLoglevel string) error {
}
filterTiDBLog := zap.WrapCore(func(core zapcore.Core) zapcore.Core {
// Filter logs from TiDB and PD.
return NewFilterCore(core, "github.com/tikv/pd/")
return NewFilterCore(core, "github.com/pingcap/tidb/br/")
})
// "-" is a special config for log to stdout.
if len(cfg.File) > 0 && cfg.File != "-" {
Expand Down
8 changes: 4 additions & 4 deletions br/pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,18 +339,18 @@ func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*Regi
})
}

func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error {
func CheckRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error {
// current pd can't guarantee the consistency of returned regions
if len(regions) == 0 {
return errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan region return empty result, startKey: %s, endkey: %s",
return errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan region return empty result, startKey: %s, endKey: %s",
redact.Key(startKey), redact.Key(endKey))
}

if bytes.Compare(regions[0].Region.StartKey, startKey) > 0 {
return errors.Annotatef(berrors.ErrPDBatchScanRegion, "first region's startKey > startKey, startKey: %s, regionStartKey: %s",
redact.Key(startKey), redact.Key(regions[0].Region.StartKey))
} else if len(regions[len(regions)-1].Region.EndKey) != 0 && bytes.Compare(regions[len(regions)-1].Region.EndKey, endKey) < 0 {
return errors.Annotatef(berrors.ErrPDBatchScanRegion, "last region's endKey < startKey, startKey: %s, regionStartKey: %s",
return errors.Annotatef(berrors.ErrPDBatchScanRegion, "last region's endKey < endKey, endKey: %s, regionEndKey: %s",
redact.Key(endKey), redact.Key(regions[len(regions)-1].Region.EndKey))
}

Expand Down Expand Up @@ -398,7 +398,7 @@ func PaginateScanRegion(
break
}
}
if err := checkRegionConsistency(startKey, endKey, regions); err != nil {
if err := CheckRegionConsistency(startKey, endKey, regions); err != nil {
log.Warn("failed to scan region, retrying", logutil.ShortError(err))
return err
}
Expand Down
67 changes: 67 additions & 0 deletions br/pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,3 +445,70 @@ func (s *testRangeSuite) TestNeedSplit(c *C) {
// Out of region
c.Assert(restore.NeedSplit([]byte("e"), regions), IsNil)
}

func (s *testRangeSuite) TestRegionConsistency(c *C) {
cases := []struct {
startKey []byte
endKey []byte
err string
regions []*restore.RegionInfo
}{
{
codec.EncodeBytes([]byte{}, []byte("a")),
codec.EncodeBytes([]byte{}, []byte("a")),
"scan region return empty result, startKey: (.*?), endKey: (.*?)",
[]*restore.RegionInfo{},
},
{
codec.EncodeBytes([]byte{}, []byte("a")),
codec.EncodeBytes([]byte{}, []byte("a")),
"first region's startKey > startKey, startKey: (.*?), regionStartKey: (.*?)",
[]*restore.RegionInfo{
{
Region: &metapb.Region{
StartKey: codec.EncodeBytes([]byte{}, []byte("b")),
EndKey: codec.EncodeBytes([]byte{}, []byte("d")),
},
},
},
},
{
codec.EncodeBytes([]byte{}, []byte("b")),
codec.EncodeBytes([]byte{}, []byte("e")),
"last region's endKey < endKey, endKey: (.*?), regionEndKey: (.*?)",
[]*restore.RegionInfo{
{
Region: &metapb.Region{
StartKey: codec.EncodeBytes([]byte{}, []byte("b")),
EndKey: codec.EncodeBytes([]byte{}, []byte("d")),
},
},
},
},
{
codec.EncodeBytes([]byte{}, []byte("c")),
codec.EncodeBytes([]byte{}, []byte("e")),
"region endKey not equal to next region startKey(.*?)",
[]*restore.RegionInfo{
{
Region: &metapb.Region{
StartKey: codec.EncodeBytes([]byte{}, []byte("b")),
EndKey: codec.EncodeBytes([]byte{}, []byte("d")),
},
},
{
Region: &metapb.Region{
StartKey: codec.EncodeBytes([]byte{}, []byte("e")),
EndKey: codec.EncodeBytes([]byte{}, []byte("f")),
},
},
},
},
}
for _, ca := range cases {
c.Assert(
restore.CheckRegionConsistency(ca.startKey, ca.endKey, ca.regions),
ErrorMatches,
ca.err)
}
}
Loading

0 comments on commit 39e0fda

Please sign in to comment.