Skip to content

Commit

Permalink
Do not mix types and base classes. First iteration. Fixes #293
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrattli committed Jan 24, 2019
1 parent bcae554 commit 93e3df2
Show file tree
Hide file tree
Showing 34 changed files with 59 additions and 60 deletions.
2 changes: 1 addition & 1 deletion rx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from asyncio.futures import Future as _Future
from typing import Iterable, Callable, Any, Optional, Union

from .core import Observer, Observable, abc, typing, pipe
from .core import Observable, abc, typing, pipe
from .core import AnonymousObservable as _AnonymousObservable

from . import disposable
Expand Down
1 change: 0 additions & 1 deletion rx/concurrency/catchscheduler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from rx import disposable
from rx.core import Disposable
from rx.disposable import SingleAssignmentDisposable

from .schedulerbase import SchedulerBase
Expand Down
4 changes: 2 additions & 2 deletions rx/concurrency/eventloopscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Any, List, Optional

from rx import disposable
from rx.core import Disposable, typing
from rx.core import typing
from rx.concurrency import ScheduledItem
from rx.internal.exceptions import DisposedException
from rx.internal.priorityqueue import PriorityQueue
Expand All @@ -14,7 +14,7 @@
log = logging.getLogger('Rx')


class EventLoopScheduler(SchedulerBase, Disposable):
class EventLoopScheduler(SchedulerBase, typing.Disposable):
"""Creates an object that schedules units of work on a designated
thread."""

Expand Down
8 changes: 4 additions & 4 deletions rx/concurrency/schedulerbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
from typing import Optional

from rx import disposable
from rx.core import Scheduler, Disposable, typing
from rx.core.typing import ScheduledAction, ScheduledPeriodicAction, TState
from rx.core import typing
from rx.core.typing import ScheduledAction, ScheduledPeriodicAction, TState, Disposable
from rx.disposable import MultipleAssignmentDisposable
from rx.internal.basic import default_now


class SchedulerBase(Scheduler):
class SchedulerBase(typing.Scheduler):
"""Provides a set of static properties to access commonly used
schedulers.
"""
Expand Down Expand Up @@ -37,7 +37,7 @@ def schedule_periodic(self, period: typing.RelativeTime, action: ScheduledPeriod

disp = MultipleAssignmentDisposable()

def invoke_action(scheduler: Scheduler, _: TState) -> Optional[Disposable]:
def invoke_action(scheduler: typing.Scheduler, _: TState) -> Optional[Disposable]:
nonlocal state

if disp.is_disposed:
Expand Down
5 changes: 1 addition & 4 deletions rx/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
# flake8: noqa
from .typing import Observer, Scheduler, Disposable


from .pipe import pipe

from .observable import Observable
from .observable import ObservableBase as Observable
from .observable import AnonymousObservable, ConnectableObservable
from .observable import GroupedObservable, BlockingObservable

Expand Down
3 changes: 2 additions & 1 deletion rx/core/notification.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from abc import abstractmethod
from rx.concurrency import immediate_scheduler

from . import Observer, AnonymousObserver
from .typing import Observer
from .observer import AnonymousObserver
from .observable import AnonymousObservable


Expand Down
2 changes: 1 addition & 1 deletion rx/core/observable/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .observable import Observable
from .observablebase import ObservableBase
from .anonymousobservable import AnonymousObservable
from .connectableobservable import ConnectableObservable
from .groupedobservable import GroupedObservable
Expand Down
2 changes: 1 addition & 1 deletion rx/core/observable/anonymousobservable.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from rx.core import typing

from .observable import Observable
from .observablebase import ObservableBase as Observable


class AnonymousObservable(Observable):
Expand Down
8 changes: 4 additions & 4 deletions rx/core/observable/blockingobservable.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from typing import Any, Callable, Iterable
from threading import RLock

from rx.core import abc
from rx.core import typing

from .observable import Observable
from .observablebase import ObservableBase as Observable
from ..observer import AnonymousObserver


class BlockingObservable(abc.Observable):
class BlockingObservable(typing.Observable):
def __init__(self, observable: Observable = None):
"""Turns an observable into a blocking observable.
Expand All @@ -18,7 +18,7 @@ def __init__(self, observable: Observable = None):
self.observable = observable
self.lock = RLock()

def subscribe(self, observer: abc.Observer = None, scheduler: abc.Scheduler = None) -> abc.Disposable:
def subscribe(self, observer: typing.Observer = None, scheduler: typing.Scheduler = None) -> typing.Disposable:
"""Subscribe an observer to the observable sequence.
Examples:
Expand Down
4 changes: 2 additions & 2 deletions rx/core/observable/connectableobservable.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from rx import disposable
from rx.disposable import CompositeDisposable

from .observable import Observable
from .observablebase import ObservableBase
from .anonymousobservable import AnonymousObservable


class ConnectableObservable(Observable):
class ConnectableObservable(ObservableBase):
"""Represents an observable that can be connected and
disconnected."""

Expand Down
4 changes: 2 additions & 2 deletions rx/core/observable/groupedobservable.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from rx.disposable import CompositeDisposable

from .observable import Observable
from .observablebase import ObservableBase
from .anonymousobservable import AnonymousObservable

class GroupedObservable(Observable):
class GroupedObservable(ObservableBase):
def __init__(self, key, underlying_observable, merged_disposable=None):
super(GroupedObservable, self).__init__()
self.key = key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from .. import typing, abc


class Observable(typing.Observable):
class ObservableBase(typing.Observable):
"""Observables base class.
Represents a push-style collection and contains all operators as
Expand Down Expand Up @@ -182,7 +182,7 @@ def subscribe(self, observer: typing.Observer = None, scheduler: typing.Schedule
return self.subscribe_(observer.on_next, observer.on_error, observer.on_completed, scheduler)


def pipe(self, *operators: Callable[['Observable'], 'Observable']) -> 'Observable':
def pipe(self, *operators: Callable[['ObservableBase'], 'ObservableBase']) -> 'ObservableBase':
"""Compose multiple operators left to right.
Composes zero or more operators into a functional composition.
Expand Down
3 changes: 2 additions & 1 deletion rx/core/observable/using.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

import rx
from rx import disposable
from rx.core import Observable, AnonymousObservable, Disposable
from rx.core import Observable, AnonymousObservable
from rx.core.typing import Disposable
from rx.disposable import CompositeDisposable


Expand Down
3 changes: 2 additions & 1 deletion rx/core/operators/do.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Callable

from rx.core import Observer, AnonymousObservable, Disposable, Observable, typing
from rx.core import AnonymousObservable, Observable, typing
from rx.core.typing import Observer, Disposable
from rx.disposable import CompositeDisposable


Expand Down
2 changes: 1 addition & 1 deletion rx/core/pipe.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Callable
from functools import reduce
from .observable import Observable
from .observable import ObservableBase as Observable


def pipe(*operators: Callable[[Observable], Observable]) -> Callable[[Observable], Observable]:
Expand Down
2 changes: 1 addition & 1 deletion rx/disposable/booleandisposable.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from threading import RLock
from rx.core import Disposable
from rx.core.typing import Disposable


class BooleanDisposable(Disposable):
Expand Down
2 changes: 1 addition & 1 deletion rx/disposable/compositedisposable.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from threading import RLock

from rx.core import Disposable
from rx.core.typing import Disposable


class CompositeDisposable(Disposable):
Expand Down
2 changes: 1 addition & 1 deletion rx/disposable/multipleassignmentdisposable.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from threading import RLock
from rx.core import Disposable
from rx.core.typing import Disposable


class MultipleAssignmentDisposable(Disposable):
Expand Down
2 changes: 1 addition & 1 deletion rx/disposable/refcountdisposable.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from threading import RLock

from rx import disposable
from rx.core import Disposable
from rx.core.typing import Disposable


class RefCountDisposable(Disposable):
Expand Down
2 changes: 1 addition & 1 deletion rx/disposable/scheduleddisposable.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from threading import RLock
from rx.core import Disposable
from rx.core.typing import Disposable


class ScheduledDisposable(Disposable):
Expand Down
10 changes: 5 additions & 5 deletions rx/disposable/serialdisposable.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Optional

from rx.core import typing
from rx.core import Disposable
from rx.core.typing import Disposable


class SerialDisposable(Disposable):
Expand All @@ -12,13 +12,13 @@ class SerialDisposable(Disposable):
"""

def __init__(self) -> None:
self.current: Optional[typing.Disposable] = None
self.current: Optional[Disposable] = None
self.is_disposed = False
self.lock = RLock()

super().__init__()

def get_disposable(self) -> Optional[typing.Disposable]:
def get_disposable(self) -> Optional[Disposable]:
return self.current

def set_disposable(self, value) -> None:
Expand All @@ -27,7 +27,7 @@ def set_disposable(self, value) -> None:
disposable object. Assigning this property disposes the previous
disposable object."""

old: Optional[typing.Disposable] = None
old: Optional[Disposable] = None

with self.lock:
should_dispose = self.is_disposed
Expand All @@ -47,7 +47,7 @@ def dispose(self) -> None:
"""Disposes the underlying disposable as well as all future
replacements."""

old: Optional[typing.Disposable] = None
old: Optional[Disposable] = None

with self.lock:
if not self.is_disposed:
Expand Down
2 changes: 1 addition & 1 deletion rx/disposable/singleassignmentdisposable.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from threading import RLock
from rx.core import Disposable
from rx.core.typing import Disposable


class SingleAssignmentDisposable(Disposable):
Expand Down
3 changes: 2 additions & 1 deletion rx/subjects/anonymoussubject.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from typing import Any

from rx.core import typing
from rx.core import Observable, Observer, Scheduler
from rx.core import Observable
from rx.core.typing import Observer, Scheduler


class AnonymousSubject(Observable, Observer):
Expand Down
3 changes: 2 additions & 1 deletion rx/subjects/asyncsubject.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import threading

from rx import disposable
from rx.core import Observer, Observable
from rx.core import Observable
from rx.core.typing import Observer
from rx.internal import DisposedException

from .innersubscription import InnerSubscription
Expand Down
3 changes: 2 additions & 1 deletion rx/subjects/behaviorsubject.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import threading

from rx import disposable
from rx.core import Observer, Observable
from rx.core import Observable
from rx.core.typing import Observer
from rx.internal import DisposedException

from .innersubscription import InnerSubscription
Expand Down
3 changes: 2 additions & 1 deletion rx/subjects/replaysubject.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from typing import Any, Optional, List
from datetime import timedelta

from rx.core import Observer, Observable, typing
from rx.core import Observable, typing
from rx.core.typing import Observer
from rx.internal import DisposedException
from rx.concurrency import current_thread_scheduler
from rx.core.observer.scheduledobserver import ScheduledObserver
Expand Down
6 changes: 3 additions & 3 deletions rx/subjects/subject.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
from typing import Any, List, Optional

from rx import disposable
from rx.core import typing
from rx.core import Observer, Observable, Scheduler
from rx.core.typing import Observer, Scheduler, Disposable
from rx.core import Observable
from rx.internal import DisposedException

from .anonymoussubject import AnonymousSubject
Expand All @@ -30,7 +30,7 @@ def check_disposed(self):
if self.is_disposed:
raise DisposedException()

def _subscribe_core(self, observer: Observer, scheduler: Scheduler = None) -> typing.Disposable:
def _subscribe_core(self, observer: Observer, scheduler: Scheduler = None) -> Disposable:
with self.lock:
self.check_disposed()
if not self.is_stopped:
Expand Down
2 changes: 1 addition & 1 deletion rx/testing/mockobserver.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Any, List

from rx.core import Observer, typing
from rx.core.typing import Observer
from rx.core.notification import OnNext, OnError, OnCompleted

from .recorded import Recorded
Expand Down
3 changes: 2 additions & 1 deletion rx/testing/testscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

import rx
from rx import disposable
from rx.core import Observable, Disposable, typing
from rx.core import Observable, typing
from rx.core.typing import Disposable
from rx.concurrency import VirtualTimeScheduler

from .coldobservable import ColdObservable
Expand Down
2 changes: 1 addition & 1 deletion tests/test_core/test_notification.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from rx.core import Observer
from rx.core.typing import Observer
from rx.testing import TestScheduler, ReactiveTest
from rx.core.notification import OnNext, OnError, OnCompleted

Expand Down
3 changes: 2 additions & 1 deletion tests/test_observable/test_connectableobservable.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

import rx
from rx import operators as ops
from rx.core import Observer, Observable
from rx.core import Observable
from rx.core.typing import Observer
from rx.testing import TestScheduler, ReactiveTest
from rx.subjects import Subject
from rx.core import ConnectableObservable
Expand Down
4 changes: 2 additions & 2 deletions tests/test_observable/test_publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import rx
from rx import operators as ops
from rx.core import Observer, Observable
from rx.core import ConnectableObservable
from rx.core import ConnectableObservable, Observable
from rx.core.typing import Observer
from rx.testing import TestScheduler, ReactiveTest

on_next = ReactiveTest.on_next
Expand Down
Loading

0 comments on commit 93e3df2

Please sign in to comment.