feat(storage/dataflux): add dataflux interface #10748

Merged Sep 19, 2024. 46 commits:
- 0e7d8ad dataflux initial commit (akansha1812, Aug 22, 2024)
- b21866c interface for range splitter (akansha1812, Aug 22, 2024)
- a3e269e add a valid license header (akansha1812, Aug 22, 2024)
- 863fa26 add a valid license header to all dataflux files (akansha1812, Aug 22, 2024)
- 10dd4a8 Merge branch 'main' into main (akansha1812, Aug 22, 2024)
- 78090cd Merge branch 'googleapis:main' into main (akansha1812, Aug 26, 2024)
- 03aecaa resolved comments (akansha1812, Aug 26, 2024)
- d3056e0 update sequential listing comment (akansha1812, Aug 26, 2024)
- b48d583 add end-to-end sequential listing (akansha1812, Aug 26, 2024)
- e246c3f Merge branch 'main' into main (akansha1812, Aug 27, 2024)
- b1e7712 adding basic integration test (akansha1812, Aug 27, 2024)
- 39de969 Merge branch 'main' into main (akansha1812, Aug 27, 2024)
- db3ad80 Merge branch 'googleapis:main' into main (akansha1812, Aug 28, 2024)
- 98b632f Merge branch 'googleapis:main' into main (akansha1812, Sep 3, 2024)
- c7787fb Merge branch 'main' into main (akansha1812, Sep 4, 2024)
- 8313e50 Merge branch 'main' into main (akansha1812, Sep 4, 2024)
- f2b5dc3 Merge branch 'main' into main (akansha1812, Sep 11, 2024)
- c90c6fc chore: update gapic-generator-go to 0.47.0 (#10848) (gcf-owl-bot[bot], Sep 11, 2024)
- 39ee892 chore(main): release auth 0.9.4 (#10846) (release-please[bot], Sep 11, 2024)
- bb69df7 feat(firestore): Adding distance threshold and result field (#10802) (bhshkh, Sep 11, 2024)
- 0feb258 remove pagination within sequential listing (akansha1812, Sep 11, 2024)
- ab44946 Merge branch 'main' into main (akansha1812, Sep 11, 2024)
- 8b147b2 remove nextBatch_all example (akansha1812, Sep 11, 2024)
- 8f8b7ac remove close function from NewLister return value (akansha1812, Sep 11, 2024)
- 18cffa6 remove close function from NewLister return value (akansha1812, Sep 11, 2024)
- 67bb404 change listing as method of Lister (akansha1812, Sep 13, 2024)
- 4691742 Merge branch 'main' into main (akansha1812, Sep 13, 2024)
- 4dc54ab Merge branch 'googleapis:main' into main (akansha1812, Sep 13, 2024)
- 38abf10 Merge branch 'googleapis:main' into main (akansha1812, Sep 13, 2024)
- ae91a8a Merge branch 'googleapis:main' into main (akansha1812, Sep 16, 2024)
- 206f47e chore: update gapic-generator-go to 0.47.0 (#10848) (gcf-owl-bot[bot], Sep 11, 2024)
- 2d586ea chore: update gapic-generator-go to 0.47.0 (#10848) (gcf-owl-bot[bot], Sep 11, 2024)
- 71cb718 chore: update gapic-generator-go to 0.47.0 (#10848) (gcf-owl-bot[bot], Sep 11, 2024)
- 31aa908 add integration test for next batch (akansha1812, Sep 16, 2024)
- 39c8fd2 Reset extra file (akansha1812, Sep 17, 2024)
- 2c8faf4 Merge branch 'googleapis:main' into main (akansha1812, Sep 18, 2024)
- 64fbc61 sequential list to return object instead of pointer (akansha1812, Sep 18, 2024)
- d28cda2 fetch 5000 objects from gcs (akansha1812, Sep 18, 2024)
- 081b8a7 fetch 5000 objects from gcs (akansha1812, Sep 18, 2024)
- b84b324 make worker status unexported, round up batchsize comment (akansha1812, Sep 19, 2024)
- 7b3441e Merge branch 'main' into main (akansha1812, Sep 19, 2024)
- e1b7da8 Merge branch 'main' into main (akansha1812, Sep 19, 2024)
- ed1e505 Merge branch 'googleapis:main' into main (akansha1812, Sep 19, 2024)
- 367172c add comment to use no acl (akansha1812, Sep 19, 2024)
- af913aa update with go mod tidy (akansha1812, Sep 19, 2024)
- df9384a Merge branch 'main' into main (akansha1812, Sep 19, 2024)
65 changes: 65 additions & 0 deletions storage/dataflux/README.md
# Dataflux for Google Cloud Storage Go client library

## Overview
The purpose of this client is to quickly list data stored in GCS.

## Fast List
The fast list component of this client leverages the GCS API to parallelize the listing of files within a GCS bucket. It does this by implementing a work-stealing algorithm, where each worker in the list operation can steal work from its siblings once it has finished all of its currently assigned listing work. This parallelization leads to a significant real-world speed increase over sequential listing. Note that parallelization is limited by the resources of the machine on which the client runs.

Benchmarking has demonstrated that the larger the object count, the better Dataflux performs compared to a linear listing. From around 100,000 objects, users will see an improvement in listing speed.

### Example Usage

First create a `storage.Client` to use throughout your application:

[snip]:# (storage-1)
```go
ctx := context.Background()
client, err := storage.NewClient(ctx)
if err != nil {
log.Fatal(err)
}
```

[snip]:# (storage-2)
```go

// storage.Query to filter objects that the user wants to list.
query := storage.Query{}
// Input for fast-listing.
dfopts := dataflux.ListerInput{
BucketName: "bucket",
Parallelism: 500,
BatchSize: 500000,
Query: query,
}

// Construct a dataflux lister.
df := dataflux.NewLister(client, &dfopts)
defer df.Close()

// List objects in the GCS bucket.
for {
objects, err := df.NextBatch(ctx)
if err != nil && err != iterator.Done {
log.Fatal(err)
}
// TODO: process objects
if err == iterator.Done {
// No more objects in the bucket to list.
break
}
}
```
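The empty `storage.Query` above lists every object in the bucket. Any standard `storage.Query` can be supplied instead. As a configuration sketch (the bucket name and prefix below are placeholders), restricting fast listing to a prefix looks like:

```go
// Restrict fast listing to objects under a given prefix, and skip
// zero-byte directory placeholder objects.
dfopts := dataflux.ListerInput{
	BucketName:           "bucket",
	Parallelism:          500,
	BatchSize:            500000,
	Query:                storage.Query{Prefix: "logs/2024/"},
	SkipDirectoryObjects: true,
}
```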

### Fast List Benchmark Results
- VM used: n2d-standard-48
- Region: us-central1-a
- NIC type: gVNIC
|File Count|VM Core Count|List Time Without Dataflux |List Time With Dataflux|
|------------|-------------|--------------------------|-----------------------|
|5000000 Obj |48 Core |319.72s |17.35s |
|1999032 Obj |48 Core |139.54s |8.98s |
|578703 Obj |48 Core |32.90s |5.71s |
|10448 Obj |48 Core |750.50ms |637.17ms |
27 changes: 27 additions & 0 deletions storage/dataflux/doc.go
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*
Package dataflux provides an easy way to parallelize listing in Google
Cloud Storage.

More information about Google Cloud Storage is available at
https://cloud.google.com/storage/docs.

See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts,
connection pooling and similar aspects of this package.

NOTE: This package is in preview. It is not stable, and is likely to change.
*/
package dataflux // import "cloud.google.com/go/storage/dataflux"
69 changes: 69 additions & 0 deletions storage/dataflux/example_test.go
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dataflux_test

import (
"context"
"log"

"cloud.google.com/go/storage"
"cloud.google.com/go/storage/dataflux"
"google.golang.org/api/iterator"
)

func ExampleNextBatch_batch() {
ctx := context.Background()
// Pass in any client opts or set retry policy here.
client, err := storage.NewClient(ctx)
if err != nil {
// handle error
}

// Create dataflux fast-list input and provide desired options,
// including number of workers, batch size, query to filter objects, etc.
in := &dataflux.ListerInput{
BucketName: "mybucket",
// Optionally specify params to apply to lister.
Parallelism: 100,
BatchSize: 500000,
Query: storage.Query{},
SkipDirectoryObjects: false,
}

// Create a Lister with the specified input options.
df := dataflux.NewLister(client, in)
defer df.Close()

var numOfObjects int

for {
objects, err := df.NextBatch(ctx)
if err == iterator.Done {
numOfObjects += len(objects)
// No more objects in the bucket to list.
break
}
if err != nil {
// handle error
}
numOfObjects += len(objects)
}
log.Printf("listing %d objects in bucket %q is complete.", numOfObjects, in.BucketName)
}
153 changes: 153 additions & 0 deletions storage/dataflux/fast_list.go
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dataflux

import (
"context"
"errors"
"fmt"

"cloud.google.com/go/storage"
"golang.org/x/sync/errgroup"
"google.golang.org/api/iterator"
)

// listingMethod represents the method of listing.
type listingMethod int

const (
// open when any method can be used to list.
open listingMethod = iota
// sequential when the listing is done sequentially.
sequential
// worksteal when the listing is done using work stealing algorithm.
worksteal
)

// ListerInput contains options for listing objects.
type ListerInput struct {
// BucketName is the name of the bucket to list objects from. Required.
BucketName string

// Parallelism is the number of parallel workers to use for listing. Default value is 10x the number of available CPUs. Optional.
Parallelism int

// BatchSize is the number of objects to list. Default value returns all objects at once. Optional.
BatchSize int

// Query is the query to filter objects for listing. Default value is nil. Optional.
Query storage.Query

// SkipDirectoryObjects indicates whether to skip directory objects when listing. Default value is false. Optional.
SkipDirectoryObjects bool
}

// Lister is used for interacting with Dataflux fast-listing.
// The caller should initialize it with NewLister() instead of creating it directly.
type Lister struct {
// method indicates the listing method (open, sequential, worksteal) to be used for listing.
method listingMethod

// pageToken is the token to use for sequential listing.
pageToken string

// bucket is the bucket handle to list objects from.
bucket *storage.BucketHandle

// batchSize is the number of objects to list.
batchSize int

// query is the query to filter objects for listing.
query storage.Query

// skipDirectoryObjects indicates whether to skip directory objects when listing.
skipDirectoryObjects bool
}

// NewLister creates a new dataflux Lister to list objects in the given bucket.
func NewLister(c *storage.Client, in *ListerInput) *Lister {
bucket := c.Bucket(in.BucketName)
lister := &Lister{
method: open,
pageToken: "",
bucket: bucket,
batchSize: in.BatchSize,
query: in.Query,
skipDirectoryObjects: in.SkipDirectoryObjects,
}
return lister
}

// NextBatch runs the worksteal algorithm and sequential listing in parallel to quickly
// return a list of objects in the bucket. For smaller datasets,
// sequential listing is expected to be faster. For larger datasets,
// worksteal listing is expected to be faster.
func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error) {
// countError tracks the number of failed listing methods.
countError := 0
var results []*storage.ObjectAttrs
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Errgroup takes care of running both methods in parallel. As soon as one of the methods
// completes, the other running method is stopped.
g, childCtx := errgroup.WithContext(ctx)

// To start, the method is open and both worksteal and sequential listing run in parallel.
// The method that completes first is used for all subsequent runs.
// TODO: Run worksteal listing when method is open or worksteal.
// Run sequential listing when method is open or sequential.
if c.method != worksteal {

g.Go(func() error {
objects, nextToken, err := c.sequentialListing(childCtx)
if err != nil {
countError++
return fmt.Errorf("error in running sequential listing: %w", err)
}
// If sequential listing completes first, set method to sequential listing and ranges to nil.
// The nextToken will be used to continue sequential listing.
results = objects
c.pageToken = nextToken
c.method = sequential
// Cancel the context once sequential listing is complete.
cancel()
return nil
})
}

// Wait until either sequential listing or worksteal listing is complete.
err := g.Wait()

// If the error is not context.Canceled, then return error instead of falling back
// to the other method. This is so that the error can be fixed and user can take
// advantage of fast-listing.
// As one of the listing method completes, it is expected to cancel context for the other method.
// If both sequential and worksteal listing fail due to context canceled, only then return error.
if err != nil && (!errors.Is(err, context.Canceled) || countError > 1) {
return nil, fmt.Errorf("failed waiting for sequential and worksteal lister: %w", err)
}

// If ranges for worksteal and pageToken for sequential listing is empty, then listing is complete.
if c.pageToken == "" {
return results, iterator.Done
}
return results, nil
}

// Close closes the range channel of the Lister.
func (c *Lister) Close() {

// TODO: Close range channel for worksteal lister.
}