From 00926796b2f16cb97530204a718a7ea8dcd6f18b Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sun, 21 Feb 2016 13:07:36 -0600 Subject: [PATCH] Fix zip(); udpate merge() --- src/Observable/functions.php | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/Observable/functions.php b/src/Observable/functions.php index d2b44c1..26c6ed1 100644 --- a/src/Observable/functions.php +++ b/src/Observable/functions.php @@ -80,12 +80,14 @@ function merge(array $observables) return new Coroutine($observable->each($emit)); }, $observables); - yield Awaitable\all($coroutines)->then(null, function (\Exception $exception) use ($coroutines) { + try { + yield Awaitable\all($coroutines); + } catch (\Exception $exception) { foreach ($coroutines as $coroutine) { $coroutine->cancel($exception); } throw $exception; - }); + } }); } @@ -195,11 +197,14 @@ function ($value) use (&$i, &$next, &$delayed, $key, $count, $emit) { )); } - yield Awaitable\choose($coroutines)->cleanup(function () use ($coroutines) { + try { + yield Awaitable\choose($coroutines); + yield $delayed; + } finally { foreach ($coroutines as $coroutine) { $coroutine->cancel(); } - }); + } yield $i; // Return the number of times a set was emitted. });