Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Stress Tests for List-to-List MergeManyChangeSets #800

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions src/DynamicData.Tests/List/MergeManyChangeSetsListFixture.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
using System;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Bogus;
using DynamicData.Kernel;
using DynamicData.Tests.Domain;
using DynamicData.Tests.Utilities;
using FluentAssertions;
using Xunit;

Expand All @@ -20,6 +25,8 @@ public sealed class MergeManyChangeSetsListFixture : IDisposable
const int AddRangeSize = 53;
const int RemoveRangeSize = 37;
#endif
private static readonly TimeSpan s_MaxAddTime = TimeSpan.FromSeconds(0.1);
private static readonly TimeSpan s_MaxRemoveTime = TimeSpan.FromSeconds(1.0);

private readonly ISourceList<AnimalOwner> _animalOwners = new SourceList<AnimalOwner>();
private readonly ChangeSetAggregator<AnimalOwner> _animalOwnerResults;
Expand All @@ -39,6 +46,71 @@ public MergeManyChangeSetsListFixture()
_animalResults = _animalOwners.Connect().MergeManyChangeSets(owner => owner.Animals.Connect()).AsAggregator();
}

[Theory]
[InlineData(5, 7)]
[InlineData(10, 50)]
[InlineData(10, 1_000)]
[InlineData(200, 500)]
[InlineData(1_000, 10)]
public async Task MultiThreadedStressTest(int ownerCount, int animalCount) =>
dwcullop marked this conversation as resolved.
Show resolved Hide resolved
_ = await AddRemoveAnimalsStress(ownerCount, animalCount, TaskPoolScheduler.Default)
.Finally(CheckResultContents);

[Theory]
[InlineData(5, 7)]
[InlineData(5, 200)]
[InlineData(10, 100)]
[InlineData(20, 50)]
[InlineData(100, 10)]
public void NoDeadlockOrExceptionIfSubscribeDuringModify(int ownerCount, int animalCount)
{
// Not used so don't let it waste time
_animalResults.Dispose();

// Arrange
Func<Task> CreateTest(IScheduler sch, int owners, int animals) =>
async () =>
{
var mergeAnimals = _animalOwners.Connect().MergeManyChangeSets(owner => owner.Animals.Connect());

var addingAnimals = true;

using var addOwners = GenerateOwners(sch)
.Take(owners)
.StressAddRemove(_animalOwners, _ => GetRemoveTime(), sch)
.Finally(() => _animalOwners.Dispose())
.Subscribe();

using var addAnimals = _animalOwners.Connect()
.MergeMany(owner => AddRemoveAnimals(owner, sch, animals))
.Finally(() => addingAnimals = false)
.Subscribe();

do
{
// Ensure items are being added asynchronously before subscribing to the animal changes
await Task.Yield();

{
// Subscribe
var mergedSub = mergeAnimals.Subscribe();

// Let other threads run
await Task.Yield();

// Unsubscribe
mergedSub.Dispose();
}
}
while (addingAnimals);
};

// Act

// Assert
CreateTest(TaskPoolScheduler.Default, ownerCount, animalCount).Should().NotThrowAsync();
dwcullop marked this conversation as resolved.
Show resolved Hide resolved
}

[Fact]
public void NullChecks()
{
Expand Down Expand Up @@ -438,6 +510,41 @@ public void ResultFailsIfSourceFails()
results.Exception.Should().Be(expectedError);
}

private IObservable<Unit> AddRemoveAnimalsStress(int ownerCount, int animalCount, IScheduler scheduler) =>
Observable.Create<Unit>(observer => new CompositeDisposable
{
GenerateOwners(scheduler)
.Take(ownerCount)
.StressAddRemove(_animalOwners, _ => GetRemoveTime(), scheduler)
.Finally(() => _animalOwners.Dispose())
.Subscribe(
onNext: _ => { },
onError: ex => observer.OnError(ex)),

_animalOwners.Connect()
.MergeMany(owner => AddRemoveAnimals(owner, scheduler, animalCount))
.Subscribe(
onNext: _ => { },
onError: ex => observer.OnError(ex),
onCompleted: () =>
{
observer.OnNext(Unit.Default);
observer.OnCompleted();
})
});

private IObservable<Animal> AddRemoveAnimals(AnimalOwner owner, IScheduler sch, int addCount) =>
GenerateAnimals(sch)
.Take(addCount)
.StressAddRemove(owner.Animals, _ => GetRemoveTime(), sch)
.Finally(owner.Animals.Dispose);

private IObservable<AnimalOwner> GenerateOwners(IScheduler scheduler) =>
_randomizer.Interval(s_MaxAddTime, scheduler).Select(_ => _animalOwnerFaker.Generate());

private IObservable<Animal> GenerateAnimals(IScheduler scheduler) =>
_randomizer.Interval(s_MaxAddTime, scheduler).Select(_ => _animalFaker.Generate());

private void CheckResultContents()
{
var expectedOwners = _animalOwners.Items.ToList();
Expand All @@ -459,4 +566,7 @@ public void Dispose()
_animalResults.Dispose();
_animalOwners.Dispose();
}

private TimeSpan? GetRemoveTime() => _randomizer.Bool() ? NextRemoveTime() : null;
private TimeSpan NextRemoveTime() => _randomizer.TimeSpan(s_MaxRemoveTime);
}
2 changes: 1 addition & 1 deletion src/DynamicData/Cache/Internal/MergeManyListChangeSets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace DynamicData.Cache.Internal;
/// <summary>
/// Operator that is similiar to MergeMany but intelligently handles List ChangeSets.
/// </summary>
internal sealed class MergeManyListChangeSets<TObject, TKey, TDestination>(IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, TKey, IObservable<IChangeSet<TDestination>>> selector, IEqualityComparer<TDestination>? equalityComparer = null)
internal sealed class MergeManyListChangeSets<TObject, TKey, TDestination>(IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, TKey, IObservable<IChangeSet<TDestination>>> selector, IEqualityComparer<TDestination>? equalityComparer)
where TObject : notnull
where TKey : notnull
where TDestination : notnull
Expand Down
10 changes: 4 additions & 6 deletions src/DynamicData/List/Internal/MergeManyListChangeSets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace DynamicData.List.Internal;
/// <summary>
/// Operator that is similiar to MergeMany but intelligently handles List ChangeSets.
/// </summary>
internal sealed class MergeManyListChangeSets<TObject, TDestination>(IObservable<IChangeSet<TObject>> source, Func<TObject, IObservable<IChangeSet<TDestination>>> selector, IEqualityComparer<TDestination>? equalityComparer = null)
internal sealed class MergeManyListChangeSets<TObject, TDestination>(IObservable<IChangeSet<TObject>> source, Func<TObject, IObservable<IChangeSet<TDestination>>> selector, IEqualityComparer<TDestination>? equalityComparer)
where TObject : notnull
where TDestination : notnull
{
Expand All @@ -24,11 +24,9 @@ public IObservable<IChangeSet<TDestination>> Run() =>
var changeTracker = new ChangeSetMergeTracker<TDestination>();

// Transform to a changeset of Cloned Child Lists and then Share
var sourceListofLists = source
var shared = source
.Transform(obj => new ClonedListChangeSet<TDestination>(selector(obj).Synchronize(locker), equalityComparer))
.AsObservableList();

var shared = sourceListofLists.Connect().Publish();
.Publish();

// Merge the items back together
var allChanges = shared.MergeMany(clonedList => clonedList.Source.RemoveIndex())
Expand All @@ -43,6 +41,6 @@ public IObservable<IChangeSet<TDestination>> Run() =>
.OnItemRemoved(mc => changeTracker.RemoveItems(mc.List, observer), invokeOnUnsubscribe: false)
.Subscribe();

return new CompositeDisposable(sourceListofLists, allChanges, removedItems, shared.Connect());
return new CompositeDisposable(allChanges, removedItems, shared.Connect());
});
}
2 changes: 1 addition & 1 deletion src/DynamicData/List/ObservableListEx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ public static IObservable<IChangeSet<TDestination>> MergeManyChangeSets<TObject,
throw new ArgumentNullException(nameof(observableSelector));
}

return new MergeManyListChangeSets<TObject, TDestination>(source, observableSelector).Run();
return new MergeManyListChangeSets<TObject, TDestination>(source, observableSelector, equalityComparer).Run();
}

/// <summary>
Expand Down