Skip to content

Commit

Permalink
Fix zip(); udpate merge()
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Feb 21, 2016
1 parent 7d285d5 commit 0092679
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions src/Observable/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,14 @@ function merge(array $observables)
return new Coroutine($observable->each($emit));
}, $observables);

yield Awaitable\all($coroutines)->then(null, function (\Exception $exception) use ($coroutines) {
try {
yield Awaitable\all($coroutines);
} catch (\Exception $exception) {
foreach ($coroutines as $coroutine) {
$coroutine->cancel($exception);
}
throw $exception;
});
}
});
}

Expand Down Expand Up @@ -195,11 +197,14 @@ function ($value) use (&$i, &$next, &$delayed, $key, $count, $emit) {
));
}

yield Awaitable\choose($coroutines)->cleanup(function () use ($coroutines) {
try {
yield Awaitable\choose($coroutines);
yield $delayed;
} finally {
foreach ($coroutines as $coroutine) {
$coroutine->cancel();
}
});
}

yield $i; // Return the number of times a set was emitted.
});
Expand Down

0 comments on commit 0092679

Please sign in to comment.