feat(storage/dataflux): add dataflux interface #10748

Merged
46 commits merged on Sep 19, 2024
Changes from 9 commits
Commits (46)
0e7d8ad
dataflux initial commit
akansha1812 Aug 22, 2024
b21866c
interface for range splitter
akansha1812 Aug 22, 2024
a3e269e
add a valid license header
akansha1812 Aug 22, 2024
863fa26
add a valid license header to all dataflux files
akansha1812 Aug 22, 2024
10dd4a8
Merge branch 'main' into main
akansha1812 Aug 22, 2024
78090cd
Merge branch 'googleapis:main' into main
akansha1812 Aug 26, 2024
03aecaa
resolved comments
akansha1812 Aug 26, 2024
d3056e0
update sequential listing comment
akansha1812 Aug 26, 2024
b48d583
add end-to-end sequential listing
akansha1812 Aug 26, 2024
e246c3f
Merge branch 'main' into main
akansha1812 Aug 27, 2024
b1e7712
adding basic integration test
akansha1812 Aug 27, 2024
39de969
Merge branch 'main' into main
akansha1812 Aug 27, 2024
db3ad80
Merge branch 'googleapis:main' into main
akansha1812 Aug 28, 2024
98b632f
Merge branch 'googleapis:main' into main
akansha1812 Sep 3, 2024
c7787fb
Merge branch 'main' into main
akansha1812 Sep 4, 2024
8313e50
Merge branch 'main' into main
akansha1812 Sep 4, 2024
f2b5dc3
Merge branch 'main' into main
akansha1812 Sep 11, 2024
c90c6fc
chore: update gapic-generator-go to 0.47.0 (#10848)
gcf-owl-bot[bot] Sep 11, 2024
39ee892
chore(main): release auth 0.9.4 (#10846)
release-please[bot] Sep 11, 2024
bb69df7
feat(firestore): Adding distance threshold and result field (#10802)
bhshkh Sep 11, 2024
0feb258
remove pagination within sequential listing
akansha1812 Sep 11, 2024
ab44946
Merge branch 'main' into main
akansha1812 Sep 11, 2024
8b147b2
remove nextBatch_all example
akansha1812 Sep 11, 2024
8f8b7ac
remove close function from NewLister return value
akansha1812 Sep 11, 2024
18cffa6
remove close function from NewLister return value
akansha1812 Sep 11, 2024
67bb404
change listing as method of Lister
akansha1812 Sep 13, 2024
4691742
Merge branch 'main' into main
akansha1812 Sep 13, 2024
4dc54ab
Merge branch 'googleapis:main' into main
akansha1812 Sep 13, 2024
38abf10
Merge branch 'googleapis:main' into main
akansha1812 Sep 13, 2024
ae91a8a
Merge branch 'googleapis:main' into main
akansha1812 Sep 16, 2024
206f47e
chore: update gapic-generator-go to 0.47.0 (#10848)
gcf-owl-bot[bot] Sep 11, 2024
2d586ea
chore: update gapic-generator-go to 0.47.0 (#10848)
gcf-owl-bot[bot] Sep 11, 2024
71cb718
chore: update gapic-generator-go to 0.47.0 (#10848)
gcf-owl-bot[bot] Sep 11, 2024
31aa908
add integration test for next batch
akansha1812 Sep 16, 2024
39c8fd2
Reset extra file
akansha1812 Sep 17, 2024
2c8faf4
Merge branch 'googleapis:main' into main
akansha1812 Sep 18, 2024
64fbc61
sequential list to return object instead of pointer
akansha1812 Sep 18, 2024
d28cda2
fetch 5000 objects from gcs
akansha1812 Sep 18, 2024
081b8a7
fetch 5000 objects from gcs
akansha1812 Sep 18, 2024
b84b324
make worker status unexported, round up batchsize comment
akansha1812 Sep 19, 2024
7b3441e
Merge branch 'main' into main
akansha1812 Sep 19, 2024
e1b7da8
Merge branch 'main' into main
akansha1812 Sep 19, 2024
ed1e505
Merge branch 'googleapis:main' into main
akansha1812 Sep 19, 2024
367172c
add comment to use no acl
akansha1812 Sep 19, 2024
af913aa
update with go mod tidy
akansha1812 Sep 19, 2024
df9384a
Merge branch 'main' into main
akansha1812 Sep 19, 2024
363 changes: 214 additions & 149 deletions aiplatform/apiv1beta1/aiplatformpb/vertex_rag_data.pb.go

Large diffs are not rendered by default.

18 changes: 11 additions & 7 deletions storage/dataflux/fast_list.go
@@ -41,16 +41,16 @@ type ListerInput struct {
 	// BucketName is the name of the bucket to list objects from. Required.
 	BucketName string
 
-	// Parallelism is number of parallel workers to use for listing. Optional.
+	// Parallelism is the number of parallel workers to use for listing. Default is 10x the number of available CPUs. Optional.
 	Parallelism int
 
-	// BatchSize is the number of objects to list. Optional.
+	// BatchSize is the number of objects to list per batch. The default lists all objects at once. Optional.
 	BatchSize int
 
-	// Query is the query to filter objects for listing. Optional.
+	// Query is the query to filter objects for listing. Default is nil. Optional.
 	Query storage.Query
 
-	// SkipDirectoryObjects is to indicate whether to list directory objects. Optional.
+	// SkipDirectoryObjects indicates whether to skip directory objects. Default is false. Optional.
 	SkipDirectoryObjects bool
 }
 
@@ -100,6 +100,8 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)
 	var results []*storage.ObjectAttrs
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
+	// Errgroup takes care of running both methods in parallel. As soon as one of
+	// the methods completes, the other running method also stops.
 	g, childCtx := errgroup.WithContext(ctx)
 
 	// To start, the listing method is Open, and both worksteal and sequential listing run in parallel.
@@ -109,7 +111,7 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)
 	if c.method != worksteal {
 
 		g.Go(func() error {
-			objects, nextToken, err := sequentialListing(childCtx, *c)
+			objects, nextToken, err := c.sequentialListing(childCtx)
 			if err != nil {
 				countError++
 				return fmt.Errorf("error in running sequential listing: %w", err)
@@ -128,9 +130,11 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)
 	// Close all functions if either sequential listing or worksteal listing is complete.
 	err := g.Wait()
 
-	// If there is not context.Canceled, then return error.
+	// If the error is not context.Canceled, return it instead of falling back to
+	// the other method, so that the error can be fixed and the user can take
+	// advantage of fast listing.
 	// As one of the listing methods completes, it is expected to cancel the context for the other method.
-	// If both sequential and worksteal listing fail due to context canceled, then return error.
+	// If both sequential and worksteal listing fail due to context canceled, only then return an error.
 	if err != nil && (!errors.Is(err, context.Canceled) || countError > 1) {
 		return nil, fmt.Errorf("failed waiting for sequential and worksteal lister: %w", err)
 	}
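The error handling above is the heart of the race between the two listing strategies. As a standalone sketch of the same first-one-wins idiom: fastMethod and slowMethod below are hypothetical stand-ins for worksteal and sequential listing, and an atomic counter replaces the PR's plain countError (an assumption made here only to keep the sketch race-free):

package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"

	"golang.org/x/sync/errgroup"
)

// race runs two listing strategies in parallel. The first to complete cancels
// the shared context; a context.Canceled error from the losing method is not
// treated as a real failure.
func race(ctx context.Context, fastMethod, slowMethod func(context.Context) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	g, childCtx := errgroup.WithContext(ctx)
	var countError atomic.Int32

	run := func(method func(context.Context) error) func() error {
		return func() error {
			if err := method(childCtx); err != nil {
				countError.Add(1)
				return err
			}
			cancel() // the first finisher stops the other method
			return nil
		}
	}
	g.Go(run(fastMethod))
	g.Go(run(slowMethod))

	err := g.Wait()
	// Surface the error only if it is a real failure, or if both methods
	// failed (so even the fallback produced nothing).
	if err != nil && (!errors.Is(err, context.Canceled) || countError.Load() > 1) {
		return fmt.Errorf("failed waiting for both listing methods: %w", err)
	}
	return nil
}

func main() {
	fast := func(ctx context.Context) error { return nil } // completes immediately
	slow := func(ctx context.Context) error { <-ctx.Done(); return ctx.Err() }
	fmt.Println(race(context.Background(), fast, slow)) // prints <nil>
}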
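Taken together, the diff implies the following call pattern for the new interface. This is a minimal sketch based only on what the PR shows (NewLister, NextBatch, Close, ListerInput, and the iterator.Done convention); the bucket name is a placeholder:

package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/dataflux"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatalf("storage.NewClient: %v", err)
	}
	defer client.Close()

	in := &dataflux.ListerInput{
		BucketName: "my-bucket", // placeholder bucket name
		BatchSize:  2000,        // 0 would return the full listing in one batch
	}

	df := dataflux.NewLister(client, in)
	defer df.Close()

	total := 0
	for {
		objects, err := df.NextBatch(ctx)
		if err != nil && err != iterator.Done {
			log.Fatalf("df.NextBatch: %v", err)
		}
		total += len(objects)
		if err == iterator.Done {
			break // the last batch has been returned
		}
	}
	fmt.Printf("listed %d objects\n", total)
}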
50 changes: 43 additions & 7 deletions storage/dataflux/integration_test.go
@@ -45,7 +45,6 @@ var (
 	// These buckets are shared amongst download tests. They are created,
 	// populated with objects and cleaned up in TestMain.
 	httpTestBucket = downloadTestBucket{}
-	grpcTestBucket = downloadTestBucket{}
 )
 
 func TestMain(m *testing.M) {
@@ -67,7 +66,7 @@ func TestMain(m *testing.M) {
 }
 
 // Lists all the objects in the bucket.
-func TestIntegration_NextBatch(t *testing.T) {
+func TestIntegration_NextBatch_All(t *testing.T) {
 	if testing.Short() {
 		t.Skip("Integration tests skipped in short mode")
 	}
@@ -85,14 +84,55 @@ func TestIntegration_NextBatch(t *testing.T) {
 
 	objects, err := df.NextBatch(ctx)
 	if err != nil && err != iterator.Done {
-		t.Errorf("df.NextBatch(: %v", err)
+		t.Errorf("df.NextBatch: %v", err)
 	}
 
 	if len(objects) != len(httpTestBucket.objects) {
 		t.Errorf("expected to receive %d results, got %d results", len(httpTestBucket.objects), len(objects))
 	}
 }
 
+func TestIntegration_NextBatch(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Integration tests skipped in short mode")
+	}
+	const landsatBucket = "gcp-public-data-landsat"
+	const landsatPrefix = "LC08/01/001/00"
+	wantObjects := 17225
+	ctx := context.Background()
+	c, err := storage.NewClient(ctx)
+	if err != nil {
+		t.Fatalf("NewClient: %v", err)
+	}
+
+	in := &ListerInput{
+		BucketName: landsatBucket,
+		Query:      storage.Query{Prefix: landsatPrefix},
+		BatchSize:  2000,
+	}
+
+	df := NewLister(c, in)
+	defer df.Close()
+	totalObjects := 0
+	for {
+		objects, err := df.NextBatch(ctx)
+		if err != nil && err != iterator.Done {
+			t.Errorf("df.NextBatch: %v", err)
+		}
+		totalObjects += len(objects)
+		if err == iterator.Done {
+			break
+		}
+		if len(objects) > in.BatchSize {
+			t.Errorf("expected to receive at most %d objects in each batch, got %d objects in a batch", in.BatchSize, len(objects))
+		}
+	}
+	if totalObjects != wantObjects {
+		t.Errorf("expected to receive %d objects in results, got %d objects in results", wantObjects, totalObjects)
+	}
+}
+
 // generateFileInGCS uploads a file with random contents to GCS and returns
 // the crc32c hash of the contents.
 func generateFileInGCS(ctx context.Context, o *storage.ObjectHandle, size int64) (uint32, error) {
@@ -257,7 +297,3 @@ func (tb *downloadTestBucket) Cleanup() error {
 
 	return b.Delete(ctx)
 }
-
-func crc32c(b []byte) uint32 {
-	return crc32.Checksum(b, crc32.MakeTable(crc32.Castagnoli))
-}
1 change: 0 additions & 1 deletion storage/dataflux/range_splitter.go
@@ -33,7 +33,6 @@ type listRange struct {
 
 // newRangeSplitter creates a new rangeSplitter with the given alphabet.
 func newRangeSplitter(alphabet string) *rangeSplitter {
-
 	return &rangeSplitter{}
 }
 
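newRangeSplitter is still a stub at this point in the PR, so the following is purely illustrative: one plausible shape of splitting a lexicographic name range over an alphabet, which is the job a range splitter does for worksteal listing. The splitPoint helper and its behavior are assumptions, not code from this PR:

package main

import (
	"fmt"
	"strings"
)

// splitPoint returns a name prefix strictly between start and end when one
// exists at the first differing character, or "" when the gap is too narrow.
// Assumes start < end and both draw their characters from alphabet.
func splitPoint(start, end, alphabet string) string {
	for i := 0; i < len(start) && i < len(end); i++ {
		if start[i] == end[i] {
			continue
		}
		lo := strings.IndexByte(alphabet, start[i])
		hi := strings.IndexByte(alphabet, end[i])
		if hi-lo > 1 {
			// Pick the character halfway between the two bounds.
			return start[:i] + string(alphabet[(lo+hi)/2])
		}
		return "" // adjacent characters: no one-character midpoint here
	}
	return "" // one bound is a prefix of the other: a longer midpoint is needed
}

func main() {
	fmt.Println(splitPoint("ab", "ax", "abcdefghijklmnopqrstuvwxyz")) // am
}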
53 changes: 28 additions & 25 deletions storage/dataflux/sequential.go
@@ -17,7 +17,6 @@ package dataflux
 import (
 	"context"
 	"fmt"
-	"math"
 	"strings"
 
 	"cloud.google.com/go/storage"
@@ -26,47 +25,51 @@ import (
 
 const (
 	// defaultPageSize specifies the number of object results to include on a single page.
-	defaultPageSize = 5000
+	defaultPageSize = 2000
 )
 
 // sequentialListing performs a sequential listing on the given bucket.
 // It returns a list of objects and the next token to use to continue listing.
 // If the next token is empty, then listing is complete.
-func sequentialListing(ctx context.Context, opts Lister) ([]*storage.ObjectAttrs, string, error) {
+func (c *Lister) sequentialListing(ctx context.Context) ([]*storage.ObjectAttrs, string, error) {
 	var result []*storage.ObjectAttrs
+	var objectsListed int
+	var lastToken string
+	objectIterator := c.bucket.Objects(ctx, &c.query)
+	objectIterator.PageInfo().Token = c.pageToken
+	objectIterator.PageInfo().MaxSize = defaultPageSize
 
-	objectIterator := opts.bucket.Objects(ctx, &opts.query)
-
-	var numObject int
-	if opts.batchSize < defaultPageSize {
-		numObject = defaultPageSize
-	} else {
-		numObject = int(math.Floor(float64(opts.batchSize)/float64(defaultPageSize))) * defaultPageSize
-	}
-
-	pageInfo := objectIterator.PageInfo()
-	pageInfo.MaxSize = defaultPageSize
-	pageInfo.Token = opts.pageToken
-
-	i := 0
 	for {
-		// If page size is set, then stop listing after numPageRequest.
-		if opts.batchSize > 0 && i >= numObject {
+		nextToken, err := doListing(objectIterator, &result, c.skipDirectoryObjects, &objectsListed)
+		if err != nil {
+			return nil, "", fmt.Errorf("failed while listing objects: %w", err)
+		}
+		lastToken = nextToken
+		if nextToken == "" || (c.batchSize > 0 && objectsListed >= c.batchSize) {
 			break
 		}
-		i++
+		c.pageToken = nextToken
+	}
+	return result, lastToken, nil
+}
+
+func doListing(objectIterator *storage.ObjectIterator, result *[]*storage.ObjectAttrs, skipDirectoryObjects bool, objectsListed *int) (string, error) {
+	for {
 		attrs, err := objectIterator.Next()
-		// When last item for the assigned range is listed, then stop listing.
+		*objectsListed++
+		// Stop listing when all the requested objects have been listed.
 		if err == iterator.Done {
			break
 		}
 		if err != nil {
-			return nil, "", fmt.Errorf("iterating through objects %w", err)
+			return "", fmt.Errorf("iterating through objects: %w", err)
 		}
-		if !(opts.skipDirectoryObjects && strings.HasSuffix(attrs.Name, "/")) {
-			result = append(result, attrs)
+		if !(skipDirectoryObjects && strings.HasSuffix(attrs.Name, "/")) {
+			*result = append(*result, attrs)
 		}
+		if objectIterator.PageInfo().Remaining() == 0 {
+			break
+		}
 	}
-	return result, objectIterator.PageInfo().Token, nil
+	return objectIterator.PageInfo().Token, nil
 }
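The rewritten loop splits the work between sequentialListing, which decides when a batch is complete, and doListing, which consumes exactly one page. Both lean on the storage iterator's PageInfo contract; here is that contract in isolation. The listOnePage helper, the prefix, and the caller-managed token are placeholders, but the PageInfo mechanics mirror doListing above:

package main

import (
	"context"
	"fmt"

	"cloud.google.com/go/storage"
	"google.golang.org/api/iterator"
)

// listOnePage consumes one page of results and returns the token that resumes
// listing from where this page stopped ("" once listing is complete).
func listOnePage(ctx context.Context, bucket *storage.BucketHandle, savedToken string) (string, error) {
	it := bucket.Objects(ctx, &storage.Query{Prefix: "some/prefix"})
	it.PageInfo().Token = savedToken // resume from the previous checkpoint
	it.PageInfo().MaxSize = 2000     // cap the size of the underlying page

	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			return "", nil // listing complete; nothing left to resume
		}
		if err != nil {
			return "", fmt.Errorf("iterating through objects: %w", err)
		}
		fmt.Println(attrs.Name)
		if it.PageInfo().Remaining() == 0 {
			break // page exhausted: stop at the boundary
		}
	}
	return it.PageInfo().Token, nil
}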
2 changes: 1 addition & 1 deletion storage/dataflux/worksteal.go
@@ -49,7 +49,7 @@ type worker struct {

 // workstealListing is the main entry point of the worksteal algorithm.
 // It performs worksteal to achieve highly dynamic object listing.
-func workstealListing(ctx context.Context, opts Lister) []*storage.ObjectAttrs {
+func (c *Lister) workstealListing(ctx context.Context) []*storage.ObjectAttrs {
 	return nil
 }
Expand Down