Skip to content

Commit

Permalink
Merge branch 'master' into feature/pipe-typing
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrattli authored May 11, 2019
2 parents de0a81a + 1de2850 commit 56bc897
Show file tree
Hide file tree
Showing 73 changed files with 297 additions and 225 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,6 @@ _build

# Mac OS
.DS_Store

# Type checkers
.pyre
82 changes: 49 additions & 33 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,51 +1,64 @@
matrix:
include:
- os: linux

- name: "Python 3.6 on Linux"
python: "3.6"
os: linux
dist: xenial
language: python
python: '3.6'
# Coverage for the baseline 3.6 build: include some optional packages
before_script:
# install some packages to ensure all of the optional tests are covered
- pip3 install eventlet gevent pygame pyqt5 pyside2 tornado twisted
# pycairo / pygobject need native libraries
- sudo apt-get install -y libgirepository1.0-dev gir1.2-gtk-3.0
- pip3 install pycairo pygobject
# wxpython needs native libraries
- sudo apt-get install -y freeglut3 freeglut3-dev libgtk-3-dev
libgstreamer-plugins-base1.0-0 libgstreamer-plugins-base1.0-dev
libgstreamer1.0-0 libgstreamer1.0-dev libsdl2-dev
- pip3 install -U -f https://extras.wxpython.org/wxPython4/extras/linux/gtk3/ubuntu-16.04 wxPython
script:
- coverage run --source=rx setup.py test && coveralls

- os: linux
- name: "Python 3.7 on Linux"
python: "3.7"
os: linux
dist: xenial
language: python
python: '3.7'

- os: linux
- name: "Python 3.8-dev on Linux"
python: "3.8-dev"
os: linux
dist: xenial
language: python
python: '3.8-dev'

- os: osx
osx_image: xcode10.1
- name: "Python 3.7 on Linux, with optional dependencies and benchmark"
python: "3.7"
os: linux
dist: xenial
language: python
install:
# Native libraries for pycairo / pygobject
- sudo apt-get install -y libgirepository1.0-dev gir1.2-gtk-3.0
# Pre-built wheel for wxpython
- pip3 install -U -f https://extras.wxpython.org/wxPython4/extras/linux/gtk3/ubuntu-16.04 wxpython
# Install the various optional packages
- pip3 install eventlet gevent pycairo pygobject pygame pyqt5 pyside2 tornado twisted
# Coverage will be reported for the Python 3.7 Linux build
- pip3 install coveralls
script:
- coverage run --source=rx setup.py test && coveralls

- name: "Python 3.7 on MacOS, with optional dependencies and benchmark"
python: "3.7"
os: osx
osx_image: xcode10.2
language: sh
python: '3.7'
install:
# Native libraries for pycairo / pygobject
- brew install pygobject3 gtk+3
# Install the various optional packages
- pip3 install eventlet gevent pycairo pygobject pygame pyqt5 pyside2 tornado twisted wxpython

- os: windows
- name: "Python 3.7 on Windows, with optional dependencies and benchmark"
python: "3.7"
os: windows
language: sh
python: '3.7'
before_install:
install:
# Get Python 3.7 from choco
- choco install python3
- ln -s "/c/Python37/python.exe" "/c/Python37/python3.exe"
- export PATH="/c/Python37:/c/Python37/Scripts/:$PATH"

install:
- python3 setup.py install
- pip3 install coveralls coverage
- pip3 install pytest>=3.0.2 pytest-asyncio pytest-cov --upgrade
- export PATH="/c/Python37/:/c/Python37/Scripts/:$PATH"
# Install the various optional packages
- pip3 install eventlet gevent pygame pyqt5 pyside2 tornado twisted wxpython
# TODO install pycairo and pygobject somehow?

script:
- python3 setup.py test
Expand All @@ -54,4 +67,7 @@ after_success:
# Run a crude benchmark (unit tests minus concurrency) a couple of times.
# Need to make a copy of the script first, because on Windows it disappears
# whilst switching git branches.
- cp ./.travis/bench.sh bench.sh && ./bench.sh
- if [[ $TRAVIS_JOB_NAME == *benchmark ]]; then
pip3 install pytest>=4.4.1 pytest-asyncio>=0.10.0;
cp ./.travis/bench.sh bench.sh && ./bench.sh;
fi
2 changes: 1 addition & 1 deletion project.cfg
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[project]
name = Rx
project = RxPY
version = 3.0.0-alpha2
version = 3.0.0-beta1
description = Reactive Extensions (Rx) for Python
author = Dag Brattli
author_email = [email protected]
Expand Down
25 changes: 14 additions & 11 deletions rx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

from .core import Observable, abc, typing, pipe

from . import disposable


def amb(*sources: Observable) -> Observable:
"""Propagates the observable sequence that reacts first.
Expand Down Expand Up @@ -466,7 +464,7 @@ def if_then(condition: Callable[[], bool], then_source: Observable,
return _if_then(condition, then_source, else_source)


def interval(period, scheduler: typing.Scheduler = None) -> Observable:
def interval(period, scheduler: Optional[typing.Scheduler] = None) -> Observable:
"""Returns an observable sequence that produces a value after each
period.
Expand Down Expand Up @@ -541,7 +539,11 @@ def on_error_resume_next(*sources: Observable) -> Observable:
return _on_error_resume_next(*sources)


def range(start: int, stop: int = None, step: int = None, scheduler: typing.Scheduler = None) -> Observable:
def range(start: int,
stop: Optional[int] = None,
step: Optional[int] = None,
scheduler: Optional[typing.Scheduler] = None
) -> Observable:
"""Generates an observable sequence of integral numbers within a
specified range, using the specified scheduler to send out observer
messages.
Expand All @@ -564,7 +566,7 @@ def range(start: int, stop: int = None, step: int = None, scheduler: typing.Sche
return _range(start, stop, step, scheduler)


def return_value(value: Any, scheduler: typing.Scheduler = None) -> Observable:
def return_value(value: Any, scheduler: Optional[typing.Scheduler] = None) -> Observable:
"""Returns an observable sequence that contains a single element,
using the specified scheduler to send out observer messages.
There is an alias called 'just'.
Expand All @@ -587,7 +589,7 @@ def return_value(value: Any, scheduler: typing.Scheduler = None) -> Observable:
just = return_value


def repeat_value(value: Any = None, repeat_count: int = None) -> Observable:
def repeat_value(value: Any = None, repeat_count: Optional[int] = None) -> Observable:
"""Generates an observable sequence that repeats the given element
the specified number of times.
Expand Down Expand Up @@ -650,7 +652,7 @@ def start_async(function_async) -> Observable:
return _start_async(function_async)


def throw(exception: Exception, scheduler: typing.Scheduler = None) -> Observable:
def throw(exception: Exception, scheduler: Optional[typing.Scheduler] = None) -> Observable:
"""Returns an observable sequence that terminates with an exception,
using the specified scheduler to send out the single OnError
message.
Expand All @@ -670,8 +672,8 @@ def throw(exception: Exception, scheduler: typing.Scheduler = None) -> Observabl
return _throw(exception, scheduler)


def timer(duetime: typing.AbsoluteOrRelativeTime, period: typing.RelativeTime = None,
scheduler: typing.Scheduler = None) -> Observable:
def timer(duetime: typing.AbsoluteOrRelativeTime, period: Optional[typing.RelativeTime] = None,
scheduler: Optional[typing.Scheduler] = None) -> Observable:
"""Returns an observable sequence that produces a value after
duetime has elapsed and then after each period.
Expand Down Expand Up @@ -722,8 +724,9 @@ def to_async(func: Callable, scheduler=None) -> Callable:
return _to_async(func, scheduler)


def using(resource_factory: Callable[[], typing.Disposable], observable_factory: Callable[[typing.Disposable], Observable]
) -> Observable:
def using(resource_factory: Callable[[], typing.Disposable],
observable_factory: Callable[[typing.Disposable], Observable]
) -> Observable:
"""Constructs an observable sequence that depends on a resource
object, whose lifetime is tied to the resulting observable
sequence's lifetime.
Expand Down
3 changes: 2 additions & 1 deletion rx/concurrency/threadpoolscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ def start(self) -> None:
self.future = self.executor.submit(self.target)

def cancel(self) -> None:
self.future.cancel()
if self.future:
self.future.cancel()

def __init__(self, max_workers: Optional[int] = None) -> None:
self.executor: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=max_workers)
Expand Down
7 changes: 4 additions & 3 deletions rx/core/observable/empty.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from typing import Any
from typing import Any, Optional

from rx.core import typing
from rx.core import Observable
from rx.concurrency import immediate_scheduler


def _empty(scheduler: typing.Scheduler = None) -> Observable:
def subscribe(observer: typing.Observer, scheduler_: typing.Scheduler = None) -> typing.Disposable:
def _empty(scheduler: Optional[typing.Scheduler] = None) -> Observable:
def subscribe(observer: typing.Observer, scheduler_: Optional[typing.Scheduler] = None) -> typing.Disposable:
_scheduler = scheduler or scheduler_ or immediate_scheduler

def action(_: typing.Scheduler, __: Any) -> None:
observer.on_completed()

Expand Down
6 changes: 3 additions & 3 deletions rx/core/observable/fromcallback.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from typing import Callable
from typing import Callable, Optional

from rx.disposable import Disposable
from rx.core import typing
from rx.core import Observable
from rx.core.typing import Mapper


def _from_callback(func: Callable, mapper: Mapper = None) -> Callable[[], Observable]:
def _from_callback(func: Callable, mapper: Optional[Mapper] = None) -> Callable[[], Observable]:
"""Converts a callback function to an observable sequence.
Args:
Expand All @@ -24,7 +24,7 @@ def _from_callback(func: Callable, mapper: Mapper = None) -> Callable[[], Observ
def function(*args):
arguments = list(args)

def subscribe(observer: typing.Observer, scheduler: typing.Scheduler = None) -> typing.Disposable:
def subscribe(observer: typing.Observer, scheduler: Optional[typing.Scheduler] = None) -> typing.Disposable:
def handler(*args):
results = list(args)
if mapper:
Expand Down
3 changes: 2 additions & 1 deletion rx/core/observable/fromfuture.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from asyncio.futures import Future
from typing import Optional

from rx.disposable import Disposable
from rx.core import typing
Expand All @@ -18,7 +19,7 @@ def _from_future(future: Future) -> Observable:
and failure.
"""

def subscribe(observer: typing.Observer, scheduler: typing.Scheduler = None) -> typing.Disposable:
def subscribe(observer: typing.Observer, scheduler: Optional[typing.Scheduler] = None) -> typing.Disposable:
def done(future):
try:
value = future.result()
Expand Down
6 changes: 3 additions & 3 deletions rx/core/observable/fromiterable.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from typing import Iterable, Any
from typing import Iterable, Any, Optional

from rx.core import Observable, typing
from rx.concurrency import current_thread_scheduler
from rx.disposable import CompositeDisposable, Disposable


def from_iterable(iterable: Iterable, scheduler: typing.Scheduler = None) -> Observable:
def from_iterable(iterable: Iterable, scheduler: Optional[typing.Scheduler] = None) -> Observable:
"""Converts an iterable to an observable sequence.
Example:
Expand All @@ -20,7 +20,7 @@ def from_iterable(iterable: Iterable, scheduler: typing.Scheduler = None) -> Obs
given iterable sequence.
"""

def subscribe(observer: typing.Observer, scheduler_: typing.Scheduler = None) -> typing.Disposable:
def subscribe(observer: typing.Observer, scheduler_: Optional[typing.Scheduler] = None) -> typing.Disposable:
_scheduler = scheduler or scheduler_ or current_thread_scheduler
iterator = iter(iterable)
disposed = False
Expand Down
1 change: 1 addition & 0 deletions rx/core/observable/groupedobservable.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from .observable import Observable


class GroupedObservable(Observable):
def __init__(self, key, underlying_observable, merged_disposable=None):
super().__init__()
Expand Down
4 changes: 2 additions & 2 deletions rx/core/observable/ifthen.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Callable
from typing import Callable, Optional

import rx
from rx.core import abc
Expand All @@ -7,7 +7,7 @@


def _if_then(condition: Callable[[], bool], then_source: Observable,
else_source: Observable = None) -> Observable:
else_source: Optional[Observable] = None) -> Observable:
"""Determines whether an observable collection contains values.
Example:
Expand Down
4 changes: 3 additions & 1 deletion rx/core/observable/interval.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import Optional

from rx import timer
from rx.core import Observable, typing


def _interval(period, scheduler: typing.Scheduler = None) -> Observable:
def _interval(period, scheduler: Optional[typing.Scheduler] = None) -> Observable:
return timer(period, period, scheduler)
10 changes: 5 additions & 5 deletions rx/core/observable/marbles.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Dict, Tuple
from typing import List, Dict, Tuple, Optional
import re
import threading
from datetime import datetime, timedelta
Expand Down Expand Up @@ -32,7 +32,7 @@


def hot(string: str, timespan: RelativeTime = 0.1, duetime: AbsoluteOrRelativeTime = 0.0,
lookup: Dict = None, error: Exception = None, scheduler: Scheduler = None) -> Observable:
lookup: Dict = None, error: Optional[Exception] = None, scheduler: Optional[Scheduler] = None) -> Observable:

_scheduler = scheduler or new_thread_scheduler

Expand Down Expand Up @@ -92,7 +92,7 @@ def action(scheduler, state=None):


def from_marbles(string: str, timespan: RelativeTime = 0.1, lookup: Dict = None,
error: Exception = None, scheduler: Scheduler = None) -> Observable:
error: Optional[Exception] = None, scheduler: Optional[Scheduler] = None) -> Observable:

disp = CompositeDisposable()
messages = parse(string, timespan=timespan, lookup=lookup, error=error, raise_stopped=True)
Expand All @@ -117,7 +117,7 @@ def subscribe(observer, scheduler_):


def parse(string: str, timespan: RelativeTime = 1.0, time_shift: RelativeTime = 0.0, lookup: Dict = None,
error: Exception = None, raise_stopped: bool = False) -> List[Tuple[RelativeTime, notification.Notification]]:
error: Optional[Exception] = None, raise_stopped: bool = False) -> List[Tuple[RelativeTime, notification.Notification]]:
"""Convert a marble diagram string to a list of messages.
Each character in the string will advance time by timespan
Expand Down Expand Up @@ -206,6 +206,7 @@ def map_element(time, element):
return (time, notification.OnNext(value))

is_stopped = False

def check_stopped(element):
nonlocal is_stopped
if raise_stopped:
Expand All @@ -215,7 +216,6 @@ def check_stopped(element):
if element in ('#', '|'):
is_stopped = True


iframe = 0
messages = []

Expand Down
4 changes: 3 additions & 1 deletion rx/core/observable/never.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from rx.disposable import Disposable
from rx.core import Observable, typing

Expand All @@ -10,7 +12,7 @@ def _never() -> Observable:
An observable sequence whose observers will never get called.
"""

def subscribe(observer: typing.Observer, scheduler: typing.Scheduler = None) -> typing.Disposable:
def subscribe(observer: typing.Observer, scheduler: Optional[typing.Scheduler] = None) -> typing.Disposable:
return Disposable()

return Observable(subscribe)
Loading

0 comments on commit 56bc897

Please sign in to comment.