Skip to content

Commit

Permalink
Merge pull request #46 from phettberg/sinkasync-error-callback
Browse files Browse the repository at this point in the history
Fix missing error_callback call in SinkAsync
  • Loading branch information
semiversus authored Jan 18, 2024
2 parents 1554090 + aa9375e commit 4fd21be
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion broqer/subscribers/sink_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
>>> _d.dispose()
"""

import asyncio
import sys

from functools import wraps
from typing import Any

Expand Down Expand Up @@ -55,7 +58,14 @@ def __init__(self, coro, *args, mode=AsyncMode.CONCURRENT,
self._error_callback = error_callback

def emit(self, value: Any, who: Publisher):
self._coro_queue.schedule(value)
future = self._coro_queue.schedule(value)
future.add_done_callback(self._done)

def _done(self, future: asyncio.Future):
try:
future.result()
except Exception: # pylint: disable=broad-except
self._error_callback(*sys.exc_info())


def build_sink_async(coro=None, *, mode: AsyncMode = AsyncMode.CONCURRENT,
Expand Down

0 comments on commit 4fd21be

Please sign in to comment.