diff --git a/src/DynamicData.Tests/Cache/MergeManyCacheChangeSetsFixture.cs b/src/DynamicData.Tests/Cache/MergeManyCacheChangeSetsFixture.cs new file mode 100644 index 000000000..0b083f01d --- /dev/null +++ b/src/DynamicData.Tests/Cache/MergeManyCacheChangeSetsFixture.cs @@ -0,0 +1,812 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Reactive.Linq; +using DynamicData.Kernel; +using FluentAssertions; + +using Xunit; +using Xunit.Sdk; + +namespace DynamicData.Tests.Cache; + +public sealed class MergeManyCacheChangeSetsFixture : IDisposable +{ + const int MarketCount = 101; + const int PricesPerMarket = 103; + const int RemoveCount = 53; + const int ItemIdStride = 1000; + const decimal BasePrice = 10m; + const decimal PriceOffset = 10m; + const decimal HighestPrice = BasePrice + PriceOffset + 1.0m; + const decimal LowestPrice = BasePrice - 1.0m; + + private static readonly Random Random = new Random(0x21123737); + + private readonly ISourceCache _marketCache = new SourceCache(p => p.Id); + + private readonly ChangeSetAggregator _marketCacheResults; + + public MergeManyCacheChangeSetsFixture() + { + _marketCacheResults = _marketCache.Connect().AsAggregator(); + } + + [Fact] + public void AbleToInvokeFactory() + { + // having + var invoked = false; + IObservable> factory(IMarket m) + { + invoked = true; + return m.LatestPrices; + } + using var sub = _marketCache.Connect().MergeManyChangeSets(factory).Subscribe(); + + // when + _marketCache.AddOrUpdate(new Market(0)); + + // then + _marketCacheResults.Data.Count.Should().Be(1); + invoked.Should().BeTrue(); + Assert.Throws(() => _marketCache.Connect().MergeManyChangeSets((Func>>)null!, comparer: null!)); + Assert.Throws(() => _marketCache.Connect().MergeManyChangeSets(_ => Observable.Return(ChangeSet.Empty), comparer: null!)); + Assert.Throws(() => _marketCache.Connect().MergeManyChangeSets((Func>>)null!, null!, null!)); + Assert.Throws(() => ObservableCacheEx.MergeManyChangeSets(null!, (Func>>)null!, comparer: null!)); + Assert.Throws(() => ObservableCacheEx.MergeManyChangeSets(null!, (Func>>)null!, null!, null!)); + } + + [Fact] + public void AbleToInvokeFactoryWithKey() + { + // having + var invoked = false; + IObservable> factory(IMarket m, Guid g) + { + invoked = true; + return m.LatestPrices; + } + using var sub = _marketCache.Connect().MergeManyChangeSets(factory).Subscribe(); + + // when + _marketCache.AddOrUpdate(new Market(0)); + + // then + _marketCacheResults.Data.Count.Should().Be(1); + invoked.Should().BeTrue(); + Assert.Throws(() => _marketCache.Connect().MergeManyChangeSets((Func>>)null!, comparer: null!)); + Assert.Throws(() => _marketCache.Connect().MergeManyChangeSets((_, _) => Observable.Return(ChangeSet.Empty), comparer: null!)); + Assert.Throws(() => _marketCache.Connect().MergeManyChangeSets((Func>>)null!, null!, null!)); + Assert.Throws(() => ObservableCacheEx.MergeManyChangeSets(null!, (Func>>)null!, comparer: null!)); + Assert.Throws(() => ObservableCacheEx.MergeManyChangeSets(null!, (Func>>)null!, null!, null!)); + } + + [Fact] + public void AllExistingSubItemsPresentInResult() + { + // having + var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray(); + using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator(); + markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => 
m.Market.AddRandomPrices(Random, m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket)); + + // when + _marketCache.AddOrUpdate(markets); + + // then + _marketCacheResults.Data.Count.Should().Be(MarketCount); + markets.Sum(m => m.PricesCache.Count).Should().Be(MarketCount * PricesPerMarket); + results.Data.Count.Should().Be(MarketCount * PricesPerMarket); + results.Messages.Count.Should().Be(MarketCount); + results.Summary.Overall.Adds.Should().Be(MarketCount * PricesPerMarket); + results.Summary.Overall.Removes.Should().Be(0); + results.Summary.Overall.Updates.Should().Be(0); + } + + [Fact] + public void AllNewSubItemsPresentInResult() + { + // having + var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray(); + using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator(); + _marketCache.AddOrUpdate(markets); + + // when + markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.AddRandomPrices(Random, m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket)); + + // then + _marketCacheResults.Data.Count.Should().Be(MarketCount); + markets.Sum(m => m.PricesCache.Count).Should().Be(MarketCount * PricesPerMarket); + results.Data.Count.Should().Be(MarketCount * PricesPerMarket); + results.Messages.Count.Should().Be(MarketCount); + results.Summary.Overall.Adds.Should().Be(MarketCount * PricesPerMarket); + results.Summary.Overall.Removes.Should().Be(0); + results.Summary.Overall.Updates.Should().Be(0); + } + + [Fact] + public void AllRefreshedSubItemsAreRefreshed() + { + // having + var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray(); + using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator(); + _marketCache.AddOrUpdate(markets); + markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.AddRandomPrices(Random, m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket)); + + // when + markets.ForEach(m => m.RefreshAllPrices(Random)); + + // then + _marketCacheResults.Data.Count.Should().Be(MarketCount); + results.Data.Count.Should().Be(MarketCount * PricesPerMarket); + results.Messages.Count.Should().Be(MarketCount * 2); + results.Summary.Overall.Adds.Should().Be(MarketCount * PricesPerMarket); + results.Summary.Overall.Removes.Should().Be(0); + results.Summary.Overall.Updates.Should().Be(0); + results.Summary.Overall.Refreshes.Should().Be(MarketCount * PricesPerMarket); + } + + [Fact] + public void AnyDuplicateKeyValuesShouldBeHidden() + { + // having + var markets = Enumerable.Range(0, 2).Select(n => new Market(n)).ToArray(); + using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator(); + _marketCache.AddOrUpdate(markets); + + // when + markets[0].AddRandomPrices(Random, 0, PricesPerMarket); + markets[1].AddRandomPrices(Random, 0, PricesPerMarket); + + // then + _marketCacheResults.Data.Count.Should().Be(2); + results.Data.Count.Should().Be(PricesPerMarket); + results.Data.Items.Zip(markets[0].PricesCache.Items).ForEach(pair => pair.First.Should().Be(pair.Second)); + results.Summary.Overall.Adds.Should().Be(PricesPerMarket); + results.Summary.Overall.Removes.Should().Be(0); + results.Summary.Overall.Updates.Should().Be(0); + } + + [Fact] + public void AnyDuplicateValuesShouldBeNoOpWhenRemoved() + { + // having + var markets = 
Enumerable.Range(0, 2).Select(n => new Market(n)).ToArray(); + using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator(); + _marketCache.AddOrUpdate(markets); + markets[0].AddRandomPrices(Random, 0, PricesPerMarket); + markets[1].AddRandomPrices(Random, 0, PricesPerMarket); + + // when + markets[1].RemoveAllPrices(); + + // then + _marketCacheResults.Data.Count.Should().Be(2); + results.Data.Count.Should().Be(PricesPerMarket); + results.Data.Items.Zip(markets[0].PricesCache.Items).ForEach(pair => pair.First.Should().Be(pair.Second)); + results.Summary.Overall.Adds.Should().Be(PricesPerMarket); + results.Summary.Overall.Removes.Should().Be(0); + results.Summary.Overall.Updates.Should().Be(0); + } + + [Fact] + public void AnyDuplicateValuesShouldBeUnhiddenWhenOtherIsRemoved() + { + // having + var markets = Enumerable.Range(0, 2).Select(n => new Market(n)).ToArray(); + using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator(); + _marketCache.AddOrUpdate(markets); + markets[0].AddRandomPrices(Random, 0, PricesPerMarket); + markets[1].AddRandomPrices(Random, 0, PricesPerMarket); + + // when + _marketCache.Remove(markets[0]); + + // then + _marketCacheResults.Data.Count.Should().Be(1); + results.Data.Count.Should().Be(PricesPerMarket); + results.Data.Items.Zip(markets[1].PricesCache.Items).ForEach(pair => pair.First.Should().Be(pair.Second)); + results.Messages.Count.Should().Be(2); + results.Messages[1].Updates.Should().Be(PricesPerMarket); + } + + [Fact] + public void AnyDuplicateValuesShouldNotRefreshWhenHidden() + { + // having + var markets = Enumerable.Range(0, 2).Select(n => new Market(n)).ToArray(); + using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator(); + _marketCache.AddOrUpdate(markets); + markets[0].AddRandomPrices(Random, 0, PricesPerMarket); + markets[1].AddRandomPrices(Random, 0, PricesPerMarket); + + // when + markets[1].RefreshAllPrices(Random); + + // then + _marketCacheResults.Data.Count.Should().Be(2); + results.Data.Count.Should().Be(PricesPerMarket); + results.Summary.Overall.Refreshes.Should().Be(0); + results.Data.Items.Zip(markets[0].PricesCache.Items).ForEach(pair => pair.First.Should().Be(pair.Second)); + } + + [Fact] + public void AnyRemovedSubItemIsRemoved() + { + // having + var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray(); + using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator(); + _marketCache.AddOrUpdate(markets); + markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.AddRandomPrices(Random, m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket)); + + // when + markets.ForEach(m => m.PricesCache.Edit(updater => updater.RemoveKeys(updater.Keys.Take(RemoveCount)))); + + // then + _marketCacheResults.Data.Count.Should().Be(MarketCount); + results.Data.Count.Should().Be(MarketCount * (PricesPerMarket - RemoveCount)); + results.Messages.Count.Should().Be(MarketCount * 2); + results.Messages[0].Adds.Should().Be(PricesPerMarket); + results.Summary.Overall.Adds.Should().Be(MarketCount * PricesPerMarket); + results.Summary.Overall.Removes.Should().Be(MarketCount * RemoveCount); + } + + [Fact] + public void AnySourceItemRemovedRemovesAllSourceValues() + { + // having + var markets = Enumerable.Range(0, 
MarketCount).Select(n => new Market(n)).ToArray(); + using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator(); + _marketCache.AddOrUpdate(markets); + markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.AddRandomPrices(Random, m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket)); + + // when + _marketCache.Edit(updater => updater.RemoveKeys(updater.Keys.Take(RemoveCount))); + + // then + _marketCacheResults.Data.Count.Should().Be(MarketCount - RemoveCount); + results.Data.Count.Should().Be((MarketCount - RemoveCount) * PricesPerMarket); + results.Summary.Overall.Adds.Should().Be(MarketCount * PricesPerMarket); + results.Summary.Overall.Removes.Should().Be(PricesPerMarket * RemoveCount); + } + + [Fact] + public void ChangingSourceByUpdateRemovesPreviousAndAddsNewValues() + { + // having + using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator(); + var market = new Market(0); + market.AddRandomPrices(Random, 0, PricesPerMarket * 2); + _marketCache.AddOrUpdate(market); + var updatedMarket = new Market(market); + updatedMarket.AddRandomPrices(Random, PricesPerMarket, PricesPerMarket * 3); + + // when + _marketCache.AddOrUpdate(updatedMarket); + + // then + _marketCacheResults.Data.Count.Should().Be(1); + results.Data.Count.Should().Be(PricesPerMarket * 2); + results.Summary.Overall.Adds.Should().Be(PricesPerMarket * 3); + results.Summary.Overall.Updates.Should().Be(PricesPerMarket); + results.Summary.Overall.Removes.Should().Be(PricesPerMarket); + results.Data.Items.Zip(updatedMarket.PricesCache.Items).ForEach(pair => pair.First.Should().Be(pair.Second)); + } + + [Fact] + public void ComparerOnlyAddsBetterAddedValues() + { + // having + using var highPriceResults = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.HighPriceCompare).AsAggregator(); + using var lowPriceResults = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.LowPriceCompare).AsAggregator(); + var marketOriginal = new Market(0); + var marketLow = new Market(1); + var marketHigh = new Market(2); + marketOriginal.AddRandomPrices(Random, 0, PricesPerMarket); + _marketCache.AddOrUpdate(marketOriginal); + _marketCache.AddOrUpdate(marketLow); + _marketCache.AddOrUpdate(marketHigh); + + // when + marketLow.UpdatePrices(0, PricesPerMarket, LowestPrice); + marketHigh.UpdatePrices(0, PricesPerMarket, HighestPrice); + + // then + _marketCacheResults.Data.Count.Should().Be(3); + lowPriceResults.Data.Count.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket); + lowPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketLow.Id)); + highPriceResults.Data.Count.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket); + highPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketHigh.Id)); + } + + [Fact] + public void ComparerOnlyAddsBetterExistingValues() + { + // having + using var highPriceResults = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.HighPriceCompare).AsAggregator(); + using var lowPriceResults = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, 
MarketPrice.LowPriceCompare).AsAggregator(); + var marketOriginal = new Market(0); + var marketLow = new Market(1); + var marketHigh = new Market(2); + marketOriginal.AddRandomPrices(Random, 0, PricesPerMarket); + _marketCache.AddOrUpdate(marketOriginal); + marketLow.UpdatePrices(0, PricesPerMarket, LowestPrice); + marketHigh.UpdatePrices(0, PricesPerMarket, HighestPrice); + + // when + _marketCache.AddOrUpdate(marketLow); + _marketCache.AddOrUpdate(marketHigh); + + // then + _marketCacheResults.Data.Count.Should().Be(3); + lowPriceResults.Data.Count.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket); + lowPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketLow.Id)); + highPriceResults.Data.Count.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket); + highPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketHigh.Id)); + } + + [Fact] + public void ComparerOnlyAddsBetterValuesOnSourceUpdate() + { + // having + using var highPriceResults = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.HighPriceCompare).AsAggregator(); + using var lowPriceResults = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.LowPriceCompare).AsAggregator(); + var marketOriginal = new Market(0); + var marketLow = new Market(1); + var marketLowLow = new Market(marketLow); + marketOriginal.AddRandomPrices(Random, 0, PricesPerMarket); + marketLow.UpdatePrices(0, PricesPerMarket, LowestPrice); + marketLowLow.UpdatePrices(0, PricesPerMarket, LowestPrice - 1); + _marketCache.AddOrUpdate(marketOriginal); + _marketCache.AddOrUpdate(marketLow); + + // when + _marketCache.AddOrUpdate(marketLowLow); + + // then + _marketCacheResults.Data.Count.Should().Be(2); + lowPriceResults.Data.Count.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Removes.Should().Be(0); + lowPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket * 2); + lowPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketLowLow.Id)); + highPriceResults.Data.Count.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Removes.Should().Be(0); + highPriceResults.Summary.Overall.Updates.Should().Be(0); + highPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketOriginal.Id)); + } + + [Fact] + public void ComparerUpdatesToCorrectValueOnRefresh() + { + // having + using var highPriceResults = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.HighPriceCompare).AsAggregator(); + using var lowPriceResults = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.LowPriceCompare).AsAggregator(); + var marketOriginal = new Market(0); + var marketFlipFlop = new Market(1); + marketOriginal.AddRandomPrices(Random, 0, PricesPerMarket); + marketFlipFlop.UpdatePrices(0, PricesPerMarket, HighestPrice); + _marketCache.AddOrUpdate(marketOriginal); + _marketCache.AddOrUpdate(marketFlipFlop); + + // when + marketFlipFlop.RefreshAllPrices(LowestPrice); + + // then + _marketCacheResults.Data.Count.Should().Be(2); + 
lowPriceResults.Data.Count.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Removes.Should().Be(0); + lowPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Refreshes.Should().Be(0); + lowPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketFlipFlop.Id)); + highPriceResults.Data.Count.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Removes.Should().Be(0); + highPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket * 2); + highPriceResults.Summary.Overall.Refreshes.Should().Be(0); + highPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketOriginal.Id)); + } + + [Fact] + public void ComparerUpdatesToCorrectValueOnRemove() + { + // having + using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator(); + using var lowPriceResults = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.LowPriceCompare).AsAggregator(); + using var highPriceResults = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.HighPriceCompare).AsAggregator(); + var marketOriginal = new Market(0); + var marketLow = new Market(1); + var marketHigh = new Market(2); + marketOriginal.AddRandomPrices(Random, 0, PricesPerMarket); + _marketCache.AddOrUpdate(marketOriginal); + _marketCache.AddOrUpdate(marketLow); + _marketCache.AddOrUpdate(marketHigh); + marketLow.UpdatePrices(0, PricesPerMarket, LowestPrice); + marketHigh.UpdatePrices(0, PricesPerMarket, HighestPrice); + + // when + _marketCache.Remove(marketLow); + + // then + _marketCacheResults.Data.Count.Should().Be(2); + results.Data.Count.Should().Be(PricesPerMarket); + results.Summary.Overall.Adds.Should().Be(PricesPerMarket); + results.Summary.Overall.Updates.Should().Be(0); + results.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketOriginal.Id)); + lowPriceResults.Data.Count.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Removes.Should().Be(0); + lowPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket * 2); + lowPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketOriginal.Id)); + highPriceResults.Data.Count.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Removes.Should().Be(0); + highPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket); + highPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketHigh.Id)); + } + + [Fact] + public void ComparerUpdatesToCorrectValueOnUpdate() + { + // having + using var highPriceResults = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.HighPriceCompare).AsAggregator(); + using var lowPriceResults = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.LowPriceCompare).AsAggregator(); + var marketOriginal = new Market(0); + var marketFlipFlop = new Market(1); + marketOriginal.AddRandomPrices(Random, 0, PricesPerMarket); + marketFlipFlop.UpdatePrices(0, PricesPerMarket, HighestPrice); + _marketCache.AddOrUpdate(marketOriginal); + _marketCache.AddOrUpdate(marketFlipFlop); + + // when + 
marketFlipFlop.UpdateAllPrices(LowestPrice); + + // then + _marketCacheResults.Data.Count.Should().Be(2); + lowPriceResults.Data.Count.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Removes.Should().Be(0); + lowPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Refreshes.Should().Be(0); + lowPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketFlipFlop.Id)); + highPriceResults.Data.Count.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Removes.Should().Be(0); + highPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket * 2); + highPriceResults.Summary.Overall.Refreshes.Should().Be(0); + highPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketOriginal.Id)); + } + + [Fact] + public void ComparerOnlyUpdatesVisibleValuesOnUpdate() + { + // having + using var highPriceResults = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.HighPriceCompare).AsAggregator(); + using var lowPriceResults = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.LowPriceCompare).AsAggregator(); + var marketOriginal = new Market(0); + var marketLow = new Market(1); + marketOriginal.AddRandomPrices(Random, 0, PricesPerMarket); + marketLow.UpdatePrices(0, PricesPerMarket, LowestPrice); + _marketCache.AddOrUpdate(marketOriginal); + _marketCache.AddOrUpdate(marketLow); + + // when + marketLow.UpdateAllPrices(LowestPrice - 1); + + // then + _marketCacheResults.Data.Count.Should().Be(2); + lowPriceResults.Data.Count.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Removes.Should().Be(0); + lowPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket * 2); + lowPriceResults.Summary.Overall.Refreshes.Should().Be(0); + lowPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketLow.Id)); + highPriceResults.Data.Count.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Removes.Should().Be(0); + highPriceResults.Summary.Overall.Updates.Should().Be(0); + highPriceResults.Summary.Overall.Refreshes.Should().Be(0); + highPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketOriginal.Id)); + } + + [Fact] + public void ComparerOnlyRefreshesVisibleValues() + { + // having + using var highPriceResults = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer, MarketPrice.HighPriceCompare).AsAggregator(); + using var lowPriceResults = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer, MarketPrice.LowPriceCompare).AsAggregator(); + var marketOriginal = new Market(0); + var marketLow = new Market(1); + marketOriginal.AddRandomPrices(Random, 0, PricesPerMarket); + marketLow.UpdatePrices(0, PricesPerMarket, LowestPrice); + _marketCache.AddOrUpdate(marketOriginal); + _marketCache.AddOrUpdate(marketLow); + + // when + marketLow.RefreshAllPrices(LowestPrice - 1); + + // then + _marketCacheResults.Data.Count.Should().Be(2); + lowPriceResults.Data.Count.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + 
lowPriceResults.Summary.Overall.Removes.Should().Be(0); + lowPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket); + lowPriceResults.Summary.Overall.Refreshes.Should().Be(PricesPerMarket); + lowPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketLow.Id)); + highPriceResults.Data.Count.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); + highPriceResults.Summary.Overall.Removes.Should().Be(0); + highPriceResults.Summary.Overall.Updates.Should().Be(0); + highPriceResults.Summary.Overall.Refreshes.Should().Be(0); + highPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketOriginal.Id)); + } + + [Fact] + public void EqualityComparerHidesUpdatesWithoutChanges() + { + // having + var market = new Market(0); + using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator(); + market.UpdatePrices(0, PricesPerMarket, LowestPrice); + _marketCache.AddOrUpdate(market); + + // when + market.UpdatePrices(0, PricesPerMarket, LowestPrice); + + // then + _marketCacheResults.Data.Count.Should().Be(1); + results.Data.Count.Should().Be(PricesPerMarket); + results.Messages.Count.Should().Be(1); + results.Summary.Overall.Adds.Should().Be(PricesPerMarket); + results.Summary.Overall.Removes.Should().Be(0); + results.Summary.Overall.Updates.Should().Be(0); + results.Summary.Overall.Refreshes.Should().Be(0); + } + + [Fact] + public void EveryItemVisibleWhenSequenceCompletes() + { + // having + _marketCache.AddOrUpdate(Enumerable.Range(0, MarketCount).Select(n => new FixedMarket(Random, n * ItemIdStride, (n * ItemIdStride) + PricesPerMarket))); + + // when + using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices).AsAggregator(); + DisposeMarkets(); + + // then + results.Data.Count.Should().Be(PricesPerMarket * MarketCount); + results.Summary.Overall.Adds.Should().Be(PricesPerMarket * MarketCount); + results.Summary.Overall.Removes.Should().Be(0); + results.Summary.Overall.Updates.Should().Be(0); + results.Summary.Overall.Refreshes.Should().Be(0); + } + + [Theory] + [InlineData(false, false)] + [InlineData(false, true)] + [InlineData(true, false)] + [InlineData(true, true)] + public void MergedObservableCompletesOnlyWhenSourceAndAllChildrenComplete(bool completeSource, bool completeChildren) + { + // having + _marketCache.AddOrUpdate(Enumerable.Range(0, MarketCount).Select(n => new FixedMarket(Random, n * ItemIdStride, (n * ItemIdStride) + PricesPerMarket, completable: completeChildren))); + var hasSourceSequenceCompleted = false; + var hasMergedSequenceCompleted = false; + + using var cleanup = _marketCache.Connect().Do(_ => { }, () => hasSourceSequenceCompleted = true) + .MergeManyChangeSets(m => m.LatestPrices).Subscribe(_ => { }, () => hasMergedSequenceCompleted = true); + + // when + if (completeSource) + { + DisposeMarkets(); + } + + // then + hasSourceSequenceCompleted.Should().Be(completeSource); + hasMergedSequenceCompleted.Should().Be(completeSource && completeChildren); + } + + [Fact] + public void MergedObservableWillFailIfSourceFails() + { + // having + var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray(); + _marketCache.AddOrUpdate(markets); + var receivedError = default(Exception); + var expectedError = new Exception("Test exception"); + var throwObservable = Observable.Throw>(expectedError); + + using var cleanup = 
_marketCache.Connect().Concat(throwObservable) + .MergeManyChangeSets(m => m.LatestPrices).Subscribe(_ => { }, err => receivedError = err); + + // when + DisposeMarkets(); + + // then + receivedError.Should().Be(expectedError); + } + + public void Dispose() + { + _marketCacheResults.Dispose(); + DisposeMarkets(); + } + + private void DisposeMarkets() + { + _marketCache.Items.ForEach(m => (m as IDisposable)?.Dispose()); + _marketCache.Dispose(); + _marketCache.Clear(); + } + + private interface IMarket + { + public string Name { get; } + + public Guid Id { get; } + + public IObservable> LatestPrices { get; } + } + + private class Market : IMarket, IDisposable + { + private readonly ISourceCache _latestPrices = new SourceCache(p => p.ItemId); + + private Market(string name, Guid id) + { + Name = name; + Id = id; + } + + public Market(Market market) : this(market.Name, market.Id) + { + } + + public Market(int name) : this($"Market #{name}", Guid.NewGuid()) + { + } + + public string Name { get; } + + public Guid Id { get; } + + public IObservable> LatestPrices => _latestPrices.Connect(); + + public ISourceCache PricesCache => _latestPrices; + + public MarketPrice CreatePrice(int itemId, decimal price) => new (itemId, price, Id); + + public void AddRandomIdPrices(Random r, int count, int minId, int maxId) => + _latestPrices.AddOrUpdate(Enumerable.Range(0, int.MaxValue).Select(_ => r.Next(minId, maxId)).Distinct().Take(count).Select(id => CreatePrice(id, RandomPrice(r)))); + + public void AddRandomPrices(Random r, int minId, int maxId) => + _latestPrices.AddOrUpdate(Enumerable.Range(minId, (maxId - minId)).Select(id => CreatePrice(id, RandomPrice(r)))); + + public void RefreshAllPrices(decimal newPrice) => + _latestPrices.Edit(updater => updater.Items.ForEach(cp => + { + cp.Price = newPrice; + updater.Refresh(cp); + })); + + public void RefreshAllPrices(Random r) => RefreshAllPrices(RandomPrice(r)); + + public void RefreshPrice(int id, decimal newPrice) => + _latestPrices.Edit(updater => updater.Lookup(id).IfHasValue(cp => + { + cp.Price = newPrice; + updater.Refresh(cp); + })); + + public void RemoveAllPrices() => _latestPrices.Clear(); + + public void RemovePrice(int itemId) => _latestPrices.Remove(itemId); + + public void UpdateAllPrices(decimal newPrice) => + _latestPrices.Edit(updater => updater.AddOrUpdate(updater.Items.Select(cp => CreatePrice(cp.ItemId, newPrice)))); + + public void UpdatePrices(int minId, int maxId, decimal newPrice) => + _latestPrices.AddOrUpdate(Enumerable.Range(minId, (maxId - minId)).Select(id => CreatePrice(id, newPrice))); + + public void Dispose() => _latestPrices.Dispose(); + } + + private static decimal RandomPrice(Random r) => BasePrice + ((decimal)r.NextDouble() * PriceOffset); + + private class MarketPrice + { + public static IEqualityComparer EqualityComparer { get; } = new CurrentPriceEqualityComparer(); + public static IComparer HighPriceCompare { get; } = new HighestPriceComparer(); + public static IComparer LowPriceCompare { get; } = new LowestPriceComparer(); + public static IComparer LatestPriceCompare { get; } = new LatestPriceComparer(); + + private decimal _price; + + public MarketPrice(int itemId, decimal price, Guid marketId) + { + ItemId = itemId; + MarketId = marketId; + Price = price; + } + + public decimal Price + { + get => _price; + set + { + _price = value; + TimeStamp = DateTimeOffset.UtcNow; + } + } + + public DateTimeOffset TimeStamp { get; private set; } + + public Guid MarketId { get; } + + public int ItemId { get; } + + private class 
CurrentPriceEqualityComparer : IEqualityComparer + { + public bool Equals([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) => x.MarketId.Equals(x.MarketId) && (x.ItemId == y.ItemId) && (x.Price == y.Price); + public int GetHashCode([DisallowNull] MarketPrice obj) => throw new NotImplementedException(); + } + + private class LowestPriceComparer : IComparer + { + public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) + { + Debug.Assert(x.ItemId == y.ItemId); + return x.Price.CompareTo(y.Price); + } + } + + private class HighestPriceComparer : IComparer + { + public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) + { + Debug.Assert(x.ItemId == y.ItemId); + return y.Price.CompareTo(x.Price); + } + } + + private class LatestPriceComparer : IComparer + { + public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) + { + Debug.Assert(x.ItemId == y.ItemId); + return x.TimeStamp.CompareTo(y.TimeStamp); + } + } + } + + private class FixedMarket : IMarket + { + public FixedMarket(Random r, int minId, int maxId, bool completable = true) + { + Id = Guid.NewGuid(); + LatestPrices = Enumerable.Range(minId, maxId - minId) + .Select(id => new MarketPrice(id, RandomPrice(r), Id)) + .AsObservableChangeSet(cp => cp.ItemId, completable: completable); + } + + public IObservable> LatestPrices { get; } + + public string Name => Id.ToString("B"); + + public Guid Id { get; } + } + +} diff --git a/src/DynamicData.Tests/Cache/MergeManyWithKeyOverloadFixture.cs b/src/DynamicData.Tests/Cache/MergeManyWithKeyOverloadFixture.cs index c8c694c7d..82e4cf791 100644 --- a/src/DynamicData.Tests/Cache/MergeManyWithKeyOverloadFixture.cs +++ b/src/DynamicData.Tests/Cache/MergeManyWithKeyOverloadFixture.cs @@ -103,6 +103,93 @@ public void SingleItemFailWillNotFailMergedStream() failed.Should().BeFalse(); } + /// + /// Merged stream does not complete if a child stream is still active. + /// + [Fact] + public void MergedStreamDoesNotCompleteWhileItemStreamActive() + { + var streamCompleted = false; + var sourceCompleted = false; + + var item = new ObjectWithObservable(1); + _source.AddOrUpdate(item); + + using var stream = _source.Connect().Do(_ => { }, () => sourceCompleted = true) + .MergeMany((o, i) => o.Observable).Subscribe(_ => { }, () => streamCompleted = true); + + _source.Dispose(); + + sourceCompleted.Should().BeTrue(); + streamCompleted.Should().BeFalse(); + } + + /// + /// Stream completes only when source and all child are complete. + /// + [Fact] + public void MergedStreamCompletesWhenSourceAndItemsComplete() + { + var streamCompleted = false; + var sourceCompleted = false; + + var item = new ObjectWithObservable(1); + _source.AddOrUpdate(item); + + using var stream = _source.Connect().Do(_ => { }, () => sourceCompleted = true) + .MergeMany((o, i) => o.Observable).Subscribe(_ => { }, () => streamCompleted = true); + + _source.Dispose(); + item.CompleteObservable(); + + sourceCompleted.Should().BeTrue(); + streamCompleted.Should().BeTrue(); + } + + /// + /// Stream completes even if one of the children fails. 
+ /// + [Fact] + public void MergedStreamCompletesIfLastItemFails() + { + var receivedError = default(Exception); + var streamCompleted = false; + var sourceCompleted = false; + + var item = new ObjectWithObservable(1); + _source.AddOrUpdate(item); + + using var stream = _source.Connect().Do(_ => { }, () => sourceCompleted = true) + .MergeMany((o, i) => o.Observable).Subscribe(_ => { }, err => receivedError = err, () => streamCompleted = true); + + _source.Dispose(); + item.FailObservable(new Exception("Test exception")); + + receivedError.Should().Be(default); + sourceCompleted.Should().BeTrue(); + streamCompleted.Should().BeTrue(); + } + + /// + /// If the source stream has an error, the merged steam should also. + /// + [Fact] + public void MergedStreamFailsWhenSourceFails() + { + var receivedError = default(Exception); + var expectedError = new Exception("Test exception"); + var throwObservable = Observable.Throw>(expectedError); + var stream = _source.Connect().Concat(throwObservable) + .MergeMany((o, i) => o.Observable).Subscribe(_ => { }, err => receivedError = err); + + var item = new ObjectWithObservable(1); + _source.AddOrUpdate(item); + + _source.Dispose(); + + receivedError.Should().Be(expectedError); + } + private class ObjectWithObservable { private readonly ISubject _changed = new Subject(); diff --git a/src/DynamicData/Cache/Internal/MergeMany.cs b/src/DynamicData/Cache/Internal/MergeMany.cs index 3cd0846f0..bcdc17076 100644 --- a/src/DynamicData/Cache/Internal/MergeMany.cs +++ b/src/DynamicData/Cache/Internal/MergeMany.cs @@ -2,7 +2,10 @@ // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. +using System.Reactive; +using System.Reactive.Disposables; using System.Reactive.Linq; +using System.Reactive.Subjects; namespace DynamicData.Cache.Internal; @@ -36,8 +39,43 @@ public IObservable Run() return Observable.Create( observer => { + var counter = new SubscriptionCounter(); var locker = new object(); - return _source.SubscribeMany((t, key) => _observableSelector(t, key).Synchronize(locker).Subscribe(observer.OnNext, _ => { }, () => { })).Subscribe(_ => { }, observer.OnError); + var disposable = _source.Concat(counter.DeferCleanup) + .SubscribeMany((t, key) => + { + counter.Added(); + return _observableSelector(t, key).Synchronize(locker).Finally(() => counter.Finally()).Subscribe(observer.OnNext, _ => { }, () => { }); + }) + .Subscribe(_ => { }, observer.OnError, observer.OnCompleted); + + return new CompositeDisposable(disposable, counter); }); } + + private sealed class SubscriptionCounter : IDisposable + { + private readonly Subject> _subject = new(); + private int _subscriptionCount = 1; + + public IObservable> DeferCleanup => Observable.Defer(() => + { + CheckCompleted(); + return _subject.AsObservable(); + }); + + public void Added() => _ = Interlocked.Increment(ref _subscriptionCount); + + public void Finally() => CheckCompleted(); + + public void Dispose() => _subject.Dispose(); + + private void CheckCompleted() + { + if (Interlocked.Decrement(ref _subscriptionCount) == 0) + { + _subject.OnCompleted(); + } + } + } } diff --git a/src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs b/src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs new file mode 100644 index 000000000..d94213417 --- /dev/null +++ b/src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs @@ -0,0 +1,305 @@ +// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved. 
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using DynamicData.Kernel;
+
+namespace DynamicData.Cache.Internal;
+
+/// <summary>
+/// Operator that is similar to MergeMany but intelligently handles Cache ChangeSets.
+/// </summary>
+internal sealed class MergeManyCacheChangeSets<TObject, TKey, TDestination, TDestinationKey>
+    where TObject : notnull
+    where TKey : notnull
+    where TDestination : notnull
+    where TDestinationKey : notnull
+{
+    private readonly IObservable<IChangeSet<TObject, TKey>> _source;
+
+    private readonly Func<TObject, TKey, IObservable<IChangeSet<TDestination, TDestinationKey>>> _changeSetSelector;
+
+    private readonly IComparer<TDestination>? _comparer;
+
+    private readonly IEqualityComparer<TDestination>? _equalityComparer;
+
+    public MergeManyCacheChangeSets(IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, TKey, IObservable<IChangeSet<TDestination, TDestinationKey>>> selector, IEqualityComparer<TDestination>? equalityComparer, IComparer<TDestination>? comparer)
+    {
+        _source = source;
+        _changeSetSelector = selector;
+        _comparer = comparer;
+        _equalityComparer = equalityComparer;
+    }
+
+    public IObservable<IChangeSet<TDestination, TDestinationKey>> Run()
+    {
+        return Observable.Create<IChangeSet<TDestination, TDestinationKey>>(
+            observer =>
+            {
+                var locker = new object();
+
+                // Transform to an observable cache of merge containers.
+                var sourceCacheOfCaches = _source
+                    .IgnoreSameReferenceUpdate()
+                    .WhereReasonsAre(ChangeReason.Add, ChangeReason.Remove, ChangeReason.Update)
+                    .Synchronize(locker)
+                    .Transform((obj, key) => new MergeContainer(_changeSetSelector(obj, key)))
+                    .AsObservableCache();
+
+                var shared = sourceCacheOfCaches.Connect().Publish();
+
+                // this manages all of the changes
+                var changeTracker = new ChangeTracker(sourceCacheOfCaches, _comparer, _equalityComparer);
+
+                // merge the items back together
+                var allChanges = shared.MergeMany(mc => mc.Source)
+                    .Synchronize(locker)
+                    .Subscribe(
+                        changes => changeTracker.ProcessChangeSet(changes, observer),
+                        observer.OnError,
+                        observer.OnCompleted);
+
+                // when a source item is removed, all of its sub-items need to be removed
+                var removedItems = shared
+                    .OnItemRemoved(mc => changeTracker.RemoveItems(mc.Cache.KeyValues, observer))
+                    .OnItemUpdated((_, prev) => changeTracker.RemoveItems(prev.Cache.KeyValues, observer))
+                    .Subscribe();
+
+                return new CompositeDisposable(sourceCacheOfCaches, allChanges, removedItems, shared.Connect());
+            });
+    }
+
+    private class ChangeTracker
+    {
+        private readonly ChangeAwareCache<TDestination, TDestinationKey> _resultCache;
+        private readonly IObservableCache<MergeContainer, TKey> _sourceCache;
+        private readonly IComparer<TDestination>? _comparer;
+        private readonly IEqualityComparer<TDestination>? _equalityComparer;
+
+        public ChangeTracker(IObservableCache<MergeContainer, TKey> sourceCache, IComparer<TDestination>? comparer, IEqualityComparer<TDestination>?
equalityComparer) + { + _resultCache = new ChangeAwareCache(); + _sourceCache = sourceCache; + _comparer = comparer; + _equalityComparer = equalityComparer; + } + + public void RemoveItems(IEnumerable> items, IObserver> observer) + { + var sourceCaches = _sourceCache.Items.ToArray(); + + // Update the Published Value for each item being removed + if (items is IList> list) + { + // zero allocation enumerator + foreach (var item in EnumerableIList.Create(list)) + { + OnItemRemoved(sourceCaches, item.Value, item.Key); + } + } + else + { + foreach (var item in items) + { + OnItemRemoved(sourceCaches, item.Value, item.Key); + } + } + + EmitChanges(observer); + } + + public void ProcessChangeSet(IChangeSet changes, IObserver> observer) + { + var sourceCaches = _sourceCache.Items.ToArray(); + + foreach (var change in changes.ToConcreteType()) + { + switch (change.Reason) + { + case ChangeReason.Add: + OnItemAdded(change.Current, change.Key); + break; + + case ChangeReason.Remove: + OnItemRemoved(sourceCaches, change.Current, change.Key); + break; + + case ChangeReason.Update: + OnItemUpdated(sourceCaches, change.Current, change.Key, change.Previous); + break; + + case ChangeReason.Refresh: + OnItemRefreshed(sourceCaches, change.Current, change.Key); + break; + } + } + + EmitChanges(observer); + } + + private void EmitChanges(IObserver> observer) + { + var changeSet = _resultCache.CaptureChanges(); + if (changeSet.Count != 0) + { + observer.OnNext(changeSet); + } + } + + private void OnItemAdded(TDestination item, TDestinationKey key) + { + var cached = _resultCache.Lookup(key); + + // If no current value, then add it + if (!cached.HasValue) + { + _resultCache.Add(item, key); + } + else if (ShouldReplace(item, cached.Value)) + { + _resultCache.AddOrUpdate(item, key); + } + } + + private void OnItemRemoved(MergeContainer[] sourceCaches, TDestination item, TDestinationKey key) + { + var cached = _resultCache.Lookup(key); + + // If this key has been observed and the current value is being removed + if (cached.HasValue && CheckEquality(item, cached.Value)) + { + // Perform a full update to select the new downstream value (or remove it) + UpdateToBestValue(sourceCaches, key, cached); + } + } + + private void OnItemUpdated(MergeContainer[] sources, TDestination item, TDestinationKey key, Optional prev) + { + var cached = _resultCache.Lookup(key); + + // Received an update change for a key that hasn't been seen yet + // So use the updated value + if (!cached.HasValue) + { + _resultCache.Add(item, key); + return; + } + + if (_comparer is null) + { + // If the current value (or there is no way to tell) is being replaced by a different value + if ((!prev.HasValue || CheckEquality(prev.Value, cached.Value)) && !CheckEquality(item, cached.Value)) + { + // Update to the new value + _resultCache.AddOrUpdate(item, key); + } + } + else + { + // The current value is being replaced (or there is no way to tell), so do a full update to select the best one from all the choices + if (!prev.HasValue || CheckEquality(prev.Value, cached.Value)) + { + UpdateToBestValue(sources, key, cached); + } + else + { + // If the current value isn't being replaced, check to see if the replacement value is better than the current one + if (ShouldReplace(item, cached.Value)) + { + _resultCache.AddOrUpdate(item, key); + } + } + } + } + + private void OnItemRefreshed(MergeContainer[] sources, TDestination item, TDestinationKey key) + { + var cached = _resultCache.Lookup(key); + + // Received a refresh change for a key that hasn't been 
seen yet + // Nothing can be done, so ignore it + if (!cached.HasValue) + { + return; + } + + // In the sorting case, a refresh requires doing a full update because any change could alter what the best value is + // If we don't care about sorting OR if we do care, but re-selecting the best value didn't change anything + // AND the current value is the one being refreshed + if (((_comparer is null) || !UpdateToBestValue(sources, key, cached)) && CheckEquality(cached.Value, item)) + { + // Emit the refresh downstream + _resultCache.Refresh(key); + } + } + + private bool UpdateToBestValue(MergeContainer[] sources, TDestinationKey key, Optional current) + { + // Determine which value should be the one seen downstream + var candidate = SelectValue(sources, key); + if (candidate.HasValue) + { + // If there isn't a current value + if (!current.HasValue) + { + _resultCache.Add(candidate.Value, key); + return true; + } + + // If the candidate value isn't the same as the current value + if (!CheckEquality(current.Value, candidate.Value)) + { + _resultCache.AddOrUpdate(candidate.Value, key); + return true; + } + + // The value seen downstream is the one that should be + return false; + } + + // No best candidate available + _resultCache.Remove(key); + return true; + } + + private Optional SelectValue(MergeContainer[] sources, TDestinationKey key) + { + if (sources.Length == 0) + { + return Optional.None(); + } + + var values = sources.Select(s => s.Cache.Lookup(key)).Where(opt => opt.HasValue); + + if (_comparer is not null) + { + values = values.OrderBy(opt => opt.Value, _comparer); + } + + return values.FirstOrDefault(); + } + + private bool CheckEquality(TDestination left, TDestination right) => + ReferenceEquals(left, right) || (_equalityComparer?.Equals(left, right) ?? (_comparer?.Compare(left, right) == 0)); + + // Return true if candidate should replace current as the observed downstream value + private bool ShouldReplace(TDestination candidate, TDestination current) => + !ReferenceEquals(candidate, current) && (_comparer?.Compare(candidate, current) < 0); + } + + private class MergeContainer + { + public MergeContainer(IObservable> source) + { + Source = source.IgnoreSameReferenceUpdate().Do(Clone); + } + + public Cache Cache { get; } = new(); + + public IObservable> Source { get; } + + private void Clone(IChangeSet changes) => Cache.Clone(changes); + } +} diff --git a/src/DynamicData/Cache/ObservableCacheEx.cs b/src/DynamicData/Cache/ObservableCacheEx.cs index 16ed90801..3e4197283 100644 --- a/src/DynamicData/Cache/ObservableCacheEx.cs +++ b/src/DynamicData/Cache/ObservableCacheEx.cs @@ -2,6 +2,8 @@ // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. +using System.Collections; +using System.Collections.Generic; using System.Collections.ObjectModel; using System.ComponentModel; using System.Diagnostics.CodeAnalysis; @@ -2841,6 +2843,104 @@ public static IObservable MergeMany(t return new MergeMany(source, observableSelector).Run(); } + /// + /// Operator similiar to MergeMany except it is ChangeSet aware. It uses to transform each item in the source into a child and merges the result children together into a single stream of ChangeSets that correctly handles multiple Keys and removal of the parent items. + /// + /// The type of the object. + /// The type of the key. + /// The type of the destination. + /// The type of the destination key. + /// The Source Observable ChangeSet. 
+    /// <param name="observableSelector">Factory Function used to create child changesets.</param>
+    /// <param name="comparer"><see cref="IComparer{T}"/> instance to determine which element to emit if the same key is emitted from multiple child changesets.</param>
+    /// <returns>The result from merging the child changesets together.</returns>
+    /// <exception cref="ArgumentNullException">Parameter was null.</exception>
+    public static IObservable<IChangeSet<TDestination, TDestinationKey>> MergeManyChangeSets<TObject, TKey, TDestination, TDestinationKey>(this IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, IObservable<IChangeSet<TDestination, TDestinationKey>>> observableSelector, IComparer<TDestination> comparer)
+        where TObject : notnull
+        where TKey : notnull
+        where TDestination : notnull
+        where TDestinationKey : notnull
+    {
+        if (source == null) throw new ArgumentNullException(nameof(source));
+        if (observableSelector == null) throw new ArgumentNullException(nameof(observableSelector));
+
+        return source.MergeManyChangeSets((t, _) => observableSelector(t), comparer);
+    }
+
+    /// <summary>
+    /// Operator similar to MergeMany except it is ChangeSet aware. It uses <paramref name="observableSelector"/> to transform each item in the source into a child change set and merges the result children together into a single stream of ChangeSets that correctly handles multiple Keys and removal of the parent items.
+    /// </summary>
+    /// <typeparam name="TObject">The type of the object.</typeparam>
+    /// <typeparam name="TKey">The type of the key.</typeparam>
+    /// <typeparam name="TDestination">The type of the destination.</typeparam>
+    /// <typeparam name="TDestinationKey">The type of the destination key.</typeparam>
+    /// <param name="source">The Source Observable ChangeSet.</param>
+    /// <param name="observableSelector">Factory Function used to create child changesets.</param>
+    /// <param name="comparer"><see cref="IComparer{T}"/> instance to determine which element to emit if the same key is emitted from multiple child changesets.</param>
+    /// <returns>The result from merging the child changesets together.</returns>
+    /// <exception cref="ArgumentNullException">Parameter was null.</exception>
+    public static IObservable<IChangeSet<TDestination, TDestinationKey>> MergeManyChangeSets<TObject, TKey, TDestination, TDestinationKey>(this IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, TKey, IObservable<IChangeSet<TDestination, TDestinationKey>>> observableSelector, IComparer<TDestination> comparer)
+        where TObject : notnull
+        where TKey : notnull
+        where TDestination : notnull
+        where TDestinationKey : notnull
+    {
+        if (source == null) throw new ArgumentNullException(nameof(source));
+        if (observableSelector == null) throw new ArgumentNullException(nameof(observableSelector));
+        if (comparer == null) throw new ArgumentNullException(nameof(comparer));
+
+        return source.MergeManyChangeSets(observableSelector, equalityComparer: null, comparer: comparer);
+    }
+
+    /// <summary>
+    /// Operator similar to MergeMany except it is ChangeSet aware. It uses <paramref name="observableSelector"/> to transform each item in the source into a child change set and merges the result children together into a single stream of ChangeSets that correctly handles multiple Keys and removal of the parent items.
+    /// </summary>
+    /// <typeparam name="TObject">The type of the object.</typeparam>
+    /// <typeparam name="TKey">The type of the key.</typeparam>
+    /// <typeparam name="TDestination">The type of the destination.</typeparam>
+    /// <typeparam name="TDestinationKey">The type of the destination key.</typeparam>
+    /// <param name="source">The Source Observable ChangeSet.</param>
+    /// <param name="observableSelector">Factory Function used to create child changesets.</param>
+    /// <param name="equalityComparer">Optional <see cref="IEqualityComparer{T}"/> instance to determine if two elements are the same.</param>
+    /// <param name="comparer">Optional <see cref="IComparer{T}"/> instance to determine which element to emit if the same key is emitted from multiple child changesets.</param>
+    /// <returns>The result from merging the child changesets together.</returns>
+    /// <exception cref="ArgumentNullException">Parameter was null.</exception>
+    public static IObservable<IChangeSet<TDestination, TDestinationKey>> MergeManyChangeSets<TObject, TKey, TDestination, TDestinationKey>(this IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, IObservable<IChangeSet<TDestination, TDestinationKey>>> observableSelector, IEqualityComparer<TDestination>? equalityComparer = null, IComparer<TDestination>? comparer = null)
+        where TObject : notnull
+        where TKey : notnull
+        where TDestination : notnull
+        where TDestinationKey : notnull
+    {
+        if (observableSelector == null) throw new ArgumentNullException(nameof(observableSelector));
+
+        return source.MergeManyChangeSets((t, _) => observableSelector(t), equalityComparer, comparer);
+    }
+
+    /// <summary>
+    /// Operator similar to MergeMany except it is ChangeSet aware. It uses <paramref name="observableSelector"/> to transform each item in the source into a child change set and merges the result children together into a single stream of ChangeSets that correctly handles multiple Keys and removal of the parent items.
+    /// </summary>
+    /// <typeparam name="TObject">The type of the object.</typeparam>
+    /// <typeparam name="TKey">The type of the key.</typeparam>
+    /// <typeparam name="TDestination">The type of the destination.</typeparam>
+    /// <typeparam name="TDestinationKey">The type of the destination key.</typeparam>
+    /// <param name="source">The Source Observable ChangeSet.</param>
+    /// <param name="observableSelector">Factory Function used to create child changesets.</param>
+    /// <param name="equalityComparer">Optional <see cref="IEqualityComparer{T}"/> instance to determine if two elements are the same.</param>
+    /// <param name="comparer">Optional <see cref="IComparer{T}"/> instance to determine which element to emit if the same key is emitted from multiple child changesets.</param>
+    /// <returns>The result from merging the child changesets together.</returns>
+    /// <exception cref="ArgumentNullException">Parameter was null.</exception>
+    public static IObservable<IChangeSet<TDestination, TDestinationKey>> MergeManyChangeSets<TObject, TKey, TDestination, TDestinationKey>(this IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, TKey, IObservable<IChangeSet<TDestination, TDestinationKey>>> observableSelector, IEqualityComparer<TDestination>? equalityComparer = null, IComparer<TDestination>? comparer = null)
+        where TObject : notnull
+        where TKey : notnull
+        where TDestination : notnull
+        where TDestinationKey : notnull
+    {
+        if (source == null) throw new ArgumentNullException(nameof(source));
+        if (observableSelector == null) throw new ArgumentNullException(nameof(observableSelector));
+
+        return new MergeManyCacheChangeSets<TObject, TKey, TDestination, TDestinationKey>(source, observableSelector, equalityComparer, comparer).Run();
+    }
+
     /// <summary>
     /// Dynamically merges the observable which is selected from each item in the stream, and un-merges the item
     /// when it is no longer part of the stream.
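For context, the sketch below mirrors how these overloads are exercised by the MergeManyCacheChangeSetsFixture earlier in this diff. It assumes the fixture's IMarket / MarketPrice types and their comparers are accessible (in the fixture they are private nested members), and the variable names are purely illustrative:

    // Connect to the cache of parent markets, then merge every market's child price cache
    // into a single changeset stream keyed by MarketPrice.ItemId. When several markets
    // publish the same ItemId, LowPriceCompare decides which price is surfaced, and
    // EqualityComparer suppresses updates that do not actually change the value.
    var marketCache = new SourceCache<IMarket, Guid>(market => market.Id);

    using var lowestPrices = marketCache.Connect()
        .MergeManyChangeSets(
            market => market.LatestPrices,
            equalityComparer: MarketPrice.EqualityComparer,
            comparer: MarketPrice.LowPriceCompare)
        .Subscribe(changes => Console.WriteLine($"{changes.Count} price changes"));

    // Removing a market from marketCache removes its prices downstream, or re-selects the
    // next-best value for any ItemId that is still supplied by another market.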