Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds async stream rfc #2996

Closed

Conversation

nellshamrell
Copy link
Contributor

@nellshamrell nellshamrell commented Sep 29, 2020

Rendered

Pre-RFC Discussions

Draft 1
Draft 2
Draft 3
Draft 4

Signed-off-by: Nell Shamrell [email protected]


Current FCP: #2996 (comment)


Status: merged

Signed-off-by: Nell Shamrell <[email protected]>
text/0000-async-stream.md Outdated Show resolved Hide resolved
text/0000-async-stream.md Outdated Show resolved Hide resolved
text/0000-async-stream.md Show resolved Hide resolved
text/0000-async-stream.md Outdated Show resolved Hide resolved
text/0000-async-stream.md Outdated Show resolved Hide resolved
text/0000-async-stream.md Outdated Show resolved Hide resolved
@nikomatsakis nikomatsakis added the T-libs-api Relevant to the library API team, which will review and decide on the RFC. label Sep 30, 2020
@nikomatsakis
Copy link
Contributor

We discussed this amongst the lang team at some point and concluded that this does not require @rust-lang/lang signoff. I've tagged it for @rust-lang/libs.

@nikomatsakis
Copy link
Contributor

Also cc @rust-lang/wg-async-foundations

@joshtriplett
Copy link
Member

👍 to adding try_next() if possible.

Also: could we please standardize a method for converting from Iterator to Stream? That may not be able to use IntoStream (because a blanket impl for Iterator would conflict with more specific impls people may want to write), but having a function that takes an impl Iterator<Item = T> and returns an impl Stream<Item = T> would be really helpful.

text/0000-async-stream.md Outdated Show resolved Hide resolved
text/0000-async-stream.md Outdated Show resolved Hide resolved
@yoshuawuyts
Copy link
Member

yoshuawuyts commented Oct 1, 2020

👍 to adding try_next() if possible.

@joshtriplett The conversation around whether to add Stream::next took quite a while to resolve, and was only added because poll_next by itself is not useful for end-users. The proposed try_next adapter has no counterpart in Iterator, is a minor helper to avoid writing .transpose(), and doesn't include functionality unique to async Rust. I would prefer if the scope of this PR would not be expanded beyond what's already been proposed so we don't need to find consensus on the design of further stream adapters in order to stabilize Stream.


Also: could we please standardize a method for converting from Iterator to Stream?

@joshtriplett async-std has stream::from_iter and futures-rs has stream::iter. I don't know if you think this should be mentioned in the RFC, but adding a function with this functionality would be straightforward once we expose Stream.

@joshtriplett
Copy link
Member

@yoshuawuyts To clarify, neither of those comments was intended as a blocker to this RFC. I think it makes sense to add a minimal implementation of Stream, and after doing so, add additional helper functions like these to make it easier to work with.

@nellshamrell
Copy link
Contributor Author

Information about converting an iterator to a stream has been added :)

@LucioFranco
Copy link
Member

LucioFranco commented Oct 5, 2020

I finally got around to reading, this looks pretty good, I think including next is +1. I would really really like to see try_next, I think this is by far the most useful fn I have used for streams. I don't really understand the reason to not include it? I don't think there is much consensus needed for it, we've already agreed how to do next and try_next shouldn't be much more complicated as far as I can tell.

Another example of !Unpin streams are done via async-stream crate https://github.com/tokio-rs/async-stream/ which is usable on stable today.

Thanks a ton @nellshamrell and other for pushing this through 😄

@ratmice
Copy link

ratmice commented Oct 12, 2020

I'm not sure where to comment on this, but given that this RFC discusses LendStream, as a generalization, in theory at least there is a further generalization involving different lifetimes for the Lender, and the Item, with a lifetime bounds that 'lender: 'item. Which probably requires additional control mechanisms to regain ownership in the Lender.

With the idea that the single liftime LendingStream is a specialization of it where the lifetimes are the same.
I haven't thought enough about the extent to which the further generalization can be expressed currently though.
It seemed since the discussion of LendingStream as a generalization of Stream up it might/might not be worth thinking about this further generalization

Copy link

@l4l l4l left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just some minor typos, please check it out, I'm not a native speaker there might be mistakes.
That's an amazing job by the way, really like reading the rfc. Hope it would be approved soon.

text/0000-async-stream.md Outdated Show resolved Hide resolved
text/0000-async-stream.md Outdated Show resolved Hide resolved
text/0000-async-stream.md Outdated Show resolved Hide resolved
text/0000-async-stream.md Outdated Show resolved Hide resolved
text/0000-async-stream.md Outdated Show resolved Hide resolved
text/0000-async-stream.md Show resolved Hide resolved
text/0000-async-stream.md Outdated Show resolved Hide resolved
text/0000-async-stream.md Outdated Show resolved Hide resolved
text/0000-async-stream.md Outdated Show resolved Hide resolved
text/0000-async-stream.md Show resolved Hide resolved
@yoshuawuyts
Copy link
Member

I would really really like to see try_next [...] I don't really understand the reason to not include it?

@LucioFranco I covered that earlier in the thread #2996 (comment):

The proposed try_next adapter has no counterpart in Iterator, is a minor helper to avoid writing .transpose(), and doesn't include functionality unique to async Rust. I would prefer if the scope of this PR would not be expanded beyond what's already been proposed so we don't need to find consensus on the design of further stream adapters in order to stabilize Stream.

I see try_next as being desirable only as long as we don't have async iteration syntax, and will fall out of fashion the moment we do. I know you'll see this differently, and discussing the scope and priorities of stream adapters will make for an interesting conversation. But do you really want to push us to find consensus on that now?

We're both part of the Async Foundations WG, and we've had the privilege of being able to set the direction of this RFC. And that was a success; we have consensus on everything included here! I think now is the time to ensure what's in this RFC is accurate, but not seek to expand its scope with subjects we then newly need to find consensus on.

@nikomatsakis
Copy link
Contributor

To ensure I understand, the role of try_next is to allow you to write code like this?

while let Some(foo) = stream.try_next().await? {
}

as opposed to this?

while let Some(foo) = stream.try_next().await {
    let foo = foo?;
}

Hmm, I'm torn. I agree with @yoshuawuyts that we should avoid increasing the scope of the RFC in general, which is intentionally targeted. But I also think that the point of including next was that it was something people would "immediately want" in practice, and I can imagine that try_next also fits in that bucket. I'm not sure how important it is that it would become less useful if/when async iteration syntax is added, given that we don't have a clear timeline on that. Is the primary objection that the method will not be as useful in the future or are there other concerns?

I would want to hold the line against other forms of "scope creep", the only reason I can see to consider try_next is that it feels very analogous to next.

@nikomatsakis
Copy link
Contributor

Thinking on it more, I think what I'd prefer is to leave the RFC unaltered, land it, and consider try_next separately as a PR. It feels like a fairly standard "libs addition". The only difference is that, if we plan to make the futures crate redirect to core, before we actually do that we will need to have a clear idea of what methods we have, because any additional methods will have to be carefully coordinated.

@LucioFranco
Copy link
Member

@yoshuawuyts sure, I am pretty sure I voiced my opinion during that period. It's also my fault for not pushing it harder, I got busy and that happens. I still don't follow your argument you referenced with iterators. I think Streams are not 1:1 with iterators but I rest my case.

@nikomatsakis right, I think most streams I have worked with return some sort of result so avoiding the first example in favor of the second is much nicer.

I think its fine to punt on this for now, just knowing that I think this is extremely useful. My question then is, what does that path look like to add try_next and what does that look like in conjunction with the futures crate? For example, I want to use this try_next and when/if we add it to std will it just replace the one on the futures crate? To me I see this method as one of the foundational ones for this trait and I think that path needs to be clear.

@Nemo157
Copy link
Member

Nemo157 commented Oct 13, 2020

The alternative to try_next is

while let Some(foo) = stream.next().await.transpose()? {
}

which is a pattern I've started using with fallible iterators as well, having to use a while let instead of a for loop is less annoying than having the let foo = foo?; line inside the loop.

@Nemo157
Copy link
Member

Nemo157 commented Jan 20, 2021

If streams can register themselves once at the beginning of a for_each and de-register at the end, the stream may be implemented considerably more efficiently.

for_each takes a closure returning a future which may "block" for as long as it likes, so it would need to pause the stream before running that future and unpause it again afterwards. Similarly as soon as you have a .then(_) in a stream chain that Then stream would have to pre-emptively pause the prior stream in case the future it runs for each item "blocks" for too long. I don't think there are many situations in which you could poll a stream multiple times without pausing it between each poll.

@Thomasdezeeuw
Copy link

Re: the use of the Stream trait when receiving from channels.

Maybe Stream is simply not the correct abstraction for it. If you need to know (for performance reasons) what receivers are actively attempting to receive a value maybe a short-lived Future like the Stream::next method (now removed) returned would be a better fit.

I realise that Stream could provide some nice syntax sugar similar to what for with Iterators, but the following also works fine.

while let Ok(msg) = channel.receive_next().await {
    // Handle message...
}

impl<M> Channel<M> {
    fn receive_next(&mut self) -> impl Future<Output = Result<M, RecvError>> { /* ... */ }
}

It creates short lived Futures allowing the implementation to know which receivers are actively trying to receive a value.

consider `next`, which when called, returns a future which yields
`Option<Item>`.

The future returned by `next` will yield `Some(Item)` as long as there are
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The future returned by next will yield Some(Item) as long as there are elements

How does this play with fuse considering the future exhausted after the first Poll::Ready? Is the idea that you can either rely on that implementation detail for next and continue to pull items using the same future or get a new one after the previous next resolved?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @KodrAus - the answer on this one is I honestly don't know. I'm happy to include a note that this will need to be considered in the eventual RFC and implementation of the next method. Would that be sufficient, since this is in the Future Possibilities section?

}
```

## Initial impls
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found myself wanting to see the contents of the More Usage Examples section here first after we defined our Counter to see what it's like to interact with on the consuming side.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @KodrAus - I intentionally did not include the consuming side here since the next method has been moved to the "Future Possibilities" section. I felt including that here would introduce more confusion about what is and is not covered by this RFC.

@KodrAus
Copy link
Contributor

KodrAus commented Jan 21, 2021

@rust-lang/libs I've dropped a link to the FCP in the OP here, since it's gotten quite buried.

@zesterer
Copy link

zesterer commented Jan 21, 2021

Maybe Stream is simply not the correct abstraction for it. If you need to know (for performance reasons) what receivers are actively attempting to receive a value maybe a short-lived Future like the Stream::next method (now removed) returned would be a better fit.

Perhaps this is the case, but I think it would be a shame for Stream to not accommodate this common use-case when the cost of the features required to allow it to accommodate it is so low.

This also means that there is no longer a natural duality between Stream and Iterator as the RFC claims.

@nellshamrell
Copy link
Contributor Author

Hello @rust-lang/libs :) For those of you who have not yet approved FCP in this comment - are there any concerns you have or is there anything blocking approving this for FCP?

@nellshamrell
Copy link
Contributor Author

@zesterer thank you for your comments, they encouraged us to think even more carefully about the design of this feature. I am going to leave the design as it is now as a minimal viable implementation, but we have your concerns about channels recorded here for future reference. I have no doubt we will be opening more RFCs around Async streams in the near future and we will certainly keep your concerns in mind. Thank you again, the community input and context is vital to any RFC.

JohnTitor added a commit to JohnTitor/rust that referenced this pull request Jan 30, 2021
Add `core::stream::Stream`

[[Tracking issue: rust-lang#79024](rust-lang#79024)]

This patch adds the `core::stream` submodule and implements `core::stream::Stream` in accordance with [RFC2996](rust-lang/rfcs#2996). The RFC hasn't been merged yet, but as requested by the libs team in rust-lang/rfcs#2996 (comment) I'm filing this PR to get the ball rolling.

## Documentatation

The docs in this PR have been adapted from [`std::iter`](https://doc.rust-lang.org/std/iter/index.html), [`async_std::stream`](https://docs.rs/async-std/1.7.0/async_std/stream/index.html), and [`futures::stream::Stream`](https://docs.rs/futures/0.3.8/futures/stream/trait.Stream.html). Once this PR lands my plan is to follow this up with PRs to add helper methods such as `stream::repeat` which can be used to document more of the concepts that are currently missing. That will allow us to cover concepts such as "infinite streams" and "laziness" in more depth.

## Feature gate

The feature gate for `Stream` is `stream_trait`. This matches the `#[lang = "future_trait"]` attribute name. The intention is that only the APIs defined in RFC2996 will use this feature gate, with future additions such as `stream::repeat` using their own feature gates. This is so we can ensure a smooth path towards stabilizing the `Stream` trait without needing to stabilize all the APIs in `core::stream` at once. But also don't start expanding the API until _after_ stabilization, as was the case with `std::future`.

__edit:__ the feature gate has been changed to `async_stream` to match the feature gate proposed in the RFC.

## Conclusion

This PR introduces `core::stream::{Stream, Next}` and re-exports it from `std` as `std::stream::{Stream, Next}`. Landing `Stream` in the stdlib has been a mult-year process; and it's incredibly exciting for this to finally happen!

---

r? `````@KodrAus`````
cc/ `````@rust-lang/wg-async-foundations````` `````@rust-lang/libs`````
@nellshamrell
Copy link
Contributor Author

Gentle ping to @rust-lang/libs - this only needs one more approval for FCP!

@dtolnay
Copy link
Member

dtolnay commented Feb 8, 2021

Approving on behalf of withoutboats, as per rust-lang/team#526.

@rfcbot rfcbot added final-comment-period Will be merged/postponed/closed in ~10 calendar days unless new substational objections are raised. and removed proposed-final-comment-period Currently awaiting signoff of all team members in order to enter the final comment period. labels Feb 8, 2021
@rfcbot
Copy link
Collaborator

rfcbot commented Feb 8, 2021

🔔 This is now entering its final comment period, as per the review above. 🔔

@rfcbot rfcbot added finished-final-comment-period The final comment period is finished for this RFC. to-announce and removed final-comment-period Will be merged/postponed/closed in ~10 calendar days unless new substational objections are raised. labels Feb 18, 2021
@rfcbot
Copy link
Collaborator

rfcbot commented Feb 18, 2021

The final comment period, with a disposition to merge, as per the review above, is now complete.

As the automated representative of the governance process, I would like to thank the author for their work and everyone else who contributed.

The RFC will be merged soon.

@jplatte
Copy link
Contributor

jplatte commented Mar 22, 2021

FCP has been complete for a bit over a month now, is something else missing for this to be merged?

@nikomatsakis
Copy link
Contributor

No! I thought I did the merge! Maybe I forgot to push?

@nikomatsakis
Copy link
Contributor

Hmm, maybe I removed the merge commit or something. I'm not sure why github doesn't show this as merged but .. it is. Going to close!

@jplatte
Copy link
Contributor

jplatte commented Mar 22, 2021

Looks like a rebase instead of a merge, and GitHub only "understands" command-line PR merging if the exact original commits are included (i.e. if it's a merge or fast-forward, not rebase or squash).

@nikomatsakis
Copy link
Contributor

Yeah I guess I rebased without realizing it. Oh well!


Unfortunately, async methods in traits are not currently supported,
and there [are a number of challenges to be
resolved](https://rust-lang.github.io/wg-async-foundations/design_notes/async_fn_in_traits.html)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Broken link

with them.

Unfortunately, the use of poll does mean that it is harder to write
stream implementations. The long-term fix for this, discussed in the [Future possiblilities](future-possibilities) section, is dedicated [generator syntax].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Broken link

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any chance you can open a PR with fixes?

LegNeato pushed a commit to LegNeato/juniper that referenced this pull request Jun 26, 2021
Recently tokio got a first stable release and many libraries &
applications already migrated to the newest version.

This changes upgrades tokio version to 1.0.2:

* Tokio renamed some of its features, e.g `rt-util` and `rt-core` now
  combined into `rt`.

* `stream` feature got extracted to a separate crate
  [tokio-stream](https://docs.rs/tokio-stream/0.1.2/tokio_stream),
  waiting for eventual `Stream` landing to the Rust std
  library. [RFC](rust-lang/rfcs#2996)

[TODO]

Actix's integration test_actix_ws_integration test still fails due to,
I guess, async rt is not being initialized. Apart from that, the tests
are green. Please feel free to take over this effort.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-futures Futures related proposals disposition-merge This RFC is in PFCP or FCP with a disposition to merge it. finished-final-comment-period The final comment period is finished for this RFC. Libs-Tracked Libs issues that are tracked on the team's project board. T-libs-api Relevant to the library API team, which will review and decide on the RFC. to-announce
Projects
None yet
Development

Successfully merging this pull request may close these issues.