Skip to content

Commit

Permalink
fix(rstream): Fix potential reference error in transduce()
Browse files Browse the repository at this point in the history
  • Loading branch information
andrew8er committed Jul 9, 2018
1 parent 576f1da commit 7f2d5dd
Showing 1 changed file with 26 additions and 24 deletions.
50 changes: 26 additions & 24 deletions packages/rstream/src/subs/transduce.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
import { Reducer, Transducer } from "@thi.ng/transducers/api";
import { Subscription } from '@thi.ng/rstream';
import { Reducer, Transducer } from '@thi.ng/transducers';
import { isReduced } from "@thi.ng/transducers/reduced";

import { Subscription } from "../subscription";

export function transduce<A, B, C>(src: Subscription<any, A>, tx: Transducer<A, B>, rfn: Reducer<C, B>, init?: C): Promise<C> {
let acc = init !== undefined ? init : rfn[0]();
return new Promise((resolve, reject) => {
const sub = src.subscribe({
next(x) {
const _acc = rfn[2](acc, x);
if (isReduced(_acc)) {
sub.unsubscribe();
resolve(_acc.deref());
} else {
acc = _acc;
}
},
done() {
sub.unsubscribe();
resolve(acc);
},
error(e) {
reject(e);
}
}, tx);
});
let acc = init !== undefined ? init : rfn[0]();
let sub: Subscription<A, B>;

return new Promise<C>((resolve, reject) => {
sub = src.subscribe({
next(x) {
const _acc = rfn[2](acc, x);
if (isReduced(_acc)) {
resolve(_acc.deref());
} else {
acc = _acc;
}
},
done() {
resolve(acc);
},
error(e) {
reject(e);
}
}, tx);
}).then(
fulfilled => { sub.unsubscribe(); return fulfilled },
rejected => { sub.unsubscribe(); throw rejected }
);
}

0 comments on commit 7f2d5dd

Please sign in to comment.