Skip to content

Commit

Permalink
feat(rstream): update DONE state & teardown logic
Browse files Browse the repository at this point in the history
- DONE state now only valid during depth-first stage of .done()
- state switches to UNSUBSCRIBED during recursive teardown (unless ERROR)
- update tests
  • Loading branch information
postspectacular committed Mar 11, 2021
1 parent ae4866a commit a8a8c44
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 22 deletions.
4 changes: 2 additions & 2 deletions packages/rstream/src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ export class Subscription<A, B> implements ISubscription<A, B> {
protected unsubscribeSelf() {
LOGGER.debug(this.id, "unsub self");
this.parent && this.parent.unsubscribe(this);
this.state < State.DONE && (this.state = State.UNSUBSCRIBED);
this.state < State.UNSUBSCRIBED && (this.state = State.UNSUBSCRIBED);
this.release();
return true;
}
Expand Down Expand Up @@ -263,7 +263,7 @@ export class Subscription<A, B> implements ISubscription<A, B> {
// attempt to call .done in wrapped sub
if (this.dispatchTo("done")) {
// disconnect from parent & internal cleanup
this.unsubscribe();
this.state < State.UNSUBSCRIBED && this.unsubscribe();
}
LOGGER.debug(this.id, "exiting done()");
}
Expand Down
12 changes: 6 additions & 6 deletions packages/rstream/test/metastream.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as assert from "assert";
import { CloseMode, fromIterable, metaStream } from "../src";
import { TIMEOUT } from "./config";
import { assertActive, assertDone, assertIdle } from "./utils";
import { assertActive, assertIdle, assertUnsub } from "./utils";

describe("MetaStream", function () {
this.retries(3);
Expand All @@ -20,9 +20,9 @@ describe("MetaStream", function () {
});
setTimeout(() => {
assert.deepStrictEqual(acc, [10, 20, 30, 20, 40, 60, 30, 60, 90]);
assertDone(meta);
assertDone(sub);
assertDone(sub2);
assertUnsub(meta);
assertUnsub(sub);
assertUnsub(sub2);
done();
}, 5 * TIMEOUT);
});
Expand All @@ -35,7 +35,7 @@ describe("MetaStream", function () {
const sub = src.subscribe(meta);
const child = sub.subscribe({});
setTimeout(() => {
assertDone(src);
assertUnsub(src);
assertActive(meta);
assertActive(sub);
assertIdle(child);
Expand All @@ -59,7 +59,7 @@ describe("MetaStream", function () {
});
setTimeout(() => {
child.unsubscribe();
assertDone(src);
assertUnsub(src);
assertActive(meta);
meta.subscribe({
next(x) {
Expand Down
10 changes: 5 additions & 5 deletions packages/rstream/test/object.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as assert from "assert";
import { fromObject, stream, Subscription } from "../src";
import { assertDone } from "./utils";
import { assertUnsub } from "./utils";

type Foo = { a?: number; b: string };

Expand Down Expand Up @@ -36,8 +36,8 @@ describe("fromObject", () => {
a: [1, 2, undefined],
b: ["foo", "bar", "baz"],
});
assertDone(obj.streams.a);
assertDone(obj.streams.b);
assertUnsub(obj.streams.a);
assertUnsub(obj.streams.b);
});

it("subscriber", () => {
Expand All @@ -64,8 +64,8 @@ describe("fromObject", () => {
a: [1, undefined],
b: ["foo", "bar"],
});
assertDone(obj.streams.a);
assertDone(obj.streams.b);
assertUnsub(obj.streams.a);
assertUnsub(obj.streams.b);
});

it("defaults & dedupe", () => {
Expand Down
14 changes: 7 additions & 7 deletions packages/rstream/test/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { map, mapIndexed } from "@thi.ng/transducers";
import * as assert from "assert";
import { fromIterable, fromIterableSync, PubSub, pubsub } from "../src";
import { TIMEOUT } from "./config";
import { assertDone } from "./utils";
import { assertUnsub } from "./utils";

describe("PubSub", function () {
this.retries(3);
Expand All @@ -18,9 +18,9 @@ describe("PubSub", function () {
const b = pub.subscribeTopic("b", collect);
fromIterableSync("abcbd").subscribe(pub);
assert.deepStrictEqual(acc, { a: ["a"], b: ["b", "b"] });
assertDone(pub);
assertDone(a);
assertDone(b);
assertUnsub(pub);
assertUnsub(a);
assertUnsub(b);
});

it("complex keys", () => {
Expand Down Expand Up @@ -56,7 +56,7 @@ describe("PubSub", function () {
[["b", 2], [["b", 2]]],
]
);
assertDone(pub);
assertUnsub(pub);
});

it("transducer", () => {
Expand Down Expand Up @@ -85,7 +85,7 @@ describe("PubSub", function () {
c: [["c", 20]],
d: [["d", 44]],
});
assertDone(pub);
assertUnsub(pub);
});

it("unsubTopic", function (done) {
Expand All @@ -105,7 +105,7 @@ describe("PubSub", function () {
}, TIMEOUT * 2.5);
setTimeout(() => {
assert.deepStrictEqual(acc, { a: ["a"], b: ["b"] });
assertDone(pub);
assertUnsub(pub);
done();
}, TIMEOUT * 7.5);
});
Expand Down
5 changes: 3 additions & 2 deletions packages/rstream/test/stream-merge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
State,
StreamMerge,
} from "../src";
import { assertActive, assertUnsub } from "./utils";

describe("StreamMerge", () => {
let src: StreamMerge<number, number>;
Expand Down Expand Up @@ -62,9 +63,9 @@ describe("StreamMerge", () => {
let sub1 = src.subscribe({});
let sub2 = src.subscribe({});
sub1.unsubscribe();
assert(src.getState() === State.ACTIVE);
assertActive(src);
sub2.unsubscribe();
assert(src.getState() === State.DONE);
assertUnsub(src);
});

it("applies transducer", (done) => {
Expand Down
18 changes: 18 additions & 0 deletions packages/rstream/test/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
subscription,
} from "../src";
import { TIMEOUT } from "./config";
import { assertUnsub } from "./utils";

describe("Subscription", function () {
this.retries(3);
Expand Down Expand Up @@ -90,6 +91,23 @@ describe("Subscription", function () {
}, TIMEOUT * 4);
});

it("done state", (done) => {
this.timeout(TIMEOUT * 3);
let state = State.IDLE;
src = fromIterable([1]);
const sub = src.subscribe({
done() {
state = sub.getState();
},
});
setTimeout(() => {
assert.strictEqual(state, State.DONE);
assertUnsub(sub);
assertUnsub(src);
done();
}, TIMEOUT * 2);
});

it("completing transducer sends all values", (done) => {
let buf: any[] = [];
src = fromIterable([1, 2, 3], { delay: 10 });
Expand Down

0 comments on commit a8a8c44

Please sign in to comment.