Skip to content

Commit

Permalink
Chunk Handler isolating bytesTransferred updates / Channels exiting g…
Browse files Browse the repository at this point in the history
…racefully. (#33033)

* Rerecorded download and upload tests

* Remove ignored test recordings

* Added semaphore for bytes transferred, Rerecorded all tests (lowered wait time for failure)

* Undo change to nunit.runsettings

* Remove unnecessary recordings and fix formatting

* Added Changes to chunk handlers and disabled multiple single transfer tests

* undo changes to eng/nunit.runsettings

* WIP

* Updated commit block handler tests, renamed internal minor variables

* Add parallel commit block handler tests

* Added sequential tests and download chunk tests; Added semaphore to single read write for correct offset

* Change wait time to be longer to avoid flakeyness
  • Loading branch information
amnguye authored Dec 14, 2022
1 parent 84190fe commit 43c7323
Show file tree
Hide file tree
Showing 23 changed files with 6,134 additions and 170 deletions.
118 changes: 94 additions & 24 deletions sdk/storage/Azure.Storage.DataMovement/src/CommitChunkHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,47 @@
using Azure.Storage.DataMovement;
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Channels;

namespace Azure.Storage.DataMovement
{
internal class CommitChunkHandler : IDisposable
internal class CommitChunkHandler : IAsyncDisposable
{
// Indicates whether the current thread is processing stage chunks.
private static Task _processStageChunkEvents;

#region Delegate Definitions
public delegate Task QueuePutBlockTaskInternal(long offset, long blockSize, long expectedLength);
public delegate Task QueueCommitBlockTaskInternal();
public delegate Task UpdateTransferStatusInternal(StorageTransferStatus status);
public delegate void ReportProgressInBytes(long bytesWritten);
public delegate Task InvokeFailedEventHandlerInternal(Exception ex);
#endregion Delegate Definitions

private readonly QueuePutBlockTaskInternal _queuePutBlockTask;
private readonly QueueCommitBlockTaskInternal _queueCommitBlockTask;
private readonly ReportProgressInBytes _reportProgressInBytes;
private readonly UpdateTransferStatusInternal _updateTransferStatus;
private readonly InvokeFailedEventHandlerInternal _invokeFailedEventHandler;

public struct Behaviors
{
public QueuePutBlockTaskInternal QueuePutBlockTask { get; set; }
public QueueCommitBlockTaskInternal QueueCommitBlockTask { get; set; }
public ReportProgressInBytes ReportProgressInBytes { get; set; }
public UpdateTransferStatusInternal UpdateTransferStatus { get; set; }
public InvokeFailedEventHandlerInternal InvokeFailedHandler { get; set; }
}

private event SyncAsyncEventHandler<StageChunkEventArgs> _commitBlockHandler;
internal SyncAsyncEventHandler<StageChunkEventArgs> GetCommitBlockHandler() => _commitBlockHandler;

/// <summary>
/// Create channel of <see cref="StageChunkEventArgs"/> to keep track of that are
/// waiting to update the bytesTransferredand other required operations.
/// </summary>
private readonly Channel<StageChunkEventArgs> _stageChunkChannel;
private readonly CancellationTokenSource _cancellationTokenSource;
private CancellationToken _cancellationToken => _cancellationTokenSource.Token;

private readonly SemaphoreSlim _currentBytesSemaphore;
private long _bytesTransferred;
private readonly long _expectedLength;
private readonly long _blockSize;
Expand All @@ -64,13 +74,25 @@ public CommitChunkHandler(
?? throw Errors.ArgumentNull(nameof(behaviors.ReportProgressInBytes));
_invokeFailedEventHandler = behaviors.InvokeFailedHandler
?? throw Errors.ArgumentNull(nameof(behaviors.InvokeFailedHandler));
_updateTransferStatus = behaviors.UpdateTransferStatus
?? throw Errors.ArgumentNull(nameof(behaviors.UpdateTransferStatus));

// Set expected length to perform commit task
_expectedLength = expectedLength;

// Create channel of finished Stage Chunk Args to update the bytesTransferred
// and for ending tasks like commit block.
// The size of the channel should never exceed 50k (limit on blocks in a block blob).
// and that's in the worst case that we never read from the channel and had a maximum chunk blob.
_stageChunkChannel = Channel.CreateUnbounded<StageChunkEventArgs>(
new UnboundedChannelOptions()
{
// Single reader is required as we can only read and write to bytesTransferred value
SingleReader = true,
});
_cancellationTokenSource = new CancellationTokenSource();
_processStageChunkEvents = Task.Run(() => NotifyOfPendingStageChunkEvents());

// Set bytes transferred to block size because we transferred the initial block
_currentBytesSemaphore = new SemaphoreSlim(1, 1);
_bytesTransferred = blockSize;

_blockSize = blockSize;
Expand All @@ -82,12 +104,25 @@ public CommitChunkHandler(
_commitBlockHandler += CommitBlockEvent;
}

public void Dispose()
public async ValueTask DisposeAsync()
{
CleanUp();
// We no longer have to read from the channel. We are not expecting any more requests.
_stageChunkChannel.Writer.Complete();
await _stageChunkChannel.Reader.Completion.ConfigureAwait(false);
if (!_cancellationTokenSource.IsCancellationRequested)
{
_cancellationTokenSource.Cancel();
}
_cancellationTokenSource.Dispose();

if (_currentBytesSemaphore != default)
{
_currentBytesSemaphore.Dispose();
}
DipsoseHandlers();
}

public void CleanUp()
public void DipsoseHandlers()
{
if (_transferType == TransferType.Sequential)
{
Expand All @@ -100,28 +135,63 @@ private async Task CommitBlockEvent(StageChunkEventArgs args)
{
if (args.Success)
{
Interlocked.Add(ref _bytesTransferred, args.BytesTransferred);
// Use progress tracker to get the amount of bytes transferred
if (_bytesTransferred == _expectedLength)
{
// Add CommitBlockList task to the channel
await _queueCommitBlockTask().ConfigureAwait(false);
}
else if (_bytesTransferred > _expectedLength)
{
await _updateTransferStatus(StorageTransferStatus.CompletedWithSkippedTransfers).ConfigureAwait(false);
await _invokeFailedEventHandler(
new Exception("Unexpected Error: Amount of bytes transferred exceeds expected length.")).ConfigureAwait(false);
}
_reportProgressInBytes(_bytesTransferred);
// Let's add to the channel, and our notifier will handle the chunks.
await _stageChunkChannel.Writer.WriteAsync(args, _cancellationToken).ConfigureAwait(false);
}
else
{
// Set status to completed
await _invokeFailedEventHandler(new Exception("Failure on Stage Block")).ConfigureAwait(false);
}
}

private async Task NotifyOfPendingStageChunkEvents()
{
try
{
while (await _stageChunkChannel.Reader.WaitToReadAsync(_cancellationToken).ConfigureAwait(false))
{
// Read one event argument at a time.
StageChunkEventArgs args = await _stageChunkChannel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false);
try
{
await _currentBytesSemaphore.WaitAsync(_cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// We should not continue if waiting on the semaphore has cancelled out.
return;
}

Interlocked.Add(ref _bytesTransferred, args.BytesTransferred);
// Use progress tracker to get the amount of bytes transferred
_reportProgressInBytes(_bytesTransferred);
if (_bytesTransferred == _expectedLength)
{
// Add CommitBlockList task to the channel
await _queueCommitBlockTask().ConfigureAwait(false);
_currentBytesSemaphore.Release();
return;
}
else if (_bytesTransferred > _expectedLength)
{
await _invokeFailedEventHandler(
new Exception("Unexpected Error: Amount of bytes transferred exceeds expected length.")).ConfigureAwait(false);
_currentBytesSemaphore.Release();
return;
}
_currentBytesSemaphore.Release();
}
}
catch (OperationCanceledException)
{
// If operation cancelled, no need to log the exception. As it's logged by whoever called the cancellation (e.g. disposal)
}
catch (Exception ex)
{
await _invokeFailedEventHandler(ex).ConfigureAwait(false);
}
}

private async Task QueueBlockEvent(StageChunkEventArgs args)
{
if (args.Success)
Expand Down
Loading

0 comments on commit 43c7323

Please sign in to comment.