Skip to content

Commit

Permalink
fix(storage/dataflux): address deadlock when reading from ranges (#11303
Browse files Browse the repository at this point in the history
)

This change addresses an intermittent deadlock issue when reading from the range channel. The fix introduces a 200ms timeout on all reads off of range, opting to re-check for available work if the timeout is triggered. Since this timeout is only applied to reads off of the channel it should not introduce any new blocking behavior.
  • Loading branch information
jdnurme authored Dec 19, 2024
1 parent 5f985ab commit 32cbf56
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 11 deletions.
2 changes: 0 additions & 2 deletions storage/dataflux/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func TestMain(m *testing.M) {

// Lists the all the objects in the bucket.
func TestIntegration_NextBatch_All(t *testing.T) {
t.Skip("#11198")
if testing.Short() {
t.Skip("Integration tests skipped in short mode")
}
Expand All @@ -97,7 +96,6 @@ func TestIntegration_NextBatch_All(t *testing.T) {
}

func TestIntegration_NextBatch(t *testing.T) {
t.Skip("#11196")
// Accessing public bucket to list large number of files in batches.
// See https://cloud.google.com/storage/docs/public-datasets/landsat
if testing.Short() {
Expand Down
4 changes: 4 additions & 0 deletions storage/dataflux/range_splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ func (rs *rangeSplitter) convertStringRangeToMinimalIntRange(

// charPosition returns the index of the character in the alphabet set.
func (rs *rangeSplitter) charPosition(ch rune) (int, error) {
rs.mu.Lock() // Acquire the lock
defer rs.mu.Unlock() // Release the lock when the function exits
if idx, ok := rs.alphabetMap[ch]; ok {
return idx, nil
}
Expand All @@ -337,6 +339,8 @@ func (rs *rangeSplitter) charPosition(ch rune) (int, error) {
// convertRangeStringToArray transforms the range string into a rune slice while
// verifying the presence of each character in the alphabets.
func (rs *rangeSplitter) convertRangeStringToArray(rangeString string) ([]rune, error) {
rs.mu.Lock() // Acquire the lock
defer rs.mu.Unlock() // Release the lock when the function exits
for _, char := range rangeString {
if _, exists := rs.alphabetMap[char]; !exists {
return nil, fmt.Errorf("character %c in range string %q is not found in the alphabet array", char, rangeString)
Expand Down
13 changes: 5 additions & 8 deletions storage/dataflux/worksteal.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func (c *Lister) workstealListing(ctx context.Context) ([]*storage.ObjectAttrs,
if err != nil {
return nil, fmt.Errorf("creating new range splitter: %w", err)
}

g, ctx := errgroup.WithContext(ctx)
// Initialize all workers as idle.
for i := 0; i < c.parallelism; i++ {
Expand Down Expand Up @@ -126,13 +125,12 @@ func (w *worker) doWorkstealListing(ctx context.Context) error {
// If a worker is idle, sleep for a while before checking the next update.
// Worker status is changed to active when it finds work in range channel.
if w.status == idle {
if len(w.lister.ranges) == 0 {
time.Sleep(sleepDurationWhenIdle)
continue
} else {
newRange := <-w.lister.ranges
select {
case newRange := <-w.lister.ranges:
<-w.idleChannel
w.updateWorker(newRange.startRange, newRange.endRange, active)
case <-time.After(sleepDurationWhenIdle):
continue
}
}
// Active worker to list next page of objects within the range
Expand All @@ -153,7 +151,7 @@ func (w *worker) doWorkstealListing(ctx context.Context) error {

// If listing not complete and idle workers are available, split the range
// and give half of work to idle worker.
for len(w.idleChannel)-len(w.lister.ranges) > 0 && ctx.Err() == nil {
if len(w.idleChannel)-len(w.lister.ranges) > 0 && ctx.Err() == nil {
// Split range and upload half of work for idle worker.
splitPoint, err := w.rangesplitter.splitRange(w.startRange, w.endRange, 1)
if err != nil {
Expand Down Expand Up @@ -191,7 +189,6 @@ func (w *worker) shutDownSignal() bool {
w.result.mu.Unlock()

alreadyListedBatchSizeObjects := w.lister.batchSize > 0 && lenResult >= w.lister.batchSize

return noMoreObjects || alreadyListedBatchSizeObjects
}

Expand Down
1 change: 0 additions & 1 deletion storage/dataflux/worksteal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
)

func TestWorkstealListingEmulated(t *testing.T) {
t.Skip("https://github.com/googleapis/google-cloud-go/issues/11205")
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client *storage.Client) {

attrs := &storage.BucketAttrs{
Expand Down

0 comments on commit 32cbf56

Please sign in to comment.