Skip to content

Commit

Permalink
Add ability to Cancel Polling Requests to the Socket (#576)
Browse files Browse the repository at this point in the history
Combine connecting and in progress cancellation tokens
  • Loading branch information
nathanwoctopusdeploy authored Dec 11, 2023
1 parent a11f38e commit ac3deb7
Show file tree
Hide file tree
Showing 36 changed files with 623 additions and 626 deletions.
17 changes: 8 additions & 9 deletions source/Halibut.Tests/BadCertificatesTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public async Task FailWhenPollingServicePresentsWrongCertificate_ButServiceIsCon
});

// Act
var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)), CancellationToken);
var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)), CancellationToken);

// Interestingly the message exchange error is logged to a non polling looking URL, perhaps because it has not been identified?
Wait.UntilActionSucceeds(() => {
Expand Down Expand Up @@ -148,14 +148,14 @@ public async Task FailWhenPollingServiceHasThumbprintRemovedViaTrustOnly(ClientA
});

// Works normally
await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None));
await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None));
await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token));
await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token));

// Act
clientAndBuilder.Client.TrustOnly(new List<string>());

// Assert
var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)), CancellationToken);
var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)), CancellationToken);

await Task.Delay(3000, CancellationToken);

Expand All @@ -164,8 +164,7 @@ public async Task FailWhenPollingServiceHasThumbprintRemovedViaTrustOnly(ClientA
var exception = await AssertException.Throws<Exception>(incrementCount);

exception.And.Should().Match(e => e.GetType() == typeof(HalibutClientException)
|| e.GetType() == typeof(OperationCanceledException)
|| e.GetType() == typeof(TaskCanceledException));
|| e is OperationCanceledException);
}
}

Expand Down Expand Up @@ -213,8 +212,8 @@ public async Task FailWhenClientPresentsWrongCertificateToPollingService(ClientA
point.PollingRequestQueueTimeout = TimeSpan.FromSeconds(2000);
});

var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)), CancellationToken);

var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)), CancellationToken);
Func<LogEvent, bool> hasExpectedLog = logEvent =>
logEvent.FormattedMessage.Contains("The server at")
&& logEvent.FormattedMessage.Contains("presented an unexpected security certificate");
Expand Down Expand Up @@ -270,7 +269,7 @@ public async Task FailWhenPollingServicePresentsWrongCertificate(ClientAndServic
point.PollingRequestQueueTimeout = TimeSpan.FromSeconds(10);
});

var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)), CancellationToken);
var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)), CancellationToken);

// Interestingly the message exchange error is logged to a non polling looking URL, perhaps because it has not been identified?
Wait.UntilActionSucceeds(() => { AllLogs(serviceLoggers).Select(l => l.FormattedMessage).ToArray()
Expand Down
86 changes: 12 additions & 74 deletions source/Halibut.Tests/CancellationViaClientProxyFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Halibut.Logging;
using Halibut.Exceptions;
using Halibut.ServiceModel;
using Halibut.Tests.Support;
using Halibut.Tests.Support.ExtensionMethods;
Expand All @@ -21,27 +21,11 @@ public class CancellationViaClientProxyFixture : BaseTest
[Test]
[LatestClientAndLatestServiceTestCases(testNetworkConditions: false)]
[LatestClientAndPreviousServiceVersionsTestCases(testNetworkConditions: false)]
public async Task HalibutProxyRequestOptions_ConnectingCancellationToken_CanCancel_ConnectingOrQueuedRequests(ClientAndServiceTestCase clientAndServiceTestCase)
public async Task HalibutProxyRequestOptions_RequestCancellationToken_CanCancel_ConnectingOrQueuedRequests(ClientAndServiceTestCase clientAndServiceTestCase)
{
var tokenSourceToCancel = new CancellationTokenSource();
var halibutRequestOption = new HalibutProxyRequestOptions(tokenSourceToCancel.Token, CancellationToken.None);
var halibutRequestOption = new HalibutProxyRequestOptions(tokenSourceToCancel.Token);

await CanCancel_ConnectingOrQueuedRequests(clientAndServiceTestCase, tokenSourceToCancel, halibutRequestOption);
}

[Test]
[LatestClientAndLatestServiceTestCases(testNetworkConditions: false)]
[LatestClientAndPreviousServiceVersionsTestCases(testNetworkConditions: false)]
public async Task HalibutProxyRequestOptions_InProgressCancellationToken_CanCancel_ConnectingOrQueuedRequests(ClientAndServiceTestCase clientAndServiceTestCase)
{
var tokenSourceToCancel = new CancellationTokenSource();
var halibutRequestOption = new HalibutProxyRequestOptions(CancellationToken.None, tokenSourceToCancel.Token);

await CanCancel_ConnectingOrQueuedRequests(clientAndServiceTestCase, tokenSourceToCancel, halibutRequestOption);
}

async Task CanCancel_ConnectingOrQueuedRequests(ClientAndServiceTestCase clientAndServiceTestCase, CancellationTokenSource tokenSourceToCancel, HalibutProxyRequestOptions halibutRequestOption)
{
await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder()
.WithPortForwarding(out var portForwarderRef, port => PortForwarderUtil.ForwardingToLocalPort(port).Build())
.WithStandardServices()
Expand All @@ -55,64 +39,19 @@ async Task CanCancel_ConnectingOrQueuedRequests(ClientAndServiceTestCase clientA
point => point.TryAndConnectForALongTime());

tokenSourceToCancel.CancelAfter(TimeSpan.FromMilliseconds(100));
(await AssertException.Throws<Exception>(() => echo.IncrementAsync(halibutRequestOption)))
.And.Message.Contains("The operation was canceled");

(await AssertionExtensions.Should(() => echo.IncrementAsync(halibutRequestOption)).ThrowAsync<Exception>())
.And.Should().Match(x => x is ConnectingRequestCancelledException || (x is HalibutClientException && x.As<HalibutClientException>().Message.Contains("The Request was cancelled while Connecting")));

portForwarderRef.Value.ReturnToNormalMode();

await echo.IncrementAsync(new HalibutProxyRequestOptions(CancellationToken, CancellationToken.None));
await echo.IncrementAsync(new HalibutProxyRequestOptions(CancellationToken));

(await echo.GetCurrentValueAsync(new HalibutProxyRequestOptions(CancellationToken, CancellationToken.None)))
(await echo.GetCurrentValueAsync(new HalibutProxyRequestOptions(CancellationToken)))
.Should().Be(1, "Since we cancelled the first call");
}
}

[Test]
[LatestClientAndLatestServiceTestCases(testNetworkConditions: false)]
[LatestClientAndPreviousServiceVersionsTestCases(testNetworkConditions: false)]
public async Task HalibutProxyRequestOptions_ConnectingCancellationToken_CanNotCancel_InFlightRequests(ClientAndServiceTestCase clientAndServiceTestCase)
{
await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder()
.WithHalibutLoggingLevel(LogLevel.Trace)
.WithStandardServices()
.Build(CancellationToken))
{
var lockService = clientAndService.CreateAsyncClient<ILockService, IAsyncClientLockServiceWithOptions>();

var tokenSourceToCancel = new CancellationTokenSource();
using var tmpDir = new TemporaryDirectory();
var fileThatOnceDeletedEndsTheCall = tmpDir.CreateRandomFile();
var callStartedFile = tmpDir.RandomFileName();

var inFlightRequest = Task.Run(async () => await lockService.WaitForFileToBeDeletedAsync(
fileThatOnceDeletedEndsTheCall,
callStartedFile,
new HalibutProxyRequestOptions(tokenSourceToCancel.Token, CancellationToken.None)));

Logger.Information("Waiting for the RPC call to be inflight");
while (!File.Exists(callStartedFile))
{
await Task.Delay(TimeSpan.FromMilliseconds(100), CancellationToken);
}

// The call is now in flight. Call cancel on the cancellation token for that in flight request.
tokenSourceToCancel.Cancel();

// Give time for the cancellation to do something
await Task.Delay(TimeSpan.FromSeconds(2), CancellationToken);

if (inFlightRequest.Status == TaskStatus.Faulted) await inFlightRequest;

inFlightRequest.IsCompleted.Should().Be(false, $"The cancellation token can not cancel in flight requests. Current state: {inFlightRequest.Status}");

File.Delete(fileThatOnceDeletedEndsTheCall);

// Now the lock is released we should be able to complete the request.
await inFlightRequest;
}
}

[Test]
// TODO: ASYNC ME UP!
// net48 does not support cancellation of the request as the DeflateStream ends up using Begin and End methods which don't get passed the cancellation token
Expand All @@ -123,7 +62,7 @@ public async Task HalibutProxyRequestOptions_ConnectingCancellationToken_CanNotC
[LatestClientAndLatestServiceTestCases(testNetworkConditions: false)]
[LatestClientAndPreviousServiceVersionsTestCases(testNetworkConditions: false)]
#endif
public async Task HalibutProxyRequestOptions_InProgressCancellationToken_CanCancel_InFlightRequests(ClientAndServiceTestCase clientAndServiceTestCase)
public async Task HalibutProxyRequestOptions_RequestCancellationToken_CanCancel_InFlightRequests(ClientAndServiceTestCase clientAndServiceTestCase)
{
await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder()
.WithStandardServices()
Expand All @@ -141,7 +80,7 @@ public async Task HalibutProxyRequestOptions_InProgressCancellationToken_CanCanc
await lockService.WaitForFileToBeDeletedAsync(
fileThatOnceDeletedEndsTheCall,
callStartedFile,
new HalibutProxyRequestOptions(CancellationToken.None, tokenSourceToCancel.Token));
new HalibutProxyRequestOptions(tokenSourceToCancel.Token));
});

Logger.Information("Waiting for the RPC call to be inflight");
Expand All @@ -157,8 +96,7 @@ await lockService.WaitForFileToBeDeletedAsync(
await Task.Delay(TimeSpan.FromSeconds(2), CancellationToken);

(await AssertException.Throws<Exception>(inFlightRequest))
.And
.Should().Match<Exception>(x => x is OperationCanceledException || (x.GetType() == typeof(HalibutClientException) && x.Message.Contains("The ReadAsync operation was cancelled")));
.And.Should().Match(x => x is TransferringRequestCancelledException || (x is HalibutClientException && x.As<HalibutClientException>().Message.Contains("The Request was cancelled while Transferring")));
}
}

Expand Down Expand Up @@ -186,7 +124,7 @@ public async Task HalibutProxyRequestOptionsCanBeSentToLatestAndOldServicesThatP
{
var echo = clientAndService.CreateAsyncClient<IEchoService, IAsyncClientEchoServiceWithOptions>();

(await echo.SayHelloAsync("Hello!!", new HalibutProxyRequestOptions(CancellationToken, null)))
(await echo.SayHelloAsync("Hello!!", new HalibutProxyRequestOptions(CancellationToken)))
.Should()
.Be("Hello!!...");
}
Expand Down
Loading

0 comments on commit ac3deb7

Please sign in to comment.