diff --git a/packages/rstream/src/stream.ts b/packages/rstream/src/stream.ts index ae5c9faf8d..efc3e89754 100644 --- a/packages/rstream/src/stream.ts +++ b/packages/rstream/src/stream.ts @@ -69,10 +69,15 @@ import { optsWithID } from "./utils/idgen"; * @param src - * @param opts - */ -export function stream(opts?: Partial): Stream; -// prettier-ignore -export function stream(src: StreamSource, opts?: Partial): Stream; -export function stream(src?: any, opts?: Partial): Stream { +export function stream(opts?: Partial): Stream; +export function stream( + src: StreamSource, + opts?: Partial +): Stream; +export function stream( + src?: any, + opts?: Partial +): Stream { return new Stream(src, opts); } @@ -131,7 +136,16 @@ export class Stream extends Subscription implements IStream { ): any { const $sub = super.subscribe(sub, opts); if (!this._inited) { - this._cancel = (this.src && this.src(this)) || (() => void 0); + if (this.src) { + try { + this._cancel = this.src(this) || (() => void 0); + } catch (e) { + let s = this.wrapped; + if (!s || !s.error || !s.error(e)) { + this.unhandledError(e); + } + } + } this._inited = true; } return $sub; diff --git a/packages/rstream/test/subscription.ts b/packages/rstream/test/subscription.ts index d8a4309d1d..0cc9be8972 100644 --- a/packages/rstream/test/subscription.ts +++ b/packages/rstream/test/subscription.ts @@ -5,11 +5,12 @@ import { fromIterable, fromIterableSync, State, + stream, Stream, subscription, } from "../src"; import { TIMEOUT } from "./config"; -import { assertUnsub } from "./utils"; +import { assertError, assertIdle, assertUnsub } from "./utils"; describe("Subscription", function () { this.retries(3); @@ -163,4 +164,23 @@ describe("Subscription", function () { ); assert.deepStrictEqual(buf, [11]); }); + + it("stream source error", () => { + let err: any; + const src = stream( + () => { + throw "eek"; + }, + { + error(e) { + err = e; + return false; + }, + } + ); + const sub = src.subscribe({}); + assert.strictEqual(err, "eek"); + assertError(src); + assertIdle(sub); + }); });