Skip to content

Commit

Permalink
Reworked the .DisposeMany() operator for both caches and lists to p…
Browse files Browse the repository at this point in the history
…erform disposal after downstream operations have processed, rather than before, since the operator cannot guarantee that disposal is safe, when downstream consumers may still need to make use of these disposable items, in order to process removals.

Also fixed a bug within both versions of the `.SubscribeMany()` operator, where errors were not being properly propagated through the stream, and instead could bubble up to stream inputs.

Also fixed a bug within the cache version of `.MergeMany()` that allowed an internally-used subject to be disposed, without cleaning up all subscriptions that may later make use of it.
  • Loading branch information
JakenVeina committed Nov 15, 2023
1 parent c8f9693 commit 893024a
Show file tree
Hide file tree
Showing 9 changed files with 346 additions and 153 deletions.
147 changes: 105 additions & 42 deletions src/DynamicData.Tests/Cache/DisposeManyFixture.cs
Original file line number Diff line number Diff line change
@@ -1,87 +1,150 @@
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

using FluentAssertions;

using Xunit;

namespace DynamicData.Tests.Cache;

public class DisposeManyFixture : IDisposable
public sealed class DisposeManyFixture : IDisposable
{
private readonly ChangeSetAggregator<DisposableObject, int> _results;
private readonly Subject<IChangeSet<DisposableObject, int>> _changeSetsSource;

private readonly ISourceCache<DisposableObject, int> _source;
private readonly SourceCache<DisposableObject, int> _itemsSource;

public DisposeManyFixture()
{
_source = new SourceCache<DisposableObject, int>(p => p.Id);
_results = new ChangeSetAggregator<DisposableObject, int>(_source.Connect().DisposeMany());
}
private readonly ChangeSetAggregator<DisposableObject, int> _results;

[Fact]
public void AddWillNotCallDispose()
public DisposeManyFixture()
{
_source.AddOrUpdate(new DisposableObject(1));

_results.Messages.Count.Should().Be(1, "Should be 1 updates");
_results.Data.Count.Should().Be(1, "Should be 1 item in the cache");
_results.Data.Items.First().IsDisposed.Should().Be(false, "Should not be disposed");
_changeSetsSource = new();
_itemsSource = new(item => item.Id);
_results = new(Observable.Merge(_changeSetsSource, _itemsSource.Connect())
.DisposeMany()
.Do(onNext: changeSet =>
{
foreach (var change in changeSet)
{
change.Current.IsDisposed.Should().BeFalse("items should not be disposed until after downstream notifications are processed");

if (change.Previous.HasValue)
change.Previous.Value.IsDisposed.Should().BeFalse("items should not be disposed until after downstream notifications are processed");
}
},
onError: _ =>
{
foreach(var item in _itemsSource.Items)
item.IsDisposed.Should().BeFalse("items should not be disposed until after downstream notifications are processed");
},
onCompleted: () =>
{
foreach(var item in _itemsSource.Items)
item.IsDisposed.Should().BeFalse("items should not be disposed until after downstream notifications are processed");
}));
}

public void Dispose()
{
_source.Dispose();
_changeSetsSource.Dispose();
_itemsSource.Dispose();
_results.Dispose();
}

[Fact]
public void EverythingIsDisposedWhenStreamIsDisposed()
public void ItemsAreDisposedAfterRemovalOrReplacement()
{
_source.AddOrUpdate(Enumerable.Range(1, 10).Select(i => new DisposableObject(i)));
_source.Clear();

_results.Messages.Count.Should().Be(2, "Should be 2 updates");
_results.Messages[1].All(d => d.Current.IsDisposed).Should().BeTrue();
var items = new[]
{
new DisposableObject(1),
new DisposableObject(2),
new DisposableObject(3),
new DisposableObject(4),
new DisposableObject(5),
new DisposableObject(1),
new DisposableObject(6),
new DisposableObject(7),
new DisposableObject(8)
};

// Exercise a variety of types of changesets.
_itemsSource.AddOrUpdate(items[0]); // Single add
_itemsSource.AddOrUpdate(items[1..5]); // Range add
_itemsSource.AddOrUpdate(items[5]); // Replace
_itemsSource.AddOrUpdate(items[5]); // Redundant update
_itemsSource.RemoveKey(4); // Single remove
_itemsSource.RemoveKeys(new[] { 1, 2 }); // Range remove
_itemsSource.Clear(); // Clear
_itemsSource.AddOrUpdate(items[6..9]);
_changeSetsSource.OnNext(new ChangeSet<DisposableObject, int>() // Refresh
{
new Change<DisposableObject, int>(
reason: ChangeReason.Refresh,
key: _itemsSource.Items.First().Id,
current: _itemsSource.Items.First())
});
_changeSetsSource.OnNext(new ChangeSet<DisposableObject, int>() // Move
{
new Change<DisposableObject, int>(
key: _itemsSource.Items.First().Id,
current: _itemsSource.Items.First(),
currentIndex: 1,
previousIndex: 0)
});

_results.Error.Should().BeNull();
_results.Messages.Count.Should().Be(10, "10 updates were made to the source");
_results.Data.Count.Should().Be(3, "3 items were not removed from the list");
_results.Data.Items.All(item => item.IsDisposed).Should().BeFalse("items remaining in the list should not be disposed");
items.Except(_results.Data.Items).All(item => item.IsDisposed).Should().BeTrue("items removed from the list should be disposed");
}

[Fact]
public void RemoveWillCallDispose()
public void RemainingItemsAreDisposedAfterCompleted()
{
_source.AddOrUpdate(new DisposableObject(1));
_source.Remove(1);

_results.Messages.Count.Should().Be(2, "Should be 2 updates");
_results.Data.Count.Should().Be(0, "Should be 0 items in the cache");
_results.Messages[1].First().Current.IsDisposed.Should().Be(true, "Should be disposed");
_itemsSource.AddOrUpdate(new[]
{
new DisposableObject(1),
new DisposableObject(2),
new DisposableObject(3)
});

_itemsSource.Dispose();
_changeSetsSource.OnCompleted();

_results.Error.Should().BeNull();
_results.Messages.Count.Should().Be(1, "1 update was made to the source");
_results.Data.Count.Should().Be(3, "3 items were not removed from the list");
_results.Data.Items.All(item => item.IsDisposed).Should().BeTrue("Items remaining in the list should be disposed");
}

[Fact]
public void UpdateWillCallDispose()
public void RemainingItemsAreDisposedAfterError()
{
_source.AddOrUpdate(new DisposableObject(1));
_source.AddOrUpdate(new DisposableObject(1));
_itemsSource.AddOrUpdate(new DisposableObject(1));

var error = new Exception("Test Exception");
_changeSetsSource.OnError(error);

_itemsSource.AddOrUpdate(new DisposableObject(2));

_results.Messages.Count.Should().Be(2, "Should be 2 updates");
_results.Data.Count.Should().Be(1, "Should be 1 items in the cache");
_results.Messages[1].First().Current.IsDisposed.Should().Be(false, "Current should not be disposed");
_results.Messages[1].First().Previous.Value.IsDisposed.Should().Be(true, "Previous should be disposed");
_results.Error.Should().Be(error);
_results.Messages.Count.Should().Be(1, "1 update was made to the source");
_results.Data.Count.Should().Be(1, "1 item was not removed from the list");
_results.Data.Items.All(item => item.IsDisposed).Should().BeTrue("items remaining in the list should be disposed");
}

private class DisposableObject : IDisposable
{
public DisposableObject(int id)
{
Id = id;
}
=> Id = id;

public int Id { get; private set; }

public bool IsDisposed { get; private set; }

public void Dispose()
{
IsDisposed = true;
}
=> IsDisposed = true;
}
}
137 changes: 92 additions & 45 deletions src/DynamicData.Tests/List/DisposeManyFixture.cs
Original file line number Diff line number Diff line change
@@ -1,90 +1,137 @@
using System;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

using FluentAssertions;

using Xunit;

namespace DynamicData.Tests.List;

public class DisposeManyFixture : IDisposable
public sealed class DisposeManyFixture : IDisposable
{
private readonly ChangeSetAggregator<DisposableObject> _results;
private readonly Subject<IChangeSet<DisposableObject>> _changeSetsSource;

private readonly ISourceList<DisposableObject> _source;
private readonly SourceList<DisposableObject> _itemsSource;

public DisposeManyFixture()
{
_source = new SourceList<DisposableObject>();
_results = new ChangeSetAggregator<DisposableObject>(_source.Connect().DisposeMany());
}
private readonly ChangeSetAggregator<DisposableObject> _results;

[Fact]
public void AddWillNotCallDispose()
public DisposeManyFixture()
{
_source.Add(new DisposableObject(1));

_results.Messages.Count.Should().Be(1, "Should be 1 updates");
_results.Data.Count.Should().Be(1, "Should be 1 item in the cache");
_results.Data.Items.First().IsDisposed.Should().Be(false, "Should not be disposed");
_changeSetsSource = new();
_itemsSource = new();
_results = new(Observable.Merge(_changeSetsSource, _itemsSource.Connect())
.DisposeMany()
.Do(onNext: changeSet =>
{
foreach (var change in changeSet)
{
if (change.Item.Current is not null)
change.Item.Current.IsDisposed.Should().BeFalse("items should not be disposed until after downstream notifications are processed");

if (change.Item.Previous.HasValue)
change.Item.Previous.Value.IsDisposed.Should().BeFalse("items should not be disposed until after downstream notifications are processed");

if (change.Range is not null)
foreach (var item in change.Range)
item.IsDisposed.Should().BeFalse("items should not be disposed until after downstream notifications are processed");
}
},
onError: _ =>
{
foreach(var item in _itemsSource.Items)
item.IsDisposed.Should().BeFalse("items should not be disposed until after downstream notifications are processed");
},
onCompleted: () =>
{
foreach(var item in _itemsSource.Items)
item.IsDisposed.Should().BeFalse("items should not be disposed until after downstream notifications are processed");
}));
}

public void Dispose()
{
_source.Dispose();
_changeSetsSource.Dispose();
_itemsSource.Dispose();
_results.Dispose();
}

[Fact]
public void EverythingIsDisposedWhenStreamIsDisposed()
public void ItemsAreDisposedAfterRemovalOrReplacement()
{
var toadd = Enumerable.Range(1, 10).Select(i => new DisposableObject(i));
_source.AddRange(toadd);
_source.Clear();

_results.Messages.Count.Should().Be(2, "Should be 2 updates");

var itemsCleared = _results.Messages[1].First().Range;
itemsCleared.All(d => d.IsDisposed).Should().BeTrue();
var items = Enumerable.Range(1, 10)
.Select(id => new DisposableObject(id))
.ToArray();

// Exercise a variety of types of changesets.
_itemsSource.Add(items[0]); // Trivial single add
_itemsSource.AddRange(items[1..3]); // Trivial range add
_itemsSource.Insert(index: 1, item: items[3]); // Non-trivial single add
_itemsSource.InsertRange(index: 2, items: items[4..6]); // Non-trivial range add
_itemsSource.RemoveAt(index: 3); // Single remove
_itemsSource.RemoveRange(index: 2, count: 2); // Range remove
_itemsSource.ReplaceAt(index: 1, item: items[6]); // Replace
_itemsSource.Move(1, 0); // Move
_itemsSource.Clear(); // Clear
_itemsSource.AddRange(items[7..10]);
_changeSetsSource.OnNext(new ChangeSet<DisposableObject>() // Refresh
{
new(ListChangeReason.Refresh, current: _itemsSource.Items.First(), index: 0)
});

_results.Exception.Should().BeNull();
_results.Messages.Count.Should().Be(11, "11 updates were made to the source");
_results.Data.Count.Should().Be(3, "3 items were not removed from the list");
_results.Data.Items.All(item => item.IsDisposed).Should().BeFalse("items remaining in the list should not be disposed");
items.Except(_results.Data.Items).All(item => item.IsDisposed).Should().BeTrue("items removed from the list should be disposed");
}

[Fact]
public void RemoveWillCallDispose()
public void RemainingItemsAreDisposedAfterCompleted()
{
_source.Add(new DisposableObject(1));
_source.RemoveAt(0);

_results.Messages.Count.Should().Be(2, "Should be 2 updates");
_results.Data.Count.Should().Be(0, "Should be 0 items in the cache");
_results.Messages[1].First().Item.Current.IsDisposed.Should().Be(true, "Should be disposed");
_itemsSource.AddRange(new[]
{
new DisposableObject(1),
new DisposableObject(2),
new DisposableObject(3),
});
_itemsSource.Dispose();
_changeSetsSource.OnCompleted();

_results.Exception.Should().BeNull();
_results.Messages.Count.Should().Be(1, "1 update was made to the list");
_results.Data.Count.Should().Be(3, "3 items were not removed from the list");
_results.Data.Items.All(item => item.IsDisposed).Should().BeTrue("items remaining in the list should be disposed");
}

[Fact]
public void UpdateWillCallDispose()
public void RemainingItemsAreDisposedAfterError()
{
_source.Add(new DisposableObject(1));
_source.ReplaceAt(0, new DisposableObject(1));
_itemsSource.Add(new(1));

var error = new Exception("Test Exception");
_changeSetsSource.OnError(error);

_itemsSource.Add(new(2));

_results.Messages.Count.Should().Be(2, "Should be 2 updates");
_results.Data.Count.Should().Be(1, "Should be 1 items in the cache");
_results.Messages[1].First().Item.Current.IsDisposed.Should().Be(false, "Current should not be disposed");
_results.Messages[1].First().Item.Previous.Value.IsDisposed.Should().Be(true, "Previous should be disposed");
_results.Exception.Should().Be(error);
_results.Messages.Count.Should().Be(1, "1 update was made to the list");
_results.Data.Count.Should().Be(1, "1 item was not removed from the list");
_results.Data.Items.All(item => item.IsDisposed).Should().BeTrue("Items remaining in the list should be disposed");
}

private class DisposableObject : IDisposable
{
public DisposableObject(int id)
{
Id = id;
}
=> Id = id;

public int Id { get; private set; }

public bool IsDisposed { get; private set; }

public void Dispose()
{
IsDisposed = true;
}
=> IsDisposed = true;
}
}
Loading

0 comments on commit 893024a

Please sign in to comment.