Skip to content

Commit

Permalink
refactor(rstream): add isFirstOrLastInput(), update StreamMerge/Sync
Browse files Browse the repository at this point in the history
  • Loading branch information
postspectacular committed Sep 18, 2020
1 parent b777cf1 commit ebab5a0
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 20 deletions.
10 changes: 3 additions & 7 deletions packages/rstream/src/stream-merge.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { CloseMode, ISubscribable, State, TransformableOpts } from "./api";
import { ISubscribable, State, TransformableOpts } from "./api";
import { Subscription } from "./subscription";
import { isFirstOrLastInput } from "./utils/checks";
import { optsWithID } from "./utils/idgen";

export interface StreamMergeOpts<A, B> extends TransformableOpts<A, B> {
Expand Down Expand Up @@ -145,11 +146,6 @@ export class StreamMerge<A, B> extends Subscription<A, B> {

protected markDone(src: ISubscribable<A>) {
this.remove(src);
if (
this.closeIn === CloseMode.FIRST ||
(this.closeIn === CloseMode.LAST && !this.sources.size)
) {
this.done();
}
isFirstOrLastInput(this.closeIn, this.sources.size) && this.done();
}
}
16 changes: 3 additions & 13 deletions packages/rstream/src/stream-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,9 @@ import {
partitionSync,
PartitionSync,
} from "@thi.ng/transducers";
import {
CloseMode,
ISubscribable,
LOGGER,
State,
TransformableOpts,
} from "./api";
import { ISubscribable, LOGGER, State, TransformableOpts } from "./api";
import { Subscription } from "./subscription";
import { isFirstOrLastInput } from "./utils/checks";
import { optsWithID } from "./utils/idgen";

export type SyncTuple<T extends IObjectOf<ISubscribable<any>>> = {
Expand Down Expand Up @@ -283,11 +278,6 @@ export class StreamSync<

protected markDone(src: ISubscribable<any>) {
this.remove(src);
if (
this.closeIn === CloseMode.FIRST ||
(this.closeIn === CloseMode.LAST && !this.sources.size)
) {
this.done();
}
isFirstOrLastInput(this.closeIn, this.sources.size) && this.done();
}
}
9 changes: 9 additions & 0 deletions packages/rstream/src/utils/checks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { CloseMode } from "../api";

/**
* Returns true if mode is FIRST, or if mode is LAST *and* `num = 0`.
*
* @internal
*/
export const isFirstOrLastInput = (mode: CloseMode, num: number) =>
mode === CloseMode.FIRST || (mode === CloseMode.LAST && !num);

0 comments on commit ebab5a0

Please sign in to comment.