From ab8ff3acc9d420c38a607e4097d482058fddd79d Mon Sep 17 00:00:00 2001 From: Amanda Nguyen <48961492+amnguye@users.noreply.github.com> Date: Tue, 16 Jul 2024 13:41:42 -0700 Subject: [PATCH] [Storage] [DataMovement] Improve checking on part completion for pause/stop/completion status (#45038) * WIP * Make CI happy * Revert "Make CI happy" This reverts commit a7e80cfc27850b62075ea05c22475080ed79b7f4. * WIP * Cleanup * More cleanup --- .../src/JobPartInternal.cs | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs b/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs index 979268acf8492..d0ca5e0d5a64f 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs @@ -127,8 +127,11 @@ internal abstract class JobPartInternal /// public SyncAsyncEventHandler SingleTransferCompletedEventHandler { get; internal set; } - private List> _chunkTasks; - private List> _chunkTaskSources; + /// + /// Represents the current state of the job part. + /// + private int _currentChunkCount; + private int _completedChunkCount; protected bool _queueingTasks = false; /// @@ -195,8 +198,8 @@ internal JobPartInternal( StorageResourceCreationPreference.FailIfExists : createMode; Length = length; - _chunkTasks = new List>(); - _chunkTaskSources = new List>(); + _currentChunkCount = 0; + _completedChunkCount = 0; } public void SetQueueChunkDelegate(QueueChunkDelegate chunkDelegate) @@ -212,26 +215,20 @@ public void SetQueueChunkDelegate(QueueChunkDelegate chunkDelegate) /// public async Task QueueChunkToChannelAsync(Func chunkTask) { - // Attach TaskCompletionSource - TaskCompletionSource chunkCompleted = new TaskCompletionSource( - false, - TaskCreationOptions.RunContinuationsAsynchronously); - _chunkTaskSources.Add(chunkCompleted); - _chunkTasks.Add(chunkCompleted.Task); - + Interlocked.Increment(ref _currentChunkCount); await QueueChunk( async () => { try { await Task.Run(chunkTask).ConfigureAwait(false); - chunkCompleted.SetResult(true); - await CheckAndUpdateCancellationStateAsync().ConfigureAwait(false); } catch (Exception ex) { await InvokeFailedArg(ex).ConfigureAwait(false); } + Interlocked.Increment(ref _completedChunkCount); + await CheckAndUpdateCancellationStateAsync().ConfigureAwait(false); }).ConfigureAwait(false); } @@ -570,7 +567,7 @@ internal async Task CheckAndUpdateCancellationStateAsync() if (JobPartStatus.State == DataTransferState.Pausing || JobPartStatus.State == DataTransferState.Stopping) { - if (!_queueingTasks && _chunkTasks.All((Task task) => (task.IsCompleted))) + if (!_queueingTasks && _currentChunkCount == _completedChunkCount) { DataTransferState newState = JobPartStatus.State == DataTransferState.Pausing ? DataTransferState.Paused :