Skip to content

Commit

Permalink
Test checkpointer on queue fail (#46936)
Browse files: browse the repository at this point in the history
  • Loading branch information
jaschrep-msft authored Nov 8, 2024
1 parent 5060ff5 commit e8ac77d
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 36 deletions.
50 changes: 32 additions & 18 deletions sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -369,24 +369,38 @@ public virtual async Task<DataTransfer> StartTransferAsync(
transferOptions ??= new DataTransferOptions();

string transferId = _generateTransferId();
await _checkpointer.AddNewJobAsync(
transferId,
sourceResource,
destinationResource,
cancellationToken).ConfigureAwait(false);

// TODO: if the below fails for any reason, this job will still be in the checkpointer.
// That seems not desirable.

DataTransfer dataTransfer = await BuildAndAddTransferJobAsync(
sourceResource,
destinationResource,
transferOptions,
transferId,
false,
cancellationToken).ConfigureAwait(false);

return dataTransfer;
// Register the job with the checkpointer, then build and add the transfer job.
// An exception from either awaited call is handled by the catch block below.
try
{
await _checkpointer.AddNewJobAsync(
transferId,
sourceResource,
destinationResource,
cancellationToken).ConfigureAwait(false);

DataTransfer dataTransfer = await BuildAndAddTransferJobAsync(
sourceResource,
destinationResource,
transferOptions,
transferId,
false,
cancellationToken).ConfigureAwait(false);

return dataTransfer;
}
catch (Exception ex)
{
// cleanup any state for a job that didn't even start
try
{
// Remove the entry for this transfer id from _dataTransfers, then ask the
// checkpointer to remove whatever it stored under the same id.
_dataTransfers.Remove(transferId);
// NOTE(review): cleanup reuses the caller's cancellationToken; presumably an
// already-cancelled token could make this cleanup call fail as well - confirm.
await _checkpointer.TryRemoveStoredTransferAsync(transferId, cancellationToken).ConfigureAwait(false);
}
catch (Exception cleanupEx)
{
// Cleanup itself threw: report the original failure and the cleanup failure together.
throw new AggregateException(ex, cleanupEx);
}
// Cleanup completed without throwing: rethrow the original exception.
throw;
}
}

private async Task<DataTransfer> BuildAndAddTransferJobAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Azure.Storage.DataMovement.Tests.Shared;
using Moq;
using NUnit.Framework;
using NUnit.Framework.Constraints;

namespace Azure.Storage.DataMovement.Tests;

Expand Down Expand Up @@ -286,6 +287,7 @@ public async Task BasicContainerTransfer(
[Combinatorial]
public async Task TransferFailAtQueue(
[Values(0, 1)] int failAt,
[Values(true, false)] bool throwCleanup,
[Values(true, false)] bool isContainer)
{
Uri srcUri = new("file:///foo/bar");
Expand All @@ -302,19 +304,34 @@ public async Task TransferFailAtQueue(
= GetBasicSetupResources(isContainer, srcUri, dstUri);

Exception expectedException = new();
switch (failAt)
Exception cleanupException = throwCleanup ? new() : null;
List<string> capturedTransferIds = new();
{
case 0:
jobBuilder.Setup(b => b.BuildJobAsync(It.IsAny<StorageResource>(), It.IsAny<StorageResource>(),
It.IsAny<DataTransferOptions>(), It.IsAny<ITransferCheckpointer>(), It.IsAny<string>(),
It.IsAny<bool>(), It.IsAny<CancellationToken>())
).Throws(expectedException);
break;
case 1:
checkpointer.Setup(c => c.AddNewJobAsync(It.IsAny<string>(), It.IsAny<StorageResource>(),
It.IsAny<StorageResource>(), It.IsAny<CancellationToken>())
).Throws(expectedException);
break;
var checkpointerAddJob = checkpointer.Setup(c => c.AddNewJobAsync(Capture.In(capturedTransferIds),
It.IsAny<StorageResource>(), It.IsAny<StorageResource>(), It.IsAny<CancellationToken>()));
var checkpointerRemoveJob = checkpointer.Setup(c => c.TryRemoveStoredTransferAsync(
It.IsAny<string>(), It.IsAny<CancellationToken>()));

switch (failAt)
{
case 0:
jobBuilder.Setup(b => b.BuildJobAsync(It.IsAny<StorageResource>(), It.IsAny<StorageResource>(),
It.IsAny<DataTransferOptions>(), It.IsAny<ITransferCheckpointer>(), It.IsAny<string>(),
It.IsAny<bool>(), It.IsAny<CancellationToken>())
).Throws(expectedException);
break;
case 1:
checkpointerAddJob.Throws(expectedException);
break;
}
if (throwCleanup)
{
checkpointerRemoveJob.Throws(cleanupException);
}
else
{
checkpointerRemoveJob.Returns(Task.FromResult(true));
}
}

await using TransferManager transferManager = new(
Expand All @@ -326,15 +343,21 @@ public async Task TransferFailAtQueue(
default);

DataTransfer transfer = null;

Assert.That(async () => transfer = await transferManager.StartTransferAsync(
srcResource,
dstResource), Throws.Exception.EqualTo(expectedException));
IConstraint throwsConstraint = throwCleanup
? Throws.TypeOf<AggregateException>().And.Property(nameof(AggregateException.InnerExceptions))
.EquivalentTo(new List<Exception>() { expectedException, cleanupException })
: Throws.Exception.EqualTo(expectedException);
Assert.That(async () => transfer = await transferManager.StartTransferAsync(srcResource, dstResource),
throwsConstraint);

Assert.That(transfer, Is.Null);

// TODO determine if checkpointer still has the job tracked even though it failed to queue (it shouldn't)
// need checkpointer API refactor for this
Assert.That(capturedTransferIds.Count, Is.EqualTo(1));
checkpointer.Verify(c => c.AddNewJobAsync(capturedTransferIds.First(), It.IsAny<StorageResource>(),
It.IsAny<StorageResource>(), It.IsAny<CancellationToken>()), Times.Once);
checkpointer.Verify(c => c.TryRemoveStoredTransferAsync(capturedTransferIds.First(),
It.IsAny<CancellationToken>()), Times.Once);
checkpointer.VerifyNoOtherCalls();
}

[Test]
Expand Down

0 comments on commit e8ac77d

Please sign in to comment.