diff --git a/packages/rstream/src/stream-merge.ts b/packages/rstream/src/stream-merge.ts index c751808ca9..01097618e1 100644 --- a/packages/rstream/src/stream-merge.ts +++ b/packages/rstream/src/stream-merge.ts @@ -1,5 +1,6 @@ -import { CloseMode, ISubscribable, State, TransformableOpts } from "./api"; +import { ISubscribable, State, TransformableOpts } from "./api"; import { Subscription } from "./subscription"; +import { isFirstOrLastInput } from "./utils/checks"; import { optsWithID } from "./utils/idgen"; export interface StreamMergeOpts extends TransformableOpts { @@ -145,11 +146,6 @@ export class StreamMerge extends Subscription { protected markDone(src: ISubscribable) { this.remove(src); - if ( - this.closeIn === CloseMode.FIRST || - (this.closeIn === CloseMode.LAST && !this.sources.size) - ) { - this.done(); - } + isFirstOrLastInput(this.closeIn, this.sources.size) && this.done(); } } diff --git a/packages/rstream/src/stream-sync.ts b/packages/rstream/src/stream-sync.ts index d2fdfc17fb..d965abebd3 100644 --- a/packages/rstream/src/stream-sync.ts +++ b/packages/rstream/src/stream-sync.ts @@ -6,14 +6,9 @@ import { partitionSync, PartitionSync, } from "@thi.ng/transducers"; -import { - CloseMode, - ISubscribable, - LOGGER, - State, - TransformableOpts, -} from "./api"; +import { ISubscribable, LOGGER, State, TransformableOpts } from "./api"; import { Subscription } from "./subscription"; +import { isFirstOrLastInput } from "./utils/checks"; import { optsWithID } from "./utils/idgen"; export type SyncTuple>> = { @@ -283,11 +278,6 @@ export class StreamSync< protected markDone(src: ISubscribable) { this.remove(src); - if ( - this.closeIn === CloseMode.FIRST || - (this.closeIn === CloseMode.LAST && !this.sources.size) - ) { - this.done(); - } + isFirstOrLastInput(this.closeIn, this.sources.size) && this.done(); } } diff --git a/packages/rstream/src/utils/checks.ts b/packages/rstream/src/utils/checks.ts new file mode 100644 index 0000000000..77ba0b27ec --- /dev/null +++ b/packages/rstream/src/utils/checks.ts @@ -0,0 +1,9 @@ +import { CloseMode } from "../api"; + +/** + * Returns true if mode is FIRST, or if mode is LAST *and* `num = 0`. + * + * @internal + */ +export const isFirstOrLastInput = (mode: CloseMode, num: number) => + mode === CloseMode.FIRST || (mode === CloseMode.LAST && !num);