From 6839e373a69bf8e407d462bbe02406348b70e23d Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Wed, 8 Nov 2023 16:06:19 -0800 Subject: [PATCH 1/5] Refactor transfer sizes, add default to overwrite enum --- .../src/DataTransferOptions.cs | 8 ++- .../src/JobPartInternal.cs | 4 +- .../src/ServiceToServiceJobPart.cs | 15 +---- .../src/Shared/DataMovementExtensions.cs | 67 ++++++++++++------- .../src/StorageResourceCreationPreference.cs | 37 +++++----- .../src/StreamToUriJobPart.cs | 15 +---- .../src/TransferJobInternal.cs | 20 ++++-- .../src/UriToStreamJobPart.cs | 15 +---- 8 files changed, 89 insertions(+), 92 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/src/DataTransferOptions.cs b/sdk/storage/Azure.Storage.DataMovement/src/DataTransferOptions.cs index df2ed1a309b2c..de3e150d61e86 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/DataTransferOptions.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/DataTransferOptions.cs @@ -90,8 +90,12 @@ public bool Equals(DataTransferOptions obj) && InitialTransferSize == obj?.InitialTransferSize; /// - /// Optional to configure overwrite - /// behavior. Will default to . + /// Configures the behavior when a transfer encounters a resource that + /// already exists. + /// + /// Will default to + /// which will be when + /// starting a new transfer and the value used to start a transfer when resuming a transfer. /// public StorageResourceCreationPreference CreationPreference { get; set; } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs b/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs index 3c0544cc2488f..51891c5fca277 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs @@ -168,7 +168,6 @@ internal JobPartInternal( _sourceResource = sourceResource; _destinationResource = destinationResource; _errorHandling = errorHandling; - _createMode = createMode; _failureType = JobPartFailureType.None; _checkpointer = checkpointer; _progressTracker = progressTracker; @@ -189,6 +188,9 @@ internal JobPartInternal( _transferChunkSize = Math.Min( transferChunkSize ?? DataMovementConstants.DefaultChunkSize, _destinationResource.MaxSupportedChunkSize); + // Set the default create mode + _createMode = createMode == StorageResourceCreationPreference.Default ? + StorageResourceCreationPreference.FailIfExists : createMode; Length = length; _chunkTasks = new List>(); diff --git a/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs index f5bc1d1bd8a4f..6b44998a93ed4 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs @@ -61,8 +61,8 @@ private ServiceToServiceJobPart( partNumber: partNumber, sourceResource: sourceResource, destinationResource: destinationResource, - transferChunkSize: job._maximumTransferChunkSize, - initialTransferSize: job._initialTransferSize, + transferChunkSize: transferChunkSize ?? job._maximumTransferChunkSize, + initialTransferSize: initialTransferSize ?? job._initialTransferSize, errorHandling: job._errorMode, createMode: job._creationPreference, checkpointer: job._checkpointer, @@ -78,17 +78,6 @@ private ServiceToServiceJobPart( jobPartStatus: jobPartStatus, length: length) { - // If transfer sizes null at the job level (from options bag) then - // override the default with the provided values if present. - // Else, they were set correctly by the base constructor. - if (!job._maximumTransferChunkSize.HasValue && transferChunkSize.HasValue) - { - _transferChunkSize = transferChunkSize.Value; - } - if (!job._initialTransferSize.HasValue && initialTransferSize.HasValue) - { - _initialTransferSize = initialTransferSize.Value; - } } public async ValueTask DisposeAsync() diff --git a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementExtensions.cs b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementExtensions.cs index 51c0281141095..da59b6718a2fb 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementExtensions.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementExtensions.cs @@ -3,7 +3,6 @@ using System; using System.IO; -using System.Threading.Tasks; using Azure.Storage.DataMovement.JobPlan; namespace Azure.Storage.DataMovement @@ -28,15 +27,18 @@ public static StreamToUriJobPart ToJobPartAsync( // Convert stream to job plan header JobPartPlanHeader header = JobPartPlanHeader.Deserialize(planFileStream); - DataTransferStatus jobPartStatus = header.JobPartStatus; + // Override header values if options were specified by user. + long initialTransferSize = baseJob._initialTransferSize ?? header.InitialTransferSize; + long transferChunkSize = baseJob._maximumTransferChunkSize ?? header.ChunkSize; + StreamToUriJobPart jobPart = StreamToUriJobPart.CreateJobPartFromCheckpoint( job: baseJob, partNumber: Convert.ToInt32(header.PartNumber), sourceResource: sourceResource, destinationResource: destinationResource, - jobPartStatus: jobPartStatus, - initialTransferSize: header.InitialTransferSize, - transferChunkSize: header.ChunkSize); + jobPartStatus: header.JobPartStatus, + initialTransferSize: initialTransferSize, + transferChunkSize: transferChunkSize); jobPart.VerifyJobPartPlanHeader(header); @@ -53,15 +55,18 @@ public static ServiceToServiceJobPart ToJobPartAsync( // Convert stream to job plan header JobPartPlanHeader header = JobPartPlanHeader.Deserialize(planFileStream); - DataTransferStatus jobPartStatus = header.JobPartStatus; + // Override header values if options were specified by user. + long initialTransferSize = baseJob._initialTransferSize ?? header.InitialTransferSize; + long transferChunkSize = baseJob._maximumTransferChunkSize ?? header.ChunkSize; + ServiceToServiceJobPart jobPart = ServiceToServiceJobPart.CreateJobPartFromCheckpoint( job: baseJob, partNumber: Convert.ToInt32(header.PartNumber), sourceResource: sourceResource, destinationResource: destinationResource, - jobPartStatus: jobPartStatus, - initialTransferSize: header.InitialTransferSize, - transferChunkSize: header.ChunkSize); + jobPartStatus: header.JobPartStatus, + initialTransferSize: initialTransferSize, + transferChunkSize: transferChunkSize); jobPart.VerifyJobPartPlanHeader(header); @@ -78,15 +83,18 @@ public static UriToStreamJobPart ToJobPartAsync( // Convert stream to job plan header JobPartPlanHeader header = JobPartPlanHeader.Deserialize(planFileStream); - DataTransferStatus jobPartStatus = header.JobPartStatus; + // Override header values if options were specified by user. + long initialTransferSize = baseJob._initialTransferSize ?? header.InitialTransferSize; + long transferChunkSize = baseJob._maximumTransferChunkSize ?? header.ChunkSize; + UriToStreamJobPart jobPart = UriToStreamJobPart.CreateJobPartFromCheckpoint( job: baseJob, partNumber: Convert.ToInt32(header.PartNumber), sourceResource: sourceResource, destinationResource: destinationResource, - jobPartStatus: jobPartStatus, - initialTransferSize: header.InitialTransferSize, - transferChunkSize: header.ChunkSize); + jobPartStatus: header.JobPartStatus, + initialTransferSize: initialTransferSize, + transferChunkSize: transferChunkSize); jobPart.VerifyJobPartPlanHeader(header); @@ -107,15 +115,18 @@ public static StreamToUriJobPart ToJobPartAsync( string childSourceName = childSourcePath.Substring(sourceResource.Uri.AbsoluteUri.Length + 1); string childDestinationPath = header.DestinationPath; string childDestinationName = childDestinationPath.Substring(destinationResource.Uri.AbsoluteUri.Length + 1); - DataTransferStatus jobPartStatus = header.JobPartStatus; + // Override header values if options were specified by user. + long initialTransferSize = baseJob._initialTransferSize ?? header.InitialTransferSize; + long transferChunkSize = baseJob._maximumTransferChunkSize ?? header.ChunkSize; + StreamToUriJobPart jobPart = StreamToUriJobPart.CreateJobPartFromCheckpoint( job: baseJob, partNumber: Convert.ToInt32(header.PartNumber), sourceResource: sourceResource.GetStorageResourceReference(childSourceName), destinationResource: destinationResource.GetStorageResourceReference(childDestinationName), - jobPartStatus: jobPartStatus, - initialTransferSize: header.InitialTransferSize, - transferChunkSize: header.ChunkSize); + jobPartStatus: header.JobPartStatus, + initialTransferSize: initialTransferSize, + transferChunkSize: transferChunkSize); jobPart.VerifyJobPartPlanHeader(header); @@ -134,15 +145,18 @@ public static ServiceToServiceJobPart ToJobPartAsync( string childSourcePath = header.SourcePath; string childDestinationPath = header.DestinationPath; - DataTransferStatus jobPartStatus = header.JobPartStatus; + // Override header values if options were specified by user. + long initialTransferSize = baseJob._initialTransferSize ?? header.InitialTransferSize; + long transferChunkSize = baseJob._maximumTransferChunkSize ?? header.ChunkSize; + ServiceToServiceJobPart jobPart = ServiceToServiceJobPart.CreateJobPartFromCheckpoint( job: baseJob, partNumber: Convert.ToInt32(header.PartNumber), sourceResource: sourceResource.GetStorageResourceReference(childSourcePath.Substring(sourceResource.Uri.AbsoluteUri.Length + 1)), destinationResource: destinationResource.GetStorageResourceReference(childDestinationPath.Substring(destinationResource.Uri.AbsoluteUri.Length + 1)), - jobPartStatus: jobPartStatus, - initialTransferSize: header.InitialTransferSize, - transferChunkSize: header.ChunkSize); + jobPartStatus: header.JobPartStatus, + initialTransferSize: initialTransferSize, + transferChunkSize: transferChunkSize); jobPart.VerifyJobPartPlanHeader(header); @@ -164,15 +178,18 @@ public static UriToStreamJobPart ToJobPartAsync( string childSourceName = childSourcePath.Substring(sourceResource.Uri.AbsoluteUri.Length + 1); string childDestinationPath = header.DestinationPath; string childDestinationName = childDestinationPath.Substring(destinationResource.Uri.AbsoluteUri.Length + 1); - DataTransferStatus jobPartStatus = header.JobPartStatus; + // Override header values if options were specified by user. + long initialTransferSize = baseJob._initialTransferSize ?? header.InitialTransferSize; + long transferChunkSize = baseJob._maximumTransferChunkSize ?? header.ChunkSize; + UriToStreamJobPart jobPart = UriToStreamJobPart.CreateJobPartFromCheckpoint( job: baseJob, partNumber: Convert.ToInt32(header.PartNumber), sourceResource: sourceResource.GetStorageResourceReference(childSourceName), destinationResource: destinationResource.GetStorageResourceReference(childDestinationName), - jobPartStatus: jobPartStatus, - initialTransferSize: header.InitialTransferSize, - transferChunkSize: header.ChunkSize); + jobPartStatus: header.JobPartStatus, + initialTransferSize: initialTransferSize, + transferChunkSize: transferChunkSize); jobPart.VerifyJobPartPlanHeader(header); diff --git a/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceCreationPreference.cs b/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceCreationPreference.cs index e77230b529dea..48092b6132fa8 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceCreationPreference.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceCreationPreference.cs @@ -1,39 +1,38 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -using System; -using System.Collections.Generic; -using System.Text; namespace Azure.Storage.DataMovement { /// - /// Defines how creating a transfer file resource should go - /// if the resource already exists or does not exist. + /// Defines the behavior when a transfer resource already exists. /// public enum StorageResourceCreationPreference { /// - /// If the file/blob already exists in the destination path, a failure will be thrown. - /// All parallel downloads in progress will finish, but no further - /// files in the directory to download will continue. - /// - /// If ErrorHandlingOptions.ContinueOnFailure is enabled, then this will get overrided - /// and the transfer will complete regardless of failure. + /// The default value will be when starting a new transfer + /// and will be set to the value specified on start when resuming a transfer. + /// + Default = 0, + + /// + /// If the resource already exists in the destination path, a failure will be thrown. + /// + /// The value for will determine if + /// the transfer continues after the failure or not. /// - FailIfExists = 0, + FailIfExists = 1, /// - /// Overwrites the file if it already exists. No error will be thrown. + /// Overwrites the resource if it already exists. No error will be thrown. /// - OverwriteIfExists = 1, + OverwriteIfExists = 2, /// - /// If the file/blob already exists in the destination path, no failure will be thrown. - /// The file will simply be skipped over and other parallel downloads in progress - /// will finish and the rest of the files in the directory to download will continue. + /// If the resource already exists in the destination path, no failure will be thrown. + /// The resource will simply be skipped over and the transfer will continue. /// - /// If ErrorHandlingOptions.StopOnAnyFailures is set, the download will still be skipped. + /// If ErrorHandlingOptions.StopOnAnyFailures is set, the resource will still be skipped. /// - SkipIfExists = 2, + SkipIfExists = 3, } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs index f84fb697597c7..ed5f4ea41b93a 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs @@ -61,8 +61,8 @@ private StreamToUriJobPart( partNumber: partNumber, sourceResource: sourceResource, destinationResource: destinationResource, - transferChunkSize: job._maximumTransferChunkSize, - initialTransferSize: job._initialTransferSize, + transferChunkSize: transferChunkSize ?? job._maximumTransferChunkSize, + initialTransferSize: initialTransferSize ?? job._initialTransferSize, errorHandling: job._errorMode, createMode: job._creationPreference, checkpointer: job._checkpointer, @@ -78,17 +78,6 @@ private StreamToUriJobPart( jobPartStatus: jobPartStatus, length: length) { - // If transfer sizes null at the job level (from options bag) then - // override the default with the provided values if present. - // Else, they were set correctly by the base constructor. - if (!job._maximumTransferChunkSize.HasValue && transferChunkSize.HasValue) - { - _transferChunkSize = transferChunkSize.Value; - } - if (!job._initialTransferSize.HasValue && initialTransferSize.HasValue) - { - _initialTransferSize = initialTransferSize.Value; - } } public async ValueTask DisposeAsync() diff --git a/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs b/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs index 182125ece7ee2..ec99f39ae8bce 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs @@ -142,6 +142,8 @@ private TransferJobInternal( QueueChunkTaskInternal queueChunkTask, TransferCheckpointer checkPointer, DataTransferErrorMode errorHandling, + long? initialTransferSize, + long? maximumTransferChunkSize, StorageResourceCreationPreference creationPreference, ArrayPool arrayPool, SyncAsyncEventHandler statusEventHandler, @@ -154,9 +156,7 @@ private TransferJobInternal( _dataTransfer = dataTransfer ?? throw Errors.ArgumentNull(nameof(dataTransfer)); _dataTransfer.TransferStatus.TrySetTransferStateChange(DataTransferState.Queued); - _errorMode = errorHandling; _checkpointer = checkPointer; - _creationPreference = creationPreference; QueueChunkTask = queueChunkTask; _arrayPool = arrayPool; _jobParts = new List(); @@ -164,6 +164,14 @@ private TransferJobInternal( _pendingJobParts = 0; _cancellationToken = dataTransfer._state.CancellationTokenSource.Token; + // These options come straight from user-provided options bags and are saved + // as-is on the job. They may be adjusted with the defaults on job part + // construction (regular or from checkpoint). + _initialTransferSize = initialTransferSize; + _maximumTransferChunkSize = maximumTransferChunkSize; + _errorMode = errorHandling; + _creationPreference = creationPreference; + JobPartStatusEvents += JobPartEvent; TransferStatusEventHandler = statusEventHandler; TransferFailedEventHandler = failedEventHandler; @@ -189,6 +197,8 @@ internal TransferJobInternal( queueChunkTask, checkpointer, errorHandling, + transferOptions.InitialTransferSize, + transferOptions.MaximumTransferChunkSize, transferOptions.CreationPreference, arrayPool, transferOptions.GetTransferStatus(), @@ -200,8 +210,6 @@ internal TransferJobInternal( _sourceResource = sourceResource; _destinationResource = destinationResource; _isSingleResource = true; - _initialTransferSize = transferOptions?.InitialTransferSize; - _maximumTransferChunkSize = transferOptions?.MaximumTransferChunkSize; _progressTracker = new TransferProgressTracker(transferOptions?.ProgressHandlerOptions); } @@ -222,6 +230,8 @@ internal TransferJobInternal( queueChunkTask, checkpointer, errorHandling, + transferOptions.InitialTransferSize, + transferOptions.MaximumTransferChunkSize, transferOptions.CreationPreference, arrayPool, transferOptions.GetTransferStatus(), @@ -233,8 +243,6 @@ internal TransferJobInternal( _sourceResourceContainer = sourceResource; _destinationResourceContainer = destinationResource; _isSingleResource = false; - _initialTransferSize = transferOptions?.InitialTransferSize; - _maximumTransferChunkSize = transferOptions?.MaximumTransferChunkSize; _progressTracker = new TransferProgressTracker(transferOptions?.ProgressHandlerOptions); } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs index b0c10b2d8c6ba..f5d9f5467fda2 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs @@ -63,8 +63,8 @@ private UriToStreamJobPart( partNumber: partNumber, sourceResource: sourceResource, destinationResource: destinationResource, - transferChunkSize: job._maximumTransferChunkSize, - initialTransferSize: job._initialTransferSize, + transferChunkSize: transferChunkSize ?? job._maximumTransferChunkSize, + initialTransferSize: initialTransferSize ?? job._initialTransferSize, errorHandling: job._errorMode, createMode: job._creationPreference, checkpointer: job._checkpointer, @@ -80,17 +80,6 @@ private UriToStreamJobPart( jobPartStatus: jobPartStatus, length: length) { - // If transfer sizes null at the job level (from options bag) then - // override the default with the provided values if present. - // Else, they were set correctly by the base constructor. - if (!job._maximumTransferChunkSize.HasValue && transferChunkSize.HasValue) - { - _transferChunkSize = transferChunkSize.Value; - } - if (!job._initialTransferSize.HasValue && initialTransferSize.HasValue) - { - _initialTransferSize = initialTransferSize.Value; - } } public async ValueTask DisposeAsync() From d0ab5fcbc32d8f409b531be77706e1be5fdfcf05 Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Wed, 8 Nov 2023 18:05:45 -0800 Subject: [PATCH 2/5] Update checkpoint schema --- .../src/JobPlan/JobPartPlanHeader.cs | 23 +++++++++--------- .../src/Shared/DataMovementConstants.cs | 4 +-- .../src/Shared/DataMovementExtensions.cs | 9 +------ .../src/Shared/Errors.DataMovement.cs | 5 ---- .../tests/CheckpointerTesting.cs | 5 ++-- .../tests/JobPartPlanHeaderTests.cs | 9 +++++-- .../SampleJobPartPlanFile.b3.ndmpart | Bin 151 -> 151 bytes 7 files changed, 24 insertions(+), 31 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPartPlanHeader.cs b/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPartPlanHeader.cs index 85b7db205362e..59c357dc5c4be 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPartPlanHeader.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/JobPlan/JobPartPlanHeader.cs @@ -51,12 +51,12 @@ internal class JobPartPlanHeader public string DestinationPath; /// - /// Whether the destination should be overriden or not. + /// The resource creation preference. /// - public bool Overwrite; + public StorageResourceCreationPreference CreatePreference; /// - /// Ths intial transfer size for the transfer. + /// Ths initial transfer size for the transfer. /// public long InitialTransferSize; @@ -84,7 +84,7 @@ public JobPartPlanHeader( string destinationTypeId, string sourcePath, string destinationPath, - bool overwrite, + StorageResourceCreationPreference createPreference, long initialTransferSize, long chunkSize, byte priority, @@ -133,7 +133,7 @@ public JobPartPlanHeader( DestinationTypeId = destinationTypeId; SourcePath = sourcePath; DestinationPath = destinationPath; - Overwrite = overwrite; + CreatePreference = createPreference; InitialTransferSize = initialTransferSize; ChunkSize = chunkSize; Priority = priority; @@ -173,8 +173,8 @@ public void Serialize(Stream stream) byte[] destinationPathBytes = Encoding.UTF8.GetBytes(DestinationPath); writer.WriteVariableLengthFieldInfo(destinationPathBytes.Length, ref currentVariableLengthIndex); - // Overwrite - writer.Write(Overwrite); + // CreatePreference + writer.Write((byte)CreatePreference); // InitialTransferSize writer.Write(InitialTransferSize); @@ -233,9 +233,8 @@ public static JobPartPlanHeader Deserialize(Stream stream) int destinationPathOffset = reader.ReadInt32(); int destinationPathLength = reader.ReadInt32(); - // Overwrite - byte overwriteByte = reader.ReadByte(); - bool overwrite = Convert.ToBoolean(overwriteByte); + // CreatePreference + StorageResourceCreationPreference createPreference = (StorageResourceCreationPreference)reader.ReadByte(); // InitialTransferSize long initialTransferSize = reader.ReadInt64(); @@ -277,7 +276,7 @@ public static JobPartPlanHeader Deserialize(Stream stream) destinationTypeId, sourcePath, destinationPath, - overwrite, + createPreference, initialTransferSize, chunkSize, priority, @@ -303,7 +302,7 @@ internal bool Equals(JobPartPlanHeader other) (DestinationTypeId == other.DestinationTypeId) && (SourcePath == other.SourcePath) && (DestinationPath == other.DestinationPath) && - (Overwrite == other.Overwrite) && + (CreatePreference == other.CreatePreference) && (InitialTransferSize == other.InitialTransferSize) && (ChunkSize == other.ChunkSize) && (Priority == other.Priority) && diff --git a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs index 9eb0aaf20b33f..a28ca12db63f7 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs @@ -126,8 +126,8 @@ internal static class JobPartPlanFile internal const int SourcePathLengthIndex = SourcePathOffsetIndex + IntSizeInBytes; internal const int DestinationPathOffsetIndex = SourcePathLengthIndex + IntSizeInBytes; internal const int DestinationPathLengthIndex = DestinationPathOffsetIndex + IntSizeInBytes; - internal const int OverwriteIndex = DestinationPathLengthIndex + IntSizeInBytes; - internal const int InitialTransferSizeIndex = OverwriteIndex + OneByte; + internal const int CreatePreferenceIndex = DestinationPathLengthIndex + IntSizeInBytes; + internal const int InitialTransferSizeIndex = CreatePreferenceIndex + OneByte; internal const int ChunkSizeIndex = InitialTransferSizeIndex + LongSizeInBytes; internal const int PriorityIndex = ChunkSizeIndex + LongSizeInBytes; internal const int JobPartStatusIndex = PriorityIndex + OneByte; diff --git a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementExtensions.cs b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementExtensions.cs index da59b6718a2fb..5be0e9e2bf913 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementExtensions.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementExtensions.cs @@ -214,7 +214,7 @@ internal static JobPartPlanHeader ToJobPartPlanHeader(this JobPartInternal jobPa destinationTypeId: jobPart._destinationResource.ResourceId, sourcePath: sourcePath, destinationPath: destinationPath, - overwrite: jobPart._createMode == StorageResourceCreationPreference.OverwriteIfExists, + createPreference: jobPart._createMode, initialTransferSize: jobPart._initialTransferSize, chunkSize: jobPart._transferChunkSize, priority: 0, // TODO: add priority feature @@ -254,13 +254,6 @@ internal static void VerifyJobPartPlanHeader(this JobPartInternal jobPart, JobPa { throw Errors.MismatchResumeTransferArguments(nameof(header.DestinationPath), header.DestinationPath, passedDestinationPath); } - - // Check CreateMode / Overwrite - if ((header.Overwrite && jobPart._createMode != StorageResourceCreationPreference.OverwriteIfExists) || - (!header.Overwrite && jobPart._createMode == StorageResourceCreationPreference.OverwriteIfExists)) - { - throw Errors.MismatchResumeCreateMode(header.Overwrite, jobPart._createMode); - } } } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/Shared/Errors.DataMovement.cs b/sdk/storage/Azure.Storage.DataMovement/src/Shared/Errors.DataMovement.cs index 8795df72decf9..85e9e193ac703 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/Shared/Errors.DataMovement.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/Shared/Errors.DataMovement.cs @@ -87,11 +87,6 @@ public static ArgumentException MismatchResumeTransferArguments(string elementNa $"Checkpointer Value: {checkpointerValue}\n" + $"New Value: {passedValue}"); - public static ArgumentException MismatchResumeCreateMode(bool checkpointerValue, StorageResourceCreationPreference passedValue) - => new ArgumentException($"Mismatch Value to Resume Job: The value to overwrite / create files when they exist does not match the stored value in the transfer checkpointer. Please ensure the value passed to resume the transfer matches the value in order to prevent overwriting or failing files.\n" + - $"Checkpointer Value to overwrite was set to {checkpointerValue.ToString()}.\n" + - $"The value passed in was {passedValue.ToString()}"); - public static InvalidOperationException SingleDownloadLengthMismatch(long expectedLength, long actualLength) => new InvalidOperationException($"Download length {actualLength} did not match expected length {expectedLength}."); diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/CheckpointerTesting.cs b/sdk/storage/Azure.Storage.DataMovement/tests/CheckpointerTesting.cs index 17787870cccdb..5f81ef296a35c 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/CheckpointerTesting.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/CheckpointerTesting.cs @@ -21,6 +21,7 @@ internal static class CheckpointerTesting internal const string DefaultDestinationTypeId = "BlockBlob"; internal const string DefaultDestinationPath = "C:/sample-destination"; internal const string DefaultWebDestinationPath = "https://example.com/destination"; + internal const StorageResourceCreationPreference DefaultCreatePreference = StorageResourceCreationPreference.FailIfExists; internal const long DefaultInitialTransferSize = 32 * Constants.MB; internal const long DefaultChunkSize = 4 * Constants.MB; internal const byte DefaultPriority = 0; @@ -41,7 +42,7 @@ internal static JobPartPlanHeader CreateDefaultJobPartHeader( string destinationTypeId = DefaultDestinationTypeId, string sourcePath = DefaultSourcePath, string destinationPath = DefaultDestinationPath, - bool overwrite = false, + StorageResourceCreationPreference createPreference = DefaultCreatePreference, long initialTransferSize = DefaultInitialTransferSize, long chunkSize = DefaultChunkSize, byte priority = DefaultPriority, @@ -62,7 +63,7 @@ internal static JobPartPlanHeader CreateDefaultJobPartHeader( destinationTypeId, sourcePath, destinationPath, - overwrite, + createPreference, initialTransferSize, chunkSize, priority, diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/JobPartPlanHeaderTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/JobPartPlanHeaderTests.cs index 087bcc27f643d..4241bbc29d7d5 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/JobPartPlanHeaderTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/JobPartPlanHeaderTests.cs @@ -28,7 +28,7 @@ public void Ctor() Assert.AreEqual(DefaultDestinationTypeId, header.DestinationTypeId); Assert.AreEqual(DefaultSourcePath, header.SourcePath); Assert.AreEqual(DefaultDestinationPath, header.DestinationPath); - Assert.IsFalse(header.Overwrite); + Assert.AreEqual(DefaultCreatePreference, header.CreatePreference); Assert.AreEqual(DefaultInitialTransferSize, header.InitialTransferSize); Assert.AreEqual(DefaultChunkSize, header.ChunkSize); Assert.AreEqual(DefaultPriority, header.Priority); @@ -55,6 +55,11 @@ public void Serialize() CollectionAssert.AreEqual(expected, actual); } + + //using (FileStream fs = File.OpenWrite(@"D:\azure-sdk-for-net\sdk\storage\Azure.Storage.DataMovement\tests\Resources\SampleJobPartPlanFile.b3.ndmpart")) + //{ + // header.Serialize(fs); + //} } [Test] @@ -146,7 +151,7 @@ private void DeserializeAndVerify( Assert.AreEqual(DefaultDestinationTypeId, deserializedHeader.DestinationTypeId); Assert.AreEqual(DefaultSourcePath, deserializedHeader.SourcePath); Assert.AreEqual(DefaultDestinationPath, deserializedHeader.DestinationPath); - Assert.IsFalse(deserializedHeader.Overwrite); + Assert.AreEqual(DefaultCreatePreference, deserializedHeader.CreatePreference); Assert.AreEqual(DefaultInitialTransferSize, deserializedHeader.InitialTransferSize); Assert.AreEqual(DefaultChunkSize, deserializedHeader.ChunkSize); Assert.AreEqual(DefaultPriority, deserializedHeader.Priority); diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/Resources/SampleJobPartPlanFile.b3.ndmpart b/sdk/storage/Azure.Storage.DataMovement/tests/Resources/SampleJobPartPlanFile.b3.ndmpart index 3bcf786284a646c1f8d1986ee989384a35e5a674..9b01bc97cdd22c0b9b7fcb49f1711b246cb3d6ab 100644 GIT binary patch delta 11 ScmbQvIGu4q3?t*jSStV&u>(f{ delta 11 ScmbQvIGu4q3?swDSStV&tpi2? From f61e4290b2bf09fd2f63acc7943d3d010997af3a Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Thu, 9 Nov 2023 13:05:44 -0800 Subject: [PATCH 3/5] Consume create preference from checkpoint --- .../src/DataTransferOptions.cs | 5 +- .../src/ServiceToServiceJobPart.cs | 52 +++++++++++++++---- .../src/Shared/DataMovementExtensions.cs | 36 ++++++++++--- .../src/StorageResourceCreationPreference.cs | 7 ++- .../src/StreamToUriJobPart.cs | 52 +++++++++++++++---- .../src/UriToStreamJobPart.cs | 52 +++++++++++++++---- .../tests/JobPartPlanHeaderTests.cs | 5 -- 7 files changed, 168 insertions(+), 41 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/src/DataTransferOptions.cs b/sdk/storage/Azure.Storage.DataMovement/src/DataTransferOptions.cs index de3e150d61e86..0b8fb84930b7c 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/DataTransferOptions.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/DataTransferOptions.cs @@ -95,7 +95,10 @@ public bool Equals(DataTransferOptions obj) /// /// Will default to /// which will be when - /// starting a new transfer and the value used to start a transfer when resuming a transfer. + /// starting a new transfer. + /// When resuming a transfer, the value will default to the value used when first starting + /// the transfer for all resources that were successfully enumerated and the regular default + /// for any remaining resources. /// public StorageResourceCreationPreference CreationPreference { get; set; } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs index 6b44998a93ed4..794b18da85aca 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs @@ -53,16 +53,13 @@ private ServiceToServiceJobPart( int partNumber, StorageResourceItem sourceResource, StorageResourceItem destinationResource, - DataTransferStatus jobPartStatus = default, - long? length = default, - long? initialTransferSize = default, - long? transferChunkSize = default) + long? length = default) : base(dataTransfer: job._dataTransfer, partNumber: partNumber, sourceResource: sourceResource, destinationResource: destinationResource, - transferChunkSize: transferChunkSize ?? job._maximumTransferChunkSize, - initialTransferSize: initialTransferSize ?? job._initialTransferSize, + transferChunkSize: job._maximumTransferChunkSize, + initialTransferSize: job._initialTransferSize, errorHandling: job._errorMode, createMode: job._creationPreference, checkpointer: job._checkpointer, @@ -75,11 +72,46 @@ private ServiceToServiceJobPart( singleTransferEventHandler: job.TransferItemCompletedEventHandler, clientDiagnostics: job.ClientDiagnostics, cancellationToken: job._cancellationToken, - jobPartStatus: jobPartStatus, + jobPartStatus: default, length: length) { } + /// + /// Creating transfer job based on a checkpoint file. + /// + private ServiceToServiceJobPart( + ServiceToServiceTransferJob job, + int partNumber, + StorageResourceItem sourceResource, + StorageResourceItem destinationResource, + DataTransferStatus jobPartStatus, + long initialTransferSize, + long transferChunkSize, + StorageResourceCreationPreference createPreference) + : base(dataTransfer: job._dataTransfer, + partNumber: partNumber, + sourceResource: sourceResource, + destinationResource: destinationResource, + transferChunkSize: transferChunkSize, + initialTransferSize: initialTransferSize, + errorHandling: job._errorMode, + createMode: createPreference, + checkpointer: job._checkpointer, + progressTracker: job._progressTracker, + arrayPool: job.UploadArrayPool, + jobPartEventHandler: job.GetJobPartStatus(), + statusEventHandler: job.TransferStatusEventHandler, + failedEventHandler: job.TransferFailedEventHandler, + skippedEventHandler: job.TransferSkippedEventHandler, + singleTransferEventHandler: job.TransferItemCompletedEventHandler, + clientDiagnostics: job.ClientDiagnostics, + cancellationToken: job._cancellationToken, + jobPartStatus: jobPartStatus, + length: default) + { + } + public async ValueTask DisposeAsync() { await DisposeHandlers().ConfigureAwait(false); @@ -129,7 +161,8 @@ public static ServiceToServiceJobPart CreateJobPartFromCheckpoint( StorageResourceItem destinationResource, DataTransferStatus jobPartStatus, long initialTransferSize, - long transferChunkSize) + long transferChunkSize, + StorageResourceCreationPreference createPreference) { return new ServiceToServiceJobPart( job: job, @@ -138,7 +171,8 @@ public static ServiceToServiceJobPart CreateJobPartFromCheckpoint( destinationResource: destinationResource, jobPartStatus: jobPartStatus, initialTransferSize: initialTransferSize, - transferChunkSize: transferChunkSize); + transferChunkSize: transferChunkSize, + createPreference: createPreference); } public override async Task ProcessPartToChunkAsync() diff --git a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementExtensions.cs b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementExtensions.cs index 5be0e9e2bf913..db7154fd34d80 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementExtensions.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementExtensions.cs @@ -30,6 +30,9 @@ public static StreamToUriJobPart ToJobPartAsync( // Override header values if options were specified by user. long initialTransferSize = baseJob._initialTransferSize ?? header.InitialTransferSize; long transferChunkSize = baseJob._maximumTransferChunkSize ?? header.ChunkSize; + StorageResourceCreationPreference createPreference = + baseJob._creationPreference != StorageResourceCreationPreference.Default ? + baseJob._creationPreference : header.CreatePreference; StreamToUriJobPart jobPart = StreamToUriJobPart.CreateJobPartFromCheckpoint( job: baseJob, @@ -38,7 +41,8 @@ public static StreamToUriJobPart ToJobPartAsync( destinationResource: destinationResource, jobPartStatus: header.JobPartStatus, initialTransferSize: initialTransferSize, - transferChunkSize: transferChunkSize); + transferChunkSize: transferChunkSize, + createPreference: createPreference); jobPart.VerifyJobPartPlanHeader(header); @@ -58,6 +62,9 @@ public static ServiceToServiceJobPart ToJobPartAsync( // Override header values if options were specified by user. long initialTransferSize = baseJob._initialTransferSize ?? header.InitialTransferSize; long transferChunkSize = baseJob._maximumTransferChunkSize ?? header.ChunkSize; + StorageResourceCreationPreference createPreference = + baseJob._creationPreference != StorageResourceCreationPreference.Default ? + baseJob._creationPreference : header.CreatePreference; ServiceToServiceJobPart jobPart = ServiceToServiceJobPart.CreateJobPartFromCheckpoint( job: baseJob, @@ -66,7 +73,8 @@ public static ServiceToServiceJobPart ToJobPartAsync( destinationResource: destinationResource, jobPartStatus: header.JobPartStatus, initialTransferSize: initialTransferSize, - transferChunkSize: transferChunkSize); + transferChunkSize: transferChunkSize, + createPreference: createPreference); jobPart.VerifyJobPartPlanHeader(header); @@ -86,6 +94,9 @@ public static UriToStreamJobPart ToJobPartAsync( // Override header values if options were specified by user. long initialTransferSize = baseJob._initialTransferSize ?? header.InitialTransferSize; long transferChunkSize = baseJob._maximumTransferChunkSize ?? header.ChunkSize; + StorageResourceCreationPreference createPreference = + baseJob._creationPreference != StorageResourceCreationPreference.Default ? + baseJob._creationPreference : header.CreatePreference; UriToStreamJobPart jobPart = UriToStreamJobPart.CreateJobPartFromCheckpoint( job: baseJob, @@ -94,7 +105,8 @@ public static UriToStreamJobPart ToJobPartAsync( destinationResource: destinationResource, jobPartStatus: header.JobPartStatus, initialTransferSize: initialTransferSize, - transferChunkSize: transferChunkSize); + transferChunkSize: transferChunkSize, + createPreference: createPreference); jobPart.VerifyJobPartPlanHeader(header); @@ -118,6 +130,9 @@ public static StreamToUriJobPart ToJobPartAsync( // Override header values if options were specified by user. long initialTransferSize = baseJob._initialTransferSize ?? header.InitialTransferSize; long transferChunkSize = baseJob._maximumTransferChunkSize ?? header.ChunkSize; + StorageResourceCreationPreference createPreference = + baseJob._creationPreference != StorageResourceCreationPreference.Default ? + baseJob._creationPreference : header.CreatePreference; StreamToUriJobPart jobPart = StreamToUriJobPart.CreateJobPartFromCheckpoint( job: baseJob, @@ -126,7 +141,8 @@ public static StreamToUriJobPart ToJobPartAsync( destinationResource: destinationResource.GetStorageResourceReference(childDestinationName), jobPartStatus: header.JobPartStatus, initialTransferSize: initialTransferSize, - transferChunkSize: transferChunkSize); + transferChunkSize: transferChunkSize, + createPreference: createPreference); jobPart.VerifyJobPartPlanHeader(header); @@ -148,6 +164,9 @@ public static ServiceToServiceJobPart ToJobPartAsync( // Override header values if options were specified by user. long initialTransferSize = baseJob._initialTransferSize ?? header.InitialTransferSize; long transferChunkSize = baseJob._maximumTransferChunkSize ?? header.ChunkSize; + StorageResourceCreationPreference createPreference = + baseJob._creationPreference != StorageResourceCreationPreference.Default ? + baseJob._creationPreference : header.CreatePreference; ServiceToServiceJobPart jobPart = ServiceToServiceJobPart.CreateJobPartFromCheckpoint( job: baseJob, @@ -156,7 +175,8 @@ public static ServiceToServiceJobPart ToJobPartAsync( destinationResource: destinationResource.GetStorageResourceReference(childDestinationPath.Substring(destinationResource.Uri.AbsoluteUri.Length + 1)), jobPartStatus: header.JobPartStatus, initialTransferSize: initialTransferSize, - transferChunkSize: transferChunkSize); + transferChunkSize: transferChunkSize, + createPreference: createPreference); jobPart.VerifyJobPartPlanHeader(header); @@ -181,6 +201,9 @@ public static UriToStreamJobPart ToJobPartAsync( // Override header values if options were specified by user. long initialTransferSize = baseJob._initialTransferSize ?? header.InitialTransferSize; long transferChunkSize = baseJob._maximumTransferChunkSize ?? header.ChunkSize; + StorageResourceCreationPreference createPreference = + baseJob._creationPreference != StorageResourceCreationPreference.Default ? + baseJob._creationPreference : header.CreatePreference; UriToStreamJobPart jobPart = UriToStreamJobPart.CreateJobPartFromCheckpoint( job: baseJob, @@ -189,7 +212,8 @@ public static UriToStreamJobPart ToJobPartAsync( destinationResource: destinationResource.GetStorageResourceReference(childDestinationName), jobPartStatus: header.JobPartStatus, initialTransferSize: initialTransferSize, - transferChunkSize: transferChunkSize); + transferChunkSize: transferChunkSize, + createPreference: createPreference); jobPart.VerifyJobPartPlanHeader(header); diff --git a/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceCreationPreference.cs b/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceCreationPreference.cs index 48092b6132fa8..5de453473b787 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceCreationPreference.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceCreationPreference.cs @@ -9,8 +9,11 @@ namespace Azure.Storage.DataMovement public enum StorageResourceCreationPreference { /// - /// The default value will be when starting a new transfer - /// and will be set to the value specified on start when resuming a transfer. + /// The default value will be when + /// starting a new transfer. + /// When resuming a transfer, the value will default to the value used when first starting + /// the transfer for all resources that were successfully enumerated and the regular default + /// for any remaining resources. /// Default = 0, diff --git a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs index ed5f4ea41b93a..4d6fcc04d49be 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs @@ -53,16 +53,13 @@ private StreamToUriJobPart( int partNumber, StorageResourceItem sourceResource, StorageResourceItem destinationResource, - DataTransferStatus jobPartStatus = default, - long? length = default, - long? initialTransferSize = default, - long? transferChunkSize = default) + long? length = default) : base(dataTransfer: job._dataTransfer, partNumber: partNumber, sourceResource: sourceResource, destinationResource: destinationResource, - transferChunkSize: transferChunkSize ?? job._maximumTransferChunkSize, - initialTransferSize: initialTransferSize ?? job._initialTransferSize, + transferChunkSize: job._maximumTransferChunkSize, + initialTransferSize: job._initialTransferSize, errorHandling: job._errorMode, createMode: job._creationPreference, checkpointer: job._checkpointer, @@ -75,11 +72,46 @@ private StreamToUriJobPart( singleTransferEventHandler: job.TransferItemCompletedEventHandler, clientDiagnostics: job.ClientDiagnostics, cancellationToken: job._cancellationToken, - jobPartStatus: jobPartStatus, + jobPartStatus: default, length: length) { } + /// + /// Creating transfer job based on a checkpoint file. + /// + private StreamToUriJobPart( + StreamToUriTransferJob job, + int partNumber, + StorageResourceItem sourceResource, + StorageResourceItem destinationResource, + DataTransferStatus jobPartStatus, + long initialTransferSize, + long transferChunkSize, + StorageResourceCreationPreference createPreference) + : base(dataTransfer: job._dataTransfer, + partNumber: partNumber, + sourceResource: sourceResource, + destinationResource: destinationResource, + transferChunkSize: transferChunkSize, + initialTransferSize: initialTransferSize, + errorHandling: job._errorMode, + createMode: createPreference, + checkpointer: job._checkpointer, + progressTracker: job._progressTracker, + arrayPool: job.UploadArrayPool, + jobPartEventHandler: job.GetJobPartStatus(), + statusEventHandler: job.TransferStatusEventHandler, + failedEventHandler: job.TransferFailedEventHandler, + skippedEventHandler: job.TransferSkippedEventHandler, + singleTransferEventHandler: job.TransferItemCompletedEventHandler, + clientDiagnostics: job.ClientDiagnostics, + cancellationToken: job._cancellationToken, + jobPartStatus: jobPartStatus, + length: default) + { + } + public async ValueTask DisposeAsync() { await DisposeHandlers().ConfigureAwait(false); @@ -129,7 +161,8 @@ public static StreamToUriJobPart CreateJobPartFromCheckpoint( StorageResourceItem destinationResource, DataTransferStatus jobPartStatus, long initialTransferSize, - long transferChunkSize) + long transferChunkSize, + StorageResourceCreationPreference createPreference) { return new StreamToUriJobPart( job: job, @@ -138,7 +171,8 @@ public static StreamToUriJobPart CreateJobPartFromCheckpoint( destinationResource: destinationResource, jobPartStatus: jobPartStatus, initialTransferSize: initialTransferSize, - transferChunkSize: transferChunkSize); + transferChunkSize: transferChunkSize, + createPreference: createPreference); } /// diff --git a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs index f5d9f5467fda2..7787f1e77a473 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs @@ -55,16 +55,13 @@ private UriToStreamJobPart( int partNumber, StorageResourceItem sourceResource, StorageResourceItem destinationResource, - DataTransferStatus jobPartStatus = default, - long? length = default, - long? initialTransferSize = default, - long? transferChunkSize = default) + long? length = default) : base(dataTransfer: job._dataTransfer, partNumber: partNumber, sourceResource: sourceResource, destinationResource: destinationResource, - transferChunkSize: transferChunkSize ?? job._maximumTransferChunkSize, - initialTransferSize: initialTransferSize ?? job._initialTransferSize, + transferChunkSize: job._maximumTransferChunkSize, + initialTransferSize: job._initialTransferSize, errorHandling: job._errorMode, createMode: job._creationPreference, checkpointer: job._checkpointer, @@ -77,11 +74,46 @@ private UriToStreamJobPart( singleTransferEventHandler: job.TransferItemCompletedEventHandler, clientDiagnostics: job.ClientDiagnostics, cancellationToken: job._cancellationToken, - jobPartStatus: jobPartStatus, + jobPartStatus: default, length: length) { } + /// + /// Creating transfer job based on a checkpoint file. + /// + private UriToStreamJobPart( + UriToStreamTransferJob job, + int partNumber, + StorageResourceItem sourceResource, + StorageResourceItem destinationResource, + DataTransferStatus jobPartStatus, + long initialTransferSize, + long transferChunkSize, + StorageResourceCreationPreference createPreference) + : base(dataTransfer: job._dataTransfer, + partNumber: partNumber, + sourceResource: sourceResource, + destinationResource: destinationResource, + transferChunkSize: transferChunkSize, + initialTransferSize: initialTransferSize, + errorHandling: job._errorMode, + createMode: createPreference, + checkpointer: job._checkpointer, + progressTracker: job._progressTracker, + arrayPool: job.UploadArrayPool, + jobPartEventHandler: job.GetJobPartStatus(), + statusEventHandler: job.TransferStatusEventHandler, + failedEventHandler: job.TransferFailedEventHandler, + skippedEventHandler: job.TransferSkippedEventHandler, + singleTransferEventHandler: job.TransferItemCompletedEventHandler, + clientDiagnostics: job.ClientDiagnostics, + cancellationToken: job._cancellationToken, + jobPartStatus: jobPartStatus, + length: default) + { + } + public async ValueTask DisposeAsync() { await DisposeHandlers().ConfigureAwait(false); @@ -131,7 +163,8 @@ public static UriToStreamJobPart CreateJobPartFromCheckpoint( StorageResourceItem destinationResource, DataTransferStatus jobPartStatus, long initialTransferSize, - long transferChunkSize) + long transferChunkSize, + StorageResourceCreationPreference createPreference) { return new UriToStreamJobPart( job: job, @@ -140,7 +173,8 @@ public static UriToStreamJobPart CreateJobPartFromCheckpoint( destinationResource: destinationResource, jobPartStatus: jobPartStatus, initialTransferSize: initialTransferSize, - transferChunkSize: transferChunkSize); + transferChunkSize: transferChunkSize, + createPreference: createPreference); } /// diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/JobPartPlanHeaderTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/JobPartPlanHeaderTests.cs index 4241bbc29d7d5..c28be33eff62a 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/JobPartPlanHeaderTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/JobPartPlanHeaderTests.cs @@ -55,11 +55,6 @@ public void Serialize() CollectionAssert.AreEqual(expected, actual); } - - //using (FileStream fs = File.OpenWrite(@"D:\azure-sdk-for-net\sdk\storage\Azure.Storage.DataMovement\tests\Resources\SampleJobPartPlanFile.b3.ndmpart")) - //{ - // header.Serialize(fs); - //} } [Test] From 07efd3ceef347302b53b543a3df71a0088fab6a5 Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Thu, 9 Nov 2023 13:59:50 -0800 Subject: [PATCH 4/5] Export API --- .../api/Azure.Storage.DataMovement.net6.0.cs | 7 ++++--- .../api/Azure.Storage.DataMovement.netstandard2.0.cs | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs index 32d9e6fc5850c..9647fee5b873e 100644 --- a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs +++ b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs @@ -142,9 +142,10 @@ public StorageResourceCopyFromUriOptions() { } } public enum StorageResourceCreationPreference { - FailIfExists = 0, - OverwriteIfExists = 1, - SkipIfExists = 2, + Default = 0, + FailIfExists = 1, + OverwriteIfExists = 2, + SkipIfExists = 3, } public abstract partial class StorageResourceItem : Azure.Storage.DataMovement.StorageResource { diff --git a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs index 32d9e6fc5850c..9647fee5b873e 100644 --- a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs +++ b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs @@ -142,9 +142,10 @@ public StorageResourceCopyFromUriOptions() { } } public enum StorageResourceCreationPreference { - FailIfExists = 0, - OverwriteIfExists = 1, - SkipIfExists = 2, + Default = 0, + FailIfExists = 1, + OverwriteIfExists = 2, + SkipIfExists = 3, } public abstract partial class StorageResourceItem : Azure.Storage.DataMovement.StorageResource { From 89759af337d1bf66ac3a8d73ce0abf6f2589bdf0 Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Thu, 9 Nov 2023 17:31:15 -0800 Subject: [PATCH 5/5] Update CHANGELOG --- sdk/storage/Azure.Storage.DataMovement/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/CHANGELOG.md b/sdk/storage/Azure.Storage.DataMovement/CHANGELOG.md index 798bb1c7af63c..84b4c779c3fea 100644 --- a/sdk/storage/Azure.Storage.DataMovement/CHANGELOG.md +++ b/sdk/storage/Azure.Storage.DataMovement/CHANGELOG.md @@ -54,7 +54,7 @@ - [BREAKING CHANGE] Renamed `StorageTransferStatus` to `DataTransferStatus` - [BREAKING CHANGE] Changed `DataTransferStatus` from `enum` to a `class`. - [BREAKING CHANGE] Renamed `StorageResourceCreateMode` to `StorageResourceCreationPreference`. -- [BREAKING CHANGE] Renamed `StorageResourceCreationPreference` values from `Fail` to `FailIfExists`, `Overwrite` to `OverwriteIfExists` and `Skip` to `SkipIfExists`. `None` was removed, use `FailIfExists` instead. +- [BREAKING CHANGE] Renamed `StorageResourceCreationPreference` values from `Fail` to `FailIfExists`, `Overwrite` to `OverwriteIfExists`, `Skip` to `SkipIfExists` and `None` to `Default` which will default to `FailIfExists`. - [BREAKING CHANGE] Renamed `DataTransferOptions.CreateMode` to `CreationPreference`. - [BREAKING CHANGE] Changed `StorageTransferProgress` constructor from `public` to `protected internal`. - [BREAKING CHANGE] Renamed `StorageTransferProgress` to `DataTransferProgress`.