Skip to content

Commit

Permalink
feat(storage/dataflux): add worksteal algorithm to fast-listing (#10913)
Browse files Browse the repository at this point in the history
  • Loading branch information
akansha1812 authored Sep 29, 2024
1 parent ece7426 commit 015b52c
Show file tree
Hide file tree
Showing 5 changed files with 483 additions and 31 deletions.
120 changes: 98 additions & 22 deletions storage/dataflux/fast_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"errors"
"fmt"
"runtime"
"strings"

"cloud.google.com/go/storage"
"golang.org/x/sync/errgroup"
Expand All @@ -41,53 +43,80 @@ type ListerInput struct {
// BucketName is the name of the bucket to list objects from. Required.
BucketName string

// Parallelism is number of parallel workers to use for listing. Default value is 10x number of available CPU. Optional.
// Parallelism is number of parallel workers to use for listing.
// Default value is 10x number of available CPU. Optional.
Parallelism int

// BatchSize is the number of objects to list. Default value returns all objects at once. Optional.
// The number of objects returned will be rounded up to a multiple of gcs page size.
// BatchSize is the number of objects to list. Default value returns
// all objects at once. The number of objects returned will be
// rounded up to a multiple of gcs page size. Optional.
BatchSize int

// Query is the query to filter objects for listing. Default value is nil. Optional.
// Use ProjectionNoACL for faster listing. Including ACLs increases latency while fetching objects.
// Query is the query to filter objects for listing. Default value is nil.
// Use ProjectionNoACL for faster listing. Including ACLs increases
// latency while fetching objects. Optional.
Query storage.Query

// SkipDirectoryObjects is to indicate whether to list directory objects. Default value is false. Optional.
// SkipDirectoryObjects is to indicate whether to list directory objects.
// Default value is false. Optional.
SkipDirectoryObjects bool
}

// Lister is used for interacting with Dataflux fast-listing. The caller should
// initialize it with NewLister() instead of creating it directly.
type Lister struct {
	// method indicates the listing method(open, sequential, worksteal) to
	// be used for listing.
	method listingMethod

	// bucket is the bucket handle to list objects from.
	bucket *storage.BucketHandle

	// batchSize is the number of objects to list.
	batchSize int

	// parallelism is number of parallel workers to use for listing.
	parallelism int

	// query is the query to filter objects for listing.
	query storage.Query

	// pageToken is the token to use for sequential listing.
	pageToken string

	// ranges is the channel to store the start and end ranges to be listed
	// by the workers in worksteal listing.
	ranges chan *listRange

	// skipDirectoryObjects is to indicate whether to list directory objects.
	skipDirectoryObjects bool
}

// NewLister creates a new dataflux Lister to list objects in the given bucket.
func NewLister(c *storage.Client, in *ListerInput) *Lister {
	bucket := c.Bucket(in.BucketName)

	// When parallelism is unset, default to 10x the number of available CPUs.
	if in.Parallelism == 0 {
		in.Parallelism = 10 * runtime.NumCPU()
	}

	// Seed the range channel with the entire namespace for the given prefix,
	// start offset, and end offset. When the whole namespace is to be listed,
	// both start and end are empty.
	ranges := make(chan *listRange, 2*in.Parallelism)
	start, end := updateStartEndOffset(in.Query.StartOffset, in.Query.EndOffset, in.Query.Prefix)
	ranges <- &listRange{startRange: start, endRange: end}

	return &Lister{
		method:               open,
		parallelism:          in.Parallelism,
		pageToken:            "",
		bucket:               bucket,
		batchSize:            in.BatchSize,
		query:                in.Query,
		skipDirectoryObjects: in.SkipDirectoryObjects,
		ranges:               ranges,
	}
}
Expand All @@ -102,13 +131,15 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)
var results []*storage.ObjectAttrs
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Errgroup takes care of running both methods in parallel. As soon as one of the method
// is complete, the running method also stops.
// Errgroup takes care of running both methods in parallel. As soon as one of
// the method is complete, the running method also stops.
g, childCtx := errgroup.WithContext(ctx)

// To start listing method is Open and runs both worksteal and sequential listing in parallel.
// The method which completes first is used for all subsequent runs.
// To start listing method is Open and runs both worksteal and sequential listing
// in parallel. The method which completes first is used for all subsequent runs.

// TODO: Run worksteal listing when method is Open or WorkSteal.

// Run sequential listing when method is Open or Sequential.
if c.method != worksteal {

Expand All @@ -118,8 +149,8 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)
countError++
return fmt.Errorf("error in running sequential listing: %w", err)
}
// If sequential listing completes first, set method to sequential listing and ranges to nil.
// The nextToken will be used to continue sequential listing.
// If sequential listing completes first, set method to sequential listing
// and ranges to nil. The nextToken will be used to continue sequential listing.
results = objects
c.pageToken = nextToken
c.method = sequential
Expand All @@ -135,13 +166,15 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)
// If the error is not context.Canceled, then return error instead of falling back
// to the other method. This is so that the error can be fixed and user can take
// advantage of fast-listing.
// As one of the listing method completes, it is expected to cancel context for the other method.
// If both sequential and worksteal listing fail due to context canceled, only then return error.
// As one of the listing methods completes, it is expected to cancel context for
// the other method. If both sequential and worksteal listing fail due to
// context canceled, only then return error.
if err != nil && (!errors.Is(err, context.Canceled) || countError > 1) {
return nil, fmt.Errorf("failed waiting for sequntial and work steal lister : %w", err)
}

// If ranges for worksteal and pageToken for sequential listing is empty, then listing is complete.
// If ranges for worksteal and pageToken for sequential listing is empty, then
// listing is complete.
if c.pageToken == "" {
return results, iterator.Done
}
Expand All @@ -150,6 +183,49 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)

// Close closes the range channel of the Lister.
func (c *Lister) Close() {
	if c.ranges == nil {
		return
	}
	close(c.ranges)
}

// TODO: Close range channel for worksteal lister.
// updateStartEndOffset updates start and end offset based on prefix.
// If a prefix is given, adjust start and end value such that it lists
// objects with the given prefix. updateStartEndOffset assumes prefix will
// be added to the object name while listing objects in worksteal algorithm.
//
// For example:
// start = "abc", end = "prefix_a", prefix = "prefix",
//
// end will change to "_a", prefix will be added in worksteal algorithm.
// "abc" is lexicographically smaller than "prefix". So start will be the first
// object with the given prefix.
//
// Therefore start will change to ""(empty string) and end to "_a" .
func updateStartEndOffset(start, end, prefix string) (string, string) {
	// With no prefix, the offsets pass through untouched.
	if prefix == "" {
		return start, end
	}
	// An inverted (or empty) window lists nothing: collapse it.
	if start != "" && end != "" && start >= end {
		return start, start
	}

	switch {
	case start == "":
		// Unbounded start stays unbounded.
	case start <= prefix:
		// Everything with the prefix sorts at or after start, so the
		// start bound is effectively unbounded within the prefix.
		start = ""
	case strings.HasPrefix(start, prefix):
		// Strip the prefix; it is re-added by the worksteal algorithm.
		start = strings.TrimPrefix(start, prefix)
	default:
		// start sorts after every prefixed object: nothing to list.
		return start, start
	}

	switch {
	case end == "":
		// Unbounded end stays unbounded.
	case len(end) > len(prefix) && strings.HasPrefix(end, prefix):
		// Strip the prefix; it is re-added by the worksteal algorithm.
		end = strings.TrimPrefix(end, prefix)
	case end > prefix:
		// Every prefixed object sorts before end, so the end bound is
		// effectively unbounded within the prefix.
		end = ""
	default:
		// end sorts at or before the first prefixed object: nothing to list.
		return end, end
	}
	return start, end
}
194 changes: 194 additions & 0 deletions storage/dataflux/fast_list_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dataflux

import (
"runtime"
"testing"

"cloud.google.com/go/storage"
)

// TestUpdateStartEndOffset verifies that the start and end offsets are
// adjusted correctly for the given listing prefix.
func TestUpdateStartEndOffset(t *testing.T) {
	for _, tt := range []struct {
		desc      string
		start     string
		end       string
		prefix    string
		wantStart string
		wantEnd   string
	}{
		// List all objects with the given prefix.
		{
			desc:      "start and end are empty",
			start:     "",
			end:       "",
			prefix:    "pre",
			wantStart: "",
			wantEnd:   "",
		},
		{
			desc:      "start is longer and lexicographically before prefix",
			start:     "abcqre",
			end:       "",
			prefix:    "pre",
			wantStart: "",
			wantEnd:   "",
		},
		{
			desc:      "start value same as prefix",
			start:     "pre",
			end:       "",
			prefix:    "pre",
			wantStart: "",
			wantEnd:   "",
		},
		{
			desc:      "lexicographically start comes before prefix and end after prefix",
			start:     "abc",
			end:       "xyz",
			prefix:    "pre",
			wantStart: "",
			wantEnd:   "",
		},
		// List bounded objects within the given prefix.
		{
			desc:      "start value contains prefix",
			start:     "pre_a",
			end:       "",
			prefix:    "pre",
			wantStart: "_a",
			wantEnd:   "",
		},
		{
			desc:      "end value contains prefix",
			start:     "",
			end:       "pre_x",
			prefix:    "pre",
			wantStart: "",
			wantEnd:   "_x",
		},
		// With empty prefix, start and end will not be affected.
		{
			desc:      "prefix is empty",
			start:     "abc",
			end:       "xyz",
			prefix:    "",
			wantStart: "abc",
			wantEnd:   "xyz",
		},
		{
			desc:      "start is lexicographically higher than end",
			start:     "xyz",
			end:       "abc",
			prefix:    "",
			wantStart: "xyz",
			wantEnd:   "abc",
		},
		// Cases where no objects will be listed when prefix is given.
		{
			desc:      "end is same as prefix",
			start:     "",
			end:       "pre",
			prefix:    "pre",
			wantStart: "pre",
			wantEnd:   "pre",
		},
		{
			desc:      "start is lexicographically higher than end with prefix",
			start:     "xyz",
			end:       "abc",
			prefix:    "pre",
			wantStart: "xyz",
			wantEnd:   "xyz",
		},
		{
			desc:      "start is lexicographically higher than prefix",
			start:     "xyz",
			end:       "",
			prefix:    "pre",
			wantStart: "xyz",
			wantEnd:   "xyz",
		},
	} {
		t.Run(tt.desc, func(t *testing.T) {
			gotStart, gotEnd := updateStartEndOffset(tt.start, tt.end, tt.prefix)
			if gotStart != tt.wantStart || gotEnd != tt.wantEnd {
				t.Errorf("updateStartEndOffset(%q, %q, %q) got = (%q, %q), want = (%q, %q)", tt.start, tt.end, tt.prefix, gotStart, gotEnd, tt.wantStart, tt.wantEnd)
			}
		})
	}
}

// TestNewLister verifies that NewLister seeds exactly one range and sets the
// method, page token, offsets, and parallelism as expected.
func TestNewLister(t *testing.T) {
	gcs := &storage.Client{}
	bucketName := "test-bucket"
	for _, tt := range []struct {
		desc            string
		query           storage.Query
		parallelism     int
		wantStart       string
		wantEnd         string
		wantParallelism int
	}{
		{
			desc:            "start and end are empty",
			query:           storage.Query{Prefix: "pre"},
			parallelism:     1,
			wantStart:       "",
			wantEnd:         "",
			wantParallelism: 1,
		},
		{
			desc:            "start is longer than prefix",
			query:           storage.Query{Prefix: "pre", StartOffset: "pre_a"},
			parallelism:     1,
			wantStart:       "_a",
			wantEnd:         "",
			wantParallelism: 1,
		},
		{
			desc:            "start and end are empty",
			query:           storage.Query{Prefix: "pre"},
			parallelism:     0,
			wantStart:       "",
			wantEnd:         "",
			wantParallelism: 10 * runtime.NumCPU(),
		},
	} {
		t.Run(tt.desc, func(t *testing.T) {
			input := &ListerInput{
				BucketName:  bucketName,
				BatchSize:   0,
				Query:       tt.query,
				Parallelism: tt.parallelism,
			}
			lister := NewLister(gcs, input)
			defer lister.Close()
			if got := len(lister.ranges); got != 1 {
				t.Errorf("NewLister(%v, %v %v, %v) got len of ranges = %v, want = %v", bucketName, 1, 0, tt.query, got, 1)
			}
			r := <-lister.ranges
			if lister.method != open || lister.pageToken != "" || r.startRange != tt.wantStart || r.endRange != tt.wantEnd || lister.parallelism != tt.wantParallelism {
				t.Errorf("NewLister(%q, %d, %d, %v) got = (method: %v, token: %q, start: %q, end: %q, parallelism: %d), want = (method: %v, token: %q, start: %q, end: %q, parallelism: %d)", bucketName, 1, 0, tt.query, lister.method, lister.pageToken, r.startRange, r.endRange, lister.parallelism, open, "", tt.wantStart, tt.wantEnd, tt.wantParallelism)
			}
		})
	}
}
Loading

0 comments on commit 015b52c

Please sign in to comment.