Skip to content

Commit

Permalink
Re-designed the .ToObservableChangeSet() operator for both caches a…
Browse files Browse the repository at this point in the history
…nd lists, for better independence, proper error handling, and improved performance. Resolves reactivemarbles#635.
  • Loading branch information
JakenVeina committed Dec 6, 2023
1 parent d93fb94 commit 9cc2ef8
Show file tree
Hide file tree
Showing 9 changed files with 1,185 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,7 @@ namespace DynamicData
public static readonly DynamicData.IChangeSet<T> Empty;
public ChangeSet() { }
public ChangeSet(System.Collections.Generic.IEnumerable<DynamicData.Change<T>> items) { }
public ChangeSet(int capacity) { }
public int Adds { get; }
public int Moves { get; }
public int Refreshes { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,7 @@ namespace DynamicData
public static readonly DynamicData.IChangeSet<T> Empty;
public ChangeSet() { }
public ChangeSet(System.Collections.Generic.IEnumerable<DynamicData.Change<T>> items) { }
public ChangeSet(int capacity) { }
public int Adds { get; }
public int Moves { get; }
public int Refreshes { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,7 @@ namespace DynamicData
public static readonly DynamicData.IChangeSet<T> Empty;
public ChangeSet() { }
public ChangeSet(System.Collections.Generic.IEnumerable<DynamicData.Change<T>> items) { }
public ChangeSet(int capacity) { }
public int Adds { get; }
public int Moves { get; }
public int Refreshes { get; }
Expand Down
206 changes: 197 additions & 9 deletions src/DynamicData.Tests/Cache/ToObservableChangeSetFixture.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
Expand All @@ -15,6 +17,63 @@ namespace DynamicData.Tests.Cache;
public class ToObservableChangeSetFixture
: ReactiveTest
{
[Fact]
public void NextItemToExpireIsReplaced_ExpirationIsRescheduledIfNeeded()
{
using var source = new Subject<Item>();

var scheduler = new TestScheduler();

using var results = new ChangeSetAggregator<Item, int>(source
.ToObservableChangeSet(
keySelector: static item => item.Id,
expireAfter: static item => item.Lifetime,
scheduler: scheduler));

var item1 = new Item() { Id = 1, Lifetime = TimeSpan.FromMilliseconds(10) };
source.OnNext(item1);
scheduler.AdvanceBy(1);

// Extend the expiration to a later time
var item2 = new Item() { Id = 1, Lifetime = TimeSpan.FromMilliseconds(20) };
source.OnNext(item2);
scheduler.AdvanceBy(1);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(2, "2 items were emitted");
results.Data.Items.Should().BeEquivalentTo(new[] { item2 }, "2 items were emitted, 1 of which was a replacement");

scheduler.AdvanceTo(TimeSpan.FromMilliseconds(10).Ticks);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(2, "no changes should have occurred, since the last check");
results.Data.Items.Should().BeEquivalentTo(new[] { item2 }, "no changes should have occurred, since the last check");

// Shorten the expiration to an earlier time (5ms from now is 15m total)
var item3 = new Item() { Id = 1, Lifetime = TimeSpan.FromMilliseconds(5) };
source.OnNext(item3);
scheduler.AdvanceBy(1);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(3, "1 item was emitted, since the last check");
results.Data.Items.Should().BeEquivalentTo(new[] { item3 }, "1 item was replaced, since the last check");

// One more update with no changes to the expiration
var item4 = new Item() { Id = 1, Lifetime = TimeSpan.FromMilliseconds(5) };
source.OnNext(item4);
scheduler.AdvanceBy(1);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(4, "1 item was emitted, since the last check");
results.Data.Items.Should().BeEquivalentTo(new[] { item4 }, "1 item was replaced, since the last check");

scheduler.AdvanceTo(TimeSpan.FromMilliseconds(15).Ticks);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(5, "1 expiration should have occurred, since the last check");
results.Data.Items.Should().BeEmpty("the last item should have expired, since the last check");
}

[Fact]
public void ExpirationIsGiven_RemovalIsScheduled()
{
Expand Down Expand Up @@ -112,15 +171,88 @@ public void ExpirationIsGiven_RemovalIsScheduled()
results.Data.Items.Should().BeEquivalentTo(new[] { item9 }, "item #11 should have expired");
}

[Fact]
public void ItemIsEvictedBeforeExpiration_ExpirationIsCancelled()
{
using var source = new Subject<IEnumerable<Item>>();

var scheduler = new TestScheduler();

using var results = new ChangeSetAggregator<Item, int>(source
.ToObservableChangeSet(
keySelector: static item => item.Id,
expireAfter: static item => item.Lifetime,
limitSizeTo: 3,
scheduler: scheduler));

var item1 = new Item() { Id = 1, Lifetime = TimeSpan.FromMilliseconds(10) };
var item2 = new Item() { Id = 2, Lifetime = TimeSpan.FromMilliseconds(10) };
var item3 = new Item() { Id = 3, Lifetime = TimeSpan.FromMilliseconds(10) };
source.OnNext(new[] { item1, item2, item3 });
scheduler.AdvanceBy(1);

var item4 = new Item() { Id = 4 };
source.OnNext(new[] { item4 });
scheduler.AdvanceBy(1);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(2, "2 item sets were emitted");
results.Data.Items.Should().BeEquivalentTo(new[] { item2, item3, item4 }, "the size limit of the collection was 3");

scheduler.AdvanceTo(TimeSpan.FromMilliseconds(10).Ticks);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(3, "2 items should have expired, at the same time, since the last check");
results.Data.Items.Should().BeEquivalentTo(new[] { item4 }, "2 items should have expired, since the last check");
}

[Fact]
public void ItemExpiresBeforeEviction_EvictionIsSkipped()
{
using var source = new Subject<IEnumerable<Item>>();

var scheduler = new TestScheduler();

using var results = new ChangeSetAggregator<Item, int>(source
.ToObservableChangeSet(
keySelector: static item => item.Id,
expireAfter: static item => item.Lifetime,
limitSizeTo: 3,
scheduler: scheduler));

var item1 = new Item() { Id = 1, Lifetime = TimeSpan.FromMilliseconds(10) };
var item2 = new Item() { Id = 2 };
var item3 = new Item() { Id = 3 };
source.OnNext(new[] { item1, item2, item3 });
scheduler.AdvanceBy(1);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(1, "1 item set was emitted");
results.Data.Items.Should().BeEquivalentTo(new[] { item1, item2, item3 }, "the size limit of the collection was 3");

scheduler.AdvanceTo(TimeSpan.FromMilliseconds(10).Ticks);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(2, "1 expiration should have occurred, since the last check");
results.Data.Items.Should().BeEquivalentTo(new[] { item2, item3 }, "item #1 should have expired");

var item4 = new Item() { Id = 4 };
source.OnNext(new[] { item4 });
scheduler.AdvanceBy(1);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(3, "1 item set was emitted, since the last check");
results.Data.Items.Should().BeEquivalentTo(new[] { item2, item3, item4 }, "no eviction should have occurred");
}

[Fact]
public void KeySelectorIsNull_ThrowsException()
=> FluentActions.Invoking(() => ObservableCacheEx.ToObservableChangeSet<object, object>(
source: new Subject<object>(),
keySelector: null!))
.Should().Throw<ArgumentNullException>();

[Fact(Skip = "Outstanding bug, error re-throws, instead of emitting on the stream")]
[System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "xUnit1004:Test methods should not be skipped", Justification = "Bug to be fixed")]
[Fact]
public void KeySelectorThrows_SubscriptionReceivesError()
{
using var source = new Subject<Item>();
Expand All @@ -142,8 +274,31 @@ public void KeySelectorThrows_SubscriptionReceivesError()
results.Data.Items.Should().BeEquivalentTo(new[] { item1 }, "1 item was emitted before an error occurred");
}

[Fact(Skip = "Outstanding bug, completion is not forwarded")]
[System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "xUnit1004:Test methods should not be skipped", Justification = "Bug to be fixed")]
[Fact]
public void LimitToSizeIs0_ChangeSetsAreEmpty()
{
using var source = new Subject<Item>();

using var results = new ChangeSetAggregator<Item, int>(source
.ToObservableChangeSet(
keySelector: static item => item.Id,
limitSizeTo: 0));

var item1 = new Item() { Id = 1 };
source.OnNext(item1);

var item2 = new Item() { Id = 2 };
source.OnNext(item2);

var item3 = new Item() { Id = 3 };
source.OnNext(item3);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(3, "3 items were emitted");
results.Data.Items.Should().BeEmpty("the size limit of the collection was 0");
}

[Fact]
public void RemovalsArePending_CompletionWaitsForRemovals()
{
using var source = new Subject<IEnumerable<Item>>();
Expand All @@ -164,19 +319,20 @@ public void RemovalsArePending_CompletionWaitsForRemovals()

source.OnCompleted();

results.Error.Should().BeNull();
results.Completed.Should().BeFalse("item removals have been scheduled, and not completed");
results.Messages.Count.Should().Be(1, "1 item set was emitted");
results.Data.Items.Should().BeEquivalentTo(new[] { item1, item2, item3 }, "3 items were emitted");

scheduler.AdvanceTo(TimeSpan.FromMilliseconds(30).Ticks);

results.Error.Should().BeNull();
results.Completed.Should().BeTrue("the source has completed, and no outstanding expirations remain");
results.Messages.Count.Should().Be(3, "2 expirations should have occurred, since the last check");
results.Data.Items.Should().BeEquivalentTo(new[] { item2 }, "3 items were emitted, and 2 should have expired");
}

[Fact(Skip = "Outstanding bug, https://github.com/reactivemarbles/DynamicData/issues/635")]
[System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "xUnit1004:Test methods should not be skipped", Justification = "Bug to be fixed")]
[Fact]
public void SourceCompletesImmediately_SubscriptionCompletes()
{
var item = new Item() { Id = 1 };
Expand All @@ -191,6 +347,7 @@ public void SourceCompletesImmediately_SubscriptionCompletes()
using var results = new ChangeSetAggregator<Item, int>(source
.ToObservableChangeSet(static item => item.Id));

results.Error.Should().BeNull();
results.Completed.Should().BeTrue("the source has completed, and no outstanding expirations remain");
results.Messages.Count.Should().Be(1, "1 item was emitted");
results.Data.Items.Should().BeEquivalentTo(new[] { item }, "1 item was emitted");
Expand Down Expand Up @@ -246,12 +403,11 @@ public void SizeLimitIsExceeded_OldestItemsAreRemoved()
scheduler.AdvanceBy(1);

results.Error.Should().BeNull();
results.Messages.Count.Should().Be(9, "6 item sets were emitted by the source, 3 of which triggered followup evictions");
results.Messages.Count.Should().Be(6, "6 item sets were emitted by the source");
results.Data.Items.Should().BeEquivalentTo(new[] { item5, item6, item9, item10, item11 }, "the size limit of the collection was 5");
}

[Fact(Skip = "Outstanding bug, notifications are not synchronized, initial item emits after error")]
[System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "xUnit1004:Test methods should not be skipped", Justification = "Bug to be fixed")]
[Fact]
public void SourceErrorsImmediately_SubscriptionReceivesError()
{
var item = new Item() { Id = 1 };
Expand Down Expand Up @@ -322,6 +478,38 @@ public void SourceIsNull_ThrowsException()
keySelector: static item => item))
.Should().Throw<ArgumentNullException>();

[Fact]
public void ThreadPoolSchedulerIsUsed_ExpirationIsThreadSafe()
{
var testDuration = TimeSpan.FromSeconds(1);
var maxItemLifetime = TimeSpan.FromMilliseconds(50);

using var source = new Subject<Item>();

using var results = new ChangeSetAggregator<Item, int>(source
.ToObservableChangeSet(
keySelector: static item => item.Id,
expireAfter: static item => item.Lifetime,
limitSizeTo: 1000,
scheduler: ThreadPoolScheduler.Instance));

var nextItemId = 1;
var rng = new Random(Seed: 1234567);

var stopwatch = new Stopwatch();
stopwatch.Start();
while (stopwatch.Elapsed < testDuration)
{
source.OnNext(new()
{
Id = nextItemId++,
Lifetime = TimeSpan.FromMilliseconds(rng.Next(maxItemLifetime.Milliseconds + 1))
});
}

results.Error.Should().BeNull();
}

public class Item
{
public int Id { get; init; }
Expand Down
Loading

0 comments on commit 9cc2ef8

Please sign in to comment.