Skip to content
This repository has been archived by the owner on Nov 9, 2023. It is now read-only.

Add DuplexJsonRpcEngine stream #24

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ module.exports = {
// An object that configures minimum threshold enforcement for coverage results
coverageThreshold: {
global: {
branches: 69.23,
functions: 88.88,
lines: 93.75,
statements: 93.75,
branches: 81.81,
functions: 89.47,
lines: 96.64,
statements: 96.64,
},
},

Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"test:watch": "jest --watch"
},
"dependencies": {
"@metamask/safe-event-emitter": "^2.0.0",
"@metamask/utils": "^2.1.0",
"readable-stream": "^2.3.3"
},
"devDependencies": {
Expand All @@ -48,7 +48,7 @@
"eslint-plugin-prettier": "^3.3.1",
"jest": "^27.5.1",
"jest-it-up": "^2.0.2",
"json-rpc-engine": "^6.1.0",
"json-rpc-engine": "./json-rpc-engine-6.1.2.tgz",
"prettier": "^2.2.1",
"prettier-plugin-packagejson": "^2.2.17",
"rimraf": "^3.0.2",
Expand Down
186 changes: 186 additions & 0 deletions src/createDuplexJsonRpcStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import {
hasProperty,
isJsonRpcRequest,
isObject,
JsonRpcId,
JsonRpcNotification,
JsonRpcRequest,
JsonRpcResponse,
RuntimeObject,
} from '@metamask/utils';
import { Duplex } from 'readable-stream';
import {
DuplexJsonRpcEngine,
JsonRpcMiddleware,
JsonRpcNotificationHandler,
} from 'json-rpc-engine';
import { ErrorMessages, IdMapValue } from './utils';

type StreamCallback = (error?: Error | null) => void;

interface DuplexJsonRpcStreamOptions {
receiverMiddleware?: JsonRpcMiddleware<unknown, unknown>[];
receiverNotificationHandler?: JsonRpcNotificationHandler<unknown>;
senderMiddleware?: JsonRpcMiddleware<unknown, unknown>[];
senderNotificationHandler?: JsonRpcNotificationHandler<unknown>;
}

/**
* Foobar, bar baz.
*
* @param options - Options bag.
* @returns The stream wrapping the duplex JSON-RPC engine.
*/
export default function createDuplexJsonRpcStream(
options: DuplexJsonRpcStreamOptions = {},
) {
const {
receiverMiddleware = [],
receiverNotificationHandler = () => undefined,
senderMiddleware = [],
senderNotificationHandler,
} = options;

const outgoingIdMap: Map<JsonRpcId, IdMapValue> = new Map();
const stream = new Duplex({
objectMode: true,
read: () => undefined,
write: processMessage,
});

const sendNotification = (notification: JsonRpcNotification<unknown>) => {
stream.push(notification);
return undefined;
};

const _senderNotificationHandler = senderNotificationHandler
? async (notification: JsonRpcNotification<unknown>) => {
await senderNotificationHandler(notification);
return sendNotification(notification);
}
: sendNotification;

const engine = new DuplexJsonRpcEngine({
receiverNotificationHandler,
senderNotificationHandler: _senderNotificationHandler,
});

receiverMiddleware.forEach((middleware) =>
engine.addReceiverMiddleware(middleware),
);

senderMiddleware.forEach((middleware) =>
engine.addSenderMiddleware(middleware),
);

engine.addSenderMiddleware((req, res, _next, end) => {
// write req to stream
stream.push(req);
// register request on id map if
if (isJsonRpcRequest(req)) {
outgoingIdMap.set(req.id, { res, end });
}
});

return { duplexEngine: engine, duplexEngineStream: stream };

/**
* Writes a JSON-RPC object to the stream.
*
* @param message - The message to write to the stream.
* @param _encoding - The stream encoding, not used.
* @param cb - The stream write callback.
* @returns Nothing.
*/
function processMessage(
message: unknown,
_encoding: unknown,
cb: StreamCallback,
): void {
let err: Error | null = null;
try {
if (!isObject(message)) {
throw new Error('not an object');
} else if (isResponse(message)) {
receiveResponse(message);
} else if (isRequest(message)) {
return receiveRequest(message, cb);
} else {
throw new Error('neither a response nor request');
}
} catch (_err) {
err = _err as Error;
}

// continue processing stream
return cb(err);
}

/**
* Forwards a JSON-RPC request or notification to the receiving pipeline.
* Pushes any response from the pipeline to the stream.
*
* @param req - The request or notification to receive.
* @param cb - The stream write callback.
*/
function receiveRequest(
req: JsonRpcRequest<unknown> | JsonRpcNotification<unknown>,
cb: StreamCallback,
) {
// TypeScript defaults to the notification overload and we don't get a
// response unless we cast.
engine
.receive(req as JsonRpcRequest<unknown>)
.then((response) => {
if (response) {
stream.push(response);
}
cb();
})
.catch((error) => cb(error));
}

/**
* Receives a response to a request sent via the sending pipeline.
*
* @param res - The response to receive.
*/
function receiveResponse(res: JsonRpcResponse<unknown>) {
const context = outgoingIdMap.get(res.id);
if (!context) {
throw new Error(ErrorMessages.unknownResponse(res.id));
}

// Copy response received from the stream unto original response object,
// which will be returned by the engine on this side.
Object.assign(context.res, res);

outgoingIdMap.delete(res.id);
// Prevent internal stream handler from catching errors from this callback.
setTimeout(context.end);
}
}

/**
* A type guard for {@link JsonRpcResponse}.
*
* @param message - The object to type check.
* @returns The type check result.
*/
function isResponse(
message: RuntimeObject,
): message is JsonRpcResponse<unknown> {
return hasProperty(message, 'result') || hasProperty(message, 'error');
}

/**
* A type guard for {@link JsonRpcRequest} or {@link JsonRpcNotification}.
*
* @param message - The object to type check.
* @returns The type check result.
*/
function isRequest(
message: RuntimeObject,
): message is JsonRpcRequest<unknown> | JsonRpcNotification<unknown> {
return hasProperty(message, 'method');
}
13 changes: 2 additions & 11 deletions src/createEngineStream.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { JsonRpcRequest } from '@metamask/utils';
import { Duplex } from 'readable-stream';
import { JsonRpcEngine, JsonRpcRequest } from 'json-rpc-engine';
import { JsonRpcEngine } from 'json-rpc-engine';

interface EngineStreamOptions {
engine: JsonRpcEngine;
Expand All @@ -13,18 +14,8 @@ interface EngineStreamOptions {
* @returns The stream wrapping the engine.
*/
export default function createEngineStream(opts: EngineStreamOptions): Duplex {
if (!opts || !opts.engine) {
throw new Error('Missing engine parameter!');
}

const { engine } = opts;
const stream = new Duplex({ objectMode: true, read: () => undefined, write });
// forward notifications
if (engine.on) {
engine.on('notification', (message) => {
stream.push(message);
});
}
return stream;

/**
Expand Down
69 changes: 20 additions & 49 deletions src/createStreamMiddleware.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,7 @@
import SafeEventEmitter from '@metamask/safe-event-emitter';
import { Duplex } from 'readable-stream';
import {
JsonRpcEngineNextCallback,
JsonRpcEngineEndCallback,
JsonRpcNotification,
JsonRpcMiddleware,
JsonRpcRequest,
PendingJsonRpcResponse,
} from 'json-rpc-engine';

interface IdMapValue {
req: JsonRpcRequest<unknown>;
res: PendingJsonRpcResponse<unknown>;
next: JsonRpcEngineNextCallback;
end: JsonRpcEngineEndCallback;
}

interface IdMap {
[requestId: string]: IdMapValue;
}
import { JsonRpcMiddleware } from 'json-rpc-engine';
import { isJsonRpcRequest, JsonRpcId, JsonRpcResponse } from '@metamask/utils';
import { ErrorMessages, IdMapValue } from './utils';

/**
* Creates a JsonRpcEngine middleware with an associated Duplex stream and
Expand All @@ -29,28 +12,28 @@ interface IdMap {
* @returns The event emitter, middleware, and stream.
*/
export default function createStreamMiddleware() {
const idMap: IdMap = {};
const idMap: Map<JsonRpcId, IdMapValue> = new Map();
const stream = new Duplex({
objectMode: true,
read: () => undefined,
write: processMessage,
});

const events = new SafeEventEmitter();

const middleware: JsonRpcMiddleware<unknown, unknown> = (
req,
res,
next,
_next,
end,
) => {
// write req to stream
stream.push(req);
// register request on id map
idMap[req.id as unknown as string] = { req, res, next, end };
if (isJsonRpcRequest(req)) {
idMap.set(req.id, { res, end });
}
};

return { events, middleware, stream };
return { middleware, stream };

/**
* Writes a JSON-RPC object to the stream.
Expand All @@ -60,21 +43,17 @@ export default function createStreamMiddleware() {
* @param cb - The stream write callback.
*/
function processMessage(
res: PendingJsonRpcResponse<unknown>,
res: JsonRpcResponse<unknown>,
_encoding: unknown,
cb: (error?: Error | null) => void,
) {
let err: Error | null = null;
try {
const isNotification = !res.id;
if (isNotification) {
processNotification(res as unknown as JsonRpcNotification<unknown>);
} else {
processResponse(res);
}
processResponse(res);
} catch (_err) {
err = _err as Error;
}

// continue processing stream
cb(err);
}
Expand All @@ -84,26 +63,18 @@ export default function createStreamMiddleware() {
*
* @param res - The response to process.
*/
function processResponse(res: PendingJsonRpcResponse<unknown>) {
const context = idMap[res.id as unknown as string];
function processResponse(res: JsonRpcResponse<unknown>) {
const context = idMap.get(res.id);
if (!context) {
throw new Error(`StreamMiddleware - Unknown response id "${res.id}"`);
throw new Error(ErrorMessages.unknownResponse(res.id));
}

delete idMap[res.id as unknown as string];
// copy whole res onto original res
// Copy response received from the stream unto original response object,
// which will be returned by the engine on this side.
Object.assign(context.res, res);
// run callback on empty stack,
// prevent internal stream-handler from catching errors
setTimeout(context.end);
}

/**
* Processes a JSON-RPC notification.
*
* @param notif - The notification to process.
*/
function processNotification(notif: JsonRpcNotification<unknown>) {
events.emit('notification', notif);
idMap.delete(res.id);
// Prevent internal stream handler from catching errors from this callback.
setTimeout(context.end);
}
}
Loading