diff --git a/examples/marbles/frommarbles_error.py b/examples/marbles/frommarbles_error.py new file mode 100644 index 000000000..d4589603d --- /dev/null +++ b/examples/marbles/frommarbles_error.py @@ -0,0 +1,14 @@ +import rx +from rx import operators as ops + +""" +Specify the error to be raised in place of the # symbol. +""" + +err = ValueError("I don't like 5!") + +src0 = rx.from_marbles('12-----4-----67--|', timespan=0.2) +src1 = rx.from_marbles('----3----5-# ', timespan=0.2, error=err) + +source = rx.merge(src0, src1).pipe(ops.do_action(print)) +source.run() diff --git a/examples/marbles/frommarbles_flatmap.py b/examples/marbles/frommarbles_flatmap.py new file mode 100644 index 000000000..f25483ee4 --- /dev/null +++ b/examples/marbles/frommarbles_flatmap.py @@ -0,0 +1,17 @@ +import rx +from rx import operators as ops + +a = rx.cold(' ---a0---a1----------------a2-| ') +b = rx.cold(' ---b1---b2---| ') +c = rx.cold(' ---c1---c2---| ') +d = rx.cold(' -----d1---d2---|') +e1 = rx.cold('a--b--------c-----d-------| ') + +observableLookup = {"a": a, "b": b, "c": c, "d": d} + +source = e1.pipe( + ops.flat_map(lambda value: observableLookup[value]), + ops.do_action(lambda v: print(v)), + ) + +source.run() diff --git a/examples/marbles/frommarbles_lookup.py b/examples/marbles/frommarbles_lookup.py new file mode 100644 index 000000000..b436c5dfd --- /dev/null +++ b/examples/marbles/frommarbles_lookup.py @@ -0,0 +1,16 @@ + +import rx +import rx.operators as ops +""" +Use a dictionnary to convert elements declared in the marbles diagram to +the specified values. +""" + +lookup0 = {'a': 1, 'b': 3, 'c': 5} +lookup1 = {'x': 2, 'y': 4, 'z': 6} +source0 = rx.cold('a---b----c----|', timespan=0.01, lookup=lookup0) +source1 = rx.cold('---x---y---z--|', timespan=0.01, lookup=lookup1) + +observable = rx.merge(source0, source1).pipe(ops.to_iterable()) +elements = observable.run() +print('received {}'.format(list(elements))) diff --git a/examples/marbles/frommarbles_merge.py b/examples/marbles/frommarbles_merge.py new file mode 100644 index 000000000..e44e0ac7c --- /dev/null +++ b/examples/marbles/frommarbles_merge.py @@ -0,0 +1,14 @@ + +import rx +from rx import operators as ops + +""" +simple example that merges two cold observables. +""" + +source0 = rx.cold('a-----d---1--------4-|', timespan=0.01) +source1 = rx.cold('--b-c-------2---3-| ', timespan=0.01) + +observable = rx.merge(source0, source1).pipe(ops.to_iterable()) +elements = observable.run() +print('received {}'.format(list(elements))) diff --git a/examples/marbles/hot_datetime.py b/examples/marbles/hot_datetime.py new file mode 100644 index 000000000..7603d7d57 --- /dev/null +++ b/examples/marbles/hot_datetime.py @@ -0,0 +1,20 @@ +import datetime + +import rx +import rx.operators as ops + +""" +Delay the emission of elements to the specified datetime. +""" + +now = datetime.datetime.utcnow() +dt = datetime.timedelta(seconds=3.0) +duetime = now + dt + +print('{} -> now\n' + '{} -> start of emission in {}s'.format(now, duetime, dt.total_seconds())) + +hot = rx.hot('10--11--12--13--(14,|)', timespan=0.2, duetime=duetime) + +source = hot.pipe(ops.do_action(print)) +source.run() diff --git a/examples/marbles/testing_debounce.py b/examples/marbles/testing_debounce.py new file mode 100644 index 000000000..c6d514930 --- /dev/null +++ b/examples/marbles/testing_debounce.py @@ -0,0 +1,26 @@ +from rx import operators as ops +from rx.testing.marbles import marbles_testing + +""" +Tests debounceTime from rxjs +https://github.com/ReactiveX/rxjs/blob/master/spec/operators/debounceTime-spec.ts + +it should delay all element by the specified time +""" +with marbles_testing(timespan=1.0) as (start, cold, hot, exp): + + e1 = cold('-a--------b------c----|') + ex = exp( '------a--------b------(c,|)') + expected = ex + + def create(): + return e1.pipe( + ops.debounce(5), + ) + + results = start(create) + assert results == expected + +print('debounce: results vs expected') +for r, e in zip(results, expected): + print(r, e) diff --git a/examples/marbles/testing_flatmap.py b/examples/marbles/testing_flatmap.py new file mode 100644 index 000000000..9d34592c0 --- /dev/null +++ b/examples/marbles/testing_flatmap.py @@ -0,0 +1,32 @@ +from rx import operators as ops +from rx.testing.marbles import marbles_testing + +""" +Tests MergeMap from rxjs +https://github.com/ReactiveX/rxjs/blob/master/spec/operators/mergeMap-spec.ts + +it should flat_map many regular interval inners +""" +with marbles_testing(timespan=1.0) as context: + start, cold, hot, exp = context + + a = cold(' ----a---a----a----(a,|) ') + b = cold(' ----1----b----(b,|) ') + c = cold(' -------c---c---c----c---(c,|)') + d = cold(' -------(d,|) ') + e1 = hot('-a---b-----------c-------d------------| ') + ex = exp('-----a---(a,1)(a,b)(a,b)c---c---(c,d)c---(c,|)') + expected = ex + + observableLookup = {"a": a, "b": b, "c": c, "d": d} + + obs = e1.pipe( + ops.flat_map(lambda value: observableLookup[value]) + ) + + results = start(obs) + assert results == expected + +print('flat_map: results vs expected') +for r, e in zip(results, expected): + print(r, e) diff --git a/examples/marbles/tomarbles.py b/examples/marbles/tomarbles.py new file mode 100644 index 000000000..ecb327e1b --- /dev/null +++ b/examples/marbles/tomarbles.py @@ -0,0 +1,12 @@ +import rx +from rx import concurrency as ccy +from rx import operators as ops + +source0 = rx.cold('a-----d---1--------4-|', timespan=0.1) +source1 = rx.cold('--b-c-------2---3-| ', timespan=0.1) + +print("to_marbles() is a blocking operator, we need to wait for completion...") +print('expecting "a-b-c-d---1-2---3--4-|"') +observable = rx.merge(source0, source1).pipe(ops.to_marbles(timespan=0.1)) +diagram = observable.run() +print('got "{}"'.format(diagram)) diff --git a/rx/__init__.py b/rx/__init__.py index 9210b8fef..dc2e2c9ab 100644 --- a/rx/__init__.py +++ b/rx/__init__.py @@ -232,6 +232,71 @@ def from_iterable(iterable: Iterable) -> Observable: from_list = from_iterable +def from_marbles(string: str, timespan: typing.RelativeTime = 0.1, scheduler: typing.Scheduler = None, + lookup = None, error: Exception = None) -> Observable: + """Convert a marble diagram string to a cold observable sequence, using + an optional scheduler to enumerate the events. + + Each character in the string will advance time by timespan + (exept for space). Characters that are not special (see the table below) + will be interpreted as a value to be emitted. Numbers will be cast + to int or float. + + Special characters: + +--------+--------------------------------------------------------+ + | `-` | advance time by timespan | + +--------+--------------------------------------------------------+ + | `#` | on_error() | + +--------+--------------------------------------------------------+ + | `|` | on_completed() | + +--------+--------------------------------------------------------+ + | `(` | open a group of marbles sharing the same timestamp | + +--------+--------------------------------------------------------+ + | `)` | close a group of marbles | + +--------+--------------------------------------------------------+ + | `,` | separate elements in a group | + +--------+--------------------------------------------------------+ + | space | used to align multiple diagrams, does not advance time | + +--------+--------------------------------------------------------+ + + In a group of elements, the position of the initial `(` determines the + timestamp at which grouped elements will be emitted. E.g. `--(12,3,4)--` + will emit 12, 3, 4 at 2 * timespan and then advance virtual time + by 8 * timespan. + + Examples: + >>> from_marbles("--1--(2,3)-4--|") + >>> from_marbles("a--b--c-", lookup={'a': 1, 'b': 2, 'c': 3}) + >>> from_marbles("a--b---#", error=ValueError("foo")) + + Args: + string: String with marble diagram + + timespan: [Optional] duration of each character in second. + If not specified, defaults to 0.1s. + + lookup: [Optional] dict used to convert an element into a specified + value. If not specified, defaults to {}. + + error: [Optional] exception that will be use in place of the # symbol. + If not specified, defaults to Exception('error'). + + scheduler: [Optional] Scheduler to run the the input sequence + on. If not specified, defaults to the subscribe scheduler + if defined, else to NewThreadScheduler. + + Returns: + The observable sequence whose elements are pulled from the + given marble diagram string. + """ + + from .core.observable.marbles import from_marbles as _from_marbles + return _from_marbles(string, timespan, lookup=lookup, error=error, scheduler=scheduler) + + +cold = from_marbles + + def generate_with_relative_time(initial_state, condition, iterate, time_mapper) -> Observable: """Generates an observable sequence by iterating a state from an initial state until the condition fails. @@ -276,6 +341,70 @@ def generate(initial_state, condition, iterate) -> Observable: return _generate(initial_state, condition, iterate) +def hot(string, timespan: typing.RelativeTime=0.1, duetime:typing.AbsoluteOrRelativeTime = 0.0, + scheduler: typing.Scheduler = None, lookup=None, error: Exception = None) -> Observable: + """Convert a marble diagram string to a hot observable sequence, using + an optional scheduler to enumerate the events. + + Each character in the string will advance time by timespan + (exept for space). Characters that are not special (see the table below) + will be interpreted as a value to be emitted. Numbers will be cast + to int or float. + + Special characters: + +--------+--------------------------------------------------------+ + | `-` | advance time by timespan | + +--------+--------------------------------------------------------+ + | `#` | on_error() | + +--------+--------------------------------------------------------+ + | `|` | on_completed() | + +--------+--------------------------------------------------------+ + | `(` | open a group of elemets sharing the same timestamp | + +--------+--------------------------------------------------------+ + | `)` | close a group of elements | + +--------+--------------------------------------------------------+ + | `,` | separate elements in a group | + +--------+--------------------------------------------------------+ + | space | used to align multiple diagrams, does not advance time | + +--------+--------------------------------------------------------+ + + In a group of elements, the position of the initial `(` determines the + timestamp at which grouped elements will be emitted. E.g. `--(12,3,4)--` + will emit 12, 3, 4 at 2 * timespan and then advance virtual time + by 8 * timespan. + + Examples: + >>> hot("--1--(2,3)-4--|") + >>> hot("a--b--c-", lookup={'a': 1, 'b': 2, 'c': 3}) + >>> hot("a--b---#", error=ValueError("foo")) + + Args: + string: String with marble diagram + + timespan: [Optional] duration of each character in second. + If not specified, defaults to 0.1s. + + duetime: [Optional] Absolute datetime or timedelta from now that + determines when to start the emission of elements. + + lookup: [Optional] dict used to convert an element into a specified + value. If not specified, defaults to {}. + + error: [Optional] exception that will be use in place of the # symbol. + If not specified, defaults to Exception('error'). + + scheduler: [Optional] Scheduler to run the the input sequence + on. If not specified, defaults to NewThreadScheduler. + + Returns: + The observable sequence whose elements are pulled from the + given marble diagram string. + """ + + from .core.observable.marbles import hot as _hot + return _hot(string, timespan, duetime, lookup=lookup, error=error, scheduler=scheduler) + + def if_then(condition: Callable[[], bool], then_source: Observable, else_source: Observable = None) -> Observable: """Determines whether an observable collection contains values. diff --git a/rx/core/observable/marbles.py b/rx/core/observable/marbles.py new file mode 100644 index 000000000..15663dabe --- /dev/null +++ b/rx/core/observable/marbles.py @@ -0,0 +1,246 @@ +from typing import List, Dict, Tuple +import re +import threading +from datetime import datetime, timedelta + +from rx import Observable +from rx.core import notification +from rx.disposable import CompositeDisposable, Disposable +from rx.concurrency import NewThreadScheduler +from rx.core.typing import RelativeTime, AbsoluteOrRelativeTime, Scheduler + + +new_thread_scheduler = NewThreadScheduler() + +# tokens will be searched in the order below using pipe +# group of elements: match any characters surrounded by () +pattern_group = r"(\(.*?\))" +# timespan: match one or multiple hyphens +pattern_ticks = r"(-+)" +# comma err: match any comma which is not in a group +pattern_comma_error = r"(,)" +# element: match | or # or one or more characters which are not - | # ( ) , +pattern_element = r"(#|\||[^-,()#\|]+)" + +pattern = r'|'.join([ + pattern_group, + pattern_ticks, + pattern_comma_error, + pattern_element, + ]) +tokens = re.compile(pattern) + + +def hot(string: str, timespan: RelativeTime = 0.1, duetime: AbsoluteOrRelativeTime = 0.0, + lookup: Dict = None, error: Exception = None, scheduler: Scheduler = None) -> Observable: + + _scheduler = scheduler or new_thread_scheduler + + if isinstance(duetime, datetime): + duetime = duetime - _scheduler.now + + messages = parse( + string, + timespan=timespan, + time_shift=duetime, + lookup=lookup, + error=error, + raise_stopped=True, + ) + + lock = threading.RLock() + is_stopped = False + observers = [] + + def subscribe(observer, scheduler=None): + # should a hot observable already completed or on error + # re-push on_completed/on_error at subscription time? + if not is_stopped: + with lock: + observers.append(observer) + + def dispose(): + with lock: + try: + observers.remove(observer) + except ValueError: + pass + + return Disposable(dispose) + + def create_action(notification): + def action(scheduler, state=None): + nonlocal is_stopped + + with lock: + for observer in observers: + notification.accept(observer) + + if notification.kind in ('C', 'E'): + is_stopped = True + + return action + + for message in messages: + timespan, notification = message + action = create_action(notification) + + # Don't make closures within a loop + _scheduler.schedule_relative(timespan, action) + + return Observable(subscribe) + + +def from_marbles(string: str, timespan: RelativeTime = 0.1, lookup: Dict = None, + error: Exception = None, scheduler: Scheduler = None) -> Observable: + + disp = CompositeDisposable() + messages = parse(string, timespan=timespan, lookup=lookup, error=error, raise_stopped=True) + + def schedule_msg(message, observer, scheduler): + timespan, notification = message + + def action(scheduler, state=None): + notification.accept(observer) + + disp.add(scheduler.schedule_relative(timespan, action)) + + def subscribe(observer, scheduler_): + _scheduler = scheduler or scheduler_ or new_thread_scheduler + + for message in messages: + # Don't make closures within a loop + schedule_msg(message, observer, _scheduler) + + return disp + return Observable(subscribe) + + +def parse(string: str, timespan: RelativeTime = 1.0, time_shift: RelativeTime = 0.0, lookup: Dict = None, + error: Exception = None, raise_stopped: bool = False) -> List[Tuple[RelativeTime, notification.Notification]]: + """Convert a marble diagram string to a list of messages. + + Each character in the string will advance time by timespan + (exept for space). Characters that are not special (see the table below) + will be interpreted as a value to be emitted. numbers will be cast + to int or float. + + Special characters: + +--------+--------------------------------------------------------+ + | `-` | advance time by timespan | + +--------+--------------------------------------------------------+ + | `#` | on_error() | + +--------+--------------------------------------------------------+ + | `|` | on_completed() | + +--------+--------------------------------------------------------+ + | `(` | open a group of elements sharing the same timestamp | + +--------+--------------------------------------------------------+ + | `)` | close a group of elements | + +--------+--------------------------------------------------------+ + | `,` | separate elements in a group | + +--------+--------------------------------------------------------+ + | space | used to align multiple diagrams, does not advance time | + +--------+--------------------------------------------------------+ + + In a group of elements, the position of the initial `(` determines the + timestamp at which grouped elements will be emitted. E.g. `--(12,3,4)--` + will emit 12, 3, 4 at 2 * timespan and then advance virtual time + by 8 * timespan. + + Examples: + >>> parse("--1--(2,3)-4--|") + >>> parse("a--b--c-", lookup={'a': 1, 'b': 2, 'c': 3}) + >>> parse("a--b---#", error=ValueError("foo")) + + Args: + string: String with marble diagram + + timespan: [Optional] duration of each character in second. + If not specified, defaults to 0.1s. + + lookup: [Optional] dict used to convert an element into a specified + value. If not specified, defaults to {}. + + time_shift: [Optional] time used to delay every elements. + If not specified, defaults to 0.0s. + + error: [Optional] exception that will be use in place of the # symbol. + If not specified, defaults to Exception('error'). + + raise_finished: [optional] raise ValueError if elements are + declared after on_completed or on_error symbol. + + Returns: + A list of messages defined as a tuple of (timespan, notification). + + """ + + error = error or Exception('error') + lookup = lookup or {} + + if isinstance(timespan, timedelta): + timespan = timespan.total_seconds() + if isinstance(time_shift, timedelta): + time_shift = time_shift.total_seconds() + + string = string.replace(' ', '') + + # try to cast a string to an int, then to a float + def try_number(element): + try: + return int(element) + except ValueError: + try: + return float(element) + except ValueError: + return element + + def map_element(time, element): + if element == '|': + return (time, notification.OnCompleted()) + elif element == '#': + return (time, notification.OnError(error)) + else: + value = try_number(element) + value = lookup.get(value, value) + return (time, notification.OnNext(value)) + + is_stopped = False + def check_stopped(element): + nonlocal is_stopped + if raise_stopped: + if is_stopped: + raise ValueError('Elements cannot be declared after a # or | symbol.') + + if element in ('#', '|'): + is_stopped = True + + + iframe = 0 + messages = [] + + for results in tokens.findall(string): + timestamp = iframe * timespan + time_shift + group, ticks, comma_error, element = results + + if group: + elements = group[1:-1].split(',') + for elm in elements: + check_stopped(elm) + grp_messages = [map_element(timestamp, elm) for elm in elements if elm !=''] + messages.extend(grp_messages) + iframe += len(group) + + if ticks: + iframe += len(ticks) + + if comma_error: + raise ValueError("Comma is only allowed in group of elements.") + + if element: + check_stopped(element) + message = map_element(timestamp, element) + messages.append(message) + iframe += len(element) + + return messages diff --git a/rx/core/operators/tomarbles.py b/rx/core/operators/tomarbles.py new file mode 100644 index 000000000..dfe20a535 --- /dev/null +++ b/rx/core/operators/tomarbles.py @@ -0,0 +1,69 @@ +from typing import List + +from rx.core import Observable +from rx.core.typing import Scheduler, RelativeTime +from rx.concurrency import NewThreadScheduler + +new_thread_scheduler = NewThreadScheduler() + + +def _to_marbles(scheduler: Scheduler = None, timespan: RelativeTime = 0.1): + + def to_marbles(source: Observable) -> Observable: + """Convert an observable sequence into a marble diagram string. + + Args: + timespan: [Optional] duration of each character in second. + If not specified, defaults to 0.1s. + scheduler: [Optional] The scheduler used to run the the input + sequence on. + + Returns: + Observable stream. + """ + + def subscribe(observer, scheduler=None): + scheduler = scheduler or new_thread_scheduler + + result: List[str] = [] + last = scheduler.now + + def add_timespan(): + nonlocal last + + now = scheduler.now + diff = now - last + last = now + secs = scheduler.to_seconds(diff) + dashes = "-" * int((secs + timespan / 2.0) * (1.0 / timespan)) + result.append(dashes) + + def on_next(value): + add_timespan() + result.append(stringify(value)) + + def on_error(exception): + add_timespan() + result.append(stringify(exception)) + observer.on_next("".join(n for n in result)) + observer.on_completed() + + def on_completed(): + add_timespan() + result.append("|") + observer.on_next("".join(n for n in result)) + observer.on_completed() + + return source.subscribe_(on_next, on_error, on_completed) + return Observable(subscribe) + return to_marbles + + +def stringify(value): + """Utility for stringifying an event. + """ + string = str(value) + if len(string) > 1: + string = "(%s)" % string + + return string diff --git a/rx/operators/__init__.py b/rx/operators/__init__.py index d018e20a5..8ef5002fc 100644 --- a/rx/operators/__init__.py +++ b/rx/operators/__init__.py @@ -2456,6 +2456,22 @@ def to_iterable() -> Callable[[Observable], Observable]: return _to_iterable() +def to_marbles(timespan: typing.RelativeTime = 0.1, scheduler: typing.Scheduler = None ) -> Callable[[Observable], Observable]: + """Convert an observable sequence into a marble diagram string. + + Args: + timespan: [Optional] duration of each character in second. + If not specified, defaults to 0.1s. + scheduler: [Optional] The scheduler used to run the the input + sequence on. + + Returns: + Observable stream. + """ + from rx.core.operators.tomarbles import _to_marbles + return _to_marbles(scheduler=scheduler, timespan=timespan) + + def to_set() -> Callable[[Observable], Observable]: """Converts the observable sequence to a set. diff --git a/rx/testing/marbles.py b/rx/testing/marbles.py index 04d42be1f..24260b834 100644 --- a/rx/testing/marbles.py +++ b/rx/testing/marbles.py @@ -1,156 +1,155 @@ -from typing import List -import re +from typing import List, Tuple, Union, Dict +from collections import namedtuple +from contextlib import contextmanager +from warnings import warn +import rx from rx.core import Observable -from rx.disposable import CompositeDisposable from rx.concurrency import NewThreadScheduler -from rx.core.notification import OnNext, OnError, OnCompleted +from rx.core.typing import Callable +from rx.testing import TestScheduler, Recorded, ReactiveTest +from rx.core.observable.marbles import parse new_thread_scheduler = NewThreadScheduler() -_pattern = r"\(?([a-zA-Z0-9]+)\)?|(-|[xX]|\|)" -_tokens = re.compile(_pattern) +MarblesContext = namedtuple('MarblesContext', 'start, cold, hot, exp') -def from_marbles(string: str, timespan=0.1) -> Observable: - """Convert a marble diagram string to an observable sequence, using - an optional scheduler to enumerate the events. - Special characters: - - = Timespan of timespan seconds - x = on_error() - | = on_completed() - - All other characters are treated as an on_next() event at the given - moment they are found on the string. - - Examples: - >>> res = rx.from_marbles("1-2-3-|") - >>> res = rx.from_marbles("1-(42)-3-|") - >>> res = rx.from_marbles("1-2-3-x", timeout_scheduler) - - Args: - string: String with marble diagram - scheduler: [Optional] Scheduler to run the the input sequence - on. - - Returns: - The observable sequence whose elements are pulled from the - given marble diagram string. +@contextmanager +def marbles_testing(timespan=1.0): """ + Initialize a :class:`rx.testing.TestScheduler` and return a namedtuple + containing the following functions that wrap its methods. - disp = CompositeDisposable() - completed = [False] - messages = [] - timedelta = [0] - - def handle_timespan(value): - timedelta[0] += timespan - - def handle_on_next(value): - try: - value = int(value) - except Exception: - pass - - if value in ('T', 'F'): - value = value == 'T' - messages.append((OnNext(value), timedelta[0])) - - def handle_on_completed(value): - messages.append((OnCompleted(), timedelta[0])) - completed[0] = True - - def handle_on_error(value): - messages.append((OnError(value), timedelta[0])) - completed[0] = True - - specials = { - '-': handle_timespan, - 'x': handle_on_error, - 'X': handle_on_error, - '|': handle_on_completed - } - - for groups in _tokens.findall(string): - for token in groups: - if token: - func = specials.get(token, handle_on_next) - func(token) + :func:`cold()`: + Parse a marbles string and return a cold observable - if not completed[0]: - messages.append((OnCompleted(), timedelta[0])) + :func:`hot()`: + Parse a marbles string and return a hot observable - def schedule_msg(message, observer, scheduler): - notification, timespan = message + :func:`start()`: + Start the test scheduler, invoke the create function, + subscribe to the resulting sequence, dispose the subscription and + return the resulting records - def action(scheduler, state=None): - notification.accept(observer) + :func:`exp()`: + Parse a marbles string and return a list of records - disp.add(scheduler.schedule_relative(timespan, action)) - - def subscribe(observer, scheduler): - scheduler = scheduler or new_thread_scheduler - - for message in messages: - # Don't make closures within a loop - schedule_msg(message, observer, scheduler) - return disp - return Observable(subscribe) - - -def to_marbles(scheduler=None, timespan=0.1): - """Convert an observable sequence into a marble diagram string - - Args: - scheduler: [Optional] The scheduler used to run the the input - sequence on. + Examples: + >>> with marbles_testing() as (start, cold, hot, exp): + ... obs = hot("-a-----b---c-|") + ... ex = exp( "-a-----b---c-|") + ... results = start(obs) + ... assert results == ex + + The underlying test scheduler is initialized with the following + parameters: + - created time = 100.0s + - subscribed = 200.0s + - disposed = 1000.0s + + **IMPORTANT**: regarding :func:`hot()`, a marble declared as the + first character will be skipped by the test scheduler. + E.g. hot("a--b--") will only emit b. + """ - Returns: - Observable stream. + scheduler = TestScheduler() + created = 100.0 + disposed = 1000.0 + subscribed = 200.0 + start_called = False + outside_of_context = False + + def check(): + if outside_of_context: + warn('context functions should not be called outside of ' + 'with statement.', + UserWarning, + stacklevel=3, + ) + + if start_called: + warn('start() should only be called one time inside ' + 'a with statement.', + UserWarning, + stacklevel=3, + ) + + def test_start(create: Union[Observable, Callable[[], Observable]]) -> List[Recorded]: + nonlocal start_called + check() + + def default_create(): + return create + + if isinstance(create, Observable): + create_function = default_create + else: + create_function = create + + mock_observer = scheduler.start( + create=create_function, + created=created, + subscribed=subscribed, + disposed=disposed, + ) + start_called = True + return mock_observer.messages + + def test_expected(string: str, lookup: Dict = None, error: Exception = None) -> List[Recorded]: + messages = parse( + string, + timespan=timespan, + time_shift=subscribed, + lookup=lookup, + error=error, + ) + return messages_to_records(messages) + + def test_cold(string: str, lookup: Dict = None, error: Exception = None) -> Observable: + check() + return rx.from_marbles( + string, + timespan=timespan, + lookup=lookup, + error=error, + ) + + def test_hot(string: str, lookup: Dict = None, error: Exception = None) -> Observable: + check() + hot_obs = rx.hot( + string, + timespan=timespan, + duetime=subscribed, + lookup=lookup, + error=error, + scheduler=scheduler, + ) + return hot_obs + + try: + yield MarblesContext(test_start, test_cold, test_hot, test_expected) + finally: + outside_of_context = True + + +def messages_to_records(messages: List[Tuple]) -> List[Recorded]: """ - def _to_marbles(source: Observable) -> Observable: - def subscribe(observer, scheduler=None): - scheduler = scheduler or new_thread_scheduler - - result: List[str] = [] - last = scheduler.now - - def add_timespan(): - nonlocal last - - now = scheduler.now - diff = now - last - last = now - secs = scheduler.to_seconds(diff) - dashes = "-" * int((secs + timespan / 2.0) * (1.0 / timespan)) - result.append(dashes) - - def on_next(value): - add_timespan() - result.append(stringify(value)) - - def on_error(exception): - add_timespan() - result.append(stringify(exception)) - observer.on_next("".join(n for n in result)) - observer.on_completed() - - def on_completed(): - add_timespan() - result.append("|") - observer.on_next("".join(n for n in result)) - observer.on_completed() - - return source.subscribe_(on_next, on_error, on_completed) - return Observable(subscribe) - return _to_marbles - - -def stringify(value): - """Utility for stringifying an event. + Helper function to convert messages returned by parse() to a list of + Recorded. """ - string = str(value) - if len(string) > 1: - string = "(%s)" % string + records = [] + + dispatcher = dict( + N=lambda t, n: ReactiveTest.on_next(t, n.value), + E=lambda t, n: ReactiveTest.on_error(t, n.exception), + C=lambda t, n: ReactiveTest.on_completed(t) + ) + + for message in messages: + time, notification = message + kind = notification.kind + record = dispatcher[kind](time, notification) + records.append(record) - return string + return records diff --git a/tests/test_observable/test_marbles.py b/tests/test_observable/test_marbles.py new file mode 100644 index 000000000..e38395a8b --- /dev/null +++ b/tests/test_observable/test_marbles.py @@ -0,0 +1,547 @@ +import unittest + +import rx +from rx.core import notification +from rx.core.observable.marbles import parse +from rx.testing import TestScheduler +from rx.testing.reactivetest import ReactiveTest +import datetime + + +def mess_on_next(time, value): + return (time, notification.OnNext(value)) + + +def mess_on_error(time, error): + return (time, notification.OnError(error)) + + +def mess_on_completed(time): + return (time, notification.OnCompleted()) + + +class TestParse(unittest.TestCase): + + def test_parse_just_on_error(self): + string = "#" + results = parse(string) + expected = [mess_on_error(0.0, Exception('error'))] + assert results == expected + + def test_parse_just_on_error_specified(self): + string = "#" + ex = Exception('Foo') + results = parse(string, error=ex) + expected = [mess_on_error(0.0, ex)] + assert results == expected + + def test_parse_just_on_completed(self): + string = "|" + results = parse(string) + expected = [mess_on_completed(0.0)] + assert results == expected + + def test_parse_just_on_next(self): + string = "a" + results = parse(string) + expected = [mess_on_next(0.0, 'a')] + assert results == expected + + def test_parse_marble_timespan(self): + string = "a--b---c" + " 012345678901234567890" + ts = 0.1 + results = parse(string, timespan=ts) + expected = [ + mess_on_next(0 * ts, 'a'), + mess_on_next(3 * ts, 'b'), + mess_on_next(7 * ts, 'c'), + ] + assert results == expected + + def test_parse_marble_timedelta(self): + string = "a--b---c" + " 012345678901234567890" + ts = 0.1 + results = parse(string, timespan=datetime.timedelta(seconds=ts)) + expected = [ + mess_on_next(0 * ts, 'a'), + mess_on_next(3 * ts, 'b'), + mess_on_next(7 * ts, 'c'), + ] + assert results == expected + + def test_parse_marble_multiple_digits(self): + string = "-ab-cde--" + " 012345678901234567890" + results = parse(string) + expected = [ + mess_on_next(1.0, 'ab'), + mess_on_next(4.0, 'cde'), + ] + assert results == expected + + def test_parse_marble_multiple_digits_int(self): + string = "-1-22-333-" + " 012345678901234567890" + results = parse(string) + expected = [ + mess_on_next(1.0, 1), + mess_on_next(3.0, 22), + mess_on_next(6.0, 333), + ] + assert results == expected + + def test_parse_marble_multiple_digits_float(self): + string = "-1.0--2.345--6.7e8-" + " 012345678901234567890" + results = parse(string) + expected = [ + mess_on_next(1.0, float('1.0')), + mess_on_next(6.0, float('2.345')), + mess_on_next(13.0, float('6.7e8')), + ] + assert results == expected + + def test_parse_marble_completed(self): + string = "-ab-c--|" + " 012345678901234567890" + results = parse(string) + expected = [ + mess_on_next(1.0, 'ab'), + mess_on_next(4.0, 'c'), + mess_on_completed(7.0), + ] + assert results == expected + + def test_parse_marble_with_error(self): + string = "-a-b-c--#--" + " 012345678901234567890" + ex = Exception('ex') + results = parse(string, error=ex) + expected = [ + mess_on_next(1.0, 'a'), + mess_on_next(3.0, 'b'), + mess_on_next(5.0, 'c'), + mess_on_error(8.0, ex), + ] + assert results == expected + + def test_parse_marble_with_space(self): + string = " -a b- c- de |" + " 01 23 45 67 8901234567890" + results = parse(string) + expected = [ + mess_on_next(1.0, 'ab'), + mess_on_next(4.0, 'c'), + mess_on_next(6.0, 'de'), + mess_on_completed(8.0), + ] + assert results == expected + + def test_parse_marble_with_group(self): + string = "-x(ab,12,1.5)-c--(de)-|" + " 012345678901234567890123" + " 0 1 2 " + results = parse(string) + expected = [ + mess_on_next(1.0, 'x'), + mess_on_next(2.0, 'ab'), + mess_on_next(2.0, 12), + mess_on_next(2.0, float('1.5')), + + mess_on_next(14.0, 'c'), + mess_on_next(17.0, 'de'), + + mess_on_completed(22.0), + ] + assert results == expected + + def test_parse_marble_lookup(self): + string = "-ab-c-12-3-|" + " 012345678901234567890" + lookup = { + 'ab': 'aabb', + 'c': 'cc', + 12: '1122', + 3: 33, + } + + results = parse(string, lookup=lookup) + expected = [ + mess_on_next(1.0, 'aabb'), + mess_on_next(4.0, 'cc'), + mess_on_next(6.0, '1122'), + mess_on_next(9.0, 33), + mess_on_completed(11.0), + ] + assert results == expected + + def test_parse_marble_time_shift(self): + string = "-ab----c-d-|" + " 012345678901234567890" + offset = 10.0 + results = parse(string, time_shift=offset) + expected = [ + mess_on_next(1.0 + offset, 'ab'), + mess_on_next(7.0 + offset, 'c'), + mess_on_next(9.0 + offset, 'd'), + mess_on_completed(11.0 + offset), + ] + assert results == expected + + def test_parse_marble_raise_with_elements_after_error(self): + string = "-a-b-c--#-1-" + " 012345678901234567890" + with self.assertRaises(ValueError): + parse(string, raise_stopped=True) + + def test_parse_marble_raise_with_elements_after_completed(self): + string = "-a-b-c--|-1-" + " 012345678901234567890" + with self.assertRaises(ValueError): + parse(string, raise_stopped=True) + + def test_parse_marble_raise_with_elements_after_completed_group(self): + string = "-a-b-c--(|,1)-" + " 012345678901234567890" + with self.assertRaises(ValueError): + parse(string, raise_stopped=True) + + def test_parse_marble_raise_with_elements_after_error_group(self): + string = "-a-b-c--(#,1)-" + " 012345678901234567890" + with self.assertRaises(ValueError): + parse(string, raise_stopped=True) + + +class TestFromMarble(unittest.TestCase): + def create_factory(self, observable): + def create(): + return observable + return create + + def test_from_marbles_on_error(self): + string = "#" + obs = rx.from_marbles(string) + scheduler = TestScheduler() + results = scheduler.start(self.create_factory(obs)).messages + + expected = [ReactiveTest.on_error(200.0, Exception('error'))] + assert results == expected + + def test_from_marbles_on_error_specified(self): + string = "#" + ex = Exception('Foo') + obs = rx.from_marbles(string, error=ex) + scheduler = TestScheduler() + results = scheduler.start(self.create_factory(obs)).messages + + expected = [ReactiveTest.on_error(200.0, ex)] + assert results == expected + + def test_from_marbles_on_complete(self): + string = "|" + obs = rx.from_marbles(string) + scheduler = TestScheduler() + results = scheduler.start(self.create_factory(obs)).messages + expected = [ReactiveTest.on_completed(200.0)] + assert results == expected + + def test_from_marbles_on_next(self): + string = "a" + obs = rx.from_marbles(string) + scheduler = TestScheduler() + results = scheduler.start(self.create_factory(obs)).messages + expected = [ReactiveTest.on_next(200.0, 'a')] + assert results == expected + + def test_from_marbles_timespan(self): + string = "a--b---c" + " 012345678901234567890" + ts = 0.5 + obs = rx.from_marbles(string, timespan=ts) + scheduler = TestScheduler() + results = scheduler.start(self.create_factory(obs)).messages + expected = [ + ReactiveTest.on_next(0 * ts + 200.0, 'a'), + ReactiveTest.on_next(3 * ts + 200.0, 'b'), + ReactiveTest.on_next(7 * ts + 200.0, 'c'), + ] + assert results == expected + + def test_from_marbles_marble_completed(self): + string = "-ab-c--|" + " 012345678901234567890" + obs = rx.from_marbles(string) + scheduler = TestScheduler() + results = scheduler.start(self.create_factory(obs)).messages + expected = [ + ReactiveTest.on_next(200.1, 'ab'), + ReactiveTest.on_next(200.4, 'c'), + ReactiveTest.on_completed(200.7), + ] + assert results == expected + + def test_from_marbles_marble_with_error(self): + string = "-ab-c--#--" + " 012345678901234567890" + ex = Exception('ex') + obs = rx.from_marbles(string, error=ex) + scheduler = TestScheduler() + results = scheduler.start(self.create_factory(obs)).messages + expected = [ + ReactiveTest.on_next(200.1, 'ab'), + ReactiveTest.on_next(200.4, 'c'), + ReactiveTest.on_error(200.7, ex), + ] + assert results == expected + + def test_from_marbles_marble_with_consecutive_symbols(self): + string = "-ab(12)#--" + " 012345678901234567890" + ex = Exception('ex') + obs = rx.from_marbles(string, error=ex) + scheduler = TestScheduler() + results = scheduler.start(self.create_factory(obs)).messages + expected = [ + ReactiveTest.on_next(200.1, 'ab'), + ReactiveTest.on_next(200.3, 12), + ReactiveTest.on_error(200.7, ex), + ] + assert results == expected + + def test_from_marbles_marble_with_space(self): + string = " -a b- c- - |" + " 01 23 45 6 78901234567890" + obs = rx.from_marbles(string) + scheduler = TestScheduler() + results = scheduler.start(self.create_factory(obs)).messages + expected = [ + ReactiveTest.on_next(200.1, 'ab'), + ReactiveTest.on_next(200.4, 'c'), + ReactiveTest.on_completed(200.7), + ] + assert results == expected + + def test_from_marbles_marble_with_group(self): + string = "-(ab)-c-(12.5,def)--(6,|)" + " 012345678901234567890" + obs = rx.from_marbles(string) + scheduler = TestScheduler() + results = scheduler.start(self.create_factory(obs)).messages + expected = [ + ReactiveTest.on_next(200.1, 'ab'), + ReactiveTest.on_next(200.6, 'c'), + ReactiveTest.on_next(200.8, str(12.5)), + ReactiveTest.on_next(200.8, 'def'), + ReactiveTest.on_next(202.0, 6), + ReactiveTest.on_completed(202.0), + ] + assert results == expected + + def test_from_marbles_marble_lookup(self): + string = "-ab-c-12-3-|" + " 012345678901234567890" + lookup = { + 'ab': 'aabb', + 'c': 'cc', + 12: '1122', + 3: 33, + } + obs = rx.from_marbles(string, lookup=lookup) + scheduler = TestScheduler() + results = scheduler.start(self.create_factory(obs)).messages + expected = [ + ReactiveTest.on_next(200.1, 'aabb'), + ReactiveTest.on_next(200.4, 'cc'), + ReactiveTest.on_next(200.6, '1122'), + ReactiveTest.on_next(200.9, 33), + ReactiveTest.on_completed(201.1), + ] + assert results == expected + + +class TestHot(unittest.TestCase): + def create_factory(self, observable): + def create(): + return observable + return create + + def test_hot_on_error(self): + string = "#" + scheduler = TestScheduler() + obs = rx.hot(string, 0.1, 200.1, scheduler=scheduler) + results = scheduler.start(self.create_factory(obs)).messages + + expected = [ReactiveTest.on_error(200.1, Exception('error'))] + assert results == expected + + def test_hot_on_error_specified(self): + string = "#" + ex = Exception('Foo') + scheduler = TestScheduler() + obs = rx.hot(string, 0.1, 200.1, error=ex, scheduler=scheduler) + results = scheduler.start(self.create_factory(obs)).messages + + expected = [ReactiveTest.on_error(200.1, ex)] + assert results == expected + + def test_hot_on_complete(self): + string = "|" + scheduler = TestScheduler() + obs = rx.hot(string, 0.1, 200.1, scheduler=scheduler) + results = scheduler.start(self.create_factory(obs)).messages + expected = [ReactiveTest.on_completed(200.1)] + assert results == expected + + def test_hot_on_next(self): + string = "a" + scheduler = TestScheduler() + obs = rx.hot(string, 0.1, 200.1, scheduler=scheduler) + results = scheduler.start(self.create_factory(obs)).messages + expected = [ReactiveTest.on_next(200.1, 'a')] + assert results == expected + + def test_hot_skipped_at_200(self): + string = "a" + scheduler = TestScheduler() + obs = rx.hot(string, 0.1, 200.0, scheduler=scheduler) + results = scheduler.start(self.create_factory(obs)).messages + expected = [] + assert results == expected + + def test_hot_timespan(self): + string = "-a-b---c" + " 012345678901234567890" + ts = 0.5 + scheduler = TestScheduler() + obs = rx.hot(string, ts, 200.0, scheduler=scheduler) + results = scheduler.start(self.create_factory(obs)).messages + expected = [ + ReactiveTest.on_next(1 * ts + 200.0, 'a'), + ReactiveTest.on_next(3 * ts + 200.0, 'b'), + ReactiveTest.on_next(7 * ts + 200.0, 'c'), + ] + assert results == expected + + def test_hot_marble_completed(self): + string = "-ab-c--|" + " 012345678901234567890" + scheduler = TestScheduler() + obs = rx.hot(string, 0.1, 200.0, scheduler=scheduler) + results = scheduler.start(self.create_factory(obs)).messages + expected = [ + ReactiveTest.on_next(200.1, 'ab'), + ReactiveTest.on_next(200.4, 'c'), + ReactiveTest.on_completed(200.7), + ] + assert results == expected + + def test_hot_marble_with_error(self): + string = "-ab-c--#--" + " 012345678901234567890" + ex = Exception('ex') + scheduler = TestScheduler() + obs = rx.hot(string, 0.1, 200.0, error=ex, scheduler=scheduler) + results = scheduler.start(self.create_factory(obs)).messages + expected = [ + ReactiveTest.on_next(200.1, 'ab'), + ReactiveTest.on_next(200.4, 'c'), + ReactiveTest.on_error(200.7, ex), + ] + assert results == expected + + def test_hot_marble_with_consecutive_symbols(self): + string = "-ab(12)#--" + " 012345678901234567890" + ex = Exception('ex') + scheduler = TestScheduler() + obs = rx.hot(string, 0.1, 200.0, error=ex, scheduler=scheduler) + results = scheduler.start(self.create_factory(obs)).messages + expected = [ + ReactiveTest.on_next(200.1, 'ab'), + ReactiveTest.on_next(200.3, 12), + ReactiveTest.on_error(200.7, ex), + ] + assert results == expected + + def test_hot_marble_with_space(self): + string = " -a b- c- - |" + " 01 23 45 6 78901234567890" + scheduler = TestScheduler() + obs = rx.hot(string, 0.1, 200.0, scheduler=scheduler) + results = scheduler.start(self.create_factory(obs)).messages + expected = [ + ReactiveTest.on_next(200.1, 'ab'), + ReactiveTest.on_next(200.4, 'c'), + ReactiveTest.on_completed(200.7), + ] + assert results == expected + + def test_hot_marble_with_group(self): + string = "-(ab)-c-(12.5,def)--(6,|)" + " 01234567890123456789012345" + scheduler = TestScheduler() + obs = rx.hot(string, 0.1, 200.0, scheduler=scheduler) + results = scheduler.start(self.create_factory(obs)).messages + expected = [ + ReactiveTest.on_next(200.1, 'ab'), + ReactiveTest.on_next(200.6, 'c'), + ReactiveTest.on_next(200.8, str(12.5)), + ReactiveTest.on_next(200.8, 'def'), + ReactiveTest.on_next(202.0, 6), + ReactiveTest.on_completed(202.0), + ] + assert results == expected + + def test_hot_marble_lookup(self): + string = "-ab-c-12-3-|" + " 012345678901234567890" + lookup = { + 'ab': 'aabb', + 'c': 'cc', + 12: '1122', + 3: 33, + } + scheduler = TestScheduler() + obs = rx.hot(string, 0.1, 200.0, lookup=lookup, scheduler=scheduler) + results = scheduler.start(self.create_factory(obs)).messages + expected = [ + ReactiveTest.on_next(200.1, 'aabb'), + ReactiveTest.on_next(200.4, 'cc'), + ReactiveTest.on_next(200.6, '1122'), + ReactiveTest.on_next(200.9, 33), + ReactiveTest.on_completed(201.1), + ] + assert results == expected + + def test_hot_marble_with_datetime(self): + string = "-ab-c--|" + " 012345678901234567890" + scheduler = TestScheduler() + duetime = scheduler.now + datetime.timedelta(seconds=300.0) + + obs = rx.hot(string, 0.1, duetime, scheduler=scheduler) + results = scheduler.start(self.create_factory(obs)).messages + expected = [ + ReactiveTest.on_next(300.1, 'ab'), + ReactiveTest.on_next(300.4, 'c'), + ReactiveTest.on_completed(300.7), + ] + assert results == expected + + def test_hot_marble_with_timedelta(self): + string = "-ab-c--|" + " 012345678901234567890" + scheduler = TestScheduler() + duetime = datetime.timedelta(seconds=300.0) + + obs = rx.hot(string, 0.1, duetime, scheduler=scheduler) + results = scheduler.start(self.create_factory(obs)).messages + expected = [ + ReactiveTest.on_next(300.1, 'ab'), + ReactiveTest.on_next(300.4, 'c'), + ReactiveTest.on_completed(300.7), + ] + assert results == expected diff --git a/tests/test_testing/test_marbles.py b/tests/test_testing/test_marbles.py index 37c71c125..0e6dc92ba 100644 --- a/tests/test_testing/test_marbles.py +++ b/tests/test_testing/test_marbles.py @@ -1,11 +1,12 @@ import unittest -from rx import Observable -from rx.testing import marbles, TestScheduler +from rx.testing.marbles import marbles_testing +from rx.testing.reactivetest import ReactiveTest + #from rx.concurrency import timeout_scheduler, new_thread_scheduler # marble sequences to test: -tested_marbles = '0-1-(10)|', '0|', '(10)-(20)|', '(abc)-|' +#tested_marbles = '0-1-(10)|', '0|', '(10)-(20)|', '(abc)-|' # class TestFromToMarbles(unittest.TestCase): @@ -42,3 +43,147 @@ # self._run_test(expected, new_thread_scheduler, TestScheduler()) +class TestTestContext(unittest.TestCase): + + def test_start_with_cold_never(self): + with marbles_testing() as (start, cold, hot, exp): + obs = cold("----") + " 012345678901234567890" + + def create(): + return obs + + results = start(create) + expected = [] + assert results == expected + + def test_start_with_cold_empty(self): + with marbles_testing() as (start, cold, hot, exp): + obs = cold("------|") + " 012345678901234567890" + + def create(): + return obs + + results = start(create) + expected = [ReactiveTest.on_completed(206)] + assert results == expected + + def test_start_with_cold_normal(self): + with marbles_testing() as (start, cold, hot, exp): + obs = cold("12--3-|") + " 012345678901234567890" + + def create(): + return obs + + results = start(create) + expected = [ + ReactiveTest.on_next(200.0, 12), + ReactiveTest.on_next(204.0, 3), + ReactiveTest.on_completed(206.0), + ] + assert results == expected + + def test_start_with_cold_no_create_function(self): + with marbles_testing() as (start, cold, hot, exp): + obs = cold("12--3-|") + " 012345678901234567890" + + results = start(obs) + expected = [ + ReactiveTest.on_next(200.0, 12), + ReactiveTest.on_next(204.0, 3), + ReactiveTest.on_completed(206.0), + ] + assert results == expected + + def test_start_with_hot_never(self): + with marbles_testing() as (start, cold, hot, exp): + obs = hot("------") + " 012345678901234567890" + + def create(): + return obs + + results = start(create) + expected = [] + assert results == expected + + def test_start_with_hot_empty(self): + with marbles_testing() as (start, cold, hot, exp): + obs = hot("---|") + " 012345678901234567890" + + def create(): + return obs + + results = start(create) + expected = [ReactiveTest.on_completed(203.0), ] + assert results == expected + + def test_start_with_hot_normal(self): + with marbles_testing() as (start, cold, hot, exp): + obs = hot("-12--3-|") + " 012345678901234567890" + + def create(): + return obs + + results = start(create) + expected = [ + ReactiveTest.on_next(201.0, 12), + ReactiveTest.on_next(205.0, 3), + ReactiveTest.on_completed(207.0), + ] + assert results == expected + + def test_exp(self): + with marbles_testing() as (start, cold, hot, exp): + results = exp("12--3--4--5-|") + " 012345678901234567890" + + expected = [ + ReactiveTest.on_next(200.0, 12), + ReactiveTest.on_next(204.0, 3), + ReactiveTest.on_next(207.0, 4), + ReactiveTest.on_next(210.0, 5), + ReactiveTest.on_completed(212.0), + ] + assert results == expected + + def test_start_with_hot_and_exp(self): + with marbles_testing() as (start, cold, hot, exp): + obs = hot(" --3--4--5-|") + expected = exp("--3--4--5-|") + " 012345678901234567890" + + def create(): + return obs + + results = start(create) + assert results == expected + + def test_start_with_cold_and_exp(self): + with marbles_testing() as (start, cold, hot, exp): + obs = cold(" 12--3--4--5-|") + expected = exp(" 12--3--4--5-|") + " 012345678901234567890" + + def create(): + return obs + + results = start(create) + assert results == expected + + def test_start_with_cold_and_exp_group(self): + with marbles_testing() as (start, cold, hot, exp): + obs = cold(" 12--(3,6.5)----(5,#)") + expected = exp(" 12--(3,6.5)----(5,#)") + " 012345678901234567890" + + def create(): + return obs + + results = start(create) + assert results == expected