Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement: marbles syntax + testing functions #299

Merged
merged 17 commits into from
Feb 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions examples/marbles/frommarbles_error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import rx
from rx import operators as ops

"""
Specify the error to be raised in place of the # symbol.
"""

err = ValueError("I don't like 5!")

src0 = rx.from_marbles('12-----4-----67--|', timespan=0.2)
src1 = rx.from_marbles('----3----5-# ', timespan=0.2, error=err)

source = rx.merge(src0, src1).pipe(ops.do_action(print))
source.run()
17 changes: 17 additions & 0 deletions examples/marbles/frommarbles_flatmap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import rx
from rx import operators as ops

a = rx.cold(' ---a0---a1----------------a2-| ')
b = rx.cold(' ---b1---b2---| ')
c = rx.cold(' ---c1---c2---| ')
d = rx.cold(' -----d1---d2---|')
e1 = rx.cold('a--b--------c-----d-------| ')

observableLookup = {"a": a, "b": b, "c": c, "d": d}

source = e1.pipe(
ops.flat_map(lambda value: observableLookup[value]),
ops.do_action(lambda v: print(v)),
)

source.run()
16 changes: 16 additions & 0 deletions examples/marbles/frommarbles_lookup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

import rx
import rx.operators as ops
"""
Use a dictionnary to convert elements declared in the marbles diagram to
the specified values.
"""

lookup0 = {'a': 1, 'b': 3, 'c': 5}
lookup1 = {'x': 2, 'y': 4, 'z': 6}
source0 = rx.cold('a---b----c----|', timespan=0.01, lookup=lookup0)
source1 = rx.cold('---x---y---z--|', timespan=0.01, lookup=lookup1)

observable = rx.merge(source0, source1).pipe(ops.to_iterable())
elements = observable.run()
print('received {}'.format(list(elements)))
14 changes: 14 additions & 0 deletions examples/marbles/frommarbles_merge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@

import rx
from rx import operators as ops

"""
simple example that merges two cold observables.
"""

source0 = rx.cold('a-----d---1--------4-|', timespan=0.01)
source1 = rx.cold('--b-c-------2---3-| ', timespan=0.01)

observable = rx.merge(source0, source1).pipe(ops.to_iterable())
elements = observable.run()
print('received {}'.format(list(elements)))
20 changes: 20 additions & 0 deletions examples/marbles/hot_datetime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import datetime

import rx
import rx.operators as ops

"""
Delay the emission of elements to the specified datetime.
"""

now = datetime.datetime.utcnow()
dt = datetime.timedelta(seconds=3.0)
duetime = now + dt

print('{} -> now\n'
'{} -> start of emission in {}s'.format(now, duetime, dt.total_seconds()))

hot = rx.hot('10--11--12--13--(14,|)', timespan=0.2, duetime=duetime)

source = hot.pipe(ops.do_action(print))
source.run()
26 changes: 26 additions & 0 deletions examples/marbles/testing_debounce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from rx import operators as ops
from rx.testing.marbles import marbles_testing

"""
Tests debounceTime from rxjs
https://github.com/ReactiveX/rxjs/blob/master/spec/operators/debounceTime-spec.ts

it should delay all element by the specified time
"""
with marbles_testing(timespan=1.0) as (start, cold, hot, exp):

e1 = cold('-a--------b------c----|')
ex = exp( '------a--------b------(c,|)')
expected = ex

def create():
return e1.pipe(
ops.debounce(5),
)

results = start(create)
assert results == expected

print('debounce: results vs expected')
for r, e in zip(results, expected):
print(r, e)
32 changes: 32 additions & 0 deletions examples/marbles/testing_flatmap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from rx import operators as ops
from rx.testing.marbles import marbles_testing

"""
Tests MergeMap from rxjs
https://github.com/ReactiveX/rxjs/blob/master/spec/operators/mergeMap-spec.ts

it should flat_map many regular interval inners
"""
with marbles_testing(timespan=1.0) as context:
start, cold, hot, exp = context

a = cold(' ----a---a----a----(a,|) ')
b = cold(' ----1----b----(b,|) ')
c = cold(' -------c---c---c----c---(c,|)')
d = cold(' -------(d,|) ')
e1 = hot('-a---b-----------c-------d------------| ')
ex = exp('-----a---(a,1)(a,b)(a,b)c---c---(c,d)c---(c,|)')
expected = ex

observableLookup = {"a": a, "b": b, "c": c, "d": d}

obs = e1.pipe(
ops.flat_map(lambda value: observableLookup[value])
)

results = start(obs)
assert results == expected

print('flat_map: results vs expected')
for r, e in zip(results, expected):
print(r, e)
12 changes: 12 additions & 0 deletions examples/marbles/tomarbles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import rx
from rx import concurrency as ccy
from rx import operators as ops

source0 = rx.cold('a-----d---1--------4-|', timespan=0.1)
source1 = rx.cold('--b-c-------2---3-| ', timespan=0.1)

print("to_marbles() is a blocking operator, we need to wait for completion...")
print('expecting "a-b-c-d---1-2---3--4-|"')
observable = rx.merge(source0, source1).pipe(ops.to_marbles(timespan=0.1))
diagram = observable.run()
print('got "{}"'.format(diagram))
129 changes: 129 additions & 0 deletions rx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,71 @@ def from_iterable(iterable: Iterable) -> Observable:
from_list = from_iterable


def from_marbles(string: str, timespan: typing.RelativeTime = 0.1, scheduler: typing.Scheduler = None,
lookup = None, error: Exception = None) -> Observable:
"""Convert a marble diagram string to a cold observable sequence, using
an optional scheduler to enumerate the events.

Each character in the string will advance time by timespan
(exept for space). Characters that are not special (see the table below)
will be interpreted as a value to be emitted. Numbers will be cast
to int or float.

Special characters:
+--------+--------------------------------------------------------+
| `-` | advance time by timespan |
+--------+--------------------------------------------------------+
| `#` | on_error() |
+--------+--------------------------------------------------------+
| `|` | on_completed() |
+--------+--------------------------------------------------------+
| `(` | open a group of marbles sharing the same timestamp |
+--------+--------------------------------------------------------+
| `)` | close a group of marbles |
+--------+--------------------------------------------------------+
| `,` | separate elements in a group |
+--------+--------------------------------------------------------+
| space | used to align multiple diagrams, does not advance time |
+--------+--------------------------------------------------------+

In a group of elements, the position of the initial `(` determines the
timestamp at which grouped elements will be emitted. E.g. `--(12,3,4)--`
will emit 12, 3, 4 at 2 * timespan and then advance virtual time
by 8 * timespan.

Examples:
>>> from_marbles("--1--(2,3)-4--|")
>>> from_marbles("a--b--c-", lookup={'a': 1, 'b': 2, 'c': 3})
>>> from_marbles("a--b---#", error=ValueError("foo"))

Args:
string: String with marble diagram

timespan: [Optional] duration of each character in second.
If not specified, defaults to 0.1s.

lookup: [Optional] dict used to convert an element into a specified
value. If not specified, defaults to {}.

error: [Optional] exception that will be use in place of the # symbol.
If not specified, defaults to Exception('error').

scheduler: [Optional] Scheduler to run the the input sequence
on. If not specified, defaults to the subscribe scheduler
if defined, else to NewThreadScheduler.

Returns:
The observable sequence whose elements are pulled from the
given marble diagram string.
"""

from .core.observable.marbles import from_marbles as _from_marbles
return _from_marbles(string, timespan, lookup=lookup, error=error, scheduler=scheduler)


cold = from_marbles


def generate_with_relative_time(initial_state, condition, iterate, time_mapper) -> Observable:
"""Generates an observable sequence by iterating a state from an
initial state until the condition fails.
Expand Down Expand Up @@ -276,6 +341,70 @@ def generate(initial_state, condition, iterate) -> Observable:
return _generate(initial_state, condition, iterate)


def hot(string, timespan: typing.RelativeTime=0.1, duetime:typing.AbsoluteOrRelativeTime = 0.0,
scheduler: typing.Scheduler = None, lookup=None, error: Exception = None) -> Observable:
"""Convert a marble diagram string to a hot observable sequence, using
an optional scheduler to enumerate the events.

Each character in the string will advance time by timespan
(exept for space). Characters that are not special (see the table below)
will be interpreted as a value to be emitted. Numbers will be cast
to int or float.

Special characters:
+--------+--------------------------------------------------------+
| `-` | advance time by timespan |
+--------+--------------------------------------------------------+
| `#` | on_error() |
+--------+--------------------------------------------------------+
| `|` | on_completed() |
+--------+--------------------------------------------------------+
| `(` | open a group of elemets sharing the same timestamp |
+--------+--------------------------------------------------------+
| `)` | close a group of elements |
+--------+--------------------------------------------------------+
| `,` | separate elements in a group |
+--------+--------------------------------------------------------+
| space | used to align multiple diagrams, does not advance time |
+--------+--------------------------------------------------------+

In a group of elements, the position of the initial `(` determines the
timestamp at which grouped elements will be emitted. E.g. `--(12,3,4)--`
will emit 12, 3, 4 at 2 * timespan and then advance virtual time
by 8 * timespan.

Examples:
>>> hot("--1--(2,3)-4--|")
>>> hot("a--b--c-", lookup={'a': 1, 'b': 2, 'c': 3})
>>> hot("a--b---#", error=ValueError("foo"))

Args:
string: String with marble diagram

timespan: [Optional] duration of each character in second.
If not specified, defaults to 0.1s.

duetime: [Optional] Absolute datetime or timedelta from now that
determines when to start the emission of elements.

lookup: [Optional] dict used to convert an element into a specified
value. If not specified, defaults to {}.

error: [Optional] exception that will be use in place of the # symbol.
If not specified, defaults to Exception('error').

scheduler: [Optional] Scheduler to run the the input sequence
on. If not specified, defaults to NewThreadScheduler.

Returns:
The observable sequence whose elements are pulled from the
given marble diagram string.
"""

from .core.observable.marbles import hot as _hot
return _hot(string, timespan, duetime, lookup=lookup, error=error, scheduler=scheduler)


def if_then(condition: Callable[[], bool], then_source: Observable,
else_source: Observable = None) -> Observable:
"""Determines whether an observable collection contains values.
Expand Down
Loading