From a3427ee7200445312fbcaed774e129ce00cbe8a2 Mon Sep 17 00:00:00 2001 From: Nathan Willoughby Date: Mon, 27 Nov 2023 17:24:56 +1000 Subject: [PATCH] Fix a bug in co-operative cancellation causing a request to be cancelled before it should be --- .../PollingServiceTimeoutsFixture.cs | 112 ++++++++++++++++++ .../PendingRequestQueueFixture.cs | 70 +++++------ ...AndLatestServiceBuilderExtensionMethods.cs | 7 -- ...ReceivingRequestMessagesTimeoutsFixture.cs | 8 +- .../ServiceModel/PendingRequestQueueAsync.cs | 47 +++++--- .../PortForwarder.cs | 2 + 6 files changed, 183 insertions(+), 63 deletions(-) create mode 100644 source/Halibut.Tests/PollingServiceTimeoutsFixture.cs diff --git a/source/Halibut.Tests/PollingServiceTimeoutsFixture.cs b/source/Halibut.Tests/PollingServiceTimeoutsFixture.cs new file mode 100644 index 000000000..7090e3f10 --- /dev/null +++ b/source/Halibut.Tests/PollingServiceTimeoutsFixture.cs @@ -0,0 +1,112 @@ +using System; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; +using FluentAssertions; +using Halibut.Tests.Support; +using Halibut.Tests.Support.TestAttributes; +using Halibut.Tests.Support.TestCases; +using Halibut.Tests.TestServices; +using Halibut.Tests.TestServices.Async; +using Halibut.TestUtils.Contracts; +using NUnit.Framework; + +namespace Halibut.Tests +{ + public class PollingServiceTimeoutsFixture : BaseTest + { + [Test] + [LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false)] + public async Task WhenThePollingRequestQueueTimeoutIsReached_TheRequestShouldTimeout(ClientAndServiceTestCase clientAndServiceTestCase) + { + var halibutTimeoutsAndLimits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); + halibutTimeoutsAndLimits.PollingRequestQueueTimeout = TimeSpan.FromSeconds(5); + + await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() + .As() + .NoService() + .WithStandardServices() + .WithHalibutTimeoutsAndLimits(halibutTimeoutsAndLimits) + .Build(CancellationToken)) + { + var client = clientAndService.CreateAsyncClient(); + + var stopwatch = Stopwatch.StartNew(); + (await AssertAsync.Throws(async () => await client.SayHelloAsync("Hello", new(CancellationToken)))) + .And.Message.Should().Contain("A request was sent to a polling endpoint, but the polling endpoint did not collect the request within the allowed time (00:00:05), so the request timed out."); + stopwatch.Stop(); + + stopwatch.Elapsed.Should().BeLessThan(TimeSpan.FromSeconds(8), "Should have timed out quickly"); + } + } + + [Test] + [LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false)] + public async Task WhenThePollingRequestQueueTimeoutIsReached_ButTheResponseIsReceivedBeforeThePollingRequestMaximumMessageProcessingTimeoutIsReached_TheRequestShouldSucceed(ClientAndServiceTestCase clientAndServiceTestCase) + { + var halibutTimeoutsAndLimits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); + halibutTimeoutsAndLimits.PollingRequestQueueTimeout = TimeSpan.FromSeconds(5); + halibutTimeoutsAndLimits.PollingRequestMaximumMessageProcessingTimeout = TimeSpan.FromSeconds(100); + + var responseDelay = TimeSpan.FromSeconds(10); + + await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() + .As() + .WithDoSomeActionService(() => Thread.Sleep(responseDelay)) + .WithHalibutTimeoutsAndLimits(halibutTimeoutsAndLimits) + .WithInstantReconnectPollingRetryPolicy() + .Build(CancellationToken)) + { + var doSomeActionClient = clientAndService.CreateAsyncClient(); + + var stopwatch = Stopwatch.StartNew(); + await doSomeActionClient.ActionAsync(new(CancellationToken)); + stopwatch.Stop(); + + stopwatch.Elapsed.Should() + .BeGreaterThan(halibutTimeoutsAndLimits.PollingRequestQueueTimeout, "Should have waited longer than the PollingRequestQueueTimeout").And + .BeLessThan(responseDelay + TimeSpan.FromSeconds(5), "Should have received the response after the 10 second delay + 5 second buffer"); + } + } + + [Test] + [LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false)] + public async Task WhenThePollingRequestMaximumMessageProcessingTimeoutIsReached_TheRequestShouldTimeout_AndTheTransferringPendingRequestCancelled(ClientAndServiceTestCase clientAndServiceTestCase) + { + var halibutTimeoutsAndLimits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); + halibutTimeoutsAndLimits.PollingRequestQueueTimeout = TimeSpan.FromSeconds(5); + halibutTimeoutsAndLimits.PollingRequestMaximumMessageProcessingTimeout = TimeSpan.FromSeconds(6); + + var waitSemaphore = new SemaphoreSlim(0, 1); + var connectionsObserver = new TestConnectionsObserver(); + + await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() + .As() + .WithDoSomeActionService(() => waitSemaphore.Wait(CancellationToken)) + .WithHalibutTimeoutsAndLimits(halibutTimeoutsAndLimits) + .WithInstantReconnectPollingRetryPolicy() + .WithConnectionObserverOnTcpServer(connectionsObserver) + .Build(CancellationToken)) + { + var doSomeActionClient = clientAndService.CreateAsyncClient(); + + var stopwatch = Stopwatch.StartNew(); + (await AssertAsync.Throws(async () => await doSomeActionClient.ActionAsync(new(CancellationToken)))) + .And.Message.Should().Contain("A request was sent to a polling endpoint, the polling endpoint collected it but did not respond in the allowed time (00:00:06), so the request timed out."); + stopwatch.Stop(); + + stopwatch.Elapsed.Should().BeLessThan(TimeSpan.FromSeconds(15), "Should have timed out quickly"); + + connectionsObserver.ConnectionAcceptedCount.Should().Be(1, "A single TCP connection should have been created"); + + waitSemaphore.Release(); + + Wait.UntilActionSucceeds(() => + { + connectionsObserver.ConnectionClosedCount.Should().Be(1, "Cancelling the PendingRequest should have caused the TCP Connection to be cancelled to stop the in-flight request"); + connectionsObserver.ConnectionAcceptedCount.Should().Be(2, "The Service should have reconnected after the request was cancelled"); + }, TimeSpan.FromSeconds(30), Logger, CancellationToken); + } + } + } +} \ No newline at end of file diff --git a/source/Halibut.Tests/ServiceModel/PendingRequestQueueFixture.cs b/source/Halibut.Tests/ServiceModel/PendingRequestQueueFixture.cs index a14e71136..e410e733d 100644 --- a/source/Halibut.Tests/ServiceModel/PendingRequestQueueFixture.cs +++ b/source/Halibut.Tests/ServiceModel/PendingRequestQueueFixture.cs @@ -8,7 +8,6 @@ using FluentAssertions; using Halibut.ServiceModel; using Halibut.Tests.Builders; -using Halibut.Tests.Support.TestAttributes; using Halibut.Transport.Protocol; using NUnit.Framework; @@ -25,17 +24,16 @@ public async Task QueueAndWait_WillContinueWaitingUntilResponseIsApplied() var sut = new PendingRequestQueueBuilder().WithEndpoint(endpoint).Build(); var request = new RequestMessageBuilder(endpoint).Build(); var expectedResponse = ResponseMessageBuilder.FromRequest(request).Build(); - + var queueAndWaitTask = await StartQueueAndWaitAndWaitForRequestToBeQueued(sut, request, CancellationToken); await sut.DequeueAsync(CancellationToken); - // Act await Task.Delay(1000, CancellationToken); queueAndWaitTask.IsCompleted.Should().BeFalse(); await sut.ApplyResponse(expectedResponse, request.Destination); - + // Assert var response = await queueAndWaitTask; response.Should().Be(expectedResponse); @@ -85,7 +83,7 @@ public async Task QueueAndWait_WhenPollingRequestQueueTimeoutIsReached_WillStopW var request = new RequestMessageBuilder(endpoint) .WithServiceEndpoint(seb => seb.WithPollingRequestQueueTimeout(TimeSpan.FromMilliseconds(1000))) .Build(); - + // Act var stopwatch = Stopwatch.StartNew(); var queueAndWaitTask = await StartQueueAndWaitAndWaitForRequestToBeQueued(sut, request, CancellationToken); @@ -118,8 +116,10 @@ public async Task QueueAndWait_WhenRequestIsDequeued_ButPollingRequestQueueTimeo // Act var stopwatch = Stopwatch.StartNew(); - var (queueAndWaitTask, _) = await QueueAndDequeueRequest_ForTimeoutTestingOnly_ToCopeWithRaceCondition(sut, request, CancellationToken); + var (queueAndWaitTask, dequeued) = await QueueAndDequeueRequest_ForTimeoutTestingOnly_ToCopeWithRaceCondition(sut, request, CancellationToken); + var response = await queueAndWaitTask; + dequeued.CancellationToken.IsCancellationRequested.Should().BeTrue("Should have cancelled the request when the PollingRequestMaximumMessageProcessingTimeout is reached"); // Assert // Although we sleep for 2 second, sometimes it can be just under. So be generous with the buffer. @@ -130,7 +130,7 @@ public async Task QueueAndWait_WhenRequestIsDequeued_ButPollingRequestQueueTimeo var next = await sut.DequeueAsync(CancellationToken); next.Should().BeNull(); } - + [Test] public async Task QueueAndWait_WhenRequestIsDequeued_ButPollingRequestQueueTimeoutIsReached_ShouldWaitTillRequestRespondsAndClearRequest() { @@ -141,28 +141,31 @@ public async Task QueueAndWait_WhenRequestIsDequeued_ButPollingRequestQueueTimeo .WithEndpoint(endpoint) .WithPollingQueueWaitTimeout(TimeSpan.Zero) // Remove delay, otherwise we wait the full 20 seconds for DequeueAsync at the end of the test .Build(); + var request = new RequestMessageBuilder(endpoint) .WithServiceEndpoint(seb => seb.WithPollingRequestQueueTimeout(TimeSpan.FromMilliseconds(1000))) .Build(); + var expectedResponse = ResponseMessageBuilder.FromRequest(request).Build(); // Act var (queueAndWaitTask, dequeued) = await QueueAndDequeueRequest_ForTimeoutTestingOnly_ToCopeWithRaceCondition(sut, request, CancellationToken); await Task.Delay(2000, CancellationToken); + dequeued.CancellationToken.IsCancellationRequested.Should().BeFalse("Should not have cancelled the request after PollingRequestQueueTimeout is reached"); await sut.ApplyResponse(expectedResponse, request.Destination); var response = await queueAndWaitTask; // Assert - dequeued.Should().NotBeNull("We should have removed the item from the queue before it timed out.").And.Be(request); + dequeued.RequestMessage.Should().NotBeNull("We should have removed the item from the queue before it timed out.").And.Be(request); response.Should().Be(expectedResponse); var next = await sut.DequeueAsync(CancellationToken); next.Should().BeNull(); } - + [Test] public async Task QueueAndWait_AddingMultipleItemsToQueueInOrder_ShouldDequeueInOrder() { @@ -250,7 +253,7 @@ public async Task QueueAndWait_Can_Queue_Dequeue_Apply_VeryLargeNumberOfRequests .ToList(); await Task.WhenAll(dequeueTasks); - + // Assert await ApplyResponsesConcurrentlyAndEnsureAllQueueResponsesMatch(sut, requestsInOrder, queueAndWaitTasksInOrder); } @@ -265,7 +268,7 @@ public async Task QueueAndWait_Can_Queue_Dequeue_WhenRequestsAreBeingCancelled() const int minimumCancelledRequest = 100; var sut = new PendingRequestQueueBuilder().WithEndpoint(endpoint).Build(); - + var requestsInOrder = Enumerable.Range(0, totalRequest) .Select(_ => new RequestMessageBuilder(endpoint).Build()) .ToList(); @@ -276,13 +279,13 @@ public async Task QueueAndWait_Can_Queue_Dequeue_WhenRequestsAreBeingCancelled() { var requestCancellationTokenSource = new CancellationTokenSource(); return new Tuple, CancellationTokenSource>( - StartQueueAndWait(sut, request, requestCancellationTokenSource.Token), + StartQueueAndWait(sut, request, requestCancellationTokenSource.Token), requestCancellationTokenSource); }) .ToList(); await WaitForQueueCountToBecome(sut, requestsInOrder.Count); - + var index = 0; var cancelled = 0; var dequeueTasks = new ConcurrentBag>(); @@ -293,10 +296,10 @@ public async Task QueueAndWait_Can_Queue_Dequeue_WhenRequestsAreBeingCancelled() { var currentIndex = Interlocked.Increment(ref index); - if(currentIndex % 2 == 0) + if (currentIndex % 2 == 0) { Interlocked.Increment(ref cancelled); - queueAndWaitTasksInOrder.ElementAt(index-1).Item2.Cancel(); + queueAndWaitTasksInOrder.ElementAt(index - 1).Item2.Cancel(); } } }); @@ -378,16 +381,17 @@ public async Task QueueAndWait_CancellingAPendingRequestAfterItIsDequeued_Should // Cancel, and give the queue time to start waiting for a response cancellationTokenSource.Cancel(); await Task.Delay(1000, CancellationToken); - + dequeued.CancellationToken.IsCancellationRequested.Should().BeTrue("Should have cancelled the request"); + await AssertionExtensions.Should(() => queueAndWaitTask).ThrowAsync(); - + // Assert dequeued.RequestMessage.Should().NotBeNull().And.Be(request); - + var next = await sut.DequeueAsync(CancellationToken); next.Should().BeNull(); } - + [Test] public async Task DequeueAsync_WithNothingQueued_WillWaitPollingQueueWaitTimeout_ShouldReturnNull() { @@ -398,7 +402,7 @@ public async Task DequeueAsync_WithNothingQueued_WillWaitPollingQueueWaitTimeout .WithEndpoint(endpoint) .WithPollingQueueWaitTimeout(TimeSpan.FromSeconds(1)) .Build(); - + // Act var stopwatch = Stopwatch.StartNew(); var request = await sut.DequeueAsync(CancellationToken); @@ -424,7 +428,7 @@ public async Task DequeueAsync_WithNothingQueued_WillWaitPollingQueueWaitTimeout var previousRequest = new RequestMessageBuilder(endpoint).Build(); var expectedPreviousResponse = ResponseMessageBuilder.FromRequest(previousRequest).Build(); - var queueAndWaitTask = await StartQueueAndWaitAndWaitForRequestToBeQueued(sut, previousRequest ,CancellationToken); + var queueAndWaitTask = await StartQueueAndWaitAndWaitForRequestToBeQueued(sut, previousRequest, CancellationToken); await sut.DequeueAsync(CancellationToken); await sut.ApplyResponse(expectedPreviousResponse, previousRequest.Destination); await queueAndWaitTask; @@ -432,7 +436,7 @@ public async Task DequeueAsync_WithNothingQueued_WillWaitPollingQueueWaitTimeout // Act var stopwatch = Stopwatch.StartNew(); var dequeuedRequest = await sut.DequeueAsync(CancellationToken); - + // Assert // Although we sleep for 1 second, sometimes it can be just under. So be generous with the buffer. stopwatch.Elapsed.Should().BeGreaterThan(TimeSpan.FromMilliseconds(800)); @@ -456,9 +460,9 @@ public async Task DequeueAsync_WillContinueWaitingUntilItemIsQueued() await Task.Delay(1000, CancellationToken); var queueAndWaitTask = StartQueueAndWait(sut, request, CancellationToken); - + var dequeuedRequest = await dequeueTask; - + // Assert // Although we sleep for 1 second, sometimes it can be just under. So be generous with the buffer. stopwatch.Elapsed.Should().BeGreaterThan(TimeSpan.FromMilliseconds(800)); @@ -470,7 +474,7 @@ public async Task DequeueAsync_WillContinueWaitingUntilItemIsQueued() var response = await queueAndWaitTask; response.Should().Be(expectedResponse); } - + [Test] public async Task DequeueAsync_WithMultipleDequeueCallsWaiting_WhenSingleRequestIsQueued_ThenOnlyOneCallersReceivesRequest() { @@ -480,7 +484,7 @@ public async Task DequeueAsync_WithMultipleDequeueCallsWaiting_WhenSingleRequest var sut = new PendingRequestQueueBuilder().WithEndpoint(endpoint).Build(); var request = new RequestMessageBuilder(endpoint).Build(); var expectedResponse = ResponseMessageBuilder.FromRequest(request).Build(); - + var dequeueTasks = Enumerable.Range(0, 30) .Select(_ => sut.DequeueAsync(CancellationToken)) .ToArray(); @@ -558,8 +562,8 @@ async Task WaitForQueueCountToBecome(IPendingRequestQueue pendingRequestQueue, i } Task StartQueueAndWait( - IPendingRequestQueue pendingRequestQueue, - RequestMessage request, + IPendingRequestQueue pendingRequestQueue, + RequestMessage request, CancellationToken requestCancellationToken) { var task = Task.Run( @@ -568,8 +572,8 @@ Task StartQueueAndWait( return task; } - async Task<(Task queueAndWaitTask, RequestMessage dequeued)> QueueAndDequeueRequest_ForTimeoutTestingOnly_ToCopeWithRaceCondition( - IPendingRequestQueue sut, + async Task<(Task queueAndWaitTask, RequestMessageWithCancellationToken dequeued)> QueueAndDequeueRequest_ForTimeoutTestingOnly_ToCopeWithRaceCondition( + IPendingRequestQueue sut, RequestMessage request, CancellationToken cancellationToken) { @@ -588,7 +592,7 @@ Task StartQueueAndWait( // So if dequeued is null, then try again. if (dequeued is not null) { - return (queueAndWaitTask, dequeued.RequestMessage); + return (queueAndWaitTask, dequeued); } cancellationToken.ThrowIfCancellationRequested(); @@ -606,9 +610,9 @@ static async Task ApplyResponsesConcurrentlyAndEnsureAllQueueResponsesMatch( //Concurrently apply responses to prove this does not cause issues. var applyResponseTasks = requestsInOrder - .Select((r,i) => Task.Factory.StartNew(async () => await sut.ApplyResponse(expectedResponsesInOrder[i], r.Destination))) + .Select((r, i) => Task.Factory.StartNew(async () => await sut.ApplyResponse(expectedResponsesInOrder[i], r.Destination))) .ToList(); - + await Task.WhenAll(applyResponseTasks); var index = 0; diff --git a/source/Halibut.Tests/Support/LatestClientAndLatestServiceBuilderExtensionMethods.cs b/source/Halibut.Tests/Support/LatestClientAndLatestServiceBuilderExtensionMethods.cs index 997801c1d..389414122 100644 --- a/source/Halibut.Tests/Support/LatestClientAndLatestServiceBuilderExtensionMethods.cs +++ b/source/Halibut.Tests/Support/LatestClientAndLatestServiceBuilderExtensionMethods.cs @@ -59,13 +59,6 @@ public static LatestClientAndLatestServiceBuilder WithInstantReconnectPollingRet { return builder.WithPollingReconnectRetryPolicy(() => new RetryPolicy(1, TimeSpan.Zero, TimeSpan.Zero)); } - - public static LatestClientAndLatestServiceBuilder WhenTestingAsyncClient(this LatestClientAndLatestServiceBuilder builder, ClientAndServiceTestCase clientAndServiceTestCase, Action action) - { - - action(builder); - return builder; - } public static LatestClientAndLatestServiceBuilder WithConnectionObserverOnTcpServer(this LatestClientAndLatestServiceBuilder builder, IConnectionsObserver connectionsObserver) { diff --git a/source/Halibut.Tests/Timeouts/SendingAndReceivingRequestMessagesTimeoutsFixture.cs b/source/Halibut.Tests/Timeouts/SendingAndReceivingRequestMessagesTimeoutsFixture.cs index 5a1616394..bbf1d4e34 100644 --- a/source/Halibut.Tests/Timeouts/SendingAndReceivingRequestMessagesTimeoutsFixture.cs +++ b/source/Halibut.Tests/Timeouts/SendingAndReceivingRequestMessagesTimeoutsFixture.cs @@ -29,12 +29,7 @@ public async Task HalibutTimeoutsAndLimits_AppliesToTcpClientReceiveTimeout(Clie .WithPortForwarding(out var portForwarderRef) .WithEchoService() .WithDoSomeActionService(() => portForwarderRef.Value.PauseExistingConnections()) - .WhenTestingAsyncClient(clientAndServiceTestCase, b => - { - b.WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimitsForTestsBuilder().Build() - .WithAllTcpTimeoutsTo(TimeSpan.FromSeconds(133)) - .WithTcpClientReceiveTimeout(expectedTimeout)); - }) + .WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimitsForTestsBuilder().Build().WithAllTcpTimeoutsTo(TimeSpan.FromSeconds(133)).WithTcpClientReceiveTimeout(expectedTimeout)) .WithInstantReconnectPollingRetryPolicy() .Build(CancellationToken)) { @@ -48,6 +43,7 @@ public async Task HalibutTimeoutsAndLimits_AppliesToTcpClientReceiveTimeout(Clie sw.Stop(); Logger.Error(e, "Received error"); AssertExceptionMessageLooksLikeAReadTimeout(e); + sw.Elapsed.Should().BeGreaterThan(expectedTimeout - TimeSpan.FromSeconds(2), "The receive timeout should apply, not the shorter heart beat timeout") // -2s give it a little slack to avoid it timed out slightly too early. .And .BeLessThan(expectedTimeout + HalibutTimeoutsAndLimitsForTestsBuilder.HalfTheTcpReceiveTimeout, "We should be timing out on the tcp receive timeout"); diff --git a/source/Halibut/ServiceModel/PendingRequestQueueAsync.cs b/source/Halibut/ServiceModel/PendingRequestQueueAsync.cs index e0c8b5195..65df5e2b8 100644 --- a/source/Halibut/ServiceModel/PendingRequestQueueAsync.cs +++ b/source/Halibut/ServiceModel/PendingRequestQueueAsync.cs @@ -199,7 +199,11 @@ public async Task WaitUntilComplete(CancellationToken cancellationToken) try { - responseSet = await WaitForResponseToBeSet(request.Destination.PollingRequestQueueTimeout, cancellationToken); + responseSet = await WaitForResponseToBeSet( + request.Destination.PollingRequestQueueTimeout, + // Don't cancel a dequeued request as we need to wait PollingRequestMaximumMessageProcessingTimeout for it to complete + cancelTheRequestWhenTransferHasBegun: false, + cancellationToken); if (responseSet) { @@ -237,7 +241,11 @@ public async Task WaitUntilComplete(CancellationToken cancellationToken) if (waitForTransferToComplete) { - responseSet = await WaitForResponseToBeSet(request.Destination.PollingRequestMaximumMessageProcessingTimeout, cancellationToken); + responseSet = await WaitForResponseToBeSet( + request.Destination.PollingRequestMaximumMessageProcessingTimeout, + // Cancel the dequeued request to force Reads and Writes to be cancelled + cancelTheRequestWhenTransferHasBegun: true, + cancellationToken); if (responseSet) { @@ -272,7 +280,7 @@ public async Task WaitUntilComplete(CancellationToken cancellationToken) } } - async Task WaitForResponseToBeSet(TimeSpan timeout, CancellationToken cancellationToken) + async Task WaitForResponseToBeSet(TimeSpan timeout, bool cancelTheRequestWhenTransferHasBegun, CancellationToken cancellationToken) { using var timeoutCancellationTokenSource = new CancellationTokenSource(timeout); @@ -283,16 +291,24 @@ async Task WaitForResponseToBeSet(TimeSpan timeout, CancellationToken canc } catch (OperationCanceledException ex) { - // Cancel the Request to force cancellation to the socket if it is currently being transferred - CancelRequest(); - - if (timeoutCancellationTokenSource.IsCancellationRequested) - { - return false; - } - using (await transferLock.LockAsync(CancellationToken.None)) { + if (transferBegun && cancelTheRequestWhenTransferHasBegun) + { + // Cancel the dequeued request. This will cause co-operative cancellation on the thread dequeuing the request + pendingRequestCancellationTokenSource.Cancel(); + } + else if (!transferBegun) + { + // Cancel the queued request. This will flag the request as cancelled to stop it being dequeued + pendingRequestCancellationTokenSource.Cancel(); + } + + if (timeoutCancellationTokenSource.IsCancellationRequested) + { + return false; + } + throw transferBegun ? new TransferringRequestCancelledException(ex) : new ConnectingRequestCancelledException(ex); } } @@ -311,7 +327,9 @@ public async Task BeginTransfer() { using (await transferLock.LockAsync(CancellationToken.None)) { - if (completed) + // Check if the request has already been completed or if the request has been cancelled + // to ensure we don't dequeue an already completed or already cancelled request + if (completed || pendingRequestCancellationTokenSource.IsCancellationRequested) { return false; } @@ -335,11 +353,6 @@ public void SetResponse(ResponseMessage response) responseWaiter.Set(); } - void CancelRequest() - { - pendingRequestCancellationTokenSource.Cancel(); - } - public void Dispose() { pendingRequestCancellationTokenSource?.Dispose(); diff --git a/source/Octopus.TestPortForwarder/PortForwarder.cs b/source/Octopus.TestPortForwarder/PortForwarder.cs index 83806e5a6..736d29b25 100644 --- a/source/Octopus.TestPortForwarder/PortForwarder.cs +++ b/source/Octopus.TestPortForwarder/PortForwarder.cs @@ -7,6 +7,7 @@ using System.Threading.Tasks; using Serilog; using System.Diagnostics.CodeAnalysis; +using Serilog.Core; namespace Octopus.TestPortForwarder { @@ -221,6 +222,7 @@ public void UnPauseExistingConnections() public void PauseExistingConnections() { + logger.Information("Pausing existing connections"); lock (pumps) { foreach (var pump in pumps)