[Storage][DataMovement] Refactor/add bounds to CommitChunkHandler (#4…
jalauzon-msft authored Dec 4, 2024
1 parent 40f2b24 commit fa515eb
Showing 8 changed files with 129 additions and 422 deletions.
173 changes: 39 additions & 134 deletions sdk/storage/Azure.Storage.DataMovement/src/CommitChunkHandler.cs
@@ -2,20 +2,14 @@
// Licensed under the MIT License.

using System;
using Azure.Core;
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Channels;
using Azure.Core.Pipeline;
using Azure.Storage.Common;

namespace Azure.Storage.DataMovement
{
internal class CommitChunkHandler : IDisposable
{
// Indicates whether the current thread is processing stage chunks.
private static Task _processStageChunkEvents;

#region Delegate Definitions
public delegate Task QueuePutBlockTaskInternal(long offset, long blockSize, long expectedLength, StorageResourceItemProperties properties);
public delegate Task QueueCommitBlockTaskInternal(StorageResourceItemProperties sourceProperties);
@@ -36,29 +30,24 @@ public struct Behaviors
public InvokeFailedEventHandlerInternal InvokeFailedHandler { get; set; }
}

private event SyncAsyncEventHandler<StageChunkEventArgs> _commitBlockHandler;
internal SyncAsyncEventHandler<StageChunkEventArgs> GetCommitBlockHandler() => _commitBlockHandler;

/// <summary>
/// Create channel of <see cref="StageChunkEventArgs"/> to keep track of the chunks that are
/// Create channel of <see cref="QueueStageChunkArgs"/> to keep track of the chunks that are
/// waiting to update the bytesTransferred and other required operations.
/// </summary>
private readonly Channel<StageChunkEventArgs> _stageChunkChannel;
private CancellationToken _cancellationToken;
private readonly IProcessor<QueueStageChunkArgs> _stageChunkProcessor;
private readonly CancellationToken _cancellationToken;

private long _bytesTransferred;
private readonly long _expectedLength;
private readonly long _blockSize;
private readonly DataTransferOrder _transferOrder;
private readonly ClientDiagnostics _clientDiagnostics;
private readonly StorageResourceItemProperties _sourceProperties;

public CommitChunkHandler(
long expectedLength,
long blockSize,
Behaviors behaviors,
DataTransferOrder transferOrder,
ClientDiagnostics clientDiagnostics,
StorageResourceItemProperties sourceProperties,
CancellationToken cancellationToken)
{
@@ -67,7 +56,14 @@ public CommitChunkHandler(
throw Errors.InvalidExpectedLength(expectedLength);
}
Argument.AssertNotNull(behaviors, nameof(behaviors));
Argument.AssertNotNull(clientDiagnostics, nameof(clientDiagnostics));

_cancellationToken = cancellationToken;
// Set bytes transferred to block size because we transferred the initial block
_bytesTransferred = blockSize;
_expectedLength = expectedLength;
_blockSize = blockSize;
_transferOrder = transferOrder;
_sourceProperties = sourceProperties;

_queuePutBlockTask = behaviors.QueuePutBlockTask
?? throw Errors.ArgumentNull(nameof(behaviors.QueuePutBlockTask));
@@ -78,152 +74,61 @@ public CommitChunkHandler(
_invokeFailedEventHandler = behaviors.InvokeFailedHandler
?? throw Errors.ArgumentNull(nameof(behaviors.InvokeFailedHandler));

// Set expected length to perform commit task
_expectedLength = expectedLength;

// Create channel of finished Stage Chunk Args to update the bytesTransferred
// and for ending tasks like commit block.
// The size of the channel should never exceed 50k (limit on blocks in a block blob).
// and that's in the worst case that we never read from the channel and had a maximum chunk blob.
_stageChunkChannel = Channel.CreateUnbounded<StageChunkEventArgs>(
new UnboundedChannelOptions()
{
// Single reader is required as we can only read and write to bytesTransferred value
SingleReader = true,
});
_cancellationToken = cancellationToken;

// Set bytes transferred to block size because we transferred the initial block
_bytesTransferred = blockSize;

_processStageChunkEvents = Task.Run(() => NotifyOfPendingStageChunkEvents());

_blockSize = blockSize;
_transferOrder = transferOrder;
if (_transferOrder == DataTransferOrder.Sequential)
{
_commitBlockHandler += SequentialBlockEvent;
}
_commitBlockHandler += ConcurrentBlockEvent;
_clientDiagnostics = clientDiagnostics;
_sourceProperties = sourceProperties;
_stageChunkProcessor = ChannelProcessing.NewProcessor<QueueStageChunkArgs>(
readers: 1,
capacity: DataMovementConstants.Channels.StageChunkCapacity);
_stageChunkProcessor.Process = ProcessCommitRange;
}

public void Dispose()
{
// We no longer have to read from the channel. We are not expecting any more requests.
_stageChunkChannel.Writer.TryComplete();
DisposeHandlers();
_stageChunkProcessor.TryComplete();
}

private void DisposeHandlers()
public async ValueTask QueueChunkAsync(QueueStageChunkArgs args)
{
if (_transferOrder == DataTransferOrder.Sequential)
{
_commitBlockHandler -= SequentialBlockEvent;
}
_commitBlockHandler -= ConcurrentBlockEvent;
await _stageChunkProcessor.QueueAsync(args).ConfigureAwait(false);
}

private async Task ConcurrentBlockEvent(StageChunkEventArgs args)
private async Task ProcessCommitRange(QueueStageChunkArgs args, CancellationToken cancellationToken = default)
{
try
{
if (args.Success)
{
// Let's add to the channel, and our notifier will handle the chunks.
_stageChunkChannel.Writer.TryWrite(args);
}
else
{
// Log an unexpected error since it came back unsuccessful
throw args.Exception;
}
}
catch (Exception ex)
{
// Log an unexpected error since it came back unsuccessful
await _invokeFailedEventHandler(ex).ConfigureAwait(false);
}
}
_bytesTransferred += args.BytesTransferred;
_reportProgressInBytes(args.BytesTransferred);

private async Task NotifyOfPendingStageChunkEvents()
{
try
{
while (await _stageChunkChannel.Reader.WaitToReadAsync(_cancellationToken).ConfigureAwait(false))
if (_bytesTransferred == _expectedLength)
{
// Read one event argument at a time.
StageChunkEventArgs args = await _stageChunkChannel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false);

// don't need to use Interlocked.Add() as we are reading one event at a time
// and _bytesTransferred is not being read/updated from any other thread
_bytesTransferred += args.BytesTransferred;

// Report the incremental bytes transferred
_reportProgressInBytes(args.BytesTransferred);

if (_bytesTransferred == _expectedLength)
{
// Add CommitBlockList task to the channel
await _queueCommitBlockTask(_sourceProperties).ConfigureAwait(false);
}
else if (_bytesTransferred > _expectedLength)
{
throw Errors.MismatchLengthTransferred(
expectedLength: _expectedLength,
actualLength: _bytesTransferred);
}
// Add CommitBlockList task to the channel
await _queueCommitBlockTask(_sourceProperties).ConfigureAwait(false);
}
}
catch (Exception ex)
{
await _invokeFailedEventHandler(ex).ConfigureAwait(false);
}
}

private async Task SequentialBlockEvent(StageChunkEventArgs args)
{
try
{
if (args.Success)
else if (_bytesTransferred < _expectedLength)
{
long oldOffset = args.Offset;
long newOffset = oldOffset + _blockSize;
if (newOffset < _expectedLength)
// If this is a sequential transfer, we need to queue the next chunk
if (_transferOrder == DataTransferOrder.Sequential)
{
long newOffset = args.Offset + _blockSize;
long blockLength = (newOffset + _blockSize < _expectedLength) ?
_blockSize :
_expectedLength - newOffset;
await _queuePutBlockTask(
newOffset,
blockLength,
_expectedLength,
_sourceProperties).ConfigureAwait(false);
}
}
else
else // _bytesTransferred > _expectedLength
{
// Log an unexpected error since it came back unsuccessful
throw args.Exception;
throw Errors.MismatchLengthTransferred(
expectedLength: _expectedLength,
actualLength: _bytesTransferred);
}
}
catch (Exception ex)
{
// Log an unexpected error since it came back unsuccessful
await _invokeFailedEventHandler(ex).ConfigureAwait(false);
}
}

public async Task InvokeEvent(StageChunkEventArgs args)
{
// There's a race condition where the event handler was disposed and an event
// was already invoked, we should skip over this as the download chunk handler
// was already disposed, and we should just ignore any more incoming events.
if (_commitBlockHandler != null)
{
await _commitBlockHandler.RaiseAsync(
args,
nameof(CommitChunkHandler),
nameof(_commitBlockHandler),
_clientDiagnostics).ConfigureAwait(false);
}
}
}
}
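
For orientation, the core of this refactor swaps the unbounded `Channel<StageChunkEventArgs>` plus `SyncAsyncEventHandler` wiring for a bounded, single-reader processor that the handler queues `QueueStageChunkArgs` onto. The sketch below only illustrates that pattern; it is not the actual `ChannelProcessing.NewProcessor` implementation, and the type name, constructor shape, and capacity handling are assumptions for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Minimal single-reader processor backed by a bounded channel. Writers await
// QueueAsync when the channel is full, which is the back-pressure this commit adds.
internal sealed class BoundedChunkProcessor<T>
{
    private readonly Channel<T> _channel;
    private readonly Task _readLoop;

    // Must be assigned before any items are queued.
    public Func<T, CancellationToken, Task> Process { get; set; }

    public BoundedChunkProcessor(int capacity, CancellationToken cancellationToken)
    {
        _channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            SingleReader = true,                     // one reader can mutate shared state safely
            FullMode = BoundedChannelFullMode.Wait,  // writers wait instead of growing unbounded
        });
        _readLoop = Task.Run(() => ReadLoopAsync(cancellationToken), cancellationToken);
    }

    // Queue an item; awaits when the channel is at capacity.
    public ValueTask QueueAsync(T item, CancellationToken cancellationToken = default)
        => _channel.Writer.WriteAsync(item, cancellationToken);

    public void TryComplete() => _channel.Writer.TryComplete();

    private async Task ReadLoopAsync(CancellationToken cancellationToken)
    {
        await foreach (T item in _channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
        {
            await Process(item, cancellationToken).ConfigureAwait(false);
        }
    }
}
```

Used roughly as `processor.Process = ProcessCommitRange; await processor.QueueAsync(args);`, which mirrors the `QueueChunkAsync`/`ProcessCommitRange` pairing in the diff above.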
21 changes: 1 addition & 20 deletions sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs
@@ -504,26 +504,7 @@ internal static long ParseRangeTotalLength(string range)
return long.Parse(range.Substring(lengthSeparator + 1), CultureInfo.InvariantCulture);
}

internal static List<(long Offset, long Size)> GetRangeList(long blockSize, long fileLength)
{
// The list tracking blocks IDs we're going to commit
List<(long Offset, long Size)> partitions = new List<(long, long)>();

// Partition the stream into individual blocks
foreach ((long Offset, long Length) block in GetPartitionIndexes(fileLength, blockSize))
{
/* We need to do this first! Length is calculated on the fly based on stream buffer
* contents; We need to record the partition data first before consuming the stream
* asynchronously. */
partitions.Add(block);
}
return partitions;
}

/// <summary>
/// Partition a stream into a series of blocks buffered as needed by an array pool.
/// </summary>
private static IEnumerable<(long Offset, long Length)> GetPartitionIndexes(
protected static IEnumerable<(long Offset, long Length)> GetRanges(
long streamLength, // StreamLength needed to divide beforehand
long blockSize)
{
...
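
The removed `GetRangeList` helper simply materialized the ranges produced by the partition iterator, which is now exposed directly as `GetRanges`. As a rough illustration of what such a partitioner does (the method body is collapsed in this view, so this is an assumed equivalent, not the SDK's code):

```csharp
using System;
using System.Collections.Generic;

internal static class RangePartitionExample
{
    // Yield (Offset, Length) pairs that tile [0, streamLength) in blockSize pieces;
    // the final range is shortened to whatever remains.
    public static IEnumerable<(long Offset, long Length)> GetRanges(long streamLength, long blockSize)
    {
        for (long offset = 0; offset < streamLength; offset += blockSize)
        {
            yield return (offset, Math.Min(blockSize, streamLength - offset));
        }
    }
}
```

For example, `GetRanges(streamLength: 10, blockSize: 4)` yields (0, 4), (4, 4), (8, 2).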
27 changes: 27 additions & 0 deletions sdk/storage/Azure.Storage.DataMovement/src/QueueStageChunkArgs.cs
@@ -0,0 +1,27 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

namespace Azure.Storage.DataMovement
{
/// <summary>
/// This class is used interchangeably for
/// Stage Block (Put Block), Stage Block From Uri (Put Block From URL),
/// Append Block (Append Block), Append Block From Uri (Append Block From URL),
/// and Upload Page (Put Page), Upload Pages From Uri (Put Pages From URL).
///
/// Essentially, any transfer operation that must end in a Commit Block List
/// uses this internal event argument to track success and the bytes
/// transferred, ensuring the correct number of bytes is transferred.
/// </summary>
internal class QueueStageChunkArgs
{
public long Offset { get; }
public long BytesTransferred { get; }

public QueueStageChunkArgs(long offset, long bytesTransferred)
{
Offset = offset;
BytesTransferred = bytesTransferred;
}
}
}
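
A hypothetical call site for the new type, showing how a chunk-completion callback might hand the completed range to the handler. The helper name and wiring below are assumptions (and would need to live inside the Azure.Storage.DataMovement assembly, since both types are internal); only `QueueStageChunkArgs` and `CommitChunkHandler.QueueChunkAsync` come from this diff.

```csharp
using System.Threading.Tasks;
using Azure.Storage.DataMovement;

internal static class ChunkCompletionExample
{
    // Hypothetical callback invoked after a Put Block (or equivalent) request succeeds.
    public static async Task OnChunkUploadedAsync(
        CommitChunkHandler handler,
        long offset,
        long bytesTransferred)
    {
        // QueueChunkAsync enqueues onto the bounded stage-chunk processor; the single
        // reader then updates the running byte count and queues the commit once the
        // expected length is reached.
        await handler.QueueChunkAsync(new QueueStageChunkArgs(offset, bytesTransferred)).ConfigureAwait(false);
    }
}
```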