Skip to content

Commit

Permalink
PR Feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanwoctopusdeploy committed Dec 6, 2023
1 parent 6b3e28d commit 8c406ed
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 167 deletions.
6 changes: 1 addition & 5 deletions source/Halibut.Tests/CancellationViaClientProxyFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,7 @@ await lockService.WaitForFileToBeDeletedAsync(
// Give time for the cancellation to do something
await Task.Delay(TimeSpan.FromSeconds(2), CancellationToken);

#pragma warning disable VSTHRD003
#pragma warning disable VSTHRD003
(await AssertionExtensions.Should(async () => await inFlightRequest).ThrowAsync<Exception>())
#pragma warning restore VSTHRD003
#pragma warning restore VSTHRD003
(await AssertException.Throws<Exception>(inFlightRequest))
.And.Should().Match(x => x is TransferringRequestCancelledException || (x is HalibutClientException && x.As<HalibutClientException>().Message.Contains("The Request was cancelled while Transferring")));
}
}
Expand Down
4 changes: 2 additions & 2 deletions source/Halibut.Tests/PollingServiceTimeoutsFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ public async Task WhenThePollingRequestMaximumMessageProcessingTimeoutIsReached_

Wait.UntilActionSucceeds(() =>
{
connectionsObserver.ConnectionClosedCount.Should().Be(1, "Cancelling the PendingRequest does not cause the TCP Connection to be cancelled to stop the in-flight request");
connectionsObserver.ConnectionAcceptedCount.Should().Be(2, "The Service won't have reconnected after the request was cancelled");
connectionsObserver.ConnectionClosedCount.Should().Be(1, "Cancelling the PendingRequest should have caused the TCP Connection to be terminated to stop the in-flight request");
connectionsObserver.ConnectionAcceptedCount.Should().Be(2, "The Service should have reconnected after the request was cancelled");
}, TimeSpan.FromSeconds(30), Logger, CancellationToken);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,13 +383,8 @@ public async Task QueueAndWait_CancellingAPendingRequestAfterItIsDequeued_Should
cancellationTokenSource.Cancel();
await Task.Delay(1000, CancellationToken);
dequeued!.CancellationToken.IsCancellationRequested.Should().BeTrue("Should have cancelled the request");


#pragma warning disable VSTHRD003
#pragma warning disable VSTHRD003
await AssertionExtensions.Should(() => queueAndWaitTask).ThrowAsync<OperationCanceledException>();
#pragma warning restore VSTHRD003
#pragma warning restore VSTHRD003
await AssertException.Throws<OperationCanceledException>(queueAndWaitTask);

// Assert
dequeued?.RequestMessage.Should().NotBeNull().And.Be(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,52 +9,64 @@ namespace Halibut.Tests.Support.PendingRequestQueueFactories
/// <summary>
/// CancelWhenRequestDequeuedPendingRequestQueueFactory cancels the cancellation token source when a request is queued
/// </summary>
public class CancelWhenRequestDequeuedPendingRequestQueueFactory : IPendingRequestQueueFactory
class CancelWhenRequestDequeuedPendingRequestQueueFactory : IPendingRequestQueueFactory
{
readonly CancellationTokenSource cancellationTokenSource;
readonly Func<bool>? shouldCancelOnDequeue;
readonly Action<ResponseMessage>? onResponseApplied;
readonly IPendingRequestQueueFactory inner;

public CancelWhenRequestDequeuedPendingRequestQueueFactory(IPendingRequestQueueFactory inner, CancellationTokenSource cancellationTokenSource, Func<bool>? shouldCancelOnDequeue = null, Action<ResponseMessage>? onResponseApplied = null)
{
this.cancellationTokenSource = cancellationTokenSource;
this.shouldCancelOnDequeue = shouldCancelOnDequeue;
this.inner = inner;
this.onResponseApplied = onResponseApplied;
}

public IPendingRequestQueue CreateQueue(Uri endpoint)
{
return new Decorator(inner.CreateQueue(endpoint), cancellationTokenSource, shouldCancelOnDequeue, onResponseApplied);
}

class Decorator : IPendingRequestQueue
{
readonly CancellationTokenSource[] cancellationTokenSources;
readonly IPendingRequestQueueFactory inner;
readonly CancellationTokenSource cancellationTokenSource;
readonly Func<bool>? shouldCancel;
readonly Action<ResponseMessage>? onResponseApplied;
readonly IPendingRequestQueue inner;

public CancelWhenRequestDequeuedPendingRequestQueueFactory(IPendingRequestQueueFactory inner, CancellationTokenSource[] cancellationTokenSources)
public Decorator(IPendingRequestQueue inner, CancellationTokenSource cancellationTokenSource, Func<bool>? shouldCancel, Action<ResponseMessage>? onResponseApplied)
{
this.cancellationTokenSources = cancellationTokenSources;
this.inner = inner;
this.cancellationTokenSource = cancellationTokenSource;
this.shouldCancel = shouldCancel;
this.onResponseApplied = onResponseApplied;
}

public CancelWhenRequestDequeuedPendingRequestQueueFactory(IPendingRequestQueueFactory inner, CancellationTokenSource cancellationTokenSource): this(inner, new []{ cancellationTokenSource })
{
}
public bool IsEmpty => inner.IsEmpty;
public int Count => inner.Count;

public IPendingRequestQueue CreateQueue(Uri endpoint)
public async Task ApplyResponse(ResponseMessage response, ServiceEndPoint destination)
{
return new Decorator(inner.CreateQueue(endpoint), cancellationTokenSources);
onResponseApplied?.Invoke(response);
await inner.ApplyResponse(response, destination);
}

class Decorator : IPendingRequestQueue
public async Task<RequestMessageWithCancellationToken?> DequeueAsync(CancellationToken cancellationToken)
{
readonly CancellationTokenSource[] cancellationTokenSources;
readonly IPendingRequestQueue inner;
var response = await inner.DequeueAsync(cancellationToken);

public Decorator(IPendingRequestQueue inner, CancellationTokenSource[] cancellationTokenSources)
if (shouldCancel?.Invoke() ?? true)
{
this.inner = inner;
this.cancellationTokenSources = cancellationTokenSources;
cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(2));
}

public bool IsEmpty => inner.IsEmpty;
public int Count => inner.Count;
public async Task ApplyResponse(ResponseMessage response, ServiceEndPoint destination) => await inner.ApplyResponse(response, destination);

public async Task<RequestMessageWithCancellationToken?> DequeueAsync(CancellationToken cancellationToken)
{
var response = await inner.DequeueAsync(cancellationToken);

Parallel.ForEach(cancellationTokenSources, cancellationTokenSource => cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(2)));

return response;
}

public async Task<ResponseMessage> QueueAndWaitAsync(RequestMessage request, CancellationToken cancellationTokens)
=> await inner.QueueAndWaitAsync(request, cancellationTokens);
return response;
}

public async Task<ResponseMessage> QueueAndWaitAsync(RequestMessage request, CancellationToken requestCancellationToken)
=> await inner.QueueAndWaitAsync(request, requestCancellationToken);
}
}
}
}
122 changes: 0 additions & 122 deletions source/Halibut.Tests/WhenCancellingARequestForAPollingTentacle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,127 +120,5 @@ void OnResponseApplied(ResponseMessage response)

return (requestCancellationTokenSource, halibutProxyRequestOptions);
}

/// <summary>
/// CancelWhenRequestQueuedPendingRequestQueueFactory cancels the cancellation token source when a request is queued
/// </summary>
class CancelWhenRequestQueuedPendingRequestQueueFactory : IPendingRequestQueueFactory
{
readonly CancellationTokenSource cancellationTokenSource;
readonly IPendingRequestQueueFactory inner;

public CancelWhenRequestQueuedPendingRequestQueueFactory(IPendingRequestQueueFactory inner, CancellationTokenSource cancellationTokenSource)
{
this.cancellationTokenSource = cancellationTokenSource;
this.inner = inner;
}

public IPendingRequestQueue CreateQueue(Uri endpoint)
{
return new Decorator(inner.CreateQueue(endpoint), cancellationTokenSource);
}

class Decorator : IPendingRequestQueue
{
readonly CancellationTokenSource cancellationTokenSource;
readonly IPendingRequestQueue inner;

public Decorator(IPendingRequestQueue inner, CancellationTokenSource cancellationTokenSource)
{
this.inner = inner;
this.cancellationTokenSource = cancellationTokenSource;
}

public bool IsEmpty => inner.IsEmpty;
public int Count => inner.Count;
public async Task ApplyResponse(ResponseMessage response, ServiceEndPoint destination)
{
await inner.ApplyResponse(response, destination);
}

public async Task<RequestMessageWithCancellationToken?> DequeueAsync(CancellationToken cancellationToken) => await inner.DequeueAsync(cancellationToken);

public async Task<ResponseMessage> QueueAndWaitAsync(RequestMessage request, CancellationToken requestCancellationToken)
{
var task = Task.Run(async () =>
{
while (inner.IsEmpty)
{
await Task.Delay(TimeSpan.FromMilliseconds(10), CancellationToken.None);
}

cancellationTokenSource.Cancel();
},
CancellationToken.None);

var result = await inner.QueueAndWaitAsync(request, requestCancellationToken);
await task;
return result;
}
}
}

/// <summary>
/// CancelWhenRequestDequeuedPendingRequestQueueFactory cancels the cancellation token source when a request is queued
/// </summary>
class CancelWhenRequestDequeuedPendingRequestQueueFactory : IPendingRequestQueueFactory
{
readonly CancellationTokenSource cancellationTokenSource;
readonly Func<bool> shouldCancelOnDequeue;
readonly Action<ResponseMessage> onResponseApplied;
readonly IPendingRequestQueueFactory inner;

public CancelWhenRequestDequeuedPendingRequestQueueFactory(IPendingRequestQueueFactory inner, CancellationTokenSource cancellationTokenSource, Func<bool> shouldCancelOnDequeue, Action<ResponseMessage> onResponseApplied)
{
this.cancellationTokenSource = cancellationTokenSource;
this.shouldCancelOnDequeue = shouldCancelOnDequeue;
this.inner = inner;
this.onResponseApplied = onResponseApplied;
}

public IPendingRequestQueue CreateQueue(Uri endpoint)
{
return new Decorator(inner.CreateQueue(endpoint), cancellationTokenSource, shouldCancelOnDequeue, onResponseApplied);
}

class Decorator : IPendingRequestQueue
{
readonly CancellationTokenSource cancellationTokenSource;
readonly Func<bool> shouldCancel;
readonly Action<ResponseMessage> onResponseApplied;
readonly IPendingRequestQueue inner;

public Decorator(IPendingRequestQueue inner, CancellationTokenSource cancellationTokenSource, Func<bool> shouldCancel, Action<ResponseMessage> onResponseApplied)
{
this.inner = inner;
this.cancellationTokenSource = cancellationTokenSource;
this.shouldCancel = shouldCancel;
this.onResponseApplied = onResponseApplied;
}

public bool IsEmpty => inner.IsEmpty;
public int Count => inner.Count;
public async Task ApplyResponse(ResponseMessage response, ServiceEndPoint destination)
{
onResponseApplied(response);
await inner.ApplyResponse(response, destination);
}

public async Task<RequestMessageWithCancellationToken?> DequeueAsync(CancellationToken cancellationToken)
{
var response = await inner.DequeueAsync(cancellationToken);

if (shouldCancel())
{
cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(2));
}

return response;
}

public async Task<ResponseMessage> QueueAndWaitAsync(RequestMessage request, CancellationToken requestCancellationToken)
=> await inner.QueueAndWaitAsync(request, requestCancellationToken);
}
}
}
}

0 comments on commit 8c406ed

Please sign in to comment.