-
Notifications
You must be signed in to change notification settings - Fork 756
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Publish
overload for IAsyncEnumerable
#2159
Comments
This isn't a part of the code I'm deeply familiar with, but having just looked at it now, the This might be behind your "hopefully without too much of a memory overhead" concern. This buffering only occurs for elements retrieved from the source after any particular subscriber called So in the scenario you describe, it shouldn't actually hold onto anything because there is only one enumerator active at any one time. I've been trying to work out if there's some reason this was not implemented for
The reason I'm looking at all the gaps is to see if there's some overarching theme here. I think if we were to add However, since I don't think anyone has asked for any of these before the demand clearly isn't all that strong... So it might be best to look at adding just The one implementation concern I can see that does unite these two methods is that we'd need an This does make me wonder if this was why these got left out: holding locks in async code is potentially problematic. And this in turn leads to the question of whether just producing |
That would be absolutely perfect! I assume it keeps track of the last executed position then, to be able to continue from there?
Would definitely be nice to have full coverage of all operators for
Adoption of
I see. That makes sense. And I saw your discussion with the .NET team on this regard in another issue. I agree with you that it would make the most sense that this async lock be hosted in BCL and not in a library. And I would understand if you wanted to wait for that to happen before tackling this particular work here.
Totally fair. I would not like to see any drastic differences in behavior between the sync and async counterparts of the same operator even if in my particular case, the more limited feature-set would suffice. I'm also with you here. The only other thing that came to my mind while reading your answer describing the features of I opened this issue when I just went through a feature on our codebase that would benefit from it, but it is not the first time I've wanted to be able to "continue iterating from the last iteration position", it's just that the other times I needed it, I ended up changing the logic to work around it somehow. |
Yes. It's mostly me, and not full time.
I wasn't envisaging differences in logical behaviour. It was more that I'm thinking it might need a different implementation strategy. This is actually an instance of a larger problem with introducing async in Rx and Ix. In the classic non-async Rx the usual assumption is that calls to There's a sort of related issue over in Reaqtor. We have no async support there, and it's mainly because in that world we need to be able to checkpoint the state of operators (to provide reliable persistence, including the ability to migrate an Rx subscription from one machine to another). The way that works today is that we are able to bring everything to a complete halt briefly by temporarily stopping processing input. Because there's no async, and because the basic assumption pervading everything in Reaqtor is that we avoid blocking as much as possible, everything comes to a halt very quickly, we can make our checkpoint, and then resume without an obvious gap in service. But once you introduce async, it seems likely that this strategy won't work (because if you need async it's presumably because you can't actually avoid blocking). (If you're familiar with Reaqtor this might have you raising an eyebrow because it does in fact define various async versions of the Rx interfaces. However, we actually rewrite those subscriptions. We support async because when a client outside of the Reaqtor query engine kicks off a subscription, that's a remote operation so it needs to be async, but when we materialize the subscription inside the query engine, we actually rewrite it at runtime to be a non-async one.) The libraries in this repo don't need to worry about that checkpointing problem. However, there are plenty of cases where we currently hold locks in the non-async version while performing steps that need to be That's not to say I'm not going to do this, just that it might not be good enough to copy and paste in the existing |
I see. Yeah... that's harsh. You are in an even worse position than the OData team then. I wish you the best and hope more folks will join you on this project if only for the selfish reason that I love Rx in general 😆.
Oh, to be clear, I wasn't implying you'd end up with different behaviors. I was basically agreeing with your initial statement that the operations should work consistently.
This is fascinating to hear. Even though I'm not super familiar with Reaqtor, I'm aware of its existence, how complex it is and what it can be used for. Thanks for sharing this interesting tidbit. Makes me wonder if perhaps what it needs is some sort of "durable task" shenanigans that durable azure functions use which basically "override" the way async/await works. But that's probably off-topic here. Thanks for your insights. |
Feature request
I'd like for a new extension to exist with this signature:
That would be the
async
counterpart of the existingIEnumerable.Publish
extension:reactive/Ix.NET/Source/System.Interactive/System/Linq/Operators/Publish.cs
Line 35 in 5f831de
Async,Ix
.Minor (feature).
It is fairly specialized in our case, but at the same time fairly broadly applicable.
We just went through some specific requirement where the capability of grabbing "the next N elements" from an
IAsyncEnumerable
multiple times based on a parentIAsyncEnumerable
would cleanly solve our requirement in an efficient manner.I came to know that the
Publish
extensions onIEnumerable
allows for something like that by buffering the last enumerated index of the sequence, which allows you to, say, callTake(10)
on the sequence, do something with those 10 elements, then callTake(10)
again and grab the next 10, without multiple enumeration of the original (and hopefully without too much of a memory overhead).We would want that same capability but for
IAsyncEnumerable
.We have a scenario where a small database has been serialized as CSV files: each CSV file is an entity, and relationships between entities (1:1 or 1:N) are controlled by columns in those CSVs that work like foreign keys. At the same time, this multi-CSV structure has an additional column in all files that keeps track "what entity we are dealing with". For example, if the set of files describes 5 main entities, this column will go from 1 through 5 in values, in that order.
For example:
vehicle.csv
(main entity)engine.csv
(N:1 with vehicle)We need to parse this structure as cleanly and as efficiently as possible. Assuming we have an
IAsyncEnumerable
for each of these CSV files (using something likeCsvHelper
,Publish
would allow us to do something like:The outside
foreach
is straightforward as that's the 1:1 parent entity, but the child entities can have multiple rows that correspond to that parent, so we useTakeWhile
to read those that are related to this entity based on the runningEntityNum
value.Without
Publish
, this of course doesn't work for anything past the first parent since theTakeWhile
call would always be on the beginning of the sequence, and having to add aSkipWhile
(or even aSkip
) there would mean unneeded multiple enumeration too.The only other equivalent in my mind for this problem would be to drop to the enumerator level and call
MoveNextAsync
manually but that makes the code substantially less readable.The text was updated successfully, but these errors were encountered: