diff --git a/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs b/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs index 058423f3bfbf8..e10f1098d52b3 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs @@ -508,6 +508,7 @@ await TransferFailedEventHandler.RaiseAsync( ClientDiagnostics) .ConfigureAwait(false); } + _dataTransfer.TransferStatus.SetFailedItem(); } try diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/AssemblyInfo.cs b/sdk/storage/Azure.Storage.DataMovement/tests/AssemblyInfo.cs new file mode 100644 index 0000000000000..2bdef50473db2 --- /dev/null +++ b/sdk/storage/Azure.Storage.DataMovement/tests/AssemblyInfo.cs @@ -0,0 +1,6 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2, PublicKey=0024000004800000940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a3602f3b3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff62abf69b9291ed0a340e113be11e6a7d3113e92484cf7045cc7")] diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/Shared/MemoryTransferCheckpointer.cs b/sdk/storage/Azure.Storage.DataMovement/tests/Shared/MemoryTransferCheckpointer.cs index d6a1efc061a46..36da63a5221a1 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/Shared/MemoryTransferCheckpointer.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/Shared/MemoryTransferCheckpointer.cs @@ -57,7 +57,7 @@ public JobPartPlanHeader Plan public Dictionary Jobs { get; set; } = new(); - public Task AddNewJobAsync(string transferId, StorageResource source, StorageResource destination, CancellationToken cancellationToken = default) + public virtual Task AddNewJobAsync(string transferId, StorageResource source, StorageResource destination, CancellationToken cancellationToken = default) { CancellationHelper.ThrowIfCancellationRequested(cancellationToken); if (Jobs.ContainsKey(transferId)) @@ -71,12 +71,12 @@ public Task AddNewJobAsync(string transferId, StorageResource source, StorageRes Operation = JobPlanOperation.Upload, // TODO Source = source, Destination = destination, - Status = new() + Status = new(DataTransferState.Queued, false, false), }; return Task.CompletedTask; } - public Task AddNewJobPartAsync(string transferId, int partNumber, JobPartPlanHeader header, CancellationToken cancellationToken = default) + public virtual Task AddNewJobPartAsync(string transferId, int partNumber, JobPartPlanHeader header, CancellationToken cancellationToken = default) { CancellationHelper.ThrowIfCancellationRequested(cancellationToken); if (!Jobs.TryGetValue(transferId, out Job job)) @@ -90,18 +90,18 @@ public Task AddNewJobPartAsync(string transferId, int partNumber, JobPartPlanHea job.Parts.Add(partNumber, new() { Plan = header, - Status = new() + Status = new(DataTransferState.Queued, false, false), }); return Task.CompletedTask; } - public Task GetCurrentJobPartCountAsync(string transferId, CancellationToken cancellationToken = default) + public virtual Task GetCurrentJobPartCountAsync(string transferId, CancellationToken cancellationToken = default) { CancellationHelper.ThrowIfCancellationRequested(cancellationToken); return Task.FromResult(Jobs.TryGetValue(transferId, out Job job) ? job.Parts.Count : 0); } - public Task GetDataTransferPropertiesAsync(string transferId, CancellationToken cancellationToken = default) + public virtual Task GetDataTransferPropertiesAsync(string transferId, CancellationToken cancellationToken = default) { CancellationHelper.ThrowIfCancellationRequested(cancellationToken); if (!Jobs.TryGetValue(transferId, out Job job)) @@ -119,7 +119,7 @@ public Task GetDataTransferPropertiesAsync(string transf }); } - public Task GetJobPartAsync(string transferId, int partNumber, CancellationToken cancellationToken = default) + public virtual Task GetJobPartAsync(string transferId, int partNumber, CancellationToken cancellationToken = default) { CancellationHelper.ThrowIfCancellationRequested(cancellationToken); if (!Jobs.TryGetValue(transferId, out Job job)) @@ -133,7 +133,7 @@ public Task GetJobPartAsync(string transferId, int partNumber return Task.FromResult(part.Plan); } - public Task GetJobStatusAsync(string transferId, CancellationToken cancellationToken = default) + public virtual Task GetJobStatusAsync(string transferId, CancellationToken cancellationToken = default) { CancellationHelper.ThrowIfCancellationRequested(cancellationToken); if (!Jobs.TryGetValue(transferId, out Job job)) @@ -143,19 +143,19 @@ public Task GetJobStatusAsync(string transferId, Cancellatio return Task.FromResult(job.Status.DeepCopy()); } - public Task> GetStoredTransfersAsync(CancellationToken cancellationToken = default) + public virtual Task> GetStoredTransfersAsync(CancellationToken cancellationToken = default) { CancellationHelper.ThrowIfCancellationRequested(cancellationToken); return Task.FromResult(Jobs.Keys.ToList()); } - public Task IsEnumerationCompleteAsync(string transferId, CancellationToken cancellationToken = default) + public virtual Task IsEnumerationCompleteAsync(string transferId, CancellationToken cancellationToken = default) { CancellationHelper.ThrowIfCancellationRequested(cancellationToken); return Task.FromResult(Jobs.TryGetValue(transferId, out Job job) && job.EnumerationComplete); } - public Task SetEnumerationCompleteAsync(string transferId, CancellationToken cancellationToken = default) + public virtual Task SetEnumerationCompleteAsync(string transferId, CancellationToken cancellationToken = default) { CancellationHelper.ThrowIfCancellationRequested(cancellationToken); if (Jobs.TryGetValue(transferId, out Job job)) @@ -165,7 +165,7 @@ public Task SetEnumerationCompleteAsync(string transferId, CancellationToken can return Task.CompletedTask; } - public Task SetJobPartStatusAsync(string transferId, int partNumber, DataTransferStatus status, CancellationToken cancellationToken = default) + public virtual Task SetJobPartStatusAsync(string transferId, int partNumber, DataTransferStatus status, CancellationToken cancellationToken = default) { CancellationHelper.ThrowIfCancellationRequested(cancellationToken); if (Jobs.TryGetValue(transferId, out Job job) && job.Parts.TryGetValue(partNumber, out JobPart part)) @@ -175,7 +175,7 @@ public Task SetJobPartStatusAsync(string transferId, int partNumber, DataTransfe return Task.CompletedTask; } - public Task SetJobStatusAsync(string transferId, DataTransferStatus status, CancellationToken cancellationToken = default) + public virtual Task SetJobStatusAsync(string transferId, DataTransferStatus status, CancellationToken cancellationToken = default) { CancellationHelper.ThrowIfCancellationRequested(cancellationToken); if (Jobs.TryGetValue(transferId, out Job job)) @@ -185,7 +185,7 @@ public Task SetJobStatusAsync(string transferId, DataTransferStatus status, Canc return Task.CompletedTask; } - public Task TryRemoveStoredTransferAsync(string transferId, CancellationToken cancellationToken = default) + public virtual Task TryRemoveStoredTransferAsync(string transferId, CancellationToken cancellationToken = default) { CancellationHelper.ThrowIfCancellationRequested(cancellationToken); return Task.FromResult(Jobs.Remove(transferId)); diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs index 14bf961ed64f9..d25df44115457 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs @@ -12,10 +12,12 @@ using System.Threading.Tasks; using Azure.Core; using Azure.Core.Pipeline; +using Azure.Storage.DataMovement.JobPlan; using Azure.Storage.DataMovement.Tests.Shared; using Moq; using NUnit.Framework; using NUnit.Framework.Constraints; +using NUnit.Framework.Interfaces; namespace Azure.Storage.DataMovement.Tests; @@ -28,7 +30,7 @@ private static (StepProcessor JobProcessor, StepProcessor (new(), new(), new()); private static (StorageResource Source, StorageResource Destination, Func SrcThrowScope, Func DstThrowScope) - GetBasicSetupResources(bool isContainer, Uri srcUri, Uri dstUri) + GetBasicSetupResources(bool isContainer, Uri srcUri, Uri dstUri, bool includeDelete = false) { if (isContainer) { @@ -44,6 +46,7 @@ private static (StorageResource Source, StorageResource Destination, Func srcItem = new(MockBehavior.Strict); Mock dstItem = new(MockBehavior.Strict); (srcItem, dstItem).BasicSetup(srcUri, dstUri); + dstItem.Setup(item => item.DeleteIfExistsAsync(It.IsAny())); StorageResourceItemFailureWrapper srcWrapper = new(srcItem.Object); StorageResourceItemFailureWrapper dstWrapper = new(dstItem.Object); return (srcWrapper, dstWrapper, srcWrapper.ThrowScope, dstWrapper.ThrowScope); @@ -97,7 +100,7 @@ public async Task BasicItemTransfer( (var jobsProcessor, var partsProcessor, var chunksProcessor) = StepProcessors(); JobBuilder jobBuilder = new(ArrayPool.Shared, default, new ClientDiagnostics(ClientOptions.Default)); - Mock checkpointer = new(); + MemoryTransferCheckpointer checkpointer = new(); var resources = Enumerable.Range(0, items).Select(_ => { @@ -114,7 +117,7 @@ public async Task BasicItemTransfer( partsProcessor, chunksProcessor, jobBuilder, - checkpointer.Object, + checkpointer, default); List transfers = new(); @@ -138,6 +141,7 @@ public async Task BasicItemTransfer( srcResource.VerifyNoOtherCalls(); dstResource.VerifyNoOtherCalls(); } + Assert.That(checkpointer.Jobs.Count, Is.EqualTo(items), "Jobs not added to checkpointer."); Assert.That(jobsProcessor.ItemsInQueue, Is.EqualTo(items), "Error during initial Job queueing."); // process jobs @@ -151,6 +155,14 @@ public async Task BasicItemTransfer( srcResource.VerifyNoOtherCalls(); dstResource.VerifyNoOtherCalls(); } + foreach (MemoryTransferCheckpointer.Job job in checkpointer.Jobs.Values) + { + Assert.That(job.Parts.Count, Is.EqualTo(1), "Items should be single-part."); + Assert.That(job.Parts.Values.First().Status.State, Is.EqualTo(DataTransferState.Queued), "Bad part status."); + Assert.That(job.Parts.Keys.First(), Is.EqualTo(0), "Parts should be zero-indexed."); + Assert.That(job.EnumerationComplete, "Enumeration not marked comlete."); + Assert.That(job.Status.State, Is.EqualTo(DataTransferState.InProgress), "Transfer state not updated."); + } // process parts Assert.That(await partsProcessor.StepAll(), Is.EqualTo(items)); @@ -163,6 +175,13 @@ public async Task BasicItemTransfer( srcResource.VerifyNoOtherCalls(); dstResource.VerifyNoOtherCalls(); } + foreach (MemoryTransferCheckpointer.Job job in checkpointer.Jobs.Values) + { + foreach (MemoryTransferCheckpointer.JobPart part in job.Parts.Values) + { + Assert.That(part.Status.State, Is.EqualTo(DataTransferState.InProgress), "Part state not updated."); + } + } // process chunks Assert.That(await chunksProcessor.StepAll(), Is.EqualTo(expectedChunksInQueue)); @@ -175,12 +194,20 @@ public async Task BasicItemTransfer( dstResource.VerifyNoOtherCalls(); } - await Task.Delay(20); // TODO flaky that we need this; a random one will often fail without + await Task.Delay(50); // TODO flaky that we need this; a random one will often fail without foreach (DataTransfer transfer in transfers) { Assert.That(transfer.HasCompleted); } + foreach (MemoryTransferCheckpointer.Job job in checkpointer.Jobs.Values) + { + foreach (MemoryTransferCheckpointer.JobPart part in job.Parts.Values) + { + Assert.That(part.Status.State, Is.EqualTo(DataTransferState.Completed), "Part state not updated."); + } + Assert.That(job.Status.State, Is.EqualTo(DataTransferState.Completed), "Job state not updated."); + } } [Test] @@ -192,40 +219,34 @@ public async Task BasicContainerTransfer( { static int GetItemCountFromContainerIndex(int i) => i*i + 1; - int numJobParts = Enumerable.Range(1, numJobs).Select(GetItemCountFromContainerIndex).Sum(); + int totalJobParts = Enumerable.Range(1, numJobs).Select(GetItemCountFromContainerIndex).Sum(); int chunksPerPart = (int)Math.Ceiling((float)itemSize / chunkSize); // TODO: below should be only `items * chunksPerPart` but can't in some cases due to // a bug in how work items are processed on multipart uploads. - int numChunks = Math.Max(chunksPerPart - 1, 1) * numJobParts; + int numChunks = Math.Max(chunksPerPart - 1, 1) * totalJobParts; Uri srcUri = new("file:///foo/bar"); Uri dstUri = new("https://example.com/fizz/buzz"); (var jobsProcessor, var partsProcessor, var chunksProcessor) = StepProcessors(); JobBuilder jobBuilder = new(ArrayPool.Shared, default, new ClientDiagnostics(ClientOptions.Default)); - Mock checkpointer = new(); - - var resources = Enumerable.Range(1, numJobs).Select(i => - { - Mock srcResource = new(MockBehavior.Strict); - Mock dstResource = new(MockBehavior.Strict); - (srcResource, dstResource).BasicSetup(srcUri, dstUri, GetItemCountFromContainerIndex(i), itemSize); - return (Source: srcResource, Destination: dstResource); - }).ToList(); + MemoryTransferCheckpointer checkpointer = new(); await using TransferManager transferManager = new( jobsProcessor, partsProcessor, chunksProcessor, jobBuilder, - checkpointer.Object, + checkpointer, default); - List transfers = new(); + List<(DataTransfer Transfer, int ExpectedPartCount, Mock Source, Mock Destination)> transfers = new(); - // queue jobs - foreach ((Mock srcResource, Mock dstResource) in resources) + foreach (int i in Enumerable.Range(1, numJobs)) { + Mock srcResource = new(MockBehavior.Strict); + Mock dstResource = new(MockBehavior.Strict); + (srcResource, dstResource).BasicSetup(srcUri, dstUri, GetItemCountFromContainerIndex(i), itemSize); DataTransfer transfer = await transferManager.StartTransferAsync( srcResource.Object, dstResource.Object, @@ -234,22 +255,28 @@ public async Task BasicContainerTransfer( InitialTransferSize = chunkSize, MaximumTransferChunkSize = chunkSize, }); - Assert.That(transfer.HasCompleted, Is.False); - transfers.Add(transfer); + transfers.Add((transfer, GetItemCountFromContainerIndex(i), srcResource, dstResource)); + Assert.That(transfer.HasCompleted, Is.False); srcResource.VerifySourceResourceOnQueue(); dstResource.VerifyDestinationResourceOnQueue(); srcResource.VerifyNoOtherCalls(); dstResource.VerifyNoOtherCalls(); } + Assert.That(checkpointer.Jobs.Count, Is.EqualTo(numJobs), "Jobs not added to checkpointer."); Assert.That(jobsProcessor.ItemsInQueue, Is.EqualTo(numJobs), "Error during initial Job queueing."); // process jobs - Assert.That(await jobsProcessor.StepAll(), Is.EqualTo(numJobs)); - Assert.That(jobsProcessor.ItemsInQueue, Is.EqualTo(0)); - Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(numJobParts), "Error during Job => Part processing."); - foreach ((Mock srcResource, Mock dstResource) in resources) + Assert.That(await jobsProcessor.StepAll(), Is.EqualTo(numJobs), "Failed to step through jobs queue."); + Assert.That(jobsProcessor.ItemsInQueue, Is.EqualTo(0), "Failed to step through jobs queue."); + Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(totalJobParts), "Error during Job => Part processing."); + foreach ((DataTransfer transfer, int parts, Mock srcResource, Mock dstResource) in transfers) { + Assert.That(checkpointer.Jobs[transfer.Id].Parts.Count, Is.EqualTo(parts), "Containers should have several parts."); + Assert.That(checkpointer.Jobs[transfer.Id].Parts.Keys, Is.EquivalentTo(Enumerable.Range(0, checkpointer.Jobs[transfer.Id].Parts.Count).ToList()), + "Part nums should be sequential and zero-indexed."); + Assert.That(checkpointer.Jobs[transfer.Id].EnumerationComplete, "Enumeration not marked comlete."); + Assert.That(checkpointer.Jobs[transfer.Id].Status.State, Is.EqualTo(DataTransferState.InProgress), "Transfer state not updated."); srcResource.VerifySourceResourceOnJobProcess(); dstResource.VerifyDestinationResourceOnJobProcess(); srcResource.VerifyNoOtherCalls(); @@ -257,19 +284,23 @@ public async Task BasicContainerTransfer( } // process parts - Assert.That(await partsProcessor.StepAll(), Is.EqualTo(numJobParts)); - Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(0)); + Assert.That(await partsProcessor.StepAll(), Is.EqualTo(totalJobParts), "Failed to step through parts queue."); + Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(0), "Failed to step through parts queue."); Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(numChunks), "Error during Part => Chunk processing."); - foreach ((Mock srcResource, Mock dstResource) in resources) + foreach ((DataTransfer transfer, int parts, Mock srcResource, Mock dstResource) in transfers) { + foreach (MemoryTransferCheckpointer.JobPart part in checkpointer.Jobs[transfer.Id].Parts.Values) + { + Assert.That(part.Status.State, Is.EqualTo(DataTransferState.InProgress), "Part state not updated."); + } srcResource.VerifyNoOtherCalls(); dstResource.VerifyNoOtherCalls(); } // process chunks - Assert.That(await chunksProcessor.StepAll(), Is.EqualTo(numChunks)); - Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(0)); - foreach ((Mock srcResource, Mock dstResource) in resources) + Assert.That(await chunksProcessor.StepAll(), Is.EqualTo(numChunks), "Failed to step through chunks queue."); + Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(0), "Failed to step through chunks queue."); + foreach ((DataTransfer transfer, int parts, Mock srcResource, Mock dstResource) in transfers) { srcResource.VerifyNoOtherCalls(); dstResource.VerifyNoOtherCalls(); @@ -277,9 +308,14 @@ public async Task BasicContainerTransfer( await Task.Delay(10); // TODO flaky that we need this; a random one will often fail without - foreach (DataTransfer transfer in transfers) + foreach ((DataTransfer transfer, int parts, Mock srcResource, Mock dstResource) in transfers) { Assert.That(transfer.HasCompleted); + foreach (MemoryTransferCheckpointer.JobPart part in checkpointer.Jobs[transfer.Id].Parts.Values) + { + Assert.That(part.Status.State, Is.EqualTo(DataTransferState.Completed), "Part state not updated."); + } + Assert.That(checkpointer.Jobs[transfer.Id].Status.State, Is.EqualTo(DataTransferState.Completed), "Job state not updated."); } } @@ -298,40 +334,36 @@ public async Task TransferFailAtQueue( { CallBase = true, }; - Mock checkpointer = new(); + Mock checkpointer = new() + { + CallBase = true, + }; (StorageResource srcResource, StorageResource dstResource, Func srcThrowScope, Func dstThrowScope) = GetBasicSetupResources(isContainer, srcUri, dstUri); Exception expectedException = new(); Exception cleanupException = throwCleanup ? new() : null; - List capturedTransferIds = new(); + bool expectTransferInCheckpointer = failAt == 0 && throwCleanup; + switch (failAt) { - var checkpointerAddJob = checkpointer.Setup(c => c.AddNewJobAsync(Capture.In(capturedTransferIds), - It.IsAny(), It.IsAny(), It.IsAny())); - var checkpointerRemoveJob = checkpointer.Setup(c => c.TryRemoveStoredTransferAsync( - It.IsAny(), It.IsAny())); - - switch (failAt) - { - case 0: - jobBuilder.Setup(b => b.BuildJobAsync(It.IsAny(), It.IsAny(), - It.IsAny(), It.IsAny(), It.IsAny(), - It.IsAny(), It.IsAny()) - ).Throws(expectedException); - break; - case 1: - checkpointerAddJob.Throws(expectedException); - break; - } - if (throwCleanup) - { - checkpointerRemoveJob.Throws(cleanupException); - } - else - { - checkpointerRemoveJob.Returns(Task.FromResult(true)); - } + case 0: + jobBuilder.Setup(b => b.BuildJobAsync(It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny()) + ).Throws(expectedException); + break; + case 1: + checkpointer.Setup(c => c.AddNewJobAsync(It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny()) + ).Throws(expectedException); + break; + } + if (throwCleanup) + { + checkpointer.Setup(c => c.TryRemoveStoredTransferAsync( + It.IsAny(), It.IsAny()) + ).Throws(cleanupException); } await using TransferManager transferManager = new( @@ -352,10 +384,10 @@ public async Task TransferFailAtQueue( Assert.That(transfer, Is.Null); - Assert.That(capturedTransferIds.Count, Is.EqualTo(1)); - checkpointer.Verify(c => c.AddNewJobAsync(capturedTransferIds.First(), It.IsAny(), + Assert.That(checkpointer.Object.Jobs.Count, Is.EqualTo(expectTransferInCheckpointer ? 1 : 0)); + checkpointer.Verify(c => c.AddNewJobAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); - checkpointer.Verify(c => c.TryRemoveStoredTransferAsync(capturedTransferIds.First(), + checkpointer.Verify(c => c.TryRemoveStoredTransferAsync(It.IsAny(), It.IsAny()), Times.Once); checkpointer.VerifyNoOtherCalls(); } @@ -370,7 +402,14 @@ public async Task TransferFailAtJobProcess( (var jobsProcessor, var partsProcessor, var chunksProcessor) = StepProcessors(); JobBuilder jobBuilder = new(ArrayPool.Shared, default, new(ClientOptions.Default)); + + List capturedTransferIds = new(); + List capturedTransferStatuses = new(); Mock checkpointer = new(MockBehavior.Loose); + checkpointer.Setup(c => c.AddNewJobAsync(Capture.In(capturedTransferIds), + It.IsAny(), It.IsAny(), It.IsAny())); + checkpointer.Setup(c => c.SetJobStatusAsync(It.IsAny(), CaptureTransferStatus(capturedTransferStatuses), + It.IsAny())); (StorageResource srcResource, StorageResource dstResource, Func srcThrowScope, Func dstThrowScope) = GetBasicSetupResources(isContainer, srcUri, dstUri); @@ -383,17 +422,12 @@ public async Task TransferFailAtJobProcess( checkpointer.Object, default); - //Exception expectedException = new(); - //checkpointer.Setup(c => c.AddNewJobPartAsync(It.IsAny(), It.IsAny(), It.IsAny(), - // It.IsAny()) - //).Throws(expectedException); - // need to listen to events to get exception that takes place in processing List failures = new(); DataTransferOptions options = new(); options.ItemTransferFailed += e => { failures.Add(e); return Task.CompletedTask; }; - DataTransfer transfer = await transferManager.StartTransferAsync(srcResource, dstResource); + DataTransfer transfer = await transferManager.StartTransferAsync(srcResource, dstResource, options); using (srcThrowScope()) { @@ -401,11 +435,20 @@ public async Task TransferFailAtJobProcess( } Assert.That(jobsProcessor.ItemsInQueue, Is.Zero); Assert.That(partsProcessor.ItemsInQueue, Is.Zero); // because of failure - // TODO Failures in processing job into job part(s) should surface errors (currently doesn't) - // Assert.That(transfer.TransferStatus.HasFailedItems); - // Assert.That(failures, Is.Not.Empty); - // TODO determine checkpointer status of job parts - // need checkpointer API refactor for this + Assert.That(transfer.TransferStatus.HasFailedItems); + Assert.That(failures, Is.Not.Empty); + + string transferId = capturedTransferIds.First(); + checkpointer.Verify(c => c.AddNewJobAsync(transferId, It.IsAny(), It.IsAny(), + It.IsAny())); + checkpointer.Verify(c => c.SetJobStatusAsync(transferId, It.IsAny(), + It.IsAny()), Times.Exactly(3)); + Assert.That(capturedTransferStatuses[0].State, Is.EqualTo(DataTransferState.InProgress)); + Assert.That(capturedTransferStatuses[1].State, Is.EqualTo(DataTransferState.Stopping)); + Assert.That(capturedTransferStatuses[2].IsCompletedWithFailedItems); + checkpointer.VerifyNoOtherCalls(); + + // TODO checkpointer probably shouldn't be in this state. } [Test] @@ -418,10 +461,17 @@ public async Task TransferFailAtPartProcess( (var jobsProcessor, var partsProcessor, var chunksProcessor) = StepProcessors(); JobBuilder jobBuilder = new(ArrayPool.Shared, default, new(ClientOptions.Default)); - Mock checkpointer = new(MockBehavior.Loose); + + Mock checkpointer = new() + { + CallBase = true + }; + List capturedTransferStatuses = new(); + checkpointer.Setup(c => c.SetJobStatusAsync(It.IsAny(), CaptureTransferStatus(capturedTransferStatuses), + It.IsAny())); (StorageResource srcResource, StorageResource dstResource, Func srcThrowScope, Func dstThrowScope) - = GetBasicSetupResources(isContainer, srcUri, dstUri); + = GetBasicSetupResources(isContainer, srcUri, dstUri, includeDelete: true); await using TransferManager transferManager = new( jobsProcessor, @@ -451,8 +501,12 @@ public async Task TransferFailAtPartProcess( Assert.That(transfer.TransferStatus.HasFailedItems); Assert.That(failures, Is.Not.Empty); - // TODO determine checkpointer status of job chunks - // need checkpointer API refactor for this + + Assert.That(capturedTransferStatuses.Count, Is.EqualTo(3)); // TODO should be 4 + Assert.That(capturedTransferStatuses[0].State, Is.EqualTo(DataTransferState.InProgress)); + Assert.That(capturedTransferStatuses[1].State, Is.EqualTo(DataTransferState.InProgress)); + Assert.That(capturedTransferStatuses[2].State, Is.EqualTo(DataTransferState.Stopping)); + //Assert.That(capturedTransferStatuses[3].IsCompletedWithFailedItems); // TODO this should exist } [Test] @@ -486,6 +540,17 @@ public async Task MultipleTransfersAddedCheckpointer(int numJobs) }); Assert.That(jobsProcessor.ItemsInQueue, Is.EqualTo(numJobs), "Error during initial Job queueing."); } + + /// + /// DataTransferStatus is stateful across transfer. This makes it difficult to verify mocks, as verifications + /// are lazily performed. This captures deep copies of statuses for custom assertion. + /// + private static DataTransferStatus CaptureTransferStatus(ICollection statuses) + => Match.Create(status => + { + statuses.Add(status.DeepCopy()); + return true; + }); } internal static partial class MockExtensions