diff --git a/src/prefect/utilities/processutils.py b/src/prefect/utilities/processutils.py index 46729b5c59b2..c17250256b5e 100644 --- a/src/prefect/utilities/processutils.py +++ b/src/prefect/utilities/processutils.py @@ -8,7 +8,7 @@ import anyio.abc from anyio.streams.text import TextReceiveStream, TextSendStream -TextSink = Union[TextIO, TextSendStream] +TextSink = Union[anyio.AsyncFile, TextIO, TextSendStream] @asynccontextmanager @@ -101,9 +101,17 @@ async def consume_process_output( async def stream_text(source: TextReceiveStream, sink: Optional[TextSink]): + if isinstance(sink, TextIOBase): + # Convert the blocking sink to an async-compatible object + sink = anyio.wrap_file(sink) + async for item in source: if isinstance(sink, TextSendStream): await sink.send(item) - elif isinstance(sink, TextIOBase): - sink.write(item) - sink.flush() + elif isinstance(sink, anyio.AsyncFile): + await sink.write(item) + await sink.flush() + elif sink is None: + pass # Consume the item but perform no action + else: + raise TypeError(f"Unsupported sink type {type(sink).__name__}")