From 57483187656e4241c9aae7b15ec2996cbc99c19d Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Tue, 19 Dec 2023 13:57:59 -0800 Subject: [PATCH] Feature: Stress Tests for Cache-to-Cache MergeManyChangeSets (#802) * Stress Tests for the Cache-Cache Version * Test tuning * Improve Stress for Cache-Cache version * Cache-to-Cache Improvements --- .../Cache/MergeManyChangeSetsCacheFixture.cs | 120 ++++++++++++++++-- src/DynamicData.Tests/Domain/Fakers.cs | 2 + src/DynamicData.Tests/Domain/Market.cs | 21 +++ src/DynamicData.Tests/Domain/MarketPrice.cs | 19 ++- .../Internal/MergeManyCacheChangeSets.cs | 21 ++- 5 files changed, 158 insertions(+), 25 deletions(-) diff --git a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs index ab96b0688..896656840 100644 --- a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs +++ b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs @@ -1,7 +1,13 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Reactive; +using System.Reactive.Concurrency; +using System.Reactive.Disposables; using System.Reactive.Linq; +using System.Threading; +using System.Threading.Tasks; +using Bogus; using DynamicData.Kernel; using DynamicData.Tests.Domain; using DynamicData.Tests.Utilities; @@ -29,16 +35,94 @@ public sealed class MergeManyChangeSetsCacheFixture : IDisposable const decimal HighestPrice = BasePrice + PriceOffset + 1.0m; const decimal LowestPrice = BasePrice - 1.0m; - private static readonly Random Random = new (0x21123737); - - private static decimal GetRandomPrice() => MarketPrice.RandomPrice(Random, BasePrice, PriceOffset); - private readonly ISourceCache _marketCache = new SourceCache(p => p.Id); private readonly ChangeSetAggregator _marketCacheResults; + private readonly Randomizer _randomizer = new (0x21123737); + public MergeManyChangeSetsCacheFixture() => _marketCacheResults = _marketCache.Connect().AsAggregator(); + [Theory] + [InlineData(5, 7)] + [InlineData(10, 50)] + [InlineData(5, 100)] + [InlineData(200, 500)] + [InlineData(100, 5)] + public async Task MultiThreadedStressTest(int marketCount, int priceCount) + { + var MaxAddTime = TimeSpan.FromSeconds(0.250); + var MaxRemoveTime = TimeSpan.FromSeconds(0.100); + + TimeSpan? GetRemoveTime() => _randomizer.Bool() ? _randomizer.TimeSpan(MaxRemoveTime) : null; + + IObservable AddRemoveStress(int marketCount, int priceCount, int parallel, IScheduler scheduler) => + Observable.Create(observer => new CompositeDisposable + { + AddRemoveMarkets(marketCount, parallel, scheduler) + .Subscribe( + onNext: _ => { }, + onError: ex => observer.OnError(ex)), + + _marketCache.Connect() + .MergeMany(market => AddRemovePrices((Market)market, priceCount, parallel, scheduler)) + .Subscribe( + onNext: _ => { }, + onError: ex => observer.OnError(ex), + onCompleted: () => + { + observer.OnNext(Unit.Default); + observer.OnCompleted(); + }) + }); + + IObservable AddRemoveMarkets(int ownerCount, int parallel, IScheduler scheduler) => + _randomizer.Interval(MaxAddTime, scheduler).Select(_ => new Market(_randomizer.Utf16String(5, 10, true))) + //.Parallelize(ownerCount, parallel, obs => obs.StressAddRemove(_marketCache, _ => GetRemoveTime(), scheduler)) + .Take(ownerCount) + .StressAddRemove(_marketCache, _ => GetRemoveTime(), scheduler) + .Finally(_marketCache.Dispose); + + IObservable AddRemovePrices(Market market, int priceCount, int parallel, IScheduler scheduler) => + _randomizer.Interval(MaxAddTime, scheduler).Select(_ => market.CreateUniquePrice(_ => GetRandomPrice())) + //.Parallelize(animalCount, parallel, obs => obs.StressAddRemove(owner.Animals, _ => GetRemoveTime(), scheduler)) + .Take(priceCount) + .StressAddRemove(market.PricesCache, _ => GetRemoveTime(), scheduler) + .Finally(market.PricesCache.Dispose); + + var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices); + using var priceResults = merged.AsAggregator(); + + var adding = true; + + // Start asynchrononously modifying the parent list and the child lists + using var addingSub = AddRemoveStress(marketCount, priceCount, Environment.ProcessorCount, TaskPoolScheduler.Default) + .Finally(() => adding = false) + .Subscribe(); + + // Subscribe / unsubscribe over and over while the collections are being modified + do + { + // Ensure items are being added asynchronously before subscribing to changes + await Task.Yield(); + + { + // Subscribe + var mergedSub = merged.Subscribe(); + + // Let other threads run + await Task.Yield(); + + // Unsubscribe + mergedSub.Dispose(); + } + } + while (adding); + + // Verify the results + CheckResultContents(_marketCacheResults, priceResults); + } + [Fact] public void NullChecks() { @@ -136,7 +220,7 @@ public void AllExistingSubItemsPresentInResult() // having var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray(); using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator(); - markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.SetPrices(m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket, GetRandomPrice)); + AddUniquePrices(markets); // when _marketCache.AddOrUpdate(markets); @@ -160,7 +244,7 @@ public void AllNewSubItemsPresentInResult() _marketCache.AddOrUpdate(markets); // when - markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.SetPrices(m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket, GetRandomPrice)); + AddUniquePrices(markets); // then _marketCacheResults.Data.Count.Should().Be(MarketCount); @@ -179,7 +263,7 @@ public void AllRefreshedSubItemsAreRefreshed() var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray(); using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator(); _marketCache.AddOrUpdate(markets); - markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.SetPrices(m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket, GetRandomPrice)); + AddUniquePrices(markets); // when markets.ForEach(m => m.RefreshAllPrices(GetRandomPrice)); @@ -285,7 +369,7 @@ public void AnyRemovedSubItemIsRemoved() var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray(); using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator(); _marketCache.AddOrUpdate(markets); - markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.SetPrices(m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket, GetRandomPrice)); + AddUniquePrices(markets); // when markets.ForEach(m => m.PricesCache.Edit(updater => updater.RemoveKeys(updater.Keys.Take(RemoveCount)))); @@ -306,7 +390,7 @@ public void AnySourceItemRemovedRemovesAllSourceValues() var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray(); using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator(); _marketCache.AddOrUpdate(markets); - markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.SetPrices(m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket, GetRandomPrice)); + AddUniquePrices(markets); // when _marketCache.Edit(updater => updater.RemoveKeys(updater.Keys.Take(RemoveCount))); @@ -691,10 +775,28 @@ public void Dispose() DisposeMarkets(); } + private void AddUniquePrices(Market[] markets) => markets.ForEach(m => m.AddUniquePrices(PricesPerMarket, _ => GetRandomPrice())); + + private void CheckResultContents(ChangeSetAggregator marketResults, ChangeSetAggregator priceResults) + { + var expectedMarkets = _marketCache.Items.ToList(); + var expectedPrices = expectedMarkets.SelectMany(market => ((Market)market).PricesCache.Items).ToList(); + + // These should be subsets of each other + expectedMarkets.Should().BeSubsetOf(marketResults.Data.Items); + marketResults.Data.Items.Count().Should().Be(expectedMarkets.Count); + + // These should be subsets of each other + expectedPrices.Should().BeSubsetOf(priceResults.Data.Items); + priceResults.Data.Items.Count().Should().Be(expectedPrices.Count); + } + private void DisposeMarkets() { _marketCache.Items.ForEach(m => (m as IDisposable)?.Dispose()); _marketCache.Dispose(); _marketCache.Clear(); } + + private decimal GetRandomPrice() => MarketPrice.RandomPrice(_randomizer, BasePrice, PriceOffset); } diff --git a/src/DynamicData.Tests/Domain/Fakers.cs b/src/DynamicData.Tests/Domain/Fakers.cs index a71cb37a7..069b0fcf6 100644 --- a/src/DynamicData.Tests/Domain/Fakers.cs +++ b/src/DynamicData.Tests/Domain/Fakers.cs @@ -45,6 +45,8 @@ internal static class Fakers public static Faker AnimalOwnerWithAnimals { get; } = AnimalOwner.Clone().WithInitialAnimals(Animal); + public static Faker Market { get; } = new Faker().CustomInstantiator(faker => new Market($"{faker.Commerce.ProductName()} Market Id#{faker.Random.AlphaNumeric(5)}")); + public static Faker WithInitialAnimals(this Faker existing, Faker animalFaker, int minCount, int maxCount) => existing.FinishWith((faker, owner) => owner.Animals.AddRange(animalFaker.GenerateLazy(faker.Random.Number(minCount, maxCount)))); diff --git a/src/DynamicData.Tests/Domain/Market.cs b/src/DynamicData.Tests/Domain/Market.cs index 4422df07a..a100e6df3 100644 --- a/src/DynamicData.Tests/Domain/Market.cs +++ b/src/DynamicData.Tests/Domain/Market.cs @@ -3,6 +3,7 @@ using System.Diagnostics.CodeAnalysis; using System.Linq; using System.Reactive.Linq; +using System.Threading; using DynamicData.Kernel; using DynamicData.Tests.Utilities; @@ -21,6 +22,8 @@ internal interface IMarket internal sealed class Market : IMarket, IDisposable { + private static int s_UniquePriceId; + private readonly ISourceCache _latestPrices = new SourceCache(p => p.ItemId); public static IComparer RatingCompare { get; } = new RatingComparer(); @@ -39,6 +42,10 @@ public Market(int name) : this($"Market #{name}", Guid.NewGuid()) { } + public Market(string name) : this(name, Guid.NewGuid()) + { + } + public string Name { get; } public Guid Id { get; } @@ -51,6 +58,12 @@ public Market(int name) : this($"Market #{name}", Guid.NewGuid()) public MarketPrice CreatePrice(int itemId, decimal price) => new(itemId, price, Id); + public MarketPrice CreateUniquePrice(Func getPrice) + { + var id = Interlocked.Increment(ref s_UniquePriceId); + return CreatePrice(id, getPrice(id)); + } + public Market AddRandomIdPrices(Random r, int count, int minId, int maxId, Func randPrices) { _latestPrices.AddOrUpdate(Enumerable.Range(0, int.MaxValue).Select(_ => r.Next(minId, maxId)).Distinct().Take(count).Select(id => CreatePrice(id, randPrices()))); @@ -95,10 +108,18 @@ public Market SetPrices(int minId, int maxId, Func getPrice) => th public Market SetPrices(int minId, int maxId, decimal newPrice) => SetPrices(minId, maxId, _ => newPrice); + public Market SetPrice(int id, Func getPrice) => this.With(_ => _latestPrices.AddOrUpdate(CreatePrice(id, getPrice()))); + + public Market AddUniquePrices(int count, Func getPrice) => + this.With(_ => _latestPrices.AddOrUpdate(CreateUniquePrices(count, getPrice))); + public void Dispose() => _latestPrices.Dispose(); public override string ToString() => $"Market '{Name}' [{Id}] (Rating: {Rating})"; + private IEnumerable CreateUniquePrices(int count, Func getPrice) => + Enumerable.Range(0, count).Select(_ => CreateUniquePrice(getPrice)); + private class RatingComparer : IComparer { public int Compare([DisallowNull] IMarket x, [DisallowNull] IMarket y) => diff --git a/src/DynamicData.Tests/Domain/MarketPrice.cs b/src/DynamicData.Tests/Domain/MarketPrice.cs index e88845af5..e75181d7a 100644 --- a/src/DynamicData.Tests/Domain/MarketPrice.cs +++ b/src/DynamicData.Tests/Domain/MarketPrice.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; +using Bogus; namespace DynamicData.Tests.Domain; @@ -42,18 +43,28 @@ public decimal Price public static decimal RandomPrice(Random r, decimal basePrice, decimal offset) => basePrice + (decimal)r.NextDouble() * offset; + public static decimal RandomPrice(Randomizer r, decimal basePrice, decimal offset) => r.Decimal(basePrice, basePrice + offset); + + public override bool Equals(object? obj) => obj is MarketPrice price && Price == price.Price && TimeStamp.Equals(price.TimeStamp) && MarketId.Equals(price.MarketId) && ItemId == price.ItemId; + + public override int GetHashCode() => HashCode.Combine(Price, TimeStamp, MarketId, ItemId); + + public static bool operator ==(MarketPrice? left, MarketPrice? right) => EqualityComparer.Default.Equals(left, right); + + public static bool operator !=(MarketPrice? left, MarketPrice? right) => !(left == right); + private class CurrentPriceEqualityComparer : IEqualityComparer { public virtual bool Equals([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) => x.MarketId.Equals(x.MarketId) && x.ItemId == y.ItemId && x.Price == y.Price; public int GetHashCode([DisallowNull] MarketPrice obj) => throw new NotImplementedException(); } - private class TimeStampPriceEqualityComparer : CurrentPriceEqualityComparer, IEqualityComparer + private sealed class TimeStampPriceEqualityComparer : CurrentPriceEqualityComparer, IEqualityComparer { public override bool Equals([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) => base.Equals(x, y) && x.TimeStamp == y.TimeStamp; } - private class LowestPriceComparer : IComparer + private sealed class LowestPriceComparer : IComparer { public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) { @@ -62,7 +73,7 @@ public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) } } - private class HighestPriceComparer : IComparer + private sealed class HighestPriceComparer : IComparer { public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) { @@ -71,7 +82,7 @@ public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) } } - private class LatestPriceComparer : IComparer + private sealed class LatestPriceComparer : IComparer { public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) { diff --git a/src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs b/src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs index 96df9dd47..957b2962e 100644 --- a/src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs +++ b/src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs @@ -21,29 +21,26 @@ public IObservable> Run() => Observabl { var locker = new object(); - // Transform to an observable cache of merge containers. + // Transform to an observable changeset of cached changesets var sourceCacheOfCaches = source - .IgnoreSameReferenceUpdate() - .WhereReasonsAre(ChangeReason.Add, ChangeReason.Remove, ChangeReason.Update) - .Transform((obj, key) => new ChangeSetCache(selector(obj, key))) - .Synchronize(locker) - .AsObservableCache(); + .Transform((obj, key) => new ChangeSetCache(selector(obj, key).Synchronize(locker))) + .AsObservableCache(); - var shared = sourceCacheOfCaches.Connect().Publish(); - - // this is manages all of the changes + // This is manages all of the changes var changeTracker = new ChangeSetMergeTracker(() => sourceCacheOfCaches.Items, comparer, equalityComparer); - // merge the items back together + var shared = sourceCacheOfCaches.Connect().Publish(); + + // Merge the child changeset changes together and apply to the tracker var allChanges = shared.MergeMany(mc => mc.Source) - .Synchronize(locker) .Subscribe( changes => changeTracker.ProcessChangeSet(changes, observer), observer.OnError, observer.OnCompleted); - // when a source item is removed, all of its sub-items need to be removed + // When a source item is removed, all of its sub-items need to be removed var removedItems = shared + .Synchronize(locker) .OnItemRemoved(mc => changeTracker.RemoveItems(mc.Cache.KeyValues, observer)) .OnItemUpdated((_, prev) => changeTracker.RemoveItems(prev.Cache.KeyValues, observer)) .Subscribe();