Skip to content

Commit

Permalink
[Storage][DataMovement] Various fixes to transfer logic for chunks (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
jalauzon-msft authored Nov 14, 2023
1 parent 559c8c8 commit 75ebdfa
Show file tree
Hide file tree
Showing 13 changed files with 397 additions and 196 deletions.
11 changes: 5 additions & 6 deletions sdk/storage/Azure.Storage.DataMovement/src/CommitChunkHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

namespace Azure.Storage.DataMovement
{
internal class CommitChunkHandler : IAsyncDisposable
internal class CommitChunkHandler : IDisposable
{
// Indicates whether the current thread is processing stage chunks.
private static Task _processStageChunkEvents;
Expand Down Expand Up @@ -40,7 +40,7 @@ public struct Behaviors

/// <summary>
/// Create channel of <see cref="StageChunkEventArgs"/> to keep track of that are
/// waiting to update the bytesTransferredand other required operations.
/// waiting to update the bytesTransferred and other required operations.
/// </summary>
private readonly Channel<StageChunkEventArgs> _stageChunkChannel;
private CancellationToken _cancellationToken;
Expand Down Expand Up @@ -106,20 +106,19 @@ public CommitChunkHandler(
_clientDiagnostics = clientDiagnostics;
}

public async ValueTask DisposeAsync()
public void Dispose()
{
// We no longer have to read from the channel. We are not expecting any more requests.
_stageChunkChannel.Writer.TryComplete();
await _stageChunkChannel.Reader.Completion.ConfigureAwait(false);

if (_currentBytesSemaphore != default)
{
_currentBytesSemaphore.Dispose();
}
DipsoseHandlers();
DisposeHandlers();
}

private void DipsoseHandlers()
private void DisposeHandlers()
{
if (_transferOrder == DataTransferOrder.Sequential)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

namespace Azure.Storage.DataMovement
{
internal class DownloadChunkHandler : IAsyncDisposable
internal class DownloadChunkHandler : IDisposable
{
// Indicates whether the current thread is processing stage chunks.
private static Task _processDownloadRangeEvents;
Expand Down Expand Up @@ -49,7 +49,7 @@ public struct Behaviors

/// <summary>
/// Create channel of <see cref="DownloadRangeEventArgs"/> to keep track of that are
/// waiting to update the bytesTransferredand other required operations.
/// waiting to update the bytesTransferred and other required operations.
/// </summary>
private readonly Channel<DownloadRangeEventArgs> _downloadRangeChannel;
private CancellationToken _cancellationToken;
Expand Down Expand Up @@ -156,10 +156,9 @@ public DownloadChunkHandler(
ClientDiagnostics = clientDiagnostics;
}

public async ValueTask DisposeAsync()
public void Dispose()
{
_downloadRangeChannel.Writer.TryComplete();
await _downloadRangeChannel.Reader.Completion.ConfigureAwait(false);

if (_currentBytesSemaphore != default)
{
Expand Down
15 changes: 8 additions & 7 deletions sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ internal abstract class JobPartInternal

private List<Task<bool>> _chunkTasks;
private List<TaskCompletionSource<bool>> _chunkTaskSources;
protected bool _queueingTasks = false;

/// <summary>
/// Array pools for reading from streams to upload
Expand Down Expand Up @@ -557,15 +558,15 @@ internal static long ParseRangeTotalLength(string range)

internal async Task CheckAndUpdateCancellationStateAsync()
{
if (_chunkTasks.All((Task task) => (task.IsCompleted)))
if (JobPartStatus.State == DataTransferState.Pausing ||
JobPartStatus.State == DataTransferState.Stopping)
{
if (JobPartStatus.State == DataTransferState.Pausing)
if (!_queueingTasks && _chunkTasks.All((Task task) => (task.IsCompleted)))
{
await OnTransferStateChangedAsync(DataTransferState.Paused).ConfigureAwait(false);
}
else if (JobPartStatus.State == DataTransferState.Stopping)
{
await OnTransferStateChangedAsync(DataTransferState.Completed).ConfigureAwait(false);
DataTransferState newState = JobPartStatus.State == DataTransferState.Pausing ?
DataTransferState.Paused :
DataTransferState.Completed;
await OnTransferStateChangedAsync(newState).ConfigureAwait(false);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace Azure.Storage.DataMovement
{
internal class ServiceToServiceJobPart : JobPartInternal, IAsyncDisposable
internal class ServiceToServiceJobPart : JobPartInternal, IDisposable
{
public delegate Task CommitBlockTaskInternal(CancellationToken cancellationToken);
public CommitBlockTaskInternal CommitBlockTask { get; internal set; }
Expand Down Expand Up @@ -112,9 +112,9 @@ private ServiceToServiceJobPart(
{
}

public async ValueTask DisposeAsync()
public void Dispose()
{
await DisposeHandlers().ConfigureAwait(false);
DisposeHandlers();
}

/// <summary>
Expand Down Expand Up @@ -231,13 +231,8 @@ await StartSingleCallCopy(length).ConfigureAwait(false))
}
else // Sequential
{
// Queue partitioned block task
await QueueChunkToChannelAsync(
async () =>
await PutBlockFromUri(
offset: commitBlockList[0].Offset,
blockLength: commitBlockList[0].Length,
expectedLength: length).ConfigureAwait(false)).ConfigureAwait(false);
// Queue the first partitioned block task
await QueueStageBlockRequest(commitBlockList[0].Offset, commitBlockList[0].Length, length).ConfigureAwait(false);
}
}
else
Expand Down Expand Up @@ -341,10 +336,10 @@ internal static CommitChunkHandler.Behaviors GetBlockListCommitHandlerBehaviors(
{
return new CommitChunkHandler.Behaviors
{
QueuePutBlockTask = async (long offset, long blockSize, long expectedLength) => await jobPart.PutBlockFromUri(offset, blockSize, expectedLength).ConfigureAwait(false),
QueueCommitBlockTask = async () => await jobPart.CompleteTransferAsync().ConfigureAwait(false),
ReportProgressInBytes = (long bytesWritten) => jobPart.ReportBytesWritten(bytesWritten),
InvokeFailedHandler = async (ex) => await jobPart.InvokeFailedArg(ex).ConfigureAwait(false),
QueuePutBlockTask = jobPart.QueueStageBlockRequest,
QueueCommitBlockTask = jobPart.CompleteTransferAsync,
ReportProgressInBytes = jobPart.ReportBytesWritten,
InvokeFailedHandler = jobPart.InvokeFailedArg,
};
}
#endregion
Expand All @@ -359,7 +354,7 @@ await _destinationResource.CompleteTransferAsync(
cancellationToken: _cancellationToken).ConfigureAwait(false);

// Dispose the handlers
await DisposeHandlers().ConfigureAwait(false);
DisposeHandlers();

// Set completion status to completed
await OnTransferStateChangedAsync(DataTransferState.Completed).ConfigureAwait(false);
Expand All @@ -372,17 +367,24 @@ await _destinationResource.CompleteTransferAsync(

private async Task QueueStageBlockRequests(List<(long Offset, long Size)> commitBlockList, long expectedLength)
{
_queueingTasks = true;
// Partition the stream into individual blocks
foreach ((long Offset, long Length) block in commitBlockList)
{
// Queue partitioned block task
await QueueChunkToChannelAsync(
async () =>
await PutBlockFromUri(
block.Offset,
block.Length,
expectedLength).ConfigureAwait(false)).ConfigureAwait(false);
await QueueStageBlockRequest(block.Offset, block.Length, expectedLength).ConfigureAwait(false);
}
_queueingTasks = false;
}

private Task QueueStageBlockRequest(long offset, long blockSize, long expectedLength)
{
return QueueChunkToChannelAsync(
async () =>
await PutBlockFromUri(
offset,
blockSize,
expectedLength).ConfigureAwait(false));
}

internal async Task PutBlockFromUri(
Expand Down Expand Up @@ -452,21 +454,21 @@ await _commitBlockHandler.InvokeEvent(

public override async Task InvokeSkippedArg()
{
await DisposeHandlers().ConfigureAwait(false);
DisposeHandlers();
await base.InvokeSkippedArg().ConfigureAwait(false);
}

public override async Task InvokeFailedArg(Exception ex)
{
await DisposeHandlers().ConfigureAwait(false);
DisposeHandlers();
await base.InvokeFailedArg(ex).ConfigureAwait(false);
}

internal async Task DisposeHandlers()
internal void DisposeHandlers()
{
if (_commitBlockHandler != default)
{
await _commitBlockHandler.DisposeAsync().ConfigureAwait(false);
_commitBlockHandler.Dispose();
}
}

Expand Down
57 changes: 29 additions & 28 deletions sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

namespace Azure.Storage.DataMovement
{
internal class StreamToUriJobPart : JobPartInternal, IAsyncDisposable
internal class StreamToUriJobPart : JobPartInternal, IDisposable
{
/// <summary>
/// Will handle the calling the commit block list API once
Expand Down Expand Up @@ -112,9 +112,9 @@ private StreamToUriJobPart(
{
}

public async ValueTask DisposeAsync()
public void Dispose()
{
await DisposeHandlers().ConfigureAwait(false);
DisposeHandlers();
}

/// <summary>
Expand Down Expand Up @@ -182,7 +182,7 @@ public static StreamToUriJobPart CreateJobPartFromCheckpoint(
public override async Task ProcessPartToChunkAsync()
{
// Attempt to get the length, it's possible the file could
// not be accesible (or does not exist).
// not be accessible (or does not exist).
string operationName = $"{nameof(TransferManager.StartTransferAsync)}";
await OnTransferStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false);
long? fileLength = default;
Expand Down Expand Up @@ -224,13 +224,8 @@ await CreateDestinationResource(
}
else // Sequential
{
// Queue paritioned block task
await QueueChunkToChannelAsync(
async () =>
await StageBlockInternal(
rangeList[0].Offset,
rangeList[0].Length,
length).ConfigureAwait(false)).ConfigureAwait(false);
// Queue the first partitioned block task
await QueueStageBlockRequest(rangeList[0].Offset, rangeList[0].Length, length).ConfigureAwait(false);
}
}
}
Expand Down Expand Up @@ -355,11 +350,10 @@ internal static CommitChunkHandler.Behaviors GetBlockListCommitHandlerBehaviors(
{
return new CommitChunkHandler.Behaviors
{
QueuePutBlockTask = async (long offset, long blockSize, long expectedLength) => await jobPart.StageBlockInternal(offset, blockSize, expectedLength).ConfigureAwait(false),
QueueCommitBlockTask = async () => await jobPart.CompleteTransferAsync().ConfigureAwait(false),
ReportProgressInBytes = (long bytesWritten) =>
jobPart.ReportBytesWritten(bytesWritten),
InvokeFailedHandler = async (ex) => await jobPart.InvokeFailedArg(ex).ConfigureAwait(false),
QueuePutBlockTask = jobPart.QueueStageBlockRequest,
QueueCommitBlockTask = jobPart.CompleteTransferAsync,
ReportProgressInBytes = jobPart.ReportBytesWritten,
InvokeFailedHandler = jobPart.InvokeFailedArg,
};
}
#endregion
Expand Down Expand Up @@ -439,25 +433,32 @@ await _destinationResource.CompleteTransferAsync(
cancellationToken: _cancellationToken).ConfigureAwait(false);

// Dispose the handlers
await DisposeHandlers().ConfigureAwait(false);
DisposeHandlers();

// Set completion status to completed
await OnTransferStateChangedAsync(DataTransferState.Completed).ConfigureAwait(false);
}

private async Task QueueStageBlockRequests(List<(long Offset, long Size)> rangeList, long completeLength)
{
_queueingTasks = true;
// Partition the stream into individual blocks
foreach ((long Offset, long Length) block in rangeList)
{
// Queue paritioned block task
await QueueChunkToChannelAsync(
async () =>
await StageBlockInternal(
block.Offset,
block.Length,
completeLength).ConfigureAwait(false)).ConfigureAwait(false);
// Queue partitioned block task
await QueueStageBlockRequest(block.Offset, block.Length, completeLength).ConfigureAwait(false);
}
_queueingTasks = false;
}

private Task QueueStageBlockRequest(long offset, long blockSize, long expectedLength)
{
return QueueChunkToChannelAsync(
async () =>
await StageBlockInternal(
offset,
blockSize,
expectedLength).ConfigureAwait(false));
}

/// <summary>
Expand Down Expand Up @@ -501,21 +502,21 @@ private static async Task<Stream> GetOffsetPartitionInternal(

public override async Task InvokeSkippedArg()
{
await DisposeHandlers().ConfigureAwait(false);
DisposeHandlers();
await base.InvokeSkippedArg().ConfigureAwait(false);
}

public override async Task InvokeFailedArg(Exception ex)
{
await DisposeHandlers().ConfigureAwait(false);
DisposeHandlers();
await base.InvokeFailedArg(ex).ConfigureAwait(false);
}

internal async Task DisposeHandlers()
internal void DisposeHandlers()
{
if (_commitBlockHandler != default)
{
await _commitBlockHandler.DisposeAsync().ConfigureAwait(false);
_commitBlockHandler.Dispose();
}
}
}
Expand Down
Loading

0 comments on commit 75ebdfa

Please sign in to comment.