From 7f2d5dd294d00947125c78a213c0f8899ae1e01f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andre=CC=81=20Wachter?= Date: Mon, 9 Jul 2018 15:47:48 +0200 Subject: [PATCH] fix(rstream): Fix potential reference error in transduce() --- packages/rstream/src/subs/transduce.ts | 50 +++++++++++++------------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/packages/rstream/src/subs/transduce.ts b/packages/rstream/src/subs/transduce.ts index aabbf8ac6e..30ad93e73a 100644 --- a/packages/rstream/src/subs/transduce.ts +++ b/packages/rstream/src/subs/transduce.ts @@ -1,28 +1,30 @@ -import { Reducer, Transducer } from "@thi.ng/transducers/api"; +import { Subscription } from '@thi.ng/rstream'; +import { Reducer, Transducer } from '@thi.ng/transducers'; import { isReduced } from "@thi.ng/transducers/reduced"; -import { Subscription } from "../subscription"; - export function transduce(src: Subscription, tx: Transducer, rfn: Reducer, init?: C): Promise { - let acc = init !== undefined ? init : rfn[0](); - return new Promise((resolve, reject) => { - const sub = src.subscribe({ - next(x) { - const _acc = rfn[2](acc, x); - if (isReduced(_acc)) { - sub.unsubscribe(); - resolve(_acc.deref()); - } else { - acc = _acc; - } - }, - done() { - sub.unsubscribe(); - resolve(acc); - }, - error(e) { - reject(e); - } - }, tx); - }); + let acc = init !== undefined ? init : rfn[0](); + let sub: Subscription; + + return new Promise((resolve, reject) => { + sub = src.subscribe({ + next(x) { + const _acc = rfn[2](acc, x); + if (isReduced(_acc)) { + resolve(_acc.deref()); + } else { + acc = _acc; + } + }, + done() { + resolve(acc); + }, + error(e) { + reject(e); + } + }, tx); + }).then( + fulfilled => { sub.unsubscribe(); return fulfilled }, + rejected => { sub.unsubscribe(); throw rejected } + ); }