Skip to content

Commit

Permalink
fix: catch OperationCanceledException when timeout before next acquis…
Browse files Browse the repository at this point in the history
…ition expire
  • Loading branch information
aneojgurhem committed Jan 16, 2024
1 parent 8425333 commit af8f08c
Showing 1 changed file with 76 additions and 13 deletions.
89 changes: 76 additions & 13 deletions Common/src/Pollster/TaskQueueBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,45 +30,108 @@ public abstract class TaskQueueBase


private readonly Queue<Exception> exceptions_ = new();
private readonly bool singleReader_;

/// <summary>
/// Create an instance
/// </summary>
/// <param name="singleReader">whether only one reader can retrieve item from the queue</param>
public TaskQueueBase(bool singleReader)
=> channel_ = Channel.CreateBounded<TaskHandler>(new BoundedChannelOptions(1)
{
Capacity = 1,
FullMode = BoundedChannelFullMode.Wait,
SingleReader = singleReader,
SingleWriter = true,
});
{
singleReader_ = singleReader;
channel_ = Channel.CreateBounded<TaskHandler>(new BoundedChannelOptions(1)
{
Capacity = 1,
FullMode = BoundedChannelFullMode.Wait,
SingleReader = singleReader,
SingleWriter = true,
});
}

/// <summary>
/// Put a handler in the queue
/// </summary>
/// <param name="handler">the handler to insert</param>
/// <param name="cancellationToken">Token used to cancel the execution of the method</param>
/// <returns>
/// Task representing the asynchronous execution of the method
/// </returns>
public async Task WriteAsync(TaskHandler handler,
CancellationToken cancellationToken)
=> await channel_.Writer.WriteAsync(handler,
cancellationToken)
.ConfigureAwait(false);

/// <summary>
/// Wait for the availability of the next write.
/// Remove and dispose the current handler if timeout expires.
/// </summary>
/// <param name="timeout">Timeout before the handler is removed and disposed</param>
/// <param name="cancellationToken">Token used to cancel the execution of the method</param>
/// <returns>
/// Task representing the asynchronous execution of the method
/// </returns>
/// <exception cref="InvalidOperationException">if this method is used when the queue is in single mode</exception>
public async Task WaitForNextWriteAsync(TimeSpan timeout,
CancellationToken cancellationToken)
{
if (singleReader_)
{
throw new InvalidOperationException("Cannot use this method in single reader mode");
}

using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(timeout);

await channel_.Writer.WaitToWriteAsync(cts.Token)
.ConfigureAwait(false);

if (channel_.Reader.TryRead(out var handler))
try
{
// block until handler is consumed or token is cancelled
await channel_.Writer.WaitToWriteAsync(cts.Token)
.ConfigureAwait(false);
}
catch (Exception e)
{
await handler.DisposeAsync()
.ConfigureAwait(false);
// Consumer took too long to retrieve the handler or there was an unrecoverable error
// so we remove the handler from the channel and dispose it.
if (channel_.Reader.TryRead(out var handler))
{
await handler.DisposeAsync()
.ConfigureAwait(false);
}

// if the wait was cancelled because it reached the timeout, we ignore the error
if (e is not OperationCanceledException || !cts.IsCancellationRequested)
{
throw;
}
}
}

/// <summary>
/// Retrieve an handler
/// </summary>
/// <param name="cancellationToken">Token used to cancel the execution of the method</param>
/// <returns>
/// Task representing the asynchronous execution of the method
/// </returns>
public async Task<TaskHandler> ReadAsync(CancellationToken cancellationToken)
=> await channel_.Reader.ReadAsync(cancellationToken)
.ConfigureAwait(false);

/// <summary>
/// Add an exception in the internal exception list
/// </summary>
/// <param name="e">the exception to add</param>
public void AddException(Exception e)
=> exceptions_.Enqueue(e);

/// <summary>
/// Get and remove an exception from the internal list of exception
/// </summary>
/// <param name="e">the exception to return</param>
/// <returns>
/// Whether there is an exception in the internal list
/// </returns>
public bool RemoveException([MaybeNullWhen(false)] out Exception e)
{
var r = exceptions_.Count > 0;
Expand Down

0 comments on commit af8f08c

Please sign in to comment.