Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement: marbles syntax + testing functions #299

Merged
merged 17 commits into from
Feb 6, 2019
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions examples/marbles/frommarbles_error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import time

import rx
from rx import concurrency as ccy

err = ValueError("I don't like 5!")

src0 = rx.from_marbles('12-----4-----67--|', timespan=0.2)
src1 = rx.from_marbles('----3----5-# ', timespan=0.2, error=err)

source = rx.merge(src0, src1)
source.subscribe(
on_next=print,
on_error=lambda e: print('boom!! {}'.format(e)),
on_completed=lambda: print('good job!'),
scheduler=ccy.timeout_scheduler,
)

time.sleep(3)
26 changes: 26 additions & 0 deletions examples/marbles/frommarbles_flatmap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import time

import rx
from rx import operators as ops
from rx import concurrency as ccy

a = rx.cold(' ---a---a----------------a-|')
b = rx.cold(' ---b---b---| ')
c = rx.cold(' ---c---c---| ')
d = rx.cold(' --d---d---| ')
e1 = rx.cold('a--b--------c--d-------| ')

observableLookup = {"a": a, "b": b, "c": c, "d": d}

source = e1.pipe(
ops.flat_map(lambda value: observableLookup[value]),
)

source.subscribe_(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use subscribe() from examples now that subscribe supports (again) both observers and callbacks. Subscribe with an _ will still be used by the operators, but it may be confusing for the users that we have two different functions.

Copy link
Collaborator Author

@jcafhe jcafhe Feb 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted 👍
@dbrattli By the way, I've reworked all examples + some others things waiting in a big commit. I'm sorry this PR is a bit of a mess because I've pushed a lot of commits that changes everything. Do you prefer I close this one and open a new one when it's ready?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jcafhe I think it's time to squash and merge. This feature is self contained and I'm not worried if it needs more work.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool !

on_next=print,
on_error=lambda e: print('boom!! {}'.format(e)),
on_completed=lambda: print('good job!'),
scheduler=ccy.timeout_scheduler,
)

time.sleep(3)
20 changes: 20 additions & 0 deletions examples/marbles/frommarbles_lookup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import time

import rx
from rx import concurrency as ccy

lookup0 = {'a': 1, 'b': 3, 'c': 5}
lookup1 = {'x': 2, 'y': 4, 'z': 6}
source0 = rx.cold('a---b----c----|', timespan=0.2, lookup=lookup0)
source1 = rx.cold('---x---y---z--|', timespan=0.2, lookup=lookup1)

observable = rx.merge(source0, source1)

observable.subscribe_(
on_next=print,
on_error=lambda e: print('boom!! {}'.format(e)),
on_completed=lambda: print('good job!'),
scheduler=ccy.timeout_scheduler,
)

time.sleep(3)
18 changes: 18 additions & 0 deletions examples/marbles/frommarbles_merge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import time

import rx
from rx import concurrency as ccy

source0 = rx.cold('a-----d---1--------4-|', timespan=0.1)
source1 = rx.cold('--b-c-------2---3-| ', timespan=0.1)

observable = rx.merge(source0, source1)

observable.subscribe(
on_next=print,
on_error=lambda e: print('boom!! {}'.format(e)),
on_completed=lambda: print('good job!'),
scheduler=ccy.timeout_scheduler,
)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you considered using .run() instead? It's made for exactly this scenario. It will then return the last value emitted, and you can pipe to ops.to_iterable() to catch them all.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes you're right, good idea !

time.sleep(3)
17 changes: 17 additions & 0 deletions examples/marbles/hot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-

from rx.testing import marbles
import rx.concurrency as ccy
import datetime
import rx

#start_time = 5
now = datetime.datetime.utcnow()
start_time = now + datetime.timedelta(seconds=3.0)
hot = rx.hot('--a--b--c--|',
# timespan=0.3,
start_time=start_time,
# scheduler=ccy.timeout_scheduler,
)

hot.subscribe(print, print, lambda: print('completed'))
30 changes: 30 additions & 0 deletions examples/marbles/testing_debounce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from rx import operators as ops
from rx.testing import marbles

"""
Tests debounceTime from rxjs
https://github.com/ReactiveX/rxjs/blob/master/spec/operators/debounceTime-spec.ts

it should delay all element by the specified time
"""

start, cold, hot, exp = marbles.test_context(timespan=1)

e1 = hot('-a--------b------c----|')
ex = exp('------a--------b------(c|)')
expected = ex


def create():
return e1.pipe(
ops.debounce(5),
)


results = start(create)
assert results == expected

print('\ndebounce: results vs expected')
for r, e in zip(results, expected):
print(r, e)

35 changes: 35 additions & 0 deletions examples/marbles/testing_flatmap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from rx import operators as ops
from rx.testing import marbles

"""
Tests MergeMap from rxjs
https://github.com/ReactiveX/rxjs/blob/master/spec/operators/mergeMap-spec.ts

it should flat_map many regular interval inners
"""
start, cold, hot, exp = marbles.test_context(timespan=1)

a = cold(' ----a---a---a---(a|) ')
b = cold(' ----1---b---(b|) ')
c = cold(' ----c---c---c---c---(c|)')
d = cold(' ----(d|) ')
e1 = hot('-a---b-----------c-------d-------| ')
ex = exp('-----a---(a1)(ab)(ab)c---c---(cd)c---(c|)')
expected = ex

observableLookup = {"a": a, "b": b, "c": c, "d": d}


def create():
return e1.pipe(
ops.flat_map(lambda value: observableLookup[value])
)


results = start(create)
assert results == expected


print('\nflat_map: results vs expected')
for r, e in zip(results, expected):
print(r, e)
143 changes: 143 additions & 0 deletions rx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,75 @@ def from_iterable(iterable: Iterable) -> Observable:
from_list = from_iterable


def from_marbles(string: str, timespan: typing.RelativeTime = 0.1, scheduler: typing.Scheduler = None,
lookup = None, error: Exception = None) -> Observable:
"""Convert a marble diagram string to a cold observable sequence, using
an optional scheduler to enumerate the events.

Each character in the string will advance time by timespan
(exept for space). Characters that are not special (see the table below)
will be interpreted as a value to be emitted. numbers will be cast
to int or float.

Special characters:
+--------+--------------------------------------------------------+
| `-` | advance time by timespan |
+--------+--------------------------------------------------------+
| `#` | on_error() |
+--------+--------------------------------------------------------+
| `|` | on_completed() |
+--------+--------------------------------------------------------+
| `(` | open a group of marbles sharing the same timestamp |
+--------+--------------------------------------------------------+
| `)` | close a group of marbles |
+--------+--------------------------------------------------------+
| `,` | separate elements in a group |
+--------+--------------------------------------------------------+
| space | used to align multiple diagrams, does not advance time |
+--------+--------------------------------------------------------+

In a group of marbles, the position of the initial `(` determines the
timestamp at which grouped marbles will be emitted. E.g. `--(abc)--` will
emit a, b, c at 2 * timespan and then advance virtual time by 5 * timespan.

Examples:
>>> from_marbles("--1--(2,3)-4--|")
>>> from_marbles("a--b--c-", lookup={'a': 1, 'b': 2, 'c': 3})
>>> from_marbles("a--b---#", error=ValueError("foo"))

Args:
string: String with marble diagram

timespan: [Optional] duration of each character in second.
If not specified, defaults to 0.1s.

lookup: [Optional] dict used to convert a marble into a specified
value. If not specified, defaults to {}.

error: [Optional] exception that will be use in place of the # symbol.
If not specified, defaults to Exception('error').

scheduler: [Optional] Scheduler to run the the input sequence
on. If not specified, defaults to the downstream scheduler,
else to NewThreadScheduler.

Returns:
The observable sequence whose elements are pulled from the
given marble diagram string.
"""

from .core.observable.marbles import from_marbles as _from_marbles
return _from_marbles(
string,
timespan=timespan,
lookup=lookup,
error=error,
scheduler=scheduler)


cold = from_marbles


def generate_with_relative_time(initial_state, condition, iterate, time_mapper) -> Observable:
"""Generates an observable sequence by iterating a state from an
initial state until the condition fails.
Expand Down Expand Up @@ -276,6 +345,75 @@ def generate(initial_state, condition, iterate) -> Observable:
return _generate(initial_state, condition, iterate)


def hot(string, timespan: typing.RelativeTime=0.1, duetime:typing.AbsoluteOrRelativeTime = 0.0,
scheduler: typing.Scheduler = None, lookup = None, error: Exception = None) -> Observable:
"""Convert a marble diagram string to a hot observable sequence, using
an optional scheduler to enumerate the events.

Each character in the string will advance time by timespan
(exept for space). Characters that are not special (see the table below)
will be interpreted as a value to be emitted. numbers will be cast
to int or float.

Special characters:
+--------+--------------------------------------------------------+
| `-` | advance time by timespan |
+--------+--------------------------------------------------------+
| `#` | on_error() |
+--------+--------------------------------------------------------+
| `|` | on_completed() |
+--------+--------------------------------------------------------+
| `(` | open a group of marbles sharing the same timestamp |
+--------+--------------------------------------------------------+
| `)` | close a group of marbles |
+--------+--------------------------------------------------------+
| `,` | separate elements in a group |
+--------+--------------------------------------------------------+
| space | used to align multiple diagrams, does not advance time |
+--------+--------------------------------------------------------+

In a group of marbles, the position of the initial `(` determines the
timestamp at which grouped marbles will be emitted. E.g. `--(abc)--` will
emit a, b, c at 2 * timespan and then advance virtual time by 5 * timespan.

Examples:
>>> from_marbles("--1--(2,3)-4--|")
>>> from_marbles("a--b--c-", lookup={'a': 1, 'b': 2, 'c': 3})
>>> from_marbles("a--b---#", error=ValueError("foo"))

Args:
string: String with marble diagram

timespan: [Optional] duration of each character in second.
If not specified, defaults to 0.1s.

duetime: [Optional] Absolute datetime or timedelta relative to
the calling time that defines when to start the emission of elements.

lookup: [Optional] dict used to convert a marble into a specified
value. If not specified, defaults to {}.

error: [Optional] exception that will be use in place of the # symbol.
If not specified, defaults to Exception('error').

scheduler: [Optional] Scheduler to run the the input sequence
on. If not specified, defaults to NewThreadScheduler.

Returns:
The observable sequence whose elements are pulled from the
given marble diagram string.
"""

from .core.observable.marbles import hot as _hot
return _hot(
string,
timespan=timespan,
duetime=duetime,
lookup=lookup,
error=error,
scheduler=scheduler)


def if_then(condition: Callable[[], bool], then_source: Observable,
else_source: Observable = None) -> Observable:
"""Determines whether an observable collection contains values.
Expand Down Expand Up @@ -559,6 +697,11 @@ def to_async(func: Callable, scheduler=None) -> Callable:
return _to_async(func, scheduler)


def to_marbles(scheduler: typing.Scheduler = None, timespan=0.1) -> str:
from .core.observable.marbles import to_marbles as _to_marbles
return _to_marbles(scheduler=scheduler, timespan=timespan)


def using(resource_factory: Callable[[], typing.Disposable], observable_factory: Callable[[typing.Disposable], Observable]
) -> Observable:
"""Constructs an observable sequence that depends on a resource
Expand Down
Loading