Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SchedulerBase polish, typings, test #361

Merged
merged 4 commits into from
May 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 139 additions & 39 deletions rx/concurrency/schedulerbase.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from abc import abstractmethod
from datetime import datetime, timedelta
from typing import Optional

Expand All @@ -6,15 +7,94 @@
from rx.internal.basic import default_now
from rx.internal.constants import DELTA_ZERO, UTC_ZERO


class SchedulerBase(typing.Scheduler):
"""Provides a set of static properties to access commonly used
schedulers.
"""

@property
def now(self) -> datetime:
"""Represents a notion of time for this scheduler. Tasks being
scheduled on a scheduler will adhere to the time denoted by this
property.

Returns:
The scheduler's current time, as a datetime instance.
"""

return default_now()

@abstractmethod
def schedule(self,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just curious, is it worth to re-declare schedule, schedule_relative & schedule_absolute as abstract methods while these are already declared in typing.Scheduler ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, maybe not strictly necessary... Although it might help to keep the generated docs and help() complete.

I guess I included this mainly because of an idea I have that I’m playing around with, which would simplify all of the classes that extend SchedulerBase by putting some often-repeated logic (schedule methods being defined in terms of one another) in the base class.

Having these here at this point would make that second step a lot easier / readable.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool 👍

action: typing.ScheduledAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
"""Schedules an action to be executed.

Args:
action: Action to be executed.
state: [Optional] state to be given to the action function.

Returns:
The disposable object used to cancel the scheduled action
(best effort).
"""
return NotImplemented

@abstractmethod
def schedule_relative(self,
duetime: typing.RelativeTime,
action: typing.ScheduledAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
"""Schedules an action to be executed after duetime.

Args:
duetime: Relative time after which to execute the action.
action: Action to be executed.
state: [Optional] state to be given to the action function.

Returns:
The disposable object used to cancel the scheduled action
(best effort).
"""
return NotImplemented

@abstractmethod
def schedule_absolute(self,
duetime: typing.AbsoluteTime,
action: typing.ScheduledAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
"""Schedules an action to be executed at duetime.

Args:
duetime: Absolute time after which to execute the action.
action: Action to be executed.
state: [Optional] state to be given to the action function.

Returns:
The disposable object used to cancel the scheduled action
(best effort).
"""
return NotImplemented

def invoke_action(self,
action: typing.ScheduledAction,
state: Optional[typing.TState] = None
) -> typing.Disposable:
"""Invoke the given given action. This is typically called by instances
of ScheduledItem.

Args:
action: Action to be executed.
state: [Optional] state to be given to the action function.

Returns:
The disposable object returned by the action, if any; or a new
(no-op) disposable otherwise.
"""
ret = action(self, state)
if isinstance(ret, typing.Disposable):
return ret
Expand Down Expand Up @@ -65,69 +145,89 @@ def invoke_periodic(scheduler: typing.Scheduler, _: typing.TState) -> Optional[D
disp.disposable = self.schedule_relative(period, invoke_periodic, None)
return disp

@property
def now(self) -> datetime:
"""Returns the current time.
@classmethod
def to_seconds(cls, value: typing.AbsoluteOrRelativeTime) -> float:
"""Converts time value to seconds. This method handles both absolute
(datetime) and relative (timedelta) values. If the argument is already
a float, it is simply returned unchanged.

Represents a notion of time for this scheduler. Tasks being
scheduled on a scheduler will adhere to the time denoted by
this property.
"""
Args:
value: the time value to convert to seconds.

return default_now()
Returns:
The value converted to seconds.
"""

@classmethod
def to_seconds(cls, timespan: typing.AbsoluteOrRelativeTime) -> float:
"""Converts time value to seconds"""
if isinstance(value, datetime):
value = value - UTC_ZERO

if isinstance(timespan, datetime):
timespan = timespan - UTC_ZERO
timespan = timespan.total_seconds()
elif isinstance(timespan, timedelta):
timespan = timespan.total_seconds()
if isinstance(value, timedelta):
value = value.total_seconds()

return timespan
return value

@classmethod
def to_datetime(cls, duetime: typing.AbsoluteOrRelativeTime) -> datetime:
"""Converts time value to datetime"""
def to_datetime(cls, value: typing.AbsoluteOrRelativeTime) -> datetime:
"""Converts time value to datetime. This method handles both absolute
(float) and relative (timedelta) values. If the argument is already
a datetime, it is simply returned unchanged.

Args:
value: the time value to convert to datetime.

Returns:
The value converted to datetime.
"""

if isinstance(duetime, timedelta):
duetime = UTC_ZERO + duetime
elif not isinstance(duetime, datetime):
duetime = datetime.utcfromtimestamp(duetime)
if isinstance(value, timedelta):
value = UTC_ZERO + value
elif not isinstance(value, datetime):
value = datetime.utcfromtimestamp(value)

return duetime
return value

@classmethod
def to_timedelta(cls, timespan: typing.AbsoluteOrRelativeTime) -> timedelta:
"""Converts time value to timedelta"""
def to_timedelta(cls, value: typing.AbsoluteOrRelativeTime) -> timedelta:
"""Converts time value to timedelta. This method handles both absolute
erikkemperman marked this conversation as resolved.
Show resolved Hide resolved
(datetime) and relative (float) values. If the argument is already
a timedelta, it is simply returned unchanged. If the argument is an
absolute time, the result value will be the timedelta since the epoch,
January 1st, 1970, 00:00:00.

Args:
value: the time value to convert to timedelta.

Returns:
The value converted to timedelta.
"""

if isinstance(timespan, datetime):
timespan = timespan - UTC_ZERO
elif not isinstance(timespan, timedelta):
timespan = timedelta(seconds=timespan)
if isinstance(value, datetime):
value = value - UTC_ZERO
elif not isinstance(value, timedelta):
value = timedelta(seconds=value)

return timespan
return value

@classmethod
def normalize(cls, timespan: typing.RelativeTime) -> typing.RelativeTime:
"""Normalizes the specified timespan value to a positive value.
def normalize(cls, value: typing.RelativeTime) -> typing.RelativeTime:
"""Normalizes the specified time value to a non-negative value. This
method handles only relative values, given as either timedelta or float,
and will return the normalized value as that same type.

Args:
timespan: The time span value to normalize.
value: The time value to normalize.

Returns:
The specified timespan value if it is zero or positive;
otherwise, 0.0
"""

if isinstance(timespan, timedelta):
if not timespan or timespan < DELTA_ZERO:
if isinstance(value, timedelta):
if not value or value < DELTA_ZERO:
return DELTA_ZERO

elif isinstance(timespan, float):
if not timespan or timespan < 0.0:
elif isinstance(value, float):
if not value or value < 0.0:
return 0.0

return timespan
return value
3 changes: 3 additions & 0 deletions rx/core/abc/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ def schedule_relative(self, duetime, action, state=None):
@abstractmethod
def schedule_absolute(self, duetime, action, state=None):
return NotImplemented

def schedule_periodic(self, period, action, state=None):
return NotImplemented
25 changes: 22 additions & 3 deletions rx/core/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,34 @@ def now(self) -> datetime:
return NotImplemented

@abstractmethod
def schedule(self, action: 'ScheduledAction', state: TState = None) -> Disposable:
def schedule(self,
action: 'ScheduledAction',
state: Optional[TState] = None
) -> Disposable:
return NotImplemented

@abstractmethod
def schedule_relative(self, duetime: RelativeTime, action: 'ScheduledAction', state: TState = None) -> Disposable:
def schedule_relative(self,
duetime: RelativeTime,
action: 'ScheduledAction',
state: Optional[TState] = None
) -> Disposable:
return NotImplemented

@abstractmethod
def schedule_absolute(self, duetime: AbsoluteTime, action: 'ScheduledAction', state: TState = None) -> Disposable:
def schedule_absolute(self,
duetime: AbsoluteTime,
action: 'ScheduledAction',
state: Optional[TState] = None
) -> Disposable:
return NotImplemented

@abstractmethod
def schedule_periodic(self,
period: RelativeTime,
action: 'ScheduledPeriodicAction',
state: Optional[TState] = None
) -> Disposable:
return NotImplemented


Expand Down
51 changes: 51 additions & 0 deletions tests/test_concurrency/test_schedulerbase.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import unittest
from datetime import timedelta

from rx.internal.constants import DELTA_ZERO, UTC_ZERO
from rx.concurrency.schedulerbase import SchedulerBase


class TestSchedulerBase(unittest.TestCase):

def test_base_to_seconds(self):
val = SchedulerBase.to_seconds(0.0)
assert val == 0.0
val = SchedulerBase.to_seconds(DELTA_ZERO)
assert val == 0.0
val = SchedulerBase.to_seconds(UTC_ZERO)
assert val == 0.0

def test_base_to_datetime(self):
val = SchedulerBase.to_datetime(0.0)
assert val == UTC_ZERO
val = SchedulerBase.to_datetime(DELTA_ZERO)
assert val == UTC_ZERO
val = SchedulerBase.to_datetime(UTC_ZERO)
assert val == UTC_ZERO

def test_base_to_timedelta(self):
val = SchedulerBase.to_timedelta(0.0)
assert val == DELTA_ZERO
val = SchedulerBase.to_timedelta(DELTA_ZERO)
assert val == DELTA_ZERO
val = SchedulerBase.to_timedelta(UTC_ZERO)
assert val == DELTA_ZERO

def test_base_normalize_float(self):
val = SchedulerBase.normalize(-1.0)
assert val == 0.0
val = SchedulerBase.normalize(0.0)
assert val == 0.0
val = SchedulerBase.normalize(1.0)
assert val == 1.0

def test_base_normalize_delta(self):
DELTA_ONE = timedelta(seconds=1.0)
val = SchedulerBase.normalize(-DELTA_ONE)
assert val == DELTA_ZERO
val = SchedulerBase.normalize(DELTA_ZERO)
assert val == DELTA_ZERO
val = SchedulerBase.normalize(DELTA_ONE)
assert val == DELTA_ONE