Skip to content

Commit

Permalink
Enhancement: marbles syntax + testing functions (#299)
Browse files Browse the repository at this point in the history
* complete marbles syntax + add tests + add new testing functions

* sleep the main thread in examples

* fix the scheduler selection mechanism

* remove dependency to testscheduler.create_cold_observable() for cold() function + typo

* implement hot not dependent to test scheduler

* move from_marbles(), to_iterable(), hot(), parse() from rx.testing.marbles to rx.core.observable.marbles + update tests & examples

* remove dependency to rx.testing.recorded.Recorded() + update test

* fix exp() function

* add to_marbles in rx.__init__.py

* implement the marbles syntax as discussed in #299 + update tests

* add tests for rx.hot

* implement hot with no operators

* move tests relative to from_marbles()/cold() and hot() to tests/test_observable/test_marbles.py

* fix elements should be skipped after on_completed or on_error

* handle timedelta/datetime + enforce time value as float + cleaning

* move to_iterable in rx/operators instead of rx/observable
make marble context a true python context
update tests
update examples

* typo
  • Loading branch information
jcafhe authored and dbrattli committed Feb 6, 2019
1 parent 4e29c7d commit 379b699
Show file tree
Hide file tree
Showing 15 changed files with 1,443 additions and 141 deletions.
14 changes: 14 additions & 0 deletions examples/marbles/frommarbles_error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import rx
from rx import operators as ops

"""
Specify the error to be raised in place of the # symbol.
"""

err = ValueError("I don't like 5!")

src0 = rx.from_marbles('12-----4-----67--|', timespan=0.2)
src1 = rx.from_marbles('----3----5-# ', timespan=0.2, error=err)

source = rx.merge(src0, src1).pipe(ops.do_action(print))
source.run()
17 changes: 17 additions & 0 deletions examples/marbles/frommarbles_flatmap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import rx
from rx import operators as ops

a = rx.cold(' ---a0---a1----------------a2-| ')
b = rx.cold(' ---b1---b2---| ')
c = rx.cold(' ---c1---c2---| ')
d = rx.cold(' -----d1---d2---|')
e1 = rx.cold('a--b--------c-----d-------| ')

observableLookup = {"a": a, "b": b, "c": c, "d": d}

source = e1.pipe(
ops.flat_map(lambda value: observableLookup[value]),
ops.do_action(lambda v: print(v)),
)

source.run()
16 changes: 16 additions & 0 deletions examples/marbles/frommarbles_lookup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

import rx
import rx.operators as ops
"""
Use a dictionnary to convert elements declared in the marbles diagram to
the specified values.
"""

lookup0 = {'a': 1, 'b': 3, 'c': 5}
lookup1 = {'x': 2, 'y': 4, 'z': 6}
source0 = rx.cold('a---b----c----|', timespan=0.01, lookup=lookup0)
source1 = rx.cold('---x---y---z--|', timespan=0.01, lookup=lookup1)

observable = rx.merge(source0, source1).pipe(ops.to_iterable())
elements = observable.run()
print('received {}'.format(list(elements)))
14 changes: 14 additions & 0 deletions examples/marbles/frommarbles_merge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@

import rx
from rx import operators as ops

"""
simple example that merges two cold observables.
"""

source0 = rx.cold('a-----d---1--------4-|', timespan=0.01)
source1 = rx.cold('--b-c-------2---3-| ', timespan=0.01)

observable = rx.merge(source0, source1).pipe(ops.to_iterable())
elements = observable.run()
print('received {}'.format(list(elements)))
20 changes: 20 additions & 0 deletions examples/marbles/hot_datetime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import datetime

import rx
import rx.operators as ops

"""
Delay the emission of elements to the specified datetime.
"""

now = datetime.datetime.utcnow()
dt = datetime.timedelta(seconds=3.0)
duetime = now + dt

print('{} -> now\n'
'{} -> start of emission in {}s'.format(now, duetime, dt.total_seconds()))

hot = rx.hot('10--11--12--13--(14,|)', timespan=0.2, duetime=duetime)

source = hot.pipe(ops.do_action(print))
source.run()
26 changes: 26 additions & 0 deletions examples/marbles/testing_debounce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from rx import operators as ops
from rx.testing.marbles import marbles_testing

"""
Tests debounceTime from rxjs
https://github.com/ReactiveX/rxjs/blob/master/spec/operators/debounceTime-spec.ts
it should delay all element by the specified time
"""
with marbles_testing(timespan=1.0) as (start, cold, hot, exp):

e1 = cold('-a--------b------c----|')
ex = exp( '------a--------b------(c,|)')
expected = ex

def create():
return e1.pipe(
ops.debounce(5),
)

results = start(create)
assert results == expected

print('debounce: results vs expected')
for r, e in zip(results, expected):
print(r, e)
32 changes: 32 additions & 0 deletions examples/marbles/testing_flatmap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from rx import operators as ops
from rx.testing.marbles import marbles_testing

"""
Tests MergeMap from rxjs
https://github.com/ReactiveX/rxjs/blob/master/spec/operators/mergeMap-spec.ts
it should flat_map many regular interval inners
"""
with marbles_testing(timespan=1.0) as context:
start, cold, hot, exp = context

a = cold(' ----a---a----a----(a,|) ')
b = cold(' ----1----b----(b,|) ')
c = cold(' -------c---c---c----c---(c,|)')
d = cold(' -------(d,|) ')
e1 = hot('-a---b-----------c-------d------------| ')
ex = exp('-----a---(a,1)(a,b)(a,b)c---c---(c,d)c---(c,|)')
expected = ex

observableLookup = {"a": a, "b": b, "c": c, "d": d}

obs = e1.pipe(
ops.flat_map(lambda value: observableLookup[value])
)

results = start(obs)
assert results == expected

print('flat_map: results vs expected')
for r, e in zip(results, expected):
print(r, e)
12 changes: 12 additions & 0 deletions examples/marbles/tomarbles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import rx
from rx import concurrency as ccy
from rx import operators as ops

source0 = rx.cold('a-----d---1--------4-|', timespan=0.1)
source1 = rx.cold('--b-c-------2---3-| ', timespan=0.1)

print("to_marbles() is a blocking operator, we need to wait for completion...")
print('expecting "a-b-c-d---1-2---3--4-|"')
observable = rx.merge(source0, source1).pipe(ops.to_marbles(timespan=0.1))
diagram = observable.run()
print('got "{}"'.format(diagram))
129 changes: 129 additions & 0 deletions rx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,71 @@ def from_iterable(iterable: Iterable) -> Observable:
from_list = from_iterable


def from_marbles(string: str, timespan: typing.RelativeTime = 0.1, scheduler: typing.Scheduler = None,
lookup = None, error: Exception = None) -> Observable:
"""Convert a marble diagram string to a cold observable sequence, using
an optional scheduler to enumerate the events.
Each character in the string will advance time by timespan
(exept for space). Characters that are not special (see the table below)
will be interpreted as a value to be emitted. Numbers will be cast
to int or float.
Special characters:
+--------+--------------------------------------------------------+
| `-` | advance time by timespan |
+--------+--------------------------------------------------------+
| `#` | on_error() |
+--------+--------------------------------------------------------+
| `|` | on_completed() |
+--------+--------------------------------------------------------+
| `(` | open a group of marbles sharing the same timestamp |
+--------+--------------------------------------------------------+
| `)` | close a group of marbles |
+--------+--------------------------------------------------------+
| `,` | separate elements in a group |
+--------+--------------------------------------------------------+
| space | used to align multiple diagrams, does not advance time |
+--------+--------------------------------------------------------+
In a group of elements, the position of the initial `(` determines the
timestamp at which grouped elements will be emitted. E.g. `--(12,3,4)--`
will emit 12, 3, 4 at 2 * timespan and then advance virtual time
by 8 * timespan.
Examples:
>>> from_marbles("--1--(2,3)-4--|")
>>> from_marbles("a--b--c-", lookup={'a': 1, 'b': 2, 'c': 3})
>>> from_marbles("a--b---#", error=ValueError("foo"))
Args:
string: String with marble diagram
timespan: [Optional] duration of each character in second.
If not specified, defaults to 0.1s.
lookup: [Optional] dict used to convert an element into a specified
value. If not specified, defaults to {}.
error: [Optional] exception that will be use in place of the # symbol.
If not specified, defaults to Exception('error').
scheduler: [Optional] Scheduler to run the the input sequence
on. If not specified, defaults to the subscribe scheduler
if defined, else to NewThreadScheduler.
Returns:
The observable sequence whose elements are pulled from the
given marble diagram string.
"""

from .core.observable.marbles import from_marbles as _from_marbles
return _from_marbles(string, timespan, lookup=lookup, error=error, scheduler=scheduler)


cold = from_marbles


def generate_with_relative_time(initial_state, condition, iterate, time_mapper) -> Observable:
"""Generates an observable sequence by iterating a state from an
initial state until the condition fails.
Expand Down Expand Up @@ -276,6 +341,70 @@ def generate(initial_state, condition, iterate) -> Observable:
return _generate(initial_state, condition, iterate)


def hot(string, timespan: typing.RelativeTime=0.1, duetime:typing.AbsoluteOrRelativeTime = 0.0,
scheduler: typing.Scheduler = None, lookup=None, error: Exception = None) -> Observable:
"""Convert a marble diagram string to a hot observable sequence, using
an optional scheduler to enumerate the events.
Each character in the string will advance time by timespan
(exept for space). Characters that are not special (see the table below)
will be interpreted as a value to be emitted. Numbers will be cast
to int or float.
Special characters:
+--------+--------------------------------------------------------+
| `-` | advance time by timespan |
+--------+--------------------------------------------------------+
| `#` | on_error() |
+--------+--------------------------------------------------------+
| `|` | on_completed() |
+--------+--------------------------------------------------------+
| `(` | open a group of elemets sharing the same timestamp |
+--------+--------------------------------------------------------+
| `)` | close a group of elements |
+--------+--------------------------------------------------------+
| `,` | separate elements in a group |
+--------+--------------------------------------------------------+
| space | used to align multiple diagrams, does not advance time |
+--------+--------------------------------------------------------+
In a group of elements, the position of the initial `(` determines the
timestamp at which grouped elements will be emitted. E.g. `--(12,3,4)--`
will emit 12, 3, 4 at 2 * timespan and then advance virtual time
by 8 * timespan.
Examples:
>>> hot("--1--(2,3)-4--|")
>>> hot("a--b--c-", lookup={'a': 1, 'b': 2, 'c': 3})
>>> hot("a--b---#", error=ValueError("foo"))
Args:
string: String with marble diagram
timespan: [Optional] duration of each character in second.
If not specified, defaults to 0.1s.
duetime: [Optional] Absolute datetime or timedelta from now that
determines when to start the emission of elements.
lookup: [Optional] dict used to convert an element into a specified
value. If not specified, defaults to {}.
error: [Optional] exception that will be use in place of the # symbol.
If not specified, defaults to Exception('error').
scheduler: [Optional] Scheduler to run the the input sequence
on. If not specified, defaults to NewThreadScheduler.
Returns:
The observable sequence whose elements are pulled from the
given marble diagram string.
"""

from .core.observable.marbles import hot as _hot
return _hot(string, timespan, duetime, lookup=lookup, error=error, scheduler=scheduler)


def if_then(condition: Callable[[], bool], then_source: Observable,
else_source: Observable = None) -> Observable:
"""Determines whether an observable collection contains values.
Expand Down
Loading

0 comments on commit 379b699

Please sign in to comment.