Skip to content

Commit

Permalink
Fix deadlock when erroring writes are slow (#16937)
Browse files Browse the repository at this point in the history
* Fix deadlock when erroring writes are slow

* cancel copy on first error

Co-authored-by: Joel Hendrix <[email protected]>
  • Loading branch information
MasslessParticle and jhendrixMSFT authored Sep 21, 2022
1 parent e248e40 commit 7e8768a
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 4 deletions.
10 changes: 7 additions & 3 deletions sdk/storage/azblob/blockblob/chunkwriting.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/internal/shared"
"io"
"sync"
"sync/atomic"

"github.com/Azure/azure-sdk-for-go/sdk/internal/uuid"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/internal/shared"
)

// blockWriter provides methods to upload blocks that represent a file to a server and commit them.
Expand Down Expand Up @@ -181,8 +181,12 @@ func (c *copier) write(chunk copierChunk) {
stageBlockOptions := c.o.getStageBlockOptions()
_, err := c.to.StageBlock(c.ctx, chunk.id, shared.NopCloser(bytes.NewReader(chunk.buffer[:chunk.length])), stageBlockOptions)
if err != nil {
c.errCh <- fmt.Errorf("write error: %w", err)
return
select {
case c.errCh <- err:
// failed to stage block, cancel the copy
default:
// don't block the goroutine if there's a pending error
}
}
}

Expand Down
47 changes: 46 additions & 1 deletion sdk/storage/azblob/blockblob/chunkwriting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type fakeBlockWriter struct {
path string
block int32
errOnBlock int32
stageDelay time.Duration
}

func newFakeBlockWriter() *fakeBlockWriter {
Expand All @@ -49,7 +50,12 @@ func newFakeBlockWriter() *fakeBlockWriter {

func (f *fakeBlockWriter) StageBlock(_ context.Context, blockID string, body io.ReadSeekCloser, _ *StageBlockOptions) (StageBlockResponse, error) {
n := atomic.AddInt32(&f.block, 1)
if n == f.errOnBlock {

if f.stageDelay > 0 {
time.Sleep(f.stageDelay)
}

if f.errOnBlock > -1 && n >= f.errOnBlock {
return StageBlockResponse{}, io.ErrNoProgress
}

Expand Down Expand Up @@ -192,6 +198,45 @@ func TestGetErr(t *testing.T) {
}
}

// TestSlowDestCopyFrom checks that copyFromReader returns, rather than
// deadlocking, when the destination is slow and its StageBlock calls fail.
// The test only asserts that the copy finishes within one second; the value
// returned by copyFromReader is not inspected.
func TestSlowDestCopyFrom(t *testing.T) {
	// Sized a little over 1 MiB; this is intended to require two reads.
	p, err := createSrcFile(_1MiB + 500*1024)
	if err != nil {
		// Fail only this test instead of panicking the whole test binary.
		t.Fatalf("creating source file: %v", err)
	}
	defer func() {
		_ = os.Remove(p) // best-effort cleanup of the temporary source file
	}()

	from, err := os.Open(p)
	if err != nil {
		t.Fatalf("opening source file %q: %v", p, err)
	}
	defer from.Close()

	br := newFakeBlockWriter()
	defer br.cleanup()

	// Each StageBlock call sleeps for stageDelay before answering.
	br.stageDelay = 200 * time.Millisecond
	// The fake's block counter starts at 1, so a threshold of 0 makes every
	// StageBlock call return an error.
	br.errOnBlock = 0

	// Buffered so the copying goroutine can always deliver its result and
	// exit, even if the test has already given up waiting.
	done := make(chan error, 1)
	go func() {
		_, copyErr := copyFromReader(context.Background(), from, br, UploadStreamOptions{})
		done <- copyErr
	}()

	select {
	case <-done:
		// The copy returned: no deadlock.
	case <-time.After(time.Second):
		failMsg := "TestSlowDestCopyFrom(slow writes shouldn't cause deadlock) failed: Context expired, copy deadlocked"
		t.Error(failMsg)
	}
}

func TestCopyFromReader(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 7e8768a

Please sign in to comment.