Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't instantiate new CurrentThreadScheduler._Local with each scheduler instance #363

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions rx/concurrency/currentthreadscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,18 @@ class CurrentThreadScheduler(SchedulerBase):
"""Represents an object that schedules units of work on the current thread.
You never want to schedule timeouts using the CurrentThreadScheduler since
that will block the current thread while waiting.

Please note, there will be at most a single instance per thread -- calls to
the constructor will just return the same instance if one already exists.

Conversely, if you pass an instance to another thread, it will effectively
behave as a separate scheduler, with its own queue. In particular, this
implies that you can't make assumptions about the execution order of items
that were scheduled by different threads -- even if they were submitted to
what superficially appears to be a single scheduler instance.
"""

_local = _Local()
_global: MutableMapping[threading.Thread, 'CurrentThreadScheduler'] = WeakKeyDictionary()

def __new__(cls) -> 'CurrentThreadScheduler':
Expand All @@ -62,9 +72,7 @@ def __new__(cls) -> 'CurrentThreadScheduler':
def __init__(self) -> None:
"""Creates a scheduler that schedules work as soon as possible
on the current thread."""

super().__init__()
self._local = _Local()

def schedule(self,
action: typing.ScheduledAction,
Expand Down Expand Up @@ -127,7 +135,7 @@ def schedule_absolute(self,

si: ScheduledItem[typing.TState] = ScheduledItem(self, state, action, duetime)

local: _Local = self._local
local: _Local = CurrentThreadScheduler._local
local.queue.enqueue(si)
if local.idle:
local.idle = False
Expand All @@ -147,7 +155,7 @@ def schedule_required(self) -> bool:
False; otherwise, if the trampoline is not active, then it
returns True.
"""
return self._local.idle
return CurrentThreadScheduler._local.idle

def ensure_trampoline(self, action):
"""Method for testing the CurrentThreadScheduler."""
Expand Down