Skip to content

Commit

Permalink
Switch back to raising Lagged by default
Browse files Browse the repository at this point in the history
Makes the broadcast test suite not hang xD, and is our expected default
behaviour. Also removes a ton of commented legacy cruft from before the
refactor to remove the `.receive()` recursion and fixes some typing.

Oh right, and in the case where there's only one subscriber left we warn
log about it since in theory we could actually entirely unwind the
bcaster back to the original underlying, though not sure if that's sane
or works for some use cases (like wanting to have some other subscriber
get added dynamically later).
  • Loading branch information
goodboy committed Jan 29, 2023
1 parent 80f9838 commit 4ce2dcd
Showing 1 changed file with 23 additions and 110 deletions.
133 changes: 23 additions & 110 deletions tractor/trionics/_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,22 @@ class BroadcastState(Struct):
# If the broadcaster was cancelled, we might as well track it
cancelled: dict[int, Task] = {}

def statistics(self) -> dict[str, str | int | float]:
def statistics(self) -> dict[str, Any]:
'''
Return broadcast receiver group "statistics" like many of
``trio``'s internal task-sync primitives.
'''
key: int | None
ev: trio.Event | None

subs = self.subs
if self.recv_ready is not None:
key, ev = self.recv_ready
else:
key = ev = None

qlens = {}
qlens: dict[int, int] = {}
for tid, sz in subs.items():
qlens[tid] = sz if sz != -1 else 0

Expand Down Expand Up @@ -154,7 +157,7 @@ def __init__(
rx_chan: AsyncReceiver,
state: BroadcastState,
receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None,
raise_on_lag: bool = False,
raise_on_lag: bool = True,

) -> None:

Expand All @@ -180,7 +183,12 @@ def receive_nowait(
_key: int | None = None,
_state: BroadcastState | None = None,

) -> ReceiveType:
) -> Any:
'''
Sync version of `.receive()` which does all the low level work
of receiving from the underlying/wrapped receive channel.
'''
key = _key or self.key
state = _state or self._state

Expand Down Expand Up @@ -273,7 +281,6 @@ async def _receive_from_underlying(
# already retreived the last value

# XXX: which of these impls is fastest?

# subs = state.subs.copy()
# subs.pop(key)

Expand Down Expand Up @@ -310,7 +317,6 @@ async def _receive_from_underlying(
raise

finally:

# Reset receiver waiter task event for next blocking condition.
# this MUST be reset even if the above ``.recv()`` call
# was cancelled to avoid the next consumer from blocking on
Expand All @@ -330,83 +336,14 @@ async def receive(self) -> ReceiveType:
pass

# current task already has the latest value **and** is the
# first task to begin waiting for a new one
# first task to begin waiting for a new one so we begin blocking
# until rescheduled with the a new value from the underlying.
if state.recv_ready is None:
return await self._receive_from_underlying(key, state)

# if self._closed:
# raise trio.ClosedResourceError

# event = trio.Event()
# state.recv_ready = key, event

# try:
# # if we're cancelled here it should be
# # fine to bail without affecting any other consumers
# # right?
# value = await self._recv()

# # items with lower indices are "newer"
# # NOTE: ``collections.deque`` implicitly takes care of
# # trucating values outside our ``state.maxlen``. In the
# # alt-backend-array-case we'll need to make sure this is
# # implemented in similar ringer-buffer-ish style.
# state.queue.appendleft(value)

# # broadcast new value to all subscribers by increasing
# # all sequence numbers that will point in the queue to
# # their latest available value.

# # don't decrement the sequence for this task since we
# # already retreived the last value

# # XXX: which of these impls is fastest?

# # subs = state.subs.copy()
# # subs.pop(key)

# for sub_key in filter(
# # lambda k: k != key, state.subs,
# partial(ne, key), state.subs,
# ):
# state.subs[sub_key] += 1

# # NOTE: this should ONLY be set if the above task was *NOT*
# # cancelled on the `._recv()` call.
# event.set()
# return value

# except trio.EndOfChannel:
# # if any one consumer gets an EOC from the underlying
# # receiver we need to unblock and send that signal to
# # all other consumers.
# self._state.eoc = True
# if event.statistics().tasks_waiting:
# event.set()
# raise

# except (
# trio.Cancelled,
# ):
# # handle cancelled specially otherwise sibling
# # consumers will be awoken with a sequence of -1
# # and will potentially try to rewait the underlying
# # receiver instead of just cancelling immediately.
# self._state.cancelled[key] = current_task()
# if event.statistics().tasks_waiting:
# event.set()
# raise

# finally:

# # Reset receiver waiter task event for next blocking condition.
# # this MUST be reset even if the above ``.recv()`` call
# # was cancelled to avoid the next consumer from blocking on
# # an event that won't be set!
# state.recv_ready = None

# This task is all caught up and ready to receive the latest
# value, so queue sched it on the internal event.
# value, so queue/schedule it to be woken on the next internal
# event.
else:
while state.recv_ready is not None:
# seq = state.subs[key]
Expand All @@ -419,9 +356,7 @@ async def receive(self) -> ReceiveType:
_state=state,
)
except trio.WouldBlock:
if (
self._closed
):
if self._closed:
raise trio.ClosedResourceError

subs = state.subs
Expand All @@ -433,9 +368,12 @@ async def receive(self) -> ReceiveType:
# XXX: we are the last and only user of this BR so
# likely it makes sense to unwind back to the
# underlying?
import tractor
await tractor.breakpoint()

# import tractor
# await tractor.breakpoint()
log.warning(
f'Only one sub left for {self}?\n'
'We can probably unwind from breceiver?'
)

# XXX: In the case where the first task to allocate the
# ``.recv_ready`` event is cancelled we will be woken
Expand All @@ -445,33 +383,8 @@ async def receive(self) -> ReceiveType:
# been incremented and then receive again.
# return await self.receive()

# if state.recv_ready is None:

print(f'{key}: {state.statistics()}')
return await self._receive_from_underlying(key, state)

# seq = state.subs[key]

# NOTE: if we ever would like the behaviour where if the
# first task to recv on the underlying is cancelled but it
# still DOES trigger the ``.recv_ready``, event we'll likely need
# this logic:

# if seq > -1:
# # stuff from above..
# seq = state.subs[key]

# value = state.queue[seq]
# state.subs[key] -= 1
# return value

# elif (
# seq == -1
# ):

# else:
raise RuntimeError(f'Unable to receive {key}:\n{state.statistics()}')

@asynccontextmanager
async def subscribe(
self,
Expand Down

0 comments on commit 4ce2dcd

Please sign in to comment.