Skip to content

Commit

Permalink
Don't instantiate new CurrentThreadScheduler._Local with each schedul…
Browse files Browse the repository at this point in the history
…er instance (#363)

* Don't instantiate new CurrentThreadScheduler._Local with each scheduler instance

* Add some notes about CurrentThreadScheduler behaviour
  • Loading branch information
erikkemperman authored May 5, 2019
1 parent 148d61e commit 93f94ca
Showing 1 changed file with 12 additions and 4 deletions.
16 changes: 12 additions & 4 deletions rx/concurrency/currentthreadscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,18 @@ class CurrentThreadScheduler(SchedulerBase):
"""Represents an object that schedules units of work on the current thread.
You never want to schedule timeouts using the CurrentThreadScheduler since
that will block the current thread while waiting.
Please note, there will be at most a single instance per thread -- calls to
the constructor will just return the same instance if one already exists.
Conversely, if you pass an instance to another thread, it will effectively
behave as a separate scheduler, with its own queue. In particular, this
implies that you can't make assumptions about the execution order of items
that were scheduled by different threads -- even if they were submitted to
what superficially appears to be a single scheduler instance.
"""

_local = _Local()
_global: MutableMapping[threading.Thread, 'CurrentThreadScheduler'] = WeakKeyDictionary()

def __new__(cls) -> 'CurrentThreadScheduler':
Expand All @@ -62,9 +72,7 @@ def __new__(cls) -> 'CurrentThreadScheduler':
def __init__(self) -> None:
"""Creates a scheduler that schedules work as soon as possible
on the current thread."""

super().__init__()
self._local = _Local()

def schedule(self,
action: typing.ScheduledAction,
Expand Down Expand Up @@ -127,7 +135,7 @@ def schedule_absolute(self,

si: ScheduledItem[typing.TState] = ScheduledItem(self, state, action, duetime)

local: _Local = self._local
local: _Local = CurrentThreadScheduler._local
local.queue.enqueue(si)
if local.idle:
local.idle = False
Expand All @@ -147,7 +155,7 @@ def schedule_required(self) -> bool:
False; otherwise, if the trampoline is not active, then it
returns True.
"""
return self._local.idle
return CurrentThreadScheduler._local.idle

def ensure_trampoline(self, action):
"""Method for testing the CurrentThreadScheduler."""
Expand Down

0 comments on commit 93f94ca

Please sign in to comment.