Skip to content

Commit

Permalink
Feature: Multithreaded Stress Test for List-to-Cache MergeManyChangeS…
Browse files Browse the repository at this point in the history
…ets (#806)

* Fix operator and added stress test to prevent regressions

* Fix test issue

* Improvements to try and fix the stress test for List-List

* Code synchronization
  • Loading branch information
dwcullop authored Dec 21, 2023
1 parent b605a18 commit 4ffeec0
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 43 deletions.
21 changes: 12 additions & 9 deletions src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,16 @@ public sealed class MergeManyChangeSetsCacheFixture : IDisposable

private readonly ChangeSetAggregator<IMarket, Guid> _marketCacheResults;

private readonly Randomizer _randomizer = new (0x21123737);
private readonly Faker<Market> _marketFaker;

public MergeManyChangeSetsCacheFixture() => _marketCacheResults = _marketCache.Connect().AsAggregator();
private readonly Randomizer _randomizer;

public MergeManyChangeSetsCacheFixture()
{
_randomizer = new(0x21123737);
_marketFaker = Fakers.Market.WithSeed(_randomizer);
_marketCacheResults = _marketCache.Connect().AsAggregator();
}

[Theory]
[InlineData(5, 7)]
Expand Down Expand Up @@ -77,17 +84,13 @@ IObservable<Unit> AddRemoveStress(int marketCount, int priceCount, int parallel,
});

IObservable<IMarket> AddRemoveMarkets(int ownerCount, int parallel, IScheduler scheduler) =>
_randomizer.Interval(MaxAddTime, scheduler).Select(_ => new Market(_randomizer.Utf16String(5, 10, true)))
//.Parallelize(ownerCount, parallel, obs => obs.StressAddRemove(_marketCache, _ => GetRemoveTime(), scheduler))
.Take(ownerCount)
.StressAddRemove(_marketCache, _ => GetRemoveTime(), scheduler)
_marketFaker.IntervalGenerate(MaxAddTime, scheduler)
.Parallelize(ownerCount, parallel, obs => obs.StressAddRemove(_marketCache, _ => GetRemoveTime(), scheduler))
.Finally(_marketCache.Dispose);

IObservable<MarketPrice> AddRemovePrices(Market market, int priceCount, int parallel, IScheduler scheduler) =>
_randomizer.Interval(MaxAddTime, scheduler).Select(_ => market.CreateUniquePrice(_ => GetRandomPrice()))
//.Parallelize(animalCount, parallel, obs => obs.StressAddRemove(owner.Animals, _ => GetRemoveTime(), scheduler))
.Take(priceCount)
.StressAddRemove(market.PricesCache, _ => GetRemoveTime(), scheduler)
.Parallelize(priceCount, parallel, obs => obs.StressAddRemove(market.PricesCache, _ => GetRemoveTime(), scheduler))
.Finally(market.PricesCache.Dispose);

var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices);
Expand Down
2 changes: 0 additions & 2 deletions src/DynamicData.Tests/Cache/MergeManyChangeSetsListFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ public sealed class MergeManyChangeSetsListFixture : IDisposable
const int AddRangeSize = 53;
const int RemoveRangeSize = 37;
#endif
private static readonly TimeSpan s_MaxAddTime = TimeSpan.FromSeconds(0.250);
private static readonly TimeSpan s_MaxRemoveTime = TimeSpan.FromSeconds(0.100);

private readonly ISourceCache<AnimalOwner, Guid> _animalOwners = new SourceCache<AnimalOwner, Guid>(o => o.Id);
private readonly ChangeSetAggregator<AnimalOwner, Guid> _animalOwnerResults;
Expand Down
117 changes: 111 additions & 6 deletions src/DynamicData.Tests/List/MergeManyChangeSetsCacheFixture.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Bogus;
using DynamicData.Kernel;
using DynamicData.Tests.Domain;
using DynamicData.Tests.Utilities;
Expand All @@ -28,15 +33,97 @@ public sealed class MergeManyChangeSetsCacheFixture : IDisposable
const decimal HighestPrice = BasePrice + PriceOffset + 1.0m;
const decimal LowestPrice = BasePrice - 1.0m;

private static readonly Random Random = new (0x03251976);

private static decimal GetRandomPrice() => MarketPrice.RandomPrice(Random, BasePrice, PriceOffset);

private readonly ISourceList<IMarket> _marketList = new SourceList<IMarket>();

private readonly ChangeSetAggregator<IMarket> _marketListResults;

public MergeManyChangeSetsCacheFixture() => _marketListResults = _marketList.Connect().AsAggregator();
private readonly Faker<Market> _marketFaker;

private readonly Randomizer _randomizer;

public MergeManyChangeSetsCacheFixture()
{
_randomizer = new(0x03251976);
_marketFaker = Fakers.Market.WithSeed(_randomizer);
_marketListResults = _marketList.Connect().AsAggregator();
}


[Theory]
[InlineData(5, 7)]
[InlineData(10, 50)]
[InlineData(5, 100)]
[InlineData(200, 500)]
[InlineData(100, 5)]
public async Task MultiThreadedStressTest(int marketCount, int priceCount)
{
var MaxAddTime = TimeSpan.FromSeconds(0.250);
var MaxRemoveTime = TimeSpan.FromSeconds(0.100);

TimeSpan? GetRemoveTime() => _randomizer.Bool() ? _randomizer.TimeSpan(MaxRemoveTime) : null;

IObservable<Unit> AddRemoveStress(int marketCount, int priceCount, int parallel, IScheduler scheduler) =>
Observable.Create<Unit>(observer => new CompositeDisposable
{
AddRemoveMarkets(marketCount, parallel, scheduler)
.Subscribe(
onNext: _ => { },
onError: ex => observer.OnError(ex)),

_marketList.Connect()
.MergeMany(market => AddRemovePrices((Market)market, priceCount, parallel, scheduler))
.Subscribe(
onNext: _ => { },
onError: ex => observer.OnError(ex),
onCompleted: () =>
{
observer.OnNext(Unit.Default);
observer.OnCompleted();
})
});

IObservable<IMarket> AddRemoveMarkets(int ownerCount, int parallel, IScheduler scheduler) =>
_marketFaker.IntervalGenerate(MaxAddTime, scheduler)
.Parallelize(ownerCount, parallel, obs => obs.StressAddRemove(_marketList, _ => GetRemoveTime(), scheduler))
.Finally(_marketList.Dispose);

IObservable<MarketPrice> AddRemovePrices(Market market, int priceCount, int parallel, IScheduler scheduler) =>
_randomizer.Interval(MaxAddTime, scheduler).Select(_ => market.CreateUniquePrice(_ => GetRandomPrice()))
.Parallelize(priceCount, parallel, obs => obs.StressAddRemove(market.PricesCache, _ => GetRemoveTime(), scheduler))
.Finally(market.PricesCache.Dispose);

var merged = _marketList.Connect().MergeManyChangeSets(market => market.LatestPrices);
using var priceResults = merged.AsAggregator();

var adding = true;

// Start asynchrononously modifying the parent list and the child lists
using var addingSub = AddRemoveStress(marketCount, priceCount, Environment.ProcessorCount, TaskPoolScheduler.Default)
.Finally(() => adding = false)
.Subscribe();

// Subscribe / unsubscribe over and over while the collections are being modified
do
{
// Ensure items are being added asynchronously before subscribing to changes
await Task.Yield();

{
// Subscribe
var mergedSub = merged.Subscribe();

// Let other threads run
await Task.Yield();

// Unsubscribe
mergedSub.Dispose();
}
}
while (adding);

// Verify the results
CheckResultContents(_marketListResults, priceResults);
}

[Fact]
public void NullChecks()
Expand Down Expand Up @@ -449,7 +536,7 @@ public void ComparerOnlyAddsBetterValuesOnSourceReplace()
lowPriceResults.Data.Count.Should().Be(PricesPerMarket);
lowPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket);
lowPriceResults.Summary.Overall.Removes.Should().Be(0);
lowPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket * 3);
lowPriceResults.Summary.Overall.Updates.Should().Be(PricesPerMarket * 2);
lowPriceResults.Data.Items.Select(cp => cp.MarketId).ForEach(guid => guid.Should().Be(marketLowLow.Id));
highPriceResults.Data.Count.Should().Be(PricesPerMarket);
highPriceResults.Summary.Overall.Adds.Should().Be(PricesPerMarket);
Expand Down Expand Up @@ -739,10 +826,28 @@ public void Dispose()
DisposeMarkets();
}

private void AddUniquePrices(Market[] markets) => markets.ForEach(m => m.AddUniquePrices(PricesPerMarket, _ => GetRandomPrice()));

private void CheckResultContents(ChangeSetAggregator<IMarket> marketResults, ChangeSetAggregator<MarketPrice, int> priceResults)
{
var expectedMarkets = _marketList.Items.ToList();
var expectedPrices = expectedMarkets.SelectMany(market => ((Market)market).PricesCache.Items).ToList();

// These should be subsets of each other
expectedMarkets.Should().BeSubsetOf(marketResults.Data.Items);
marketResults.Data.Items.Count().Should().Be(expectedMarkets.Count);

// These should be subsets of each other
expectedPrices.Should().BeSubsetOf(priceResults.Data.Items);
priceResults.Data.Items.Count().Should().Be(expectedPrices.Count);
}

private void DisposeMarkets()
{
_marketList.Items.ForEach(m => (m as IDisposable)?.Dispose());
_marketList.Dispose();
_marketList.Clear();
}

private decimal GetRandomPrice() => MarketPrice.RandomPrice(_randomizer, BasePrice, PriceOffset);
}
1 change: 1 addition & 0 deletions src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public IObservable<IChangeSet<TDestination, TDestinationKey>> Run() => Observabl
// This is manages all of the changes
var changeTracker = new ChangeSetMergeTracker<TDestination, TDestinationKey>(() => sourceCacheOfCaches.Items, comparer, equalityComparer);

// Share a connection to the source cache
var shared = sourceCacheOfCaches.Connect().Publish();

// Merge the child changeset changes together and apply to the tracker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public IObservable<IChangeSet<TDestination, TDestinationKey>> Run() => Observabl
.AsObservableCache();

// Share a single connection to the cache
var shared = sourceCacheOfCaches.Connect().Publish();
var shared = sourceCacheOfCaches.Connect().Synchronize(locker).Publish();

// This is manages all of the changes
var changeTracker = new ChangeSetMergeTracker<ParentChildEntry, TDestinationKey>(() => sourceCacheOfCaches.Items, _comparer, _equalityComparer);
Expand All @@ -48,7 +48,6 @@ public IObservable<IChangeSet<TDestination, TDestinationKey>> Run() => Observabl

// When a source item is removed, all of its sub-items need to be removed
var removedItems = shared
.Synchronize(locker)
.OnItemRemoved(mc => changeTracker.RemoveItems(mc.Cache.KeyValues, observer))
.OnItemUpdated((_, prev) => changeTracker.RemoveItems(prev.Cache.KeyValues, observer))
.Subscribe();
Expand All @@ -57,7 +56,6 @@ public IObservable<IChangeSet<TDestination, TDestinationKey>> Run() => Observabl
// Because the comparison is based on the parent, which has just been refreshed.
var refreshItems = reevalOnRefresh
? shared
.Synchronize(locker)
.OnItemRefreshed(mc => changeTracker.RefreshItems(mc.Cache.Keys, observer))
.Subscribe()
: Disposable.Empty;
Expand Down
28 changes: 15 additions & 13 deletions src/DynamicData/List/Internal/MergeManyCacheChangeSets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,30 @@ public IObservable<IChangeSet<TDestination, TDestinationKey>> Run() =>

// Transform to an observable list of merge containers.
var sourceListOfCaches = source
.Transform(obj => new ChangeSetCache<TDestination, TDestinationKey>(changeSetSelector(obj)))
.Synchronize(locker)
.AsObservableList();

var shared = sourceListOfCaches.Connect().Publish();
.Transform(obj => new ChangeSetCache<TDestination, TDestinationKey>(changeSetSelector(obj).Synchronize(locker)))
.AsObservableList();

// This is manages all of the changes
var changeTracker = new ChangeSetMergeTracker<TDestination, TDestinationKey>(() => sourceListOfCaches.Items.ToArray(), comparer, equalityComparer);

// Share a connection to the source list
var shared = sourceListOfCaches.Connect().Publish();

// Merge the child changeset changes together and apply to the tracker
var allChanges = shared
.Synchronize(locker)
.MergeMany(mc => mc.Source)
.Subscribe(
changes => changeTracker.ProcessChangeSet(changes, observer),
observer.OnError,
observer.OnCompleted);

// When a source item is removed, all of its sub-items need to be removed
var removedItems = shared
.Synchronize(locker)
.OnItemRemoved(mc => changeTracker.RemoveItems(mc.Cache.KeyValues, observer))
.Subscribe();

// Merge the items back together
var allChanges = shared.MergeMany(mc => mc.Source)
.Synchronize(locker)
.Subscribe(
changes => changeTracker.ProcessChangeSet(changes, observer),
observer.OnError,
observer.OnCompleted);

return new CompositeDisposable(sourceListOfCaches, allChanges, removedItems, shared.Connect());
});
}
25 changes: 15 additions & 10 deletions src/DynamicData/List/Internal/MergeManyListChangeSets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,32 @@ public IObservable<IChangeSet<TDestination>> Run() =>
{
var locker = new object();

// Transform to a changeset of Cloned Child Lists and then Share
var sourceListOfLists = source
.Transform(obj => new ClonedListChangeSet<TDestination>(selector(obj).Synchronize(locker), equalityComparer))
.AsObservableList();

// This is manages all of the changes
var changeTracker = new ChangeSetMergeTracker<TDestination>();

// Transform to a changeset of Cloned Child Lists and then Share
var shared = source
.Transform(obj => new ClonedListChangeSet<TDestination>(selector(obj).Synchronize(locker), equalityComparer))
.Publish();
// Share a connection to the source cache
var shared = sourceListOfLists.Connect().Publish();

// Merge the items back together
var allChanges = shared.MergeMany(clonedList => clonedList.Source.RemoveIndex())
.Subscribe(
changes => changeTracker.ProcessChangeSet(changes, observer),
observer.OnError,
observer.OnCompleted);
var allChanges = shared
.Synchronize(locker)
.MergeMany(clonedList => clonedList.Source.RemoveIndex())
.Subscribe(
changes => changeTracker.ProcessChangeSet(changes, observer),
observer.OnError,
observer.OnCompleted);

// When a source item is removed, all of its sub-items need to be removed
var removedItems = shared
.Synchronize(locker)
.OnItemRemoved(mc => changeTracker.RemoveItems(mc.List, observer), invokeOnUnsubscribe: false)
.Subscribe();

return new CompositeDisposable(allChanges, removedItems, shared.Connect());
return new CompositeDisposable(sourceListOfLists, allChanges, removedItems, shared.Connect());
});
}

0 comments on commit 4ffeec0

Please sign in to comment.