Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

azfile: File upload methods #20631

Merged
merged 52 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from 49 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
964e4d2
Adding share client
souravgupta-msft Feb 27, 2023
69446ae
More share client methods
souravgupta-msft Feb 28, 2023
bf3d82f
Adding tests for share client
souravgupta-msft Mar 3, 2023
0f48bdc
More tests
souravgupta-msft Mar 6, 2023
1456741
lint
souravgupta-msft Mar 6, 2023
dc37bbd
More tests
souravgupta-msft Mar 7, 2023
99256a6
Few changes
souravgupta-msft Mar 9, 2023
ecbd69b
directory client ctors
souravgupta-msft Mar 9, 2023
25aa885
Removing check for shared key
souravgupta-msft Mar 10, 2023
b3785f0
Merge branch 'sourav/shareClient' of https://github.com/Azure/azure-s…
souravgupta-msft Mar 10, 2023
4759643
Adding directory client apis
souravgupta-msft Mar 10, 2023
a72172b
Adding list files and directories api
souravgupta-msft Mar 14, 2023
fe3bfd7
Adding generate SAS method in directory client
souravgupta-msft Mar 14, 2023
a37d5d3
List and force close handles api
souravgupta-msft Mar 14, 2023
4e508c9
Adding tests for directory client
souravgupta-msft Mar 15, 2023
207f205
More tests
souravgupta-msft Mar 15, 2023
72fb334
Format check
souravgupta-msft Mar 16, 2023
03de239
Merge branch 'feature/azfile' of https://github.com/Azure/azure-sdk-f…
souravgupta-msft Mar 17, 2023
5dadfaf
FileSignatureValues to SignatureValues
souravgupta-msft Mar 17, 2023
c4f769a
Merge branch 'sourav/shareClient' of https://github.com/Azure/azure-s…
souravgupta-msft Mar 17, 2023
560e382
updating GetSASURL()
souravgupta-msft Mar 17, 2023
6df4890
Merge from feature branch
souravgupta-msft Mar 20, 2023
7ae34f5
file client ctors
souravgupta-msft Mar 20, 2023
a202b9a
Adding file client apis
souravgupta-msft Mar 20, 2023
97a0fe4
List and force close handles
souravgupta-msft Mar 21, 2023
d515768
File lease client
souravgupta-msft Mar 21, 2023
26df11e
Share lease client
souravgupta-msft Mar 21, 2023
49f3814
Adding tests
souravgupta-msft Mar 24, 2023
af21402
More tests
souravgupta-msft Mar 27, 2023
e0181a2
More tests
souravgupta-msft Mar 27, 2023
d620cd9
Few changes
souravgupta-msft Mar 28, 2023
cb306cd
Pull from feature branch
souravgupta-msft Mar 29, 2023
1e842b6
Lease client tests
souravgupta-msft Mar 29, 2023
32f9add
More lease client tests
souravgupta-msft Mar 29, 2023
d21f82f
Adding recordings
souravgupta-msft Mar 30, 2023
3d10edb
More recordings
souravgupta-msft Mar 30, 2023
d4da5b0
Update recordings
souravgupta-msft Mar 30, 2023
6913872
Adding recordings for directory client
souravgupta-msft Mar 30, 2023
530b8c7
Updating assets.json
souravgupta-msft Mar 30, 2023
c9a2aec
Adding recordings for share client
souravgupta-msft Mar 31, 2023
3207b8f
Adding recordings for service client
souravgupta-msft Mar 31, 2023
eece2da
Modifying options for CopyFileSMBInfo
souravgupta-msft Apr 4, 2023
293d84b
Adding recordings
souravgupta-msft Apr 4, 2023
65fd26e
Adding upload methods
souravgupta-msft Apr 5, 2023
209791c
Tests
souravgupta-msft Apr 11, 2023
279d149
UploadBuffer and UploadFile methods
souravgupta-msft Apr 11, 2023
7b36269
sync with feature branch
souravgupta-msft Apr 11, 2023
af73603
Adding UploadStream
souravgupta-msft Apr 13, 2023
f2963ea
source crc64 type update
souravgupta-msft Apr 13, 2023
fef6da8
PR comments
souravgupta-msft Apr 17, 2023
2947c04
Updating recordings
souravgupta-msft Apr 17, 2023
20b6b42
Review comment
souravgupta-msft Apr 25, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 189 additions & 0 deletions sdk/storage/azfile/file/chunkwriting.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
//go:build go1.18
// +build go1.18

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

package file

import (
"bytes"
"context"
"errors"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"io"
"sync"
)

// chunkWriter provides methods to upload chunks that represent a file to a server.
// This allows us to provide a local implementation that fakes the server for hermetic testing.
type chunkWriter interface {
UploadRange(context.Context, int64, io.ReadSeekCloser, *UploadRangeOptions) (UploadRangeResponse, error)
}

// bufferManager provides an abstraction for the management of buffers.
// this is mostly for testing purposes, but does allow for different implementations without changing the algorithm.
type bufferManager[T ~[]byte] interface {
// Acquire returns the channel that contains the pool of buffers.
Acquire() <-chan T

// Release releases the buffer back to the pool for reuse/cleanup.
Release(T)

// Grow grows the number of buffers, up to the predefined max.
// It returns the total number of buffers or an error.
// No error is returned if the number of buffers has reached max.
// This is called only from the reading goroutine.
Grow() (int, error)

// Free cleans up all buffers.
Free()
}

// copyFromReader copies a source io.Reader to blob storage using concurrent uploads.
souravgupta-msft marked this conversation as resolved.
Show resolved Hide resolved
func copyFromReader[T ~[]byte](ctx context.Context, src io.Reader, dst chunkWriter, options UploadStreamOptions, getBufferManager func(maxBuffers int, bufferSize int64) bufferManager[T]) error {
options.setDefaults()

wg := sync.WaitGroup{} // Used to know when all outgoing chunks have finished processing
errCh := make(chan error, 1) // contains the first error encountered during processing
var err error

buffers := getBufferManager(options.Concurrency, options.ChunkSize)
defer buffers.Free()

// this controls the lifetime of the uploading goroutines.
// if an error is encountered, cancel() is called which will terminate all uploads.
// NOTE: the ordering is important here. cancel MUST execute before
// cleaning up the buffers so that any uploading goroutines exit first,
// releasing their buffers back to the pool for cleanup.
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// This goroutine grabs a buffer, reads from the stream into the buffer,
// then creates a goroutine to upload/stage the chunk.
for chunkNum := uint32(0); true; chunkNum++ {
var buffer T
select {
case buffer = <-buffers.Acquire():
// got a buffer
default:
// no buffer available; allocate a new buffer if possible
if _, err := buffers.Grow(); err != nil {
return err
}

// either grab the newly allocated buffer or wait for one to become available
buffer = <-buffers.Acquire()
}

var n int
n, err = io.ReadFull(src, buffer)

if n > 0 {
// some data was read, upload it
wg.Add(1) // We're posting a buffer to be sent

// NOTE: we must pass chunkNum as an arg to our goroutine else
// it's captured by reference and can change underneath us!
go func(chunkNum uint32) {
// Upload the outgoing chunk, matching the number of bytes read
offset := int64(chunkNum) * options.ChunkSize
uploadRangeOptions := options.getUploadRangeOptions()
_, err := dst.UploadRange(ctx, offset, streaming.NopCloser(bytes.NewReader(buffer[:n])), uploadRangeOptions)
if err != nil {
select {
case errCh <- err:
// error was set
default:
// some other error is already set
}
cancel()
}
buffers.Release(buffer) // The goroutine reading from the stream can reuse this buffer now

// signal that the chunk has been staged.
// we MUST do this after attempting to write to errCh
// to avoid it racing with the reading goroutine.
wg.Done()
}(chunkNum)
} else {
// nothing was read so the buffer is empty, send it back for reuse/clean-up.
buffers.Release(buffer)
}

if err != nil { // The reader is done, no more outgoing buffers
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
// these are expected errors, we don't surface those
err = nil
} else {
// some other error happened, terminate any outstanding uploads
cancel()
}
break
}
}

wg.Wait() // Wait for all outgoing chunks to complete

if err != nil {
// there was an error reading from src, favor this error over any error during staging
return err
}

select {
case err = <-errCh:
// there was an error during staging
return err
default:
// no error was encountered
}

// All chunks uploaded, return nil error
return nil
}

// mmbPool implements the bufferManager interface.
// it uses anonymous memory mapped files for buffers.
// don't use this type directly, use newMMBPool() instead.
type mmbPool struct {
buffers chan mmb
count int
max int
size int64
}

func newMMBPool(maxBuffers int, bufferSize int64) bufferManager[mmb] {
return &mmbPool{
buffers: make(chan mmb, maxBuffers),
max: maxBuffers,
size: bufferSize,
}
}

func (pool *mmbPool) Acquire() <-chan mmb {
return pool.buffers
}

func (pool *mmbPool) Grow() (int, error) {
if pool.count < pool.max {
buffer, err := newMMB(pool.size)
if err != nil {
return 0, err
}
pool.buffers <- buffer
pool.count++
}
return pool.count, nil
}

func (pool *mmbPool) Release(buffer mmb) {
pool.buffers <- buffer
}

func (pool *mmbPool) Free() {
for i := 0; i < pool.count; i++ {
buffer := <-pool.buffers
buffer.delete()
}
pool.count = 0
}
Loading