From 6b3e28dabdbc9014fd18dda3816dfa017293c7bc Mon Sep 17 00:00:00 2001 From: Nathan Willoughby Date: Thu, 23 Nov 2023 14:59:05 +1000 Subject: [PATCH] Add ability to Cancel Polling Requests to the Socket Combine connecting and in progress cancellation tokens --- source/Halibut.Tests/BadCertificatesTests.cs | 17 +- .../CancellationViaClientProxyFixture.cs | 92 ++----- .../Halibut.Tests/ExceptionContractFixture.cs | 37 ++- .../PollingServiceTimeoutsFixture.cs | 15 +- .../ResponseMessageCacheFixture.cs | 8 +- .../PendingRequestQueueFixture.cs | 126 +++++----- ...AndLatestServiceBuilderExtensionMethods.cs | 7 - ...questDequeuedPendingRequestQueueFactory.cs | 6 +- ...RequestQueuedPendingRequestQueueFactory.cs | 78 +++--- ...ReceivingRequestMessagesTimeoutsFixture.cs | 8 +- .../ConnectionObserverFixture.cs | 4 +- .../Transport/Protocol/ProtocolFixture.cs | 14 +- .../Transport/SecureClientFixture.cs | 4 +- ...enCancellingARequestForAPollingTentacle.cs | 225 +++++++++++++----- .../Exceptions/RequestCancelledException.cs | 47 ++++ source/Halibut/HalibutRuntime.cs | 16 +- .../HalibutProxyRequestOptions.cs | 25 +- .../ServiceModel/HalibutProxyWithAsync.cs | 27 +-- .../ServiceModel/IPendingRequestQueue.cs | 4 +- .../ServiceModel/PendingRequestQueueAsync.cs | 95 +++++--- .../ServiceModel/RequestCancellationTokens.cs | 56 ----- source/Halibut/Transport/ISecureClient.cs | 4 +- source/Halibut/Transport/PollingClient.cs | 30 +-- .../Protocol/MessageExchangeProtocol.cs | 23 +- .../Transport/Protocol/RequestMessage.cs | 1 - .../RequestMessageWithCancellationToken.cs | 16 ++ .../Transport/Protocol/ResponseMessage.cs | 5 +- source/Halibut/Transport/SecureClient.cs | 22 +- .../Transport/SecureListeningClient.cs | 50 ++-- .../Transport/SecureWebSocketClient.cs | 15 +- .../Transport/Streams/NetworkTimeoutStream.cs | 80 +++---- source/Halibut/Util/Try.cs | 13 + .../PortForwarder.cs | 2 + 33 files changed, 607 insertions(+), 565 deletions(-) create mode 100644 source/Halibut/Exceptions/RequestCancelledException.cs delete mode 100644 source/Halibut/ServiceModel/RequestCancellationTokens.cs create mode 100644 source/Halibut/Transport/Protocol/RequestMessageWithCancellationToken.cs diff --git a/source/Halibut.Tests/BadCertificatesTests.cs b/source/Halibut.Tests/BadCertificatesTests.cs index f1ddb33c3..953360548 100644 --- a/source/Halibut.Tests/BadCertificatesTests.cs +++ b/source/Halibut.Tests/BadCertificatesTests.cs @@ -99,7 +99,7 @@ public async Task FailWhenPollingServicePresentsWrongCertificate_ButServiceIsCon }); // Act - var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)), CancellationToken); + var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)), CancellationToken); // Interestingly the message exchange error is logged to a non polling looking URL, perhaps because it has not been identified? Wait.UntilActionSucceeds(() => { @@ -148,14 +148,14 @@ public async Task FailWhenPollingServiceHasThumbprintRemovedViaTrustOnly(ClientA }); // Works normally - await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)); - await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)); + await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)); + await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)); // Act clientAndBuilder.Client.TrustOnly(new List()); // Assert - var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)), CancellationToken); + var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)), CancellationToken); await Task.Delay(3000, CancellationToken); @@ -164,8 +164,7 @@ public async Task FailWhenPollingServiceHasThumbprintRemovedViaTrustOnly(ClientA var exception = await AssertException.Throws(incrementCount); exception.And.Should().Match(e => e.GetType() == typeof(HalibutClientException) - || e.GetType() == typeof(OperationCanceledException) - || e.GetType() == typeof(TaskCanceledException)); + || e is OperationCanceledException); } } @@ -213,8 +212,8 @@ public async Task FailWhenClientPresentsWrongCertificateToPollingService(ClientA point.PollingRequestQueueTimeout = TimeSpan.FromSeconds(2000); }); - var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)), CancellationToken); - + var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)), CancellationToken); + Func hasExpectedLog = logEvent => logEvent.FormattedMessage.Contains("The server at") && logEvent.FormattedMessage.Contains("presented an unexpected security certificate"); @@ -270,7 +269,7 @@ public async Task FailWhenPollingServicePresentsWrongCertificate(ClientAndServic point.PollingRequestQueueTimeout = TimeSpan.FromSeconds(10); }); - var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)), CancellationToken); + var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)), CancellationToken); // Interestingly the message exchange error is logged to a non polling looking URL, perhaps because it has not been identified? Wait.UntilActionSucceeds(() => { AllLogs(serviceLoggers).Select(l => l.FormattedMessage).ToArray() diff --git a/source/Halibut.Tests/CancellationViaClientProxyFixture.cs b/source/Halibut.Tests/CancellationViaClientProxyFixture.cs index 33fe49b27..046364373 100644 --- a/source/Halibut.Tests/CancellationViaClientProxyFixture.cs +++ b/source/Halibut.Tests/CancellationViaClientProxyFixture.cs @@ -3,7 +3,7 @@ using System.Threading; using System.Threading.Tasks; using FluentAssertions; -using Halibut.Logging; +using Halibut.Exceptions; using Halibut.ServiceModel; using Halibut.Tests.Support; using Halibut.Tests.Support.ExtensionMethods; @@ -21,27 +21,11 @@ public class CancellationViaClientProxyFixture : BaseTest [Test] [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] [LatestClientAndPreviousServiceVersionsTestCases(testNetworkConditions: false)] - public async Task HalibutProxyRequestOptions_ConnectingCancellationToken_CanCancel_ConnectingOrQueuedRequests(ClientAndServiceTestCase clientAndServiceTestCase) + public async Task HalibutProxyRequestOptions_RequestCancellationToken_CanCancel_ConnectingOrQueuedRequests(ClientAndServiceTestCase clientAndServiceTestCase) { var tokenSourceToCancel = new CancellationTokenSource(); - var halibutRequestOption = new HalibutProxyRequestOptions(tokenSourceToCancel.Token, CancellationToken.None); + var halibutRequestOption = new HalibutProxyRequestOptions(tokenSourceToCancel.Token); - await CanCancel_ConnectingOrQueuedRequests(clientAndServiceTestCase, tokenSourceToCancel, halibutRequestOption); - } - - [Test] - [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] - [LatestClientAndPreviousServiceVersionsTestCases(testNetworkConditions: false)] - public async Task HalibutProxyRequestOptions_InProgressCancellationToken_CanCancel_ConnectingOrQueuedRequests(ClientAndServiceTestCase clientAndServiceTestCase) - { - var tokenSourceToCancel = new CancellationTokenSource(); - var halibutRequestOption = new HalibutProxyRequestOptions(CancellationToken.None, tokenSourceToCancel.Token); - - await CanCancel_ConnectingOrQueuedRequests(clientAndServiceTestCase, tokenSourceToCancel, halibutRequestOption); - } - - async Task CanCancel_ConnectingOrQueuedRequests(ClientAndServiceTestCase clientAndServiceTestCase, CancellationTokenSource tokenSourceToCancel, HalibutProxyRequestOptions halibutRequestOption) - { await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() .WithPortForwarding(out var portForwarderRef, port => PortForwarderUtil.ForwardingToLocalPort(port).Build()) .WithStandardServices() @@ -55,64 +39,19 @@ async Task CanCancel_ConnectingOrQueuedRequests(ClientAndServiceTestCase clientA point => point.TryAndConnectForALongTime()); tokenSourceToCancel.CancelAfter(TimeSpan.FromMilliseconds(100)); - - (await AssertException.Throws(() => echo.IncrementAsync(halibutRequestOption))) - .And.Message.Contains("The operation was canceled"); + + (await AssertionExtensions.Should(() => echo.IncrementAsync(halibutRequestOption)).ThrowAsync()) + .And.Should().Match(x => x is ConnectingRequestCancelledException || (x is HalibutClientException && x.As().Message.Contains("The Request was cancelled while Connecting"))); portForwarderRef.Value.ReturnToNormalMode(); - await echo.IncrementAsync(new HalibutProxyRequestOptions(CancellationToken, CancellationToken.None)); + await echo.IncrementAsync(new HalibutProxyRequestOptions(CancellationToken)); - (await echo.GetCurrentValueAsync(new HalibutProxyRequestOptions(CancellationToken, CancellationToken.None))) + (await echo.GetCurrentValueAsync(new HalibutProxyRequestOptions(CancellationToken))) .Should().Be(1, "Since we cancelled the first call"); } } - [Test] - [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] - [LatestClientAndPreviousServiceVersionsTestCases(testNetworkConditions: false)] - public async Task HalibutProxyRequestOptions_ConnectingCancellationToken_CanNotCancel_InFlightRequests(ClientAndServiceTestCase clientAndServiceTestCase) - { - await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() - .WithHalibutLoggingLevel(LogLevel.Trace) - .WithStandardServices() - .Build(CancellationToken)) - { - var lockService = clientAndService.CreateAsyncClient(); - - var tokenSourceToCancel = new CancellationTokenSource(); - using var tmpDir = new TemporaryDirectory(); - var fileThatOnceDeletedEndsTheCall = tmpDir.CreateRandomFile(); - var callStartedFile = tmpDir.RandomFileName(); - - var inFlightRequest = Task.Run(async () => await lockService.WaitForFileToBeDeletedAsync( - fileThatOnceDeletedEndsTheCall, - callStartedFile, - new HalibutProxyRequestOptions(tokenSourceToCancel.Token, CancellationToken.None))); - - Logger.Information("Waiting for the RPC call to be inflight"); - while (!File.Exists(callStartedFile)) - { - await Task.Delay(TimeSpan.FromMilliseconds(100), CancellationToken); - } - - // The call is now in flight. Call cancel on the cancellation token for that in flight request. - tokenSourceToCancel.Cancel(); - - // Give time for the cancellation to do something - await Task.Delay(TimeSpan.FromSeconds(2), CancellationToken); - - if (inFlightRequest.Status == TaskStatus.Faulted) await inFlightRequest; - - inFlightRequest.IsCompleted.Should().Be(false, $"The cancellation token can not cancel in flight requests. Current state: {inFlightRequest.Status}"); - - File.Delete(fileThatOnceDeletedEndsTheCall); - - // Now the lock is released we should be able to complete the request. - await inFlightRequest; - } - } - [Test] // TODO: ASYNC ME UP! // net48 does not support cancellation of the request as the DeflateStream ends up using Begin and End methods which don't get passed the cancellation token @@ -123,7 +62,7 @@ public async Task HalibutProxyRequestOptions_ConnectingCancellationToken_CanNotC [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] [LatestClientAndPreviousServiceVersionsTestCases(testNetworkConditions: false)] #endif - public async Task HalibutProxyRequestOptions_InProgressCancellationToken_CanCancel_InFlightRequests(ClientAndServiceTestCase clientAndServiceTestCase) + public async Task HalibutProxyRequestOptions_RequestCancellationToken_CanCancel_InFlightRequests(ClientAndServiceTestCase clientAndServiceTestCase) { await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() .WithStandardServices() @@ -141,7 +80,7 @@ public async Task HalibutProxyRequestOptions_InProgressCancellationToken_CanCanc await lockService.WaitForFileToBeDeletedAsync( fileThatOnceDeletedEndsTheCall, callStartedFile, - new HalibutProxyRequestOptions(CancellationToken.None, tokenSourceToCancel.Token)); + new HalibutProxyRequestOptions(tokenSourceToCancel.Token)); }); Logger.Information("Waiting for the RPC call to be inflight"); @@ -156,9 +95,12 @@ await lockService.WaitForFileToBeDeletedAsync( // Give time for the cancellation to do something await Task.Delay(TimeSpan.FromSeconds(2), CancellationToken); - (await AssertException.Throws(inFlightRequest)) - .And - .Should().Match(x => x is OperationCanceledException || (x.GetType() == typeof(HalibutClientException) && x.Message.Contains("The ReadAsync operation was cancelled"))); +#pragma warning disable VSTHRD003 +#pragma warning disable VSTHRD003 + (await AssertionExtensions.Should(async () => await inFlightRequest).ThrowAsync()) +#pragma warning restore VSTHRD003 +#pragma warning restore VSTHRD003 + .And.Should().Match(x => x is TransferringRequestCancelledException || (x is HalibutClientException && x.As().Message.Contains("The Request was cancelled while Transferring"))); } } @@ -186,7 +128,7 @@ public async Task HalibutProxyRequestOptionsCanBeSentToLatestAndOldServicesThatP { var echo = clientAndService.CreateAsyncClient(); - (await echo.SayHelloAsync("Hello!!", new HalibutProxyRequestOptions(CancellationToken, null))) + (await echo.SayHelloAsync("Hello!!", new HalibutProxyRequestOptions(CancellationToken))) .Should() .Be("Hello!!..."); } diff --git a/source/Halibut.Tests/ExceptionContractFixture.cs b/source/Halibut.Tests/ExceptionContractFixture.cs index 01e58f4d3..59507c934 100644 --- a/source/Halibut.Tests/ExceptionContractFixture.cs +++ b/source/Halibut.Tests/ExceptionContractFixture.cs @@ -33,7 +33,7 @@ public async Task WhenThePollingRequestQueueTimeoutIsReached_AHalibutClientExcep var client = clientOnly.CreateClientWithoutService(); - (await AssertException.Throws(async () => await client.SayHelloAsync("Hello", new(CancellationToken, CancellationToken.None)))) + (await AssertException.Throws(async () => await client.SayHelloAsync("Hello", new(CancellationToken)))) .And.Message.Should().Contain("A request was sent to a polling endpoint, but the polling endpoint did not collect the request within the allowed time (00:00:05), so the request timed out."); } @@ -55,7 +55,7 @@ public async Task WhenThePollingRequestMaximumMessageProcessingTimeoutIsReached_ var doSomeActionClient = clientAndService.CreateAsyncClient(); - (await AssertException.Throws(async () => await doSomeActionClient.ActionAsync(new(CancellationToken, CancellationToken.None)))) + (await AssertException.Throws(async () => await doSomeActionClient.ActionAsync(new(CancellationToken)))) .And.Message.Should().Contain("A request was sent to a polling endpoint, the polling endpoint collected it but did not respond in the allowed time (00:00:06), so the request timed out."); waitSemaphore.Release(); @@ -73,7 +73,7 @@ public async Task WhenThePollingRequestIsCancelledWhileQueued_AnOperationCancele var client = clientOnly.CreateClientWithoutService(); - await AssertException.Throws(async () => await client.SayHelloAsync("Hello", new(cancellationTokenSource.Token, CancellationToken.None))); + await AssertException.Throws(async () => await client.SayHelloAsync("Hello", new(cancellationTokenSource.Token))); } [Test] @@ -92,7 +92,7 @@ public async Task WhenThePollingRequestIsCancelledWhileDequeued_AnOperationCance var doSomeActionClient = clientAndService.CreateAsyncClient(); - await AssertException.Throws(async () => await doSomeActionClient.ActionAsync(new(CancellationToken.None, cancellationTokenSource.Token))); + await AssertException.Throws(async () => await doSomeActionClient.ActionAsync(new(cancellationTokenSource.Token))); waitSemaphore.Release(); } @@ -113,7 +113,7 @@ public async Task WhenTheListeningRequestFailsToBeSent_AsTheServiceDoesNotAccept var client = clientAndService.CreateAsyncClient(); - (await AssertException.Throws(async () => await client.SayHelloAsync("Hello", new(CancellationToken, CancellationToken.None)))) + (await AssertException.Throws(async () => await client.SayHelloAsync("Hello", new(CancellationToken)))) .And.Message.Should().ContainAny( $"An error occurred when sending a request to '{clientAndService.ServiceUri}', before the request could begin: No connection could be made because the target machine actively refused it", $"An error occurred when sending a request to '{clientAndService.ServiceUri}', before the request could begin: Connection refused"); @@ -136,7 +136,7 @@ public async Task WhenTheListeningRequestFailsToBeSent_AsTheServiceRejectsTheCon var client = clientAndService.CreateAsyncClient(); - (await AssertException.Throws(async () => await client.SayHelloAsync("Hello", new(CancellationToken, CancellationToken.None)))) + (await AssertException.Throws(async () => await client.SayHelloAsync("Hello", new(CancellationToken)))) .And.Message.Should().ContainAny( $"An error occurred when sending a request to '{clientAndService.ServiceUri}', before the request could begin: Unable to read data from the transport connection:", $"An error occurred when sending a request to '{clientAndService.ServiceUri}', before the request could begin: Unable to write data to the transport connection:"); @@ -158,7 +158,7 @@ public async Task WhenTheListeningRequestFailsToBeSent_AsTheConnectionAttemptToT // We need to use a non localhost address to get a timeout var client = clientOnly.CreateClient(new Uri("https://20.5.79.31:10933/")); - (await AssertException.Throws(async () => await client.SayHelloAsync("Hello", new(CancellationToken, CancellationToken.None)))) + (await AssertException.Throws(async () => await client.SayHelloAsync("Hello", new(CancellationToken)))) .And.Message.Should().Contain( $"An error occurred when sending a request to 'https://20.5.79.31:10933/', before the request could begin: The client was unable to establish the initial connection within the timeout 00:00:01."); } @@ -186,10 +186,10 @@ public async Task WhenTheListeningRequestIsCancelledImmediatelyWhileTryingToConn (await AssertException.Throws(async () => { - var task = client.SayHelloAsync("Hello", new(cancellationTokenSource.Token, CancellationToken.None)); + var task = client.SayHelloAsync("Hello", new(cancellationTokenSource.Token)); cancellationTokenSource.Cancel(); await task; - })).And.Message.Should().Contain($"An error occurred when sending a request to 'https://20.5.79.31:10933/', after the request began: The operation was canceled."); + })).And.Message.Should().Contain($"An error occurred when sending a request to 'https://20.5.79.31:10933/', after the request began: The Request was cancelled while Connecting."); } [Test] @@ -215,8 +215,8 @@ public async Task WhenTheListeningRequestIsCancelledWhileConnecting_AHalibutClie var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)); - (await AssertException.Throws(async () => await client.SayHelloAsync("Hello", new(cancellationTokenSource.Token, CancellationToken.None)))) - .And.Message.Should().Contain("An error occurred when sending a request to 'https://20.5.79.31:10933/', after the request began: The operation was canceled."); + (await AssertException.Throws(async () => await client.SayHelloAsync("Hello", new(cancellationTokenSource.Token)))) + .And.Message.Should().Contain("An error occurred when sending a request to 'https://20.5.79.31:10933/', after the request began: The Request was cancelled while Connecting."); } [Test] @@ -240,15 +240,8 @@ public async Task WhenTheListeningRequestIsCancelledWhileConnecting_AndTheConnec var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)); -#if NETFRAMEWORK - // net48 does not support cancellation of the request as the DeflateStream ends up using Begin and End methods which don't get passed the cancellation token - // This results in a different exception message / a different code path executing. This highlights an inconsistency in the response from - // Halibut due to the way the code is written, so leaving this here as it should really be consistent. - await AssertException.Throws(async () => await client.SayHelloAsync("Hello", new(cancellationTokenSource.Token, CancellationToken.None))); -#else - var exception = (await AssertException.Throws(async () => await client.SayHelloAsync("Hello", new(cancellationTokenSource.Token, CancellationToken.None)))).And; - exception.Message.Should().Be($"An error occurred when sending a request to '{clientAndService.ServiceUri}', after the request began: The ReadAsync operation was cancelled."); -#endif + var exception = (await AssertException.Throws(async () => await client.SayHelloAsync("Hello", new(cancellationTokenSource.Token)))).And; + exception.Message.Should().Be($"An error occurred when sending a request to '{clientAndService.ServiceUri}', after the request began: The Request was cancelled while Connecting."); } // net48 does not support cancellation of the request as the DeflateStream ends up using Begin and End methods which don't get passed the cancellation token @@ -279,8 +272,8 @@ public async Task WhenTheListeningRequestIsCancelledWhileInProgress_AnOperationC cancellationTokenSource.Cancel(); }); - (await AssertException.Throws(async () => await doSomeActionClient.ActionAsync(new(cancellationTokenSource.Token, cancellationTokenSource.Token)))) - .And.Message.Should().Contain($"An error occurred when sending a request to '{clientAndService.ServiceUri}', after the request began: The ReadAsync operation was cancelled."); + (await AssertException.Throws(async () => await doSomeActionClient.ActionAsync(new(cancellationTokenSource.Token)))) + .And.Message.Should().Contain($"An error occurred when sending a request to '{clientAndService.ServiceUri}', after the request began: The Request was cancelled while Transferring."); waitSemaphore.Release(); await cancellationTask; diff --git a/source/Halibut.Tests/PollingServiceTimeoutsFixture.cs b/source/Halibut.Tests/PollingServiceTimeoutsFixture.cs index 9697b3b39..1107cdbab 100644 --- a/source/Halibut.Tests/PollingServiceTimeoutsFixture.cs +++ b/source/Halibut.Tests/PollingServiceTimeoutsFixture.cs @@ -24,13 +24,13 @@ public async Task WhenThePollingRequestQueueTimeoutIsReached_TheRequestShouldTim await using (var clientOnly = await clientAndServiceTestCase.CreateClientOnlyTestCaseBuilder() .AsLatestClientBuilder() - .WithHalibutTimeoutsAndLimits(halibutTimeoutsAndLimits) - .Build(CancellationToken)) + .WithHalibutTimeoutsAndLimits(halibutTimeoutsAndLimits) + .Build(CancellationToken)) { var client = clientOnly.CreateClientWithoutService(); var stopwatch = Stopwatch.StartNew(); - (await AssertException.Throws(async () => await client.SayHelloAsync("Hello", new(CancellationToken, CancellationToken.None)))) + (await AssertException.Throws(async () => await client.SayHelloAsync("Hello", new(CancellationToken)))) .And.Message.Should().Contain("A request was sent to a polling endpoint, but the polling endpoint did not collect the request within the allowed time (00:00:05), so the request timed out."); stopwatch.Stop(); @@ -58,7 +58,7 @@ public async Task WhenThePollingRequestQueueTimeoutIsReached_ButTheResponseIsRec var doSomeActionClient = clientAndService.CreateAsyncClient(); var stopwatch = Stopwatch.StartNew(); - await doSomeActionClient.ActionAsync(new(CancellationToken, CancellationToken.None)); + await doSomeActionClient.ActionAsync(new(CancellationToken)); stopwatch.Stop(); stopwatch.Elapsed.Should() @@ -88,7 +88,7 @@ public async Task WhenThePollingRequestMaximumMessageProcessingTimeoutIsReached_ var doSomeActionClient = clientAndService.CreateAsyncClient(); var stopwatch = Stopwatch.StartNew(); - (await AssertException.Throws(async () => await doSomeActionClient.ActionAsync(new(CancellationToken, CancellationToken.None)))) + (await AssertException.Throws(async () => await doSomeActionClient.ActionAsync(new(CancellationToken)))) .And.Message.Should().Contain("A request was sent to a polling endpoint, the polling endpoint collected it but did not respond in the allowed time (00:00:06), so the request timed out."); stopwatch.Stop(); @@ -100,9 +100,8 @@ public async Task WhenThePollingRequestMaximumMessageProcessingTimeoutIsReached_ Wait.UntilActionSucceeds(() => { - // Leaving these asserts as when cooperative cancellation is supported it should cause them to fail at which point they can be fixed to assert cancellation to the socket works as expected. - connectionsObserver.ConnectionClosedCount.Should().Be(0, "Cancelling the PendingRequest does not cause the TCP Connection to be cancelled to stop the in-flight request"); - connectionsObserver.ConnectionAcceptedCount.Should().Be(1, "The Service won't have reconnected after the request was cancelled"); + connectionsObserver.ConnectionClosedCount.Should().Be(1, "Cancelling the PendingRequest does not cause the TCP Connection to be cancelled to stop the in-flight request"); + connectionsObserver.ConnectionAcceptedCount.Should().Be(2, "The Service won't have reconnected after the request was cancelled"); }, TimeSpan.FromSeconds(30), Logger, CancellationToken); } } diff --git a/source/Halibut.Tests/ResponseMessageCacheFixture.cs b/source/Halibut.Tests/ResponseMessageCacheFixture.cs index 23a8a697d..5bfe75633 100644 --- a/source/Halibut.Tests/ResponseMessageCacheFixture.cs +++ b/source/Halibut.Tests/ResponseMessageCacheFixture.cs @@ -48,8 +48,8 @@ public async Task ForAServiceThatDoesNotSupportCaching_WithClientInterface_Respo { var client = clientAndService.CreateAsyncClient(); - var result1 = await client.NonCachableCallAsync(new HalibutProxyRequestOptions(CancellationToken, CancellationToken.None)); - var result2 = await client.NonCachableCallAsync(new HalibutProxyRequestOptions(CancellationToken, CancellationToken.None)); + var result1 = await client.NonCachableCallAsync(new HalibutProxyRequestOptions(CancellationToken)); + var result2 = await client.NonCachableCallAsync(new HalibutProxyRequestOptions(CancellationToken)); result1.Should().NotBe(result2); } @@ -84,8 +84,8 @@ public async Task ForAServiceThatSupportsCaching_WithClientInterface_ResponseSho { var client = clientAndService.CreateAsyncClient(); - var result1 = await client.CachableCallAsync(new HalibutProxyRequestOptions(CancellationToken, CancellationToken.None)); - var result2 = await client.CachableCallAsync(new HalibutProxyRequestOptions(CancellationToken, CancellationToken.None)); + var result1 = await client.CachableCallAsync(new HalibutProxyRequestOptions(CancellationToken)); + var result2 = await client.CachableCallAsync(new HalibutProxyRequestOptions(CancellationToken)); result1.Should().Be(result2); } diff --git a/source/Halibut.Tests/ServiceModel/PendingRequestQueueFixture.cs b/source/Halibut.Tests/ServiceModel/PendingRequestQueueFixture.cs index c32f78ef5..8857b78dc 100644 --- a/source/Halibut.Tests/ServiceModel/PendingRequestQueueFixture.cs +++ b/source/Halibut.Tests/ServiceModel/PendingRequestQueueFixture.cs @@ -9,7 +9,6 @@ using Halibut.ServiceModel; using Halibut.Tests.Builders; using Halibut.Tests.Support; -using Halibut.Tests.Support.TestAttributes; using Halibut.Transport.Protocol; using NUnit.Framework; @@ -26,17 +25,16 @@ public async Task QueueAndWait_WillContinueWaitingUntilResponseIsApplied() var sut = new PendingRequestQueueBuilder().WithEndpoint(endpoint).Build(); var request = new RequestMessageBuilder(endpoint).Build(); var expectedResponse = ResponseMessageBuilder.FromRequest(request).Build(); - + var queueAndWaitTask = await StartQueueAndWaitAndWaitForRequestToBeQueued(sut, request, CancellationToken); await sut.DequeueAsync(CancellationToken); - // Act await Task.Delay(1000, CancellationToken); queueAndWaitTask.IsCompleted.Should().BeFalse(); await sut.ApplyResponse(expectedResponse, request.Destination); - + // Assert var response = await queueAndWaitTask; response.Should().Be(expectedResponse); @@ -86,7 +84,7 @@ public async Task QueueAndWait_WhenPollingRequestQueueTimeoutIsReached_WillStopW var request = new RequestMessageBuilder(endpoint) .WithServiceEndpoint(seb => seb.WithPollingRequestQueueTimeout(TimeSpan.FromMilliseconds(1000))) .Build(); - + // Act var stopwatch = Stopwatch.StartNew(); var queueAndWaitTask = await StartQueueAndWaitAndWaitForRequestToBeQueued(sut, request, CancellationToken); @@ -119,8 +117,10 @@ public async Task QueueAndWait_WhenRequestIsDequeued_ButPollingRequestQueueTimeo // Act var stopwatch = Stopwatch.StartNew(); - var (queueAndWaitTask, _) = await QueueAndDequeueRequest_ForTimeoutTestingOnly_ToCopeWithRaceCondition(sut, request, CancellationToken); + var (queueAndWaitTask, dequeued) = await QueueAndDequeueRequest_ForTimeoutTestingOnly_ToCopeWithRaceCondition(sut, request, CancellationToken); + var response = await queueAndWaitTask; + dequeued.CancellationToken.IsCancellationRequested.Should().BeTrue("Should have cancelled the request when the PollingRequestMaximumMessageProcessingTimeout is reached"); // Assert // Although we sleep for 2 second, sometimes it can be just under. So be generous with the buffer. @@ -131,7 +131,7 @@ public async Task QueueAndWait_WhenRequestIsDequeued_ButPollingRequestQueueTimeo var next = await sut.DequeueAsync(CancellationToken); next.Should().BeNull(); } - + [Test] public async Task QueueAndWait_WhenRequestIsDequeued_ButPollingRequestQueueTimeoutIsReached_ShouldWaitTillRequestRespondsAndClearRequest() { @@ -142,28 +142,31 @@ public async Task QueueAndWait_WhenRequestIsDequeued_ButPollingRequestQueueTimeo .WithEndpoint(endpoint) .WithPollingQueueWaitTimeout(TimeSpan.Zero) // Remove delay, otherwise we wait the full 20 seconds for DequeueAsync at the end of the test .Build(); + var request = new RequestMessageBuilder(endpoint) .WithServiceEndpoint(seb => seb.WithPollingRequestQueueTimeout(TimeSpan.FromMilliseconds(1000))) .Build(); + var expectedResponse = ResponseMessageBuilder.FromRequest(request).Build(); // Act var (queueAndWaitTask, dequeued) = await QueueAndDequeueRequest_ForTimeoutTestingOnly_ToCopeWithRaceCondition(sut, request, CancellationToken); await Task.Delay(2000, CancellationToken); + dequeued.CancellationToken.IsCancellationRequested.Should().BeFalse("Should not have cancelled the request after PollingRequestQueueTimeout is reached"); await sut.ApplyResponse(expectedResponse, request.Destination); var response = await queueAndWaitTask; // Assert - dequeued.Should().NotBeNull("We should have removed the item from the queue before it timed out.").And.Be(request); + dequeued.RequestMessage.Should().NotBeNull("We should have removed the item from the queue before it timed out.").And.Be(request); response.Should().Be(expectedResponse); var next = await sut.DequeueAsync(CancellationToken); next.Should().BeNull(); } - + [Test] public async Task QueueAndWait_AddingMultipleItemsToQueueInOrder_ShouldDequeueInOrder() { @@ -188,7 +191,7 @@ public async Task QueueAndWait_AddingMultipleItemsToQueueInOrder_ShouldDequeueIn foreach (var expectedRequest in requestsInOrder) { var request = await sut.DequeueAsync(CancellationToken); - request.Should().Be(expectedRequest); + request!.RequestMessage.Should().Be(expectedRequest); } await ApplyResponsesConcurrentlyAndEnsureAllQueueResponsesMatch(sut, requestsInOrder, queueAndWaitTasksInOrder); @@ -218,7 +221,7 @@ public async Task QueueAndWait_AddingMultipleItemsToQueueConcurrently_AllRequest for (int i = 0; i < requestsInOrder.Count; i++) { var request = await sut.DequeueAsync(CancellationToken); - requests.Add(request); + requests.Add(request?.RequestMessage); } requests.Should().BeEquivalentTo(requestsInOrder); @@ -251,7 +254,7 @@ public async Task QueueAndWait_Can_Queue_Dequeue_Apply_VeryLargeNumberOfRequests .ToList(); await Task.WhenAll(dequeueTasks); - + // Assert await ApplyResponsesConcurrentlyAndEnsureAllQueueResponsesMatch(sut, requestsInOrder, queueAndWaitTasksInOrder); } @@ -266,7 +269,7 @@ public async Task QueueAndWait_Can_Queue_Dequeue_WhenRequestsAreBeingCancelled() const int minimumCancelledRequest = 100; var sut = new PendingRequestQueueBuilder().WithEndpoint(endpoint).Build(); - + var requestsInOrder = Enumerable.Range(0, totalRequest) .Select(_ => new RequestMessageBuilder(endpoint).Build()) .ToList(); @@ -277,16 +280,16 @@ public async Task QueueAndWait_Can_Queue_Dequeue_WhenRequestsAreBeingCancelled() { var requestCancellationTokenSource = new CancellationTokenSource(); return new Tuple, CancellationTokenSource>( - StartQueueAndWait(sut, request, requestCancellationTokenSource.Token), + StartQueueAndWait(sut, request, requestCancellationTokenSource.Token), requestCancellationTokenSource); }) .ToList(); await WaitForQueueCountToBecome(sut, requestsInOrder.Count); - + var index = 0; var cancelled = 0; - var dequeueTasks = new ConcurrentBag>(); + var dequeueTasks = new ConcurrentBag>(); var cancelSomeTask = Task.Run(() => { @@ -294,10 +297,10 @@ public async Task QueueAndWait_Can_Queue_Dequeue_WhenRequestsAreBeingCancelled() { var currentIndex = Interlocked.Increment(ref index); - if(currentIndex % 2 == 0) + if (currentIndex % 2 == 0) { Interlocked.Increment(ref cancelled); - queueAndWaitTasksInOrder.ElementAt(index-1).Item2.Cancel(); + queueAndWaitTasksInOrder.ElementAt(index - 1).Item2.Cancel(); } } }); @@ -357,7 +360,7 @@ public async Task QueueAndWait_CancellingAPendingRequestBeforeItIsDequeued_Shoul } [Test] - public async Task QueueAndWait_CancellingAPendingRequestAfterItIsDequeued_ShouldWaitTillRequestRespondsAndClearRequest() + public async Task QueueAndWait_CancellingAPendingRequestAfterItIsDequeued_ShouldThrowExceptionAndClearRequest() { // Arrange const string endpoint = "poll://endpoint001"; @@ -366,8 +369,8 @@ public async Task QueueAndWait_CancellingAPendingRequestAfterItIsDequeued_Should .WithEndpoint(endpoint) .WithPollingQueueWaitTimeout(TimeSpan.Zero) // Remove delay, otherwise we wait the full 20 seconds for DequeueAsync at the end of the test .Build(); + var request = new RequestMessageBuilder(endpoint).Build(); - var expectedResponse = ResponseMessageBuilder.FromRequest(request).Build(); var cancellationTokenSource = new CancellationTokenSource(); @@ -379,47 +382,22 @@ public async Task QueueAndWait_CancellingAPendingRequestAfterItIsDequeued_Should // Cancel, and give the queue time to start waiting for a response cancellationTokenSource.Cancel(); await Task.Delay(1000, CancellationToken); + dequeued!.CancellationToken.IsCancellationRequested.Should().BeTrue("Should have cancelled the request"); - await sut.ApplyResponse(expectedResponse, request.Destination); - - var response = await queueAndWaitTask; - + +#pragma warning disable VSTHRD003 +#pragma warning disable VSTHRD003 + await AssertionExtensions.Should(() => queueAndWaitTask).ThrowAsync(); +#pragma warning restore VSTHRD003 +#pragma warning restore VSTHRD003 + // Assert - dequeued.Should().NotBeNull().And.Be(request); - response.Should().Be(expectedResponse); - + dequeued?.RequestMessage.Should().NotBeNull().And.Be(request); + var next = await sut.DequeueAsync(CancellationToken); next.Should().BeNull(); } - [Test] - public async Task QueueAndWait_CancellingAPendingRequestAfterItIsDequeued_AndPollingRequestMaximumMessageProcessingTimeoutIsReached_WillStopWaiting() - { - // Arrange - const string endpoint = "poll://endpoint001"; - - var sut = new PendingRequestQueueBuilder().WithEndpoint(endpoint).Build(); - var request = new RequestMessageBuilder(endpoint) - .WithServiceEndpoint(seb => seb.WithPollingRequestMaximumMessageProcessingTimeout(TimeSpan.FromMilliseconds(1000))) - .Build(); - - var cancellationTokenSource = new CancellationTokenSource(); - - // Act - var stopwatch = Stopwatch.StartNew(); - var queueAndWaitTask = await StartQueueAndWaitAndWaitForRequestToBeQueued(sut, request, cancellationTokenSource.Token); - - await sut.DequeueAsync(CancellationToken); - cancellationTokenSource.Cancel(); - var response = await queueAndWaitTask; - - // Assert - // Although we sleep for 1 second, sometimes it can be just under. So be generous with the buffer. - stopwatch.Elapsed.Should().BeGreaterThan(TimeSpan.FromMilliseconds(800)); - response.Id.Should().Be(request.Id); - response.Error!.Message.Should().Be("A request was sent to a polling endpoint, the polling endpoint collected it but did not respond in the allowed time (00:00:01), so the request timed out."); - } - [Test] public async Task DequeueAsync_WithNothingQueued_WillWaitPollingQueueWaitTimeout_ShouldReturnNull() { @@ -430,7 +408,7 @@ public async Task DequeueAsync_WithNothingQueued_WillWaitPollingQueueWaitTimeout .WithEndpoint(endpoint) .WithPollingQueueWaitTimeout(TimeSpan.FromSeconds(1)) .Build(); - + // Act var stopwatch = Stopwatch.StartNew(); var request = await sut.DequeueAsync(CancellationToken); @@ -456,7 +434,7 @@ public async Task DequeueAsync_WithNothingQueued_WillWaitPollingQueueWaitTimeout var previousRequest = new RequestMessageBuilder(endpoint).Build(); var expectedPreviousResponse = ResponseMessageBuilder.FromRequest(previousRequest).Build(); - var queueAndWaitTask = await StartQueueAndWaitAndWaitForRequestToBeQueued(sut, previousRequest ,CancellationToken); + var queueAndWaitTask = await StartQueueAndWaitAndWaitForRequestToBeQueued(sut, previousRequest, CancellationToken); await sut.DequeueAsync(CancellationToken); await sut.ApplyResponse(expectedPreviousResponse, previousRequest.Destination); await queueAndWaitTask; @@ -464,7 +442,7 @@ public async Task DequeueAsync_WithNothingQueued_WillWaitPollingQueueWaitTimeout // Act var stopwatch = Stopwatch.StartNew(); var dequeuedRequest = await sut.DequeueAsync(CancellationToken); - + // Assert // Although we sleep for 1 second, sometimes it can be just under. So be generous with the buffer. stopwatch.Elapsed.Should().BeGreaterThan(TimeSpan.FromMilliseconds(800)); @@ -488,21 +466,21 @@ public async Task DequeueAsync_WillContinueWaitingUntilItemIsQueued() await Task.Delay(1000, CancellationToken); var queueAndWaitTask = StartQueueAndWait(sut, request, CancellationToken); - + var dequeuedRequest = await dequeueTask; - + // Assert // Although we sleep for 1 second, sometimes it can be just under. So be generous with the buffer. stopwatch.Elapsed.Should().BeGreaterThan(TimeSpan.FromMilliseconds(800)); - dequeuedRequest.Should().Be(request); + dequeuedRequest!.RequestMessage.Should().Be(request); // Apply a response so we can prove this counts as taking a message. await sut.ApplyResponse(expectedResponse, request.Destination); var response = await queueAndWaitTask; response.Should().Be(expectedResponse); } - + [Test] public async Task DequeueAsync_WithMultipleDequeueCallsWaiting_WhenSingleRequestIsQueued_ThenOnlyOneCallersReceivesRequest() { @@ -512,7 +490,7 @@ public async Task DequeueAsync_WithMultipleDequeueCallsWaiting_WhenSingleRequest var sut = new PendingRequestQueueBuilder().WithEndpoint(endpoint).Build(); var request = new RequestMessageBuilder(endpoint).Build(); var expectedResponse = ResponseMessageBuilder.FromRequest(request).Build(); - + var dequeueTasks = Enumerable.Range(0, 30) .Select(_ => sut.DequeueAsync(CancellationToken)) .ToArray(); @@ -529,7 +507,7 @@ public async Task DequeueAsync_WithMultipleDequeueCallsWaiting_WhenSingleRequest await queueAndWaitTask; var singleDequeuedRequest = await dequeueTasks.Should().ContainSingle(t => t.Result != null).Subject; - singleDequeuedRequest.Should().Be(request); + singleDequeuedRequest!.RequestMessage.Should().Be(request); } [Test] @@ -570,11 +548,11 @@ public async Task ApplyResponse_AfterRequestHasBeenCollected_AndWaitingHasBeenCa async Task> StartQueueAndWaitAndWaitForRequestToBeQueued( IPendingRequestQueue pendingRequestQueue, RequestMessage request, - CancellationToken queueAndWaitCancellationToken) + CancellationToken requestCancellationToken) { var count = pendingRequestQueue.Count; - var task = StartQueueAndWait(pendingRequestQueue, request, queueAndWaitCancellationToken); + var task = StartQueueAndWait(pendingRequestQueue, request, requestCancellationToken); await WaitForQueueCountToBecome(pendingRequestQueue, count + 1); @@ -589,24 +567,26 @@ async Task WaitForQueueCountToBecome(IPendingRequestQueue pendingRequestQueue, i } } - Task StartQueueAndWait(IPendingRequestQueue pendingRequestQueue, RequestMessage request, CancellationToken queueAndWaitCancellationToken) + Task StartQueueAndWait( + IPendingRequestQueue pendingRequestQueue, + RequestMessage request, + CancellationToken requestCancellationToken) { var task = Task.Run( - async () => await pendingRequestQueue.QueueAndWaitAsync(request, new RequestCancellationTokens(queueAndWaitCancellationToken, CancellationToken.None)), + async () => await pendingRequestQueue.QueueAndWaitAsync(request, requestCancellationToken), CancellationToken); return task; } - async Task<(Task queueAndWaitTask, RequestMessage dequeued)> QueueAndDequeueRequest_ForTimeoutTestingOnly_ToCopeWithRaceCondition( - IPendingRequestQueue sut, + async Task<(Task queueAndWaitTask, RequestMessageWithCancellationToken dequeued)> QueueAndDequeueRequest_ForTimeoutTestingOnly_ToCopeWithRaceCondition( + IPendingRequestQueue sut, RequestMessage request, CancellationToken cancellationToken) { //For most tests, this is not a good method to use. It is a fix for some specific tests to cope with a race condition when Team City runs out of resources (and causes tests to become flaky) - while (true) { - var queueAndWaitTask = await StartQueueAndWaitAndWaitForRequestToBeQueued(sut, request, CancellationToken); + var queueAndWaitTask = await StartQueueAndWaitAndWaitForRequestToBeQueued(sut, request, cancellationToken); sut.Count.Should().Be(1, "Item should be queued"); var dequeued = await sut.DequeueAsync(cancellationToken); @@ -638,7 +618,7 @@ static async Task ApplyResponsesConcurrentlyAndEnsureAllQueueResponsesMatch( var applyResponseTasks = requestsInOrder .Select((r,i) => Task.Run(async () => await sut.ApplyResponse(expectedResponsesInOrder[i], r.Destination))) .ToList(); - + await Task.WhenAll(applyResponseTasks); var index = 0; diff --git a/source/Halibut.Tests/Support/LatestClientAndLatestServiceBuilderExtensionMethods.cs b/source/Halibut.Tests/Support/LatestClientAndLatestServiceBuilderExtensionMethods.cs index 997801c1d..389414122 100644 --- a/source/Halibut.Tests/Support/LatestClientAndLatestServiceBuilderExtensionMethods.cs +++ b/source/Halibut.Tests/Support/LatestClientAndLatestServiceBuilderExtensionMethods.cs @@ -59,13 +59,6 @@ public static LatestClientAndLatestServiceBuilder WithInstantReconnectPollingRet { return builder.WithPollingReconnectRetryPolicy(() => new RetryPolicy(1, TimeSpan.Zero, TimeSpan.Zero)); } - - public static LatestClientAndLatestServiceBuilder WhenTestingAsyncClient(this LatestClientAndLatestServiceBuilder builder, ClientAndServiceTestCase clientAndServiceTestCase, Action action) - { - - action(builder); - return builder; - } public static LatestClientAndLatestServiceBuilder WithConnectionObserverOnTcpServer(this LatestClientAndLatestServiceBuilder builder, IConnectionsObserver connectionsObserver) { diff --git a/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestDequeuedPendingRequestQueueFactory.cs b/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestDequeuedPendingRequestQueueFactory.cs index d62718653..d61291bdb 100644 --- a/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestDequeuedPendingRequestQueueFactory.cs +++ b/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestDequeuedPendingRequestQueueFactory.cs @@ -44,7 +44,7 @@ public Decorator(IPendingRequestQueue inner, CancellationTokenSource[] cancellat public int Count => inner.Count; public async Task ApplyResponse(ResponseMessage response, ServiceEndPoint destination) => await inner.ApplyResponse(response, destination); - public async Task DequeueAsync(CancellationToken cancellationToken) + public async Task DequeueAsync(CancellationToken cancellationToken) { var response = await inner.DequeueAsync(cancellationToken); @@ -53,8 +53,8 @@ public Decorator(IPendingRequestQueue inner, CancellationTokenSource[] cancellat return response; } - public async Task QueueAndWaitAsync(RequestMessage request, RequestCancellationTokens requestCancellationTokens) - => await inner.QueueAndWaitAsync(request, requestCancellationTokens); + public async Task QueueAndWaitAsync(RequestMessage request, CancellationToken cancellationTokens) + => await inner.QueueAndWaitAsync(request, cancellationTokens); } } } diff --git a/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestQueuedPendingRequestQueueFactory.cs b/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestQueuedPendingRequestQueueFactory.cs index ccd56b065..ce5f0cde5 100644 --- a/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestQueuedPendingRequestQueueFactory.cs +++ b/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestQueuedPendingRequestQueueFactory.cs @@ -10,57 +10,57 @@ namespace Halibut.Tests.Support.PendingRequestQueueFactories /// CancelWhenRequestQueuedPendingRequestQueueFactory cancels the cancellation token source when a request is queued /// public class CancelWhenRequestQueuedPendingRequestQueueFactory : IPendingRequestQueueFactory + { + readonly CancellationTokenSource[] cancellationTokenSources; + readonly IPendingRequestQueueFactory inner; + + public CancelWhenRequestQueuedPendingRequestQueueFactory(IPendingRequestQueueFactory inner, CancellationTokenSource[] cancellationTokenSources) + { + this.cancellationTokenSources = cancellationTokenSources; + this.inner = inner; + } + + public CancelWhenRequestQueuedPendingRequestQueueFactory(IPendingRequestQueueFactory inner, CancellationTokenSource cancellationTokenSource) : this(inner, new[]{ cancellationTokenSource }) { + } + + public IPendingRequestQueue CreateQueue(Uri endpoint) + { + return new Decorator(inner.CreateQueue(endpoint), cancellationTokenSources); + } + + class Decorator : IPendingRequestQueue { readonly CancellationTokenSource[] cancellationTokenSources; - readonly IPendingRequestQueueFactory inner; + readonly IPendingRequestQueue inner; - public CancelWhenRequestQueuedPendingRequestQueueFactory(IPendingRequestQueueFactory inner, CancellationTokenSource[] cancellationTokenSources) + public Decorator(IPendingRequestQueue inner, CancellationTokenSource[] cancellationTokenSources) { - this.cancellationTokenSources = cancellationTokenSources; this.inner = inner; + this.cancellationTokenSources = cancellationTokenSources; } - public CancelWhenRequestQueuedPendingRequestQueueFactory(IPendingRequestQueueFactory inner, CancellationTokenSource cancellationTokenSource) : this(inner, new[]{ cancellationTokenSource }) { - } + public bool IsEmpty => inner.IsEmpty; + public int Count => inner.Count; + public async Task ApplyResponse(ResponseMessage response, ServiceEndPoint destination) => await inner.ApplyResponse(response, destination); + public async Task DequeueAsync(CancellationToken cancellationToken) => await inner.DequeueAsync(cancellationToken); - public IPendingRequestQueue CreateQueue(Uri endpoint) + public async Task QueueAndWaitAsync(RequestMessage request, CancellationToken cancellationTokens) { - return new Decorator(inner.CreateQueue(endpoint), cancellationTokenSources); - } - - class Decorator : IPendingRequestQueue - { - readonly CancellationTokenSource[] cancellationTokenSources; - readonly IPendingRequestQueue inner; - - public Decorator(IPendingRequestQueue inner, CancellationTokenSource[] cancellationTokenSources) - { - this.inner = inner; - this.cancellationTokenSources = cancellationTokenSources; - } - - public bool IsEmpty => inner.IsEmpty; - public int Count => inner.Count; - public async Task ApplyResponse(ResponseMessage response, ServiceEndPoint destination) => await inner.ApplyResponse(response, destination); - public async Task DequeueAsync(CancellationToken cancellationToken) => await inner.DequeueAsync(cancellationToken); - - public async Task QueueAndWaitAsync(RequestMessage request, RequestCancellationTokens requestCancellationTokens) - { - var task = Task.Run(async () => + var task = Task.Run(async () => + { + while (inner.IsEmpty) { - while (inner.IsEmpty) - { - await Task.Delay(TimeSpan.FromMilliseconds(10), CancellationToken.None); - } + await Task.Delay(TimeSpan.FromMilliseconds(10), CancellationToken.None); + } - Parallel.ForEach(cancellationTokenSources, cancellationTokenSource => cancellationTokenSource.Cancel()); - }, - CancellationToken.None); + Parallel.ForEach(cancellationTokenSources, cancellationTokenSource => cancellationTokenSource.Cancel()); + }, + CancellationToken.None); - var result = await inner.QueueAndWaitAsync(request, requestCancellationTokens); - await task; - return result; - } + var result = await inner.QueueAndWaitAsync(request, cancellationTokens); + await task; + return result; } } + } } diff --git a/source/Halibut.Tests/Timeouts/SendingAndReceivingRequestMessagesTimeoutsFixture.cs b/source/Halibut.Tests/Timeouts/SendingAndReceivingRequestMessagesTimeoutsFixture.cs index 68e11d444..7807bd2c7 100644 --- a/source/Halibut.Tests/Timeouts/SendingAndReceivingRequestMessagesTimeoutsFixture.cs +++ b/source/Halibut.Tests/Timeouts/SendingAndReceivingRequestMessagesTimeoutsFixture.cs @@ -29,12 +29,7 @@ public async Task HalibutTimeoutsAndLimits_AppliesToTcpClientReceiveTimeout(Clie .WithPortForwarding(out var portForwarderRef, port => PortForwarderUtil.ForwardingToLocalPort(port).WithPortForwarderDataLogging(clientAndServiceTestCase.ServiceConnectionType).Build()) .WithEchoService() .WithDoSomeActionService(() => portForwarderRef.Value.PauseExistingConnections()) - .WhenTestingAsyncClient(clientAndServiceTestCase, b => - { - b.WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimitsForTestsBuilder().Build() - .WithAllTcpTimeoutsTo(TimeSpan.FromSeconds(133)) - .WithTcpClientReceiveTimeout(expectedTimeout)); - }) + .WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimitsForTestsBuilder().Build().WithAllTcpTimeoutsTo(TimeSpan.FromSeconds(133)).WithTcpClientReceiveTimeout(expectedTimeout)) .WithInstantReconnectPollingRetryPolicy() .Build(CancellationToken)) { @@ -48,6 +43,7 @@ public async Task HalibutTimeoutsAndLimits_AppliesToTcpClientReceiveTimeout(Clie sw.Stop(); Logger.Error(e, "Received error"); AssertExceptionMessageLooksLikeAReadTimeout(e); + sw.Elapsed.Should().BeGreaterThan(expectedTimeout - TimeSpan.FromSeconds(2), "The receive timeout should apply, not the shorter heart beat timeout") // -2s give it a little slack to avoid it timed out slightly too early. .And .BeLessThan(expectedTimeout + HalibutTimeoutsAndLimitsForTestsBuilder.HalfTheTcpReceiveTimeout, "We should be timing out on the tcp receive timeout"); diff --git a/source/Halibut.Tests/Transport/Observability/ConnectionObserverFixture.cs b/source/Halibut.Tests/Transport/Observability/ConnectionObserverFixture.cs index d780a6cf6..0dc0133ff 100644 --- a/source/Halibut.Tests/Transport/Observability/ConnectionObserverFixture.cs +++ b/source/Halibut.Tests/Transport/Observability/ConnectionObserverFixture.cs @@ -93,7 +93,7 @@ public async Task ObserveUnauthorizedPollingConnections(ClientAndServiceTestCase var echo = clientAndBuilder.CreateAsyncClient( point => { point.PollingRequestQueueTimeout = TimeSpan.FromSeconds(2000); }); - var sayHelloTask = Task.Run(async () => await echo.SayHelloAsync("hello", new HalibutProxyRequestOptions(token, CancellationToken.None)), CancellationToken); + var sayHelloTask = Task.Run(async () => await echo.SayHelloAsync("hello", new HalibutProxyRequestOptions(token)), CancellationToken); await Task.Delay(3000, CancellationToken); @@ -126,7 +126,7 @@ public async Task ObserveUnauthorizedPollingWebSocketConnections(ClientAndServic var echo = clientAndBuilder.CreateAsyncClient( point => { point.PollingRequestQueueTimeout = TimeSpan.FromSeconds(2000); }); - var sayHelloTask = Task.Run(async () => await echo.SayHelloAsync("hello", new HalibutProxyRequestOptions(token, CancellationToken.None)), CancellationToken); + var sayHelloTask = Task.Run(async () => await echo.SayHelloAsync("hello", new HalibutProxyRequestOptions(token)), CancellationToken); await Task.Delay(3000, CancellationToken); diff --git a/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs b/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs index ddb13aa58..aadb0cc6e 100644 --- a/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs +++ b/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs @@ -146,9 +146,9 @@ public async Task ShouldExchangeAsServerOfSubscriberAsync() { stream.SetRemoteIdentity(new RemoteIdentity(RemoteIdentityType.Subscriber, new Uri("poll://12831"))); var requestQueue = Substitute.For(); - var queue = new Queue(); - queue.Enqueue(new RequestMessage()); - queue.Enqueue(new RequestMessage()); + var queue = new Queue(); + queue.Enqueue(new (new RequestMessage(), CancellationToken.None)); + queue.Enqueue(new (new RequestMessage(), CancellationToken.None)); requestQueue.DequeueAsync(CancellationToken.None).Returns(ci => queue.Count > 0 ? queue.Dequeue() : null); stream.SetNumberOfReads(2); @@ -219,16 +219,16 @@ public async Task ShouldExchangeAsServerOfSubscriberWithPoolingAsync() { stream.SetRemoteIdentity(new RemoteIdentity(RemoteIdentityType.Subscriber, new Uri("poll://12831"))); var requestQueue = Substitute.For(); - var queue = new Queue(); + var queue = new Queue(); requestQueue.DequeueAsync(CancellationToken.None).Returns(ci => queue.Count > 0 ? queue.Dequeue() : null); - queue.Enqueue(new RequestMessage()); - queue.Enqueue(new RequestMessage()); + queue.Enqueue(new (new RequestMessage(), CancellationToken.None)); + queue.Enqueue(new (new RequestMessage(), CancellationToken.None)); stream.SetNumberOfReads(2); await protocol.ExchangeAsServerAsync(req => Task.FromResult(ResponseMessage.FromException(req, new Exception("Divide by zero"))), ri => requestQueue, CancellationToken.None); - queue.Enqueue(new RequestMessage()); + queue.Enqueue(new (new RequestMessage(), CancellationToken.None)); stream.SetNumberOfReads(1); diff --git a/source/Halibut.Tests/Transport/SecureClientFixture.cs b/source/Halibut.Tests/Transport/SecureClientFixture.cs index f4d25ef6b..b137b8084 100644 --- a/source/Halibut.Tests/Transport/SecureClientFixture.cs +++ b/source/Halibut.Tests/Transport/SecureClientFixture.cs @@ -82,9 +82,7 @@ public async Task SecureClientClearsPoolWhenAllConnectionsCorrupt() var secureClient = new SecureListeningClient((s, l) => GetProtocol(s, l), endpoint, Certificates.Octopus, log, connectionManager, tcpConnectionFactory); ResponseMessage response = null!; - using var requestCancellationTokens = new RequestCancellationTokens(CancellationToken.None, CancellationToken.None); - - await secureClient.ExecuteTransactionAsync(async (mep, ct) => response = await mep.ExchangeAsClientAsync(request, ct), requestCancellationTokens); + await secureClient.ExecuteTransactionAsync(async (mep, ct) => response = await mep.ExchangeAsClientAsync(request, ct), CancellationToken.None); // The pool should be cleared after the second failure await stream.Received(2).IdentifyAsClientAsync(Arg.Any()); diff --git a/source/Halibut.Tests/WhenCancellingARequestForAPollingTentacle.cs b/source/Halibut.Tests/WhenCancellingARequestForAPollingTentacle.cs index 6aa4edcc4..a2b440092 100644 --- a/source/Halibut.Tests/WhenCancellingARequestForAPollingTentacle.cs +++ b/source/Halibut.Tests/WhenCancellingARequestForAPollingTentacle.cs @@ -1,9 +1,11 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; using FluentAssertions; +using Halibut.Exceptions; using Halibut.Logging; using Halibut.ServiceModel; using Halibut.Tests.Support; @@ -12,6 +14,8 @@ using Halibut.Tests.Support.TestCases; using Halibut.Tests.TestServices; using Halibut.Tests.TestServices.Async; +using Halibut.TestUtils.Contracts; +using Halibut.Transport.Protocol; using NUnit.Framework; namespace Halibut.Tests @@ -21,15 +25,10 @@ public class WhenCancellingARequestForAPollingTentacle public class AndTheRequestIsStillQueued : BaseTest { [Test] - [LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false, additionalParameters: new object[] { true, false })] - [LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false, additionalParameters: new object[] { false, true })] - [LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false, additionalParameters: new object[] { true, true })] - public async Task TheRequestShouldBeCancelled_WhenTheConnectingOrInProgressCancellationTokenIsCancelled_OnAsyncClients( - ClientAndServiceTestCase clientAndServiceTestCase, - bool connectingCancellationTokenCancelled, - bool inProgressCancellationTokenCancelled) + [LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false)] + public async Task TheRequestShouldBeCancelled_WhenTheRequestCancellationTokenIsCancelled(ClientAndServiceTestCase clientAndServiceTestCase) { - var (tokenSourcesToCancel, halibutProxyRequestOptions) = CreateTokenSourceAndHalibutProxyRequestOptions(connectingCancellationTokenCancelled, inProgressCancellationTokenCancelled); + var (tokenSourcesToCancel, halibutProxyRequestOptions) = CreateTokenSourceAndHalibutProxyRequestOptions(); await using (var client = await clientAndServiceTestCase.CreateClientOnlyTestCaseBuilder() .AsLatestClientBuilder() @@ -38,7 +37,7 @@ public async Task TheRequestShouldBeCancelled_WhenTheConnectingOrInProgressCance { var doSomeActionService = client.CreateClientWithoutService(); - await AssertException.Throws(() => doSomeActionService.ActionAsync(halibutProxyRequestOptions)); + await AssertionExtensions.Should(() => doSomeActionService.ActionAsync(halibutProxyRequestOptions)).ThrowAsync(); } } } @@ -47,99 +46,201 @@ public class AndTheRequestHasBeenDequeuedButNoResponseReceived : BaseTest { [Test] [LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false)] - public async Task TheRequestShouldNotBeCancelled_WhenTheConnectingCancellationTokenIsCancelled_OnAsyncClients(ClientAndServiceTestCase clientAndServiceTestCase) + public async Task TheRequestShouldBeCancelled_WhenTheRequestCancellationTokenIsCancelled(ClientAndServiceTestCase clientAndServiceTestCase) { + var responseMessages = new List(); + var shouldCancelWhenRequestDequeued = false; var calls = new List(); - var (tokenSourcesToCancel, halibutProxyRequestOptions) = CreateTokenSourceAndHalibutProxyRequestOptions( - connectingCancellationTokenCancelled: true, - inProgressCancellationTokenCancelled: false); + var (tokenSourceToCancel, halibutProxyRequestOptions) = CreateTokenSourceAndHalibutProxyRequestOptions(); await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() .AsLatestClientAndLatestServiceBuilder() + .WithHalibutLoggingLevel(LogLevel.Trace) .WithDoSomeActionService(() => { calls.Add(DateTime.UtcNow); - while (!tokenSourcesToCancel.All(x => x.IsCancellationRequested)) + while (!tokenSourceToCancel.IsCancellationRequested) { - Thread.Sleep(TimeSpan.FromMilliseconds(10)); + // Wait until the request is cancelled + Thread.Sleep(TimeSpan.FromMilliseconds(100)); } Thread.Sleep(TimeSpan.FromSeconds(1)); }) - .WithPendingRequestQueueFactoryBuilder(builder => builder.WithDecorator((_, inner) => new CancelWhenRequestDequeuedPendingRequestQueueFactory(inner, tokenSourcesToCancel))) + .WithEchoService() + .WithPendingRequestQueueFactoryBuilder(builder => builder.WithDecorator((_, inner) => + new CancelWhenRequestDequeuedPendingRequestQueueFactory(inner, tokenSourceToCancel, ShouldCancelOnDequeue, OnResponseApplied))) .Build(CancellationToken)) { - var doSomeActionService = clientAndService.CreateAsyncClient(); - await doSomeActionService.ActionAsync(halibutProxyRequestOptions); + shouldCancelWhenRequestDequeued = true; + var doSomeActionService = clientAndService.CreateAsyncClient(); + var echoService = clientAndService.CreateAsyncClient(); + + await AssertionExtensions.Should(() => doSomeActionService.ActionAsync(halibutProxyRequestOptions)).ThrowAsync(); + + // Ensure we can send another message to the Service which will validate the Client had the request cancelled to the socket + shouldCancelWhenRequestDequeued = false; + var started = Stopwatch.StartNew(); + await echoService.SayHelloAsync(".", new HalibutProxyRequestOptions(CancellationToken.None)); + // This should return quickly + started.Elapsed.Should().BeLessThan(TimeSpan.FromSeconds(10)); } calls.Should().HaveCount(1); + + // Wait for all responses have been received + await Task.Delay(TimeSpan.FromSeconds(5)); + + // Ensure we did not get a valid response back from the doSomeActionService and that the request was cancelled to the socket. + responseMessages.Should().HaveCount(2); + responseMessages.ElementAt(0).Id.Should().Contain("IDoSomeActionService::ActionAsync"); + responseMessages.ElementAt(0).Error.Should().NotBeNull(); + responseMessages.ElementAt(0).Error!.Message.Should().Contain("The Request was cancelled while Transferring"); + responseMessages.ElementAt(1).Error.Should().BeNull(); + responseMessages.ElementAt(1).Id.Should().Contain("IEchoService::SayHelloAsync"); + + bool ShouldCancelOnDequeue() + { + return shouldCancelWhenRequestDequeued; + } + + void OnResponseApplied(ResponseMessage response) + { + responseMessages.Add(response); + } } + } - [Test] - [LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false, additionalParameters: new object[]{ false, true })] - [LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false, additionalParameters: new object[]{ true, true })] - public async Task TheRequestShouldBeCancelled_WhenTheInProgressCancellationTokenIsCancelled_OnAsyncClients( - ClientAndServiceTestCase clientAndServiceTestCase, - bool connectingCancellationTokenCancelled, - bool inProgressCancellationTokenCancelled) - { - var calls = new List(); - var (tokenSourcesToCancel, halibutProxyRequestOptions) = CreateTokenSourceAndHalibutProxyRequestOptions(connectingCancellationTokenCancelled, inProgressCancellationTokenCancelled); + static (CancellationTokenSource TokenSourceToCancel, HalibutProxyRequestOptions HalibutProxyRequestOptions) CreateTokenSourceAndHalibutProxyRequestOptions() + { + var requestCancellationTokenSource = new CancellationTokenSource(); + var halibutProxyRequestOptions = new HalibutProxyRequestOptions(requestCancellationTokenSource.Token); + + return (requestCancellationTokenSource, halibutProxyRequestOptions); + } - await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() - .AsLatestClientAndLatestServiceBuilder() - .WithHalibutLoggingLevel(LogLevel.Trace) - .WithDoSomeActionService(() => - { - calls.Add(DateTime.UtcNow); + /// + /// CancelWhenRequestQueuedPendingRequestQueueFactory cancels the cancellation token source when a request is queued + /// + class CancelWhenRequestQueuedPendingRequestQueueFactory : IPendingRequestQueueFactory + { + readonly CancellationTokenSource cancellationTokenSource; + readonly IPendingRequestQueueFactory inner; - while (!tokenSourcesToCancel.All(x => x.IsCancellationRequested)) - { - Thread.Sleep(TimeSpan.FromMilliseconds(10)); - } + public CancelWhenRequestQueuedPendingRequestQueueFactory(IPendingRequestQueueFactory inner, CancellationTokenSource cancellationTokenSource) + { + this.cancellationTokenSource = cancellationTokenSource; + this.inner = inner; + } + + public IPendingRequestQueue CreateQueue(Uri endpoint) + { + return new Decorator(inner.CreateQueue(endpoint), cancellationTokenSource); + } - Thread.Sleep(TimeSpan.FromSeconds(1)); - }) - .WithPendingRequestQueueFactoryBuilder(builder => builder.WithDecorator((_, inner) => new CancelWhenRequestDequeuedPendingRequestQueueFactory(inner, tokenSourcesToCancel))) - .Build(CancellationToken)) + class Decorator : IPendingRequestQueue + { + readonly CancellationTokenSource cancellationTokenSource; + readonly IPendingRequestQueue inner; + + public Decorator(IPendingRequestQueue inner, CancellationTokenSource cancellationTokenSource) { - var doSomeActionService = clientAndService.CreateAsyncClient(); + this.inner = inner; + this.cancellationTokenSource = cancellationTokenSource; + } - await AssertException.Throws(() => doSomeActionService.ActionAsync(halibutProxyRequestOptions)); + public bool IsEmpty => inner.IsEmpty; + public int Count => inner.Count; + public async Task ApplyResponse(ResponseMessage response, ServiceEndPoint destination) + { + await inner.ApplyResponse(response, destination); } - calls.Should().HaveCount(1); + public async Task DequeueAsync(CancellationToken cancellationToken) => await inner.DequeueAsync(cancellationToken); + + public async Task QueueAndWaitAsync(RequestMessage request, CancellationToken requestCancellationToken) + { + var task = Task.Run(async () => + { + while (inner.IsEmpty) + { + await Task.Delay(TimeSpan.FromMilliseconds(10), CancellationToken.None); + } + + cancellationTokenSource.Cancel(); + }, + CancellationToken.None); + + var result = await inner.QueueAndWaitAsync(request, requestCancellationToken); + await task; + return result; + } } } - static (CancellationTokenSource[] ToeknSourcesToCancel, HalibutProxyRequestOptions HalibutProxyRequestOptions) CreateTokenSourceAndHalibutProxyRequestOptions( - bool connectingCancellationTokenCancelled, - bool inProgressCancellationTokenCancelled) + /// + /// CancelWhenRequestDequeuedPendingRequestQueueFactory cancels the cancellation token source when a request is queued + /// + class CancelWhenRequestDequeuedPendingRequestQueueFactory : IPendingRequestQueueFactory { - var connectingCancellationTokenSource = new CancellationTokenSource(); - var inProgressCancellationTokenSource = new CancellationTokenSource(); - - CancellationTokenSource[] tokenSourcesToCancel; + readonly CancellationTokenSource cancellationTokenSource; + readonly Func shouldCancelOnDequeue; + readonly Action onResponseApplied; + readonly IPendingRequestQueueFactory inner; - if (connectingCancellationTokenCancelled && inProgressCancellationTokenCancelled) + public CancelWhenRequestDequeuedPendingRequestQueueFactory(IPendingRequestQueueFactory inner, CancellationTokenSource cancellationTokenSource, Func shouldCancelOnDequeue, Action onResponseApplied) { - tokenSourcesToCancel = new [] { connectingCancellationTokenSource, inProgressCancellationTokenSource }; + this.cancellationTokenSource = cancellationTokenSource; + this.shouldCancelOnDequeue = shouldCancelOnDequeue; + this.inner = inner; + this.onResponseApplied = onResponseApplied; } - else if (connectingCancellationTokenCancelled) + + public IPendingRequestQueue CreateQueue(Uri endpoint) { - tokenSourcesToCancel = new [] { connectingCancellationTokenSource }; + return new Decorator(inner.CreateQueue(endpoint), cancellationTokenSource, shouldCancelOnDequeue, onResponseApplied); } - else + + class Decorator : IPendingRequestQueue { - tokenSourcesToCancel = new [] { inProgressCancellationTokenSource }; - } + readonly CancellationTokenSource cancellationTokenSource; + readonly Func shouldCancel; + readonly Action onResponseApplied; + readonly IPendingRequestQueue inner; - var halibutProxyRequestOptions = new HalibutProxyRequestOptions(connectingCancellationTokenSource.Token, inProgressCancellationTokenSource.Token); - - return (tokenSourcesToCancel, halibutProxyRequestOptions); + public Decorator(IPendingRequestQueue inner, CancellationTokenSource cancellationTokenSource, Func shouldCancel, Action onResponseApplied) + { + this.inner = inner; + this.cancellationTokenSource = cancellationTokenSource; + this.shouldCancel = shouldCancel; + this.onResponseApplied = onResponseApplied; + } + + public bool IsEmpty => inner.IsEmpty; + public int Count => inner.Count; + public async Task ApplyResponse(ResponseMessage response, ServiceEndPoint destination) + { + onResponseApplied(response); + await inner.ApplyResponse(response, destination); + } + + public async Task DequeueAsync(CancellationToken cancellationToken) + { + var response = await inner.DequeueAsync(cancellationToken); + + if (shouldCancel()) + { + cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(2)); + } + + return response; + } + + public async Task QueueAndWaitAsync(RequestMessage request, CancellationToken requestCancellationToken) + => await inner.QueueAndWaitAsync(request, requestCancellationToken); + } } } } \ No newline at end of file diff --git a/source/Halibut/Exceptions/RequestCancelledException.cs b/source/Halibut/Exceptions/RequestCancelledException.cs new file mode 100644 index 000000000..18553a41b --- /dev/null +++ b/source/Halibut/Exceptions/RequestCancelledException.cs @@ -0,0 +1,47 @@ +using System; + +namespace Halibut.Exceptions +{ + public abstract class RequestCancelledException : OperationCanceledException + { + protected RequestCancelledException(string message, Exception innerException) + : base(message, innerException) + { + } + } + + public class ConnectingRequestCancelledException : RequestCancelledException + { + public ConnectingRequestCancelledException(Exception innerException) + : this("The Request was cancelled while Connecting.", innerException) + { + } + + public ConnectingRequestCancelledException(string message, Exception innerException) + : base(message, innerException) + { + } + public ConnectingRequestCancelledException(string message, string innerException) + : base(message, new Exception(innerException)) + { + } + } + + public class TransferringRequestCancelledException : RequestCancelledException + { + public TransferringRequestCancelledException(Exception innerException) + : this("The Request was cancelled while Transferring.", innerException) + { + } + + public TransferringRequestCancelledException(string message, Exception innerException) + : base(message, innerException) + { + } + + public TransferringRequestCancelledException(string message, string innerException) + : base(message, new Exception(innerException)) + { + } + } +} \ No newline at end of file diff --git a/source/Halibut/HalibutRuntime.cs b/source/Halibut/HalibutRuntime.cs index b0eb9d35e..6a151f507 100644 --- a/source/Halibut/HalibutRuntime.cs +++ b/source/Halibut/HalibutRuntime.cs @@ -192,11 +192,11 @@ public TAsyncClientService CreateAsyncClient(Serv var logger = logs.ForEndpoint(endpoint.BaseUri); var proxy = DispatchProxyAsync.Create(); - (proxy as HalibutProxyWithAsync)!.Configure(SendOutgoingRequestAsync, typeof(TService), endpoint, logger, CancellationToken.None); + (proxy as HalibutProxyWithAsync)!.Configure(SendOutgoingRequestAsync, typeof(TService), endpoint, logger); return proxy; } - async Task SendOutgoingRequestAsync(RequestMessage request, MethodInfo methodInfo, RequestCancellationTokens requestCancellationTokens) + async Task SendOutgoingRequestAsync(RequestMessage request, MethodInfo methodInfo, CancellationToken cancellationToken) { var endPoint = request.Destination; @@ -212,10 +212,10 @@ async Task SendOutgoingRequestAsync(RequestMessage request, Met switch (endPoint.BaseUri.Scheme.ToLowerInvariant()) { case "https": - response = await SendOutgoingHttpsRequestAsync(request, requestCancellationTokens).ConfigureAwait(false); + response = await SendOutgoingHttpsRequestAsync(request, cancellationToken).ConfigureAwait(false); break; case "poll": - response = await SendOutgoingPollingRequestAsync(request, requestCancellationTokens).ConfigureAwait(false); + response = await SendOutgoingPollingRequestAsync(request, cancellationToken).ConfigureAwait(false); break; default: throw new ArgumentException("Unknown endpoint type: " + endPoint.BaseUri.Scheme); } @@ -225,7 +225,7 @@ async Task SendOutgoingRequestAsync(RequestMessage request, Met return response; } - async Task SendOutgoingHttpsRequestAsync(RequestMessage request, RequestCancellationTokens requestCancellationTokens) + async Task SendOutgoingHttpsRequestAsync(RequestMessage request, CancellationToken cancellationToken) { var client = new SecureListeningClient(ExchangeProtocolBuilder(), request.Destination, serverCertificate, logs.ForEndpoint(request.Destination.BaseUri), connectionManager, tcpConnectionFactory); @@ -236,15 +236,15 @@ await client.ExecuteTransactionAsync( { response = await protocol.ExchangeAsClientAsync(request, cts).ConfigureAwait(false); }, - requestCancellationTokens).ConfigureAwait(false); + cancellationToken).ConfigureAwait(false); return response; } - async Task SendOutgoingPollingRequestAsync(RequestMessage request, RequestCancellationTokens requestCancellationTokens) + async Task SendOutgoingPollingRequestAsync(RequestMessage request, CancellationToken cancellationToken) { var queue = GetQueue(request.Destination.BaseUri); - return await queue.QueueAndWaitAsync(request, requestCancellationTokens); + return await queue.QueueAndWaitAsync(request, cancellationToken); } async Task HandleIncomingRequestAsync(RequestMessage request) diff --git a/source/Halibut/ServiceModel/HalibutProxyRequestOptions.cs b/source/Halibut/ServiceModel/HalibutProxyRequestOptions.cs index 598442d4a..253c6046c 100644 --- a/source/Halibut/ServiceModel/HalibutProxyRequestOptions.cs +++ b/source/Halibut/ServiceModel/HalibutProxyRequestOptions.cs @@ -1,5 +1,3 @@ - - using System.Threading; namespace Halibut.ServiceModel @@ -7,28 +5,13 @@ namespace Halibut.ServiceModel public class HalibutProxyRequestOptions { /// - /// When cancelled, will only stop an RPC call if it is known to not be received by the service. - /// For a Listening Service, cancellation can occur when the Client is still connecting to the Service. - /// For a Polling Service, cancellation can occur when the Client has queued a Request but the Service has not yet Dequeued it. - /// - public CancellationToken? ConnectingCancellationToken { get; } - - /// - /// When cancelled, will try to cancel an in-progress / in-flight RPC call. - /// This is a best effort cancellation and is not guaranteed. - /// For Sync Halibut, providing this cancellation token is not supported. - /// For Async Halibut this will attempt to cancel the RPC call. - /// If the call is to a Listening Service, then cancellation is performed all the way down to the Socket operations. - /// if the call is to a Polling Service, then cancellation is performed all the way down to the Polling Queue, - /// this means the client can cancel the call but the service will still process the request and return a response. + /// When cancelled, will cancel a connecting or in-progress/in-flight RPC call. /// - public CancellationToken? InProgressRequestCancellationToken { get; } + public CancellationToken RequestCancellationToken { get; } - public HalibutProxyRequestOptions(CancellationToken? connectingCancellationToken, - CancellationToken? inProgressRequestCancellationToken) + public HalibutProxyRequestOptions(CancellationToken cancellationToken) { - ConnectingCancellationToken = connectingCancellationToken; - InProgressRequestCancellationToken = inProgressRequestCancellationToken; + RequestCancellationToken = cancellationToken; } } } \ No newline at end of file diff --git a/source/Halibut/ServiceModel/HalibutProxyWithAsync.cs b/source/Halibut/ServiceModel/HalibutProxyWithAsync.cs index 3d8ac8d53..b549bf887 100644 --- a/source/Halibut/ServiceModel/HalibutProxyWithAsync.cs +++ b/source/Halibut/ServiceModel/HalibutProxyWithAsync.cs @@ -9,7 +9,7 @@ namespace Halibut.ServiceModel { - public delegate Task MessageRouter(RequestMessage request, MethodInfo serviceMethod, RequestCancellationTokens requestCancellationTokens); + public delegate Task MessageRouter(RequestMessage request, MethodInfo serviceMethod, CancellationToken cancellationToken); public class HalibutProxyWithAsync : DispatchProxyAsync { @@ -19,7 +19,6 @@ public class HalibutProxyWithAsync : DispatchProxyAsync ServiceEndPoint endPoint; long callId; bool configured; - CancellationToken globalCancellationToken; ILog logger; #pragma warning restore CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable. @@ -27,13 +26,11 @@ public void Configure( MessageRouter messageRouter, Type contractType, ServiceEndPoint endPoint, - ILog logger, - CancellationToken cancellationToken) + ILog logger) { this.messageRouter = messageRouter; this.contractType = contractType; this.endPoint = endPoint; - this.globalCancellationToken = cancellationToken; this.configured = true; this.logger = logger; } @@ -74,9 +71,7 @@ public override async Task InvokeAsyncT(MethodInfo asyncMethod, object[] a var request = CreateRequest(asyncMethod, serviceMethod, args); - using var requestCancellationTokens = RequestCancellationTokens(halibutProxyRequestOptions); - - var response = await messageRouter(request, serviceMethod, requestCancellationTokens); + var response = await messageRouter(request, serviceMethod, halibutProxyRequestOptions?.RequestCancellationToken ?? CancellationToken.None); EnsureNotError(response); @@ -105,19 +100,7 @@ RequestMessage CreateRequest(MethodInfo asyncMethod, MethodInfo targetMethod, ob }; return request; } - - RequestCancellationTokens RequestCancellationTokens(HalibutProxyRequestOptions? halibutProxyRequestOptions) - { - if (halibutProxyRequestOptions == null) - { - return new RequestCancellationTokens(globalCancellationToken, CancellationToken.None); - } - - return new RequestCancellationTokens( - halibutProxyRequestOptions.ConnectingCancellationToken ?? CancellationToken.None, - halibutProxyRequestOptions.InProgressRequestCancellationToken ?? CancellationToken.None); - } - + void EnsureNotError(ResponseMessage responseMessage) { if (responseMessage == null) @@ -167,7 +150,7 @@ internal static void ThrowExceptionFromReceivedError(ServerError error, ILog log } } - catch (Exception exception) when (!(exception is HalibutClientException)) + catch (Exception exception) when (exception is not HalibutClientException && exception is not RequestCancelledException) { // Something went wrong trying to understand the ServerError revert back to the old behaviour of just // throwing a standard halibut client exception. diff --git a/source/Halibut/ServiceModel/IPendingRequestQueue.cs b/source/Halibut/ServiceModel/IPendingRequestQueue.cs index ac8fc9fff..a744e0857 100644 --- a/source/Halibut/ServiceModel/IPendingRequestQueue.cs +++ b/source/Halibut/ServiceModel/IPendingRequestQueue.cs @@ -10,7 +10,7 @@ public interface IPendingRequestQueue bool IsEmpty { get; } int Count { get; } Task ApplyResponse(ResponseMessage response, ServiceEndPoint destination); - Task DequeueAsync(CancellationToken cancellationToken); - Task QueueAndWaitAsync(RequestMessage request, RequestCancellationTokens requestCancellationTokens); + Task DequeueAsync(CancellationToken cancellationToken); + Task QueueAndWaitAsync(RequestMessage request, CancellationToken cancellationToken); } } \ No newline at end of file diff --git a/source/Halibut/ServiceModel/PendingRequestQueueAsync.cs b/source/Halibut/ServiceModel/PendingRequestQueueAsync.cs index c647ddbf4..666cb5aa9 100644 --- a/source/Halibut/ServiceModel/PendingRequestQueueAsync.cs +++ b/source/Halibut/ServiceModel/PendingRequestQueueAsync.cs @@ -4,6 +4,7 @@ using System.Threading; using System.Threading.Tasks; using Halibut.Diagnostics; +using Halibut.Exceptions; using Halibut.Transport.Protocol; using Nito.AsyncEx; @@ -29,20 +30,27 @@ public PendingRequestQueueAsync(ILog log, TimeSpan pollingQueueWaitTimeout) this.pollingQueueWaitTimeout = pollingQueueWaitTimeout; } - public async Task QueueAndWaitAsync(RequestMessage request, RequestCancellationTokens requestCancellationTokens) + public async Task QueueAndWaitAsync(RequestMessage request, CancellationToken cancellationToken) { using var pending = new PendingRequest(request, log); - using (await queueLock.LockAsync(requestCancellationTokens.LinkedCancellationToken)) + try + { + using (await queueLock.LockAsync(cancellationToken)) + { + queue.Add(pending); + inProgress.Add(request.Id, pending); + itemAddedToQueue.Set(); + } + } + catch (OperationCanceledException ex) { - queue.Add(pending); - inProgress.Add(request.Id, pending); - itemAddedToQueue.Set(); + throw new ConnectingRequestCancelledException(ex); } try { - await pending.WaitUntilComplete(requestCancellationTokens); + await pending.WaitUntilComplete(cancellationToken); } finally { @@ -80,7 +88,7 @@ public int Count } } - public async Task DequeueAsync(CancellationToken cancellationToken) + public async Task DequeueAsync(CancellationToken cancellationToken) { var timer = Stopwatch.StartNew(); @@ -96,7 +104,7 @@ public int Count var result = await pending.BeginTransfer(); if (result) { - return pending.Request; + return new (pending.Request, pending.PendingRequestCancellationToken); } } } @@ -169,17 +177,21 @@ class PendingRequest : IDisposable readonly SemaphoreSlim transferLock = new(1, 1); bool transferBegun; bool completed; + readonly CancellationTokenSource pendingRequestCancellationTokenSource; ResponseMessage? response; public PendingRequest(RequestMessage request, ILog log) { this.request = request; this.log = log; + + pendingRequestCancellationTokenSource = new CancellationTokenSource(); + PendingRequestCancellationToken = pendingRequestCancellationTokenSource.Token; } public RequestMessage Request => request; - public async Task WaitUntilComplete(RequestCancellationTokens requestCancellationTokens) + public async Task WaitUntilComplete(CancellationToken cancellationToken) { log.Write(EventType.MessageExchange, "Request {0} was queued", request); @@ -188,19 +200,20 @@ public async Task WaitUntilComplete(RequestCancellationTokens requestCancellatio try { - responseSet = await WaitForResponseToBeSet(request.Destination.PollingRequestQueueTimeout, requestCancellationTokens.LinkedCancellationToken); + responseSet = await WaitForResponseToBeSet( + request.Destination.PollingRequestQueueTimeout, + // Don't cancel a dequeued request as we need to wait PollingRequestMaximumMessageProcessingTimeout for it to complete + cancelTheRequestWhenTransferHasBegun: false, + cancellationToken); + if (responseSet) { log.Write(EventType.MessageExchange, "Request {0} was collected by the polling endpoint", request); return; } } - catch (Exception ex) when (ex is TaskCanceledException or OperationCanceledException) + catch (RequestCancelledException) { - // responseWaiter.Set is only called when the request has been collected and the response received. - // It is possible that the transfer has already started once the requestCancellationTokens.LinkedCancellationToke is cancelled - // If the requestCancellationTokens.InProgressCancellationToken is Ct.None or not cancelled then - // we cannot walk away from the request as it is already in progress and no longer in the connecting phase cancelled = true; using (await transferLock.LockAsync(CancellationToken.None)) @@ -211,11 +224,6 @@ public async Task WaitUntilComplete(RequestCancellationTokens requestCancellatio log.Write(EventType.MessageExchange, "Request {0} was cancelled before it could be collected by the polling endpoint", request); throw; } - - if (!requestCancellationTokens.CanCancelInProgressRequest()) - { - log.Write(EventType.MessageExchange, "Request {0} was cancelled after it had been collected by the polling endpoint and will not be cancelled", request); - } } } @@ -234,8 +242,12 @@ public async Task WaitUntilComplete(RequestCancellationTokens requestCancellatio if (waitForTransferToComplete) { - // We cannot use requestCancellationTokens.ConnectingCancellationToken here, because if we were cancelled, and the transfer has begun, we should attempt to wait for it. - responseSet = await WaitForResponseToBeSet(request.Destination.PollingRequestMaximumMessageProcessingTimeout, requestCancellationTokens.InProgressRequestCancellationToken); + responseSet = await WaitForResponseToBeSet( + request.Destination.PollingRequestMaximumMessageProcessingTimeout, + // Cancel the dequeued request to force Reads and Writes to be cancelled + cancelTheRequestWhenTransferHasBegun: true, + cancellationToken); + if (responseSet) { // We end up here when the request is cancelled but already being transferred so we need to adjust the log message accordingly @@ -250,7 +262,7 @@ public async Task WaitUntilComplete(RequestCancellationTokens requestCancellatio } else { - if (requestCancellationTokens.InProgressRequestCancellationToken.IsCancellationRequested) + if (cancellationToken.IsCancellationRequested) { log.Write(EventType.MessageExchange, "Request {0} was cancelled before a response was received", request); SetResponse(ResponseMessage.FromException(request, new TimeoutException($"A request was sent to a polling endpoint, the polling endpoint collected it but the request was cancelled before the polling endpoint responded."))); @@ -269,18 +281,37 @@ public async Task WaitUntilComplete(RequestCancellationTokens requestCancellatio } } - async Task WaitForResponseToBeSet(TimeSpan timeout, CancellationToken cancellationToken) + async Task WaitForResponseToBeSet(TimeSpan timeout, bool cancelTheRequestWhenTransferHasBegun, CancellationToken cancellationToken) { - using var cancellationTokenSource = new CancellationTokenSource(timeout); + using var timeoutCancellationTokenSource = new CancellationTokenSource(timeout); + try { - using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationTokenSource.Token, cancellationToken); + using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(timeoutCancellationTokenSource.Token, cancellationToken); await responseWaiter.WaitAsync(linkedTokenSource.Token); } - catch (OperationCanceledException) + catch (OperationCanceledException ex) { - if (cancellationTokenSource.IsCancellationRequested) return false; - throw; + using (await transferLock.LockAsync(CancellationToken.None)) + { + if (transferBegun && cancelTheRequestWhenTransferHasBegun) + { + // Cancel the dequeued request. This will cause co-operative cancellation on the thread dequeuing the request + pendingRequestCancellationTokenSource.Cancel(); + } + else if (!transferBegun) + { + // Cancel the queued request. This will flag the request as cancelled to stop it being dequeued + pendingRequestCancellationTokenSource.Cancel(); + } + + if (timeoutCancellationTokenSource.IsCancellationRequested) + { + return false; + } + + throw transferBegun ? new TransferringRequestCancelledException(ex) : new ConnectingRequestCancelledException(ex); + } } return true; @@ -297,7 +328,9 @@ public async Task BeginTransfer() { using (await transferLock.LockAsync(CancellationToken.None)) { - if (completed) + // Check if the request has already been completed or if the request has been cancelled + // to ensure we don't dequeue an already completed or already cancelled request + if (completed || pendingRequestCancellationTokenSource.IsCancellationRequested) { return false; } @@ -313,6 +346,7 @@ public async Task BeginTransfer() } public ResponseMessage Response => response ?? throw new InvalidOperationException("Response has not been set."); + public CancellationToken PendingRequestCancellationToken { get; } public void SetResponse(ResponseMessage response) { @@ -322,6 +356,7 @@ public void SetResponse(ResponseMessage response) public void Dispose() { + pendingRequestCancellationTokenSource?.Dispose(); transferLock?.Dispose(); } } diff --git a/source/Halibut/ServiceModel/RequestCancellationTokens.cs b/source/Halibut/ServiceModel/RequestCancellationTokens.cs deleted file mode 100644 index e89e77d50..000000000 --- a/source/Halibut/ServiceModel/RequestCancellationTokens.cs +++ /dev/null @@ -1,56 +0,0 @@ -using System; -using System.Threading; - -namespace Halibut.ServiceModel -{ - public class RequestCancellationTokens : IDisposable - { - CancellationTokenSource? linkedCancellationTokenSource; - - public RequestCancellationTokens(CancellationToken connectingCancellationToken, CancellationToken inProgressRequestCancellationToken) - { - ConnectingCancellationToken = connectingCancellationToken; - InProgressRequestCancellationToken = inProgressRequestCancellationToken; - - if (ConnectingCancellationToken == CancellationToken.None && InProgressRequestCancellationToken == CancellationToken.None) - { - LinkedCancellationToken = CancellationToken.None; - } - else if (InProgressRequestCancellationToken == CancellationToken.None) - { - LinkedCancellationToken = ConnectingCancellationToken; - } - else if (ConnectingCancellationToken == CancellationToken.None) - { - LinkedCancellationToken = InProgressRequestCancellationToken; - } - else if (ConnectingCancellationToken == InProgressRequestCancellationToken) - { - LinkedCancellationToken = ConnectingCancellationToken; - } - else - { - linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(ConnectingCancellationToken, InProgressRequestCancellationToken); - - LinkedCancellationToken = linkedCancellationTokenSource.Token; - } - } - - public CancellationToken ConnectingCancellationToken { get; set; } - public CancellationToken InProgressRequestCancellationToken { get; set; } - - public CancellationToken LinkedCancellationToken { get; private set; } - - public void Dispose() - { - LinkedCancellationToken = CancellationToken.None; - linkedCancellationTokenSource?.Dispose(); - linkedCancellationTokenSource = null; - } - - public bool CanCancelInProgressRequest() - { - return InProgressRequestCancellationToken != CancellationToken.None; - } - } -} \ No newline at end of file diff --git a/source/Halibut/Transport/ISecureClient.cs b/source/Halibut/Transport/ISecureClient.cs index 97d41970d..2b5a36a39 100644 --- a/source/Halibut/Transport/ISecureClient.cs +++ b/source/Halibut/Transport/ISecureClient.cs @@ -1,5 +1,5 @@ +using System.Threading; using System.Threading.Tasks; -using Halibut.ServiceModel; using Halibut.Transport.Protocol; namespace Halibut.Transport @@ -7,6 +7,6 @@ namespace Halibut.Transport public interface ISecureClient { ServiceEndPoint ServiceEndpoint { get; } - Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, RequestCancellationTokens requestCancellationTokens); + Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, CancellationToken cancellationToken); } } diff --git a/source/Halibut/Transport/PollingClient.cs b/source/Halibut/Transport/PollingClient.cs index 2c6157db6..c5f0e7cdf 100644 --- a/source/Halibut/Transport/PollingClient.cs +++ b/source/Halibut/Transport/PollingClient.cs @@ -15,13 +15,9 @@ public class PollingClient : IPollingClient readonly ILog log; readonly ISecureClient secureClient; readonly Uri subscription; - Task? pollingClientLoopTask; - readonly CancellationTokenSource workingCancellationTokenSource; - readonly CancellationToken cancellationToken; - + readonly CancellationTokenSource cancellationTokenSource; readonly Func createRetryPolicy; - RequestCancellationTokens? requestCancellationTokens; public PollingClient(Uri subscription, ISecureClient secureClient, Func> handleIncomingRequestAsync, ILog log, CancellationToken cancellationToken, Func createRetryPolicy) { @@ -29,33 +25,31 @@ public PollingClient(Uri subscription, ISecureClient secureClient, Func await ExecutePollingLoopAsyncCatchingExceptions(requestCancellationTokens)); + var requestCancellationToken = cancellationTokenSource.Token; + pollingClientLoopTask = Task.Run(async () => await ExecutePollingLoopAsyncCatchingExceptions(requestCancellationToken)); } public void Dispose() { - Try.CatchingError(workingCancellationTokenSource.Cancel, _ => { }); - Try.CatchingError(workingCancellationTokenSource.Dispose, _ => { }); - Try.CatchingError(() => requestCancellationTokens?.Dispose(), _ => { }); + Try.CatchingError(cancellationTokenSource.Cancel, _ => { }); + Try.CatchingError(cancellationTokenSource.Dispose, _ => { }); } /// /// Runs ExecutePollingLoopAsync but catches any exception that falls out of it, log here /// rather than let it be unobserved. We are not expecting an exception but just in case. /// - async Task ExecutePollingLoopAsyncCatchingExceptions(RequestCancellationTokens requestCancellationTokens) + async Task ExecutePollingLoopAsyncCatchingExceptions(CancellationToken cancellationToken) { try { - await ExecutePollingLoopAsync(requestCancellationTokens); + await ExecutePollingLoopAsync(cancellationToken); } catch (Exception e) { @@ -63,11 +57,11 @@ async Task ExecutePollingLoopAsyncCatchingExceptions(RequestCancellationTokens r } } - async Task ExecutePollingLoopAsync(RequestCancellationTokens requestCancellationTokens) + async Task ExecutePollingLoopAsync(CancellationToken cancellationToken) { var retry = createRetryPolicy(); var sleepFor = TimeSpan.Zero; - while (!requestCancellationTokens.LinkedCancellationToken.IsCancellationRequested) + while (!cancellationToken.IsCancellationRequested) { try { @@ -80,7 +74,7 @@ await secureClient.ExecuteTransactionAsync(async (protocol, ct) => // Subsequent connection issues will try and reconnect quickly and then back-off retry.Success(); await protocol.ExchangeAsSubscriberAsync(subscription, handleIncomingRequestAsync, int.MaxValue, ct); - }, requestCancellationTokens); + }, cancellationToken); retry.Success(); } finally @@ -98,7 +92,7 @@ await secureClient.ExecuteTransactionAsync(async (protocol, ct) => } finally { - await Task.Delay(sleepFor, requestCancellationTokens.LinkedCancellationToken); + await Task.Delay(sleepFor, cancellationToken); } } } diff --git a/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs b/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs index 1732c42e4..ad414b066 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs @@ -3,6 +3,7 @@ using System.Threading; using System.Threading.Tasks; using Halibut.Diagnostics; +using Halibut.Exceptions; using Halibut.ServiceModel; using Halibut.Transport.Observability; @@ -214,26 +215,36 @@ async Task ProcessSubscriberAsync(IPendingRequestQueue pendingRequests, Cancella } } - async Task ProcessReceiverInternalAsync(IPendingRequestQueue pendingRequests, RequestMessage? nextRequest, CancellationToken cancellationToken) + async Task ProcessReceiverInternalAsync(IPendingRequestQueue pendingRequests, RequestMessageWithCancellationToken? nextRequest, CancellationToken cancellationToken) { try { if (nextRequest != null) { - var response = await SendAndReceiveRequest(nextRequest, cancellationToken); - await pendingRequests.ApplyResponse(response, nextRequest.Destination); + using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(nextRequest.CancellationToken, cancellationToken); + var linkedCancellationToken = linkedTokenSource.Token; + + var response = await SendAndReceiveRequest(nextRequest.RequestMessage, linkedCancellationToken); + await pendingRequests.ApplyResponse(response, nextRequest.RequestMessage.Destination); } else { - await stream.SendAsync(nextRequest, cancellationToken); + await stream.SendAsync(null, cancellationToken); } } catch (Exception ex) { if (nextRequest != null) { - var response = ResponseMessage.FromException(nextRequest, ex); - await pendingRequests.ApplyResponse(response, nextRequest.Destination); + var cancellationException = nextRequest.CancellationToken.IsCancellationRequested ? new TransferringRequestCancelledException(ex) : ex; + + var response = ResponseMessage.FromException(nextRequest.RequestMessage, cancellationException); + await pendingRequests.ApplyResponse(response, nextRequest.RequestMessage.Destination); + + if (nextRequest.CancellationToken.IsCancellationRequested) + { + throw cancellationException; + } } return false; diff --git a/source/Halibut/Transport/Protocol/RequestMessage.cs b/source/Halibut/Transport/Protocol/RequestMessage.cs index 7b284d327..25daec614 100644 --- a/source/Halibut/Transport/Protocol/RequestMessage.cs +++ b/source/Halibut/Transport/Protocol/RequestMessage.cs @@ -1,5 +1,4 @@ using System; -using Halibut.Diagnostics; using Newtonsoft.Json; namespace Halibut.Transport.Protocol diff --git a/source/Halibut/Transport/Protocol/RequestMessageWithCancellationToken.cs b/source/Halibut/Transport/Protocol/RequestMessageWithCancellationToken.cs new file mode 100644 index 000000000..d85c71e2a --- /dev/null +++ b/source/Halibut/Transport/Protocol/RequestMessageWithCancellationToken.cs @@ -0,0 +1,16 @@ +using System.Threading; + +namespace Halibut.Transport.Protocol +{ + public class RequestMessageWithCancellationToken + { + public RequestMessageWithCancellationToken(RequestMessage requestMessage, CancellationToken cancellationToken) + { + RequestMessage = requestMessage; + CancellationToken = cancellationToken; + } + + public RequestMessage RequestMessage { get; } + public CancellationToken CancellationToken { get; } + } +} \ No newline at end of file diff --git a/source/Halibut/Transport/Protocol/ResponseMessage.cs b/source/Halibut/Transport/Protocol/ResponseMessage.cs index 6f51b116f..b5cd4cc54 100644 --- a/source/Halibut/Transport/Protocol/ResponseMessage.cs +++ b/source/Halibut/Transport/Protocol/ResponseMessage.cs @@ -1,5 +1,6 @@ using System; using Halibut.Diagnostics; +using Halibut.Exceptions; using Newtonsoft.Json; namespace Halibut.Transport.Protocol @@ -35,10 +36,12 @@ public static ResponseMessage FromException(RequestMessage request, Exception ex internal static ServerError ServerErrorFromException(Exception ex) { string? errorType = null; - if (ex is HalibutClientException) + + if (ex is HalibutClientException or RequestCancelledException) { errorType = ex.GetType().FullName; } + return new ServerError { Message = ex.UnpackFromContainers().Message, Details = ex.ToString(), HalibutErrorType = errorType }; } } diff --git a/source/Halibut/Transport/SecureClient.cs b/source/Halibut/Transport/SecureClient.cs index 8d9519915..82bd44b61 100644 --- a/source/Halibut/Transport/SecureClient.cs +++ b/source/Halibut/Transport/SecureClient.cs @@ -8,9 +8,7 @@ using System.Threading; using System.Threading.Tasks; using Halibut.Diagnostics; -using Halibut.ServiceModel; using Halibut.Transport.Protocol; -using Halibut.Transport.Streams; using Halibut.Util; namespace Halibut.Transport @@ -40,7 +38,7 @@ public SecureClient(ExchangeProtocolBuilder protocolBuilder, public ServiceEndPoint ServiceEndpoint { get; } - public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, RequestCancellationTokens requestCancellationTokens) + public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, CancellationToken cancellationToken) { var retryInterval = ServiceEndpoint.RetryListeningSleepInterval; @@ -53,7 +51,7 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, R { if (i > 0) { - await Task.Delay(retryInterval, requestCancellationTokens.LinkedCancellationToken).ConfigureAwait(false); + await Task.Delay(retryInterval, cancellationToken).ConfigureAwait(false); log.Write(EventType.OpeningNewConnection, $"Retrying connection to {ServiceEndpoint.Format()} - attempt #{i}."); } @@ -69,14 +67,11 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, R tcpConnectionFactory, ServiceEndpoint, log, - requestCancellationTokens.LinkedCancellationToken).ConfigureAwait(false); + cancellationToken).ConfigureAwait(false); // Beyond this point, we have no way to be certain that the server hasn't tried to process a request; therefore, we can't retry after this point retryAllowed = false; - - // TODO: Enhancement: Pass the RequestCancellationTokens to the protocol handler so that it can cancel - // PrepareExchangeAsClientAsync as part of the ConnectingCancellationToken being cancelled - await protocolHandler(connection.Protocol, requestCancellationTokens.InProgressRequestCancellationToken).ConfigureAwait(false); + await protocolHandler(connection.Protocol, cancellationToken).ConfigureAwait(false); } catch { @@ -84,14 +79,17 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, R { await connection.DisposeAsync(); } - + if (connectionManager.IsDisposed) + { return; + } + throw; } // Only return the connection to the pool if all went well - await connectionManager.ReleaseConnectionAsync(ServiceEndpoint, connection, requestCancellationTokens.InProgressRequestCancellationToken); + await connectionManager.ReleaseConnectionAsync(ServiceEndpoint, connection, cancellationToken); } catch (AuthenticationException ex) { @@ -125,7 +123,7 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, R // against all connections in the pool being bad if (i == 1) { - await connectionManager.ClearPooledConnectionsAsync(ServiceEndpoint, log, requestCancellationTokens.InProgressRequestCancellationToken); + await connectionManager.ClearPooledConnectionsAsync(ServiceEndpoint, log, cancellationToken); } } catch (IOException ex) when (ex.IsSocketConnectionReset()) diff --git a/source/Halibut/Transport/SecureListeningClient.cs b/source/Halibut/Transport/SecureListeningClient.cs index da6fe555b..cfab4f2db 100644 --- a/source/Halibut/Transport/SecureListeningClient.cs +++ b/source/Halibut/Transport/SecureListeningClient.cs @@ -5,9 +5,10 @@ using System.Security.Authentication; using System.Security.Cryptography.X509Certificates; using System.Text; +using System.Threading; using System.Threading.Tasks; using Halibut.Diagnostics; -using Halibut.ServiceModel; +using Halibut.Exceptions; using Halibut.Transport.Protocol; using Halibut.Util; @@ -38,7 +39,7 @@ public SecureListeningClient(ExchangeProtocolBuilder exchangeProtocolBuilder, public ServiceEndPoint ServiceEndpoint { get; } - public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, RequestCancellationTokens requestCancellationTokens) + public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, CancellationToken cancellationToken) { var retryInterval = ServiceEndpoint.RetryListeningSleepInterval; @@ -51,8 +52,15 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, R { if (i > 0) { - await Task.Delay(retryInterval, requestCancellationTokens.LinkedCancellationToken).ConfigureAwait(false); - log.Write(EventType.OpeningNewConnection, $"Retrying connection to {ServiceEndpoint.Format()} - attempt #{i}."); + try + { + await Task.Delay(retryInterval, cancellationToken).ConfigureAwait(false); + log.Write(EventType.OpeningNewConnection, $"Retrying connection to {ServiceEndpoint.Format()} - attempt #{i}."); + } + catch (Exception ex) when (cancellationToken.IsCancellationRequested) + { + throw new ConnectingRequestCancelledException(ex); + } } try @@ -62,19 +70,31 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, R IConnection? connection = null; try { - connection = await connectionManager.AcquireConnectionAsync( - exchangeProtocolBuilder, - tcpConnectionFactory, - ServiceEndpoint, - log, - requestCancellationTokens.LinkedCancellationToken).ConfigureAwait(false); + try + { + connection = await connectionManager.AcquireConnectionAsync( + exchangeProtocolBuilder, + tcpConnectionFactory, + ServiceEndpoint, + log, + cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) when (cancellationToken.IsCancellationRequested) + { + throw new ConnectingRequestCancelledException(ex); + } // Beyond this point, we have no way to be certain that the server hasn't tried to process a request; therefore, we can't retry after this point retryAllowed = false; - // TODO: Enhancement: Pass the RequestCancellationTokens to the protocol handler so that it can cancel - // PrepareExchangeAsClientAsync as part of the ConnectingCancellationToken being cancelled - await protocolHandler(connection.Protocol, requestCancellationTokens.InProgressRequestCancellationToken).ConfigureAwait(false); + try + { + await protocolHandler(connection.Protocol, cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) when (cancellationToken.IsCancellationRequested) + { + throw new TransferringRequestCancelledException(ex); + } } catch { @@ -89,7 +109,7 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, R } // Only return the connection to the pool if all went well - await connectionManager.ReleaseConnectionAsync(ServiceEndpoint, connection, requestCancellationTokens.InProgressRequestCancellationToken); + await connectionManager.ReleaseConnectionAsync(ServiceEndpoint, connection, cancellationToken); } catch (AuthenticationException ex) { @@ -128,7 +148,7 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, R // against all connections in the pool being bad if (i == 1) { - await connectionManager.ClearPooledConnectionsAsync(ServiceEndpoint, log, requestCancellationTokens.LinkedCancellationToken); + await connectionManager.ClearPooledConnectionsAsync(ServiceEndpoint, log, cancellationToken); } } catch (IOException ex) when (ex.IsSocketConnectionReset()) diff --git a/source/Halibut/Transport/SecureWebSocketClient.cs b/source/Halibut/Transport/SecureWebSocketClient.cs index a86d8fce6..ba92d2cbf 100644 --- a/source/Halibut/Transport/SecureWebSocketClient.cs +++ b/source/Halibut/Transport/SecureWebSocketClient.cs @@ -50,7 +50,7 @@ public SecureWebSocketClient(ExchangeProtocolBuilder protocolBuilder, public ServiceEndPoint ServiceEndpoint => serviceEndpoint; - public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, RequestCancellationTokens requestCancellationTokens) + public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, CancellationToken cancellationToken) { var retryInterval = ServiceEndpoint.RetryListeningSleepInterval; @@ -63,7 +63,7 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, R { if (i > 0) { - await Task.Delay(retryInterval, requestCancellationTokens.LinkedCancellationToken).ConfigureAwait(false); + await Task.Delay(retryInterval, cancellationToken).ConfigureAwait(false); log.Write(EventType.OpeningNewConnection, $"Retrying connection to {serviceEndpoint.Format()} - attempt #{i}."); } @@ -79,14 +79,11 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, R new WebSocketConnectionFactory(clientCertificate, halibutTimeoutsAndLimits, streamFactory), serviceEndpoint, log, - requestCancellationTokens.LinkedCancellationToken).ConfigureAwait(false); + cancellationToken).ConfigureAwait(false); // Beyond this point, we have no way to be certain that the server hasn't tried to process a request; therefore, we can't retry after this point retryAllowed = false; - - // TODO: Enhancement: Pass the RequestCancellationTokens to the protocol handler so that it can cancel - // PrepareExchangeAsClientAsync as part of the ConnectingCancellationToken being cancelled - await protocolHandler(connection.Protocol, requestCancellationTokens.InProgressRequestCancellationToken).ConfigureAwait(false); + await protocolHandler(connection.Protocol, cancellationToken).ConfigureAwait(false); } catch { @@ -99,7 +96,7 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, R } // Only return the connection to the pool if all went well - await connectionManager.ReleaseConnectionAsync(serviceEndpoint, connection, requestCancellationTokens.InProgressRequestCancellationToken); + await connectionManager.ReleaseConnectionAsync(serviceEndpoint, connection, cancellationToken); } catch (AuthenticationException aex) { @@ -136,7 +133,7 @@ public async Task ExecuteTransactionAsync(ExchangeActionAsync protocolHandler, R // against all connections in the pool being bad if (i == 1) { - await connectionManager.ClearPooledConnectionsAsync(serviceEndpoint, log, requestCancellationTokens.InProgressRequestCancellationToken); + await connectionManager.ClearPooledConnectionsAsync(serviceEndpoint, log, cancellationToken); } } diff --git a/source/Halibut/Transport/Streams/NetworkTimeoutStream.cs b/source/Halibut/Transport/Streams/NetworkTimeoutStream.cs index a3b0df101..4221082d2 100644 --- a/source/Halibut/Transport/Streams/NetworkTimeoutStream.cs +++ b/source/Halibut/Transport/Streams/NetworkTimeoutStream.cs @@ -3,6 +3,7 @@ using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; +using Halibut.Util; #if NETFRAMEWORK using System.Runtime.Remoting; @@ -13,8 +14,8 @@ namespace Halibut.Transport.Streams class NetworkTimeoutStream : AsyncStream { readonly Stream inner; - bool hasTimedOut = false; - Exception? timeoutException = null; + bool hasCancelledOrTimedOut = false; + Exception? cancellationOrTimeoutException = null; public NetworkTimeoutStream(Stream inner) { @@ -28,7 +29,7 @@ public override async ValueTask DisposeAsync() public override async Task FlushAsync(CancellationToken cancellationToken) { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); await WrapWithCancellationAndTimeout( async ct => @@ -44,7 +45,7 @@ await WrapWithCancellationAndTimeout( public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); return await WrapWithCancellationAndTimeout( async ct => await inner.ReadAsync(buffer, offset, count, ct), @@ -56,7 +57,7 @@ public override async Task ReadAsync(byte[] buffer, int offset, int count, public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); await WrapWithCancellationAndTimeout( async ct => @@ -73,7 +74,7 @@ await WrapWithCancellationAndTimeout( #if !NETFRAMEWORK public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); return await WrapWithCancellationAndTimeout( async ct => await inner.ReadAsync(buffer, ct), @@ -85,7 +86,7 @@ public override async ValueTask ReadAsync(Memory buffer, Cancellation public override async ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); await WrapWithCancellationAndTimeout( async ct => @@ -107,7 +108,7 @@ public override void Close() public override async Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); await base.CopyToAsync(destination, bufferSize, cancellationToken); } @@ -129,14 +130,14 @@ public override int Read(byte[] buffer, int offset, int count) public override long Seek(long offset, SeekOrigin origin) { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); return inner.Seek(offset, origin); } public override void SetLength(long value) { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); inner.SetLength(value); } @@ -154,14 +155,14 @@ public override void WriteByte(byte value) #if !NETFRAMEWORK public override void CopyTo(Stream destination, int bufferSize) { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); base.CopyTo(destination, bufferSize); } public override int Read(Span buffer) { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); try { @@ -181,7 +182,7 @@ public override int Read(Span buffer) public override void Write(ReadOnlySpan buffer) { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); try { @@ -203,14 +204,14 @@ public override void Write(ReadOnlySpan buffer) #if NETFRAMEWORK public override ObjRef CreateObjRef(Type requestedType) { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); return inner.CreateObjRef(requestedType); } public override object? InitializeLifetimeService() { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); return inner.InitializeLifetimeService(); } @@ -220,12 +221,12 @@ public override int ReadTimeout { get { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); return inner.ReadTimeout; } set { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); inner.ReadTimeout = value; } } @@ -234,12 +235,12 @@ public override int WriteTimeout { get { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); return inner.WriteTimeout; } set { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); inner.WriteTimeout = value; } } @@ -248,7 +249,7 @@ public override bool CanTimeout { get { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); return inner.CanTimeout; } } @@ -257,7 +258,7 @@ public override bool CanRead { get { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); return inner.CanRead; } } @@ -266,7 +267,7 @@ public override bool CanSeek { get { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); return inner.CanSeek; } } @@ -275,7 +276,7 @@ public override bool CanWrite { get { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); return inner.CanWrite; } } @@ -284,7 +285,7 @@ public override long Length { get { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); return inner.Length; } } @@ -293,12 +294,12 @@ public override long Position { get { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); return inner.Position; } set { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); inner.Position = value; } } @@ -327,15 +328,10 @@ async Task WrapWithCancellationAndTimeout( async Task SafelyDisposeStream(Exception exception) { - try - { - timeoutException = exception; - hasTimedOut = true; - await inner.DisposeAsync(); - } - catch - { - } + cancellationOrTimeoutException = exception; + hasCancelledOrTimedOut = true; + + await Try.CatchingError(async () => await DisposeAsync(), _ => { }); } Exception CreateExceptionOnTimeout() @@ -347,7 +343,7 @@ Exception CreateExceptionOnTimeout() void TryCloseOnTimeout(Action action) { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); try { @@ -367,7 +363,7 @@ void TryCloseOnTimeout(Action action) T TryCloseOnTimeout(Func action) { - ThrowIfAlreadyTimedOut(); + ThrowIfAlreadyCancelledOrTimedOut(); try { @@ -387,8 +383,8 @@ T TryCloseOnTimeout(Func action) void CloseOnTimeout(Exception ex) { - timeoutException = ex; - hasTimedOut = true; + cancellationOrTimeoutException = ex; + hasCancelledOrTimedOut = true; inner.Close(); } @@ -402,11 +398,11 @@ static bool IsTimeoutException(Exception exception) return exception.InnerException != null && IsTimeoutException(exception.InnerException); } - void ThrowIfAlreadyTimedOut() + void ThrowIfAlreadyCancelledOrTimedOut() { - if (hasTimedOut) + if (hasCancelledOrTimedOut) { - throw timeoutException ?? new SocketException((int)SocketError.TimedOut); + throw cancellationOrTimeoutException ?? new SocketException((int)SocketError.TimedOut); } } } diff --git a/source/Halibut/Util/Try.cs b/source/Halibut/Util/Try.cs index d45fedcf1..4c01d353c 100644 --- a/source/Halibut/Util/Try.cs +++ b/source/Halibut/Util/Try.cs @@ -1,5 +1,6 @@ using System; using System.IO; +using System.Threading.Tasks; namespace Halibut.Util { @@ -17,6 +18,18 @@ public static void CatchingError(Action tryThisAction, Action onFailu } } + public static async Task CatchingError(Func tryThisAction, Action onFailure) + { + try + { + await tryThisAction(); + } + catch (Exception e) + { + onFailure(e); + } + } + public static SilentStreamDisposer CatchingErrorOnDisposal(Stream streamToDispose, Action onFailure) { return new SilentStreamDisposer(streamToDispose, onFailure); diff --git a/source/Octopus.TestPortForwarder/PortForwarder.cs b/source/Octopus.TestPortForwarder/PortForwarder.cs index de016df6f..4393b1258 100644 --- a/source/Octopus.TestPortForwarder/PortForwarder.cs +++ b/source/Octopus.TestPortForwarder/PortForwarder.cs @@ -7,6 +7,7 @@ using System.Threading.Tasks; using Serilog; using System.Diagnostics.CodeAnalysis; +using Serilog.Core; namespace Octopus.TestPortForwarder { @@ -236,6 +237,7 @@ public void UnPauseExistingConnections() public void PauseExistingConnections() { + logger.Information("Pausing existing connections"); lock (pumps) { foreach (var pump in pumps)