diff --git a/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj b/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj
index c058b0b216b17..a5030bcee6561 100644
--- a/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj
+++ b/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj
@@ -25,6 +25,7 @@ System.Threading.Channel<T>
+
@@ -44,7 +45,6 @@ System.Threading.Channel<T>
-
diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs
index 317636579a05f..834d8ad88ed1d 100644
--- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs
+++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs
@@ -1,6 +1,10 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+
namespace System.Threading.Channels
{
/// Provides static methods for creating channels.
@@ -9,7 +13,7 @@ public static partial class Channel
/// Creates an unbounded channel usable by any number of readers and writers concurrently.
/// The created channel.
public static Channel CreateUnbounded() =>
- new UnboundedChannel(runContinuationsAsynchronously: true);
+ new UnboundedChannel>(new(new()), runContinuationsAsynchronously: true);
/// Creates an unbounded channel subject to the provided options.
/// Specifies the type of data in the channel.
@@ -27,7 +31,7 @@ public static Channel CreateUnbounded(UnboundedChannelOptions options)
return new SingleConsumerUnboundedChannel(!options.AllowSynchronousContinuations);
}
- return new UnboundedChannel(!options.AllowSynchronousContinuations);
+ return new UnboundedChannel>(new(new()), !options.AllowSynchronousContinuations);
}
/// Creates a channel with the specified maximum capacity.
@@ -71,5 +75,32 @@ public static Channel CreateBounded(BoundedChannelOptions options, Action<
return new BoundedChannel(options.Capacity, options.FullMode, !options.AllowSynchronousContinuations, itemDropped);
}
+
+ /// Provides an for a .
+ private readonly struct UnboundedChannelConcurrentQueue(ConcurrentQueue queue) : IUnboundedChannelQueue
+ {
+ private readonly ConcurrentQueue _queue = queue;
+
+ ///
+ public bool IsThreadSafe => true;
+
+ ///
+ public void Enqueue(T item) => _queue.Enqueue(item);
+
+ ///
+ public bool TryDequeue([MaybeNullWhen(false)] out T item) => _queue.TryDequeue(out item);
+
+ ///
+ public bool TryPeek([MaybeNullWhen(false)] out T item) => _queue.TryPeek(out item);
+
+ ///
+ public int Count => _queue.Count;
+
+ ///
+ public bool IsEmpty => _queue.IsEmpty;
+
+ ///
+ public IEnumerator GetEnumerator() => _queue.GetEnumerator();
+ }
}
}
diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.netcoreapp.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.netcoreapp.cs
index 6c24b3e41ec7b..c8fc9baebae7d 100644
--- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.netcoreapp.cs
+++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.netcoreapp.cs
@@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
namespace System.Threading.Channels
{
@@ -9,13 +10,14 @@ namespace System.Threading.Channels
public static partial class Channel
{
/// Creates an unbounded prioritized channel usable by any number of readers and writers concurrently.
+ /// Specifies the type of data in the channel.
/// The created channel.
///
/// is used to determine priority of elements.
/// The next item read from the channel will be the element available in the channel with the lowest priority value.
///
public static Channel CreateUnboundedPrioritized() =>
- new UnboundedPrioritizedChannel(runContinuationsAsynchronously: true, comparer: null);
+ new UnboundedChannel>(new(new()), runContinuationsAsynchronously: true);
/// Creates an unbounded prioritized channel subject to the provided options.
/// Specifies the type of data in the channel.
@@ -30,7 +32,45 @@ public static Channel CreateUnboundedPrioritized(UnboundedPrioritizedChann
{
ArgumentNullException.ThrowIfNull(options);
- return new UnboundedPrioritizedChannel(!options.AllowSynchronousContinuations, options.Comparer);
+ return new UnboundedChannel>(new(new(options.Comparer)), !options.AllowSynchronousContinuations);
+ }
+
+ /// Provides an for a .
+ private readonly struct UnboundedChannelPriorityQueue(PriorityQueue queue) : IUnboundedChannelQueue
+ {
+ private readonly PriorityQueue _queue = queue;
+
+ ///
+ public bool IsThreadSafe => false;
+
+ ///
+ public void Enqueue(T item) => _queue.Enqueue(true, item);
+
+ ///
+ public bool TryDequeue([MaybeNullWhen(false)] out T item) => _queue.TryDequeue(out _, out item);
+
+ ///
+ public bool TryPeek([MaybeNullWhen(false)] out T item) => _queue.TryPeek(out _, out item);
+
+ ///
+ public int Count => _queue.Count;
+
+ ///
+ public bool IsEmpty => _queue.Count == 0;
+
+ ///
+ public IEnumerator GetEnumerator()
+ {
+ List list = [];
+ foreach ((bool _, T Priority) item in _queue.UnorderedItems)
+ {
+ list.Add(item.Priority);
+ }
+
+ list.Sort(_queue.Comparer);
+
+ return list.GetEnumerator();
+ }
}
}
}
diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IDebugEnumerator.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IDebugEnumerator.cs
index a3d072ee9f7cb..af2a77bb1bf77 100644
--- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IDebugEnumerator.cs
+++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IDebugEnumerator.cs
@@ -11,7 +11,7 @@ internal interface IDebugEnumerable
IEnumerator GetEnumerator();
}
- internal sealed class DebugEnumeratorDebugView
+ internal class DebugEnumeratorDebugView
{
public DebugEnumeratorDebugView(IDebugEnumerable enumerable)
{
@@ -26,4 +26,6 @@ public DebugEnumeratorDebugView(IDebugEnumerable enumerable)
[DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
public T[] Items { get; }
}
+
+ internal sealed class DebugEnumeratorDebugView(IDebugEnumerable enumerable) : DebugEnumeratorDebugView(enumerable);
}
diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IUnboundedChannelQueue.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IUnboundedChannelQueue.cs
new file mode 100644
index 0000000000000..b1b65a1dffeb1
--- /dev/null
+++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/IUnboundedChannelQueue.cs
@@ -0,0 +1,35 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+
+namespace System.Threading.Channels
+{
+ /// Representation of the queue data structure used by .
+ internal interface IUnboundedChannelQueue : IDebugEnumerable
+ {
+ /// Gets whether the other members are safe to use concurrently with each other and themselves.
+ bool IsThreadSafe { get; }
+
+ /// Enqueues an item into the queue.
+ /// The item to enqueue.
+ void Enqueue(T item);
+
+ /// Dequeues an item from the queue, if possible.
+ /// The dequeued item, or default if the queue was empty.
+ /// Whether an item was dequeued.
+ bool TryDequeue([MaybeNullWhen(false)] out T item);
+
+ /// Peeks at the next item from the queue that would be dequeued, if possible.
+ /// The peeked item, or default if the queue was empty.
+ /// Whether an item was peeked.
+ bool TryPeek([MaybeNullWhen(false)] out T item);
+
+ /// Gets the number of elements in the queue.
+ int Count { get; }
+
+ /// Gets whether the queue is empty.
+ bool IsEmpty { get; }
+ }
+}
diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedChannel.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedChannel.cs
index fb3facf83dc47..ad7ee0e3608d3 100644
--- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedChannel.cs
+++ b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedChannel.cs
@@ -5,19 +5,20 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
+using System.Runtime.CompilerServices;
using System.Threading.Tasks;
namespace System.Threading.Channels
{
/// Provides a buffered channel of unbounded capacity.
[DebuggerDisplay("Items = {ItemsCountForDebugger}, Closed = {ChannelIsClosedForDebugger}")]
- [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))]
- internal sealed class UnboundedChannel : Channel, IDebugEnumerable
+ [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<,>))]
+ internal sealed class UnboundedChannel : Channel, IDebugEnumerable where TQueue : struct, IUnboundedChannelQueue
{
/// Task that indicates the channel has completed.
private readonly TaskCompletionSource _completion;
/// The items in the channel.
- private readonly ConcurrentQueue _items = new ConcurrentQueue();
+ private readonly TQueue _items;
/// Readers blocked reading from the channel.
private readonly Deque> _blockedReaders = new Deque>();
/// Whether to force continuations to be executed asynchronously from producer writes.
@@ -29,8 +30,9 @@ internal sealed class UnboundedChannel : Channel, IDebugEnumerable
private Exception? _doneWriting;
/// Initialize the channel.
- internal UnboundedChannel(bool runContinuationsAsynchronously)
+ internal UnboundedChannel(TQueue items, bool runContinuationsAsynchronously)
{
+ _items = items;
_runContinuationsAsynchronously = runContinuationsAsynchronously;
_completion = new TaskCompletionSource(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None);
Reader = new UnboundedChannelReader(this);
@@ -38,14 +40,14 @@ internal UnboundedChannel(bool runContinuationsAsynchronously)
}
[DebuggerDisplay("Items = {Count}")]
- [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))]
+ [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<,>))]
private sealed class UnboundedChannelReader : ChannelReader, IDebugEnumerable
{
- internal readonly UnboundedChannel _parent;
+ internal readonly UnboundedChannel _parent;
private readonly AsyncOperation _readerSingleton;
private readonly AsyncOperation _waiterSingleton;
- internal UnboundedChannelReader(UnboundedChannel parent)
+ internal UnboundedChannelReader(UnboundedChannel parent)
{
_parent = parent;
_readerSingleton = new AsyncOperation(parent._runContinuationsAsynchronously, pooled: true);
@@ -68,8 +70,8 @@ public override ValueTask ReadAsync(CancellationToken cancellationToken)
}
// Dequeue an item if we can.
- UnboundedChannel parent = _parent;
- if (parent._items.TryDequeue(out T? item))
+ UnboundedChannel parent = _parent;
+ if (parent._items.IsThreadSafe && parent._items.TryDequeue(out T? item))
{
CompleteIfDone(parent);
return new ValueTask(item);
@@ -112,24 +114,60 @@ public override ValueTask ReadAsync(CancellationToken cancellationToken)
public override bool TryRead([MaybeNullWhen(false)] out T item)
{
- UnboundedChannel parent = _parent;
+ UnboundedChannel parent = _parent;
+ return parent._items.IsThreadSafe ?
+ LockFree(parent, out item) :
+ Locked(parent, out item);
- // Dequeue an item if we can
- if (parent._items.TryDequeue(out item))
+ static bool LockFree(UnboundedChannel parent, [MaybeNullWhen(false)] out T item)
{
- CompleteIfDone(parent);
- return true;
+ if (parent._items.TryDequeue(out item))
+ {
+ CompleteIfDone(parent);
+ return true;
+ }
+
+ item = default;
+ return false;
}
- item = default;
- return false;
+ static bool Locked(UnboundedChannel parent, [MaybeNullWhen(false)] out T item)
+ {
+ lock (parent.SyncObj)
+ {
+ if (parent._items.TryDequeue(out item))
+ {
+ CompleteIfDone(parent);
+ return true;
+ }
+ }
+
+ item = default;
+ return false;
+ }
}
- public override bool TryPeek([MaybeNullWhen(false)] out T item) =>
- _parent._items.TryPeek(out item);
+ public override bool TryPeek([MaybeNullWhen(false)] out T item)
+ {
+ UnboundedChannel parent = _parent;
+ return parent._items.IsThreadSafe ?
+ parent._items.TryPeek(out item) :
+ Locked(parent, out item);
+
+ // Separated out to keep the try/finally from preventing TryPeek from being inlined
+ static bool Locked(UnboundedChannel parent, [MaybeNullWhen(false)] out T item)
+ {
+ lock (parent.SyncObj)
+ {
+ return parent._items.TryPeek(out item);
+ }
+ }
+ }
- private static void CompleteIfDone(UnboundedChannel parent)
+ private static void CompleteIfDone(UnboundedChannel parent)
{
+ Debug.Assert(parent._items.IsThreadSafe || Monitor.IsEntered(parent.SyncObj));
+
if (parent._doneWriting != null && parent._items.IsEmpty)
{
// If we've now emptied the items queue and we're not getting any more, complete.
@@ -144,12 +182,12 @@ public override ValueTask WaitToReadAsync(CancellationToken cancellationTo
return new ValueTask(Task.FromCanceled(cancellationToken));
}
- if (!_parent._items.IsEmpty)
+ if (_parent._items.IsThreadSafe && !_parent._items.IsEmpty)
{
return new ValueTask(true);
}
- UnboundedChannel parent = _parent;
+ UnboundedChannel parent = _parent;
lock (parent.SyncObj)
{
@@ -192,15 +230,15 @@ public override ValueTask WaitToReadAsync(CancellationToken cancellationTo
}
[DebuggerDisplay("Items = {ItemsCountForDebugger}")]
- [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))]
+ [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<,>))]
private sealed class UnboundedChannelWriter : ChannelWriter, IDebugEnumerable
{
- internal readonly UnboundedChannel _parent;
- internal UnboundedChannelWriter(UnboundedChannel parent) => _parent = parent;
+ internal readonly UnboundedChannel _parent;
+ internal UnboundedChannelWriter(UnboundedChannel parent) => _parent = parent;
public override bool TryComplete(Exception? error)
{
- UnboundedChannel parent = _parent;
+ UnboundedChannel parent = _parent;
bool completeTask;
lock (parent.SyncObj)
@@ -240,7 +278,7 @@ public override bool TryComplete(Exception? error)
public override bool TryWrite(T item)
{
- UnboundedChannel parent = _parent;
+ UnboundedChannel parent = _parent;
while (true)
{
AsyncOperation? blockedReader = null;
@@ -321,7 +359,7 @@ public override ValueTask WriteAsync(T item, CancellationToken cancellationToken
}
/// Gets the object used to synchronize access to all state on this instance.
- private object SyncObj => _items;
+ private object SyncObj => _blockedReaders;
[Conditional("DEBUG")]
private void AssertInvariants()
diff --git a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedPriorityChannel.cs b/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedPriorityChannel.cs
deleted file mode 100644
index 7af18b9413ee2..0000000000000
--- a/src/libraries/System.Threading.Channels/src/System/Threading/Channels/UnboundedPriorityChannel.cs
+++ /dev/null
@@ -1,369 +0,0 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Diagnostics.CodeAnalysis;
-using System.Threading.Tasks;
-
-// This file is primarily a copy of UnboundedChannel, subsequently tweaked to account for differences
-// between ConcurrentQueue and PriorityQueue, e.g. that PQ isn't thread safe and so fast
-// paths outside of locks need to be removed, that Enqueue/Dequeue methods take priorities, etc. Any
-// changes made to this or that file should largely be kept in sync.
-
-namespace System.Threading.Channels
-{
- /// Provides a buffered channel of unbounded capacity.
- [DebuggerDisplay("Items = {ItemsCountForDebugger}, Closed = {ChannelIsClosedForDebugger}")]
- [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))]
- internal sealed class UnboundedPrioritizedChannel : Channel, IDebugEnumerable
- {
- /// Task that indicates the channel has completed.
- private readonly TaskCompletionSource _completion;
- /// The items in the channel.
- /// To avoid double storing of a potentially large struct T, the priority doubles as the element and the element is ignored.
- private readonly PriorityQueue _items;
- /// Readers blocked reading from the channel.
- private readonly Deque> _blockedReaders = new Deque>();
- /// Whether to force continuations to be executed asynchronously from producer writes.
- private readonly bool _runContinuationsAsynchronously;
-
- /// Readers waiting for a notification that data is available.
- private AsyncOperation? _waitingReadersTail;
- /// Set to non-null once Complete has been called.
- private Exception? _doneWriting;
-
- /// Initialize the channel.
- internal UnboundedPrioritizedChannel(bool runContinuationsAsynchronously, IComparer? comparer)
- {
- _runContinuationsAsynchronously = runContinuationsAsynchronously;
- _completion = new TaskCompletionSource(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None);
- Reader = new UnboundedPrioritizedChannelReader(this);
- Writer = new UnboundedPrioritizedChannelWriter(this);
- _items = new PriorityQueue(comparer);
- }
-
- [DebuggerDisplay("Items = {Count}")]
- [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))]
- private sealed class UnboundedPrioritizedChannelReader : ChannelReader, IDebugEnumerable
- {
- internal readonly UnboundedPrioritizedChannel _parent;
- private readonly AsyncOperation _readerSingleton;
- private readonly AsyncOperation _waiterSingleton;
-
- internal UnboundedPrioritizedChannelReader(UnboundedPrioritizedChannel parent)
- {
- _parent = parent;
- _readerSingleton = new AsyncOperation(parent._runContinuationsAsynchronously, pooled: true);
- _waiterSingleton = new AsyncOperation(parent._runContinuationsAsynchronously, pooled: true);
- }
-
- public override Task Completion => _parent._completion.Task;
-
- public override bool CanCount => true;
-
- public override bool CanPeek => true;
-
- public override int Count => _parent._items.Count;
-
- public override ValueTask ReadAsync(CancellationToken cancellationToken)
- {
- if (cancellationToken.IsCancellationRequested)
- {
- return ValueTask.FromCanceled(cancellationToken);
- }
-
- // Dequeue an item if we can.
- UnboundedPrioritizedChannel parent = _parent;
- lock (parent.SyncObj)
- {
- parent.AssertInvariants();
-
- // Try to dequeue again, now that we hold the lock.
- if (parent._items.TryDequeue(out _, out T? item))
- {
- CompleteIfDone(parent);
- return new ValueTask(item);
- }
-
- // There are no items, so if we're done writing, fail.
- if (parent._doneWriting != null)
- {
- return ChannelUtilities.GetInvalidCompletionValueTask(parent._doneWriting);
- }
-
- // If we're able to use the singleton reader, do so.
- if (!cancellationToken.CanBeCanceled)
- {
- AsyncOperation singleton = _readerSingleton;
- if (singleton.TryOwnAndReset())
- {
- parent._blockedReaders.EnqueueTail(singleton);
- return singleton.ValueTaskOfT;
- }
- }
-
- // Otherwise, create and queue a reader.
- var reader = new AsyncOperation(parent._runContinuationsAsynchronously, cancellationToken);
- parent._blockedReaders.EnqueueTail(reader);
- return reader.ValueTaskOfT;
- }
- }
-
- public override bool TryRead([MaybeNullWhen(false)] out T item)
- {
- UnboundedPrioritizedChannel parent = _parent;
- lock (parent.SyncObj)
- {
- // Dequeue an item if we can
- if (parent._items.TryDequeue(out _, out item))
- {
- CompleteIfDone(parent);
- return true;
- }
-
- item = default;
- return false;
- }
- }
-
- public override bool TryPeek([MaybeNullWhen(false)] out T item) =>
- _parent._items.TryPeek(out _, out item);
-
- private static void CompleteIfDone(UnboundedPrioritizedChannel parent)
- {
- Debug.Assert(Monitor.IsEntered(parent.SyncObj));
-
- if (parent._doneWriting != null && parent._items.Count == 0)
- {
- // If we've now emptied the items queue and we're not getting any more, complete.
- ChannelUtilities.Complete(parent._completion, parent._doneWriting);
- }
- }
-
- public override ValueTask WaitToReadAsync(CancellationToken cancellationToken)
- {
- if (cancellationToken.IsCancellationRequested)
- {
- return ValueTask.FromCanceled(cancellationToken);
- }
-
- UnboundedPrioritizedChannel parent = _parent;
- lock (parent.SyncObj)
- {
- parent.AssertInvariants();
-
- // Try again to read now that we're synchronized with writers.
- if (parent._items.Count != 0)
- {
- return new ValueTask(true);
- }
-
- // There are no items, so if we're done writing, there's never going to be data available.
- if (parent._doneWriting != null)
- {
- return parent._doneWriting != ChannelUtilities.s_doneWritingSentinel ?
- ValueTask.FromException(parent._doneWriting) :
- default;
- }
-
- // If we're able to use the singleton waiter, do so.
- if (!cancellationToken.CanBeCanceled)
- {
- AsyncOperation singleton = _waiterSingleton;
- if (singleton.TryOwnAndReset())
- {
- ChannelUtilities.QueueWaiter(ref parent._waitingReadersTail, singleton);
- return singleton.ValueTaskOfT;
- }
- }
-
- // Otherwise, create and queue a waiter.
- var waiter = new AsyncOperation(parent._runContinuationsAsynchronously, cancellationToken);
- ChannelUtilities.QueueWaiter(ref parent._waitingReadersTail, waiter);
- return waiter.ValueTaskOfT;
- }
- }
-
- /// Gets an enumerator the debugger can use to show the contents of the channel.
- IEnumerator IDebugEnumerable.GetEnumerator() => _parent.GetEnumerator();
- }
-
- [DebuggerDisplay("Items = {ItemsCountForDebugger}")]
- [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))]
- private sealed class UnboundedPrioritizedChannelWriter : ChannelWriter, IDebugEnumerable
- {
- internal readonly UnboundedPrioritizedChannel _parent;
- internal UnboundedPrioritizedChannelWriter(UnboundedPrioritizedChannel parent) => _parent = parent;
-
- public override bool TryComplete(Exception? error)
- {
- UnboundedPrioritizedChannel parent = _parent;
- bool completeTask;
-
- lock (parent.SyncObj)
- {
- parent.AssertInvariants();
-
- // If we've already marked the channel as completed, bail.
- if (parent._doneWriting != null)
- {
- return false;
- }
-
- // Mark that we're done writing.
- parent._doneWriting = error ?? ChannelUtilities.s_doneWritingSentinel;
- completeTask = parent._items.Count == 0;
- }
-
- // If there are no items in the queue, complete the channel's task,
- // as no more data can possibly arrive at this point. We do this outside
- // of the lock in case we'll be running synchronous completions, and we
- // do it before completing blocked/waiting readers, so that when they
- // wake up they'll see the task as being completed.
- if (completeTask)
- {
- ChannelUtilities.Complete(parent._completion, error);
- }
-
- // At this point, _blockedReaders and _waitingReaders will not be mutated:
- // they're only mutated by readers while holding the lock, and only if _doneWriting is null.
- // freely manipulate _blockedReaders and _waitingReaders without any concurrency concerns.
- ChannelUtilities.FailOperations, T>(parent._blockedReaders, ChannelUtilities.CreateInvalidCompletionException(error));
- ChannelUtilities.WakeUpWaiters(ref parent._waitingReadersTail, result: false, error: error);
-
- // Successfully transitioned to completed.
- return true;
- }
-
- public override bool TryWrite(T item)
- {
- UnboundedPrioritizedChannel parent = _parent;
- while (true)
- {
- AsyncOperation? blockedReader = null;
- AsyncOperation? waitingReadersTail = null;
- lock (parent.SyncObj)
- {
- // If writing has already been marked as done, fail the write.
- parent.AssertInvariants();
- if (parent._doneWriting != null)
- {
- return false;
- }
-
- // If there aren't any blocked readers, just add the data to the queue,
- // and let any waiting readers know that they should try to read it.
- // We can only complete such waiters here under the lock if they run
- // continuations asynchronously (otherwise the synchronous continuations
- // could be invoked under the lock). If we don't complete them here, we
- // need to do so outside of the lock.
- if (parent._blockedReaders.IsEmpty)
- {
- parent._items.Enqueue(true, item);
- waitingReadersTail = parent._waitingReadersTail;
- if (waitingReadersTail == null)
- {
- return true;
- }
- parent._waitingReadersTail = null;
- }
- else
- {
- // There were blocked readers. Grab one, and then complete it outside of the lock.
- blockedReader = parent._blockedReaders.DequeueHead();
- }
- }
-
- if (blockedReader != null)
- {
- // Complete the reader. It's possible the reader was canceled, in which
- // case we loop around to try everything again.
- if (blockedReader.TrySetResult(item))
- {
- return true;
- }
- }
- else
- {
- // Wake up all of the waiters. Since we've released the lock, it's possible
- // we could cause some spurious wake-ups here, if we tell a waiter there's
- // something available but all data has already been removed. It's a benign
- // race condition, though, as consumers already need to account for such things.
- ChannelUtilities.WakeUpWaiters(ref waitingReadersTail, result: true);
- return true;
- }
- }
- }
-
- public override ValueTask WaitToWriteAsync(CancellationToken cancellationToken)
- {
- Exception? doneWriting = _parent._doneWriting;
- return
- cancellationToken.IsCancellationRequested ? ValueTask.FromCanceled(cancellationToken) :
- doneWriting == null ? new ValueTask(true) : // unbounded writing can always be done if we haven't completed
- doneWriting != ChannelUtilities.s_doneWritingSentinel ? ValueTask.FromException(doneWriting) :
- default;
- }
-
- public override ValueTask WriteAsync(T item, CancellationToken cancellationToken) =>
- cancellationToken.IsCancellationRequested ? ValueTask.FromCanceled(cancellationToken) :
- TryWrite(item) ? default :
- ValueTask.FromException(ChannelUtilities.CreateInvalidCompletionException(_parent._doneWriting));
-
- /// Gets the number of items in the channel. This should only be used by the debugger.
- private int ItemsCountForDebugger => _parent._items.Count;
-
- /// Gets an enumerator the debugger can use to show the contents of the channel.
- IEnumerator IDebugEnumerable.GetEnumerator() => _parent.GetEnumerator();
- }
-
- /// Gets the object used to synchronize access to all state on this instance.
- private object SyncObj => _items;
-
- [Conditional("DEBUG")]
- private void AssertInvariants()
- {
- Debug.Assert(SyncObj != null, "The sync obj must not be null.");
- Debug.Assert(Monitor.IsEntered(SyncObj), "Invariants can only be validated while holding the lock.");
-
- if (_items.Count != 0)
- {
- if (_runContinuationsAsynchronously)
- {
- Debug.Assert(_blockedReaders.IsEmpty, "There's data available, so there shouldn't be any blocked readers.");
- Debug.Assert(_waitingReadersTail == null, "There's data available, so there shouldn't be any waiting readers.");
- }
- Debug.Assert(!_completion.Task.IsCompleted, "We still have data available, so shouldn't be completed.");
- }
- if ((!_blockedReaders.IsEmpty || _waitingReadersTail != null) && _runContinuationsAsynchronously)
- {
- Debug.Assert(_items.Count == 0, "There are blocked/waiting readers, so there shouldn't be any data available.");
- }
- if (_completion.Task.IsCompleted)
- {
- Debug.Assert(_doneWriting != null, "We're completed, so we must be done writing.");
- }
- }
-
- /// Gets the number of items in the channel. This should only be used by the debugger.
- private int ItemsCountForDebugger => _items.Count;
-
- /// Report if the channel is closed or not. This should only be used by the debugger.
- private bool ChannelIsClosedForDebugger => _doneWriting != null;
-
- /// Gets an enumerator the debugger can use to show the contents of the channel.
- public IEnumerator GetEnumerator()
- {
- List list = [];
- foreach ((bool _, T Priority) item in _items.UnorderedItems)
- {
- list.Add(item.Priority);
- }
-
- list.Sort(_items.Comparer);
-
- return list.GetEnumerator();
- }
- }
-}