From 70567f2c7f3dfe0268ba97cb2e342d3a3e33b2a2 Mon Sep 17 00:00:00 2001 From: Jocelyn <41338290+jaschrep-msft@users.noreply.github.com> Date: Wed, 18 Sep 2024 15:32:30 -0400 Subject: [PATCH] Core `TransferManager` tests (#46014) * Transfermanager public constructor calls DI * testing and minor changes move delegate out of class for easier typing. privatize some fields testing transfer manager item transfer with mocked dependencies. * container test * usings * rename test * address nitpicks * fix --- .../tests/Shared/MockExtensions.cs | 12 + .../src/ChannelProcessing.cs | 5 +- .../src/JobBuilder.cs | 12 +- .../src/TransferManager.cs | 34 +- .../Azure.Storage.DataMovement.Tests.csproj | 4 +- .../tests/Models/StepProcessor.cs | 74 +++ .../tests/TransferManagerTests.cs | 440 ++++++++++++++++++ 7 files changed, 557 insertions(+), 24 deletions(-) create mode 100644 sdk/storage/Azure.Storage.DataMovement/tests/Models/StepProcessor.cs create mode 100644 sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs diff --git a/sdk/storage/Azure.Storage.Common/tests/Shared/MockExtensions.cs b/sdk/storage/Azure.Storage.Common/tests/Shared/MockExtensions.cs index e4cc6cea73ab6..2ff7e8aa078cf 100644 --- a/sdk/storage/Azure.Storage.Common/tests/Shared/MockExtensions.cs +++ b/sdk/storage/Azure.Storage.Common/tests/Shared/MockExtensions.cs @@ -62,5 +62,17 @@ public static void BasicSetup(this Mock stream, bool canRead, bool canWr .Throws(); } } + + public static void VerifyDisposal(this Mock mock) + where T : class, IDisposable + { + mock.Verify(m => m.Dispose(), Times.Once); + } + + public static void VerifyAsyncDisposal(this Mock mock) + where T : class, IAsyncDisposable + { + mock.Verify(m => m.DisposeAsync(), Times.Once); + } } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/ChannelProcessing.cs b/sdk/storage/Azure.Storage.DataMovement/src/ChannelProcessing.cs index 17d5180a3dd7a..e7526c80c1dce 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/ChannelProcessing.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/ChannelProcessing.cs @@ -7,10 +7,11 @@ using System.Threading.Channels; using System.Threading.Tasks; using Azure.Storage.Common; -using static Azure.Storage.DataMovement.ChannelProcessing; namespace Azure.Storage.DataMovement; +internal delegate Task ProcessAsync(T item, CancellationToken cancellationToken); + internal interface IProcessor : IDisposable { ValueTask QueueAsync(TItem item, CancellationToken cancellationToken = default); @@ -19,8 +20,6 @@ internal interface IProcessor : IDisposable internal static class ChannelProcessing { - public delegate Task ProcessAsync(T item, CancellationToken cancellationToken); - public static IProcessor NewProcessor(int parallelism) { Argument.AssertInRange(parallelism, 1, int.MaxValue, nameof(parallelism)); diff --git a/sdk/storage/Azure.Storage.DataMovement/src/JobBuilder.cs b/sdk/storage/Azure.Storage.DataMovement/src/JobBuilder.cs index a7f483f70a1b1..ade8ed94c82e6 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/JobBuilder.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/JobBuilder.cs @@ -11,7 +11,7 @@ namespace Azure.Storage.DataMovement; internal class JobBuilder { - internal readonly ArrayPool _arrayPool; + private readonly ArrayPool _arrayPool; /// /// Defines the error handling method to follow when an error is seen. Defaults to @@ -19,9 +19,15 @@ internal class JobBuilder /// /// See . /// - internal readonly DataTransferErrorMode _errorHandling; + private readonly DataTransferErrorMode _errorHandling; - internal ClientDiagnostics ClientDiagnostics { get; } + private ClientDiagnostics ClientDiagnostics { get; } + + /// + /// Mocking constructor. + /// + protected JobBuilder() + { } internal JobBuilder( ArrayPool arrayPool, diff --git a/sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs b/sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs index 5195f68b068e4..d7c1aafd0e771 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs @@ -44,6 +44,8 @@ public class TransferManager : IAsyncDisposable private readonly CancellationTokenSource _cancellationTokenSource = new(); private CancellationToken _cancellationToken => _cancellationTokenSource.Token; + private readonly Func _generateTransferId; + /// /// Protected constructor for mocking. /// @@ -55,20 +57,18 @@ protected TransferManager() /// /// Options that will apply to all transfers started by this TransferManager. public TransferManager(TransferManagerOptions options = default) - { - _jobsProcessor = ChannelProcessing.NewProcessor(parallelism: 1); - _partsProcessor = ChannelProcessing.NewProcessor(DataMovementConstants.MaxJobPartReaders); - _chunksProcessor = ChannelProcessing.NewProcessor>(options?.MaximumConcurrency ?? DataMovementConstants.MaxJobChunkTasks); - _jobBuilder = new( - ArrayPool.Shared, + : this( + ChannelProcessing.NewProcessor(parallelism: 1), + ChannelProcessing.NewProcessor(DataMovementConstants.MaxJobPartReaders), + ChannelProcessing.NewProcessor>(options?.MaximumConcurrency ?? DataMovementConstants.MaxJobChunkTasks), + new(ArrayPool.Shared, options?.ErrorHandling ?? DataTransferErrorMode.StopOnAnyFailure, - new ClientDiagnostics(options?.ClientOptions ?? ClientOptions.Default)); - TransferCheckpointStoreOptions checkpointerOptions = options?.CheckpointerOptions != default ? new TransferCheckpointStoreOptions(options.CheckpointerOptions) : default; - _checkpointer = checkpointerOptions != default ? checkpointerOptions.GetCheckpointer() : CreateDefaultCheckpointer(); - _resumeProviders = options?.ResumeProviders != null ? new(options.ResumeProviders) : new(); - - ConfigureProcessorCallbacks(); - } + new ClientDiagnostics(options?.ClientOptions ?? ClientOptions.Default)), + (options?.CheckpointerOptions != default ? new TransferCheckpointStoreOptions(options.CheckpointerOptions) : default) + ?.GetCheckpointer() ?? CreateDefaultCheckpointer(), + options?.ResumeProviders != null ? new List(options.ResumeProviders) : new(), + default) + {} /// /// Dependency injection constructor. @@ -79,14 +79,16 @@ internal TransferManager( IProcessor> chunksProcessor, JobBuilder jobBuilder, TransferCheckpointer checkpointer, - ICollection resumeProviders) + ICollection resumeProviders, + Func generateTransferId = default) { _jobsProcessor = jobsProcessor; _partsProcessor = partsProcessor; _chunksProcessor = chunksProcessor; _jobBuilder = jobBuilder; - _resumeProviders = new(resumeProviders); + _resumeProviders = new(resumeProviders ?? new List()); _checkpointer = checkpointer; + _generateTransferId = generateTransferId ?? (() => Guid.NewGuid().ToString()); ConfigureProcessorCallbacks(); } @@ -345,7 +347,7 @@ public virtual async Task StartTransferAsync( transferOptions ??= new DataTransferOptions(); - string transferId = Guid.NewGuid().ToString(); + string transferId = _generateTransferId(); await _checkpointer.AddNewJobAsync( transferId, sourceResource, diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/Azure.Storage.DataMovement.Tests.csproj b/sdk/storage/Azure.Storage.DataMovement/tests/Azure.Storage.DataMovement.Tests.csproj index 8afd7735a0168..b5e3c42359976 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/Azure.Storage.DataMovement.Tests.csproj +++ b/sdk/storage/Azure.Storage.DataMovement/tests/Azure.Storage.DataMovement.Tests.csproj @@ -31,8 +31,8 @@ - - + + diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/Models/StepProcessor.cs b/sdk/storage/Azure.Storage.DataMovement/tests/Models/StepProcessor.cs new file mode 100644 index 0000000000000..4d5287b13d9cc --- /dev/null +++ b/sdk/storage/Azure.Storage.DataMovement/tests/Models/StepProcessor.cs @@ -0,0 +1,74 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace Azure.Storage.DataMovement.Tests +{ + /// + /// Processor that only processes items one by one from + /// its queue only when invoked for each item. + /// + /// + internal class StepProcessor : IProcessor + { + private readonly Queue _queue = new(); + + public int ItemsInQueue => _queue.Count; + + /// + public ProcessAsync Process { get; set; } + + /// + public ValueTask QueueAsync(T item, CancellationToken cancellationToken = default) + { + _queue.Enqueue(item); + return new(Task.CompletedTask); + } + + /// + /// Attmpts to read an item from internal queue, then completes + /// a call to on it. + /// + /// + /// Whether or not an item was successfully read from the queue. + /// + public async ValueTask TryStepAsync(CancellationToken cancellationToken = default) + { + if (_queue.Count > 0) + { + await Process?.Invoke(_queue.Dequeue(), cancellationToken); + return true; + } + else + { + return false; + } + } + + public async ValueTask StepMany(int maxSteps, CancellationToken cancellationToken = default) + { + int steps = 0; + while (steps < maxSteps && await TryStepAsync(cancellationToken)) + { + steps++; + } + return steps; + } + + public async ValueTask StepAll(CancellationToken cancellationToken = default) + { + int steps = 0; + while (await TryStepAsync(cancellationToken)) + { + steps++; + } + return steps; + } + + public void Dispose() + { } + } +} diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs new file mode 100644 index 0000000000000..3a9cf04c919da --- /dev/null +++ b/sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs @@ -0,0 +1,440 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using Azure.Core; +using Azure.Core.Pipeline; +using Moq; +using NUnit.Framework; + +namespace Azure.Storage.DataMovement.Tests; + +public class TransferManagerTests +{ + [Test] + public async Task BasicProcessorLifetime() + { + Mock> jobsProcessor = new(); + Mock> partsProcessor = new(); + Mock>> chunksProcessor = new(); + + await using (TransferManager _ = new( + jobsProcessor.Object, + partsProcessor.Object, + chunksProcessor.Object, + default, default, default)) + { + jobsProcessor.VerifyTransferManagerCtorInvocations(); + partsProcessor.VerifyTransferManagerCtorInvocations(); + chunksProcessor.VerifyTransferManagerCtorInvocations(); + + jobsProcessor.VerifyNoOtherCalls(); + partsProcessor.VerifyNoOtherCalls(); + chunksProcessor.VerifyNoOtherCalls(); + } + jobsProcessor.VerifyDisposal(); + partsProcessor.VerifyDisposal(); + chunksProcessor.VerifyDisposal(); + + jobsProcessor.VerifyNoOtherCalls(); + partsProcessor.VerifyNoOtherCalls(); + chunksProcessor.VerifyNoOtherCalls(); + } + + [Test] + [Combinatorial] + public async Task BasicItemTransfer( + [Values(1, 5)] int items, + [Values(333, 500, 1024)] int itemSize, + [Values(333, 1024)] int chunkSize) + { + int chunksPerPart = (int)Math.Ceiling((float)itemSize / chunkSize); + // TODO: below should be only `items * chunksPerPart` but can't in some cases due to + // a bug in how work items are processed on multipart uploads. + int expectedChunksInQueue = Math.Max(chunksPerPart-1, 1) * items; + + Uri srcUri = new("file:///foo/bar"); + Uri dstUri = new("https://example.com/fizz/buzz"); + + StepProcessor jobsProcessor = new(); + StepProcessor partsProcessor = new(); + StepProcessor> chunksProcessor = new(); + Mock jobBuilder = new(ArrayPool.Shared, default, new ClientDiagnostics(ClientOptions.Default)); + Mock checkpointer = new(); + + var resources = Enumerable.Range(0, items).Select(_ => + { + Mock srcResource = new(MockBehavior.Strict); + Mock dstResource = new(MockBehavior.Strict); + + (srcResource, dstResource).BasicSetup(srcUri, dstUri, itemSize); + + return (Source: srcResource, Destination: dstResource); + }).ToList(); + + await using TransferManager transferManager = new( + jobsProcessor, + partsProcessor, + chunksProcessor, + jobBuilder.Object, + checkpointer.Object, + default); + + List transfers = new(); + + // queue jobs + foreach ((Mock srcResource, Mock dstResource) in resources) + { + DataTransfer transfer = await transferManager.StartTransferAsync( + srcResource.Object, + dstResource.Object, + new() + { + InitialTransferSize = chunkSize, + MaximumTransferChunkSize = chunkSize, + }); + Assert.That(transfer.HasCompleted, Is.False); + transfers.Add(transfer); + + srcResource.VerifySourceResourceOnQueue(); + dstResource.VerifyDestinationResourceOnQueue(); + srcResource.VerifyNoOtherCalls(); + dstResource.VerifyNoOtherCalls(); + } + Assert.That(jobsProcessor.ItemsInQueue, Is.EqualTo(items), "Error during initial Job queueing."); + + // process jobs + Assert.That(await jobsProcessor.StepAll(), Is.EqualTo(items)); + Assert.That(jobsProcessor.ItemsInQueue, Is.EqualTo(0)); + Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(items), "Error during Job => Part processing."); + foreach ((Mock srcResource, Mock dstResource) in resources) + { + srcResource.VerifySourceResourceOnJobProcess(); + dstResource.VerifyDestinationResourceOnJobProcess(); + srcResource.VerifyNoOtherCalls(); + dstResource.VerifyNoOtherCalls(); + } + + // process parts + Assert.That(await partsProcessor.StepAll(), Is.EqualTo(items)); + Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(0)); + Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(expectedChunksInQueue), "Error during Part => Chunk processing."); + foreach ((Mock srcResource, Mock dstResource) in resources) + { + srcResource.VerifySourceResourceOnPartProcess(); + dstResource.VerifyDestinationResourceOnPartProcess(); + srcResource.VerifyNoOtherCalls(); + dstResource.VerifyNoOtherCalls(); + } + + // process chunks + Assert.That(await chunksProcessor.StepAll(), Is.EqualTo(expectedChunksInQueue)); + Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(0)); + foreach ((Mock srcResource, Mock dstResource) in resources) + { + srcResource.VerifySourceResourceOnChunkProcess(); + dstResource.VerifyDestinationResourceOnChunkProcess(); + srcResource.VerifyNoOtherCalls(); + dstResource.VerifyNoOtherCalls(); + } + + await Task.Delay(20); // TODO flaky that we need this; a random one will often fail without + + foreach (DataTransfer transfer in transfers) + { + Assert.That(transfer.HasCompleted); + } + } + + [Test] + [Combinatorial] + public async Task BasicContainerTransfer( + [Values(1, 5)] int numJobs, + [Values(333, 500, 1024)] int itemSize, + [Values(333, 1024)] int chunkSize) + { + static int GetItemCountFromContainerIndex(int i) => i*i + 1; + + int numJobParts = Enumerable.Range(1, numJobs).Select(GetItemCountFromContainerIndex).Sum(); + int chunksPerPart = (int)Math.Ceiling((float)itemSize / chunkSize); + // TODO: below should be only `items * chunksPerPart` but can't in some cases due to + // a bug in how work items are processed on multipart uploads. + int numChunks = Math.Max(chunksPerPart - 1, 1) * numJobParts; + + Uri srcUri = new("file:///foo/bar"); + Uri dstUri = new("https://example.com/fizz/buzz"); + + StepProcessor jobsProcessor = new(); + StepProcessor partsProcessor = new(); + StepProcessor> chunksProcessor = new(); + Mock jobBuilder = new(ArrayPool.Shared, default, new ClientDiagnostics(ClientOptions.Default)); + Mock checkpointer = new(); + + var resources = Enumerable.Range(1, numJobs).Select(i => + { + Mock srcResource = new(MockBehavior.Strict); + Mock dstResource = new(MockBehavior.Strict); + (srcResource, dstResource).BasicSetup(srcUri, dstUri, GetItemCountFromContainerIndex(i), itemSize); + return (Source: srcResource, Destination: dstResource); + }).ToList(); + + await using TransferManager transferManager = new( + jobsProcessor, + partsProcessor, + chunksProcessor, + jobBuilder.Object, + checkpointer.Object, + default); + + List transfers = new(); + + // queue jobs + foreach ((Mock srcResource, Mock dstResource) in resources) + { + DataTransfer transfer = await transferManager.StartTransferAsync( + srcResource.Object, + dstResource.Object, + new() + { + InitialTransferSize = chunkSize, + MaximumTransferChunkSize = chunkSize, + }); + Assert.That(transfer.HasCompleted, Is.False); + transfers.Add(transfer); + + srcResource.VerifySourceResourceOnQueue(); + dstResource.VerifyDestinationResourceOnQueue(); + srcResource.VerifyNoOtherCalls(); + dstResource.VerifyNoOtherCalls(); + } + Assert.That(jobsProcessor.ItemsInQueue, Is.EqualTo(numJobs), "Error during initial Job queueing."); + + // process jobs + Assert.That(await jobsProcessor.StepAll(), Is.EqualTo(numJobs)); + Assert.That(jobsProcessor.ItemsInQueue, Is.EqualTo(0)); + Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(numJobParts), "Error during Job => Part processing."); + foreach ((Mock srcResource, Mock dstResource) in resources) + { + srcResource.VerifySourceResourceOnJobProcess(); + dstResource.VerifyDestinationResourceOnJobProcess(); + srcResource.VerifyNoOtherCalls(); + dstResource.VerifyNoOtherCalls(); + } + + // process parts + Assert.That(await partsProcessor.StepAll(), Is.EqualTo(numJobParts)); + Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(0)); + Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(numChunks), "Error during Part => Chunk processing."); + foreach ((Mock srcResource, Mock dstResource) in resources) + { + srcResource.VerifyNoOtherCalls(); + dstResource.VerifyNoOtherCalls(); + } + + // process chunks + Assert.That(await chunksProcessor.StepAll(), Is.EqualTo(numChunks)); + Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(0)); + foreach ((Mock srcResource, Mock dstResource) in resources) + { + srcResource.VerifyNoOtherCalls(); + dstResource.VerifyNoOtherCalls(); + } + + await Task.Delay(10); // TODO flaky that we need this; a random one will often fail without + + foreach (DataTransfer transfer in transfers) + { + Assert.That(transfer.HasCompleted); + } + } +} + +internal static partial class MockExtensions +{ + public static void SetupQueueAsync(this Mock> processor, Action onQueue = default) + { + var setup = processor.Setup(p => p.QueueAsync(It.IsNotNull(), It.IsNotNull())) + .Returns(new ValueTask(Task.CompletedTask)); + if (onQueue != default) + { + setup.Callback(onQueue); + } + } + + public static void BasicSetup( + this (Mock Source, Mock Destination) items, + Uri srcUri, + Uri dstUri, + int itemSize = Constants.KB + ) + { + items.Source.Setup(r => r.Uri).Returns(srcUri); + items.Destination.Setup(r => r.Uri).Returns(dstUri); + + items.Source.SetupGet(r => r.ResourceId).Returns("Mock"); + items.Destination.SetupGet(r => r.ResourceId).Returns("Mock"); + + items.Destination.SetupGet(r => r.TransferType).Returns(default(DataTransferOrder)); + items.Destination.SetupGet(r => r.MaxSupportedChunkSize).Returns(Constants.GB); + + items.Source.Setup(r => r.GetPropertiesAsync(It.IsAny())) + .Returns(Task.FromResult(new StorageResourceItemProperties(resourceLength: itemSize, default, default, default))); + + items.Source.Setup(r => r.ReadStreamAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns((position, length, cancellation) + => Task.FromResult(new StorageResourceReadStreamResult( + new Mock().Object, + new HttpRange(position, length), + new(itemSize, default, default, new())))); + items.Destination.Setup(r => r.CopyFromStreamAsync( + It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + items.Destination.Setup(r => r.CompleteTransferAsync( + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + } + + public static void BasicSetup( + this (Mock Source, Mock Destination) containers, + Uri srcUri, + Uri dstUri, + int numItems = 5, + int itemSize = Constants.KB + ) + { + var subResources = Enumerable.Range(0, numItems).Select(_ => + { + string name = "/" + Guid.NewGuid().ToString(); + var items = ( + Source: new Mock(MockBehavior.Strict), + Destination: new Mock(MockBehavior.Strict)); + items.BasicSetup(new Uri(srcUri.ToString() + name), new Uri(dstUri.ToString() + name), itemSize); + items.Source.SetupGet(r => r.IsContainer).Returns(false); + return items; + }).ToList(); + + async IAsyncEnumerable SubResourcesAsAsyncEnumerable( + StorageResourceContainer src, + [EnumeratorCancellation] CancellationToken ct) + { + foreach (int i in Enumerable.Range(0, numItems)) + { + yield return await Task.FromResult(subResources[i].Source.Object); + } + } + + containers.Source.SetupGet(r => r.Uri).Returns(srcUri); + containers.Destination.SetupGet(r => r.Uri).Returns(dstUri); + + containers.Source.Setup(r => r.GetStorageResourcesAsync(It.IsAny(), It.IsAny())) + .Returns(SubResourcesAsAsyncEnumerable); + + containers.Destination.Setup(r => r.GetStorageResourceReference(It.IsAny(), It.IsAny())) + .Returns((path, resId) => subResources + .Where(pair => pair.Source.Object.Uri.AbsolutePath.Contains(path)) + .FirstOrDefault().Destination?.Object + ); + } + + public static void VerifyTransferManagerCtorInvocations(this Mock> processor) + { + processor.VerifySet(p => p.Process = It.IsNotNull>(), Times.Once()); + } + + #region StorageResource calls TransferManager processing stages + public static void VerifySourceResourceOnQueue(this Mock srcResource) + { + srcResource.VerifyGet(r => r.Uri); + } + + public static void VerifyDestinationResourceOnQueue(this Mock dstResource) + { + dstResource.VerifyGet(r => r.Uri); + } + + public static void VerifySourceResourceOnQueue(this Mock srcResource) + { + srcResource.VerifyGet(r => r.Uri); + } + + public static void VerifyDestinationResourceOnQueue(this Mock dstResource) + { + dstResource.VerifyGet(r => r.Uri); + } + + public static void VerifySourceResourceOnJobProcess(this Mock srcResource) + { + srcResource.VerifyGet(r => r.Uri); + srcResource.VerifyGet(r => r.ResourceId); + } + + public static void VerifyDestinationResourceOnJobProcess(this Mock dstResource) + { + dstResource.VerifyGet(r => r.Uri); + dstResource.VerifyGet(r => r.ResourceId); + dstResource.VerifyGet(r => r.MaxSupportedChunkSize); + } + + public static void VerifySourceResourceOnJobProcess(this Mock srcResource) + { + srcResource.Verify( + r => r.GetStorageResourcesAsync(It.IsAny(), It.IsAny()), + Times.Once); + srcResource.VerifyGet(r => r.Uri, Times.AtLeastOnce()); + } + + public static void VerifyDestinationResourceOnJobProcess(this Mock dstResource) + { + dstResource.Verify( + r => r.GetStorageResourceReference(It.IsAny(), It.IsAny()), + Times.AtLeastOnce()); + } + + public static void VerifySourceResourceOnPartProcess(this Mock srcResource) + { + srcResource.Verify(r => r.GetPropertiesAsync(It.IsAny()), Times.Once); + // TODO: a bug in multipart uploading can result in the first chunk being uploaded at part process + // verify at most once to ensure there are no more than this bug. + srcResource.Verify(r => r.ReadStreamAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.AtMostOnce); + } + + public static void VerifyDestinationResourceOnPartProcess(this Mock dstResource) + { + dstResource.VerifyGet(r => r.TransferType, Times.AtMost(9999)); + // TODO: a bug in multipart uploading can result in the first chunk being uploaded at part process + // verify at most once to ensure there are no more than this bug. + dstResource.Verify(r => r.CopyFromStreamAsync( + It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny()), + Times.AtMostOnce); + } + + public static void VerifySourceResourceOnChunkProcess(this Mock srcResource) + { + srcResource.Verify(r => r.ReadStreamAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.AtLeastOnce); + } + + public static void VerifyDestinationResourceOnChunkProcess(this Mock dstResource) + { + dstResource.Verify(r => r.CopyFromStreamAsync( + It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny()), + Times.AtLeastOnce); + dstResource.Verify(r => r.CompleteTransferAsync( + It.IsAny(), + It.IsAny(), + It.IsAny()), + Times.AtMostOnce); // TODO why don't we complete single part transfers? + } + #endregion +}