From 10763e4a692e6c4bef874ecabfa96b4fed3c029a Mon Sep 17 00:00:00 2001 From: Erik Marks Date: Sat, 16 Jul 2022 10:43:25 -0700 Subject: [PATCH 1/2] Add duplex engine stream --- jest.config.js | 8 +- package.json | 4 +- src/createDuplexJsonRpcStream.ts | 186 +++++++++++++++++++++++++++++++ src/createEngineStream.ts | 13 +-- src/createStreamMiddleware.ts | 69 ++++-------- src/index.test.ts | 129 +++++++++++++++++++-- src/index.ts | 7 +- src/utils.ts | 15 +++ yarn.lock | 15 ++- 9 files changed, 368 insertions(+), 78 deletions(-) create mode 100644 src/createDuplexJsonRpcStream.ts create mode 100644 src/utils.ts diff --git a/jest.config.js b/jest.config.js index eb46773..e1086d0 100644 --- a/jest.config.js +++ b/jest.config.js @@ -21,10 +21,10 @@ module.exports = { // An object that configures minimum threshold enforcement for coverage results coverageThreshold: { global: { - branches: 69.23, - functions: 88.88, - lines: 93.75, - statements: 93.75, + branches: 81.81, + functions: 89.47, + lines: 96.64, + statements: 96.64, }, }, diff --git a/package.json b/package.json index ceb6d15..ca7572b 100644 --- a/package.json +++ b/package.json @@ -24,7 +24,7 @@ "test:watch": "jest --watch" }, "dependencies": { - "@metamask/safe-event-emitter": "^2.0.0", + "@metamask/utils": "^2.1.0", "readable-stream": "^2.3.3" }, "devDependencies": { @@ -48,7 +48,7 @@ "eslint-plugin-prettier": "^3.3.1", "jest": "^27.5.1", "jest-it-up": "^2.0.2", - "json-rpc-engine": "^6.1.0", + "json-rpc-engine": "./json-rpc-engine-6.1.2.tgz", "prettier": "^2.2.1", "prettier-plugin-packagejson": "^2.2.17", "rimraf": "^3.0.2", diff --git a/src/createDuplexJsonRpcStream.ts b/src/createDuplexJsonRpcStream.ts new file mode 100644 index 0000000..35a6ba3 --- /dev/null +++ b/src/createDuplexJsonRpcStream.ts @@ -0,0 +1,186 @@ +import { + hasProperty, + isJsonRpcRequest, + isObject, + JsonRpcId, + JsonRpcNotification, + JsonRpcRequest, + JsonRpcResponse, + RuntimeObject, +} from '@metamask/utils'; +import { Duplex } from 'readable-stream'; +import { + DuplexJsonRpcEngine, + JsonRpcMiddleware, + JsonRpcNotificationHandler, +} from 'json-rpc-engine'; +import { ErrorMessages, IdMapValue } from './utils'; + +type StreamCallback = (error?: Error | null) => void; + +interface DuplexJsonRpcStreamOptions { + receiverMiddleware?: JsonRpcMiddleware[]; + receiverNotificationHandler?: JsonRpcNotificationHandler; + senderMiddleware?: JsonRpcMiddleware[]; + senderNotificationHandler?: JsonRpcNotificationHandler; +} + +/** + * Foobar, bar baz. + * + * @param options - Options bag. + * @returns The stream wrapping the duplex JSON-RPC engine. + */ +export default function createDuplexJsonRpcStream( + options: DuplexJsonRpcStreamOptions, +) { + const { + receiverMiddleware = [], + receiverNotificationHandler = () => undefined, + senderMiddleware = [], + senderNotificationHandler, + } = options; + + const outgoingIdMap: Map = new Map(); + const stream = new Duplex({ + objectMode: true, + read: () => undefined, + write: processMessage, + }); + + const sendNotification = (notification: JsonRpcNotification) => { + stream.push(notification); + return undefined; + }; + + const _senderNotificationHandler = senderNotificationHandler + ? async (notification: JsonRpcNotification) => { + await senderNotificationHandler(notification); + return sendNotification(notification); + } + : sendNotification; + + const engine = new DuplexJsonRpcEngine({ + receiverNotificationHandler, + senderNotificationHandler: _senderNotificationHandler, + }); + + receiverMiddleware.forEach((middleware) => + engine.addReceiverMiddleware(middleware), + ); + + senderMiddleware.forEach((middleware) => + engine.addSenderMiddleware(middleware), + ); + + engine.addSenderMiddleware((req, res, _next, end) => { + // write req to stream + stream.push(req); + // register request on id map if + if (isJsonRpcRequest(req)) { + outgoingIdMap.set(req.id, { res, end }); + } + }); + + return { duplexEngine: engine, duplexEngineStream: stream }; + + /** + * Writes a JSON-RPC object to the stream. + * + * @param message - The message to write to the stream. + * @param _encoding - The stream encoding, not used. + * @param cb - The stream write callback. + * @returns Nothing. + */ + function processMessage( + message: unknown, + _encoding: unknown, + cb: StreamCallback, + ): void { + let err: Error | null = null; + try { + if (!isObject(message)) { + throw new Error('not an object'); + } else if (isResponse(message)) { + receiveResponse(message); + } else if (isRequest(message)) { + return receiveRequest(message, cb); + } else { + throw new Error('neither a response nor request'); + } + } catch (_err) { + err = _err as Error; + } + + // continue processing stream + return cb(err); + } + + /** + * Forwards a JSON-RPC request or notification to the receiving pipeline. + * Pushes any response from the pipeline to the stream. + * + * @param req - The request or notification to receive. + * @param cb - The stream write callback. + */ + function receiveRequest( + req: JsonRpcRequest | JsonRpcNotification, + cb: StreamCallback, + ) { + // TypeScript defaults to the notification overload and we don't get a + // response unless we cast. + engine + .receive(req as JsonRpcRequest) + .then((response) => { + if (response) { + stream.push(response); + } + cb(); + }) + .catch((error) => cb(error)); + } + + /** + * Receives a response to a request sent via the sending pipeline. + * + * @param res - The response to receive. + */ + function receiveResponse(res: JsonRpcResponse) { + const context = outgoingIdMap.get(res.id); + if (!context) { + throw new Error(ErrorMessages.unknownResponse(res.id)); + } + + // Copy response received from the stream unto original response object, + // which will be returned by the engine on this side. + Object.assign(context.res, res); + + outgoingIdMap.delete(res.id); + // Prevent internal stream handler from catching errors from this callback. + setTimeout(context.end); + } +} + +/** + * A type guard for {@link JsonRpcResponse}. + * + * @param message - The object to type check. + * @returns The type check result. + */ +function isResponse( + message: RuntimeObject, +): message is JsonRpcResponse { + return hasProperty(message, 'result') || hasProperty(message, 'error'); +} + +/** + * A type guard for {@link JsonRpcRequest} or {@link JsonRpcNotification}. + * + * @param message - The object to type check. + * @returns The type check result. + */ +function isRequest( + message: RuntimeObject, +): message is JsonRpcRequest | JsonRpcNotification { + return hasProperty(message, 'method'); +} diff --git a/src/createEngineStream.ts b/src/createEngineStream.ts index bbb4fd9..a3f44d7 100644 --- a/src/createEngineStream.ts +++ b/src/createEngineStream.ts @@ -1,5 +1,6 @@ +import { JsonRpcRequest } from '@metamask/utils'; import { Duplex } from 'readable-stream'; -import { JsonRpcEngine, JsonRpcRequest } from 'json-rpc-engine'; +import { JsonRpcEngine } from 'json-rpc-engine'; interface EngineStreamOptions { engine: JsonRpcEngine; @@ -13,18 +14,8 @@ interface EngineStreamOptions { * @returns The stream wrapping the engine. */ export default function createEngineStream(opts: EngineStreamOptions): Duplex { - if (!opts || !opts.engine) { - throw new Error('Missing engine parameter!'); - } - const { engine } = opts; const stream = new Duplex({ objectMode: true, read: () => undefined, write }); - // forward notifications - if (engine.on) { - engine.on('notification', (message) => { - stream.push(message); - }); - } return stream; /** diff --git a/src/createStreamMiddleware.ts b/src/createStreamMiddleware.ts index 01f4838..f1b33aa 100644 --- a/src/createStreamMiddleware.ts +++ b/src/createStreamMiddleware.ts @@ -1,24 +1,7 @@ -import SafeEventEmitter from '@metamask/safe-event-emitter'; import { Duplex } from 'readable-stream'; -import { - JsonRpcEngineNextCallback, - JsonRpcEngineEndCallback, - JsonRpcNotification, - JsonRpcMiddleware, - JsonRpcRequest, - PendingJsonRpcResponse, -} from 'json-rpc-engine'; - -interface IdMapValue { - req: JsonRpcRequest; - res: PendingJsonRpcResponse; - next: JsonRpcEngineNextCallback; - end: JsonRpcEngineEndCallback; -} - -interface IdMap { - [requestId: string]: IdMapValue; -} +import { JsonRpcMiddleware } from 'json-rpc-engine'; +import { isJsonRpcRequest, JsonRpcId, JsonRpcResponse } from '@metamask/utils'; +import { ErrorMessages, IdMapValue } from './utils'; /** * Creates a JsonRpcEngine middleware with an associated Duplex stream and @@ -29,28 +12,28 @@ interface IdMap { * @returns The event emitter, middleware, and stream. */ export default function createStreamMiddleware() { - const idMap: IdMap = {}; + const idMap: Map = new Map(); const stream = new Duplex({ objectMode: true, read: () => undefined, write: processMessage, }); - const events = new SafeEventEmitter(); - const middleware: JsonRpcMiddleware = ( req, res, - next, + _next, end, ) => { // write req to stream stream.push(req); // register request on id map - idMap[req.id as unknown as string] = { req, res, next, end }; + if (isJsonRpcRequest(req)) { + idMap.set(req.id, { res, end }); + } }; - return { events, middleware, stream }; + return { middleware, stream }; /** * Writes a JSON-RPC object to the stream. @@ -60,21 +43,17 @@ export default function createStreamMiddleware() { * @param cb - The stream write callback. */ function processMessage( - res: PendingJsonRpcResponse, + res: JsonRpcResponse, _encoding: unknown, cb: (error?: Error | null) => void, ) { let err: Error | null = null; try { - const isNotification = !res.id; - if (isNotification) { - processNotification(res as unknown as JsonRpcNotification); - } else { - processResponse(res); - } + processResponse(res); } catch (_err) { err = _err as Error; } + // continue processing stream cb(err); } @@ -84,26 +63,18 @@ export default function createStreamMiddleware() { * * @param res - The response to process. */ - function processResponse(res: PendingJsonRpcResponse) { - const context = idMap[res.id as unknown as string]; + function processResponse(res: JsonRpcResponse) { + const context = idMap.get(res.id); if (!context) { - throw new Error(`StreamMiddleware - Unknown response id "${res.id}"`); + throw new Error(ErrorMessages.unknownResponse(res.id)); } - delete idMap[res.id as unknown as string]; - // copy whole res onto original res + // Copy response received from the stream unto original response object, + // which will be returned by the engine on this side. Object.assign(context.res, res); - // run callback on empty stack, - // prevent internal stream-handler from catching errors - setTimeout(context.end); - } - /** - * Processes a JSON-RPC notification. - * - * @param notif - The notification to process. - */ - function processNotification(notif: JsonRpcNotification) { - events.emit('notification', notif); + idMap.delete(res.id); + // Prevent internal stream handler from catching errors from this callback. + setTimeout(context.end); } } diff --git a/src/index.test.ts b/src/index.test.ts index 87d1e44..0fa787c 100644 --- a/src/index.test.ts +++ b/src/index.test.ts @@ -1,5 +1,10 @@ -import { JsonRpcEngine } from 'json-rpc-engine'; -import { createStreamMiddleware, createEngineStream } from '.'; +import { Writable } from 'readable-stream'; +import { JsonRpcEngine, JsonRpcMiddleware } from 'json-rpc-engine'; +import { + createStreamMiddleware, + createEngineStream, + createDuplexJsonRpcStream, +} from '.'; const jsonrpc = '2.0' as const; @@ -13,8 +18,12 @@ describe('createStreamMiddleware', () => { await new Promise((resolve, reject) => { // listen for incoming requests jsonRpcConnection.stream.on('data', (_req) => { - expect(req).toStrictEqual(_req); - jsonRpcConnection.stream.write(res); + try { + expect(req).toStrictEqual(_req); + } catch (err) { + return reject(err); + } + return jsonRpcConnection.stream.write(res); }); // run middleware, expect end fn to be called @@ -42,7 +51,7 @@ describe('createStreamMiddleware', () => { describe('createEngineStream', () => { it('processes a request', async () => { const engine = new JsonRpcEngine(); - engine.push((_req, res, _next, end) => { + engine.addMiddleware((_req, res, _next, end) => { res.result = 'test'; end(); }); @@ -76,11 +85,11 @@ describe('middleware and engine to stream', () => { // create guest const engineA = new JsonRpcEngine(); const jsonRpcConnection = createStreamMiddleware(); - engineA.push(jsonRpcConnection.middleware); + engineA.addMiddleware(jsonRpcConnection.middleware); // create host const engineB = new JsonRpcEngine(); - engineB.push((_req, res, _next, end) => { + engineB.addMiddleware((_req, res, _next, end) => { res.result = 'test'; end(); }); @@ -98,3 +107,109 @@ describe('middleware and engine to stream', () => { expect(response).toStrictEqual(res); }); }); + +describe('createDuplexJsonRpcStream', () => { + it('processes inbound and outbound requests and responses', async () => { + // Add receiver middleware + const receiverMiddleware: JsonRpcMiddleware[] = [ + (_req, res, _next, end) => { + res.result = 'received'; + end(); + }, + ]; + + const { duplexEngine, duplexEngineStream } = createDuplexJsonRpcStream({ + receiverMiddleware, + }); + + // Pipe the duplex engine stream to a sink that stores outgoing JSON-RPC + // messages. + const outgoingMessages: any[] = []; + duplexEngineStream.pipe( + new Writable({ + objectMode: true, + write: (obj, _encoding, callback) => { + outgoingMessages.push(obj); + callback(); + }, + }), + ); + + // request and expected result + const incomingReq = { id: 1, jsonrpc, method: 'testIn' }; + const outgoingReq = { id: 2, jsonrpc, method: 'testOut' }; + // const outgoingNotif = { jsonrpc, method: 'notifOut' } + // const incomingNotif = { jsonrpc, method: 'notifIn' }; + + duplexEngineStream.write({ ...incomingReq }); + const outgoingReqPromise = duplexEngine.send({ ...outgoingReq }); + + // Yield the event loop so the stream can finish processing. + await new Promise((resolve) => setTimeout(resolve, 1)); + + // Write the mock response to the outgoing request, and wait for it to be + // resolved by the deferred promise. + const expectedResponse = { id: outgoingReq.id, jsonrpc, result: 'foo' }; + duplexEngineStream.write({ ...expectedResponse }); + const receivedResponse = await outgoingReqPromise; + + // The received response should be equal to the written response. + expect(receivedResponse).toStrictEqual(expectedResponse); + + expect(outgoingMessages).toHaveLength(2); + // The first message should be the outgoing request. + expect(outgoingMessages[0]).toStrictEqual({ ...outgoingReq }); + // The second message should be the response to the incoming request. + expect(outgoingMessages[1]).toStrictEqual({ + id: 1, + jsonrpc, + result: 'received', + }); + }); + + it('processes inbound and outbound notifications', async () => { + const receiverNotificationHandler = jest.fn(); + const senderNotificationHandler = jest.fn(); + + const { duplexEngine, duplexEngineStream } = createDuplexJsonRpcStream({ + receiverNotificationHandler, + senderNotificationHandler, + }); + + // Pipe the duplex engine stream to a sink that stores outgoing JSON-RPC + // messages. + const outgoingMessages: any[] = []; + duplexEngineStream.pipe( + new Writable({ + objectMode: true, + write: (obj, _encoding, callback) => { + outgoingMessages.push(obj); + callback(); + }, + }), + ); + + // request and expected result + const incomingNotif = { jsonrpc, method: 'notifIn' }; + const outgoingNotif = { jsonrpc, method: 'notifOut' }; + + duplexEngineStream.write({ ...incomingNotif }); + expect(await duplexEngine.send({ ...outgoingNotif })).toBeUndefined(); + + // Yield the event loop so the stream can finish processing. + await new Promise((resolve) => setTimeout(resolve, 1)); + + expect(receiverNotificationHandler).toHaveBeenCalledTimes(1); + expect(receiverNotificationHandler).toHaveBeenCalledWith({ + ...incomingNotif, + }); + + expect(senderNotificationHandler).toHaveBeenCalledTimes(1); + expect(senderNotificationHandler).toHaveBeenCalledWith({ + ...outgoingNotif, + }); + + expect(outgoingMessages).toHaveLength(1); + expect(outgoingMessages[0]).toStrictEqual({ ...outgoingNotif }); + }); +}); diff --git a/src/index.ts b/src/index.ts index e53b25d..35d393b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,9 @@ import createEngineStream from './createEngineStream'; import createStreamMiddleware from './createStreamMiddleware'; +import createDuplexJsonRpcStream from './createDuplexJsonRpcStream'; -export { createEngineStream, createStreamMiddleware }; +export { + createDuplexJsonRpcStream, + createEngineStream, + createStreamMiddleware, +}; diff --git a/src/utils.ts b/src/utils.ts new file mode 100644 index 0000000..533e97c --- /dev/null +++ b/src/utils.ts @@ -0,0 +1,15 @@ +import { JsonRpcId } from '@metamask/utils'; +import { + JsonRpcEngineEndCallback, + PendingJsonRpcResponse, +} from 'json-rpc-engine'; + +export interface IdMapValue { + res: PendingJsonRpcResponse; + end: JsonRpcEngineEndCallback; +} + +export const ErrorMessages = { + unknownResponse: (id: JsonRpcId) => + `JSON-RPC stream - Received response with unknown id "${id}".`, +} as const; diff --git a/yarn.lock b/yarn.lock index c0f8a94..1e1375b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -686,6 +686,13 @@ resolved "https://registry.yarnpkg.com/@metamask/safe-event-emitter/-/safe-event-emitter-2.0.0.tgz#af577b477c683fad17c619a78208cede06f9605c" integrity sha512-/kSXhY692qiV1MXu6EeOZvg5nECLclxNXcKCxJ3cXQgYuRymRHpdx/t7JXfsK+JLjwA1e1c1/SBrlQYpusC29Q== +"@metamask/utils@^2.0.0", "@metamask/utils@^2.1.0": + version "2.1.0" + resolved "https://registry.yarnpkg.com/@metamask/utils/-/utils-2.1.0.tgz#a65eaa0932b863383844ec323e05e293d8e718ab" + integrity sha512-4PHdo5B1ifpw6GbsdlDpp8oqA++rddSmt2pWBHtIGGL2tQMvmfHdaDDSns4JP9iC+AbMogVcUpv5Vt8ow1zsRA== + dependencies: + fast-deep-equal "^3.1.3" + "@nodelib/fs.scandir@2.1.3": version "2.1.3" resolved "https://registry.yarnpkg.com/@nodelib/fs.scandir/-/fs.scandir-2.1.3.tgz#3a582bdb53804c6ba6d146579c46e52130cf4a3b" @@ -3339,12 +3346,12 @@ json-parse-even-better-errors@^2.3.0: resolved "https://registry.yarnpkg.com/json-parse-even-better-errors/-/json-parse-even-better-errors-2.3.1.tgz#7c47805a94319928e05777405dc12e1f7a4ee02d" integrity sha512-xyFwyhro/JEof6Ghe2iz2NcXoj2sloNsWr/XsERDK/oiPCfaNhl5ONfp+jQdAZRQQ0IJWNzH9zIZF7li91kh2w== -json-rpc-engine@^6.1.0: - version "6.1.0" - resolved "https://registry.yarnpkg.com/json-rpc-engine/-/json-rpc-engine-6.1.0.tgz#bf5ff7d029e1c1bf20cb6c0e9f348dcd8be5a393" - integrity sha512-NEdLrtrq1jUZyfjkr9OCz9EzCNhnRyWtt1PAnvnhwy6e8XETS0Dtc+ZNCO2gvuAoKsIn2+vCSowXTYE4CkgnAQ== +json-rpc-engine@./json-rpc-engine-6.1.2.tgz: + version "6.1.2" + resolved "./json-rpc-engine-6.1.2.tgz#11f80bf3d00af63b43776b27057fca70c6174c75" dependencies: "@metamask/safe-event-emitter" "^2.0.0" + "@metamask/utils" "^2.0.0" eth-rpc-errors "^4.0.2" json-schema-traverse@^0.4.1: From b5b1d275041be9bd6a928f560f3b9085fe8503bd Mon Sep 17 00:00:00 2001 From: Erik Marks Date: Tue, 19 Jul 2022 22:24:24 -0700 Subject: [PATCH 2/2] Make duplex stream constructor argument optional --- src/createDuplexJsonRpcStream.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/createDuplexJsonRpcStream.ts b/src/createDuplexJsonRpcStream.ts index 35a6ba3..f5acb4a 100644 --- a/src/createDuplexJsonRpcStream.ts +++ b/src/createDuplexJsonRpcStream.ts @@ -32,7 +32,7 @@ interface DuplexJsonRpcStreamOptions { * @returns The stream wrapping the duplex JSON-RPC engine. */ export default function createDuplexJsonRpcStream( - options: DuplexJsonRpcStreamOptions, + options: DuplexJsonRpcStreamOptions = {}, ) { const { receiverMiddleware = [],