Skip to content

Commit

Permalink
initial commit (#47003)
Browse files Browse the repository at this point in the history
  • Loading branch information
nickliu-msft authored Nov 7, 2024
1 parent 313748c commit 65c1075
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 65 deletions.
18 changes: 9 additions & 9 deletions sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ await QueueChunk(
}
catch (Exception ex)
{
await InvokeFailedArg(ex).ConfigureAwait(false);
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}
Interlocked.Increment(ref _completedChunkCount);
await CheckAndUpdateCancellationStateAsync().ConfigureAwait(false);
Expand All @@ -234,9 +234,9 @@ await QueueChunk(
}

/// <summary>
/// Processes the job to job parts
/// Processes the job part to chunks
/// </summary>
/// <returns>An IEnumerable that contains the job chunks</returns>
/// <returns>The task that's queueing up the chunks</returns>
public abstract Task ProcessPartToChunkAsync();

/// <summary>
Expand Down Expand Up @@ -285,11 +285,11 @@ internal async Task OnTransferStateChangedAsync(DataTransferState transferState)
else if (JobPartStatus.HasCompletedSuccessfully)
{
_progressTracker.IncrementCompletedFiles();
await InvokeSingleCompletedArg().ConfigureAwait(false);
await InvokeSingleCompletedArgAsync().ConfigureAwait(false);
}

// Set the status in the checkpointer
await SetCheckpointerStatus().ConfigureAwait(false);
await SetCheckpointerStatusAsync().ConfigureAwait(false);

await PartTransferStatusEventHandler.RaiseAsync(
new TransferStatusEventArgs(
Expand All @@ -313,7 +313,7 @@ internal void ReportBytesWritten(long bytesTransferred)
_progressTracker.IncrementBytesTransferred(bytesTransferred);
}

public async virtual Task InvokeSingleCompletedArg()
public async virtual Task InvokeSingleCompletedArgAsync()
{
if (SingleTransferCompletedEventHandler != null)
{
Expand All @@ -334,7 +334,7 @@ await SingleTransferCompletedEventHandler.RaiseAsync(
/// <summary>
/// Invokes Skipped Argument Event.
/// </summary>
public async virtual Task InvokeSkippedArg()
public async virtual Task InvokeSkippedArgAsync()
{
if (TransferSkippedEventHandler != null)
{
Expand Down Expand Up @@ -375,7 +375,7 @@ await PartTransferStatusEventHandler.RaiseAsync(
/// <summary>
/// Invokes Failed Argument Event.
/// </summary>
public async virtual Task InvokeFailedArg(Exception ex)
public async virtual Task InvokeFailedArgAsync(Exception ex)
{
if (ex is not OperationCanceledException &&
ex is not TaskCanceledException &&
Expand Down Expand Up @@ -485,7 +485,7 @@ await _checkpointer.AddNewJobPartAsync(
}
}

internal async virtual Task SetCheckpointerStatus()
internal async virtual Task SetCheckpointerStatusAsync()
{
await _checkpointer.SetJobPartStatusAsync(
transferId: _dataTransfer.Id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ public static ServiceToServiceJobPart CreateJobPartFromCheckpoint(
createPreference: createPreference);
}

/// <summary>
/// Processes the job part to chunks
/// </summary>
/// <returns>The task that's queueing up the chunks</returns>
public override async Task ProcessPartToChunkAsync()
{
try
Expand All @@ -195,7 +199,7 @@ await _destinationResource.SetPermissionsAsync(
fileLength = sourceProperties.ResourceLength;
if (!fileLength.HasValue)
{
await InvokeFailedArg(Errors.UnableToGetLength()).ConfigureAwait(false);
await InvokeFailedArgAsync(Errors.UnableToGetLength()).ConfigureAwait(false);
return;
}
long length = fileLength.Value;
Expand Down Expand Up @@ -244,7 +248,7 @@ await QueueStageBlockRequest(
}
catch (Exception ex)
{
await InvokeFailedArg(ex).ConfigureAwait(false);
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}
}

Expand All @@ -269,17 +273,17 @@ await _destinationResource.CopyFromUriAsync(
when (_createMode == StorageResourceCreationPreference.SkipIfExists
&& exception.ErrorCode == "BlobAlreadyExists")
{
await InvokeSkippedArg().ConfigureAwait(false);
await InvokeSkippedArgAsync().ConfigureAwait(false);
}
catch (InvalidOperationException ex)
when (_createMode == StorageResourceCreationPreference.SkipIfExists
&& ex.Message.Contains("Cannot overwrite file."))
{
await InvokeSkippedArg().ConfigureAwait(false);
await InvokeSkippedArgAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
await InvokeFailedArg(ex).ConfigureAwait(false);
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}
}

Expand Down Expand Up @@ -316,11 +320,11 @@ await _destinationResource.CopyBlockFromUriAsync(
when (_createMode == StorageResourceCreationPreference.SkipIfExists
&& exception.ErrorCode == "BlobAlreadyExists")
{
await InvokeSkippedArg().ConfigureAwait(false);
await InvokeSkippedArgAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
await InvokeFailedArg(ex).ConfigureAwait(false);
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}
return false;
}
Expand Down Expand Up @@ -349,7 +353,7 @@ internal static CommitChunkHandler.Behaviors GetBlockListCommitHandlerBehaviors(
QueuePutBlockTask = jobPart.QueueStageBlockRequest,
QueueCommitBlockTask = jobPart.CompleteTransferAsync,
ReportProgressInBytes = jobPart.ReportBytesWritten,
InvokeFailedHandler = jobPart.InvokeFailedArg,
InvokeFailedHandler = jobPart.InvokeFailedArgAsync,
};
}
#endregion
Expand All @@ -372,7 +376,7 @@ await _destinationResource.CompleteTransferAsync(
}
catch (Exception ex)
{
await InvokeFailedArg(ex).ConfigureAwait(false);
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}
}

Expand Down Expand Up @@ -456,11 +460,11 @@ await _commitBlockHandler.InvokeEvent(
// before uploading to it.
if (_createMode == StorageResourceCreationPreference.FailIfExists)
{
await InvokeFailedArg(ex).ConfigureAwait(false);
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}
else // (_createMode == StorageResourceCreateMode.Skip)
{
await InvokeSkippedArg().ConfigureAwait(false);
await InvokeSkippedArgAsync().ConfigureAwait(false);
}
}
catch (Exception ex)
Expand All @@ -481,21 +485,21 @@ await _commitBlockHandler.InvokeEvent(
{
// If the _commitBlockHandler has been disposed before we call to it
// we should at least filter the exception to error handling just in case.
await InvokeFailedArg(ex).ConfigureAwait(false);
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}
}
}

public override async Task InvokeSkippedArg()
public override async Task InvokeSkippedArgAsync()
{
DisposeHandlers();
await base.InvokeSkippedArg().ConfigureAwait(false);
await base.InvokeSkippedArgAsync().ConfigureAwait(false);
}

public override async Task InvokeFailedArg(Exception ex)
public override async Task InvokeFailedArgAsync(Exception ex)
{
DisposeHandlers();
await base.InvokeFailedArg(ex).ConfigureAwait(false);
await base.InvokeFailedArgAsync(ex).ConfigureAwait(false);
}

internal void DisposeHandlers()
Expand Down
24 changes: 12 additions & 12 deletions sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public static StreamToUriJobPart CreateJobPartFromCheckpoint(
}

/// <summary>
/// Processes the job to job parts
/// Processes the job part to chunks
/// </summary>
/// <returns>The task that's queueing up the chunks</returns>
public override async Task ProcessPartToChunkAsync()
Expand Down Expand Up @@ -244,12 +244,12 @@ await QueueStageBlockRequest(
else
{
// TODO: logging when given the event handler
await InvokeFailedArg(Errors.UnableToGetLength()).ConfigureAwait(false);
await InvokeFailedArgAsync(Errors.UnableToGetLength()).ConfigureAwait(false);
}
}
catch (Exception ex)
{
await InvokeFailedArg(ex).ConfigureAwait(false);
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}
}

Expand All @@ -275,16 +275,16 @@ await InitialUploadCall(
catch (RequestFailedException r)
when (r.ErrorCode == "BlobAlreadyExists" && _createMode == StorageResourceCreationPreference.SkipIfExists)
{
await InvokeSkippedArg().ConfigureAwait(false);
await InvokeSkippedArgAsync().ConfigureAwait(false);
}
catch (InvalidOperationException i)
when (i.Message.Contains("Cannot overwrite file.") && _createMode == StorageResourceCreationPreference.SkipIfExists)
{
await InvokeSkippedArg().ConfigureAwait(false);
await InvokeSkippedArgAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
await InvokeFailedArg(ex).ConfigureAwait(false);
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}

// Do not continue if we need to skip or there was an error.
Expand Down Expand Up @@ -381,7 +381,7 @@ internal static CommitChunkHandler.Behaviors GetBlockListCommitHandlerBehaviors(
QueuePutBlockTask = jobPart.QueueStageBlockRequest,
QueueCommitBlockTask = jobPart.CompleteTransferAsync,
ReportProgressInBytes = jobPart.ReportBytesWritten,
InvokeFailedHandler = jobPart.InvokeFailedArg,
InvokeFailedHandler = jobPart.InvokeFailedArgAsync,
};
}
#endregion
Expand Down Expand Up @@ -453,7 +453,7 @@ await _commitBlockHandler.InvokeEvent(
{
// If the _commitBlockHandler has been disposed before we call to it
// we should at least filter the exception to error handling just in case.
await InvokeFailedArg(ex).ConfigureAwait(false);
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}
}
}
Expand Down Expand Up @@ -555,16 +555,16 @@ private static async Task<Stream> GetOffsetPartitionInternal(
cancellationToken: cancellationToken).ConfigureAwait(false);
}

public override async Task InvokeSkippedArg()
public override async Task InvokeSkippedArgAsync()
{
DisposeHandlers();
await base.InvokeSkippedArg().ConfigureAwait(false);
await base.InvokeSkippedArgAsync().ConfigureAwait(false);
}

public override async Task InvokeFailedArg(Exception ex)
public override async Task InvokeFailedArgAsync(Exception ex)
{
DisposeHandlers();
await base.InvokeFailedArg(ex).ConfigureAwait(false);
await base.InvokeFailedArgAsync(ex).ConfigureAwait(false);
}

internal void DisposeHandlers()
Expand Down
Loading

0 comments on commit 65c1075

Please sign in to comment.