diff --git a/rx/concurrency/schedulerbase.py b/rx/concurrency/schedulerbase.py index c8d805690..af627695b 100644 --- a/rx/concurrency/schedulerbase.py +++ b/rx/concurrency/schedulerbase.py @@ -1,3 +1,4 @@ +from abc import abstractmethod from datetime import datetime, timedelta from typing import Optional @@ -6,15 +7,94 @@ from rx.internal.basic import default_now from rx.internal.constants import DELTA_ZERO, UTC_ZERO + class SchedulerBase(typing.Scheduler): """Provides a set of static properties to access commonly used schedulers. """ + @property + def now(self) -> datetime: + """Represents a notion of time for this scheduler. Tasks being + scheduled on a scheduler will adhere to the time denoted by this + property. + + Returns: + The scheduler's current time, as a datetime instance. + """ + + return default_now() + + @abstractmethod + def schedule(self, + action: typing.ScheduledAction, + state: Optional[typing.TState] = None + ) -> typing.Disposable: + """Schedules an action to be executed. + + Args: + action: Action to be executed. + state: [Optional] state to be given to the action function. + + Returns: + The disposable object used to cancel the scheduled action + (best effort). + """ + return NotImplemented + + @abstractmethod + def schedule_relative(self, + duetime: typing.RelativeTime, + action: typing.ScheduledAction, + state: Optional[typing.TState] = None + ) -> typing.Disposable: + """Schedules an action to be executed after duetime. + + Args: + duetime: Relative time after which to execute the action. + action: Action to be executed. + state: [Optional] state to be given to the action function. + + Returns: + The disposable object used to cancel the scheduled action + (best effort). + """ + return NotImplemented + + @abstractmethod + def schedule_absolute(self, + duetime: typing.AbsoluteTime, + action: typing.ScheduledAction, + state: Optional[typing.TState] = None + ) -> typing.Disposable: + """Schedules an action to be executed at duetime. + + Args: + duetime: Absolute time after which to execute the action. + action: Action to be executed. + state: [Optional] state to be given to the action function. + + Returns: + The disposable object used to cancel the scheduled action + (best effort). + """ + return NotImplemented + def invoke_action(self, action: typing.ScheduledAction, state: Optional[typing.TState] = None ) -> typing.Disposable: + """Invoke the given given action. This is typically called by instances + of ScheduledItem. + + Args: + action: Action to be executed. + state: [Optional] state to be given to the action function. + + Returns: + The disposable object returned by the action, if any; or a new + (no-op) disposable otherwise. + """ ret = action(self, state) if isinstance(ret, typing.Disposable): return ret @@ -65,69 +145,89 @@ def invoke_periodic(scheduler: typing.Scheduler, _: typing.TState) -> Optional[D disp.disposable = self.schedule_relative(period, invoke_periodic, None) return disp - @property - def now(self) -> datetime: - """Returns the current time. + @classmethod + def to_seconds(cls, value: typing.AbsoluteOrRelativeTime) -> float: + """Converts time value to seconds. This method handles both absolute + (datetime) and relative (timedelta) values. If the argument is already + a float, it is simply returned unchanged. - Represents a notion of time for this scheduler. Tasks being - scheduled on a scheduler will adhere to the time denoted by - this property. - """ + Args: + value: the time value to convert to seconds. - return default_now() + Returns: + The value converted to seconds. + """ - @classmethod - def to_seconds(cls, timespan: typing.AbsoluteOrRelativeTime) -> float: - """Converts time value to seconds""" + if isinstance(value, datetime): + value = value - UTC_ZERO - if isinstance(timespan, datetime): - timespan = timespan - UTC_ZERO - timespan = timespan.total_seconds() - elif isinstance(timespan, timedelta): - timespan = timespan.total_seconds() + if isinstance(value, timedelta): + value = value.total_seconds() - return timespan + return value @classmethod - def to_datetime(cls, duetime: typing.AbsoluteOrRelativeTime) -> datetime: - """Converts time value to datetime""" + def to_datetime(cls, value: typing.AbsoluteOrRelativeTime) -> datetime: + """Converts time value to datetime. This method handles both absolute + (float) and relative (timedelta) values. If the argument is already + a datetime, it is simply returned unchanged. + + Args: + value: the time value to convert to datetime. + + Returns: + The value converted to datetime. + """ - if isinstance(duetime, timedelta): - duetime = UTC_ZERO + duetime - elif not isinstance(duetime, datetime): - duetime = datetime.utcfromtimestamp(duetime) + if isinstance(value, timedelta): + value = UTC_ZERO + value + elif not isinstance(value, datetime): + value = datetime.utcfromtimestamp(value) - return duetime + return value @classmethod - def to_timedelta(cls, timespan: typing.AbsoluteOrRelativeTime) -> timedelta: - """Converts time value to timedelta""" + def to_timedelta(cls, value: typing.AbsoluteOrRelativeTime) -> timedelta: + """Converts time value to timedelta. This method handles both absolute + (datetime) and relative (float) values. If the argument is already + a timedelta, it is simply returned unchanged. If the argument is an + absolute time, the result value will be the timedelta since the epoch, + January 1st, 1970, 00:00:00. + + Args: + value: the time value to convert to timedelta. + + Returns: + The value converted to timedelta. + """ - if isinstance(timespan, datetime): - timespan = timespan - UTC_ZERO - elif not isinstance(timespan, timedelta): - timespan = timedelta(seconds=timespan) + if isinstance(value, datetime): + value = value - UTC_ZERO + elif not isinstance(value, timedelta): + value = timedelta(seconds=value) - return timespan + return value @classmethod - def normalize(cls, timespan: typing.RelativeTime) -> typing.RelativeTime: - """Normalizes the specified timespan value to a positive value. + def normalize(cls, value: typing.RelativeTime) -> typing.RelativeTime: + """Normalizes the specified time value to a non-negative value. This + method handles only relative values, given as either timedelta or float, + and will return the normalized value as that same type. Args: - timespan: The time span value to normalize. + value: The time value to normalize. Returns: The specified timespan value if it is zero or positive; otherwise, 0.0 """ - if isinstance(timespan, timedelta): - if not timespan or timespan < DELTA_ZERO: + if isinstance(value, timedelta): + if not value or value < DELTA_ZERO: return DELTA_ZERO - elif isinstance(timespan, float): - if not timespan or timespan < 0.0: + elif isinstance(value, float): + if not value or value < 0.0: return 0.0 - return timespan + return value diff --git a/rx/core/abc/scheduler.py b/rx/core/abc/scheduler.py index 109af102f..28eca0aae 100644 --- a/rx/core/abc/scheduler.py +++ b/rx/core/abc/scheduler.py @@ -20,3 +20,6 @@ def schedule_relative(self, duetime, action, state=None): @abstractmethod def schedule_absolute(self, duetime, action, state=None): return NotImplemented + + def schedule_periodic(self, period, action, state=None): + return NotImplemented diff --git a/rx/core/typing.py b/rx/core/typing.py index d5fe622f8..c6b3a1b33 100644 --- a/rx/core/typing.py +++ b/rx/core/typing.py @@ -47,15 +47,34 @@ def now(self) -> datetime: return NotImplemented @abstractmethod - def schedule(self, action: 'ScheduledAction', state: TState = None) -> Disposable: + def schedule(self, + action: 'ScheduledAction', + state: Optional[TState] = None + ) -> Disposable: return NotImplemented @abstractmethod - def schedule_relative(self, duetime: RelativeTime, action: 'ScheduledAction', state: TState = None) -> Disposable: + def schedule_relative(self, + duetime: RelativeTime, + action: 'ScheduledAction', + state: Optional[TState] = None + ) -> Disposable: return NotImplemented @abstractmethod - def schedule_absolute(self, duetime: AbsoluteTime, action: 'ScheduledAction', state: TState = None) -> Disposable: + def schedule_absolute(self, + duetime: AbsoluteTime, + action: 'ScheduledAction', + state: Optional[TState] = None + ) -> Disposable: + return NotImplemented + + @abstractmethod + def schedule_periodic(self, + period: RelativeTime, + action: 'ScheduledPeriodicAction', + state: Optional[TState] = None + ) -> Disposable: return NotImplemented diff --git a/tests/test_concurrency/test_schedulerbase.py b/tests/test_concurrency/test_schedulerbase.py new file mode 100644 index 000000000..7e6625c32 --- /dev/null +++ b/tests/test_concurrency/test_schedulerbase.py @@ -0,0 +1,51 @@ +import unittest +from datetime import timedelta + +from rx.internal.constants import DELTA_ZERO, UTC_ZERO +from rx.concurrency.schedulerbase import SchedulerBase + + +class TestSchedulerBase(unittest.TestCase): + + def test_base_to_seconds(self): + val = SchedulerBase.to_seconds(0.0) + assert val == 0.0 + val = SchedulerBase.to_seconds(DELTA_ZERO) + assert val == 0.0 + val = SchedulerBase.to_seconds(UTC_ZERO) + assert val == 0.0 + + def test_base_to_datetime(self): + val = SchedulerBase.to_datetime(0.0) + assert val == UTC_ZERO + val = SchedulerBase.to_datetime(DELTA_ZERO) + assert val == UTC_ZERO + val = SchedulerBase.to_datetime(UTC_ZERO) + assert val == UTC_ZERO + + def test_base_to_timedelta(self): + val = SchedulerBase.to_timedelta(0.0) + assert val == DELTA_ZERO + val = SchedulerBase.to_timedelta(DELTA_ZERO) + assert val == DELTA_ZERO + val = SchedulerBase.to_timedelta(UTC_ZERO) + assert val == DELTA_ZERO + + def test_base_normalize_float(self): + val = SchedulerBase.normalize(-1.0) + assert val == 0.0 + val = SchedulerBase.normalize(0.0) + assert val == 0.0 + val = SchedulerBase.normalize(1.0) + assert val == 1.0 + + def test_base_normalize_delta(self): + DELTA_ONE = timedelta(seconds=1.0) + val = SchedulerBase.normalize(-DELTA_ONE) + assert val == DELTA_ZERO + val = SchedulerBase.normalize(DELTA_ZERO) + assert val == DELTA_ZERO + val = SchedulerBase.normalize(DELTA_ONE) + assert val == DELTA_ONE + +