From 4ffeec06a7a99bc581118307297e2689fe361132 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Thu, 21 Dec 2023 14:51:55 -0800 Subject: [PATCH] Feature: Multithreaded Stress Test for List-to-Cache MergeManyChangeSets (#806) * Fix operator and added stress test to prevent regressions * Fix test issue * Improvements to try and fix the stress test for List-List * Code synchronization --- .../Cache/MergeManyChangeSetsCacheFixture.cs | 21 ++-- .../Cache/MergeManyChangeSetsListFixture.cs | 2 - .../List/MergeManyChangeSetsCacheFixture.cs | 117 +++++++++++++++++- .../Internal/MergeManyCacheChangeSets.cs | 1 + .../MergeManyCacheChangeSetsSourceCompare.cs | 4 +- .../List/Internal/MergeManyCacheChangeSets.cs | 28 +++-- .../List/Internal/MergeManyListChangeSets.cs | 25 ++-- 7 files changed, 155 insertions(+), 43 deletions(-) diff --git a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs index 896656840..ccc897f65 100644 --- a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs +++ b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs @@ -39,9 +39,16 @@ public sealed class MergeManyChangeSetsCacheFixture : IDisposable private readonly ChangeSetAggregator _marketCacheResults; - private readonly Randomizer _randomizer = new (0x21123737); + private readonly Faker _marketFaker; - public MergeManyChangeSetsCacheFixture() => _marketCacheResults = _marketCache.Connect().AsAggregator(); + private readonly Randomizer _randomizer; + + public MergeManyChangeSetsCacheFixture() + { + _randomizer = new(0x21123737); + _marketFaker = Fakers.Market.WithSeed(_randomizer); + _marketCacheResults = _marketCache.Connect().AsAggregator(); + } [Theory] [InlineData(5, 7)] @@ -77,17 +84,13 @@ IObservable AddRemoveStress(int marketCount, int priceCount, int parallel, }); IObservable AddRemoveMarkets(int ownerCount, int parallel, IScheduler scheduler) => - _randomizer.Interval(MaxAddTime, scheduler).Select(_ => new Market(_randomizer.Utf16String(5, 10, true))) - //.Parallelize(ownerCount, parallel, obs => obs.StressAddRemove(_marketCache, _ => GetRemoveTime(), scheduler)) - .Take(ownerCount) - .StressAddRemove(_marketCache, _ => GetRemoveTime(), scheduler) + _marketFaker.IntervalGenerate(MaxAddTime, scheduler) + .Parallelize(ownerCount, parallel, obs => obs.StressAddRemove(_marketCache, _ => GetRemoveTime(), scheduler)) .Finally(_marketCache.Dispose); IObservable AddRemovePrices(Market market, int priceCount, int parallel, IScheduler scheduler) => _randomizer.Interval(MaxAddTime, scheduler).Select(_ => market.CreateUniquePrice(_ => GetRandomPrice())) - //.Parallelize(animalCount, parallel, obs => obs.StressAddRemove(owner.Animals, _ => GetRemoveTime(), scheduler)) - .Take(priceCount) - .StressAddRemove(market.PricesCache, _ => GetRemoveTime(), scheduler) + .Parallelize(priceCount, parallel, obs => obs.StressAddRemove(market.PricesCache, _ => GetRemoveTime(), scheduler)) .Finally(market.PricesCache.Dispose); var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices); diff --git a/src/DynamicData.Tests/Cache/MergeManyChangeSetsListFixture.cs b/src/DynamicData.Tests/Cache/MergeManyChangeSetsListFixture.cs index 1c1506e2f..d5fe81e3d 100644 --- a/src/DynamicData.Tests/Cache/MergeManyChangeSetsListFixture.cs +++ b/src/DynamicData.Tests/Cache/MergeManyChangeSetsListFixture.cs @@ -29,8 +29,6 @@ public sealed class MergeManyChangeSetsListFixture : IDisposable const int AddRangeSize = 53; const int RemoveRangeSize = 37; #endif - private static readonly TimeSpan s_MaxAddTime = TimeSpan.FromSeconds(0.250); - private static readonly TimeSpan s_MaxRemoveTime = TimeSpan.FromSeconds(0.100); private readonly ISourceCache _animalOwners = new SourceCache(o => o.Id); private readonly ChangeSetAggregator _animalOwnerResults; diff --git a/src/DynamicData.Tests/List/MergeManyChangeSetsCacheFixture.cs b/src/DynamicData.Tests/List/MergeManyChangeSetsCacheFixture.cs index 56e04ef5f..1cf96a1d4 100644 --- a/src/DynamicData.Tests/List/MergeManyChangeSetsCacheFixture.cs +++ b/src/DynamicData.Tests/List/MergeManyChangeSetsCacheFixture.cs @@ -1,7 +1,12 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Reactive.Concurrency; +using System.Reactive.Disposables; +using System.Reactive; using System.Reactive.Linq; +using System.Threading.Tasks; +using Bogus; using DynamicData.Kernel; using DynamicData.Tests.Domain; using DynamicData.Tests.Utilities; @@ -28,15 +33,97 @@ public sealed class MergeManyChangeSetsCacheFixture : IDisposable const decimal HighestPrice = BasePrice + PriceOffset + 1.0m; const decimal LowestPrice = BasePrice - 1.0m; - private static readonly Random Random = new (0x03251976); - - private static decimal GetRandomPrice() => MarketPrice.RandomPrice(Random, BasePrice, PriceOffset); - private readonly ISourceList _marketList = new SourceList(); private readonly ChangeSetAggregator _marketListResults; - public MergeManyChangeSetsCacheFixture() => _marketListResults = _marketList.Connect().AsAggregator(); + private readonly Faker _marketFaker; + + private readonly Randomizer _randomizer; + + public MergeManyChangeSetsCacheFixture() + { + _randomizer = new(0x03251976); + _marketFaker = Fakers.Market.WithSeed(_randomizer); + _marketListResults = _marketList.Connect().AsAggregator(); + } + + + [Theory] + [InlineData(5, 7)] + [InlineData(10, 50)] + [InlineData(5, 100)] + [InlineData(200, 500)] + [InlineData(100, 5)] + public async Task MultiThreadedStressTest(int marketCount, int priceCount) + { + var MaxAddTime = TimeSpan.FromSeconds(0.250); + var MaxRemoveTime = TimeSpan.FromSeconds(0.100); + + TimeSpan? GetRemoveTime() => _randomizer.Bool() ? _randomizer.TimeSpan(MaxRemoveTime) : null; + + IObservable AddRemoveStress(int marketCount, int priceCount, int parallel, IScheduler scheduler) => + Observable.Create(observer => new CompositeDisposable + { + AddRemoveMarkets(marketCount, parallel, scheduler) + .Subscribe( + onNext: _ => { }, + onError: ex => observer.OnError(ex)), + + _marketList.Connect() + .MergeMany(market => AddRemovePrices((Market)market, priceCount, parallel, scheduler)) + .Subscribe( + onNext: _ => { }, + onError: ex => observer.OnError(ex), + onCompleted: () => + { + observer.OnNext(Unit.Default); + observer.OnCompleted(); + }) + }); + + IObservable AddRemoveMarkets(int ownerCount, int parallel, IScheduler scheduler) => + _marketFaker.IntervalGenerate(MaxAddTime, scheduler) + .Parallelize(ownerCount, parallel, obs => obs.StressAddRemove(_marketList, _ => GetRemoveTime(), scheduler)) + .Finally(_marketList.Dispose); + + IObservable AddRemovePrices(Market market, int priceCount, int parallel, IScheduler scheduler) => + _randomizer.Interval(MaxAddTime, scheduler).Select(_ => market.CreateUniquePrice(_ => GetRandomPrice())) + .Parallelize(priceCount, parallel, obs => obs.StressAddRemove(market.PricesCache, _ => GetRemoveTime(), scheduler)) + .Finally(market.PricesCache.Dispose); + + var merged = _marketList.Connect().MergeManyChangeSets(market => market.LatestPrices); + using var priceResults = merged.AsAggregator(); + + var adding = true; + + // Start asynchrononously modifying the parent list and the child lists + using var addingSub = AddRemoveStress(marketCount, priceCount, Environment.ProcessorCount, TaskPoolScheduler.Default) + .Finally(() => adding = false) + .Subscribe(); + + // Subscribe / unsubscribe over and over while the collections are being modified + do + { + // Ensure items are being added asynchronously before subscribing to changes + await Task.Yield(); + + { + // Subscribe + var mergedSub = merged.Subscribe(); + + // Let other threads run + await Task.Yield(); + + // Unsubscribe + mergedSub.Dispose(); + } + } + while (adding); + + // Verify the results + CheckResultContents(_marketListResults, priceResults); + } [Fact] public void NullChecks() @@ -449,7 +536,7 @@ public void ComparerOnlyAddsBetterValuesOnSourceReplace() lowPriceResults.Data.Count.Should().Be(PricesPerMarket); lowPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); lowPriceResults.Summary.Overall.Removes.Should().Be(0); - lowPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket * 3); + lowPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket * 2); lowPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketLowLow.Id)); highPriceResults.Data.Count.Should().Be(PricesPerMarket); highPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket); @@ -739,10 +826,28 @@ public void Dispose() DisposeMarkets(); } + private void AddUniquePrices(Market[] markets) => markets.ForEach(m => m.AddUniquePrices(PricesPerMarket, _ => GetRandomPrice())); + + private void CheckResultContents(ChangeSetAggregator marketResults, ChangeSetAggregator priceResults) + { + var expectedMarkets = _marketList.Items.ToList(); + var expectedPrices = expectedMarkets.SelectMany(market => ((Market)market).PricesCache.Items).ToList(); + + // These should be subsets of each other + expectedMarkets.Should().BeSubsetOf(marketResults.Data.Items); + marketResults.Data.Items.Count().Should().Be(expectedMarkets.Count); + + // These should be subsets of each other + expectedPrices.Should().BeSubsetOf(priceResults.Data.Items); + priceResults.Data.Items.Count().Should().Be(expectedPrices.Count); + } + private void DisposeMarkets() { _marketList.Items.ForEach(m => (m as IDisposable)?.Dispose()); _marketList.Dispose(); _marketList.Clear(); } + + private decimal GetRandomPrice() => MarketPrice.RandomPrice(_randomizer, BasePrice, PriceOffset); } diff --git a/src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs b/src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs index 957b2962e..2aba0cab3 100644 --- a/src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs +++ b/src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs @@ -29,6 +29,7 @@ public IObservable> Run() => Observabl // This is manages all of the changes var changeTracker = new ChangeSetMergeTracker(() => sourceCacheOfCaches.Items, comparer, equalityComparer); + // Share a connection to the source cache var shared = sourceCacheOfCaches.Connect().Publish(); // Merge the child changeset changes together and apply to the tracker diff --git a/src/DynamicData/Cache/Internal/MergeManyCacheChangeSetsSourceCompare.cs b/src/DynamicData/Cache/Internal/MergeManyCacheChangeSetsSourceCompare.cs index f71552727..5d09bf087 100644 --- a/src/DynamicData/Cache/Internal/MergeManyCacheChangeSetsSourceCompare.cs +++ b/src/DynamicData/Cache/Internal/MergeManyCacheChangeSetsSourceCompare.cs @@ -34,7 +34,7 @@ public IObservable> Run() => Observabl .AsObservableCache(); // Share a single connection to the cache - var shared = sourceCacheOfCaches.Connect().Publish(); + var shared = sourceCacheOfCaches.Connect().Synchronize(locker).Publish(); // This is manages all of the changes var changeTracker = new ChangeSetMergeTracker(() => sourceCacheOfCaches.Items, _comparer, _equalityComparer); @@ -48,7 +48,6 @@ public IObservable> Run() => Observabl // When a source item is removed, all of its sub-items need to be removed var removedItems = shared - .Synchronize(locker) .OnItemRemoved(mc => changeTracker.RemoveItems(mc.Cache.KeyValues, observer)) .OnItemUpdated((_, prev) => changeTracker.RemoveItems(prev.Cache.KeyValues, observer)) .Subscribe(); @@ -57,7 +56,6 @@ public IObservable> Run() => Observabl // Because the comparison is based on the parent, which has just been refreshed. var refreshItems = reevalOnRefresh ? shared - .Synchronize(locker) .OnItemRefreshed(mc => changeTracker.RefreshItems(mc.Cache.Keys, observer)) .Subscribe() : Disposable.Empty; diff --git a/src/DynamicData/List/Internal/MergeManyCacheChangeSets.cs b/src/DynamicData/List/Internal/MergeManyCacheChangeSets.cs index 482885f43..fa1f28e0e 100644 --- a/src/DynamicData/List/Internal/MergeManyCacheChangeSets.cs +++ b/src/DynamicData/List/Internal/MergeManyCacheChangeSets.cs @@ -24,28 +24,30 @@ public IObservable> Run() => // Transform to an observable list of merge containers. var sourceListOfCaches = source - .Transform(obj => new ChangeSetCache(changeSetSelector(obj))) - .Synchronize(locker) - .AsObservableList(); - - var shared = sourceListOfCaches.Connect().Publish(); + .Transform(obj => new ChangeSetCache(changeSetSelector(obj).Synchronize(locker))) + .AsObservableList(); // This is manages all of the changes var changeTracker = new ChangeSetMergeTracker(() => sourceListOfCaches.Items.ToArray(), comparer, equalityComparer); + // Share a connection to the source list + var shared = sourceListOfCaches.Connect().Publish(); + + // Merge the child changeset changes together and apply to the tracker + var allChanges = shared + .Synchronize(locker) + .MergeMany(mc => mc.Source) + .Subscribe( + changes => changeTracker.ProcessChangeSet(changes, observer), + observer.OnError, + observer.OnCompleted); + // When a source item is removed, all of its sub-items need to be removed var removedItems = shared + .Synchronize(locker) .OnItemRemoved(mc => changeTracker.RemoveItems(mc.Cache.KeyValues, observer)) .Subscribe(); - // Merge the items back together - var allChanges = shared.MergeMany(mc => mc.Source) - .Synchronize(locker) - .Subscribe( - changes => changeTracker.ProcessChangeSet(changes, observer), - observer.OnError, - observer.OnCompleted); - return new CompositeDisposable(sourceListOfCaches, allChanges, removedItems, shared.Connect()); }); } diff --git a/src/DynamicData/List/Internal/MergeManyListChangeSets.cs b/src/DynamicData/List/Internal/MergeManyListChangeSets.cs index 2cb1dd8bd..5625358f5 100644 --- a/src/DynamicData/List/Internal/MergeManyListChangeSets.cs +++ b/src/DynamicData/List/Internal/MergeManyListChangeSets.cs @@ -20,20 +20,25 @@ public IObservable> Run() => { var locker = new object(); + // Transform to a changeset of Cloned Child Lists and then Share + var sourceListOfLists = source + .Transform(obj => new ClonedListChangeSet(selector(obj).Synchronize(locker), equalityComparer)) + .AsObservableList(); + // This is manages all of the changes var changeTracker = new ChangeSetMergeTracker(); - // Transform to a changeset of Cloned Child Lists and then Share - var shared = source - .Transform(obj => new ClonedListChangeSet(selector(obj).Synchronize(locker), equalityComparer)) - .Publish(); + // Share a connection to the source cache + var shared = sourceListOfLists.Connect().Publish(); // Merge the items back together - var allChanges = shared.MergeMany(clonedList => clonedList.Source.RemoveIndex()) - .Subscribe( - changes => changeTracker.ProcessChangeSet(changes, observer), - observer.OnError, - observer.OnCompleted); + var allChanges = shared + .Synchronize(locker) + .MergeMany(clonedList => clonedList.Source.RemoveIndex()) + .Subscribe( + changes => changeTracker.ProcessChangeSet(changes, observer), + observer.OnError, + observer.OnCompleted); // When a source item is removed, all of its sub-items need to be removed var removedItems = shared @@ -41,6 +46,6 @@ public IObservable> Run() => .OnItemRemoved(mc => changeTracker.RemoveItems(mc.List, observer), invokeOnUnsubscribe: false) .Subscribe(); - return new CompositeDisposable(allChanges, removedItems, shared.Connect()); + return new CompositeDisposable(sourceListOfLists, allChanges, removedItems, shared.Connect()); }); }