Skip to content

Commit

Permalink
feat(rstream): add StreamSource error handling
Browse files Browse the repository at this point in the history
- update stream() opts arg type
- update Stream.subscribe() to use opt error handler to deal w/ errors
  during execution of stream source function
- add test
  • Loading branch information
postspectacular committed Mar 11, 2021
1 parent a8a8c44 commit 73023b6
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 6 deletions.
24 changes: 19 additions & 5 deletions packages/rstream/src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,15 @@ import { optsWithID } from "./utils/idgen";
* @param src -
* @param opts -
*/
export function stream<T>(opts?: Partial<CommonOpts>): Stream<T>;
// prettier-ignore
export function stream<T>(src: StreamSource<T>, opts?: Partial<CommonOpts>): Stream<T>;
export function stream<T>(src?: any, opts?: Partial<CommonOpts>): Stream<T> {
export function stream<T>(opts?: Partial<WithErrorHandlerOpts>): Stream<T>;
export function stream<T>(
src: StreamSource<T>,
opts?: Partial<WithErrorHandlerOpts>
): Stream<T>;
export function stream<T>(
src?: any,
opts?: Partial<WithErrorHandlerOpts>
): Stream<T> {
return new Stream<T>(src, opts);
}

Expand Down Expand Up @@ -131,7 +136,16 @@ export class Stream<T> extends Subscription<T, T> implements IStream<T> {
): any {
const $sub = super.subscribe(sub, opts);
if (!this._inited) {
this._cancel = (this.src && this.src(this)) || (() => void 0);
if (this.src) {
try {
this._cancel = this.src(this) || (() => void 0);
} catch (e) {
let s = this.wrapped;
if (!s || !s.error || !s.error(e)) {
this.unhandledError(e);
}
}
}
this._inited = true;
}
return $sub;
Expand Down
22 changes: 21 additions & 1 deletion packages/rstream/test/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import {
fromIterable,
fromIterableSync,
State,
stream,
Stream,
subscription,
} from "../src";
import { TIMEOUT } from "./config";
import { assertUnsub } from "./utils";
import { assertError, assertIdle, assertUnsub } from "./utils";

describe("Subscription", function () {
this.retries(3);
Expand Down Expand Up @@ -163,4 +164,23 @@ describe("Subscription", function () {
);
assert.deepStrictEqual(buf, [11]);
});

it("stream source error", () => {
let err: any;
const src = stream(
() => {
throw "eek";
},
{
error(e) {
err = e;
return false;
},
}
);
const sub = src.subscribe({});
assert.strictEqual(err, "eek");
assertError(src);
assertIdle(sub);
});
});

0 comments on commit 73023b6

Please sign in to comment.