From c45a46429340a4edb199af01637ce19fdf16a873 Mon Sep 17 00:00:00 2001 From: Dongkuo Ma Date: Thu, 17 Jun 2021 21:41:25 +0800 Subject: [PATCH] fixed a few small mistakes in streams userguide (#161) --- docs/userguide/streams.rst | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/docs/userguide/streams.rst b/docs/userguide/streams.rst index 6e4d92106..4c4953002 100644 --- a/docs/userguide/streams.rst +++ b/docs/userguide/streams.rst @@ -176,9 +176,9 @@ have processed so we can commit and advance the consumer group offset. We use reference counting for this, so when you define an agent that iterates over the topic as a stream:: - @app.agent(topic) - async def process(stream): - async for value in stream: + @app.agent(topic) + async def process(stream): + async for value in stream: print(value) The act of starting that stream iterator will add the topic to @@ -206,8 +206,8 @@ If two agents use streams subscribed to the same topic:: @app.agent(topic) async def processA(stream): - async for value in stream: - print(f'A: {value}') + async for value in stream: + print(f'A: {value}') @app.agent(topic) async def processB(stream): @@ -239,13 +239,13 @@ What this means is that an event is acknowledged when your agent is finished handling it, but you can also manually control when it happens. To manually control when the event is acknowledged, and its reference count -decreased, use ``await event.ack()`` +decreased, use ``await event.ack()``:: - async for event in stream.events(): - print(event.value) - await event.ack() + async for event in stream.events(): + print(event.value) + await event.ack() - You can also use :keyword:`async for` on the event:: +You can also use :keyword:`async with` on the event:: async for event in stream.events(): async with event: