Skip to content

Commit

Permalink
br-stream: fix the issue that key is not in region. (#32877)
Browse files Browse the repository at this point in the history
* add ObjPrefix to speed up walkDir in br-stream

* add start/end key to filter unnecessay key
  • Loading branch information
3pointer authored Mar 9, 2022
1 parent ebd27f8 commit ce0b12c
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 6 deletions.
3 changes: 2 additions & 1 deletion br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1335,7 +1335,8 @@ const (
// ReadStreamMetaByTS is used for streaming task. collect all meta file by TS.
func (rc *Client) ReadStreamMetaByTS(ctx context.Context, restoreTS uint64) ([]*backuppb.Metadata, error) {
streamBackupMetaFiles := make([]*backuppb.Metadata, 0)
err := rc.storage.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error {
opt := &storage.WalkOption{ObjPrefix: streamBackupMetaPrefix}
err := rc.storage.WalkDir(ctx, opt, func(path string, size int64) error {
if strings.Contains(path, streamBackupMetaPrefix) {
m := &backuppb.Metadata{}
b, err := rc.storage.ReadFile(ctx, path)
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,8 @@ func (importer *FileImporter) downloadAndApplyKVFile(
Length: 0,
IsDelete: file.Type == backuppb.FileType_Delete,
RestoreTs: restoreTs,
StartKey: regionInfo.Region.GetStartKey(),
EndKey: regionInfo.Region.GetEndKey(),
}

reqCtx := &kvrpcpb.Context{
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/storage/azblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,9 @@ func (s *AzureBlobStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func
if opt == nil {
opt = &WalkOption{}
}

if len(opt.ObjPrefix) != 0 {
return errors.New("azure storage not support ObjPrefix for now")
}
prefix := path.Join(s.options.Prefix, opt.SubDir)
if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") {
prefix += "/"
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ func (s *gcsStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin
if opt == nil {
opt = &WalkOption{}
}
if len(opt.ObjPrefix) != 0 {
return errors.New("gcs storage not support ObjPrefix for now")
}

prefix := path.Join(s.gcs.Prefix, opt.SubDir)
if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") {
Expand Down
8 changes: 8 additions & 0 deletions br/pkg/storage/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"os"
"path/filepath"
"strings"

"github.com/pingcap/errors"
)
Expand Down Expand Up @@ -56,6 +57,9 @@ func (l *LocalStorage) FileExists(ctx context.Context, name string) (bool, error
// function; the second argument is the size in byte of the file determined
// by path.
func (l *LocalStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error {
if opt == nil {
opt = &WalkOption{}
}
base := filepath.Join(l.base, opt.SubDir)
return filepath.Walk(base, func(path string, f os.FileInfo, err error) error {
if os.IsNotExist(err) {
Expand All @@ -73,6 +77,10 @@ func (l *LocalStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(str
// so use Rel to convert to relative path to l.base
path, _ = filepath.Rel(l.base, path)

if !strings.HasPrefix(path, opt.ObjPrefix) {
return nil
}

size := f.Size()
// if not a regular file, we need to use os.stat to get the real file size
if !f.Mode().IsRegular() {
Expand Down
11 changes: 11 additions & 0 deletions br/pkg/storage/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,15 @@ func TestWalkDirWithSoftLinkFile(t *testing.T) {
})
require.NoError(t, err)
require.Equal(t, 2, i)

names = []string{name2}
i = 0
err = store.WalkDir(context.TODO(), &WalkOption{ObjPrefix: "test.warehouse.1"}, func(path string, size int64) error {
require.Equal(t, names[i], path)
require.Equal(t, int64(len(data)), size)
i++
return nil
})
require.NoError(t, err)
require.Equal(t, 1, i)
}
5 changes: 5 additions & 0 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,11 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin
if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") {
prefix += "/"
}

if len(opt.ObjPrefix) != 0 {
prefix += opt.ObjPrefix
}

maxKeys := int64(1000)
if opt.ListCount > 0 {
maxKeys = opt.ListCount
Expand Down
31 changes: 30 additions & 1 deletion br/pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ func TestWalkDir(t *testing.T) {
}, nil
}).
After(thirdCall)
s.s3.EXPECT().
fifthCall := s.s3.EXPECT().
ListObjectsWithContext(ctx, gomock.Any()).
DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput, opt ...request.Option) (*s3.ListObjectsOutput, error) {
require.Equal(t, aws.StringValue(contents[3].Key), aws.StringValue(input.Marker))
Expand All @@ -957,6 +957,20 @@ func TestWalkDir(t *testing.T) {
}, nil
}).
After(fourthCall)
s.s3.EXPECT().
ListObjectsWithContext(ctx, gomock.Any()).
DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput, opt ...request.Option) (*s3.ListObjectsOutput, error) {
require.Equal(t, "bucket", aws.StringValue(input.Bucket))
require.Equal(t, "prefix/sp/1", aws.StringValue(input.Prefix))
require.Equal(t, "", aws.StringValue(input.Marker))
require.Equal(t, int64(3), aws.Int64Value(input.MaxKeys))
require.Equal(t, "", aws.StringValue(input.Delimiter))
return &s3.ListObjectsOutput{
IsTruncated: aws.Bool(false),
Contents: contents[2:],
}, nil
}).
After(fifthCall)

// Ensure we receive the items in order.
i := 0
Expand Down Expand Up @@ -987,6 +1001,21 @@ func TestWalkDir(t *testing.T) {
)
require.NoError(t, err)
require.Len(t, contents, i)

// Ensure we receive the items in order with prefix.
i = 2
err = s.storage.WalkDir(
ctx,
&WalkOption{SubDir: "sp", ObjPrefix: "1", ListCount: 3},
func(path string, size int64) error {
require.Equal(t, *contents[i].Key, "prefix/"+path, "index = %d", i)
require.Equal(t, *contents[i].Size, size, "index = %d", i)
i++
return nil
},
)
require.NoError(t, err)
require.Len(t, contents, i)
}

// TestWalkDirBucket checks WalkDir retrieves all directory content under a bucket.
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ const (
type WalkOption struct {
// walk on SubDir of specify directory
SubDir string
// ObjPrefix used fo prefix search in storage.
// it can save lots of time when we want find specify prefix objects in storage.
// For example. we have 10000 <Hash>.sst files and 10 backupmeta.(\d+) files.
// we can use ObjPrefix = "backupmeta" to retrieve all meta files quickly.
ObjPrefix string
// ListCount is the number of entries per page.
//
// In cloud storages such as S3 and GCS, the files listed and sent in pages.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,4 @@ replace github.com/pingcap/tidb/parser => ./parser
// fix potential security issue(CVE-2020-26160) introduced by indirect dependency.
replace github.com/dgrijalva/jwt-go => github.com/form3tech-oss/jwt-go v3.2.6-0.20210809144907-32ab6a8243d7+incompatible

replace github.com/pingcap/kvproto => github.com/pingcap/kvproto v0.0.0-20220222112015-bc0822b00b1d
replace github.com/pingcap/kvproto => github.com/pingcap/kvproto v0.0.0-20220307052005-13a8f820a4e0
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,8 @@ github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 h1:Pe2LbxRmbTfAoKJ65bZL
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059/go.mod h1:fMRU1BA1y+r89AxUoaAar4JjrhUkVDt0o0Np6V8XbDQ=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20220222112015-bc0822b00b1d h1:Zg8MIlra7ASn8g8rHeG3cPv4bdQRMvOxAlG+S8l3AYg=
github.com/pingcap/kvproto v0.0.0-20220222112015-bc0822b00b1d/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220307052005-13a8f820a4e0 h1:Nk+eGhG8mWb0UCfMid5xy0PD6Je/dpn1xihykqoPIwQ=
github.com/pingcap/kvproto v0.0.0-20220307052005-13a8f820a4e0/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
Expand Down

0 comments on commit ce0b12c

Please sign in to comment.