diff --git a/Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/GroupByCompletion.cs b/Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/GroupByCompletion.cs
new file mode 100644
index 000000000..9ecba76c5
--- /dev/null
+++ b/Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/GroupByCompletion.cs
@@ -0,0 +1,71 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT License.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Reactive.Linq;
+
+using BenchmarkDotNet.Attributes;
+
+namespace Benchmarks.System.Reactive
+{
+ /// <summary>
+ /// Completion of a wide fan-out/in scenario.
+ /// </summary>
+ /// <remarks>
+ /// <para>
+ /// This was added to address https://github.com/dotnet/reactive/issues/2005 in which completion
+ /// takes longer and longer to handle as the number of groups increases.
+ /// </para>
+ /// <para>
+ /// The queries in this benchmark represent the common 'fan out/in' pattern in Rx. It is often
+ /// useful to split a stream into groups to enable per-group processing, and then to recombine
+ /// the data back into a single stream. These benchmarks don't do any per-group processing, so
+ /// they might look pointless, but we're trying to measure the minimum unavoidable overhead
+ /// that any code using this technique will encounter.
+ /// </para>
+ /// </remarks>
+ [MemoryDiagnoser]
+ public class GroupByCompletion
+ {
+ private IObservable<int>? observable;
+
+ [Params(200_000, 1_000_000)]
+ public int NumberOfSamples { get; set; }
+
+ [Params(10, 100, 1_000, 10_000, 100_000, 150_000, 200_000)]
+ public int NumberOfGroups { get; set; }
+
+ [GlobalSetup]
+ public void GlobalSetup()
+ {
+ var data = new int[NumberOfSamples];
+ for (var i = 0; i < data.Length; ++i)
+ {
+ data[i] = i;
+ }
+
+ observable = data.ToObservable();
+ }
+
+ [Benchmark]
+ public void GroupBySelectMany()
+ {
+ var numberOfGroups = NumberOfGroups;
+
+ observable!.GroupBy(value => value % numberOfGroups)
+ .SelectMany(groupOfInts => groupOfInts)
+ .Subscribe(intValue => { });
+ }
+
+ [Benchmark]
+ public void GroupByMerge()
+ {
+ var numberOfGroups = NumberOfGroups;
+
+ observable!.GroupBy(value => value % numberOfGroups)
+ .Merge()
+ .Subscribe(intValue => { });
+ }
+ }
+}
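
For reference, the fan-out/in shape these benchmarks model, including the per-group processing step they deliberately omit, looks roughly like this (a minimal sketch; Observable.Range and the per-group Scan are illustrative stand-ins, not part of the benchmark code above):

    using System;
    using System.Reactive.Linq;

    internal static class FanOutInExample
    {
        internal static IDisposable Run()
        {
            return Observable.Range(0, 1_000)
                .GroupBy(value => value % 10)                        // fan out into 10 groups
                .Select(group => group.Scan(0, (acc, x) => acc + x)) // per-group processing
                .Merge()                                             // fan back in to a single stream
                .Subscribe(runningTotal => { /* consume merged results */ });
        }
    }
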
diff --git a/Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/Program.cs b/Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/Program.cs
index 135fb0138..9d8dce558 100644
--- a/Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/Program.cs
+++ b/Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/Program.cs
@@ -27,7 +27,8 @@ private static void Main()
typeof(ScalarScheduleBenchmark),
typeof(StableCompositeDisposableBenchmark),
typeof(SubjectBenchmark),
- typeof(ComparisonAsyncBenchmark)
+ typeof(ComparisonAsyncBenchmark),
+ typeof(GroupByCompletion)
#if (CURRENT)
,typeof(AppendPrependBenchmark)
,typeof(PrependVsStartWtihBenchmark)
diff --git a/Rx.NET/Source/src/System.Reactive/Disposables/CompositeDisposable.cs b/Rx.NET/Source/src/System.Reactive/Disposables/CompositeDisposable.cs
index a0af531f8..99ab6437d 100644
--- a/Rx.NET/Source/src/System.Reactive/Disposables/CompositeDisposable.cs
+++ b/Rx.NET/Source/src/System.Reactive/Disposables/CompositeDisposable.cs
@@ -4,6 +4,7 @@
using System.Collections;
using System.Collections.Generic;
+using System.Linq;
using System.Threading;
namespace System.Reactive.Disposables
@@ -16,10 +17,17 @@ public sealed class CompositeDisposable : ICollection<IDisposable>, ICancelable
{
private readonly object _gate = new();
private bool _disposed;
- private List<IDisposable?> _disposables;
+ private object _disposables;
private int _count;
private const int ShrinkThreshold = 64;
+ // The maximum number of items to keep in a list before switching to a dictionary.
+ // Issue https://github.com/dotnet/reactive/issues/2005 reported that when a SelectMany
+ // observes large numbers (1000s) of observables, the CompositeDisposable it uses to
+ // keep track of all of the inner observables it creates becomes a bottleneck when the
+ // subscription completes.
+ private const int MaximumLinearSearchThreshold = 1024;
+
// Default initial capacity of the _disposables list in case
// The number of items is not known upfront
private const int DefaultCapacity = 16;
@@ -29,7 +37,7 @@ public sealed class CompositeDisposable : ICollection<IDisposable>, ICancelable
/// </summary>
public CompositeDisposable()
{
- _disposables = [];
+ _disposables = new List<IDisposable?>();
}
/// <summary>
@@ -60,11 +68,11 @@ public CompositeDisposable(params IDisposable[] disposables)
throw new ArgumentNullException(nameof(disposables));
}
- _disposables = ToList(disposables);
+ (_disposables, _) = ToListOrDictionary(disposables);
// _count can be read by other threads and thus should be properly visible
// also releases the _disposables contents so it becomes thread-safe
- Volatile.Write(ref _count, _disposables.Count);
+ Volatile.Write(ref _count, disposables.Length);
}
/// <summary>
@@ -80,14 +88,14 @@ public CompositeDisposable(IEnumerable<IDisposable> disposables)
throw new ArgumentNullException(nameof(disposables));
}
- _disposables = ToList(disposables);
+ (_disposables, var count) = ToListOrDictionary(disposables);
// _count can be read by other threads and thus should be properly visible
// also releases the _disposables contents so it becomes thread-safe
- Volatile.Write(ref _count, _disposables.Count);
+ Volatile.Write(ref _count, count);
}
- private static List<IDisposable?> ToList(IEnumerable<IDisposable> disposables)
+ private static (object Collection, int Count) ToListOrDictionary(IEnumerable<IDisposable> disposables)
{
var capacity = disposables switch
{
@@ -96,6 +104,26 @@ public CompositeDisposable(IEnumerable<IDisposable> disposables)
_ => DefaultCapacity
};
+ if (capacity > MaximumLinearSearchThreshold)
+ {
+ var dictionary = new Dictionary<IDisposable, int>(capacity);
+ var disposableCount = 0;
+ foreach (var d in disposables)
+ {
+ if (d == null)
+ {
+ throw new ArgumentException(Strings_Core.DISPOSABLES_CANT_CONTAIN_NULL, nameof(disposables));
+ }
+
+ dictionary.TryGetValue(d, out var thisDisposableCount);
+ dictionary[d] = thisDisposableCount + 1;
+
+ disposableCount += 1;
+ }
+
+ return (dictionary, disposableCount);
+ }
+
+ var list = new List<IDisposable?>(capacity);
// do the copy and null-check in one step to avoid a
@@ -110,7 +138,14 @@ public CompositeDisposable(IEnumerable<IDisposable> disposables)
list.Add(d);
}
- return list;
+ if (list.Count > MaximumLinearSearchThreshold)
+ {
+ // We end up here if we didn't know the count up front because it's an
+ // IEnumerable and not an ICollection, and it then turns out that
+ // the number of items exceeds our maximum tolerance for linear search.
+ }
+
+ return (list, list.Count);
}
/// <summary>
@@ -134,7 +169,37 @@ public void Add(IDisposable item)
{
if (!_disposed)
{
- _disposables.Add(item);
+ if (_disposables is List<IDisposable?> listDisposables)
+ {
+ listDisposables.Add(item);
+
+ // Once we get to thousands of items (which happens with wide fan-out/in configurations)
+ // the cost of linear search becomes too high. We switch to a dictionary at that point.
+ // See https://github.com/dotnet/reactive/issues/2005
+ if (listDisposables.Count > MaximumLinearSearchThreshold)
+ {
+ // If we've blown through this threshold, chances are there's more to come,
+ // so allocate some more spare capacity.
+ var dictionary = new Dictionary<IDisposable, int>(listDisposables.Count + (listDisposables.Count / 4));
+ foreach (var d in listDisposables)
+ {
+ if (d is not null)
+ {
+ dictionary.TryGetValue(d, out var thisDisposableCount);
+ dictionary[d] = thisDisposableCount + 1;
+ }
+ }
+
+ _disposables = dictionary;
+ }
+
+ }
+ else
+ {
+ var dictionaryDisposables = (Dictionary<IDisposable, int>)_disposables;
+ dictionaryDisposables.TryGetValue(item, out var thisDisposableCount);
+ dictionaryDisposables[item] = thisDisposableCount + 1;
+ }
// If read atomically outside the lock, it should be written atomically inside
// the plain read on _count is fine here because manipulation always happens
@@ -180,28 +245,49 @@ public bool Remove(IDisposable item)
// read fields as infrequently as possible
var current = _disposables;
- var i = current.IndexOf(item);
- if (i < 0)
+ if (current is List<IDisposable?> currentList)
{
- // not found, just return
- return false;
- }
-
- current[i] = null;
+ var i = currentList.IndexOf(item);
+ if (i < 0)
+ {
+ // not found, just return
+ return false;
+ }
- if (current.Capacity > ShrinkThreshold && _count < current.Capacity / 2)
- {
- var fresh = new List<IDisposable?>(current.Capacity / 2);
+ currentList[i] = null;
- foreach (var d in current)
+ if (currentList.Capacity > ShrinkThreshold && _count < currentList.Capacity / 2)
{
- if (d != null)
+ var fresh = new List<IDisposable?>(currentList.Capacity / 2);
+
+ foreach (var d in currentList)
{
- fresh.Add(d);
+ if (d != null)
+ {
+ fresh.Add(d);
+ }
}
+
+ _disposables = fresh;
+ }
+ }
+ else
+ {
+ var dictionaryDisposables = (Dictionary<IDisposable, int>)_disposables;
+ if (!dictionaryDisposables.TryGetValue(item, out var thisDisposableCount))
+ {
+ return false;
}
- _disposables = fresh;
+ thisDisposableCount -= 1;
+ if (thisDisposableCount == 0)
+ {
+ dictionaryDisposables.Remove(item);
+ }
+ else
+ {
+ dictionaryDisposables[item] = thisDisposableCount;
+ }
}
// make sure the Count property sees an atomic update
@@ -221,13 +307,15 @@ public bool Remove(IDisposable item)
/// </summary>
public void Dispose()
{
- List<IDisposable?>? currentDisposables = null;
+ List<IDisposable?>? currentDisposablesList = null;
+ Dictionary<IDisposable, int>? currentDisposablesDictionary = null;
lock (_gate)
{
if (!_disposed)
{
- currentDisposables = _disposables;
+ currentDisposablesList = _disposables as List<IDisposable?>;
+ currentDisposablesDictionary = _disposables as Dictionary<IDisposable, int>;
// nulling out the reference is faster no risk to
// future Add/Remove because _disposed will be true
@@ -239,13 +327,24 @@ public void Dispose()
}
}
- if (currentDisposables != null)
+ if (currentDisposablesList is not null)
{
- foreach (var d in currentDisposables)
+ foreach (var d in currentDisposablesList)
{
+ // Although we don't allow nulls in from the outside, we implement Remove
+ // by setting entries to null, and shrinking the list if it gets too sparse.
+ // So some entries may be null.
d?.Dispose();
}
}
+
+ if (currentDisposablesDictionary is not null)
+ {
+ foreach (var kv in currentDisposablesDictionary)
+ {
+ kv.Key.Dispose();
+ }
+ }
}
/// <summary>
@@ -265,8 +364,18 @@ public void Clear()
var current = _disposables;
- previousDisposables = current.ToArray();
- current.Clear();
+ if (current is List<IDisposable?> currentList)
+ {
+ previousDisposables = currentList.ToArray();
+ currentList.Clear();
+ }
+ else
+ {
+ var currentDictionary = (Dictionary<IDisposable, int>)current;
+ previousDisposables = new IDisposable[currentDictionary.Count];
+ currentDictionary.Keys.CopyTo(previousDisposables!, 0);
+ currentDictionary.Clear();
+ }
Volatile.Write(ref _count, 0);
}
@@ -297,7 +406,10 @@ public bool Contains(IDisposable item)
return false;
}
- return _disposables.Contains(item);
+ var current = _disposables;
+ return current is List<IDisposable?> list
+ ? list.Contains(item)
+ : ((Dictionary<IDisposable, int>)current).ContainsKey(item);
}
}
@@ -336,12 +448,27 @@ public void CopyTo(IDisposable[] array, int arrayIndex)
}
var i = arrayIndex;
-
- foreach (var d in _disposables)
+
+ var current = _disposables;
+
+ if (current is List<IDisposable?> currentList)
+ {
+ foreach (var d in currentList)
+ {
+ if (d != null)
+ {
+ array[i++] = d;
+ }
+ }
+ }
+ else
{
- if (d != null)
+ foreach (var kv in (Dictionary<IDisposable, int>)current)
{
- array[i++] = d;
+ for (var j = 0; j < kv.Value; j++)
+ {
+ array[i++] = kv.Key;
+ }
}
}
}
@@ -365,9 +492,11 @@ public IEnumerator<IDisposable> GetEnumerator()
return EmptyEnumerator;
}
+ var current = _disposables;
+
// the copy is unavoidable but the creation
// of an outer IEnumerable is avoidable
- return new CompositeEnumerator(_disposables.ToArray());
+ return new CompositeEnumerator(current is List<IDisposable?> currentList ? currentList.ToArray() : ((Dictionary<IDisposable, int>)current).Keys.ToArray());
}
}
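
The Dictionary<IDisposable, int> that replaces the list preserves CompositeDisposable's multiset behavior: the diff counts duplicate Adds per key, decrements on Remove, and only drops a key once its count reaches zero, while Contains and Remove become hash lookups instead of the list's linear IndexOf scan. A minimal standalone sketch of that count-per-key pattern (not taken from the diff; the names are illustrative):

    using System.Collections.Generic;

    internal static class CountedSetSketch
    {
        // Add one occurrence of item; duplicates just bump the per-key count.
        internal static void Add<T>(Dictionary<T, int> counts, T item) where T : notnull
        {
            counts.TryGetValue(item, out var n);
            counts[item] = n + 1;
        }

        // Remove one occurrence; the key disappears only when the last occurrence goes.
        internal static bool Remove<T>(Dictionary<T, int> counts, T item) where T : notnull
        {
            if (!counts.TryGetValue(item, out var n))
            {
                return false; // not present at all
            }

            if (n == 1)
            {
                counts.Remove(item);
            }
            else
            {
                counts[item] = n - 1;
            }

            return true;
        }
    }

Both operations are O(1), which is what removes the quadratic completion cost that the GroupByCompletion benchmark measures at large group counts.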