diff --git a/rx/concurrency/currentthreadscheduler.py b/rx/concurrency/currentthreadscheduler.py index 50862d662..8ad57ee31 100644 --- a/rx/concurrency/currentthreadscheduler.py +++ b/rx/concurrency/currentthreadscheduler.py @@ -36,9 +36,6 @@ class CurrentThreadScheduler(SchedulerBase): You never want to schedule timeouts using the CurrentThreadScheduler since that will block the current thread while waiting. """ - - _global: MutableMapping[threading.Thread, 'CurrentThreadScheduler'] = WeakKeyDictionary() - class _Local(threading.local): __slots__ = 'idle', 'queue' @@ -46,6 +43,9 @@ def __init__(self): self.idle: bool = True self.queue: PriorityQueue[ScheduledItem[typing.TState]] = PriorityQueue() + _local = _Local() + _global: MutableMapping[threading.Thread, 'CurrentThreadScheduler'] = WeakKeyDictionary() + def __new__(cls) -> 'CurrentThreadScheduler': """Ensure that each thread has at most a single instance.""" @@ -59,8 +59,7 @@ def __new__(cls) -> 'CurrentThreadScheduler': def __init__(self) -> None: """Creates a scheduler that schedules work as soon as possible on the current thread.""" - - self._local = CurrentThreadScheduler._Local() + super().__init__() def schedule(self, action: typing.ScheduledAction, @@ -114,7 +113,7 @@ def schedule_absolute(self, duetime: typing.AbsoluteTime, si: ScheduledItem[typing.TState] = ScheduledItem(self, state, action, duetime) - local: CurrentThreadScheduler._Local = self._local + local: CurrentThreadScheduler._Local = CurrentThreadScheduler._local local.queue.enqueue(si) if local.idle: local.idle = False @@ -134,7 +133,7 @@ def schedule_required(self) -> bool: False; otherwise, if the trampoline is not active, then it returns True. """ - return self._local.idle + return CurrentThreadScheduler._local.idle def ensure_trampoline(self, action): """Method for testing the CurrentThreadScheduler."""