Skip to content

Commit

Permalink
Improve robustness of subprocess text streaming (#6445)
Browse files Browse the repository at this point in the history
Previously, this utility would call blocking writes to stderr and stdout directly. This appears to introduce the possibility of race conditions and blocked event loops when many processes are run concurrently. Since we use these utilities to launch flows in parallel processes from the agent, it is important that they are robust to concurrency. `wrap_file` _may_ not be thread safe, but this is still an improvement from where we were. There is not a clear suggested pattern for this, see extensive discussion at python-trio/trio#174.
  • Loading branch information
zanieb authored Aug 17, 2022
1 parent c014d40 commit 5a22374
Showing 1 changed file with 12 additions and 4 deletions.
16 changes: 12 additions & 4 deletions src/prefect/utilities/processutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import anyio.abc
from anyio.streams.text import TextReceiveStream, TextSendStream

TextSink = Union[TextIO, TextSendStream]
TextSink = Union[anyio.AsyncFile, TextIO, TextSendStream]


@asynccontextmanager
Expand Down Expand Up @@ -101,9 +101,17 @@ async def consume_process_output(


async def stream_text(source: TextReceiveStream, sink: Optional[TextSink]):
if isinstance(sink, TextIOBase):
# Convert the blocking sink to an async-compatible object
sink = anyio.wrap_file(sink)

async for item in source:
if isinstance(sink, TextSendStream):
await sink.send(item)
elif isinstance(sink, TextIOBase):
sink.write(item)
sink.flush()
elif isinstance(sink, anyio.AsyncFile):
await sink.write(item)
await sink.flush()
elif sink is None:
pass # Consume the item but perform no action
else:
raise TypeError(f"Unsupported sink type {type(sink).__name__}")

0 comments on commit 5a22374

Please sign in to comment.