Ch. 17.04 (NoStarch edits): second section
chriskrycho committed Jan 16, 2025
1 parent 1b9b50e commit 60d1f4a
Showing 1 changed file with 60 additions and 56 deletions.
116 changes: 60 additions & 56 deletions src/ch17-04-streams.md
@@ -121,24 +121,17 @@ we can do that _is_ unique to streams.
### Composing Streams

Many concepts are naturally represented as streams: items becoming available in
a queue, or working with more data than can fit in a computer’s memory by only
pulling chunks of it from the file system at a time, or data arriving over the
a queue, chunks of data being pulled incrementally from the filesystem when the
full data set is too large for the computer’s memory, or data arriving over the
network over time. Because streams are futures, we can use them with any other
kind of future, too, and we can combine them in interesting ways. For example,
we can batch up events to avoid triggering too many network calls, set timeouts
on sequences of long-running operations, or throttle user interface events to
avoid doing needless work.
kind of future and combine them in interesting ways. For example, we can batch
up events to avoid triggering too many network calls, set timeouts on sequences
of long-running operations, or throttle user interface events to avoid doing
needless work.

Let’s start by building a little stream of messages, as a stand-in for a stream
of data we might see from a WebSocket or another real-time communication
protocol. In Listing 17-33, we create a function `get_messages` which returns
`impl Stream<Item = String>`. For its implementation, we create an async
channel, loop over the first ten letters of the English alphabet, and send them
across the channel.

We also use a new type: `ReceiverStream`, which converts the `rx` receiver from
the `trpl::channel` into a `Stream` with a `next` method. Back in `main`, we use
a `while let` loop to print all the messages from the stream.
protocol, as shown in Listing 17-33.

<Listing number="17-33" caption="Using the `rx` receiver as a `ReceiverStream`" file-name="src/main.rs">

@@ -148,6 +141,14 @@ a `while let` loop to print all the messages from the stream.

</Listing>

First, we create a function called `get_messages` that returns `impl Stream<Item
= String>`. For its implementation, we create an async channel, loop over the
first 10 letters of the English alphabet, and send them across the channel.

We also use a new type: `ReceiverStream`, which converts the `rx` receiver from
the `trpl::channel` into a `Stream` with a `next` method. Back in `main`, we use
a `while let` loop to print all the messages from the stream.

When we run this code, we get exactly the results we would expect:

<!-- Not extracting output because changes to this output aren't significant;
@@ -167,19 +168,12 @@ Message: 'i'
Message: 'j'
```

We could do this with the regular `Receiver` API, or even the regular `Iterator`
API, though. Let’s add something that requires streams: adding a timeout
which applies to every item in the stream, and a delay on the items we emit.
Again, we could do this with the regular `Receiver` API or even the regular
`Iterator` API, though, so let’s add a feature that requires streams: adding a
timeout that applies to every item in the stream, and a delay on the items we
emit, as shown in Listing 17-34.
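
To make that claim concrete, here is a rough sketch of the same ten messages produced with only the standard library's `Receiver` and `Iterator` APIs. It is an analogy rather than the chapter's listing: it uses `std::sync::mpsc` in place of `trpl::channel`, and this `get_messages` returns an iterator, not a stream.

```rust
use std::sync::mpsc;

/// Std-only analogue of `get_messages` (an assumption for illustration):
/// returns a plain iterator instead of a stream.
fn get_messages() -> impl Iterator<Item = String> {
    let (tx, rx) = mpsc::channel();

    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    for message in messages {
        // Sending cannot fail here because `rx` is still alive.
        tx.send(format!("Message: '{message}'")).unwrap();
    }

    // `tx` is dropped when this function returns, so the iterator ends
    // once the queued messages have been drained.
    rx.into_iter()
}

fn main() {
    for message in get_messages() {
        println!("{message}");
    }
}
```

Nothing here needs async, which is why the example only becomes interesting once per-item timeouts and delays enter the picture.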


In Listing 17-34, we start by adding a timeout to the stream with the `timeout`
method, which comes from the `StreamExt` trait. Then we update the body of the
`while let` loop, because the stream now returns a `Result`. The `Ok` variant
indicates a message arrived in time; the `Err` variant indicates that the
timeout elapsed before any message arrived. We `match` on that result and either
print the message when we receive it successfully, or print a notice about the
timeout. Finally, notice that we pin the messages after applying the timeout to
them, because the timeout helper produces a stream which needs to be pinned to
be polled.

<Listing number="17-34" caption="Using the `StreamExt::timeout` method to set a time limit on the items in a stream" file-name="src/main.rs">

@@ -189,14 +183,19 @@ be polled.

</Listing>

We start by adding a timeout to the stream with the `timeout` method, which
comes from the `StreamExt` trait. Then we update the body of the `while let`
loop, because the stream now returns a `Result`. The `Ok` variant indicates a
message arrived in time; the `Err` variant indicates that the timeout elapsed
before any message arrived. We `match` on that result and either print the
message when we receive it successfully or print a notice about the timeout.
Finally, notice that we pin the messages after applying the timeout to them,
because the timeout helper produces a stream that needs to be pinned to be
polled.

However, because there are no delays between messages, this timeout does not
change the behavior of the program. Let’s add a variable delay to the messages
we send. In `get_messages`, we use the `enumerate` iterator method with the
`messages` array so that we can get the index of each item we are sending along
with the item itself. Then we apply a 100 millisecond delay to even-index items
and a 300 millisecond delay to odd-index items, to simulate the different delays
we might see from a stream of messages in the real world. Because our timeout is
for 200 milliseconds, this should affect half of the messages.
we send, as shown in Listing 17-35.

<Listing number="17-35" caption="Sending messages through `tx` with an async delay without making `get_messages` an async function" file-name="src/main.rs">

@@ -206,30 +205,36 @@ for 200 milliseconds, this should affect half of the messages.

</Listing>

In `get_messages`, we use the `enumerate` iterator method with the `messages`
array so that we can get the index of each item we’re sending along with the
item itself. Then we apply a 100-millisecond delay to even-index items and a
300-millisecond delay to odd-index items to simulate the different delays we
might see from a stream of messages in the real world. Because our timeout is
for 200 milliseconds, this should affect half of the messages.
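
The timing interplay can be sketched without an async runtime. The version below is an approximation under stated assumptions, not the chapter's code: it swaps the async task for an OS thread, `trpl::channel` for `std::sync::mpsc`, and the stream timeout for `Receiver::recv_timeout`. The `messages` parameter, the `collect_with_timeout` helper, and the `Problem: timed out` text are ours.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Returns immediately; a background thread sends the messages, sleeping
/// 100 ms before even-index items and 300 ms before odd-index items.
fn get_messages(messages: Vec<&'static str>) -> Receiver<String> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for (index, message) in messages.into_iter().enumerate() {
            let delay_ms = if index % 2 == 0 { 100 } else { 300 };
            thread::sleep(Duration::from_millis(delay_ms));
            if tx.send(format!("Message: '{message}'")).is_err() {
                break; // The receiver was dropped, so stop sending.
            }
        }
    });
    rx
}

/// Waits up to `timeout` for each item and records what happened.
fn collect_with_timeout(rx: Receiver<String>, timeout: Duration) -> Vec<String> {
    let mut events = Vec::new();
    loop {
        match rx.recv_timeout(timeout) {
            Ok(message) => events.push(message),
            Err(RecvTimeoutError::Timeout) => events.push(String::from("Problem: timed out")),
            // The sender finished and was dropped: no more messages.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    events
}

fn main() {
    let rx = get_messages(vec!["a", "b", "c", "d"]);
    for event in collect_with_timeout(rx, Duration::from_millis(200)) {
        println!("{event}");
    }
}
```

On a machine that is not heavily loaded, we would expect a timeout report before each odd-index message, because its 300-millisecond delay exceeds the 200-millisecond limit, while even-index messages arrive in time.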

To sleep between messages in the `get_messages` function without blocking, we
need to use async. However, we can’t make `get_messages` itself into an async
function, because then we’d return a `Future<Output = Stream<Item = String>>`
instead of a `Stream<Item = String>`. The caller would have to await
`get_messages` itself to get access to the stream. But remember: everything in a
given future happens linearly; concurrency happens _between_ futures. Awaiting
`get_messages` would require it to send all the messages, including sleeping
between sending each message, before returning the receiver stream. As a result,
the timeout would end up useless. There would be no delays in the stream itself:
the delays would all happen before the stream was even available.
`get_messages` would require it to send all the messages, including the sleep
delay between each message, before returning the receiver stream. As a result,
the timeout would be useless. There would be no delays in the stream itself;
they would all happen before the stream was even available.
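
A std-only analogy may help show why. In this sketch, the hypothetical `get_messages_eagerly` does all of its sleeping and sending before it hands back the receiver, much as awaiting an async `get_messages` would. It uses blocking `thread::sleep` and `std::sync::mpsc` as stand-ins for the async pieces, and the `drain` helper is ours.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical "eager" variant: every delay happens before the caller
/// gets the receiver back.
fn get_messages_eagerly(messages: &[&str]) -> Receiver<String> {
    let (tx, rx) = mpsc::channel();
    for (index, message) in messages.iter().enumerate() {
        let delay_ms = if index % 2 == 0 { 100 } else { 300 };
        thread::sleep(Duration::from_millis(delay_ms));
        tx.send(format!("Message: '{message}'")).unwrap();
    }
    rx
}

/// Drains `rx` with a per-item timeout, returning
/// (messages received, timeouts observed).
fn drain(rx: Receiver<String>, timeout: Duration) -> (usize, usize) {
    let (mut received, mut timeouts) = (0, 0);
    loop {
        match rx.recv_timeout(timeout) {
            Ok(_) => received += 1,
            Err(RecvTimeoutError::Timeout) => timeouts += 1,
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    (received, timeouts)
}

fn main() {
    let start = Instant::now();
    let rx = get_messages_eagerly(&["a", "b", "c", "d"]);
    // All 800 ms of delay have already elapsed by this point.
    println!("receiver available after {:?}", start.elapsed());

    let (received, timeouts) = drain(rx, Duration::from_millis(200));
    // Every message is already queued, so the timeout never fires.
    println!("{received} messages, {timeouts} timeouts"); // 4 messages, 0 timeouts
}
```

The 200-millisecond limit never triggers here because nothing is left to wait for once the receiver exists.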

Instead, we leave `get_messages` as a regular function which returns a stream,
and spawn a task to handle the async `sleep` calls.
Instead, we leave `get_messages` as a regular function that returns a stream,
and we spawn a task to handle the async `sleep` calls.

> Note: calling `spawn_task` in this way works because we already set up our
> runtime. Calling this particular implementation of `spawn_task` _without_
> first setting up a runtime will cause a panic. Other implementations choose
> different tradeoffs: they might spawn a new runtime and so avoid the panic but
> end up with a bit of extra overhead, or simply not provide a standalone way to
> spawn tasks without reference to a runtime. You should make sure you know what
> tradeoff your runtime has chosen and write your code accordingly!
> Note: Calling `spawn_task` in this way works because we already set up our
> runtime; had we not, it would cause a panic. Other implementations choose
> different tradeoffs: they might spawn a new runtime and avoid the panic but
> end up with a bit of extra overhead, or they may simply not provide a
> standalone way to spawn tasks without reference to a runtime. Make sure you
> know what tradeoff your runtime has chosen and write your code accordingly!
Now our code has a much more interesting result! Between every other pair of
messages, we see an error reported: `Problem: Elapsed(())`.
Now our code has a much more interesting result. Between every other pair of
messages, we see a `Problem: Elapsed(())` error.

<!-- manual-regeneration
cd listings/ch17-async-await/listing-17-35
@@ -255,16 +260,15 @@ Problem: Elapsed(())
Message: 'j'
```

The timeout doesn’t prevent the messages from arriving in the end—we still get
all of the original messages. This is because our channel is unbounded: it can
hold as many messages as we can fit in memory. If the message doesn’t arrive
before the timeout, our stream handler will account for that, but when it polls
the stream again, the message may now have arrived.
The timeout doesn’t prevent the messages from arriving in the end. We still get
all of the original messages, because our channel is unbounded: it can hold as
many messages as we can fit in memory. If the message doesn’t arrive before the
timeout, our stream handler will account for that, but when it polls the stream
again, the message may now have arrived.
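
The point that a timeout reports a delay rather than a loss can be illustrated with a small single-threaded sketch. It assumes the standard library's unbounded `std::sync::mpsc::channel` as a stand-in for `trpl::channel`; the `describe` and `demo` helpers are ours.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

/// Turns the outcome of one timed wait into a printable line.
fn describe(result: Result<String, RecvTimeoutError>) -> String {
    match result {
        Ok(message) => message,
        Err(RecvTimeoutError::Timeout) => String::from("Problem: timed out"),
        Err(RecvTimeoutError::Disconnected) => String::from("channel closed"),
    }
}

fn demo() -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let timeout = Duration::from_millis(50);
    let mut log = Vec::new();

    // Nothing has been sent yet, so the first wait times out.
    log.push(describe(rx.recv_timeout(timeout)));

    // The "late" messages are sent only after the timeout was reported.
    // The channel is unbounded, so they simply queue up.
    tx.send(String::from("Message: 'b'")).unwrap();
    tx.send(String::from("Message: 'c'")).unwrap();

    // Waiting again finds the messages that arrived in the meantime.
    log.push(describe(rx.recv_timeout(timeout)));
    log.push(describe(rx.recv_timeout(timeout)));
    log
}

fn main() {
    for line in demo() {
        println!("{line}");
    }
    // Prints:
    // Problem: timed out
    // Message: 'b'
    // Message: 'c'
}
```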

You can get different behavior if needed by using other kinds of channels, or
other kinds of streams more generally. Let’s see one of those in practice in our
final example for this section, by combining a stream of time intervals with
this stream of messages.
You can get different behavior if needed by using other kinds of channels or
other kinds of streams more generally. Let’s see one of those in practice by
combining a stream of time intervals with this stream of messages.

### Merging Streams
