Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support overriding Duplex stream options as constructor options #131

Merged
merged 6 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/BasePostMessageStream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Duplex } from 'readable-stream';
import type { DuplexOptions } from 'readable-stream';
import { StreamData } from './utils';

const noop = () => undefined;
Expand All @@ -24,9 +25,10 @@ export abstract class BasePostMessageStream extends Duplex {

private _log: Log;

constructor() {
constructor(streamOptions?: DuplexOptions) {
super({
objectMode: true,
...streamOptions,
});

// Initialization flags
Expand Down
7 changes: 4 additions & 3 deletions src/WebWorker/WebWorkerParentPostMessageStream.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import type { DuplexOptions } from 'readable-stream';
import {
BasePostMessageStream,
PostMessageEvent,
} from '../BasePostMessageStream';
import { DEDICATED_WORKER_NAME, isValidStreamMessage } from '../utils';

interface WorkerParentStreamArgs {
interface WorkerParentStreamArgs extends DuplexOptions {
worker: Worker;
}

Expand All @@ -24,8 +25,8 @@ export class WebWorkerParentPostMessageStream extends BasePostMessageStream {
* @param args.worker - The Web Worker to exchange messages with. The worker
* must instantiate a `WebWorkerPostMessageStream`.
*/
constructor({ worker }: WorkerParentStreamArgs) {
super();
constructor({ worker, ...streamOptions }: WorkerParentStreamArgs) {
super(streamOptions);

this._target = DEDICATED_WORKER_NAME;
this._worker = worker;
Expand Down
5 changes: 3 additions & 2 deletions src/WebWorker/WebWorkerPostMessageStream.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// We ignore coverage for the entire file due to limits on our instrumentation,
// but it is in fact covered by our tests.
import type { DuplexOptions } from 'readable-stream';
import {
BasePostMessageStream,
PostMessageEvent,
Expand All @@ -17,7 +18,7 @@ import {
export class WebWorkerPostMessageStream extends BasePostMessageStream {
private _name: string;

constructor() {
constructor(streamOptions: DuplexOptions = {}) {
// Kudos: https://stackoverflow.com/a/18002694
if (
typeof self === 'undefined' ||
Expand All @@ -29,7 +30,7 @@ export class WebWorkerPostMessageStream extends BasePostMessageStream {
);
}

super();
super(streamOptions);

this._name = DEDICATED_WORKER_NAME;
self.addEventListener('message', this._onMessage.bind(this) as any);
Expand Down
5 changes: 3 additions & 2 deletions src/node-process/ProcessMessageStream.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import type { DuplexOptions } from 'readable-stream';
import { BasePostMessageStream } from '../BasePostMessageStream';
import { isValidStreamMessage, StreamData } from '../utils';

/**
* Child process-side Node.js `child_process` stream.
*/
export class ProcessMessageStream extends BasePostMessageStream {
constructor() {
super();
constructor(streamOptions: DuplexOptions = {}) {
super(streamOptions);

if (typeof globalThis.process.send !== 'function') {
throw new Error(
Expand Down
7 changes: 4 additions & 3 deletions src/node-process/ProcessParentMessageStream.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import type { ChildProcess } from 'child_process';
import type { DuplexOptions } from 'readable-stream';
import { BasePostMessageStream } from '../BasePostMessageStream';
import { isValidStreamMessage, StreamData } from '../utils';

interface ProcessParentMessageStreamArgs {
interface ProcessParentMessageStreamArgs extends DuplexOptions {
process: ChildProcess;
}

Expand All @@ -18,8 +19,8 @@ export class ProcessParentMessageStream extends BasePostMessageStream {
* @param args - Options bag.
* @param args.process - The process to communicate with.
*/
constructor({ process }: ProcessParentMessageStreamArgs) {
super();
constructor({ process, ...streamOptions }: ProcessParentMessageStreamArgs) {
super(streamOptions);

this._process = process;
this._onMessage = this._onMessage.bind(this);
Expand Down
5 changes: 3 additions & 2 deletions src/node-thread/ThreadMessageStream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { parentPort } from 'worker_threads';
import type { DuplexOptions } from 'readable-stream';
import { BasePostMessageStream } from '../BasePostMessageStream';
import { isValidStreamMessage, StreamData } from '../utils';

Expand All @@ -8,8 +9,8 @@ import { isValidStreamMessage, StreamData } from '../utils';
export class ThreadMessageStream extends BasePostMessageStream {
#parentPort: Exclude<typeof parentPort, null>;

constructor() {
super();
constructor(streamOptions: DuplexOptions = {}) {
super(streamOptions);

if (!parentPort) {
throw new Error(
Expand Down
7 changes: 4 additions & 3 deletions src/node-thread/ThreadParentMessageStream.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { Worker } from 'worker_threads';
import type { DuplexOptions } from 'readable-stream';
import { BasePostMessageStream } from '../BasePostMessageStream';
import { isValidStreamMessage, StreamData } from '../utils';

interface ThreadParentMessageStreamArgs {
interface ThreadParentMessageStreamArgs extends DuplexOptions {
thread: Worker;
}

Expand All @@ -18,8 +19,8 @@ export class ThreadParentMessageStream extends BasePostMessageStream {
* @param args - Options bag.
* @param args.thread - The thread to communicate with.
*/
constructor({ thread }: ThreadParentMessageStreamArgs) {
super();
constructor({ thread, ...streamOptions }: ThreadParentMessageStreamArgs) {
super(streamOptions);

this._thread = thread;
this._onMessage = this._onMessage.bind(this);
Expand Down
11 changes: 8 additions & 3 deletions src/runtime/BrowserRuntimePostMessageStream.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import type { DuplexOptions } from 'readable-stream';
import {
BasePostMessageStream,
PostMessageEvent,
} from '../BasePostMessageStream';
import { isValidStreamMessage } from '../utils';

export interface BrowserRuntimePostMessageStreamArgs {
export interface BrowserRuntimePostMessageStreamArgs extends DuplexOptions {
name: string;
target: string;
}
Expand All @@ -26,8 +27,12 @@ export class BrowserRuntimePostMessageStream extends BasePostMessageStream {
* multiple streams sharing the same runtime.
* @param args.target - The name of the stream to exchange messages with.
*/
constructor({ name, target }: BrowserRuntimePostMessageStreamArgs) {
super();
constructor({
name,
target,
...streamOptions
}: BrowserRuntimePostMessageStreamArgs) {
super(streamOptions);

this.#name = name;
this.#target = target;
Expand Down
18 changes: 18 additions & 0 deletions src/window/WindowPostMessageStream.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,24 @@
import { WindowPostMessageStream } from './WindowPostMessageStream';

describe('WindowPostMessageStream', () => {
it('can override base stream options', () => {
const pms = new WindowPostMessageStream({
name: 'foo',
target: 'bar',
encoding: 'ucs2',
objectMode: false,
});
expect(pms._readableState.encoding).toBe('ucs2');
expect(pms._readableState.objectMode).toBe(false);
expect(pms._writableState.objectMode).toBe(false);
});

it('can be instantiated with default options', () => {
const pms = new WindowPostMessageStream({ name: 'foo', target: 'bar' });
expect(pms._readableState.objectMode).toBe(true);
expect(pms._writableState.objectMode).toBe(true);
});

it('throws if window.postMessage is not a function', () => {
const originalPostMessage = window.postMessage;
(window as any).postMessage = undefined;
Expand Down
6 changes: 4 additions & 2 deletions src/window/WindowPostMessageStream.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { assert } from '@metamask/utils';
import type { DuplexOptions } from 'readable-stream';
import {
BasePostMessageStream,
PostMessageEvent,
} from '../BasePostMessageStream';
import { isValidStreamMessage } from '../utils';

interface WindowPostMessageStreamArgs {
interface WindowPostMessageStreamArgs extends DuplexOptions {
name: string;
target: string;
targetOrigin?: string;
Expand Down Expand Up @@ -56,8 +57,9 @@ export class WindowPostMessageStream extends BasePostMessageStream {
target,
targetOrigin = location.origin,
targetWindow = window,
...streamOptions
}: WindowPostMessageStreamArgs) {
super();
super(streamOptions);

if (
typeof window === 'undefined' ||
Expand Down
Loading