Skip to content

Commit

Permalink
Scheduler docstrings, typing, polish (#362)
Browse files Browse the repository at this point in the history
* Add local coverage html report to gitignore

* Polish mainloop schedulers (typing, docstrings)

* Polish mainloop scheduler tests

* Polish schedulers, typing and docstring

* Polish scheduler tests (typing, docstrings)

* Test that scheduler clocks use consistent units

* Process suggestions by @jcafhe
  • Loading branch information
erikkemperman authored May 4, 2019
1 parent f9ac3c8 commit b0d75e4
Show file tree
Hide file tree
Showing 41 changed files with 1,548 additions and 738 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pip-log.txt
.tox
nosetests.xml
coverage.xml
htmlcov
.mypy_cache
.pytest_cache

Expand Down
173 changes: 129 additions & 44 deletions rx/concurrency/catchscheduler.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,161 @@
from rx.disposable import Disposable
from rx.disposable import SingleAssignmentDisposable
from datetime import datetime
from typing import Callable, Optional

from rx.core import typing
from rx.disposable import Disposable, SingleAssignmentDisposable

from .schedulerbase import SchedulerBase


class CatchScheduler(SchedulerBase):
def __init__(self, scheduler, handler):
"""Create new CatchScheduler.

Returns a scheduler that wraps the original scheduler, adding
exception handling for scheduled actions.
def __init__(self,
scheduler: typing.Scheduler,
handler: Callable[[Exception], bool]
) -> None:
"""Wraps a scheduler, passed as constructor argument, adding exception
handling for scheduled actions. The handler should return True to
indicate it handled the exception successfully. Falsy return values will
be taken to indicate that the exception should be escalated (raised by
this scheduler).
Args:
scheduler: The scheduler to be wrapped.
handler: Callable to handle exceptions raised by wrapped scheduler.
"""
self._scheduler = scheduler
self._handler = handler
self._recursive_original = None
self._recursive_wrapper = None

super(CatchScheduler, self).__init__()
super().__init__()
self._scheduler: typing.Scheduler = scheduler
self._handler: Callable[[Exception], bool] = handler
self._recursive_original: typing.Scheduler = None
self._recursive_wrapper: 'CatchScheduler' = None

@property
def now(self) -> datetime:
"""Represents a notion of time for this scheduler. Tasks being
scheduled on a scheduler will adhere to the time denoted by this
property.
Returns:
The scheduler's current time, as a datetime instance.
"""

def local_now(self):
return self._scheduler.now

def schedule_now(self, state, action):
"""Schedules an action to be executed."""
def schedule(self,
action: typing.ScheduledAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
"""Schedules an action to be executed.
Args:
action: Action to be executed.
state: [Optional] state to be given to the action function.
Returns:
The disposable object used to cancel the scheduled action
(best effort).
"""

action = self._wrap(action)
return self._scheduler.schedule(action, state=state)

def schedule_relative(self,
duetime: typing.RelativeTime,
action: typing.ScheduledAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
"""Schedules an action to be executed after duetime.
Args:
duetime: Relative time after which to execute the action.
action: Action to be executed.
state: [Optional] state to be given to the action function.
return self._scheduler.scheduleWithState(state, self._wrap(action))
Returns:
The disposable object used to cancel the scheduled action
(best effort).
"""

action = self._wrap(action)
return self._scheduler.schedule_relative(duetime, action, state=state)

def schedule_absolute(self,
duetime: typing.AbsoluteTime,
action: typing.ScheduledAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
"""Schedules an action to be executed at duetime.
def schedule_relative(self, duetime, action, state=None):
"""Schedules an action to be executed after duetime."""
Args:
duetime: Absolute time at which to execute the action.
action: Action to be executed.
state: [Optional] state to be given to the action function.
Returns:
The disposable object used to cancel the scheduled action
(best effort).
"""

return self._scheduler.schedule_relative(duetime, self._wrap(action), state=state)
action = self._wrap(action)
return self._scheduler.schedule_absolute(duetime, action, state=state)

def schedule_periodic(self,
period: typing.RelativeTime,
action: typing.ScheduledPeriodicAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
"""Schedules a periodic piece of work.
Args:
period: Period in seconds or timedelta for running the
work periodically.
action: Action to be executed.
state: [Optional] Initial state passed to the action upon
the first iteration.
Returns:
The disposable object used to cancel the scheduled
recurring action (best effort).
"""
d = SingleAssignmentDisposable()
failed = False

def schedule_absolute(self, duetime, action, state=None):
"""Schedules an action to be executed at duetime."""
def periodic_action(periodic_state) -> Optional[typing.TState]:
nonlocal failed
if failed:
return None
try:
return action(periodic_state)
except Exception as ex:
failed = True
if not self._handler(ex):
raise Exception(ex)
d.dispose()
return None

return self._scheduler.schedule_absolute(duetime, self._wrap(action), state=state)
d.disposable = self._scheduler.schedule_periodic(period, periodic_action, state=state)
return d

def _clone(self, scheduler):
def _clone(self, scheduler: typing.Scheduler) -> 'CatchScheduler':
return CatchScheduler(scheduler, self._handler)

def _wrap(self, action):
def _wrap(self, action: typing.ScheduledAction) -> typing.ScheduledAction:
parent = self

def wrapped_action(self, state):
def wrapped_action(self,
state: Optional[typing.TState]
) -> Optional[typing.Disposable]:
try:
return action(parent._get_recursive_wrapper(self), state)
except Exception as ex:
if not parent._handler(ex):
raise Exception(ex)
return Disposable()

return wrapped_action

def _get_recursive_wrapper(self, scheduler):
def _get_recursive_wrapper(self, scheduler) -> 'CatchScheduler':
if self._recursive_original != scheduler:
self._recursive_original = scheduler
wrapper = self._clone(scheduler)
Expand All @@ -60,22 +164,3 @@ def _get_recursive_wrapper(self, scheduler):
self._recursive_wrapper = wrapper

return self._recursive_wrapper

def schedule_periodic(self, period, action, state=None):
d = SingleAssignmentDisposable()
failed = [False]

def periodic_action(periodic_state):
if failed[0]:
return None
try:
return action(periodic_state)
except Exception as ex:
failed[0] = True
if not self._handler(ex):
raise Exception(ex)
d.dispose()
return None

d.disposable = self._scheduler.schedule_periodic(periodic_action, period, state)
return d
41 changes: 27 additions & 14 deletions rx/concurrency/currentthreadscheduler.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
# Current Thread Scheduler
import time
import logging
import threading
from weakref import WeakKeyDictionary
import time
from typing import MutableMapping, Optional
from weakref import WeakKeyDictionary

from rx.core import typing
from rx.internal import PriorityQueue
Expand All @@ -12,6 +11,7 @@
from .schedulerbase import SchedulerBase
from .scheduleditem import ScheduledItem


log = logging.getLogger('Rx')


Expand All @@ -31,6 +31,16 @@ def run(cls, queue: PriorityQueue[ScheduledItem[typing.TState]]) -> None:
time.sleep(diff.total_seconds())


class _Local(threading.local):
__slots__ = 'idle', 'queue'

def __init__(self) -> None:
super().__init__()
self.idle: bool = True
self.queue: PriorityQueue[
ScheduledItem[typing.TState]] = PriorityQueue()


class CurrentThreadScheduler(SchedulerBase):
"""Represents an object that schedules units of work on the current thread.
You never want to schedule timeouts using the CurrentThreadScheduler since
Expand All @@ -39,13 +49,6 @@ class CurrentThreadScheduler(SchedulerBase):

_global: MutableMapping[threading.Thread, 'CurrentThreadScheduler'] = WeakKeyDictionary()

class _Local(threading.local):
__slots__ = 'idle', 'queue'

def __init__(self):
self.idle: bool = True
self.queue: PriorityQueue[ScheduledItem[typing.TState]] = PriorityQueue()

def __new__(cls) -> 'CurrentThreadScheduler':
"""Ensure that each thread has at most a single instance."""

Expand All @@ -60,16 +63,19 @@ def __init__(self) -> None:
"""Creates a scheduler that schedules work as soon as possible
on the current thread."""

self._local = CurrentThreadScheduler._Local()
super().__init__()
self._local = _Local()

def schedule(self,
action: typing.ScheduledAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
"""Schedules an action to be executed.
Args:
action: Action to be executed.
state: [Optional] state to be given to the action function.
Returns:
The disposable object used to cancel the scheduled action
(best effort).
Expand All @@ -83,10 +89,12 @@ def schedule_relative(self,
state: Optional[typing.TState] = None
) -> typing.Disposable:
"""Schedules an action to be executed after duetime.
Args:
duetime: Relative time after which to execute the action.
action: Action to be executed.
state: [Optional] state to be given to the action function.
Returns:
The disposable object used to cancel the scheduled action
(best effort).
Expand All @@ -95,16 +103,21 @@ def schedule_relative(self,
duetime = SchedulerBase.normalize(self.to_timedelta(duetime))
return self.schedule_absolute(self.now + duetime, action, state=state)

def schedule_absolute(self, duetime: typing.AbsoluteTime,
def schedule_absolute(self,
duetime: typing.AbsoluteTime,
action: typing.ScheduledAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
"""Schedules an action to be executed at duetime.
Args:
duetime: Absolute time after which to execute the action.
duetime: Absolute time at which to execute the action.
action: Action to be executed.
state: [Optional] state to be given to the action function.
Returns:
The disposable object used to cancel the scheduled action
(best effort).
"""

duetime = self.to_datetime(duetime)
Expand All @@ -114,7 +127,7 @@ def schedule_absolute(self, duetime: typing.AbsoluteTime,

si: ScheduledItem[typing.TState] = ScheduledItem(self, state, action, duetime)

local: CurrentThreadScheduler._Local = self._local
local: _Local = self._local
local.queue.enqueue(si)
if local.idle:
local.idle = False
Expand Down
Loading

0 comments on commit b0d75e4

Please sign in to comment.