Skip to content
This repository has been archived by the owner on Nov 9, 2023. It is now read-only.

Commit

Permalink
Add duplex engine stream
Browse files Browse the repository at this point in the history
  • Loading branch information
rekmarks committed Jul 26, 2022
1 parent 8c64407 commit 10763e4
Show file tree
Hide file tree
Showing 9 changed files with 368 additions and 78 deletions.
8 changes: 4 additions & 4 deletions jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ module.exports = {
// An object that configures minimum threshold enforcement for coverage results
coverageThreshold: {
global: {
branches: 69.23,
functions: 88.88,
lines: 93.75,
statements: 93.75,
branches: 81.81,
functions: 89.47,
lines: 96.64,
statements: 96.64,
},
},

Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"test:watch": "jest --watch"
},
"dependencies": {
"@metamask/safe-event-emitter": "^2.0.0",
"@metamask/utils": "^2.1.0",
"readable-stream": "^2.3.3"
},
"devDependencies": {
Expand All @@ -48,7 +48,7 @@
"eslint-plugin-prettier": "^3.3.1",
"jest": "^27.5.1",
"jest-it-up": "^2.0.2",
"json-rpc-engine": "^6.1.0",
"json-rpc-engine": "./json-rpc-engine-6.1.2.tgz",
"prettier": "^2.2.1",
"prettier-plugin-packagejson": "^2.2.17",
"rimraf": "^3.0.2",
Expand Down
186 changes: 186 additions & 0 deletions src/createDuplexJsonRpcStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import {
hasProperty,
isJsonRpcRequest,
isObject,
JsonRpcId,
JsonRpcNotification,
JsonRpcRequest,
JsonRpcResponse,
RuntimeObject,
} from '@metamask/utils';
import { Duplex } from 'readable-stream';
import {
DuplexJsonRpcEngine,
JsonRpcMiddleware,
JsonRpcNotificationHandler,
} from 'json-rpc-engine';
import { ErrorMessages, IdMapValue } from './utils';

type StreamCallback = (error?: Error | null) => void;

interface DuplexJsonRpcStreamOptions {
receiverMiddleware?: JsonRpcMiddleware<unknown, unknown>[];
receiverNotificationHandler?: JsonRpcNotificationHandler<unknown>;
senderMiddleware?: JsonRpcMiddleware<unknown, unknown>[];
senderNotificationHandler?: JsonRpcNotificationHandler<unknown>;
}

/**
* Foobar, bar baz.
*
* @param options - Options bag.
* @returns The stream wrapping the duplex JSON-RPC engine.
*/
export default function createDuplexJsonRpcStream(
options: DuplexJsonRpcStreamOptions,
) {
const {
receiverMiddleware = [],
receiverNotificationHandler = () => undefined,
senderMiddleware = [],
senderNotificationHandler,
} = options;

const outgoingIdMap: Map<JsonRpcId, IdMapValue> = new Map();
const stream = new Duplex({
objectMode: true,
read: () => undefined,
write: processMessage,
});

const sendNotification = (notification: JsonRpcNotification<unknown>) => {
stream.push(notification);
return undefined;
};

const _senderNotificationHandler = senderNotificationHandler
? async (notification: JsonRpcNotification<unknown>) => {
await senderNotificationHandler(notification);
return sendNotification(notification);
}
: sendNotification;

const engine = new DuplexJsonRpcEngine({
receiverNotificationHandler,
senderNotificationHandler: _senderNotificationHandler,
});

receiverMiddleware.forEach((middleware) =>
engine.addReceiverMiddleware(middleware),
);

senderMiddleware.forEach((middleware) =>
engine.addSenderMiddleware(middleware),
);

engine.addSenderMiddleware((req, res, _next, end) => {
// write req to stream
stream.push(req);
// register request on id map if
if (isJsonRpcRequest(req)) {
outgoingIdMap.set(req.id, { res, end });
}
});

return { duplexEngine: engine, duplexEngineStream: stream };

/**
* Writes a JSON-RPC object to the stream.
*
* @param message - The message to write to the stream.
* @param _encoding - The stream encoding, not used.
* @param cb - The stream write callback.
* @returns Nothing.
*/
function processMessage(
message: unknown,
_encoding: unknown,
cb: StreamCallback,
): void {
let err: Error | null = null;
try {
if (!isObject(message)) {
throw new Error('not an object');
} else if (isResponse(message)) {
receiveResponse(message);
} else if (isRequest(message)) {
return receiveRequest(message, cb);
} else {
throw new Error('neither a response nor request');
}
} catch (_err) {
err = _err as Error;
}

// continue processing stream
return cb(err);
}

/**
* Forwards a JSON-RPC request or notification to the receiving pipeline.
* Pushes any response from the pipeline to the stream.
*
* @param req - The request or notification to receive.
* @param cb - The stream write callback.
*/
function receiveRequest(
req: JsonRpcRequest<unknown> | JsonRpcNotification<unknown>,
cb: StreamCallback,
) {
// TypeScript defaults to the notification overload and we don't get a
// response unless we cast.
engine
.receive(req as JsonRpcRequest<unknown>)
.then((response) => {
if (response) {
stream.push(response);
}
cb();
})
.catch((error) => cb(error));
}

/**
* Receives a response to a request sent via the sending pipeline.
*
* @param res - The response to receive.
*/
function receiveResponse(res: JsonRpcResponse<unknown>) {
const context = outgoingIdMap.get(res.id);
if (!context) {
throw new Error(ErrorMessages.unknownResponse(res.id));
}

// Copy response received from the stream unto original response object,
// which will be returned by the engine on this side.
Object.assign(context.res, res);

outgoingIdMap.delete(res.id);
// Prevent internal stream handler from catching errors from this callback.
setTimeout(context.end);
}
}

/**
* A type guard for {@link JsonRpcResponse}.
*
* @param message - The object to type check.
* @returns The type check result.
*/
function isResponse(
message: RuntimeObject,
): message is JsonRpcResponse<unknown> {
return hasProperty(message, 'result') || hasProperty(message, 'error');
}

/**
* A type guard for {@link JsonRpcRequest} or {@link JsonRpcNotification}.
*
* @param message - The object to type check.
* @returns The type check result.
*/
function isRequest(
message: RuntimeObject,
): message is JsonRpcRequest<unknown> | JsonRpcNotification<unknown> {
return hasProperty(message, 'method');
}
13 changes: 2 additions & 11 deletions src/createEngineStream.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { JsonRpcRequest } from '@metamask/utils';
import { Duplex } from 'readable-stream';
import { JsonRpcEngine, JsonRpcRequest } from 'json-rpc-engine';
import { JsonRpcEngine } from 'json-rpc-engine';

interface EngineStreamOptions {
engine: JsonRpcEngine;
Expand All @@ -13,18 +14,8 @@ interface EngineStreamOptions {
* @returns The stream wrapping the engine.
*/
export default function createEngineStream(opts: EngineStreamOptions): Duplex {
if (!opts || !opts.engine) {
throw new Error('Missing engine parameter!');
}

const { engine } = opts;
const stream = new Duplex({ objectMode: true, read: () => undefined, write });
// forward notifications
if (engine.on) {
engine.on('notification', (message) => {
stream.push(message);
});
}
return stream;

/**
Expand Down
69 changes: 20 additions & 49 deletions src/createStreamMiddleware.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,7 @@
import SafeEventEmitter from '@metamask/safe-event-emitter';
import { Duplex } from 'readable-stream';
import {
JsonRpcEngineNextCallback,
JsonRpcEngineEndCallback,
JsonRpcNotification,
JsonRpcMiddleware,
JsonRpcRequest,
PendingJsonRpcResponse,
} from 'json-rpc-engine';

interface IdMapValue {
req: JsonRpcRequest<unknown>;
res: PendingJsonRpcResponse<unknown>;
next: JsonRpcEngineNextCallback;
end: JsonRpcEngineEndCallback;
}

interface IdMap {
[requestId: string]: IdMapValue;
}
import { JsonRpcMiddleware } from 'json-rpc-engine';
import { isJsonRpcRequest, JsonRpcId, JsonRpcResponse } from '@metamask/utils';
import { ErrorMessages, IdMapValue } from './utils';

/**
* Creates a JsonRpcEngine middleware with an associated Duplex stream and
Expand All @@ -29,28 +12,28 @@ interface IdMap {
* @returns The event emitter, middleware, and stream.
*/
export default function createStreamMiddleware() {
const idMap: IdMap = {};
const idMap: Map<JsonRpcId, IdMapValue> = new Map();
const stream = new Duplex({
objectMode: true,
read: () => undefined,
write: processMessage,
});

const events = new SafeEventEmitter();

const middleware: JsonRpcMiddleware<unknown, unknown> = (
req,
res,
next,
_next,
end,
) => {
// write req to stream
stream.push(req);
// register request on id map
idMap[req.id as unknown as string] = { req, res, next, end };
if (isJsonRpcRequest(req)) {
idMap.set(req.id, { res, end });
}
};

return { events, middleware, stream };
return { middleware, stream };

/**
* Writes a JSON-RPC object to the stream.
Expand All @@ -60,21 +43,17 @@ export default function createStreamMiddleware() {
* @param cb - The stream write callback.
*/
function processMessage(
res: PendingJsonRpcResponse<unknown>,
res: JsonRpcResponse<unknown>,
_encoding: unknown,
cb: (error?: Error | null) => void,
) {
let err: Error | null = null;
try {
const isNotification = !res.id;
if (isNotification) {
processNotification(res as unknown as JsonRpcNotification<unknown>);
} else {
processResponse(res);
}
processResponse(res);
} catch (_err) {
err = _err as Error;
}

// continue processing stream
cb(err);
}
Expand All @@ -84,26 +63,18 @@ export default function createStreamMiddleware() {
*
* @param res - The response to process.
*/
function processResponse(res: PendingJsonRpcResponse<unknown>) {
const context = idMap[res.id as unknown as string];
function processResponse(res: JsonRpcResponse<unknown>) {
const context = idMap.get(res.id);
if (!context) {
throw new Error(`StreamMiddleware - Unknown response id "${res.id}"`);
throw new Error(ErrorMessages.unknownResponse(res.id));
}

delete idMap[res.id as unknown as string];
// copy whole res onto original res
// Copy response received from the stream unto original response object,
// which will be returned by the engine on this side.
Object.assign(context.res, res);
// run callback on empty stack,
// prevent internal stream-handler from catching errors
setTimeout(context.end);
}

/**
* Processes a JSON-RPC notification.
*
* @param notif - The notification to process.
*/
function processNotification(notif: JsonRpcNotification<unknown>) {
events.emit('notification', notif);
idMap.delete(res.id);
// Prevent internal stream handler from catching errors from this callback.
setTimeout(context.end);
}
}
Loading

0 comments on commit 10763e4

Please sign in to comment.