Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Use Flag instead of Counter for TransformManyAsync/TransformOnObservable #844

Merged
merged 15 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/DynamicData/Cache/Internal/MergeChangeSets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using System.Reactive.Concurrency;
using System.Reactive.Linq;
using DynamicData.Internal;

namespace DynamicData.Cache.Internal;

Expand Down Expand Up @@ -33,7 +34,7 @@ public IObservable<IChangeSet<TObject, TKey>> Run() => Observable.Create<IChange
.Synchronize(locker)
.Do(cache.Clone)
.MergeMany(mc => mc.Source.Do(static _ => { }, observer.OnError))
.Subscribe(
.SubscribeSafe(
changes => changeTracker.ProcessChangeSet(changes, observer),
observer.OnError,
observer.OnCompleted);
Expand Down
5 changes: 3 additions & 2 deletions src/DynamicData/Cache/Internal/MergeMany.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using DynamicData.Internal;

namespace DynamicData.Cache.Internal;

Expand Down Expand Up @@ -39,9 +40,9 @@ public IObservable<TDestination> Run() => Observable.Create<TDestination>(
.SubscribeMany((t, key) =>
{
counter.Added();
return _observableSelector(t, key).Synchronize(locker).Finally(() => counter.Finally()).Subscribe(observer.OnNext, _ => { }, () => { });
return _observableSelector(t, key).Synchronize(locker).Finally(() => counter.Finally()).Subscribe(observer.OnNext, static _ => { });
})
.Subscribe(_ => { }, observer.OnError, observer.OnCompleted);
.SubscribeSafe(observer.OnError, observer.OnCompleted);

return new CompositeDisposable(disposable, counter);
});
Expand Down
20 changes: 12 additions & 8 deletions src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ public IObservable<IChangeSet<TDestination, TDestinationKey>> Run() => Observabl
var shared = source
.Transform((obj, key) => new ChangeSetCache<TDestination, TDestinationKey>(selector(obj, key).Synchronize(locker)))
.Synchronize(locker)
.Do(cache.Clone)
.Do(_ => parentUpdate = true)
.Do(changes =>
{
cache.Clone(changes);
parentUpdate = true;
})
.Publish();

// Merge the child changeset changes together and apply to the tracker
Expand All @@ -47,12 +50,13 @@ public IObservable<IChangeSet<TDestination, TDestinationKey>> Run() => Observabl
var subRemove = shared
.OnItemRemoved(changeSetCache => changeTracker.RemoveItems(changeSetCache.Cache.KeyValues), invokeOnUnsubscribe: false)
.OnItemUpdated((_, prev) => changeTracker.RemoveItems(prev.Cache.KeyValues))
.Do(_ =>
{
changeTracker.EmitChanges(observer);
parentUpdate = false;
})
.Subscribe();
.SubscribeSafe(
_ =>
{
changeTracker.EmitChanges(observer);
parentUpdate = false;
},
observer.OnError);

return new CompositeDisposable(shared.Connect(), subMergeMany, subRemove);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ public IObservable<IChangeSet<TDestination, TDestinationKey>> Run() => Observabl
var shared = source
.Transform((obj, key) => new ChangeSetCache<ParentChildEntry, TDestinationKey>(_changeSetSelector(obj, key).Synchronize(locker)))
.Synchronize(locker)
.Do(cache.Clone)
.Do(_ => parentUpdate = true)
.Do(changes =>
{
cache.Clone(changes);
parentUpdate = true;
})
.Publish();

// Merge the child changeset changes together and apply to the tracker
Expand All @@ -63,12 +66,13 @@ public IObservable<IChangeSet<TDestination, TDestinationKey>> Run() => Observabl

// Subscribe to handle all the requested changes and emit them downstream
var subParent = parentObservable
.Do(_ =>
{
changeTracker.EmitChanges(observer);
parentUpdate = false;
})
.Subscribe();
.SubscribeSafe(
_ =>
{
changeTracker.EmitChanges(observer);
parentUpdate = false;
},
observer.OnError);

return new CompositeDisposable(shared.Connect(), subMergeMany, subParent);
}).TransformImmutable(entry => entry.Child);
Expand Down
13 changes: 7 additions & 6 deletions src/DynamicData/Cache/Internal/MergeManyListChangeSets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ public IObservable<IChangeSet<TDestination>> Run() => Observable.Create<IChangeS
var subRemove = shared
.OnItemRemoved(clonedList => changeTracker.RemoveItems(clonedList.List), invokeOnUnsubscribe: false)
.OnItemUpdated((_, prev) => changeTracker.RemoveItems(prev.List))
.Do(_ =>
{
changeTracker.EmitChanges(observer);
parentUpdate = false;
})
.Subscribe();
.SubscribeSafe(
_ =>
{
changeTracker.EmitChanges(observer);
parentUpdate = false;
},
observer.OnError);

return new CompositeDisposable(shared.Connect(), subMergeMany, subRemove);
});
Expand Down
76 changes: 41 additions & 35 deletions src/DynamicData/Cache/Internal/TransformManyAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,62 +21,68 @@ public IObservable<IChangeSet<TDestination, TDestinationKey>> Run() => Observabl
{
var locker = new object();
var cache = new Cache<ChangeSetCache<TDestination, TDestinationKey>, TKey>();
var updateCounter = 0;

// Transformation Function:
// Create the Child Observable by invoking the async selector, appending the counter and the synchronize
// Pass the result to a new ChangeSetCache instance.
ChangeSetCache<TDestination, TDestinationKey> Transform_(TSource obj, TKey key) => new(
Observable.Defer(() => selector(obj, key))
.Do(_ => Interlocked.Increment(ref updateCounter))
.Synchronize(locker!));
var parentUpdate = false;

// This is manages all of the changes
var changeTracker = new ChangeSetMergeTracker<TDestination, TDestinationKey>(() => cache.Items, comparer, equalityComparer);

// Transform Helper
async Task<IObservable<IChangeSet<TDestination, TDestinationKey>>> InvokeSelector(TSource obj, TKey key)
{
if (errorHandler != null)
{
try
{
return await selector(obj, key).ConfigureAwait(false);
}
catch (Exception e)
{
errorHandler.Invoke(new Error<TSource, TKey>(e, obj, key));
return Observable.Empty<IChangeSet<TDestination, TDestinationKey>>();
}
}

return await selector(obj, key).ConfigureAwait(false);
}

// Transformation Function:
// Create the Child Observable by invoking the async selector, appending the synchronize, and creating a new ChangeSetCache instance.
ChangeSetCache<TDestination, TDestinationKey> Transform_(TSource obj, TKey key) =>
new(Observable.Defer(() => InvokeSelector(obj, key)).Synchronize(locker!));

// Transform to a cache changeset of child caches, synchronize, clone changes to the local copy, and publish.
// Increment updateCounter BEFORE the lock so that incoming changesets will cause the downstream changeset to be delayed
// until all pending changesets have been handled.
var shared =
(errorHandler is null ? source.Transform(Transform_) : source.TransformSafe(Transform_, errorHandler))
.Do(_ => Interlocked.Increment(ref updateCounter))
.Synchronize(locker)
.Do(cache.Clone)
.Publish();
var shared = source
.Transform(Transform_)
.Synchronize(locker)
.Do(
changes =>
{
cache.Clone(changes);
parentUpdate = true;
})
.Publish();

// Merge the child changeset changes together and apply to the tracker
// Emit the changeset if there are no other pending changes
// Emit the changeset if not currently handling a parent stream update
var subMergeMany = shared
.MergeMany(cacheChangeSet => cacheChangeSet.Source)
.SubscribeSafe(
changes => changeTracker.ProcessChangeSet(changes, Interlocked.Decrement(ref updateCounter) == 0 ? observer : null),
changes => changeTracker.ProcessChangeSet(changes, !parentUpdate ? observer : null),
observer.OnError);

// When a source item is removed, all of its sub-items need to be removed
// Emit the changeset if there are no other pending changes
// Emit any pending changes
var subRemove = shared
.OnItemRemoved(changeSetCache => changeTracker.RemoveItems(changeSetCache.Cache.KeyValues), invokeOnUnsubscribe: false)
.OnItemUpdated((_, prev) => changeTracker.RemoveItems(prev.Cache.KeyValues))
.SubscribeSafe(
_ =>
{
if (Interlocked.Decrement(ref updateCounter) == 0)
{
changeTracker.EmitChanges(observer);
}
changeTracker.EmitChanges(observer);
parentUpdate = false;
},
observer.OnError,
() =>
{
if (Volatile.Read(ref updateCounter) == 0)
{
observer.OnCompleted();
}
else
{
changeTracker.MarkComplete();
}
});
observer.OnCompleted);

return new CompositeDisposable(shared.Connect(), subMergeMany, subRemove);
});
Expand Down
23 changes: 11 additions & 12 deletions src/DynamicData/Cache/Internal/TransformOnObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Reactive.Disposables;
using System.Reactive.Linq;
using DynamicData.Internal;
using DynamicData.Kernel;

namespace DynamicData.Cache.Internal;

Expand All @@ -18,18 +17,20 @@ public IObservable<IChangeSet<TDestination, TKey>> Run() => Observable.Create<IC
{
var cache = new ChangeAwareCache<TDestination, TKey>();
var locker = new object();
var pendingUpdates = 0;
var parentUpdate = false;

// Helper to emit any pending changes when all the updates have been handled
void EmitChanges()
// Helper to emit any pending changes when appropriate
void EmitChanges(bool fromParent)
{
if (Interlocked.Decrement(ref pendingUpdates) == 0)
if (fromParent || !parentUpdate)
{
var changes = cache!.CaptureChanges();
if (changes.Count > 0)
{
observer.OnNext(changes);
}

parentUpdate = false;
}
}

Expand All @@ -38,27 +39,25 @@ void EmitChanges()
IObservable<TDestination> CreateSubObservable(TSource obj, TKey key) =>
transform(obj, key)
.DistinctUntilChanged()
.Do(_ => Interlocked.Increment(ref pendingUpdates))
.Synchronize(locker!)
.Do(val => cache!.AddOrUpdate(val, key));

// Always increment the counter OUTSIDE of the lock to signal any thread currently holding the lock
// to not emit the changeset because more changes are incoming.
// Flag a parent update is happening once inside the lock
var shared = source
.Do(_ => Interlocked.Increment(ref pendingUpdates))
.Synchronize(locker!)
.Do(_ => parentUpdate = true)
.Publish();

// Use MergeMany because it automatically handles Add/Update/Remove and OnCompleted/OnError correctly
// MergeMany automatically handles Add/Update/Remove and OnCompleted/OnError correctly
var subMerged = shared
.MergeMany(CreateSubObservable)
.SubscribeSafe(_ => EmitChanges(), observer.OnError, observer.OnCompleted);
.SubscribeSafe(_ => EmitChanges(fromParent: false), observer.OnError, observer.OnCompleted);

// Subscribe to the shared Observable to handle Remove events. MergeMany will unsubscribe from the sub-observable,
// but the corresponding key value needs to be removed from the Cache so the remove is observed downstream.
var subRemove = shared
.OnItemRemoved((_, key) => cache!.Remove(key), invokeOnUnsubscribe: false)
.SubscribeSafe(_ => EmitChanges());
.SubscribeSafe(_ => EmitChanges(fromParent: true), observer.OnError);

return new CompositeDisposable(shared.Connect(), subMerged, subRemove);
});
Expand Down
13 changes: 9 additions & 4 deletions src/DynamicData/Internal/ObservableEx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@ public static IDisposable SubscribeSafe<T>(this IObservable<T> observable, Actio
public static IDisposable SubscribeSafe<T>(this IObservable<T> observable, Action<T> onNext, Action<Exception> onError) =>
observable.SubscribeSafe(Observer.Create(onNext, onError));

public static IDisposable SubscribeSafe<T>(this IObservable<T> observable, Action<T> onNext, Action onComplete) =>
observable.SubscribeSafe(Observer.Create(onNext, onComplete));
public static IDisposable SubscribeSafe<T>(this IObservable<T> observable, Action<Exception> onError, Action onComplete) =>
observable.SubscribeSafe(Observer.Create(Stub<T>.Ignore, onError, onComplete));

public static IDisposable SubscribeSafe<T>(this IObservable<T> observable, Action<T> onNext) =>
observable.SubscribeSafe(Observer.Create(onNext));
public static IDisposable SubscribeSafe<T>(this IObservable<T> observable, Action<Exception> onError) =>
observable.SubscribeSafe(Observer.Create(Stub<T>.Ignore, onError));

private static class Stub<T>
{
public static readonly Action<T> Ignore = static _ => { };
}
}