diff --git a/sdk/storage/azblob/blob/client.go b/sdk/storage/azblob/blob/client.go
index c93db3041702..55de9b34958a 100644
--- a/sdk/storage/azblob/blob/client.go
+++ b/sdk/storage/azblob/blob/client.go
@@ -8,16 +8,16 @@ package blob
 
 import (
 	"context"
-	"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
-	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
 	"io"
 	"os"
 	"sync"
 	"time"
 
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
+	"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
+	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/internal/base"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/internal/exported"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/internal/generated"
@@ -324,8 +324,8 @@ func (b *Client) GetSASURL(permissions sas.BlobPermissions, expiry time.Time, o
 
 // Concurrent Download Functions -----------------------------------------------------------------------------------------
 
-// download downloads an Azure blob to a WriterAt in parallel.
-func (b *Client) download(ctx context.Context, writer io.WriterAt, o downloadOptions) (int64, error) {
+// downloadBuffer downloads an Azure blob to a WriterAt in parallel.
+func (b *Client) downloadBuffer(ctx context.Context, writer io.WriterAt, o downloadOptions) (int64, error) {
 	if o.BlockSize == 0 {
 		o.BlockSize = DefaultDownloadBlockSize
 	}
@@ -353,6 +353,7 @@ func (b *Client) download(ctx context.Context, writer io.WriterAt, o downloadOpt
 		OperationName: "downloadBlobToWriterAt",
 		TransferSize:  count,
 		ChunkSize:     o.BlockSize,
+		NumChunks:     uint16(((count - 1) / o.BlockSize) + 1),
 		Concurrency:   o.Concurrency,
 		Operation: func(ctx context.Context, chunkStart int64, count int64) error {
 			downloadBlobOptions := o.getDownloadBlobOptions(HTTPRange{
@@ -391,6 +392,171 @@ func (b *Client) download(ctx context.Context, writer io.WriterAt, o downloadOpt
 	return count, nil
 }
 
+// downloadFile downloads an Azure blob to a Writer. The blocks are downloaded in parallel,
+// but written to the file serially.
+func (b *Client) downloadFile(ctx context.Context, writer io.Writer, o downloadOptions) (int64, error) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	if o.BlockSize == 0 {
+		o.BlockSize = DefaultDownloadBlockSize
+	}
+
+	if o.Concurrency == 0 {
+		o.Concurrency = DefaultConcurrency
+	}
+
+	count := o.Range.Count
+	if count == CountToEnd { // Calculate size if not specified
+		gr, err := b.GetProperties(ctx, o.getBlobPropertiesOptions())
+		if err != nil {
+			return 0, err
+		}
+		count = *gr.ContentLength - o.Range.Offset
+	}
+
+	if count <= 0 {
+		// The file is empty, there is nothing to download.
+		return 0, nil
+	}
+
+	progress := int64(0)
+	progressLock := &sync.Mutex{}
+
+	// helper routine to get the body for a given range
+	getBodyForRange := func(ctx context.Context, chunkStart, size int64) (io.ReadCloser, error) {
+		downloadBlobOptions := o.getDownloadBlobOptions(HTTPRange{
+			Offset: chunkStart + o.Range.Offset,
+			Count:  size,
+		}, nil)
+		dr, err := b.DownloadStream(ctx, downloadBlobOptions)
+		if err != nil {
+			return nil, err
+		}
+
+		var body io.ReadCloser = dr.NewRetryReader(ctx, &o.RetryReaderOptionsPerBlock)
+		if o.Progress != nil {
+			rangeProgress := int64(0)
+			body = streaming.NewResponseProgress(
+				body,
+				func(bytesTransferred int64) {
+					diff := bytesTransferred - rangeProgress
+					rangeProgress = bytesTransferred
+					progressLock.Lock()
+					progress += diff
+					o.Progress(progress)
+					progressLock.Unlock()
+				})
+		}
+
+		return body, nil
+	}
+
+	// if the file fits in a single buffer, we'll download it here.
+	if count <= o.BlockSize {
+		body, err := getBodyForRange(ctx, int64(0), count)
+		if err != nil {
+			return 0, err
+		}
+		defer body.Close()
+
+		return io.Copy(writer, body)
+	}
+
+	buffers := shared.NewMMBPool(int(o.Concurrency), o.BlockSize)
+	defer buffers.Free()
+	acquireBuffer := func() ([]byte, error) {
+		select {
+		case b := <-buffers.Acquire():
+			// got a buffer
+			return b, nil
+		default:
+			// no buffer available; allocate a new buffer if possible
+			if _, err := buffers.Grow(); err != nil {
+				return nil, err
+			}
+
+			// either grab the newly allocated buffer or wait for one to become available
+			return <-buffers.Acquire(), nil
+		}
+	}
+
+	numChunks := uint16((count-1)/o.BlockSize) + 1
+	blocks := make([]chan []byte, numChunks)
+	for b := range blocks {
+		blocks[b] = make(chan []byte)
+	}
+
+	/*
+	 * We have created as many channels as the number of chunks we have.
+	 * Each downloaded block is sent to the channel matching its
+	 * sequence number, i.e. the 0th block is sent to the 0th channel,
+	 * the 1st block to the 1st channel, and so on. The blocks are then
+	 * read and written to the file serially by the goroutine below.
+	 * Note that the blocks are still downloaded from the network in
+	 * parallel; they are only serialized when written to the file.
+	 */
+	writerError := make(chan error)
+	go func(ch chan error) {
+		for _, block := range blocks {
+			select {
+			case <-ctx.Done():
+				return
+			case block := <-block:
+				_, err := writer.Write(block)
+				buffers.Release(block)
+				if err != nil {
+					ch <- err
+					return
+				}
+			}
+		}
+		ch <- nil
+	}(writerError)
+
+	// Prepare and do parallel download.
+	err := shared.DoBatchTransfer(ctx, &shared.BatchTransferOptions{
+		OperationName: "downloadBlobToWriterAt",
+		TransferSize:  count,
+		ChunkSize:     o.BlockSize,
+		NumChunks:     numChunks,
+		Concurrency:   o.Concurrency,
+		Operation: func(ctx context.Context, chunkStart int64, count int64) error {
+			buff, err := acquireBuffer()
+			if err != nil {
+				return err
+			}
+
+			body, err := getBodyForRange(ctx, chunkStart, count)
+			if err != nil {
+				buffers.Release(buff)
+				return err
+			}
+
+			_, err = io.ReadFull(body, buff[:count])
+			body.Close()
+			if err != nil {
+				return err
+			}
+
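+			// Hand the block to the writer goroutine on this chunk's channel. The
+			// writer drains the channels in index order, so this send blocks until
+			// every earlier chunk has been written, keeping file writes sequential.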
+			blockIndex := (chunkStart / o.BlockSize)
+			blocks[blockIndex] <- buff
+			return nil
+		},
+	})
+
+	if err != nil {
+		return 0, err
+	}
+	// check for an error from the writer goroutine
+	if err = <-writerError; err != nil {
+		return 0, err
+	}
+	return count, nil
+}
+
 // DownloadStream reads a range of bytes from a blob. The response also includes the blob's properties and metadata.
 // For more information, see https://docs.microsoft.com/rest/api/storageservices/get-blob.
func (b *Client) DownloadStream(ctx context.Context, o *DownloadStreamOptions) (DownloadStreamResponse, error) { @@ -419,7 +582,7 @@ func (b *Client) DownloadBuffer(ctx context.Context, buffer []byte, o *DownloadB if o == nil { o = &DownloadBufferOptions{} } - return b.download(ctx, shared.NewBytesWriter(buffer), (downloadOptions)(*o)) + return b.downloadBuffer(ctx, shared.NewBytesWriter(buffer), (downloadOptions)(*o)) } // DownloadFile downloads an Azure blob to a local file. @@ -458,7 +621,7 @@ func (b *Client) DownloadFile(ctx context.Context, file *os.File, o *DownloadFil } if size > 0 { - return b.download(ctx, file, *do) + return b.downloadFile(ctx, file, *do) } else { // if the blob's size is 0, there is no need in downloading it return 0, nil } diff --git a/sdk/storage/azblob/blob/constants.go b/sdk/storage/azblob/blob/constants.go index 8a9107954381..4c34d26dfbd5 100644 --- a/sdk/storage/azblob/blob/constants.go +++ b/sdk/storage/azblob/blob/constants.go @@ -9,6 +9,7 @@ package blob import ( "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/internal/exported" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/internal/generated" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/internal/shared" ) const ( @@ -18,6 +19,9 @@ const ( // DefaultDownloadBlockSize is default block size DefaultDownloadBlockSize = int64(4 * 1024 * 1024) // 4MB + + // DefaultConcurrency is the default number of blocks downloaded or uploaded in parallel + DefaultConcurrency = shared.DefaultConcurrency ) // BlobType defines values for BlobType diff --git a/sdk/storage/azblob/blockblob/chunkwriting.go b/sdk/storage/azblob/blockblob/chunkwriting.go index 340d4bc76fbf..212255d4c66b 100644 --- a/sdk/storage/azblob/blockblob/chunkwriting.go +++ b/sdk/storage/azblob/blockblob/chunkwriting.go @@ -18,6 +18,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming" "github.com/Azure/azure-sdk-for-go/sdk/internal/uuid" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/internal/shared" ) // blockWriter provides methods to upload blocks that represent a file to a server and commit them. @@ -28,27 +29,8 @@ type blockWriter interface { CommitBlockList(context.Context, []string, *CommitBlockListOptions) (CommitBlockListResponse, error) } -// bufferManager provides an abstraction for the management of buffers. -// this is mostly for testing purposes, but does allow for different implementations without changing the algorithm. -type bufferManager[T ~[]byte] interface { - // Acquire returns the channel that contains the pool of buffers. - Acquire() <-chan T - - // Release releases the buffer back to the pool for reuse/cleanup. - Release(T) - - // Grow grows the number of buffers, up to the predefined max. - // It returns the total number of buffers or an error. - // No error is returned if the number of buffers has reached max. - // This is called only from the reading goroutine. - Grow() (int, error) - - // Free cleans up all buffers. - Free() -} - // copyFromReader copies a source io.Reader to blob storage using concurrent uploads. 
-func copyFromReader[T ~[]byte](ctx context.Context, src io.Reader, dst blockWriter, options UploadStreamOptions, getBufferManager func(maxBuffers int, bufferSize int64) bufferManager[T]) (CommitBlockListResponse, error) { +func copyFromReader[T ~[]byte](ctx context.Context, src io.Reader, dst blockWriter, options UploadStreamOptions, getBufferManager func(maxBuffers int, bufferSize int64) shared.BufferManager[T]) (CommitBlockListResponse, error) { options.setDefaults() wg := sync.WaitGroup{} // Used to know when all outgoing blocks have finished processing @@ -265,49 +247,3 @@ func (ubi uuidBlockID) WithBlockNumber(blockNumber uint32) uuidBlockID { func (ubi uuidBlockID) ToBase64() string { return blockID(ubi).ToBase64() } - -// mmbPool implements the bufferManager interface. -// it uses anonymous memory mapped files for buffers. -// don't use this type directly, use newMMBPool() instead. -type mmbPool struct { - buffers chan mmb - count int - max int - size int64 -} - -func newMMBPool(maxBuffers int, bufferSize int64) bufferManager[mmb] { - return &mmbPool{ - buffers: make(chan mmb, maxBuffers), - max: maxBuffers, - size: bufferSize, - } -} - -func (pool *mmbPool) Acquire() <-chan mmb { - return pool.buffers -} - -func (pool *mmbPool) Grow() (int, error) { - if pool.count < pool.max { - buffer, err := newMMB(pool.size) - if err != nil { - return 0, err - } - pool.buffers <- buffer - pool.count++ - } - return pool.count, nil -} - -func (pool *mmbPool) Release(buffer mmb) { - pool.buffers <- buffer -} - -func (pool *mmbPool) Free() { - for i := 0; i < pool.count; i++ { - buffer := <-pool.buffers - buffer.delete() - } - pool.count = 0 -} diff --git a/sdk/storage/azblob/blockblob/chunkwriting_test.go b/sdk/storage/azblob/blockblob/chunkwriting_test.go index 26f01f9a5180..363d3ae19248 100644 --- a/sdk/storage/azblob/blockblob/chunkwriting_test.go +++ b/sdk/storage/azblob/blockblob/chunkwriting_test.go @@ -16,6 +16,7 @@ import ( "testing" "time" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/internal/shared" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -115,7 +116,7 @@ func calcMD5(data []byte) string { // used to track proper acquisition and closing of buffers type bufMgrTracker struct { - inner bufferManager[mmb] + inner shared.BufferManager[shared.Mmb] Count int // total count of allocated buffers Freed bool // buffers were freed @@ -123,11 +124,11 @@ type bufMgrTracker struct { func newBufMgrTracker(maxBuffers int, bufferSize int64) *bufMgrTracker { return &bufMgrTracker{ - inner: newMMBPool(maxBuffers, bufferSize), + inner: shared.NewMMBPool(maxBuffers, bufferSize), } } -func (pool *bufMgrTracker) Acquire() <-chan mmb { +func (pool *bufMgrTracker) Acquire() <-chan shared.Mmb { return pool.inner.Acquire() } @@ -140,7 +141,7 @@ func (pool *bufMgrTracker) Grow() (int, error) { return n, nil } -func (pool *bufMgrTracker) Release(buffer mmb) { +func (pool *bufMgrTracker) Release(buffer shared.Mmb) { pool.inner.Release(buffer) } @@ -161,7 +162,7 @@ func TestSlowDestCopyFrom(t *testing.T) { errs := make(chan error, 1) go func() { - _, err := copyFromReader(context.Background(), bytes.NewReader(bigSrc), fakeBB, UploadStreamOptions{}, func(maxBuffers int, bufferSize int64) bufferManager[mmb] { + _, err := copyFromReader(context.Background(), bytes.NewReader(bigSrc), fakeBB, UploadStreamOptions{}, func(maxBuffers int, bufferSize int64) shared.BufferManager[shared.Mmb] { tracker = newBufMgrTracker(maxBuffers, bufferSize) return 
tracker }) @@ -270,7 +271,7 @@ func TestCopyFromReader(t *testing.T) { var tracker *bufMgrTracker - _, err := copyFromReader(test.ctx, bytes.NewReader(from), fakeBB, test.o, func(maxBuffers int, bufferSize int64) bufferManager[mmb] { + _, err := copyFromReader(test.ctx, bytes.NewReader(from), fakeBB, test.o, func(maxBuffers int, bufferSize int64) shared.BufferManager[shared.Mmb] { tracker = newBufMgrTracker(maxBuffers, bufferSize) return tracker }) @@ -322,7 +323,7 @@ func TestCopyFromReaderReadError(t *testing.T) { reader: bytes.NewReader(make([]byte, 5*_1MiB)), failOn: 2, } - _, err := copyFromReader(context.Background(), &rf, fakeBB, UploadStreamOptions{}, func(maxBuffers int, bufferSize int64) bufferManager[mmb] { + _, err := copyFromReader(context.Background(), &rf, fakeBB, UploadStreamOptions{}, func(maxBuffers int, bufferSize int64) shared.BufferManager[shared.Mmb] { tracker = newBufMgrTracker(maxBuffers, bufferSize) return tracker }) diff --git a/sdk/storage/azblob/blockblob/client.go b/sdk/storage/azblob/blockblob/client.go index eedf4c2362e6..8b542f85a8be 100644 --- a/sdk/storage/azblob/blockblob/client.go +++ b/sdk/storage/azblob/blockblob/client.go @@ -466,6 +466,7 @@ func (bb *Client) uploadFromReader(ctx context.Context, reader io.ReaderAt, actu OperationName: "uploadFromReader", TransferSize: actualSize, ChunkSize: o.BlockSize, + NumChunks: uint16(((actualSize - 1) / o.BlockSize) + 1), Concurrency: o.Concurrency, Operation: func(ctx context.Context, offset int64, chunkSize int64) error { // This function is called once per block. @@ -560,7 +561,7 @@ func (bb *Client) UploadStream(ctx context.Context, body io.Reader, o *UploadStr return UploadStreamResponse{}, bloberror.UnsupportedChecksum } - result, err := copyFromReader(ctx, body, bb, *o, newMMBPool) + result, err := copyFromReader(ctx, body, bb, *o, shared.NewMMBPool) if err != nil { return CommitBlockListResponse{}, err } diff --git a/sdk/storage/azblob/client_test.go b/sdk/storage/azblob/client_test.go index 579b4751eb92..79cb2f661aad 100644 --- a/sdk/storage/azblob/client_test.go +++ b/sdk/storage/azblob/client_test.go @@ -404,6 +404,8 @@ func performUploadAndDownloadFileTest(t *testing.T, _require *require.Assertions destBuffer = make([]byte, downloadCount) } + _, err = destFile.Seek(0, 0) + _require.NoError(err) n, err := destFile.Read(destBuffer) _require.NoError(err) @@ -662,9 +664,15 @@ func (s *AZBlobUnrecordedTestsSuite) TestBasicDoBatchTransfer() { totalSizeCount := int64(0) runCount := int64(0) + numChunks := uint16(0) + if test.chunkSize != 0 { + numChunks = uint16(((test.transferSize - 1) / test.chunkSize) + 1) + } + err := shared.DoBatchTransfer(ctx, &shared.BatchTransferOptions{ TransferSize: test.transferSize, ChunkSize: test.chunkSize, + NumChunks: numChunks, Concurrency: test.concurrency, Operation: func(ctx context.Context, offset int64, chunkSize int64) error { atomic.AddInt64(&totalSizeCount, chunkSize) @@ -707,6 +715,7 @@ func (s *AZBlobUnrecordedTestsSuite) TestDoBatchTransferWithError() { err := shared.DoBatchTransfer(ctx, &shared.BatchTransferOptions{ TransferSize: 5, ChunkSize: 1, + NumChunks: 5, Concurrency: 5, Operation: func(ctx context.Context, offset int64, chunkSize int64) error { // simulate doing some work (HTTP call in real scenarios) diff --git a/sdk/storage/azblob/internal/shared/batch_transfer.go b/sdk/storage/azblob/internal/shared/batch_transfer.go index ec5541bfbb13..c1b3a3d27296 100644 --- a/sdk/storage/azblob/internal/shared/batch_transfer.go +++ 
b/sdk/storage/azblob/internal/shared/batch_transfer.go
@@ -11,10 +11,15 @@ import (
 	"errors"
 )
 
+const (
+	DefaultConcurrency = 5
+)
+
 // BatchTransferOptions identifies options used by doBatchTransfer.
 type BatchTransferOptions struct {
 	TransferSize  int64
 	ChunkSize     int64
+	NumChunks     uint16
 	Concurrency   uint16
 	Operation     func(ctx context.Context, offset int64, chunkSize int64) error
 	OperationName string
@@ -28,13 +33,12 @@ func DoBatchTransfer(ctx context.Context, o *BatchTransferOptions) error {
 	}
 
 	if o.Concurrency == 0 {
-		o.Concurrency = 5 // default concurrency
+		o.Concurrency = DefaultConcurrency // default concurrency
 	}
 
 	// Prepare and do parallel operations.
-	numChunks := uint16(((o.TransferSize - 1) / o.ChunkSize) + 1)
 	operationChannel := make(chan func() error, o.Concurrency) // Create the channel that release 'concurrency' goroutines concurrently
-	operationResponseChannel := make(chan error, numChunks)    // Holds each response
+	operationResponseChannel := make(chan error, o.NumChunks)  // Holds each response
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
@@ -50,10 +54,10 @@ func DoBatchTransfer(ctx context.Context, o *BatchTransferOptions) error {
 	}
 
 	// Add each chunk's operation to the channel.
-	for chunkNum := uint16(0); chunkNum < numChunks; chunkNum++ {
+	for chunkNum := uint16(0); chunkNum < o.NumChunks; chunkNum++ {
 		curChunkSize := o.ChunkSize
 
-		if chunkNum == numChunks-1 { // Last chunk
+		if chunkNum == o.NumChunks-1 { // Last chunk
 			curChunkSize = o.TransferSize - (int64(chunkNum) * o.ChunkSize) // Remove size of all transferred chunks from total
 		}
 		offset := int64(chunkNum) * o.ChunkSize
@@ -65,7 +69,7 @@ func DoBatchTransfer(ctx context.Context, o *BatchTransferOptions) error {
 
 	// Wait for the operations to complete.
 	var firstErr error = nil
-	for chunkNum := uint16(0); chunkNum < numChunks; chunkNum++ {
+	for chunkNum := uint16(0); chunkNum < o.NumChunks; chunkNum++ {
 		responseError := <-operationResponseChannel
 		// record the first error (the original error which should cause the other chunks to fail with canceled context)
 		if responseError != nil && firstErr == nil {
diff --git a/sdk/storage/azblob/internal/shared/buffer_manager.go b/sdk/storage/azblob/internal/shared/buffer_manager.go
new file mode 100644
index 000000000000..e3aa4a4886de
--- /dev/null
+++ b/sdk/storage/azblob/internal/shared/buffer_manager.go
@@ -0,0 +1,72 @@
+//go:build go1.18
+// +build go1.18
+
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+package shared
+
+type BufferManager[T ~[]byte] interface {
+	// Acquire returns the channel that contains the pool of buffers.
+	Acquire() <-chan T
+
+	// Release releases the buffer back to the pool for reuse/cleanup.
+	Release(T)
+
+	// Grow grows the number of buffers, up to the predefined max.
+	// It returns the total number of buffers or an error.
+	// No error is returned if the number of buffers has reached max.
+	// This is called only from the reading goroutine.
+	Grow() (int, error)
+
+	// Free cleans up all buffers.
+	Free()
+}
+
+// mmbPool implements the BufferManager interface.
+// it uses anonymous memory mapped files for buffers.
+// don't use this type directly, use NewMMBPool() instead.
+type mmbPool struct {
+	buffers chan Mmb
+	count   int
+	max     int
+	size    int64
+}
+
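+// NewMMBPool creates a BufferManager backed by anonymous memory mapped buffers,
+// holding at most maxBuffers buffers of bufferSize bytes each.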
+func NewMMBPool(maxBuffers int, bufferSize int64) BufferManager[Mmb] {
+	return &mmbPool{
+		buffers: make(chan Mmb, maxBuffers),
+		max:     maxBuffers,
+		size:    bufferSize,
+	}
+}
+
+func (pool *mmbPool) Acquire() <-chan Mmb {
+	return pool.buffers
+}
+
+func (pool *mmbPool) Grow() (int, error) {
+	if pool.count < pool.max {
+		buffer, err := NewMMB(pool.size)
+		if err != nil {
+			return 0, err
+		}
+		pool.buffers <- buffer
+		pool.count++
+	}
+	return pool.count, nil
+}
+
+func (pool *mmbPool) Release(buffer Mmb) {
+	pool.buffers <- buffer
+}
+
+func (pool *mmbPool) Free() {
+	for i := 0; i < pool.count; i++ {
+		buffer := <-pool.buffers
+		buffer.Delete()
+	}
+	pool.count = 0
+}
diff --git a/sdk/storage/azblob/blockblob/mmf_unix.go b/sdk/storage/azblob/internal/shared/mmf_unix.go
similarity index 89%
rename from sdk/storage/azblob/blockblob/mmf_unix.go
rename to sdk/storage/azblob/internal/shared/mmf_unix.go
index 5f7f4d828e64..cdcadf311607 100644
--- a/sdk/storage/azblob/blockblob/mmf_unix.go
+++ b/sdk/storage/azblob/internal/shared/mmf_unix.go
@@ -5,7 +5,7 @@
 // Copyright (c) Microsoft Corporation. All rights reserved.
 // Licensed under the MIT License. See License.txt in the project root for license information.
 
-package blockblob
+package shared
 
 import (
 	"fmt"
@@ -14,20 +14,20 @@ import (
 )
 
-// mmb is a memory mapped buffer
-type mmb []byte
+// Mmb is a memory mapped buffer
+type Mmb []byte
 
-// newMMB creates a new memory mapped buffer with the specified size
-func newMMB(size int64) (mmb, error) {
+// NewMMB creates a new memory mapped buffer with the specified size
+func NewMMB(size int64) (Mmb, error) {
 	prot, flags := syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE
 	addr, err := syscall.Mmap(-1, 0, int(size), prot, flags)
 	if err != nil {
 		return nil, os.NewSyscallError("Mmap", err)
 	}
-	return mmb(addr), nil
+	return Mmb(addr), nil
 }
 
-// delete cleans up the memory mapped buffer
-func (m *mmb) delete() {
+// Delete cleans up the memory mapped buffer
+func (m *Mmb) Delete() {
 	err := syscall.Munmap(*m)
 	*m = nil
 	if err != nil {
diff --git a/sdk/storage/azblob/blockblob/mmf_windows.go b/sdk/storage/azblob/internal/shared/mmf_windows.go
similarity index 82%
rename from sdk/storage/azblob/blockblob/mmf_windows.go
rename to sdk/storage/azblob/internal/shared/mmf_windows.go
index 3f966d65b887..ef9fdc2a1f07 100644
--- a/sdk/storage/azblob/blockblob/mmf_windows.go
+++ b/sdk/storage/azblob/internal/shared/mmf_windows.go
@@ -4,7 +4,7 @@
 // Copyright (c) Microsoft Corporation. All rights reserved.
 // Licensed under the MIT License. See License.txt in the project root for license information.
 
-package blockblob +package shared import ( "fmt" @@ -14,11 +14,11 @@ import ( "unsafe" ) -// mmb is a memory mapped buffer -type mmb []byte +// Mmb is a memory mapped buffer +type Mmb []byte -// newMMB creates a new memory mapped buffer with the specified size -func newMMB(size int64) (mmb, error) { +// NewMMB creates a new memory mapped buffer with the specified size +func NewMMB(size int64) (Mmb, error) { const InvalidHandleValue = ^uintptr(0) // -1 prot, access := uint32(syscall.PAGE_READWRITE), uint32(syscall.FILE_MAP_WRITE) @@ -35,7 +35,7 @@ func newMMB(size int64) (mmb, error) { return nil, os.NewSyscallError("MapViewOfFile", err) } - m := mmb{} + m := Mmb{} h := (*reflect.SliceHeader)(unsafe.Pointer(&m)) h.Data = addr h.Len = int(size) @@ -43,10 +43,10 @@ func newMMB(size int64) (mmb, error) { return m, nil } -// delete cleans up the memory mapped buffer -func (m *mmb) delete() { +// Delete cleans up the memory mapped buffer +func (m *Mmb) Delete() { addr := uintptr(unsafe.Pointer(&(([]byte)(*m)[0]))) - *m = mmb{} + *m = Mmb{} err := syscall.UnmapViewOfFile(addr) if err != nil { // if we get here, there is likely memory corruption.
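
Reviewer note, not part of the patch: DoBatchTransfer now requires callers to supply NumChunks, which every call site in this diff computes as the ceiling division uint16(((size - 1) / blockSize) + 1). The more subtle piece is the blocks-channel scheme in downloadFile, which lets chunks finish downloading in any order while the file is written strictly in sequence. The standalone sketch below reproduces that ordering scheme with plain goroutines so it can be checked in isolation; all names here (numChunks, channels) are illustrative and not part of the SDK.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const numChunks = 5

	// one channel per chunk index, mirroring blocks[] in downloadFile
	channels := make([]chan string, numChunks)
	for i := range channels {
		channels[i] = make(chan string)
	}

	var wg sync.WaitGroup
	for i := 0; i < numChunks; i++ {
		wg.Add(1)
		go func(i int) { // "downloads" may complete in any order
			defer wg.Done()
			channels[i] <- fmt.Sprintf("chunk-%d", i)
		}(i)
	}

	// the single "writer" drains the channels in index order, so output is
	// chunk-0 .. chunk-4 regardless of goroutine scheduling
	for _, ch := range channels {
		fmt.Println(<-ch)
	}
	wg.Wait()
}
```

Because each send blocks until the single reader reaches that chunk's index, write order is guaranteed by construction; in the patch it is the memory-mapped buffer pool (capped at Concurrency buffers via Grow), not the channels, that bounds memory use.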