Skip to content

Commit

Permalink
[Storage] [DataMovement] Improve checking on part completion for paus…
Browse files Browse the repository at this point in the history
…e/stop/completion status (Azure#45038)

* WIP

* Make CI happy

* Revert "Make CI happy"

This reverts commit a7e80cf.

* WIP

* Cleanup

* More cleanup
  • Loading branch information
amnguye authored and tejasm-microsoft committed Jul 22, 2024
1 parent 73c7c59 commit ab8ff3a
Showing 1 changed file with 11 additions and 14 deletions.
25 changes: 11 additions & 14 deletions sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,11 @@ internal abstract class JobPartInternal
/// </summary>
public SyncAsyncEventHandler<TransferItemCompletedEventArgs> SingleTransferCompletedEventHandler { get; internal set; }

private List<Task<bool>> _chunkTasks;
private List<TaskCompletionSource<bool>> _chunkTaskSources;
/// <summary>
/// Represents the current state of the job part.
/// </summary>
private int _currentChunkCount;
private int _completedChunkCount;
protected bool _queueingTasks = false;

/// <summary>
Expand Down Expand Up @@ -195,8 +198,8 @@ internal JobPartInternal(
StorageResourceCreationPreference.FailIfExists : createMode;

Length = length;
_chunkTasks = new List<Task<bool>>();
_chunkTaskSources = new List<TaskCompletionSource<bool>>();
_currentChunkCount = 0;
_completedChunkCount = 0;
}

public void SetQueueChunkDelegate(QueueChunkDelegate chunkDelegate)
Expand All @@ -212,26 +215,20 @@ public void SetQueueChunkDelegate(QueueChunkDelegate chunkDelegate)
/// <returns></returns>
public async Task QueueChunkToChannelAsync(Func<Task> chunkTask)
{
// Attach TaskCompletionSource
TaskCompletionSource<bool> chunkCompleted = new TaskCompletionSource<bool>(
false,
TaskCreationOptions.RunContinuationsAsynchronously);
_chunkTaskSources.Add(chunkCompleted);
_chunkTasks.Add(chunkCompleted.Task);

Interlocked.Increment(ref _currentChunkCount);
await QueueChunk(
async () =>
{
try
{
await Task.Run(chunkTask).ConfigureAwait(false);
chunkCompleted.SetResult(true);
await CheckAndUpdateCancellationStateAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
await InvokeFailedArg(ex).ConfigureAwait(false);
}
Interlocked.Increment(ref _completedChunkCount);
await CheckAndUpdateCancellationStateAsync().ConfigureAwait(false);
}).ConfigureAwait(false);
}

Expand Down Expand Up @@ -570,7 +567,7 @@ internal async Task CheckAndUpdateCancellationStateAsync()
if (JobPartStatus.State == DataTransferState.Pausing ||
JobPartStatus.State == DataTransferState.Stopping)
{
if (!_queueingTasks && _chunkTasks.All((Task task) => (task.IsCompleted)))
if (!_queueingTasks && _currentChunkCount == _completedChunkCount)
{
DataTransferState newState = JobPartStatus.State == DataTransferState.Pausing ?
DataTransferState.Paused :
Expand Down

0 comments on commit ab8ff3a

Please sign in to comment.