Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chunk Handler isolating bytesTransferred updates / Channels exiting gracefully. #33033

Merged
merged 13 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 94 additions & 24 deletions sdk/storage/Azure.Storage.DataMovement/src/CommitChunkHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,47 @@
using Azure.Storage.DataMovement;
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Channels;

namespace Azure.Storage.DataMovement
{
internal class CommitChunkHandler : IDisposable
internal class CommitChunkHandler : IAsyncDisposable
{
// Indicates whether the current thread is processing stage chunks.
private static Task _processStageChunkEvents;

#region Delegate Definitions
public delegate Task QueuePutBlockTaskInternal(long offset, long blockSize, long expectedLength);
public delegate Task QueueCommitBlockTaskInternal();
public delegate Task UpdateTransferStatusInternal(StorageTransferStatus status);
public delegate void ReportProgressInBytes(long bytesWritten);
public delegate Task InvokeFailedEventHandlerInternal(Exception ex);
#endregion Delegate Definitions

private readonly QueuePutBlockTaskInternal _queuePutBlockTask;
private readonly QueueCommitBlockTaskInternal _queueCommitBlockTask;
private readonly ReportProgressInBytes _reportProgressInBytes;
private readonly UpdateTransferStatusInternal _updateTransferStatus;
private readonly InvokeFailedEventHandlerInternal _invokeFailedEventHandler;

public struct Behaviors
{
public QueuePutBlockTaskInternal QueuePutBlockTask { get; set; }
public QueueCommitBlockTaskInternal QueueCommitBlockTask { get; set; }
public ReportProgressInBytes ReportProgressInBytes { get; set; }
public UpdateTransferStatusInternal UpdateTransferStatus { get; set; }
public InvokeFailedEventHandlerInternal InvokeFailedHandler { get; set; }
}

private event SyncAsyncEventHandler<StageChunkEventArgs> _commitBlockHandler;
internal SyncAsyncEventHandler<StageChunkEventArgs> GetCommitBlockHandler() => _commitBlockHandler;

/// <summary>
/// Create channel of <see cref="StageChunkEventArgs"/> to keep track of that are
/// waiting to update the bytesTransferredand other required operations.
/// </summary>
private readonly Channel<StageChunkEventArgs> _stageChunkChannel;
private readonly CancellationTokenSource _cancellationTokenSource;
private CancellationToken _cancellationToken => _cancellationTokenSource.Token;

private readonly SemaphoreSlim _currentBytesSemaphore;
private long _bytesTransferred;
private readonly long _expectedLength;
private readonly long _blockSize;
Expand All @@ -64,13 +74,25 @@ public CommitChunkHandler(
?? throw Errors.ArgumentNull(nameof(behaviors.ReportProgressInBytes));
_invokeFailedEventHandler = behaviors.InvokeFailedHandler
?? throw Errors.ArgumentNull(nameof(behaviors.InvokeFailedHandler));
_updateTransferStatus = behaviors.UpdateTransferStatus
?? throw Errors.ArgumentNull(nameof(behaviors.UpdateTransferStatus));

// Set expected length to perform commit task
_expectedLength = expectedLength;

// Create channel of finished Stage Chunk Args to update the bytesTransferred
// and for ending tasks like commit block.
// The size of the channel should never exceed 50k (limit on blocks in a block blob).
// and that's in the worst case that we never read from the channel and had a maximum chunk blob.
_stageChunkChannel = Channel.CreateUnbounded<StageChunkEventArgs>(
new UnboundedChannelOptions()
{
// Single reader is required as we can only read and write to bytesTransferred value
SingleReader = true,
});
_cancellationTokenSource = new CancellationTokenSource();
_processStageChunkEvents = Task.Run(() => NotifyOfPendingStageChunkEvents());

// Set bytes transferred to block size because we transferred the initial block
_currentBytesSemaphore = new SemaphoreSlim(1, 1);
_bytesTransferred = blockSize;

_blockSize = blockSize;
Expand All @@ -82,12 +104,25 @@ public CommitChunkHandler(
_commitBlockHandler += CommitBlockEvent;
}

public void Dispose()
public async ValueTask DisposeAsync()
{
CleanUp();
// We no longer have to read from the channel. We are not expecting any more requests.
_stageChunkChannel.Writer.Complete();
await _stageChunkChannel.Reader.Completion.ConfigureAwait(false);
if (!_cancellationTokenSource.IsCancellationRequested)
{
_cancellationTokenSource.Cancel();
}
_cancellationTokenSource.Dispose();

if (_currentBytesSemaphore != default)
{
_currentBytesSemaphore.Dispose();
}
DipsoseHandlers();
}

public void CleanUp()
public void DipsoseHandlers()
{
if (_transferType == TransferType.Sequential)
{
Expand All @@ -100,28 +135,63 @@ private async Task CommitBlockEvent(StageChunkEventArgs args)
{
if (args.Success)
{
Interlocked.Add(ref _bytesTransferred, args.BytesTransferred);
// Use progress tracker to get the amount of bytes transferred
if (_bytesTransferred == _expectedLength)
{
// Add CommitBlockList task to the channel
await _queueCommitBlockTask().ConfigureAwait(false);
}
else if (_bytesTransferred > _expectedLength)
{
await _updateTransferStatus(StorageTransferStatus.CompletedWithSkippedTransfers).ConfigureAwait(false);
await _invokeFailedEventHandler(
new Exception("Unexpected Error: Amount of bytes transferred exceeds expected length.")).ConfigureAwait(false);
}
_reportProgressInBytes(_bytesTransferred);
// Let's add to the channel, and our notifier will handle the chunks.
await _stageChunkChannel.Writer.WriteAsync(args, _cancellationToken).ConfigureAwait(false);
}
else
{
// Set status to completed
await _invokeFailedEventHandler(new Exception("Failure on Stage Block")).ConfigureAwait(false);
}
}

private async Task NotifyOfPendingStageChunkEvents()
{
try
{
while (await _stageChunkChannel.Reader.WaitToReadAsync(_cancellationToken).ConfigureAwait(false))
{
// Read one event argument at a time.
StageChunkEventArgs args = await _stageChunkChannel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false);
try
{
await _currentBytesSemaphore.WaitAsync(_cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// We should not continue if waiting on the semaphore has cancelled out.
return;
}

Interlocked.Add(ref _bytesTransferred, args.BytesTransferred);
amnguye marked this conversation as resolved.
Show resolved Hide resolved
// Use progress tracker to get the amount of bytes transferred
_reportProgressInBytes(_bytesTransferred);
if (_bytesTransferred == _expectedLength)
{
// Add CommitBlockList task to the channel
await _queueCommitBlockTask().ConfigureAwait(false);
_currentBytesSemaphore.Release();
return;
}
else if (_bytesTransferred > _expectedLength)
{
await _invokeFailedEventHandler(
new Exception("Unexpected Error: Amount of bytes transferred exceeds expected length.")).ConfigureAwait(false);
_currentBytesSemaphore.Release();
return;
}
_currentBytesSemaphore.Release();
}
}
catch (OperationCanceledException)
{
// If operation cancelled, no need to log the exception. As it's logged by whoever called the cancellation (e.g. disposal)
}
catch (Exception ex)
{
await _invokeFailedEventHandler(ex).ConfigureAwait(false);
}
}

private async Task QueueBlockEvent(StageChunkEventArgs args)
{
if (args.Success)
Expand Down
Loading