Skip to content

Commit

Permalink
Feature: GroupOnObservable Operator (#847)
Browse files Browse the repository at this point in the history
New Group Operator that groups values based on the latest value returned from an observable created using the given factory function.
- Values are not observed downstream until their Observable fires once
- Any event after the first will have the effect of moving the value from one group to another
- Any errors in the grouping observable will bring down the entire stream
- If the source observable completes, the downstream sequence will also complete (and the sequence from each Group will complete).
- If the number of items in a group drops to zero, the group will be removed and any subscribers to the Group's Cache will see OnComplete
- If the number of items in a group drops to zero, but within the same changeset, another item gets added to that group, the group will NOT be removed/re-added.
  • Loading branch information
dwcullop authored Feb 10, 2024
1 parent 7922e03 commit 04e2ff8
Show file tree
Hide file tree
Showing 10 changed files with 875 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1367,6 +1367,14 @@ namespace DynamicData
where TObject : notnull
where TKey : notnull
where TGroupKey : notnull { }
public static System.IObservable<DynamicData.IGroupChangeSet<TObject, TKey, TGroupKey>> GroupOnObservable<TObject, TKey, TGroupKey>(this System.IObservable<DynamicData.IChangeSet<TObject, TKey>> source, System.Func<TObject, System.IObservable<TGroupKey>> groupObservableSelector)
where TObject : notnull
where TKey : notnull
where TGroupKey : notnull { }
public static System.IObservable<DynamicData.IGroupChangeSet<TObject, TKey, TGroupKey>> GroupOnObservable<TObject, TKey, TGroupKey>(this System.IObservable<DynamicData.IChangeSet<TObject, TKey>> source, System.Func<TObject, TKey, System.IObservable<TGroupKey>> groupObservableSelector)
where TObject : notnull
where TKey : notnull
where TGroupKey : notnull { }
public static System.IObservable<DynamicData.IGroupChangeSet<TObject, TKey, TGroupKey>> GroupOnProperty<TObject, TKey, TGroupKey>(this System.IObservable<DynamicData.IChangeSet<TObject, TKey>> source, System.Linq.Expressions.Expression<System.Func<TObject, TGroupKey>> propertySelector, System.TimeSpan? propertyChangedThrottle = default, System.Reactive.Concurrency.IScheduler? scheduler = null)
where TObject : System.ComponentModel.INotifyPropertyChanged
where TKey : notnull
Expand Down Expand Up @@ -2869,6 +2877,21 @@ namespace DynamicData.Tests
public void Dispose() { }
protected virtual void Dispose(bool isDisposing) { }
}
public class GroupChangeSetAggregator<TObject, TKey, TGroupKey> : System.IDisposable
where TObject : notnull
where TKey : notnull
where TGroupKey : notnull
{
public GroupChangeSetAggregator(System.IObservable<DynamicData.IGroupChangeSet<TObject, TKey, TGroupKey>> source) { }
public DynamicData.IObservableCache<DynamicData.IGroup<TObject, TKey, TGroupKey>, TGroupKey> Data { get; }
public System.Exception? Error { get; }
public DynamicData.IObservableCache<DynamicData.Tests.ChangeSetAggregator<TObject, TKey>, TGroupKey> Groups { get; }
public bool IsCompleted { get; }
public System.Collections.Generic.IReadOnlyList<DynamicData.IGroupChangeSet<TObject, TKey, TGroupKey>> Messages { get; }
public DynamicData.Diagnostics.ChangeSummary Summary { get; }
public void Dispose() { }
protected virtual void Dispose(bool disposing) { }
}
public static class ListTextEx
{
public static DynamicData.Tests.ChangeSetAggregator<T> AsAggregator<T>(this System.IObservable<DynamicData.IChangeSet<T>> source)
Expand Down Expand Up @@ -2914,6 +2937,10 @@ namespace DynamicData.Tests
public static DynamicData.Tests.VirtualChangeSetAggregator<TObject, TKey> AsAggregator<TObject, TKey>(this System.IObservable<DynamicData.IVirtualChangeSet<TObject, TKey>> source)
where TObject : notnull
where TKey : notnull { }
public static DynamicData.Tests.GroupChangeSetAggregator<TValue, TKey, TGroupKey> AsAggregator<TValue, TKey, TGroupKey>(this System.IObservable<DynamicData.IGroupChangeSet<TValue, TKey, TGroupKey>> source)
where TValue : notnull
where TKey : notnull
where TGroupKey : notnull { }
}
public class VirtualChangeSetAggregator<TObject, TKey> : System.IDisposable
where TObject : notnull
Expand Down
345 changes: 345 additions & 0 deletions src/DynamicData.Tests/Cache/GroupOnObservableFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,345 @@
using System;
using System.Linq;
using Bogus;
using DynamicData.Tests.Domain;
using DynamicData.Binding;
using System.Reactive.Linq;
using FluentAssertions;
using Xunit;

using Person = DynamicData.Tests.Domain.Person;
using System.Threading.Tasks;
using DynamicData.Kernel;

namespace DynamicData.Tests.Cache;

public class GroupOnObservableFixture : IDisposable
{
#if DEBUG
private const int InitialCount = 7;
private const int AddCount = 5;
private const int RemoveCount = 3;
private const int UpdateCount = 2;
#else
private const int InitialCount = 103;
private const int AddCount = 53;
private const int RemoveCount = 37;
private const int UpdateCount = 101;
#endif
private readonly SourceCache<Person, string> _personCache = new (p => p.UniqueKey);
private readonly ChangeSetAggregator<Person, string> _personResults;
private readonly GroupChangeSetAggregator<Person, string, Color> _favoriteColorResults;
private readonly Faker<Person> _personFaker;
private readonly Randomizer _randomizer = new(0x3141_5926);

public GroupOnObservableFixture()
{
_personFaker = Fakers.Person.Clone().WithSeed(_randomizer);
_personResults = _personCache.Connect().AsAggregator();
_favoriteColorResults = _personCache.Connect().GroupOnObservable(CreateFavoriteColorObservable).AsAggregator();
}

[Fact]
public void ResultContainsAllInitialChildren()
{
// Arrange

// Act
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));

// Assert
_personResults.Data.Count.Should().Be(InitialCount);
_personResults.Messages.Count.Should().Be(1, "The child observables fire on subscription so everything should appear as a single changeset");
VerifyGroupingResults();
}

[Fact]
public void ResultContainsAddedValues()
{
// Arrange
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));

// Act
_personCache.AddOrUpdate(_personFaker.Generate(AddCount));

// Assert
_personResults.Data.Count.Should().Be(InitialCount + AddCount);
_personResults.Messages.Count.Should().Be(2, "Initial Adds and then the subsequent Additions should each be a single message");
VerifyGroupingResults();
}

[Fact]
public void ResultDoesNotContainRemovedValues()
{
// Arrange
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));

// Act
_personCache.RemoveKeys(_randomizer.ListItems(_personCache.Items.ToList(), RemoveCount).Select(p => p.UniqueKey));

// Assert
_personResults.Data.Count.Should().Be(InitialCount - RemoveCount);
_personResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes");
VerifyGroupingResults();
}

[Fact]
public void ResultContainsUpdatedValues()
{
// Arrange
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
var replacements = _randomizer.ListItems(_personCache.Items.ToList(), UpdateCount)
.Select(replacePerson => Person.CloneUniqueId(_personFaker.Generate(), replacePerson));

// Act
_personCache.AddOrUpdate(replacements);

// Assert
_personResults.Data.Count.Should().Be(InitialCount, "Only replacements were made");
_personResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Updates");
VerifyGroupingResults();
}

[Fact]
public void GroupRemovedWhenEmpty()
{
// Arrange
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
var usedColorList = _personCache.Items.Select(p => p.FavoriteColor).Distinct().ToList();
var removeColor = _randomizer.ListItem(usedColorList);
var colorCount = usedColorList.Count;

// Act
_personCache.Edit(updater => updater.Remove(updater.Items.Where(p => p.FavoriteColor == removeColor).Select(p => p.UniqueKey)));

// Assert
_personCache.Items.Select(p => p.FavoriteColor).Distinct().Count().Should().Be(colorCount - 1);
_personResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes");
_favoriteColorResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes");
_favoriteColorResults.Summary.Overall.Adds.Should().Be(colorCount);
_favoriteColorResults.Summary.Overall.Removes.Should().Be(1);
VerifyGroupingResults();
}

[Fact]
public void GroupNotRemovedIfAddedBackImmediately()
{
// Arrange
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
var usedColorList = _personCache.Items.Select(p => p.FavoriteColor).Distinct().ToList();
var removeColor = _randomizer.ListItem(usedColorList);
var colorCount = usedColorList.Count;

// Act
_personCache.Edit(updater =>
{
updater.Remove(updater.Items.Where(p => p.FavoriteColor == removeColor).Select(p => p.UniqueKey));
var newPerson = _personFaker.Generate();
newPerson.FavoriteColor = removeColor;
updater.AddOrUpdate(newPerson);
});

// Assert
_personCache.Items.Select(p => p.FavoriteColor).Distinct().Count().Should().Be(colorCount);
_personResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Other Added Value");
_favoriteColorResults.Messages.Count.Should().Be(1, "Shouldn't be removed/re-added");
_favoriteColorResults.Summary.Overall.Adds.Should().Be(colorCount);
_favoriteColorResults.Summary.Overall.Removes.Should().Be(0);
VerifyGroupingResults();
}

[Fact]
public void GroupingSequenceCompletesWhenEmpty()
{
// Arrange
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
var usedColorList = _personCache.Items.Select(p => p.FavoriteColor).Distinct().ToList();
var removeColor = _randomizer.ListItem(usedColorList);

var results = _personCache.Connect().GroupOnObservable(CreateFavoriteColorObservable)
.Filter(grp => grp.Key == removeColor)
.Take(1)
.MergeMany(grp => grp.Cache.Connect())
.AsAggregator();

// Act
_personCache.Edit(updater => updater.Remove(updater.Items.Where(p => p.FavoriteColor == removeColor).Select(p => p.UniqueKey)));

// Assert
results.IsCompleted.Should().BeTrue();
VerifyGroupingResults();
}

[Fact]
public void AllSequencesCompleteWhenSourceIsDisposed()
{
// Arrange
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));

var results = _personCache.Connect().GroupOnObservable(CreateFavoriteColorObservable)
.MergeMany(grp => grp.Cache.Connect())
.AsAggregator();

// Act
_personCache.Dispose();

// Assert
results.IsCompleted.Should().BeTrue();
VerifyGroupingResults();
}

[Fact]
public void AllGroupsRemovedWhenCleared()
{
// Arrange
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
var usedColorList = _personCache.Items.Select(p => p.FavoriteColor).Distinct().ToList();
var colorCount = usedColorList.Count;

// Act
_personCache.Clear();

// Assert
_personCache.Items.Count().Should().Be(0);
_personResults.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes");
_favoriteColorResults.Summary.Overall.Adds.Should().Be(colorCount);
_favoriteColorResults.Summary.Overall.Removes.Should().Be(colorCount);
VerifyGroupingResults();
}

[Fact]
public void ResultsContainsCorrectRegroupedValues()
{
// Arrange
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));

// Act
Enumerable.Range(0, UpdateCount).ForEach(_ => RandomFavoriteColorChange());

// Assert
VerifyGroupingResults();
}

[Fact]
public async Task ResultsContainsCorrectRegroupedValuesAsync()
{
// Arrange
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
var tasks = Enumerable.Range(0, UpdateCount).Select(_ => Task.Run(RandomFavoriteColorChange));

// Act
await Task.WhenAll(tasks.ToArray());

// Assert
VerifyGroupingResults();
}

[Theory]
[InlineData(false)]
[InlineData(true)]
public void ResultCompletesOnlyWhenSourceCompletes(bool completeSource)
{
// Arrange
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));

// Act
if (completeSource)
{
_personCache.Dispose();
}

// Assert
_personResults.IsCompleted.Should().Be(completeSource);
}

[Fact]
public void ResultFailsIfSourceFails()
{
// Arrange
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
var expectedError = new Exception("Expected");
var throwObservable = Observable.Throw<IChangeSet<Person, string>>(expectedError);
using var results = _personCache.Connect().Concat(throwObservable).GroupOnObservable(CreateFavoriteColorObservable).AsAggregator();

// Act
_personCache.Dispose();

// Assert
results.Error.Should().Be(expectedError);
}

[Fact]
public void ResultFailsIfGroupObservableFails()
{
// Arrange
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
var expectedError = new Exception("Expected");
var throwObservable = Observable.Throw<Color>(expectedError);

// Act
using var results = _personCache.Connect().GroupOnObservable((person, key) => CreateFavoriteColorObservable(person, key).Take(1).Concat(throwObservable)).AsAggregator();

// Assert
results.Error.Should().Be(expectedError);
}

[Fact]
public void OnErrorFiresIfSelectorThrows()
{
// Arrange
_personCache.AddOrUpdate(_personFaker.Generate(InitialCount));
var expectedError = new Exception("Expected");

// Act
using var results = _personCache.Connect().GroupOnObservable<Person, string, Color>(_ => throw expectedError).AsAggregator();

// Assert
results.Error.Should().Be(expectedError);
}

public void Dispose()
{
_favoriteColorResults.Dispose();
_personResults.Dispose();
_personCache.Dispose();
}

private void RandomFavoriteColorChange()
{
var person = _randomizer.ListItem(_personCache.Items.ToList());
lock (person)
{
// Pick a new favorite color
person.FavoriteColor = _randomizer.RandomColor(person.FavoriteColor);
}
}

private void VerifyGroupingResults() =>
VerifyGroupingResults(_personCache, _personResults, _favoriteColorResults);

private static void VerifyGroupingResults(ISourceCache<Person, string> personCache, ChangeSetAggregator<Person, string> personResults, GroupChangeSetAggregator<Person, string, Color> favoriteColorResults)
{
var expectedPersons = personCache.Items.ToList();
var expectedGroupings = personCache.Items.GroupBy(p => p.FavoriteColor).ToList();

// These should be subsets of each other
expectedPersons.Should().BeEquivalentTo(personResults.Data.Items);
favoriteColorResults.Groups.Count.Should().Be(expectedGroupings.Count);

// Check each group
foreach (var grouping in expectedGroupings)
{
var color = grouping.Key;
var expectedGroup = grouping.ToList();
var optionalGroup = favoriteColorResults.Groups.Lookup(color);

optionalGroup.HasValue.Should().BeTrue();
var actualGroup = optionalGroup.Value.Data.Items.ToList();

expectedGroup.Should().BeEquivalentTo(actualGroup);
}
}

private static IObservable<Color> CreateFavoriteColorObservable(Person person, string key) =>
person.WhenPropertyChanged(p => p.FavoriteColor).Select(change => change.Value);
}
Loading

0 comments on commit 04e2ff8

Please sign in to comment.