From 2d9e907e9c5a7ef9b0944473561c7c6d52f82841 Mon Sep 17 00:00:00 2001 From: Karsten Schmidt Date: Sun, 3 May 2020 18:20:29 +0100 Subject: [PATCH] fix(rstream): MetaStream close mode handling - never go into DONE state if `closeIn == CloseMode.NEVER` - fix/update unsubscribe() & pass arg - update detach() to consider `closeOut` mode - add tests --- packages/rstream/src/metastream.ts | 24 +++++----- packages/rstream/test/metastream.ts | 74 +++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 11 deletions(-) create mode 100644 packages/rstream/test/metastream.ts diff --git a/packages/rstream/src/metastream.ts b/packages/rstream/src/metastream.ts index d9feb4b229..e4e1bf0e18 100644 --- a/packages/rstream/src/metastream.ts +++ b/packages/rstream/src/metastream.ts @@ -1,5 +1,5 @@ import { assert, Fn } from "@thi.ng/api"; -import { CommonOpts, State } from "./api"; +import { CloseMode, CommonOpts, State } from "./api"; import { Subscription } from "./subscription"; import { optsWithID } from "./utils/idgen"; @@ -123,7 +123,7 @@ export class MetaStream extends Subscription { } }, error: (e) => super.error(e), - __owner: this + __owner: this, }); } } @@ -131,22 +131,24 @@ export class MetaStream extends Subscription { done() { if (this.stream) { - this.detach(); + this.detach(true); } - super.done(); + this.closeIn !== CloseMode.NEVER && super.done(); } unsubscribe(sub?: Subscription) { if (this.stream && (!sub || this.subs.length === 1)) { - this.detach(); + this.detach(!sub); } - return super.unsubscribe(); + return super.unsubscribe(sub); } - protected detach() { - assert(!!this.stream, "input stream already removed"); - this.stream!.unsubscribe(this.sub); - delete this.stream; - delete this.sub; + protected detach(force: boolean) { + if (force || this.closeOut !== CloseMode.NEVER) { + assert(!!this.stream, "input stream already removed"); + this.stream!.unsubscribe(this.sub); + delete this.stream; + delete this.sub; + } } } diff --git a/packages/rstream/test/metastream.ts b/packages/rstream/test/metastream.ts new file mode 100644 index 0000000000..ce2f16648c --- /dev/null +++ b/packages/rstream/test/metastream.ts @@ -0,0 +1,74 @@ +import * as assert from "assert"; +import { CloseMode, fromIterable, metaStream, State } from "../src/index"; +import { TIMEOUT } from "./config"; + +describe("MetaStream", () => { + it("basic", (done) => { + const src = fromIterable([1, 2, 3], { delay: TIMEOUT }); + const meta = metaStream((x) => + fromIterable([x * 10, x * 20, x * 30], { delay: TIMEOUT >> 2 }) + ); + const sub = src.subscribe(meta); + const acc: number[] = []; + const sub2 = sub.subscribe({ + next(x) { + acc.push(x); + }, + }); + setTimeout(() => { + assert.deepEqual(acc, [10, 20, 30, 20, 40, 60, 30, 60, 90]); + assert.equal(meta.getState(), State.DONE); + assert.equal(sub.getState(), State.DONE); + assert.equal(sub2.getState(), State.DONE); + done(); + }, 5 * TIMEOUT); + }); + + it("closein", (done) => { + const src = fromIterable([1], { delay: TIMEOUT }); + const meta = metaStream((x) => fromIterable([x]), { + closeIn: CloseMode.NEVER, + }); + const sub = src.subscribe(meta); + const child = sub.subscribe({ + next(x) { + console.log(x); + }, + }); + setTimeout(() => { + assert.equal(src.getState(), State.DONE); + assert.equal(meta.getState(), State.ACTIVE); + assert.equal(sub.getState(), State.ACTIVE); + assert.equal(child.getState(), State.IDLE); + done(); + }, 3 * TIMEOUT); + }); + + it("closeout", (done) => { + const src = fromIterable([1], { delay: TIMEOUT }); + const meta = src.subscribe( + metaStream((x) => fromIterable([x * 10]), { + closeIn: CloseMode.NEVER, + closeOut: CloseMode.NEVER, + }) + ); + const acc: number[] = []; + const child = meta.subscribe({ + next(x) { + acc.push(x); + }, + }); + setTimeout(() => { + child.unsubscribe(); + assert.equal(src.getState(), State.DONE); + assert.equal(meta.getState(), State.ACTIVE); + meta.subscribe({ + next(x) { + acc.push(x); + }, + }); + assert.deepEqual(acc, [10, 10]); + done(); + }, 3 * TIMEOUT); + }); +});