feat(storage/dataflux): increase test coverage #11093

Closed
wants to merge 62 commits
Changes from all commits
62 commits
3ab3831
test case for fast-list
akansha1812 Sep 25, 2024
559ae40
resolve comments
akansha1812 Sep 27, 2024
1cdc956
add object_lister and worksteal to fast_list
akansha1812 Oct 7, 2024
7eb0471
Merge branch 'googleapis:main' into main
akansha1812 Oct 7, 2024
1f05177
add unit tests with emulator
akansha1812 Oct 8, 2024
05b1515
Merge branch 'googleapis:main' into main
akansha1812 Oct 8, 2024
5bed09c
resolve PR errors
akansha1812 Oct 8, 2024
80054bf
reduce numobjects to resolve timeout error
akansha1812 Oct 8, 2024
e33adf5
reduce objects created for timeout error
akansha1812 Oct 8, 2024
b1b69b9
remove env variables for grpc and http
akansha1812 Oct 9, 2024
c2e56f8
run dataflux emulator tests
akansha1812 Oct 9, 2024
c78aaba
Merge branch 'main' into main
akansha1812 Oct 14, 2024
b714222
Merge branch 'main' into main
akansha1812 Oct 21, 2024
4028cb6
resolve comments
akansha1812 Oct 21, 2024
4851cc8
update ranges to nil when sequential listing is faster
akansha1812 Oct 21, 2024
0d494e0
default page size for seq listing is 5000
akansha1812 Oct 21, 2024
722e961
remove version enabled from TestDoSeqListingEmulated
akansha1812 Oct 21, 2024
f3c1571
increase emulator time
akansha1812 Oct 21, 2024
db1a757
make next_page more readable
akansha1812 Oct 22, 2024
56ab509
Merge branch 'main' into main
akansha1812 Oct 22, 2024
2b72b0e
to resolve race conditions
akansha1812 Oct 22, 2024
692e74e
rename goroutineID to id
akansha1812 Oct 22, 2024
a6cb0f4
Merge branch 'main' into main
akansha1812 Oct 23, 2024
1998873
move counter from beginning of the loop
akansha1812 Oct 25, 2024
7583e16
add mutex to error counter
akansha1812 Oct 25, 2024
4b1dcf0
emulator test for error counter and remove worker to track error from…
akansha1812 Oct 25, 2024
34f0840
Merge branch 'main' into main
akansha1812 Oct 25, 2024
564bbd3
test emulator error
akansha1812 Oct 25, 2024
ab830f6
grpc client returns context eror in desc
akansha1812 Oct 28, 2024
e14c31a
Merge branch 'main' into main
akansha1812 Oct 28, 2024
5af30c7
Merge branch 'main' into main
akansha1812 Oct 29, 2024
328f2b7
Merge branch 'main' into main
akansha1812 Oct 31, 2024
1934433
follow up test coverage
akansha1812 Oct 31, 2024
856c661
add unit tests
akansha1812 Nov 6, 2024
493037a
add unit tests
akansha1812 Nov 6, 2024
5529777
skip grpc
akansha1812 Nov 6, 2024
c6d1382
Merge branch 'main' into amaloo/followup
akansha1812 Nov 14, 2024
e708314
Merge branch 'main' into amaloo/followup
akansha1812 Nov 15, 2024
2bb40af
Merge branch 'main' into amaloo/followup
tritone Nov 18, 2024
4bfe01a
Merge branch 'main' into amaloo/followup
akansha1812 Nov 18, 2024
2989a3e
Merge branch 'googleapis:main' into amaloo/followup
akansha1812 Nov 19, 2024
37305ff
rename example test
akansha1812 Nov 14, 2024
5cc7f3e
rearrage lister methods
akansha1812 Nov 14, 2024
0a128f7
change close comment
akansha1812 Nov 14, 2024
25c2a11
rename context err counter
akansha1812 Nov 14, 2024
8147ecc
make var cc more descriptive
akansha1812 Nov 14, 2024
45708c6
reuse ctx var
akansha1812 Nov 14, 2024
a401461
update prefixAdjustedOffsets as per comment
akansha1812 Nov 14, 2024
2de2b3c
update prefixAdjustedOffsets as per comment
akansha1812 Nov 14, 2024
0dbb389
update comments
akansha1812 Nov 15, 2024
c747b10
switch case for open,worksteal and listing method
akansha1812 Nov 15, 2024
34e8d7b
typo
akansha1812 Nov 15, 2024
2d7e841
dynamic split
akansha1812 Nov 15, 2024
5e516f9
updated comments
akansha1812 Nov 15, 2024
0a91abd
update next_page to be more readable
akansha1812 Nov 18, 2024
2c53e45
update worksteal comments
akansha1812 Nov 18, 2024
1fafa6d
follow up test coverage
akansha1812 Oct 31, 2024
3d4e9eb
resolve comments
akansha1812 Nov 19, 2024
92cdb50
link docs for version listing
akansha1812 Nov 19, 2024
b1d7d98
update err msg
akansha1812 Nov 19, 2024
0202e26
update comment
akansha1812 Nov 19, 2024
6e0c251
update fast-list.go
akansha1812 Nov 20, 2024
2 changes: 1 addition & 1 deletion storage/dataflux/example_test.go
@@ -23,7 +23,7 @@ import (
"google.golang.org/api/iterator"
)

func ExampleNextBatch_batch() {
func ExampleLister() {
ctx := context.Background()
// Pass in any client opts or set retry policy here.
client, err := storage.NewClient(ctx)
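The diff truncates the body of the renamed example here. For reference, a complete ExampleLister along these lines might look as follows. This is a sketch only, not the PR's actual example: the package clause, bucket name, batch size, and log.Fatal error handling are illustrative placeholders, and it relies only on the API surface shown elsewhere in this PR (NewLister, ListerInput, NextBatch, Close, and the iterator.Done sentinel).

package dataflux_test

import (
	"context"
	"log"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/dataflux"
	"google.golang.org/api/iterator"
)

func ExampleLister() {
	ctx := context.Background()
	// Pass in any client opts or set retry policy here.
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// The bucket name and batch size are placeholders.
	in := &dataflux.ListerInput{
		BucketName: "my-bucket",
		BatchSize:  100000,
	}

	df := dataflux.NewLister(client, in)
	defer df.Close()

	for {
		objects, err := df.NextBatch(ctx)
		if err != nil && err != iterator.Done {
			log.Fatal(err)
		}
		// Consume the batch before checking the sentinel: the final batch is
		// returned together with iterator.Done.
		log.Printf("listed %d objects", len(objects))
		if err == iterator.Done {
			break
		}
	}
}

Note the loop shape: as the NextBatch code later in this diff shows (return results, iterator.Done), the final batch is delivered together with the Done sentinel, so the batch is consumed before breaking.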
226 changes: 148 additions & 78 deletions storage/dataflux/fast_list.go
@@ -20,6 +20,7 @@ import (
"fmt"
"runtime"
"strings"
"sync"

"cloud.google.com/go/storage"
"golang.org/x/sync/errgroup"
@@ -62,6 +63,35 @@ type ListerInput struct {
SkipDirectoryObjects bool
}

// NewLister creates a new [Lister] that can be used to list objects in the given bucket.
func NewLister(c *storage.Client, in *ListerInput) *Lister {
bucket := c.Bucket(in.BucketName)

// If parallelism is not given, set default value to 10x the number of
// available CPU.
if in.Parallelism == 0 {
in.Parallelism = runtime.NumCPU() * 10
}
// Initialize the range channel with the entire namespace of objects for the
// given prefix, startoffset and endoffset. In the default case of listing the
// entire namespace, start and end will be empty.
rangeChannel := make(chan *listRange, in.Parallelism*2)
start, end := prefixAdjustedOffsets(in.Query.StartOffset, in.Query.EndOffset, in.Query.Prefix)
rangeChannel <- &listRange{startRange: start, endRange: end}

lister := &Lister{
method: open,
parallelism: in.Parallelism,
pageToken: "",
bucket: bucket,
batchSize: in.BatchSize,
query: in.Query,
skipDirectoryObjects: in.SkipDirectoryObjects,
ranges: rangeChannel,
}
return lister
}

// Lister is used for interacting with Dataflux fast-listing. The caller should
// initialize it with NewLister() instead of creating it directly.
type Lister struct {
@@ -92,116 +122,156 @@ type Lister struct {
skipDirectoryObjects bool
}

// NewLister creates a new dataflux Lister to list objects in the give bucket.
func NewLister(c *storage.Client, in *ListerInput) *Lister {
bucket := c.Bucket(in.BucketName)

// If parallelism is not given, set default value to 10x the number of
// available CPU.
if in.Parallelism == 0 {
in.Parallelism = runtime.NumCPU() * 10
}
// Initialize range channel with entire namespace of object for given
// prefix, startoffset and endoffset. For the default range to list is
// entire namespace, start and end will be empty.
rangeChannel := make(chan *listRange, in.Parallelism*2)
start, end := updateStartEndOffset(in.Query.StartOffset, in.Query.EndOffset, in.Query.Prefix)
rangeChannel <- &listRange{startRange: start, endRange: end}

lister := &Lister{
method: open,
parallelism: in.Parallelism,
pageToken: "",
bucket: bucket,
batchSize: in.BatchSize,
query: in.Query,
skipDirectoryObjects: in.SkipDirectoryObjects,
ranges: rangeChannel,
}
return lister
}

// NextBatch runs worksteal algorithm and sequential listing in parallel to quickly
// return a list of objects in the bucket. For smaller dataset,
// sequential listing is expected to be faster. For larger dataset,
// NextBatch returns the next N objects in the bucket, where N is [ListerInput.BatchSize].
// In case of failure, all processes are stopped and an error is returned immediately. Create a new Lister to retry.
// For the first batch, both worksteal listing and sequential
// listing runs in parallel to quickly list N number of objects in the bucket. For subsequent
// batches, only the method which returned object faster in the first batch is used.
// For smaller dataset, sequential listing is expected to be faster. For larger dataset,
// worksteal listing is expected to be faster.
//
// Worksteal algorithm list objects in GCS bucket in parallel using multiple parallel
// workers and each worker in the list operation is able to steal work from its siblings
// once it has finished all currently slated listing work.
func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error) {
// countError tracks the number of failed listing methods.
countError := 0
var results []*storage.ObjectAttrs
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Errgroup takes care of running both methods in parallel. As soon as one of
// the method is complete, the running method also stops.
g, childCtx := errgroup.WithContext(ctx)

// To start listing method is Open and runs both worksteal and sequential listing
// in parallel. The method which completes first is used for all subsequent runs.

// TODO: Run worksteal listing when method is Open or WorkSteal.
var results []*storage.ObjectAttrs

// Run sequential listing when method is Open or Sequential.
if c.method != worksteal {
// For the first batch, listing method is open and runs both worksteal and sequential listing
// in parallel. The method which completes first is used for all subsequent NextBatch calls.
switch c.method {
case worksteal:
// Run worksteal algorithm for listing.
objects, err := c.workstealListing(ctx)
if err != nil {
return nil, fmt.Errorf("worksteal listing: %w", err)
}
results = objects
case sequential:
// Run GCS sequential listing.
objects, token, err := c.sequentialListing(ctx)
if err != nil {
return nil, fmt.Errorf("sequential listing: %w", err)
}
results = objects
c.pageToken = token
c.ranges = nil
case open:
// countError tracks the number of failed listing methods.
countErr := &countErr{counter: 0}

ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Errgroup takes care of running both methods in parallel. As soon as one of
// the method is complete, the running method also stops.
g, ctx := errgroup.WithContext(ctx)
wsCompletedfirst := false
seqCompletedfirst := false
var wsObjects []*storage.ObjectAttrs
var seqObjects []*storage.ObjectAttrs
var nextToken string
g.Go(func() error {
objects, err := c.workstealListing(ctx)
if err != nil {
countErr.increment()
return fmt.Errorf("worksteal listing: %w", err)
}
// Close context when worksteal listing is complete.
cancel()
wsCompletedfirst = true
wsObjects = objects

return nil
})
g.Go(func() error {
objects, nextToken, err := c.sequentialListing(childCtx)
objects, token, err := c.sequentialListing(ctx)
if err != nil {
countError++
return fmt.Errorf("error in running sequential listing: %w", err)
countErr.increment()
return fmt.Errorf("sequential listing: %w", err)
}
// If sequential listing completes first, set method to sequential listing
// and ranges to nil. The nextToken will be used to continue sequential listing.
results = objects
c.pageToken = nextToken
c.method = sequential
// Close context when sequential listing is complete.
cancel()
seqCompletedfirst = true
seqObjects = objects
nextToken = token

return nil
})
}

// Close all functions if either sequential listing or worksteal listing is complete.
err := g.Wait()

// If the error is not context.Canceled, then return error instead of falling back
// to the other method. This is so that the error can be fixed and user can take
// advantage of fast-listing.
// As one of the listing method completes, it is expected to cancel context for the
// other method. If both sequential and worksteal listing fail due to context canceled,
// only then return error.
if err != nil && (!errors.Is(err, context.Canceled) || countError > 1) {
return nil, fmt.Errorf("failed waiting for sequntial and work steal lister : %w", err)
// Close all functions if either sequential listing or worksteal listing is complete.
err := g.Wait()

// If the error is not context.Canceled, then return error instead of falling back
// to the other method. This is so that the error can be fixed and user can take
// advantage of fast-listing.
// As one of the listing method completes, it is expected to cancel context and
// return context canceled error for the other method. Since context canceled is expected, it
// will not be considered an error. If both sequential and worksteal listing fail due
// to context canceled, then return error.
if err != nil && (!errors.Is(err, context.Canceled) || countErr.counter > 1) {
return nil, fmt.Errorf("dataflux : %w", err)
}
if wsCompletedfirst {
// If worksteal listing completes first, set method to worksteal listing and nextToken to "".
// The c.ranges channel will be used to continue worksteal listing.
results = wsObjects
c.pageToken = ""
c.method = worksteal
} else if seqCompletedfirst {
// If sequential listing completes first, set method to sequential listing
// and ranges to nil. The nextToken will be used to continue sequential listing.
results = seqObjects
c.pageToken = nextToken
c.method = sequential
c.ranges = nil
}
}

// If ranges for worksteal and pageToken for sequential listing is empty, then
// listing is complete.
if c.pageToken == "" {
if c.pageToken == "" && len(c.ranges) == 0 {
return results, iterator.Done
}
return results, nil
}

// Close closes the range channel of the Lister.
// Close is used to close the Lister.
func (c *Lister) Close() {
if c.ranges != nil {
close(c.ranges)
}
}

// updateStartEndOffset updates start and end offset based on prefix.
// If a prefix is given, adjust start and end value such that it lists
// objects with the given prefix. updateStartEndOffset assumes prefix will
// be added to the object name while listing objects in worksteal algorithm.
type countErr struct {
mu sync.Mutex
counter int
}

func (cc *countErr) increment() {
cc.mu.Lock()
defer cc.mu.Unlock()
cc.counter++
}

// prefixAdjustedOffsets returns a start and end offset adjusted from the given offsets based on the prefix, stripping the prefix.
// These offsets can be used by adding back the prefix, so that the original offsets do not need to be checked.

// This means that if the given offsets are out of range of the prefix
// (for example, offsets {start:"a", end: "b"}, with prefix "c" which is lexicographically
// outside of "a" to "b"), the returned offsets will ensure no strings fall in their range.

// Otherwise, if the offset is too permissive given the prefix, it returns an empty string
// to indicate there is no offset and all objects starting from or ending at the prefix should
// be listed.
//
// For example:
// start = "abc", end = "prefix_a", prefix = "prefix",
//
// end will change to "_a", prefix will be added in worksteal algorithm.
// "abc" is lexicographically smaller than "prefix". So start will be the first
// object with the given prefix.
//
// Therefore start will change to ""(empty string) and end to "_a" .
func updateStartEndOffset(start, end, prefix string) (string, string) {
// "abc" is lexicographically smaller than "prefix". The start offset indicates the first
// object with the given prefix should be listed, therefore the start offset will be empty.
// The end offset will change to "_a" as the prefix is stripped.
// Therefore the new offset will change to {start = "", end = "_a"}.
func prefixAdjustedOffsets(start, end, prefix string) (string, string) {
if prefix == "" {
return start, end
}
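The body of prefixAdjustedOffsets is truncated in this diff. As a rough, standalone sketch of the documented behavior only, and not the PR's actual implementation, the transformation could look like the code below. The function name prefixAdjustedOffsetsSketch, the empty-range sentinel ("a", "a"), and the assumption that the start offset is inclusive while the end offset is exclusive are illustrative assumptions, not taken from the package.

package main

import (
	"fmt"
	"strings"
)

// prefixAdjustedOffsetsSketch illustrates the behavior described in the doc
// comment above; it is not the code elided from the diff. It assumes the start
// offset is inclusive and the end offset is exclusive.
func prefixAdjustedOffsetsSketch(start, end, prefix string) (string, string) {
	if prefix == "" {
		return start, end
	}
	// Adjust the start offset relative to the prefix.
	switch {
	case start <= prefix:
		// Every object with the prefix sorts at or after start, so no start
		// offset is needed once the prefix is applied.
		start = ""
	case strings.HasPrefix(start, prefix):
		// Start falls inside the prefix namespace; strip the prefix.
		start = strings.TrimPrefix(start, prefix)
	default:
		// Start sorts after every object with the prefix; return a range
		// that matches nothing.
		return "a", "a"
	}
	// Adjust the end offset relative to the prefix.
	switch {
	case end == "":
		// No end offset was given, so none is needed.
	case end <= prefix:
		// End is at or before the prefix, so no object with the prefix sorts
		// before it; return a range that matches nothing.
		return "a", "a"
	case strings.HasPrefix(end, prefix):
		// End falls inside the prefix namespace; strip the prefix.
		end = strings.TrimPrefix(end, prefix)
	default:
		// End sorts after every object with the prefix, so no end offset is
		// needed once the prefix is applied.
		end = ""
	}
	return start, end
}

func main() {
	// The example from the doc comment: "abc" sorts before "prefix", so the
	// start offset drops to ""; "prefix_a" is inside the prefix namespace, so
	// the end offset becomes "_a".
	s, e := prefixAdjustedOffsetsSketch("abc", "prefix_a", "prefix")
	fmt.Printf("start=%q end=%q\n", s, e) // start="" end="_a"
}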