Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Co-operatively cancel all RPC Calls #576

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions source/Halibut.Tests/BadCertificatesTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public async Task FailWhenPollingServicePresentsWrongCertificate_ButServiceIsCon
});

// Act
var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)), CancellationToken);
var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)), CancellationToken);

// Interestingly the message exchange error is logged to a non polling looking URL, perhaps because it has not been identified?
Wait.UntilActionSucceeds(() => {
Expand Down Expand Up @@ -148,14 +148,14 @@ public async Task FailWhenPollingServiceHasThumbprintRemovedViaTrustOnly(ClientA
});

// Works normally
await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None));
await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None));
await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token));
await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token));

// Act
clientAndBuilder.Client.TrustOnly(new List<string>());

// Assert
var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)), CancellationToken);
var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)), CancellationToken);

await Task.Delay(3000, CancellationToken);

Expand All @@ -164,8 +164,7 @@ public async Task FailWhenPollingServiceHasThumbprintRemovedViaTrustOnly(ClientA
var exception = await AssertException.Throws<Exception>(incrementCount);

exception.And.Should().Match(e => e.GetType() == typeof(HalibutClientException)
|| e.GetType() == typeof(OperationCanceledException)
|| e.GetType() == typeof(TaskCanceledException));
|| e is OperationCanceledException);
}
}

Expand Down Expand Up @@ -213,8 +212,8 @@ public async Task FailWhenClientPresentsWrongCertificateToPollingService(ClientA
point.PollingRequestQueueTimeout = TimeSpan.FromSeconds(2000);
});

var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)), CancellationToken);

var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)), CancellationToken);
Func<LogEvent, bool> hasExpectedLog = logEvent =>
logEvent.FormattedMessage.Contains("The server at")
&& logEvent.FormattedMessage.Contains("presented an unexpected security certificate");
Expand Down Expand Up @@ -270,7 +269,7 @@ public async Task FailWhenPollingServicePresentsWrongCertificate(ClientAndServic
point.PollingRequestQueueTimeout = TimeSpan.FromSeconds(10);
});

var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token, CancellationToken.None)), CancellationToken);
var incrementCount = Task.Run(async () => await clientCountingService.IncrementAsync(new HalibutProxyRequestOptions(cts.Token)), CancellationToken);

// Interestingly the message exchange error is logged to a non polling looking URL, perhaps because it has not been identified?
Wait.UntilActionSucceeds(() => { AllLogs(serviceLoggers).Select(l => l.FormattedMessage).ToArray()
Expand Down
86 changes: 12 additions & 74 deletions source/Halibut.Tests/CancellationViaClientProxyFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Halibut.Logging;
using Halibut.Exceptions;
using Halibut.ServiceModel;
using Halibut.Tests.Support;
using Halibut.Tests.Support.ExtensionMethods;
Expand All @@ -21,27 +21,11 @@ public class CancellationViaClientProxyFixture : BaseTest
[Test]
[LatestClientAndLatestServiceTestCases(testNetworkConditions: false)]
[LatestClientAndPreviousServiceVersionsTestCases(testNetworkConditions: false)]
public async Task HalibutProxyRequestOptions_ConnectingCancellationToken_CanCancel_ConnectingOrQueuedRequests(ClientAndServiceTestCase clientAndServiceTestCase)
public async Task HalibutProxyRequestOptions_RequestCancellationToken_CanCancel_ConnectingOrQueuedRequests(ClientAndServiceTestCase clientAndServiceTestCase)
{
var tokenSourceToCancel = new CancellationTokenSource();
var halibutRequestOption = new HalibutProxyRequestOptions(tokenSourceToCancel.Token, CancellationToken.None);
var halibutRequestOption = new HalibutProxyRequestOptions(tokenSourceToCancel.Token);

await CanCancel_ConnectingOrQueuedRequests(clientAndServiceTestCase, tokenSourceToCancel, halibutRequestOption);
}

[Test]
[LatestClientAndLatestServiceTestCases(testNetworkConditions: false)]
[LatestClientAndPreviousServiceVersionsTestCases(testNetworkConditions: false)]
public async Task HalibutProxyRequestOptions_InProgressCancellationToken_CanCancel_ConnectingOrQueuedRequests(ClientAndServiceTestCase clientAndServiceTestCase)
{
var tokenSourceToCancel = new CancellationTokenSource();
var halibutRequestOption = new HalibutProxyRequestOptions(CancellationToken.None, tokenSourceToCancel.Token);

await CanCancel_ConnectingOrQueuedRequests(clientAndServiceTestCase, tokenSourceToCancel, halibutRequestOption);
}

async Task CanCancel_ConnectingOrQueuedRequests(ClientAndServiceTestCase clientAndServiceTestCase, CancellationTokenSource tokenSourceToCancel, HalibutProxyRequestOptions halibutRequestOption)
{
await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder()
.WithPortForwarding(out var portForwarderRef, port => PortForwarderUtil.ForwardingToLocalPort(port).Build())
.WithStandardServices()
Expand All @@ -55,64 +39,19 @@ async Task CanCancel_ConnectingOrQueuedRequests(ClientAndServiceTestCase clientA
point => point.TryAndConnectForALongTime());

tokenSourceToCancel.CancelAfter(TimeSpan.FromMilliseconds(100));
(await AssertException.Throws<Exception>(() => echo.IncrementAsync(halibutRequestOption)))
.And.Message.Contains("The operation was canceled");

(await AssertionExtensions.Should(() => echo.IncrementAsync(halibutRequestOption)).ThrowAsync<Exception>())
.And.Should().Match(x => x is ConnectingRequestCancelledException || (x is HalibutClientException && x.As<HalibutClientException>().Message.Contains("The Request was cancelled while Connecting")));

portForwarderRef.Value.ReturnToNormalMode();

await echo.IncrementAsync(new HalibutProxyRequestOptions(CancellationToken, CancellationToken.None));
await echo.IncrementAsync(new HalibutProxyRequestOptions(CancellationToken));

(await echo.GetCurrentValueAsync(new HalibutProxyRequestOptions(CancellationToken, CancellationToken.None)))
(await echo.GetCurrentValueAsync(new HalibutProxyRequestOptions(CancellationToken)))
.Should().Be(1, "Since we cancelled the first call");
}
}

[Test]
[LatestClientAndLatestServiceTestCases(testNetworkConditions: false)]
[LatestClientAndPreviousServiceVersionsTestCases(testNetworkConditions: false)]
public async Task HalibutProxyRequestOptions_ConnectingCancellationToken_CanNotCancel_InFlightRequests(ClientAndServiceTestCase clientAndServiceTestCase)
{
await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder()
.WithHalibutLoggingLevel(LogLevel.Trace)
.WithStandardServices()
.Build(CancellationToken))
{
var lockService = clientAndService.CreateAsyncClient<ILockService, IAsyncClientLockServiceWithOptions>();

var tokenSourceToCancel = new CancellationTokenSource();
using var tmpDir = new TemporaryDirectory();
var fileThatOnceDeletedEndsTheCall = tmpDir.CreateRandomFile();
var callStartedFile = tmpDir.RandomFileName();

var inFlightRequest = Task.Run(async () => await lockService.WaitForFileToBeDeletedAsync(
fileThatOnceDeletedEndsTheCall,
callStartedFile,
new HalibutProxyRequestOptions(tokenSourceToCancel.Token, CancellationToken.None)));

Logger.Information("Waiting for the RPC call to be inflight");
while (!File.Exists(callStartedFile))
{
await Task.Delay(TimeSpan.FromMilliseconds(100), CancellationToken);
}

// The call is now in flight. Call cancel on the cancellation token for that in flight request.
tokenSourceToCancel.Cancel();

// Give time for the cancellation to do something
await Task.Delay(TimeSpan.FromSeconds(2), CancellationToken);

if (inFlightRequest.Status == TaskStatus.Faulted) await inFlightRequest;

inFlightRequest.IsCompleted.Should().Be(false, $"The cancellation token can not cancel in flight requests. Current state: {inFlightRequest.Status}");

File.Delete(fileThatOnceDeletedEndsTheCall);

// Now the lock is released we should be able to complete the request.
await inFlightRequest;
}
}

[Test]
// TODO: ASYNC ME UP!
// net48 does not support cancellation of the request as the DeflateStream ends up using Begin and End methods which don't get passed the cancellation token
Expand All @@ -123,7 +62,7 @@ public async Task HalibutProxyRequestOptions_ConnectingCancellationToken_CanNotC
[LatestClientAndLatestServiceTestCases(testNetworkConditions: false)]
[LatestClientAndPreviousServiceVersionsTestCases(testNetworkConditions: false)]
#endif
public async Task HalibutProxyRequestOptions_InProgressCancellationToken_CanCancel_InFlightRequests(ClientAndServiceTestCase clientAndServiceTestCase)
public async Task HalibutProxyRequestOptions_RequestCancellationToken_CanCancel_InFlightRequests(ClientAndServiceTestCase clientAndServiceTestCase)
{
await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder()
.WithStandardServices()
Expand All @@ -141,7 +80,7 @@ public async Task HalibutProxyRequestOptions_InProgressCancellationToken_CanCanc
await lockService.WaitForFileToBeDeletedAsync(
fileThatOnceDeletedEndsTheCall,
callStartedFile,
new HalibutProxyRequestOptions(CancellationToken.None, tokenSourceToCancel.Token));
new HalibutProxyRequestOptions(tokenSourceToCancel.Token));
});

Logger.Information("Waiting for the RPC call to be inflight");
Expand All @@ -157,8 +96,7 @@ await lockService.WaitForFileToBeDeletedAsync(
await Task.Delay(TimeSpan.FromSeconds(2), CancellationToken);

(await AssertException.Throws<Exception>(inFlightRequest))
.And
.Should().Match<Exception>(x => x is OperationCanceledException || (x.GetType() == typeof(HalibutClientException) && x.Message.Contains("The ReadAsync operation was cancelled")));
.And.Should().Match(x => x is TransferringRequestCancelledException || (x is HalibutClientException && x.As<HalibutClientException>().Message.Contains("The Request was cancelled while Transferring")));
}
}

Expand Down Expand Up @@ -186,7 +124,7 @@ public async Task HalibutProxyRequestOptionsCanBeSentToLatestAndOldServicesThatP
{
var echo = clientAndService.CreateAsyncClient<IEchoService, IAsyncClientEchoServiceWithOptions>();

(await echo.SayHelloAsync("Hello!!", new HalibutProxyRequestOptions(CancellationToken, null)))
(await echo.SayHelloAsync("Hello!!", new HalibutProxyRequestOptions(CancellationToken)))
.Should()
.Be("Hello!!...");
}
Expand Down
Loading