TransformAsync enhancements #819

Merged: 12 commits, Jan 22, 2024
9 changes: 4 additions & 5 deletions src/DynamicData.Tests/Cache/TransformAsyncFixture.cs
@@ -5,7 +5,7 @@
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

using DynamicData.Binding;
using DynamicData.Tests.Domain;

using FluentAssertions;
@@ -93,7 +93,7 @@ public void Remove()
public async Task RemoveFlowsToTheEnd()
{
var transform = 0;
var count = 500;
var count = 100;
ReadOnlyObservableCollection<Person> collection;

var cache = new SourceCache<Person, string>(p => p.Name);
@@ -121,9 +121,8 @@ public async Task RemoveFlowsToTheEnd()
cache.RemoveKey(p.Name);
}

while (transform != count)
await Task.Delay(100);
await Task.Delay(3000);
await collection.ToObservableChangeSet().Take(count * 2);

collection.Count.Should().Be(0);
}

102 changes: 47 additions & 55 deletions src/DynamicData/Cache/Internal/TransformAsync.cs
@@ -3,71 +3,60 @@
// See the LICENSE file in the project root for full license information.

using System.Reactive.Linq;

using System.Reactive.Threading.Tasks;
using DynamicData.Kernel;

namespace DynamicData.Cache.Internal;

internal sealed class TransformAsync<TDestination, TSource, TKey>(IObservable<IChangeSet<TSource, TKey>> source, Func<TSource, Optional<TSource>, TKey, Task<TDestination>> transformFactory, Action<Error<TSource, TKey>>? exceptionCallback, IObservable<Func<TSource, TKey, bool>>? forceTransform = null)
internal class TransformAsync<TDestination, TSource, TKey>(
IObservable<IChangeSet<TSource, TKey>> source,
Func<TSource, Optional<TSource>, TKey, Task<TDestination>> transformFactory,
Action<Error<TSource, TKey>>? exceptionCallback,
IObservable<Func<TSource, TKey, bool>>? forceTransform = null,
int maximumConcurrency = 4,
Collaborator

The introduction of this parameter is the main performance improvement here, yeah?

I would have the default be null, for a couple of reasons:

  1. You're really just passing this parameter directly to .Merge(), and .Merge() doesn't enforce a default: you either supply a limit, or there isn't one.
  2. 4 is the reasonable default that .NET usually uses for CPU parallelism, but this isn't about limiting CPU parallelism. The practical scenario that this supports is about I/O concurrency, E.G. database queries.

Otherwise, this functionally looks good to me.

Member

Yeah, I had no idea there was an overload of Merge that took a max concurrency parameter. But having looked it up, I agree it should be null, and if it is, then invoke the overload that doesn't use one.

Collaborator (Author)

I only found out about the overload to limit concurrency recently myself. Rx is the gift that keeps giving.

You both make fair points, and the same also occurred to me. So it will be null.

What do you two think about point 2? Complex overloads are a cognitive pain, which optional params tried to resolve. That's why I'm thinking about moving towards options objects for primitive optional values.
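The null-default idea discussed above could be expressed as a small extension, a hypothetical sketch only (not part of the PR), assuming Rx's two Merge overloads, so that null genuinely means "no limit" instead of a magic sentinel:

```csharp
using System;
using System.Reactive.Linq;

internal static class MergeExtensions
{
    // Illustrative name; maps a nullable limit onto the two Rx Merge overloads.
    public static IObservable<T> MergeWithOptionalLimit<T>(
        this IObservable<IObservable<T>> sources, int? maximumConcurrency)
        => maximumConcurrency is { } limit
            ? sources.Merge(limit) // bounded: at most `limit` inner subscriptions at once
            : sources.Merge();     // unbounded: subscribe to every inner observable
}
```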

@JakenVeina (Collaborator) Jan 6, 2024

Options objects wouldn't be TOO crazy, but I'm not sure where the line is for how many parameters it takes to merit a full object. Is there any wisdom we can glean from Microsoft's usages of that pattern? If we did do this, we would definitely want to implement Options objects as readonly record struct I think.

In my personal opinion, I don't consider optional parameters as being cognitive pain at all, because when I invoke almost any method where the parameter purpose isn't self-evident at a glance, I do...

var result = SomeMethod(
    parameter1: value1,
    parameter2: value2,
    parameter3: value3);

This plays seamlessly with optional parameters, cause all I have to do is omit the ones I don't want.

This is also pretty much functionally and semantically identical to using an options struct.

var result = SomeMethod(new()
{
    Option1 = value1,
    Option2 = value2,
    Option3 = value3
});

Both of these offer essentially the exact same cost for the method call, as all the parameters/options (used or not) are allocated on the stack. The difference really is just syntax, and how much code we have to write in the library.

If the question is whether to use overloads or optional parameters to provide variable functionality, my vote is definitely for optional parameters, except for methods that already exist, since adding new optional parameters is a breaking change.
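The options-object shape being weighed above might look like the following, a hypothetical sketch with illustrative names, not an API proposed in this PR, using the readonly record struct form suggested earlier:

```csharp
// Hypothetical options object for TransformAsync-style operators.
public readonly record struct TransformAsyncOptions
{
    public int? MaximumConcurrency { get; init; } // null = unbounded, per the discussion
    public bool TransformOnRefresh { get; init; }
}

// Call-site syntax stays close to named optional parameters:
// source.TransformAsync(factory, new() { MaximumConcurrency = 4 });
```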

Member

record struct is fine, but not all versions of C# support it. Either way, make sure to pass it using in so that it isn't copied on the stack.

An options object is not a bad idea because it enables changes to be made without breaking things (adding new options, changing defaults) but we should also have overloads that allow a few commonly used parameters to be set without having to create the object.

Collaborator

it enables changes to be made without breaking things

Oooh, an excellent point.

bool transformOnRefresh = false)
where TDestination : notnull
where TSource : notnull
where TKey : notnull
{
public IObservable<IChangeSet<TDestination, TKey>> Run() => Observable.Create<IChangeSet<TDestination, TKey>>(observer =>
{
var cache = new ChangeAwareCache<TransformedItemContainer, TKey>();
var asyncLock = new SemaphoreSlim(1, 1);

var transformer = source.Select(async changes =>
{
try
{
await asyncLock.WaitAsync();
return await DoTransform(cache, changes).ConfigureAwait(false);
}
finally
{
asyncLock.Release();
}
}).Concat();

if (forceTransform is not null)
{
var locker = new object();
var forced = forceTransform.Synchronize(locker)
.Select(async shouldTransform =>
{
try
{
await asyncLock.WaitAsync();
return await DoTransform(cache, shouldTransform).ConfigureAwait(false);
}
finally
{
asyncLock.Release();
}
}).Concat();

transformer = transformer.Synchronize(locker).Merge(forced);
}

return transformer.SubscribeSafe(observer);
});

private async Task<IChangeSet<TDestination, TKey>> DoTransform(ChangeAwareCache<TransformedItemContainer, TKey> cache, Func<TSource, TKey, bool> shouldTransform)
{
var toTransform = cache.KeyValues.Where(kvp => shouldTransform(kvp.Value.Source, kvp.Key)).Select(kvp => new Change<TSource, TKey>(ChangeReason.Update, kvp.Key, kvp.Value.Source, kvp.Value.Source)).ToArray();
public IObservable<IChangeSet<TDestination, TKey>> Run() =>
Observable.Create<IChangeSet<TDestination, TKey>>(observer =>
{
var cache = new ChangeAwareCache<TransformedItemContainer, TKey>();

var transformed = await Task.WhenAll(toTransform.Select(Transform)).ConfigureAwait(false);
var transformer = source.Select(changes => DoTransform(cache, changes)).Concat();

return ProcessUpdates(cache, transformed);
}
if (forceTransform is not null)
{
var locker = new object();
var forced = forceTransform.Synchronize(locker)
.Select(shouldTransform => DoTransform(cache, shouldTransform)).Concat();

transformer = transformer.Synchronize(locker).Merge(forced);
}

return transformer.SubscribeSafe(observer);
});

private async Task<IChangeSet<TDestination, TKey>> DoTransform(ChangeAwareCache<TransformedItemContainer, TKey> cache, IChangeSet<TSource, TKey> changes)
private IObservable<IChangeSet<TDestination, TKey>> DoTransform(
ChangeAwareCache<TransformedItemContainer, TKey> cache, Func<TSource, TKey, bool> shouldTransform)
{
var transformed = await Task.WhenAll(changes.Select(Transform)).ConfigureAwait(false);
var toTransform = cache.KeyValues.Where(kvp => shouldTransform(kvp.Value.Source, kvp.Key)).Select(kvp =>
new Change<TSource, TKey>(ChangeReason.Update, kvp.Key, kvp.Value.Source, kvp.Value.Source)).ToArray();

return toTransform.Select(change => Observable.Defer(() => Transform(change).ToObservable()))
Member

I think Observable.FromAsync(() => Transform(change)) gives the same semantics as Defer plus ToObservable and more clearly expresses your intent.

Member

So, you could do:

return toTransform.Select(change => Transform(change))
        .Merge()
        .ToArray()
        .Select(transformed => ProcessUpdates(cache, transformed));

Or, if you want to provide the maximum concurrency parameter:

return toTransform.Select(change => Observable.FromAsync(() => Transform(change)))
        .Merge(maximumConcurrency)
        .ToArray()
        .Select(transformed => ProcessUpdates(cache, transformed));

Collaborator

Agreed, I believe Defer() is redundant here.

Collaborator (Author)

Defer is necessary, otherwise the task is invoked immediately, which would result in the merge limiter having no effect as it would be the results being limited not the actual tasks.

@JakenVeina (Collaborator) Jan 7, 2024

No, Observable.FromAsync() already works that way. The async function returning Task (i.e., Transform) is not invoked until the observable is subscribed to, which is the thing that maxConcurrent controls.

var whenCompletedSource = new TaskCompletionSource<int>();

var whenCompleted = Observable.FromAsync(() =>
{
    Console.WriteLine("Observable.FromAsync()");
    return whenCompletedSource.Task;
});

Console.WriteLine("whenCompleted.Subscribe()");
whenCompleted.Subscribe(x => Console.WriteLine($"whenCompleted.OnNext({x})"));

Console.WriteLine("whenCompletedSource.SetResult(7)");
whenCompletedSource.SetResult(7);

Thread.Sleep(TimeSpan.FromSeconds(1));

This produces the following output:

whenCompleted.Subscribe()
Observable.FromAsync()
whenCompletedSource.SetResult(7)
whenCompleted.OnNext(7)

Collaborator (Author)

Your example is for a single invocation. What I'd be interested to see is an attempt to limit, say, 1000 tasks so that only 4 of them run concurrently.

@dwcullop (Member) Jan 8, 2024

The function passed to Observable.FromAsync gets run once per subscription, but not until the subscription happens (like Defer), and Merge(x) does not subscribe to the (x+1)-th item until one of the previous streams completes.

You need Defer if you invoke a method to get a task and then convert it to an observable, but I think Defer plus ToObservable is semantically the same as Observable.FromAsync, so it's the coder's choice; I do feel Observable.FromAsync is clearer.


I used this test... It's only 20, not 1000, but I don't think that changes anything...

async Task Main()
{
    // These two seem to be the same
    var obsAsync = Observable.FromAsync(AsyncThing);
    // var obsAsync = Observable.Defer(() => AsyncThing().ToObservable());

    var obs = Observable.Range(0, 20).Select(n => obsAsync.Spy($"Async #{n}"));
    
    // Only 3 AsyncThings at a time
    await obs.Merge(3);
    
    // Let them all fly
    //await obs.Merge();
}

static int current = 0;
static int counter = 0;

async Task<int> AsyncThing()
{
    int n = Interlocked.Increment(ref current);
    Console.WriteLine($"Starting AsyncThing #{n} ({Interlocked.Increment(ref counter)})");
    
    try
    {
        await Task.Delay(100);
        return n;
    }
    finally
    {
        Console.WriteLine($"Ending AsyncThing #{n} ({Interlocked.Decrement(ref counter)})");
    }
}

AsyncThing is invoked once per subscription. The Observable is created, but the async method is not invoked until the subscribe.

With Merge(), counter gets up to 20 because they're all running at once. With Merge(3), counter never gets bigger than 3.

Collaborator (Author)

Thanks for looking into this. I'll add the overload and some testing soon.

@RolandPheasant (Collaborator, Author) Jan 15, 2024

@JakenVeina I can confirm that Observable.FromAsync() works like Defer, as you suggested. I was not willing to change what I had until I had a test, and I get similar run times with your suggested change as before.

[image: test run times]

I have a question for you about the test.

.Merge(maximumConcurrency < 1 ? int.MaxValue : maximumConcurrency)
.ToArray()
.Select(transformed => ProcessUpdates(cache, transformed));
}

return ProcessUpdates(cache, transformed);
private IObservable<IChangeSet<TDestination, TKey>> DoTransform(
ChangeAwareCache<TransformedItemContainer, TKey> cache, IChangeSet<TSource, TKey> changes)
{
Member

So, the point of allowing concurrency to be specified is for cases like if the transformation needs to do something like hit a database and only a certain number of DB connections are allowed at once? I guess that makes sense, but I really don't think it's DD's job to do that.
The same thing could be accomplished by having a Semaphore in the Transform function. So, maybe you don't need this new parameter at all.

Merge also has this overload (I also recently discovered):
IObservable<TSource> Merge<TSource>(IObservable<Task<TSource>> sources)

Which seems to be exactly what you're trying to achieve with the Defer / ToObservable combination. Unfortunately, I don't see a version that works with tasks AND has a concurrency limit, but I'm not really sure you need to use that.
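The "Semaphore in the Transform function" alternative mentioned above could be sketched as follows; this is a hypothetical caller-side pattern, not code from the PR, and Person, PersonDto, and QueryDatabaseAsync are illustrative stand-ins for the consumer's own types and I/O call:

```csharp
using System.Threading;
using System.Threading.Tasks;

// The caller throttles its own I/O inside the transform factory,
// so no operator-level concurrency parameter is needed.
private static readonly SemaphoreSlim Gate = new(4, 4); // at most 4 concurrent transforms

private static async Task<PersonDto> ThrottledTransform(Person person)
{
    await Gate.WaitAsync().ConfigureAwait(false);
    try
    {
        // Hypothetical I/O-bound work, e.g. a database query.
        return await QueryDatabaseAsync(person).ConfigureAwait(false);
    }
    finally
    {
        Gate.Release();
    }
}
```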

@JakenVeina (Collaborator) Jan 6, 2024

I rather agree about this not being DD's responsibility, but from the perspective of " we're calling .Merge() so, let's expose the optimization parameter that .Merge() has", I think it makes sense.

Member

I rather agree about this not being DD's responsibility, but from the perspective of " we're calling .Merge() so, let's expose the optimization parameter that .Merge() has", I think it makes sense.

I guess if the logic is in the middle of the operator, there aren't many good options for allowing consumers to customize it.

return changes.Select(change => Observable.Defer(() => Transform(change).ToObservable()))
Member

This threw me off at first... Using ToArray with an Observable. But if you're starting with an array, I guess it makes sense. Never seen that approach before so I wanted to call it out.

Collaborator

Good eye on this.

To me, this seems like a big indicator that something nonsensical is being done here. It seems like the only point of converting each change to its own observable is to be able to .Merge() them together, with the maxConcurrent parameter, yes? But that accomplishes nothing if we then do .ToArray() on the whole thing. That's gonna block the whole thread, is it not?

Member

No, it won't block the thread. ToArray returns an IObservable<T[]> that fires once (when the input sequence has completed). There's no need to block; you just don't get events downstream until all the Merged streams have completed.

I think it does make sense. It's just a new technique to me.
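The pattern being described can be condensed into a short sketch; TransformAsync and ProcessUpdates here are illustrative stand-ins, assuming the Rx Merge overload with a concurrency limit:

```csharp
var batched = changes
    .Select(change => Observable.FromAsync(() => TransformAsync(change))) // cold, one per change
    .Merge(4)      // at most 4 transforms in flight; no thread is blocked
    .ToArray()     // IObservable<T[]>: a single OnNext once every inner stream completes
    .Select(transformed => ProcessUpdates(transformed)); // then process the whole batch
```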

.Merge(maximumConcurrency < 1 ? int.MaxValue : maximumConcurrency)
.ToArray()
.Select(transformed => ProcessUpdates(cache, transformed));
}

private ChangeSet<TDestination, TKey> ProcessUpdates(ChangeAwareCache<TransformedItemContainer, TKey> cache, TransformResult[] transformedItems)
@@ -76,7 +65,8 @@ private ChangeSet<TDestination, TKey> ProcessUpdates(ChangeAwareCache<Transforme
var errors = transformedItems.Where(t => !t.Success).ToArray();
if (errors.Length > 0)
{
errors.ForEach(t => exceptionCallback?.Invoke(new Error<TSource, TKey>(t.Error, t.Change.Current, t.Change.Key)));
errors.ForEach(t =>
exceptionCallback?.Invoke(new Error<TSource, TKey>(t.Error, t.Change.Current, t.Change.Key)));
}

foreach (var result in transformedItems.Where(t => t.Success))
@@ -100,6 +90,7 @@ private ChangeSet<TDestination, TKey> ProcessUpdates(ChangeAwareCache<Transforme
}

var changes = cache.CaptureChanges();

var transformed = changes.Select(change => new Change<TDestination, TKey>(change.Reason, change.Key, change.Current.Destination, change.Previous.Convert(x => x.Destination), change.CurrentIndex, change.PreviousIndex));

return new ChangeSet<TDestination, TKey>(transformed);
@@ -109,9 +100,10 @@ private async Task<TransformResult> Transform(Change<TSource, TKey> change)
{
try
{
if (change.Reason == ChangeReason.Add || change.Reason == ChangeReason.Update)
if (change.Reason is ChangeReason.Add or ChangeReason.Update || (change.Reason is ChangeReason.Refresh && transformOnRefresh))
{
var destination = await transformFactory(change.Current, change.Previous, change.Key).ConfigureAwait(false);
var destination = await transformFactory(change.Current, change.Previous, change.Key)
.ConfigureAwait(false);
return new TransformResult(change, new TransformedItemContainer(change.Current, destination));
}
