Skip to content

Commit

Permalink
feat: Support overriding Duplex stream options as constructor options (
Browse files Browse the repository at this point in the history
  • Loading branch information
legobeat authored May 15, 2024
1 parent 6c9f640 commit e3e759b
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 21 deletions.
4 changes: 3 additions & 1 deletion src/BasePostMessageStream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Duplex } from 'readable-stream';
import type { DuplexOptions } from 'readable-stream';
import { StreamData } from './utils';

const noop = () => undefined;
Expand All @@ -24,9 +25,10 @@ export abstract class BasePostMessageStream extends Duplex {

private _log: Log;

constructor() {
constructor(streamOptions?: DuplexOptions) {
super({
objectMode: true,
...streamOptions,
});

// Initialization flags
Expand Down
7 changes: 4 additions & 3 deletions src/WebWorker/WebWorkerParentPostMessageStream.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import type { DuplexOptions } from 'readable-stream';
import {
BasePostMessageStream,
PostMessageEvent,
} from '../BasePostMessageStream';
import { DEDICATED_WORKER_NAME, isValidStreamMessage } from '../utils';

interface WorkerParentStreamArgs {
interface WorkerParentStreamArgs extends DuplexOptions {
worker: Worker;
}

Expand All @@ -24,8 +25,8 @@ export class WebWorkerParentPostMessageStream extends BasePostMessageStream {
* @param args.worker - The Web Worker to exchange messages with. The worker
* must instantiate a `WebWorkerPostMessageStream`.
*/
constructor({ worker }: WorkerParentStreamArgs) {
super();
constructor({ worker, ...streamOptions }: WorkerParentStreamArgs) {
super(streamOptions);

this._target = DEDICATED_WORKER_NAME;
this._worker = worker;
Expand Down
5 changes: 3 additions & 2 deletions src/WebWorker/WebWorkerPostMessageStream.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// We ignore coverage for the entire file due to limits on our instrumentation,
// but it is in fact covered by our tests.
import type { DuplexOptions } from 'readable-stream';
import {
BasePostMessageStream,
PostMessageEvent,
Expand All @@ -17,7 +18,7 @@ import {
export class WebWorkerPostMessageStream extends BasePostMessageStream {
private _name: string;

constructor() {
constructor(streamOptions: DuplexOptions = {}) {
// Kudos: https://stackoverflow.com/a/18002694
if (
typeof self === 'undefined' ||
Expand All @@ -29,7 +30,7 @@ export class WebWorkerPostMessageStream extends BasePostMessageStream {
);
}

super();
super(streamOptions);

this._name = DEDICATED_WORKER_NAME;
self.addEventListener('message', this._onMessage.bind(this) as any);
Expand Down
5 changes: 3 additions & 2 deletions src/node-process/ProcessMessageStream.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import type { DuplexOptions } from 'readable-stream';
import { BasePostMessageStream } from '../BasePostMessageStream';
import { isValidStreamMessage, StreamData } from '../utils';

/**
* Child process-side Node.js `child_process` stream.
*/
export class ProcessMessageStream extends BasePostMessageStream {
constructor() {
super();
constructor(streamOptions: DuplexOptions = {}) {
super(streamOptions);

if (typeof globalThis.process.send !== 'function') {
throw new Error(
Expand Down
7 changes: 4 additions & 3 deletions src/node-process/ProcessParentMessageStream.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import type { ChildProcess } from 'child_process';
import type { DuplexOptions } from 'readable-stream';
import { BasePostMessageStream } from '../BasePostMessageStream';
import { isValidStreamMessage, StreamData } from '../utils';

interface ProcessParentMessageStreamArgs {
interface ProcessParentMessageStreamArgs extends DuplexOptions {
process: ChildProcess;
}

Expand All @@ -18,8 +19,8 @@ export class ProcessParentMessageStream extends BasePostMessageStream {
* @param args - Options bag.
* @param args.process - The process to communicate with.
*/
constructor({ process }: ProcessParentMessageStreamArgs) {
super();
constructor({ process, ...streamOptions }: ProcessParentMessageStreamArgs) {
super(streamOptions);

this._process = process;
this._onMessage = this._onMessage.bind(this);
Expand Down
5 changes: 3 additions & 2 deletions src/node-thread/ThreadMessageStream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { parentPort } from 'worker_threads';
import type { DuplexOptions } from 'readable-stream';
import { BasePostMessageStream } from '../BasePostMessageStream';
import { isValidStreamMessage, StreamData } from '../utils';

Expand All @@ -8,8 +9,8 @@ import { isValidStreamMessage, StreamData } from '../utils';
export class ThreadMessageStream extends BasePostMessageStream {
#parentPort: Exclude<typeof parentPort, null>;

constructor() {
super();
constructor(streamOptions: DuplexOptions = {}) {
super(streamOptions);

if (!parentPort) {
throw new Error(
Expand Down
7 changes: 4 additions & 3 deletions src/node-thread/ThreadParentMessageStream.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { Worker } from 'worker_threads';
import type { DuplexOptions } from 'readable-stream';
import { BasePostMessageStream } from '../BasePostMessageStream';
import { isValidStreamMessage, StreamData } from '../utils';

interface ThreadParentMessageStreamArgs {
interface ThreadParentMessageStreamArgs extends DuplexOptions {
thread: Worker;
}

Expand All @@ -18,8 +19,8 @@ export class ThreadParentMessageStream extends BasePostMessageStream {
* @param args - Options bag.
* @param args.thread - The thread to communicate with.
*/
constructor({ thread }: ThreadParentMessageStreamArgs) {
super();
constructor({ thread, ...streamOptions }: ThreadParentMessageStreamArgs) {
super(streamOptions);

this._thread = thread;
this._onMessage = this._onMessage.bind(this);
Expand Down
11 changes: 8 additions & 3 deletions src/runtime/BrowserRuntimePostMessageStream.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import type { DuplexOptions } from 'readable-stream';
import {
BasePostMessageStream,
PostMessageEvent,
} from '../BasePostMessageStream';
import { isValidStreamMessage } from '../utils';

export interface BrowserRuntimePostMessageStreamArgs {
export interface BrowserRuntimePostMessageStreamArgs extends DuplexOptions {
name: string;
target: string;
}
Expand All @@ -26,8 +27,12 @@ export class BrowserRuntimePostMessageStream extends BasePostMessageStream {
* multiple streams sharing the same runtime.
* @param args.target - The name of the stream to exchange messages with.
*/
constructor({ name, target }: BrowserRuntimePostMessageStreamArgs) {
super();
constructor({
name,
target,
...streamOptions
}: BrowserRuntimePostMessageStreamArgs) {
super(streamOptions);

this.#name = name;
this.#target = target;
Expand Down
18 changes: 18 additions & 0 deletions src/window/WindowPostMessageStream.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,24 @@
import { WindowPostMessageStream } from './WindowPostMessageStream';

describe('WindowPostMessageStream', () => {
it('can override base stream options', () => {
const pms = new WindowPostMessageStream({
name: 'foo',
target: 'bar',
encoding: 'ucs2',
objectMode: false,
});
expect(pms._readableState.encoding).toBe('ucs2');
expect(pms._readableState.objectMode).toBe(false);
expect(pms._writableState.objectMode).toBe(false);
});

it('can be instantiated with default options', () => {
const pms = new WindowPostMessageStream({ name: 'foo', target: 'bar' });
expect(pms._readableState.objectMode).toBe(true);
expect(pms._writableState.objectMode).toBe(true);
});

it('throws if window.postMessage is not a function', () => {
const originalPostMessage = window.postMessage;
(window as any).postMessage = undefined;
Expand Down
6 changes: 4 additions & 2 deletions src/window/WindowPostMessageStream.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { assert } from '@metamask/utils';
import type { DuplexOptions } from 'readable-stream';
import {
BasePostMessageStream,
PostMessageEvent,
} from '../BasePostMessageStream';
import { isValidStreamMessage } from '../utils';

interface WindowPostMessageStreamArgs {
interface WindowPostMessageStreamArgs extends DuplexOptions {
name: string;
target: string;
targetOrigin?: string;
Expand Down Expand Up @@ -56,8 +57,9 @@ export class WindowPostMessageStream extends BasePostMessageStream {
target,
targetOrigin = location.origin,
targetWindow = window,
...streamOptions
}: WindowPostMessageStreamArgs) {
super();
super(streamOptions);

if (
typeof window === 'undefined' ||
Expand Down

0 comments on commit e3e759b

Please sign in to comment.