Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a bug in cooperative cancellation for Polling Services causing a request to time out too quickly #558

Merged
merged 1 commit into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions source/Halibut.Tests/PollingServiceTimeoutsFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Halibut.Tests.Support;
using Halibut.Tests.Support.TestAttributes;
using Halibut.Tests.Support.TestCases;
using Halibut.Tests.TestServices;
using Halibut.Tests.TestServices.Async;
using Halibut.TestUtils.Contracts;
using NUnit.Framework;

namespace Halibut.Tests
{
/// <summary>
/// Timing tests for polling services. Each test overrides
/// <c>PollingRequestQueueTimeout</c> (and, where relevant,
/// <c>PollingRequestMaximumMessageProcessingTimeout</c>) and then checks how long
/// a client call takes and how it ends.
/// </summary>
public class PollingServiceTimeoutsFixture : BaseTest
{
    /// <summary>
    /// The test case is built with <c>NoService()</c> and a 5 second
    /// <c>PollingRequestQueueTimeout</c>. The call must fail with a
    /// <c>HalibutClientException</c> carrying the "did not collect the request"
    /// message, and must do so in under 8 seconds.
    /// </summary>
    [Test]
    [LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false)]
    public async Task WhenThePollingRequestQueueTimeoutIsReached_TheRequestShouldTimeout(ClientAndServiceTestCase clientAndServiceTestCase)
    {
        const string expectedMessage = "A request was sent to a polling endpoint, but the polling endpoint did not collect the request within the allowed time (00:00:05), so the request timed out.";
        var maximumDuration = TimeSpan.FromSeconds(8);

        var timeouts = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
        timeouts.PollingRequestQueueTimeout = TimeSpan.FromSeconds(5);

        // Disposed when the method exits, which is where the original using-block ended.
        await using var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder()
            .As<LatestClientAndLatestServiceBuilder>()
            .NoService()
            .WithStandardServices()
            .WithHalibutTimeoutsAndLimits(timeouts)
            .Build(CancellationToken);

        var echoClient = clientAndService.CreateAsyncClient<IEchoService, IAsyncClientEchoServiceWithOptions>();

        var timer = Stopwatch.StartNew();
        var thrown = await AssertAsync.Throws<HalibutClientException>(async () => await echoClient.SayHelloAsync("Hello", new(CancellationToken)));
        thrown.And.Message.Should().Contain(expectedMessage);
        timer.Stop();

        timer.Elapsed.Should().BeLessThan(maximumDuration, "Should have timed out quickly");
    }

    /// <summary>
    /// The service action sleeps for 10 seconds, which is longer than the 5 second
    /// <c>PollingRequestQueueTimeout</c> but well inside the 100 second
    /// <c>PollingRequestMaximumMessageProcessingTimeout</c>. The call must complete
    /// without throwing, taking more than the queue timeout and less than the
    /// service delay plus a 5 second buffer.
    /// </summary>
    [Test]
    [LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false)]
    public async Task WhenThePollingRequestQueueTimeoutIsReached_ButTheResponseIsReceivedBeforeThePollingRequestMaximumMessageProcessingTimeoutIsReached_TheRequestShouldSucceed(ClientAndServiceTestCase clientAndServiceTestCase)
    {
        var serviceDelay = TimeSpan.FromSeconds(10);

        var timeouts = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
        timeouts.PollingRequestQueueTimeout = TimeSpan.FromSeconds(5);
        timeouts.PollingRequestMaximumMessageProcessingTimeout = TimeSpan.FromSeconds(100);

        await using var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder()
            .As<LatestClientAndLatestServiceBuilder>()
            .WithDoSomeActionService(() => Thread.Sleep(serviceDelay))
            .WithHalibutTimeoutsAndLimits(timeouts)
            .WithInstantReconnectPollingRetryPolicy()
            .Build(CancellationToken);

        var actionClient = clientAndService.CreateAsyncClient<IDoSomeActionService, IAsyncClientDoSomeActionServiceWithOptions>();

        var timer = Stopwatch.StartNew();
        await actionClient.ActionAsync(new(CancellationToken));
        timer.Stop();

        // The stopwatch is stopped, so the value checked by both assertions is the same.
        var elapsed = timer.Elapsed;
        elapsed.Should().BeGreaterThan(timeouts.PollingRequestQueueTimeout, "Should have waited longer than the PollingRequestQueueTimeout");
        elapsed.Should().BeLessThan(serviceDelay + TimeSpan.FromSeconds(5), "Should have received the response after the 10 second delay + 5 second buffer");
    }

    /// <summary>
    /// The service action blocks on a semaphore that is released only after the
    /// client call has failed. With a 6 second
    /// <c>PollingRequestMaximumMessageProcessingTimeout</c> the call must fail with
    /// the "collected it but did not respond" message in under 15 seconds. The
    /// connections observer must then report one closed connection and a second
    /// accepted connection.
    /// </summary>
    [Test]
    [LatestClientAndLatestServiceTestCases(testNetworkConditions: false, testListening: false)]
    public async Task WhenThePollingRequestMaximumMessageProcessingTimeoutIsReached_TheRequestShouldTimeout_AndTheTransferringPendingRequestCancelled(ClientAndServiceTestCase clientAndServiceTestCase)
    {
        const string expectedMessage = "A request was sent to a polling endpoint, the polling endpoint collected it but did not respond in the allowed time (00:00:06), so the request timed out.";
        var maximumDuration = TimeSpan.FromSeconds(15);
        var reconnectDeadline = TimeSpan.FromSeconds(30);

        var timeouts = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
        timeouts.PollingRequestQueueTimeout = TimeSpan.FromSeconds(5);
        timeouts.PollingRequestMaximumMessageProcessingTimeout = TimeSpan.FromSeconds(6);

        var observer = new TestConnectionsObserver();
        // Starts with no permits, so the service action blocks until Release() below.
        var serviceGate = new SemaphoreSlim(0, 1);

        await using var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder()
            .As<LatestClientAndLatestServiceBuilder>()
            .WithDoSomeActionService(() => serviceGate.Wait(CancellationToken))
            .WithHalibutTimeoutsAndLimits(timeouts)
            .WithInstantReconnectPollingRetryPolicy()
            .WithConnectionObserverOnTcpServer(observer)
            .Build(CancellationToken);

        var actionClient = clientAndService.CreateAsyncClient<IDoSomeActionService, IAsyncClientDoSomeActionServiceWithOptions>();

        var timer = Stopwatch.StartNew();
        var thrown = await AssertAsync.Throws<HalibutClientException>(async () => await actionClient.ActionAsync(new(CancellationToken)));
        thrown.And.Message.Should().Contain(expectedMessage);
        timer.Stop();

        timer.Elapsed.Should().BeLessThan(maximumDuration, "Should have timed out quickly");

        observer.ConnectionAcceptedCount.Should().Be(1, "A single TCP connection should have been created");

        // Unblock the service action now that the client side has already failed.
        serviceGate.Release();

        // NOTE(review): presumably retries the assertions until they pass or the deadline expires - confirm against Wait.UntilActionSucceeds.
        Wait.UntilActionSucceeds(
            () =>
            {
                observer.ConnectionClosedCount.Should().Be(1, "Cancelling the PendingRequest should have caused the TCP Connection to be cancelled to stop the in-flight request");
                observer.ConnectionAcceptedCount.Should().Be(2, "The Service should have reconnected after the request was cancelled");
            },
            reconnectDeadline,
            Logger,
            CancellationToken);
    }
}
}
70 changes: 37 additions & 33 deletions source/Halibut.Tests/ServiceModel/PendingRequestQueueFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
using FluentAssertions;
using Halibut.ServiceModel;
using Halibut.Tests.Builders;
using Halibut.Tests.Support.TestAttributes;
using Halibut.Transport.Protocol;
using NUnit.Framework;

Expand All @@ -25,17 +24,16 @@ public async Task QueueAndWait_WillContinueWaitingUntilResponseIsApplied()
var sut = new PendingRequestQueueBuilder().WithEndpoint(endpoint).Build();
var request = new RequestMessageBuilder(endpoint).Build();
var expectedResponse = ResponseMessageBuilder.FromRequest(request).Build();

var queueAndWaitTask = await StartQueueAndWaitAndWaitForRequestToBeQueued(sut, request, CancellationToken);
await sut.DequeueAsync(CancellationToken);


// Act
await Task.Delay(1000, CancellationToken);
queueAndWaitTask.IsCompleted.Should().BeFalse();

await sut.ApplyResponse(expectedResponse, request.Destination);

// Assert
var response = await queueAndWaitTask;
response.Should().Be(expectedResponse);
Expand Down Expand Up @@ -85,7 +83,7 @@ public async Task QueueAndWait_WhenPollingRequestQueueTimeoutIsReached_WillStopW
var request = new RequestMessageBuilder(endpoint)
.WithServiceEndpoint(seb => seb.WithPollingRequestQueueTimeout(TimeSpan.FromMilliseconds(1000)))
.Build();

// Act
var stopwatch = Stopwatch.StartNew();
var queueAndWaitTask = await StartQueueAndWaitAndWaitForRequestToBeQueued(sut, request, CancellationToken);
Expand Down Expand Up @@ -118,8 +116,10 @@ public async Task QueueAndWait_WhenRequestIsDequeued_ButPollingRequestQueueTimeo

// Act
var stopwatch = Stopwatch.StartNew();
var (queueAndWaitTask, _) = await QueueAndDequeueRequest_ForTimeoutTestingOnly_ToCopeWithRaceCondition(sut, request, CancellationToken);
var (queueAndWaitTask, dequeued) = await QueueAndDequeueRequest_ForTimeoutTestingOnly_ToCopeWithRaceCondition(sut, request, CancellationToken);

var response = await queueAndWaitTask;
dequeued.CancellationToken.IsCancellationRequested.Should().BeTrue("Should have cancelled the request when the PollingRequestMaximumMessageProcessingTimeout is reached");

// Assert
// Although we sleep for 2 second, sometimes it can be just under. So be generous with the buffer.
Expand All @@ -130,7 +130,7 @@ public async Task QueueAndWait_WhenRequestIsDequeued_ButPollingRequestQueueTimeo
var next = await sut.DequeueAsync(CancellationToken);
next.Should().BeNull();
}

[Test]
public async Task QueueAndWait_WhenRequestIsDequeued_ButPollingRequestQueueTimeoutIsReached_ShouldWaitTillRequestRespondsAndClearRequest()
{
Expand All @@ -141,28 +141,31 @@ public async Task QueueAndWait_WhenRequestIsDequeued_ButPollingRequestQueueTimeo
.WithEndpoint(endpoint)
.WithPollingQueueWaitTimeout(TimeSpan.Zero) // Remove delay, otherwise we wait the full 20 seconds for DequeueAsync at the end of the test
.Build();

var request = new RequestMessageBuilder(endpoint)
.WithServiceEndpoint(seb => seb.WithPollingRequestQueueTimeout(TimeSpan.FromMilliseconds(1000)))
.Build();

var expectedResponse = ResponseMessageBuilder.FromRequest(request).Build();

// Act
var (queueAndWaitTask, dequeued) = await QueueAndDequeueRequest_ForTimeoutTestingOnly_ToCopeWithRaceCondition(sut, request, CancellationToken);

await Task.Delay(2000, CancellationToken);
dequeued.CancellationToken.IsCancellationRequested.Should().BeFalse("Should not have cancelled the request after PollingRequestQueueTimeout is reached");

await sut.ApplyResponse(expectedResponse, request.Destination);

var response = await queueAndWaitTask;

// Assert
dequeued.Should().NotBeNull("We should have removed the item from the queue before it timed out.").And.Be(request);
dequeued.RequestMessage.Should().NotBeNull("We should have removed the item from the queue before it timed out.").And.Be(request);
response.Should().Be(expectedResponse);

var next = await sut.DequeueAsync(CancellationToken);
next.Should().BeNull();
}

[Test]
public async Task QueueAndWait_AddingMultipleItemsToQueueInOrder_ShouldDequeueInOrder()
{
Expand Down Expand Up @@ -250,7 +253,7 @@ public async Task QueueAndWait_Can_Queue_Dequeue_Apply_VeryLargeNumberOfRequests
.ToList();

await Task.WhenAll(dequeueTasks);

// Assert
await ApplyResponsesConcurrentlyAndEnsureAllQueueResponsesMatch(sut, requestsInOrder, queueAndWaitTasksInOrder);
}
Expand All @@ -265,7 +268,7 @@ public async Task QueueAndWait_Can_Queue_Dequeue_WhenRequestsAreBeingCancelled()
const int minimumCancelledRequest = 100;

var sut = new PendingRequestQueueBuilder().WithEndpoint(endpoint).Build();

var requestsInOrder = Enumerable.Range(0, totalRequest)
.Select(_ => new RequestMessageBuilder(endpoint).Build())
.ToList();
Expand All @@ -276,13 +279,13 @@ public async Task QueueAndWait_Can_Queue_Dequeue_WhenRequestsAreBeingCancelled()
{
var requestCancellationTokenSource = new CancellationTokenSource();
return new Tuple<Task<ResponseMessage>, CancellationTokenSource>(
StartQueueAndWait(sut, request, requestCancellationTokenSource.Token),
StartQueueAndWait(sut, request, requestCancellationTokenSource.Token),
requestCancellationTokenSource);
})
.ToList();

await WaitForQueueCountToBecome(sut, requestsInOrder.Count);

var index = 0;
var cancelled = 0;
var dequeueTasks = new ConcurrentBag<Task<RequestMessageWithCancellationToken>>();
Expand All @@ -293,10 +296,10 @@ public async Task QueueAndWait_Can_Queue_Dequeue_WhenRequestsAreBeingCancelled()
{
var currentIndex = Interlocked.Increment(ref index);

if(currentIndex % 2 == 0)
if (currentIndex % 2 == 0)
{
Interlocked.Increment(ref cancelled);
queueAndWaitTasksInOrder.ElementAt(index-1).Item2.Cancel();
queueAndWaitTasksInOrder.ElementAt(index - 1).Item2.Cancel();
}
}
});
Expand Down Expand Up @@ -378,16 +381,17 @@ public async Task QueueAndWait_CancellingAPendingRequestAfterItIsDequeued_Should
// Cancel, and give the queue time to start waiting for a response
cancellationTokenSource.Cancel();
await Task.Delay(1000, CancellationToken);

dequeued.CancellationToken.IsCancellationRequested.Should().BeTrue("Should have cancelled the request");

await AssertionExtensions.Should(() => queueAndWaitTask).ThrowAsync<OperationCanceledException>();

// Assert
dequeued.RequestMessage.Should().NotBeNull().And.Be(request);

var next = await sut.DequeueAsync(CancellationToken);
next.Should().BeNull();
}

[Test]
public async Task DequeueAsync_WithNothingQueued_WillWaitPollingQueueWaitTimeout_ShouldReturnNull()
{
Expand All @@ -398,7 +402,7 @@ public async Task DequeueAsync_WithNothingQueued_WillWaitPollingQueueWaitTimeout
.WithEndpoint(endpoint)
.WithPollingQueueWaitTimeout(TimeSpan.FromSeconds(1))
.Build();

// Act
var stopwatch = Stopwatch.StartNew();
var request = await sut.DequeueAsync(CancellationToken);
Expand All @@ -424,15 +428,15 @@ public async Task DequeueAsync_WithNothingQueued_WillWaitPollingQueueWaitTimeout
var previousRequest = new RequestMessageBuilder(endpoint).Build();
var expectedPreviousResponse = ResponseMessageBuilder.FromRequest(previousRequest).Build();

var queueAndWaitTask = await StartQueueAndWaitAndWaitForRequestToBeQueued(sut, previousRequest ,CancellationToken);
var queueAndWaitTask = await StartQueueAndWaitAndWaitForRequestToBeQueued(sut, previousRequest, CancellationToken);
await sut.DequeueAsync(CancellationToken);
await sut.ApplyResponse(expectedPreviousResponse, previousRequest.Destination);
await queueAndWaitTask;

// Act
var stopwatch = Stopwatch.StartNew();
var dequeuedRequest = await sut.DequeueAsync(CancellationToken);

// Assert
// Although we sleep for 1 second, sometimes it can be just under. So be generous with the buffer.
stopwatch.Elapsed.Should().BeGreaterThan(TimeSpan.FromMilliseconds(800));
Expand All @@ -456,9 +460,9 @@ public async Task DequeueAsync_WillContinueWaitingUntilItemIsQueued()
await Task.Delay(1000, CancellationToken);

var queueAndWaitTask = StartQueueAndWait(sut, request, CancellationToken);

var dequeuedRequest = await dequeueTask;

// Assert
// Although we sleep for 1 second, sometimes it can be just under. So be generous with the buffer.
stopwatch.Elapsed.Should().BeGreaterThan(TimeSpan.FromMilliseconds(800));
Expand All @@ -470,7 +474,7 @@ public async Task DequeueAsync_WillContinueWaitingUntilItemIsQueued()
var response = await queueAndWaitTask;
response.Should().Be(expectedResponse);
}

[Test]
public async Task DequeueAsync_WithMultipleDequeueCallsWaiting_WhenSingleRequestIsQueued_ThenOnlyOneCallersReceivesRequest()
{
Expand All @@ -480,7 +484,7 @@ public async Task DequeueAsync_WithMultipleDequeueCallsWaiting_WhenSingleRequest
var sut = new PendingRequestQueueBuilder().WithEndpoint(endpoint).Build();
var request = new RequestMessageBuilder(endpoint).Build();
var expectedResponse = ResponseMessageBuilder.FromRequest(request).Build();

var dequeueTasks = Enumerable.Range(0, 30)
.Select(_ => sut.DequeueAsync(CancellationToken))
.ToArray();
Expand Down Expand Up @@ -558,8 +562,8 @@ async Task WaitForQueueCountToBecome(IPendingRequestQueue pendingRequestQueue, i
}

Task<ResponseMessage> StartQueueAndWait(
IPendingRequestQueue pendingRequestQueue,
RequestMessage request,
IPendingRequestQueue pendingRequestQueue,
RequestMessage request,
CancellationToken requestCancellationToken)
{
var task = Task.Run(
Expand All @@ -568,8 +572,8 @@ Task<ResponseMessage> StartQueueAndWait(
return task;
}

async Task<(Task<ResponseMessage> queueAndWaitTask, RequestMessage dequeued)> QueueAndDequeueRequest_ForTimeoutTestingOnly_ToCopeWithRaceCondition(
IPendingRequestQueue sut,
async Task<(Task<ResponseMessage> queueAndWaitTask, RequestMessageWithCancellationToken dequeued)> QueueAndDequeueRequest_ForTimeoutTestingOnly_ToCopeWithRaceCondition(
IPendingRequestQueue sut,
RequestMessage request,
CancellationToken cancellationToken)
{
Expand All @@ -588,7 +592,7 @@ Task<ResponseMessage> StartQueueAndWait(
// So if dequeued is null, then try again.
if (dequeued is not null)
{
return (queueAndWaitTask, dequeued.RequestMessage);
return (queueAndWaitTask, dequeued);
}

cancellationToken.ThrowIfCancellationRequested();
Expand All @@ -606,9 +610,9 @@ static async Task ApplyResponsesConcurrentlyAndEnsureAllQueueResponsesMatch(

//Concurrently apply responses to prove this does not cause issues.
var applyResponseTasks = requestsInOrder
.Select((r,i) => Task.Factory.StartNew(async () => await sut.ApplyResponse(expectedResponsesInOrder[i], r.Destination)))
.Select((r, i) => Task.Factory.StartNew(async () => await sut.ApplyResponse(expectedResponsesInOrder[i], r.Destination)))
.ToList();

await Task.WhenAll(applyResponseTasks);

var index = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,6 @@ public static LatestClientAndLatestServiceBuilder WithInstantReconnectPollingRet
{
return builder.WithPollingReconnectRetryPolicy(() => new RetryPolicy(1, TimeSpan.Zero, TimeSpan.Zero));
}

public static LatestClientAndLatestServiceBuilder WhenTestingAsyncClient(this LatestClientAndLatestServiceBuilder builder, ClientAndServiceTestCase clientAndServiceTestCase, Action<LatestClientAndLatestServiceBuilder> action)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed as everything is Async

{

action(builder);
return builder;
}

public static LatestClientAndLatestServiceBuilder WithConnectionObserverOnTcpServer(this LatestClientAndLatestServiceBuilder builder, IConnectionsObserver connectionsObserver)
{
Expand Down
Loading