Skip to content

Commit

Permalink
SchedulerBase polish, typings, test (#361)
Browse files Browse the repository at this point in the history
* Docstrings in SchedulerBase

* SchedulerBase is abstract

* Add schedule_periodic to scheduler abc, typing

* Add unit tests for SchedulerBase class methods
  • Loading branch information
erikkemperman authored May 4, 2019
1 parent be24259 commit f9ac3c8
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 42 deletions.
178 changes: 139 additions & 39 deletions rx/concurrency/schedulerbase.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from abc import abstractmethod
from datetime import datetime, timedelta
from typing import Optional

Expand All @@ -6,15 +7,94 @@
from rx.internal.basic import default_now
from rx.internal.constants import DELTA_ZERO, UTC_ZERO


class SchedulerBase(typing.Scheduler):
"""Provides a set of static properties to access commonly used
schedulers.
"""

@property
def now(self) -> datetime:
"""Represents a notion of time for this scheduler. Tasks being
scheduled on a scheduler will adhere to the time denoted by this
property.
Returns:
The scheduler's current time, as a datetime instance.
"""

return default_now()

@abstractmethod
def schedule(self,
action: typing.ScheduledAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
"""Schedules an action to be executed.
Args:
action: Action to be executed.
state: [Optional] state to be given to the action function.
Returns:
The disposable object used to cancel the scheduled action
(best effort).
"""
return NotImplemented

@abstractmethod
def schedule_relative(self,
duetime: typing.RelativeTime,
action: typing.ScheduledAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
"""Schedules an action to be executed after duetime.
Args:
duetime: Relative time after which to execute the action.
action: Action to be executed.
state: [Optional] state to be given to the action function.
Returns:
The disposable object used to cancel the scheduled action
(best effort).
"""
return NotImplemented

@abstractmethod
def schedule_absolute(self,
duetime: typing.AbsoluteTime,
action: typing.ScheduledAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
"""Schedules an action to be executed at duetime.
Args:
duetime: Absolute time after which to execute the action.
action: Action to be executed.
state: [Optional] state to be given to the action function.
Returns:
The disposable object used to cancel the scheduled action
(best effort).
"""
return NotImplemented

def invoke_action(self,
action: typing.ScheduledAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
"""Invoke the given given action. This is typically called by instances
of ScheduledItem.
Args:
action: Action to be executed.
state: [Optional] state to be given to the action function.
Returns:
The disposable object returned by the action, if any; or a new
(no-op) disposable otherwise.
"""
ret = action(self, state)
if isinstance(ret, typing.Disposable):
return ret
Expand Down Expand Up @@ -65,69 +145,89 @@ def invoke_periodic(scheduler: typing.Scheduler, _: typing.TState) -> Optional[D
disp.disposable = self.schedule_relative(period, invoke_periodic, None)
return disp

@property
def now(self) -> datetime:
"""Returns the current time.
@classmethod
def to_seconds(cls, value: typing.AbsoluteOrRelativeTime) -> float:
"""Converts time value to seconds. This method handles both absolute
(datetime) and relative (timedelta) values. If the argument is already
a float, it is simply returned unchanged.
Represents a notion of time for this scheduler. Tasks being
scheduled on a scheduler will adhere to the time denoted by
this property.
"""
Args:
value: the time value to convert to seconds.
return default_now()
Returns:
The value converted to seconds.
"""

@classmethod
def to_seconds(cls, timespan: typing.AbsoluteOrRelativeTime) -> float:
"""Converts time value to seconds"""
if isinstance(value, datetime):
value = value - UTC_ZERO

if isinstance(timespan, datetime):
timespan = timespan - UTC_ZERO
timespan = timespan.total_seconds()
elif isinstance(timespan, timedelta):
timespan = timespan.total_seconds()
if isinstance(value, timedelta):
value = value.total_seconds()

return timespan
return value

@classmethod
def to_datetime(cls, duetime: typing.AbsoluteOrRelativeTime) -> datetime:
"""Converts time value to datetime"""
def to_datetime(cls, value: typing.AbsoluteOrRelativeTime) -> datetime:
"""Converts time value to datetime. This method handles both absolute
(float) and relative (timedelta) values. If the argument is already
a datetime, it is simply returned unchanged.
Args:
value: the time value to convert to datetime.
Returns:
The value converted to datetime.
"""

if isinstance(duetime, timedelta):
duetime = UTC_ZERO + duetime
elif not isinstance(duetime, datetime):
duetime = datetime.utcfromtimestamp(duetime)
if isinstance(value, timedelta):
value = UTC_ZERO + value
elif not isinstance(value, datetime):
value = datetime.utcfromtimestamp(value)

return duetime
return value

@classmethod
def to_timedelta(cls, timespan: typing.AbsoluteOrRelativeTime) -> timedelta:
"""Converts time value to timedelta"""
def to_timedelta(cls, value: typing.AbsoluteOrRelativeTime) -> timedelta:
"""Converts time value to timedelta. This method handles both absolute
(datetime) and relative (float) values. If the argument is already
a timedelta, it is simply returned unchanged. If the argument is an
absolute time, the result value will be the timedelta since the epoch,
January 1st, 1970, 00:00:00.
Args:
value: the time value to convert to timedelta.
Returns:
The value converted to timedelta.
"""

if isinstance(timespan, datetime):
timespan = timespan - UTC_ZERO
elif not isinstance(timespan, timedelta):
timespan = timedelta(seconds=timespan)
if isinstance(value, datetime):
value = value - UTC_ZERO
elif not isinstance(value, timedelta):
value = timedelta(seconds=value)

return timespan
return value

@classmethod
def normalize(cls, timespan: typing.RelativeTime) -> typing.RelativeTime:
"""Normalizes the specified timespan value to a positive value.
def normalize(cls, value: typing.RelativeTime) -> typing.RelativeTime:
"""Normalizes the specified time value to a non-negative value. This
method handles only relative values, given as either timedelta or float,
and will return the normalized value as that same type.
Args:
timespan: The time span value to normalize.
value: The time value to normalize.
Returns:
The specified timespan value if it is zero or positive;
otherwise, 0.0
"""

if isinstance(timespan, timedelta):
if not timespan or timespan < DELTA_ZERO:
if isinstance(value, timedelta):
if not value or value < DELTA_ZERO:
return DELTA_ZERO

elif isinstance(timespan, float):
if not timespan or timespan < 0.0:
elif isinstance(value, float):
if not value or value < 0.0:
return 0.0

return timespan
return value
3 changes: 3 additions & 0 deletions rx/core/abc/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ def schedule_relative(self, duetime, action, state=None):
@abstractmethod
def schedule_absolute(self, duetime, action, state=None):
return NotImplemented

def schedule_periodic(self, period, action, state=None):
return NotImplemented
25 changes: 22 additions & 3 deletions rx/core/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,34 @@ def now(self) -> datetime:
return NotImplemented

@abstractmethod
def schedule(self, action: 'ScheduledAction', state: TState = None) -> Disposable:
def schedule(self,
action: 'ScheduledAction',
state: Optional[TState] = None
) -> Disposable:
return NotImplemented

@abstractmethod
def schedule_relative(self, duetime: RelativeTime, action: 'ScheduledAction', state: TState = None) -> Disposable:
def schedule_relative(self,
duetime: RelativeTime,
action: 'ScheduledAction',
state: Optional[TState] = None
) -> Disposable:
return NotImplemented

@abstractmethod
def schedule_absolute(self, duetime: AbsoluteTime, action: 'ScheduledAction', state: TState = None) -> Disposable:
def schedule_absolute(self,
duetime: AbsoluteTime,
action: 'ScheduledAction',
state: Optional[TState] = None
) -> Disposable:
return NotImplemented

@abstractmethod
def schedule_periodic(self,
period: RelativeTime,
action: 'ScheduledPeriodicAction',
state: Optional[TState] = None
) -> Disposable:
return NotImplemented


Expand Down
51 changes: 51 additions & 0 deletions tests/test_concurrency/test_schedulerbase.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import unittest
from datetime import timedelta

from rx.internal.constants import DELTA_ZERO, UTC_ZERO
from rx.concurrency.schedulerbase import SchedulerBase


class TestSchedulerBase(unittest.TestCase):

def test_base_to_seconds(self):
val = SchedulerBase.to_seconds(0.0)
assert val == 0.0
val = SchedulerBase.to_seconds(DELTA_ZERO)
assert val == 0.0
val = SchedulerBase.to_seconds(UTC_ZERO)
assert val == 0.0

def test_base_to_datetime(self):
val = SchedulerBase.to_datetime(0.0)
assert val == UTC_ZERO
val = SchedulerBase.to_datetime(DELTA_ZERO)
assert val == UTC_ZERO
val = SchedulerBase.to_datetime(UTC_ZERO)
assert val == UTC_ZERO

def test_base_to_timedelta(self):
val = SchedulerBase.to_timedelta(0.0)
assert val == DELTA_ZERO
val = SchedulerBase.to_timedelta(DELTA_ZERO)
assert val == DELTA_ZERO
val = SchedulerBase.to_timedelta(UTC_ZERO)
assert val == DELTA_ZERO

def test_base_normalize_float(self):
val = SchedulerBase.normalize(-1.0)
assert val == 0.0
val = SchedulerBase.normalize(0.0)
assert val == 0.0
val = SchedulerBase.normalize(1.0)
assert val == 1.0

def test_base_normalize_delta(self):
DELTA_ONE = timedelta(seconds=1.0)
val = SchedulerBase.normalize(-DELTA_ONE)
assert val == DELTA_ZERO
val = SchedulerBase.normalize(DELTA_ZERO)
assert val == DELTA_ZERO
val = SchedulerBase.normalize(DELTA_ONE)
assert val == DELTA_ONE


0 comments on commit f9ac3c8

Please sign in to comment.