From 893024a84abd93f3bcbacc3771b104c19f5538c0 Mon Sep 17 00:00:00 2001 From: JakenVeina Date: Mon, 13 Nov 2023 18:54:09 -0600 Subject: [PATCH] Reworked the `.DisposeMany()` operator for both caches and lists to perform disposal after downstream operations have processed, rather than before, since the operator cannot guarantee that disposal is safe, when downstream consumers may still need to make use of these disposable items, in order to process removals. Also fixed a bug within both versions of the `.SubscribeMany()` operator, where errors were not being properly propagated through the stream, and instead could bubble up to stream inputs. Also fixed a bug within the cache version of `.MergeMany()` that allowed an internally-used subject to be disposed, without cleaning up all subscriptions that may later make use of it. --- .../Cache/DisposeManyFixture.cs | 147 +++++++++++++----- .../List/DisposeManyFixture.cs | 137 ++++++++++------ src/DynamicData/Cache/Internal/DisposeMany.cs | 87 +++++------ src/DynamicData/Cache/Internal/MergeMany.cs | 10 +- .../Cache/Internal/SubscribeMany.cs | 9 +- src/DynamicData/Cache/ObservableCacheEx.cs | 12 +- src/DynamicData/List/Internal/DisposeMany.cs | 74 +++++++++ .../List/Internal/SubscribeMany.cs | 9 +- src/DynamicData/List/ObservableListEx.cs | 14 +- 9 files changed, 346 insertions(+), 153 deletions(-) create mode 100644 src/DynamicData/List/Internal/DisposeMany.cs diff --git a/src/DynamicData.Tests/Cache/DisposeManyFixture.cs b/src/DynamicData.Tests/Cache/DisposeManyFixture.cs index 2a02b1827..0fbf3f7c7 100644 --- a/src/DynamicData.Tests/Cache/DisposeManyFixture.cs +++ b/src/DynamicData.Tests/Cache/DisposeManyFixture.cs @@ -1,5 +1,7 @@ using System; using System.Linq; +using System.Reactive.Linq; +using System.Reactive.Subjects; using FluentAssertions; @@ -7,81 +9,142 @@ namespace DynamicData.Tests.Cache; -public class DisposeManyFixture : IDisposable +public sealed class DisposeManyFixture : IDisposable { - private readonly ChangeSetAggregator _results; + private readonly Subject> _changeSetsSource; - private readonly ISourceCache _source; + private readonly SourceCache _itemsSource; - public DisposeManyFixture() - { - _source = new SourceCache(p => p.Id); - _results = new ChangeSetAggregator(_source.Connect().DisposeMany()); - } + private readonly ChangeSetAggregator _results; - [Fact] - public void AddWillNotCallDispose() + public DisposeManyFixture() { - _source.AddOrUpdate(new DisposableObject(1)); - - _results.Messages.Count.Should().Be(1, "Should be 1 updates"); - _results.Data.Count.Should().Be(1, "Should be 1 item in the cache"); - _results.Data.Items.First().IsDisposed.Should().Be(false, "Should not be disposed"); + _changeSetsSource = new(); + _itemsSource = new(item => item.Id); + _results = new(Observable.Merge(_changeSetsSource, _itemsSource.Connect()) + .DisposeMany() + .Do(onNext: changeSet => + { + foreach (var change in changeSet) + { + change.Current.IsDisposed.Should().BeFalse("items should not be disposed until after downstream notifications are processed"); + + if (change.Previous.HasValue) + change.Previous.Value.IsDisposed.Should().BeFalse("items should not be disposed until after downstream notifications are processed"); + } + }, + onError: _ => + { + foreach(var item in _itemsSource.Items) + item.IsDisposed.Should().BeFalse("items should not be disposed until after downstream notifications are processed"); + }, + onCompleted: () => + { + foreach(var item in _itemsSource.Items) + item.IsDisposed.Should().BeFalse("items should not be disposed until after downstream notifications are processed"); + })); } public void Dispose() { - _source.Dispose(); + _changeSetsSource.Dispose(); + _itemsSource.Dispose(); _results.Dispose(); } [Fact] - public void EverythingIsDisposedWhenStreamIsDisposed() + public void ItemsAreDisposedAfterRemovalOrReplacement() { - _source.AddOrUpdate(Enumerable.Range(1, 10).Select(i => new DisposableObject(i))); - _source.Clear(); - - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Messages[1].All(d => d.Current.IsDisposed).Should().BeTrue(); + var items = new[] + { + new DisposableObject(1), + new DisposableObject(2), + new DisposableObject(3), + new DisposableObject(4), + new DisposableObject(5), + new DisposableObject(1), + new DisposableObject(6), + new DisposableObject(7), + new DisposableObject(8) + }; + + // Exercise a variety of types of changesets. + _itemsSource.AddOrUpdate(items[0]); // Single add + _itemsSource.AddOrUpdate(items[1..5]); // Range add + _itemsSource.AddOrUpdate(items[5]); // Replace + _itemsSource.AddOrUpdate(items[5]); // Redundant update + _itemsSource.RemoveKey(4); // Single remove + _itemsSource.RemoveKeys(new[] { 1, 2 }); // Range remove + _itemsSource.Clear(); // Clear + _itemsSource.AddOrUpdate(items[6..9]); + _changeSetsSource.OnNext(new ChangeSet() // Refresh + { + new Change( + reason: ChangeReason.Refresh, + key: _itemsSource.Items.First().Id, + current: _itemsSource.Items.First()) + }); + _changeSetsSource.OnNext(new ChangeSet() // Move + { + new Change( + key: _itemsSource.Items.First().Id, + current: _itemsSource.Items.First(), + currentIndex: 1, + previousIndex: 0) + }); + + _results.Error.Should().BeNull(); + _results.Messages.Count.Should().Be(10, "10 updates were made to the source"); + _results.Data.Count.Should().Be(3, "3 items were not removed from the list"); + _results.Data.Items.All(item => item.IsDisposed).Should().BeFalse("items remaining in the list should not be disposed"); + items.Except(_results.Data.Items).All(item => item.IsDisposed).Should().BeTrue("items removed from the list should be disposed"); } [Fact] - public void RemoveWillCallDispose() + public void RemainingItemsAreDisposedAfterCompleted() { - _source.AddOrUpdate(new DisposableObject(1)); - _source.Remove(1); - - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Data.Count.Should().Be(0, "Should be 0 items in the cache"); - _results.Messages[1].First().Current.IsDisposed.Should().Be(true, "Should be disposed"); + _itemsSource.AddOrUpdate(new[] + { + new DisposableObject(1), + new DisposableObject(2), + new DisposableObject(3) + }); + + _itemsSource.Dispose(); + _changeSetsSource.OnCompleted(); + + _results.Error.Should().BeNull(); + _results.Messages.Count.Should().Be(1, "1 update was made to the source"); + _results.Data.Count.Should().Be(3, "3 items were not removed from the list"); + _results.Data.Items.All(item => item.IsDisposed).Should().BeTrue("Items remaining in the list should be disposed"); } [Fact] - public void UpdateWillCallDispose() + public void RemainingItemsAreDisposedAfterError() { - _source.AddOrUpdate(new DisposableObject(1)); - _source.AddOrUpdate(new DisposableObject(1)); + _itemsSource.AddOrUpdate(new DisposableObject(1)); + + var error = new Exception("Test Exception"); + _changeSetsSource.OnError(error); + + _itemsSource.AddOrUpdate(new DisposableObject(2)); - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Data.Count.Should().Be(1, "Should be 1 items in the cache"); - _results.Messages[1].First().Current.IsDisposed.Should().Be(false, "Current should not be disposed"); - _results.Messages[1].First().Previous.Value.IsDisposed.Should().Be(true, "Previous should be disposed"); + _results.Error.Should().Be(error); + _results.Messages.Count.Should().Be(1, "1 update was made to the source"); + _results.Data.Count.Should().Be(1, "1 item was not removed from the list"); + _results.Data.Items.All(item => item.IsDisposed).Should().BeTrue("items remaining in the list should be disposed"); } private class DisposableObject : IDisposable { public DisposableObject(int id) - { - Id = id; - } + => Id = id; public int Id { get; private set; } public bool IsDisposed { get; private set; } public void Dispose() - { - IsDisposed = true; - } + => IsDisposed = true; } } diff --git a/src/DynamicData.Tests/List/DisposeManyFixture.cs b/src/DynamicData.Tests/List/DisposeManyFixture.cs index 0c4542750..99af5832f 100644 --- a/src/DynamicData.Tests/List/DisposeManyFixture.cs +++ b/src/DynamicData.Tests/List/DisposeManyFixture.cs @@ -1,5 +1,8 @@ using System; using System.Linq; +using System.Reactive; +using System.Reactive.Linq; +using System.Reactive.Subjects; using FluentAssertions; @@ -7,84 +10,128 @@ namespace DynamicData.Tests.List; -public class DisposeManyFixture : IDisposable +public sealed class DisposeManyFixture : IDisposable { - private readonly ChangeSetAggregator _results; + private readonly Subject> _changeSetsSource; - private readonly ISourceList _source; + private readonly SourceList _itemsSource; - public DisposeManyFixture() - { - _source = new SourceList(); - _results = new ChangeSetAggregator(_source.Connect().DisposeMany()); - } + private readonly ChangeSetAggregator _results; - [Fact] - public void AddWillNotCallDispose() + public DisposeManyFixture() { - _source.Add(new DisposableObject(1)); - - _results.Messages.Count.Should().Be(1, "Should be 1 updates"); - _results.Data.Count.Should().Be(1, "Should be 1 item in the cache"); - _results.Data.Items.First().IsDisposed.Should().Be(false, "Should not be disposed"); + _changeSetsSource = new(); + _itemsSource = new(); + _results = new(Observable.Merge(_changeSetsSource, _itemsSource.Connect()) + .DisposeMany() + .Do(onNext: changeSet => + { + foreach (var change in changeSet) + { + if (change.Item.Current is not null) + change.Item.Current.IsDisposed.Should().BeFalse("items should not be disposed until after downstream notifications are processed"); + + if (change.Item.Previous.HasValue) + change.Item.Previous.Value.IsDisposed.Should().BeFalse("items should not be disposed until after downstream notifications are processed"); + + if (change.Range is not null) + foreach (var item in change.Range) + item.IsDisposed.Should().BeFalse("items should not be disposed until after downstream notifications are processed"); + } + }, + onError: _ => + { + foreach(var item in _itemsSource.Items) + item.IsDisposed.Should().BeFalse("items should not be disposed until after downstream notifications are processed"); + }, + onCompleted: () => + { + foreach(var item in _itemsSource.Items) + item.IsDisposed.Should().BeFalse("items should not be disposed until after downstream notifications are processed"); + })); } public void Dispose() { - _source.Dispose(); + _changeSetsSource.Dispose(); + _itemsSource.Dispose(); _results.Dispose(); } [Fact] - public void EverythingIsDisposedWhenStreamIsDisposed() + public void ItemsAreDisposedAfterRemovalOrReplacement() { - var toadd = Enumerable.Range(1, 10).Select(i => new DisposableObject(i)); - _source.AddRange(toadd); - _source.Clear(); - - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - - var itemsCleared = _results.Messages[1].First().Range; - itemsCleared.All(d => d.IsDisposed).Should().BeTrue(); + var items = Enumerable.Range(1, 10) + .Select(id => new DisposableObject(id)) + .ToArray(); + + // Exercise a variety of types of changesets. + _itemsSource.Add(items[0]); // Trivial single add + _itemsSource.AddRange(items[1..3]); // Trivial range add + _itemsSource.Insert(index: 1, item: items[3]); // Non-trivial single add + _itemsSource.InsertRange(index: 2, items: items[4..6]); // Non-trivial range add + _itemsSource.RemoveAt(index: 3); // Single remove + _itemsSource.RemoveRange(index: 2, count: 2); // Range remove + _itemsSource.ReplaceAt(index: 1, item: items[6]); // Replace + _itemsSource.Move(1, 0); // Move + _itemsSource.Clear(); // Clear + _itemsSource.AddRange(items[7..10]); + _changeSetsSource.OnNext(new ChangeSet() // Refresh + { + new(ListChangeReason.Refresh, current: _itemsSource.Items.First(), index: 0) + }); + + _results.Exception.Should().BeNull(); + _results.Messages.Count.Should().Be(11, "11 updates were made to the source"); + _results.Data.Count.Should().Be(3, "3 items were not removed from the list"); + _results.Data.Items.All(item => item.IsDisposed).Should().BeFalse("items remaining in the list should not be disposed"); + items.Except(_results.Data.Items).All(item => item.IsDisposed).Should().BeTrue("items removed from the list should be disposed"); } [Fact] - public void RemoveWillCallDispose() + public void RemainingItemsAreDisposedAfterCompleted() { - _source.Add(new DisposableObject(1)); - _source.RemoveAt(0); - - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Data.Count.Should().Be(0, "Should be 0 items in the cache"); - _results.Messages[1].First().Item.Current.IsDisposed.Should().Be(true, "Should be disposed"); + _itemsSource.AddRange(new[] + { + new DisposableObject(1), + new DisposableObject(2), + new DisposableObject(3), + }); + _itemsSource.Dispose(); + _changeSetsSource.OnCompleted(); + + _results.Exception.Should().BeNull(); + _results.Messages.Count.Should().Be(1, "1 update was made to the list"); + _results.Data.Count.Should().Be(3, "3 items were not removed from the list"); + _results.Data.Items.All(item => item.IsDisposed).Should().BeTrue("items remaining in the list should be disposed"); } [Fact] - public void UpdateWillCallDispose() + public void RemainingItemsAreDisposedAfterError() { - _source.Add(new DisposableObject(1)); - _source.ReplaceAt(0, new DisposableObject(1)); + _itemsSource.Add(new(1)); + + var error = new Exception("Test Exception"); + _changeSetsSource.OnError(error); + + _itemsSource.Add(new(2)); - _results.Messages.Count.Should().Be(2, "Should be 2 updates"); - _results.Data.Count.Should().Be(1, "Should be 1 items in the cache"); - _results.Messages[1].First().Item.Current.IsDisposed.Should().Be(false, "Current should not be disposed"); - _results.Messages[1].First().Item.Previous.Value.IsDisposed.Should().Be(true, "Previous should be disposed"); + _results.Exception.Should().Be(error); + _results.Messages.Count.Should().Be(1, "1 update was made to the list"); + _results.Data.Count.Should().Be(1, "1 item was not removed from the list"); + _results.Data.Items.All(item => item.IsDisposed).Should().BeTrue("Items remaining in the list should be disposed"); } private class DisposableObject : IDisposable { public DisposableObject(int id) - { - Id = id; - } + => Id = id; public int Id { get; private set; } public bool IsDisposed { get; private set; } public void Dispose() - { - IsDisposed = true; - } + => IsDisposed = true; } } diff --git a/src/DynamicData/Cache/Internal/DisposeMany.cs b/src/DynamicData/Cache/Internal/DisposeMany.cs index 2b0091c58..e9b8f1c5f 100644 --- a/src/DynamicData/Cache/Internal/DisposeMany.cs +++ b/src/DynamicData/Cache/Internal/DisposeMany.cs @@ -2,69 +2,64 @@ // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. -using System; -using System.Reactive.Disposables; +using System.Reactive; using System.Reactive.Linq; -using DynamicData.Kernel; - namespace DynamicData.Cache.Internal; internal sealed class DisposeMany where TObject : notnull where TKey : notnull { - private readonly Action _removeAction; - private readonly IObservable> _source; - public DisposeMany(IObservable> source, Action removeAction) - { - _source = source ?? throw new ArgumentNullException(nameof(source)); - _removeAction = removeAction ?? throw new ArgumentNullException(nameof(removeAction)); - } + public DisposeMany(IObservable> source) + => _source = source; public IObservable> Run() - { - return Observable.Create>( - observer => - { - var locker = new object(); - var cache = new Cache(); - var subscriber = _source.Synchronize(locker).Do(changes => RegisterForRemoval(changes, cache), observer.OnError).SubscribeSafe(observer); + => Observable.Create>(observer => + { + var cachedItems = new Cache(); - return Disposable.Create( - () => - { - subscriber.Dispose(); + return _source.SubscribeSafe(Observer.Create>( + onNext: changeSet => + { + observer.OnNext(changeSet); - lock (locker) + foreach (var change in changeSet.ToConcreteType()) + { + switch (change.Reason) { - cache.Items.ForEach(t => _removeAction(t)); - cache.Clear(); + case ChangeReason.Update: + if (change.Previous.HasValue && !EqualityComparer.Default.Equals(change.Current, change.Previous.Value)) + (change.Previous.Value as IDisposable)?.Dispose(); + break; + + case ChangeReason.Remove: + (change.Current as IDisposable)?.Dispose(); + break; } - }); - }); - } + } - private void RegisterForRemoval(IChangeSet changes, Cache cache) - { - changes.ToConcreteType().ForEach( - change => - { - switch (change.Reason) + cachedItems.Clone(changeSet); + }, + onError: error => { - case ChangeReason.Update: - // ReSharper disable once InconsistentlySynchronizedField - change.Previous.IfHasValue(t => _removeAction(t)); - break; + observer.OnError(error); + + foreach (var item in cachedItems.Items) + (item as IDisposable)?.Dispose(); + + cachedItems.Clear(); + }, + onCompleted: () => + { + observer.OnCompleted(); + + foreach (var item in cachedItems.Items) + (item as IDisposable)?.Dispose(); - case ChangeReason.Remove: - // ReSharper disable once InconsistentlySynchronizedField - _removeAction(change.Current); - break; - } - }); - cache.Clone(changes); - } + cachedItems.Clear(); + })); + }); } diff --git a/src/DynamicData/Cache/Internal/MergeMany.cs b/src/DynamicData/Cache/Internal/MergeMany.cs index bcdc17076..0c2ec85f8 100644 --- a/src/DynamicData/Cache/Internal/MergeMany.cs +++ b/src/DynamicData/Cache/Internal/MergeMany.cs @@ -68,7 +68,15 @@ private sealed class SubscriptionCounter : IDisposable public void Finally() => CheckCompleted(); - public void Dispose() => _subject.Dispose(); + public void Dispose() + { + if (Interlocked.Exchange(ref _subscriptionCount, 0) != 0) + { + _subject.OnCompleted(); + } + + _subject.Dispose(); + } private void CheckCompleted() { diff --git a/src/DynamicData/Cache/Internal/SubscribeMany.cs b/src/DynamicData/Cache/Internal/SubscribeMany.cs index ceba68998..b3c7aa240 100644 --- a/src/DynamicData/Cache/Internal/SubscribeMany.cs +++ b/src/DynamicData/Cache/Internal/SubscribeMany.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for full license information. using System; +using System.Reactive; using System.Reactive.Disposables; using System.Reactive.Linq; @@ -39,7 +40,13 @@ public IObservable> Run() observer => { var published = _source.Publish(); - var subscriptions = published.Transform((t, k) => _subscriptionFactory(t, k)).DisposeMany().Subscribe(); + var subscriptions = published + .Transform((t, k) => _subscriptionFactory(t, k)) + .DisposeMany() + .SubscribeSafe(Observer.Create>( + onNext: static _ => { }, + onError: observer.OnError, + onCompleted: static () => { })); return new CompositeDisposable(subscriptions, published.SubscribeSafe(observer), published.Connect()); }); diff --git a/src/DynamicData/Cache/ObservableCacheEx.cs b/src/DynamicData/Cache/ObservableCacheEx.cs index 368f490f1..7ea734875 100644 --- a/src/DynamicData/Cache/ObservableCacheEx.cs +++ b/src/DynamicData/Cache/ObservableCacheEx.cs @@ -1161,8 +1161,8 @@ public static IObservable> DeferUntilLoaded /// Disposes each item when no longer required. /// - /// Individual items are disposed when removed or replaced. All items - /// are disposed when the stream is disposed. + /// Individual items are disposed after removal or replacement changes have been sent downstream. + /// All items previously-published on the stream are disposed after the stream finalizes. /// /// The type of the object. /// The type of the key. @@ -1178,13 +1178,7 @@ public static IObservable> DisposeMany( throw new ArgumentNullException(nameof(source)); } - return new DisposeMany( - source, - t => - { - var d = t as IDisposable; - d?.Dispose(); - }).Run(); + return new DisposeMany(source).Run(); } /// diff --git a/src/DynamicData/List/Internal/DisposeMany.cs b/src/DynamicData/List/Internal/DisposeMany.cs new file mode 100644 index 000000000..675608b4a --- /dev/null +++ b/src/DynamicData/List/Internal/DisposeMany.cs @@ -0,0 +1,74 @@ +// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Reactive; +using System.Reactive.Linq; + +namespace DynamicData.List.Internal; + +internal sealed class DisposeMany + where T : notnull +{ + private readonly IObservable> _source; + + public DisposeMany(IObservable> source) + => _source = source; + + public IObservable> Run() + => Observable.Create>(observer => + { + var cachedItems = new ChangeAwareList(); + + return _source.SubscribeSafe(Observer.Create>( + onNext: changeSet => + { + observer.OnNext(changeSet); + + foreach (var change in changeSet) + { + switch (change.Reason) + { + case ListChangeReason.Clear: + foreach (var item in cachedItems) + (item as IDisposable)?.Dispose(); + break; + + case ListChangeReason.Remove: + (change.Item.Current as IDisposable)?.Dispose(); + break; + + case ListChangeReason.RemoveRange: + foreach (var item in change.Range) + (item as IDisposable)?.Dispose(); + break; + + case ListChangeReason.Replace: + if (change.Item.Previous.HasValue) + (change.Item.Previous.Value as IDisposable)?.Dispose(); + break; + } + } + + cachedItems.Clone(changeSet); + }, + onError: error => + { + observer.OnError(error); + + foreach (var item in cachedItems) + (item as IDisposable)?.Dispose(); + + cachedItems.Clear(); + }, + onCompleted: () => + { + observer.OnCompleted(); + + foreach (var item in cachedItems) + (item as IDisposable)?.Dispose(); + + cachedItems.Clear(); + })); + }); +} diff --git a/src/DynamicData/List/Internal/SubscribeMany.cs b/src/DynamicData/List/Internal/SubscribeMany.cs index ce2605362..7daa4650e 100644 --- a/src/DynamicData/List/Internal/SubscribeMany.cs +++ b/src/DynamicData/List/Internal/SubscribeMany.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for full license information. using System; +using System.Reactive; using System.Reactive.Disposables; using System.Reactive.Linq; @@ -27,7 +28,13 @@ public IObservable> Run() observer => { var shared = _source.Publish(); - var subscriptions = shared.Transform(t => _subscriptionFactory(t)).DisposeMany().Subscribe(); + var subscriptions = shared + .Transform(t => _subscriptionFactory(t)) + .DisposeMany() + .SubscribeSafe(Observer.Create>( + onNext: static _ => { }, + onError: observer.OnError, + onCompleted: static () => { })); return new CompositeDisposable(subscriptions, shared.SubscribeSafe(observer), shared.Connect()); }); diff --git a/src/DynamicData/List/ObservableListEx.cs b/src/DynamicData/List/ObservableListEx.cs index 67520adae..5e9041874 100644 --- a/src/DynamicData/List/ObservableListEx.cs +++ b/src/DynamicData/List/ObservableListEx.cs @@ -621,8 +621,8 @@ public static IObservable> DeferUntilLoaded(this IObservableLis /// /// Disposes each item when no longer required. /// - /// Individual items are disposed when removed or replaced. All items - /// are disposed when the stream is disposed. + /// Individual items are disposed after removal or replacement changes have been sent downstream. + /// All items previously-published on the stream are disposed after the stream finalizes. /// /// The type of the object. /// The source. @@ -631,12 +631,10 @@ public static IObservable> DeferUntilLoaded(this IObservableLis public static IObservable> DisposeMany(this IObservable> source) where T : notnull { - return source.OnItemRemoved( - t => - { - var d = t as IDisposable; - d?.Dispose(); - }); + if (source is null) + throw new ArgumentNullException(nameof(source)); + + return new DisposeMany(source).Run(); } ///