From 26961eaf2ab4cef450c33b410b143a0e52bf2bb0 Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Fri, 28 Jan 2022 12:49:37 -0700 Subject: [PATCH 1/2] Fix deadlock when erroring writes are slow --- sdk/storage/azblob/blockblob/chunkwriting.go | 44 +++++++++++++++-- .../azblob/blockblob/chunkwriting_test.go | 47 ++++++++++++++++++- 2 files changed, 86 insertions(+), 5 deletions(-) diff --git a/sdk/storage/azblob/blockblob/chunkwriting.go b/sdk/storage/azblob/blockblob/chunkwriting.go index 0ed98c403281..0bcd4b3686f3 100644 --- a/sdk/storage/azblob/blockblob/chunkwriting.go +++ b/sdk/storage/azblob/blockblob/chunkwriting.go @@ -13,12 +13,12 @@ import ( "encoding/binary" "errors" "fmt" - "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/internal/shared" "io" "sync" "sync/atomic" "github.com/Azure/azure-sdk-for-go/sdk/internal/uuid" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/internal/shared" ) // blockWriter provides methods to upload blocks that represent a file to a server and commit them. @@ -188,9 +188,7 @@ func (c *copier) write(chunk copierChunk) { // close commits our blocks to blob storage and closes our writer. func (c *copier) close() error { - c.wg.Wait() - - if err := c.getErr(); err != nil { + if err := c.waitForFinish(); err != nil { return err } @@ -200,6 +198,44 @@ func (c *copier) close() error { return err } +// waitForFinish waits for all writes to complete while combining errors from errCh +func (c *copier) waitForFinish() error { + var err error + done := make(chan struct{}) + go func() { + // when write latencies are long, several errors might have occurred + // drain them all as we wait for writes to complete. + err = c.drainErrs(done) + }() + + c.wg.Wait() + close(done) + return err +} + +// drainErrs drains all outstanding errors from writes +func (c *copier) drainErrs(done chan struct{}) error { + var err error + for { + select { + case <-done: + return err + default: + if writeErr := c.getErr(); writeErr != nil { + err = combineErrs(err, writeErr) + } + } + } +} + +// combineErrs combines err with newErr so multiple errors can be represented +func combineErrs(err, newErr error) error { + if err == nil { + return newErr + } + return fmt.Errorf("%s, %w", err.Error(), newErr) +} + // id allows the creation of unique IDs based on UUID4 + an int32. This auto-increments. type id struct { u [64]byte diff --git a/sdk/storage/azblob/blockblob/chunkwriting_test.go b/sdk/storage/azblob/blockblob/chunkwriting_test.go index afbf7c0e63da..addc3fbd97be 100644 --- a/sdk/storage/azblob/blockblob/chunkwriting_test.go +++ b/sdk/storage/azblob/blockblob/chunkwriting_test.go @@ -30,6 +30,7 @@ type fakeBlockWriter struct { path string block int32 errOnBlock int32 + stageDelay time.Duration } func newFakeBlockWriter() *fakeBlockWriter { @@ -49,7 +50,12 @@ func newFakeBlockWriter() *fakeBlockWriter { func (f *fakeBlockWriter) StageBlock(_ context.Context, blockID string, body io.ReadSeekCloser, _ *StageBlockOptions) (StageBlockResponse, error) { n := atomic.AddInt32(&f.block, 1) - if n == f.errOnBlock { + + if f.stageDelay > 0 { + time.Sleep(f.stageDelay) + } + + if f.errOnBlock > -1 && n >= f.errOnBlock { return StageBlockResponse{}, io.ErrNoProgress } @@ -192,6 +198,45 @@ func TestGetErr(t *testing.T) { } } +func TestSlowDestCopyFrom(t *testing.T) { + p, err := createSrcFile(_1MiB + 500*1024) //This should cause 2 reads + if err != nil { + panic(err) + } + defer func(name string) { + _ = os.Remove(name) + }(p) + + from, err := os.Open(p) + if err != nil { + panic(err) + } + defer from.Close() + + br := newFakeBlockWriter() + defer br.cleanup() + + br.stageDelay = 200 * time.Millisecond + br.errOnBlock = 0 + + errs := make(chan error, 1) + go func() { + _, err := copyFromReader(context.Background(), from, br, UploadStreamOptions{}) + errs <- err + }() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + select { + case <-ctx.Done(): + failMsg := "TestSlowDestCopyFrom(slow writes shouldn't cause deadlock) failed: Context expired, copy deadlocked" + t.Error(failMsg) + case <-errs: + return + } +} + func TestCopyFromReader(t *testing.T) { t.Parallel() From a574b46ede63e501c6296c9b8118dc7ba0868103 Mon Sep 17 00:00:00 2001 From: Joel Hendrix Date: Wed, 21 Sep 2022 09:40:49 -0700 Subject: [PATCH 2/2] cancel copy on first error --- sdk/storage/azblob/blockblob/chunkwriting.go | 50 ++++---------------- 1 file changed, 9 insertions(+), 41 deletions(-) diff --git a/sdk/storage/azblob/blockblob/chunkwriting.go b/sdk/storage/azblob/blockblob/chunkwriting.go index 0bcd4b3686f3..16927ecf895e 100644 --- a/sdk/storage/azblob/blockblob/chunkwriting.go +++ b/sdk/storage/azblob/blockblob/chunkwriting.go @@ -181,14 +181,20 @@ func (c *copier) write(chunk copierChunk) { stageBlockOptions := c.o.getStageBlockOptions() _, err := c.to.StageBlock(c.ctx, chunk.id, shared.NopCloser(bytes.NewReader(chunk.buffer[:chunk.length])), stageBlockOptions) if err != nil { - c.errCh <- fmt.Errorf("write error: %w", err) - return + select { + case c.errCh <- err: + // failed to stage block, cancel the copy + default: + // don't block the goroutine if there's a pending error + } } } // close commits our blocks to blob storage and closes our writer. func (c *copier) close() error { - if err := c.waitForFinish(); err != nil { + c.wg.Wait() + + if err := c.getErr(); err != nil { return err } @@ -198,44 +204,6 @@ func (c *copier) close() error { return err } -// waitForFinish waits for all writes to complete while combining errors from errCh -func (c *copier) waitForFinish() error { - var err error - done := make(chan struct{}) - go func() { - // when write latencies are long, several errors might have occurred - // drain them all as we wait for writes to complete. - err = c.drainErrs(done) - }() - - c.wg.Wait() - close(done) - return err -} - -// drainErrs drains all outstanding errors from writes -func (c *copier) drainErrs(done chan struct{}) error { - var err error - for { - select { - case <-done: - return err - default: - if writeErr := c.getErr(); writeErr != nil { - err = combineErrs(err, writeErr) - } - } - } -} - -// combineErrs combines err with newErr so multiple errors can be represented -func combineErrs(err, newErr error) error { - if err == nil { - return newErr - } - return fmt.Errorf("%s, %w", err.Error(), newErr) -} - // id allows the creation of unique IDs based on UUID4 + an int32. This auto-increments. type id struct { u [64]byte