diff --git a/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs b/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs index 979268acf8492..d0ca5e0d5a64f 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs @@ -127,8 +127,11 @@ internal abstract class JobPartInternal /// public SyncAsyncEventHandler SingleTransferCompletedEventHandler { get; internal set; } - private List> _chunkTasks; - private List> _chunkTaskSources; + /// + /// Represents the current state of the job part. + /// + private int _currentChunkCount; + private int _completedChunkCount; protected bool _queueingTasks = false; /// @@ -195,8 +198,8 @@ internal JobPartInternal( StorageResourceCreationPreference.FailIfExists : createMode; Length = length; - _chunkTasks = new List>(); - _chunkTaskSources = new List>(); + _currentChunkCount = 0; + _completedChunkCount = 0; } public void SetQueueChunkDelegate(QueueChunkDelegate chunkDelegate) @@ -212,26 +215,20 @@ public void SetQueueChunkDelegate(QueueChunkDelegate chunkDelegate) /// public async Task QueueChunkToChannelAsync(Func chunkTask) { - // Attach TaskCompletionSource - TaskCompletionSource chunkCompleted = new TaskCompletionSource( - false, - TaskCreationOptions.RunContinuationsAsynchronously); - _chunkTaskSources.Add(chunkCompleted); - _chunkTasks.Add(chunkCompleted.Task); - + Interlocked.Increment(ref _currentChunkCount); await QueueChunk( async () => { try { await Task.Run(chunkTask).ConfigureAwait(false); - chunkCompleted.SetResult(true); - await CheckAndUpdateCancellationStateAsync().ConfigureAwait(false); } catch (Exception ex) { await InvokeFailedArg(ex).ConfigureAwait(false); } + Interlocked.Increment(ref _completedChunkCount); + await CheckAndUpdateCancellationStateAsync().ConfigureAwait(false); }).ConfigureAwait(false); } @@ -570,7 +567,7 @@ internal async Task CheckAndUpdateCancellationStateAsync() if (JobPartStatus.State == DataTransferState.Pausing || JobPartStatus.State == DataTransferState.Stopping) { - if (!_queueingTasks && _chunkTasks.All((Task task) => (task.IsCompleted))) + if (!_queueingTasks && _currentChunkCount == _completedChunkCount) { DataTransferState newState = JobPartStatus.State == DataTransferState.Pausing ? DataTransferState.Paused :