Skip to content

Commit

Permalink
fix: extracting middleware to its own files
Browse files Browse the repository at this point in the history
[ci skip]
  • Loading branch information
tegefaulkes committed Feb 14, 2023
1 parent 89c6c59 commit 1d88f5d
Show file tree
Hide file tree
Showing 12 changed files with 562 additions and 466 deletions.
3 changes: 2 additions & 1 deletion src/RPC/RPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import type {
} from './types';
import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
import Logger from '@matrixai/logger';
import * as middlewareUtils from './middleware';
import * as rpcErrors from './errors';
import * as rpcUtils from './utils';
import {
Expand All @@ -30,7 +31,7 @@ class RPCClient<M extends ClientManifest> {
static async createRPCClient<M extends ClientManifest>({
manifest,
streamPairCreateCallback,
middleware = rpcUtils.defaultClientMiddlewareWrapper(),
middleware = middlewareUtils.defaultClientMiddlewareWrapper(),
logger = new Logger(this.name),
}: {
manifest: M;
Expand Down
3 changes: 2 additions & 1 deletion src/RPC/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
} from './handlers';
import * as rpcUtils from './utils';
import * as rpcErrors from './errors';
import * as middlewareUtils from './middleware';
import { never } from '../utils/utils';
import { sysexits } from '../errors';

Expand All @@ -37,7 +38,7 @@ interface RPCServer extends CreateDestroy {}
class RPCServer {
static async createRPCServer({
manifest,
middleware = rpcUtils.defaultServerMiddlewareWrapper(),
middleware = middlewareUtils.defaultServerMiddlewareWrapper(),
sensitive = false,
logger = new Logger(this.name),
}: {
Expand Down
158 changes: 158 additions & 0 deletions src/RPC/middleware.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import type {
JsonRpcMessage,
JsonRpcRequest,
JsonRpcResponse,
JsonRpcResponseResult,
MiddlewareFactory,
} from './types';
import { TransformStream } from 'stream/web';
import * as rpcErrors from './errors';
import * as rpcUtils from './utils';
const jsonStreamParsers = require('@streamparser/json');

function binaryToJsonMessageStream<T extends JsonRpcMessage>(
messageParser: (message: unknown) => T,
byteLimit: number = 1024 * 1024,
firstMessage?: T,
) {
const parser = new jsonStreamParsers.JSONParser({
separator: '',
paths: ['$'],
});
let bytesWritten: number = 0;

return new TransformStream<Uint8Array, T>({
start: (controller) => {
if (firstMessage != null) controller.enqueue(firstMessage);
parser.onValue = (value) => {
const jsonMessage = messageParser(value.value);
controller.enqueue(jsonMessage);
bytesWritten = 0;
};
},
transform: (chunk) => {
try {
bytesWritten += chunk.byteLength;
parser.write(chunk);
} catch (e) {
throw new rpcErrors.ErrorRpcParse(undefined, { cause: e });
}
if (bytesWritten > byteLimit) {
throw new rpcErrors.ErrorRpcMessageLength();
}
},
});
}

function jsonMessageToBinaryStream() {
return new TransformStream<JsonRpcMessage, Uint8Array>({
transform: (chunk, controller) => {
controller.enqueue(Buffer.from(JSON.stringify(chunk)));
},
});
}

const defaultMiddleware: MiddlewareFactory<
JsonRpcRequest,
JsonRpcRequest,
JsonRpcResponse,
JsonRpcResponse
> = () => {
return {
forward: new TransformStream(),
reverse: new TransformStream(),
};
};

const defaultServerMiddlewareWrapper = (
middleware: MiddlewareFactory<
JsonRpcRequest,
JsonRpcRequest,
JsonRpcResponse,
JsonRpcResponse
> = defaultMiddleware,
) => {
return (header: JsonRpcRequest) => {
const inputTransformStream = binaryToJsonMessageStream(
rpcUtils.parseJsonRpcRequest,
undefined,
header,
);
const outputTransformStream = new TransformStream<
JsonRpcResponseResult,
JsonRpcResponseResult
>();

const middleMiddleware = middleware(header);

const forwardReadable = inputTransformStream.readable.pipeThrough(
middleMiddleware.forward,
); // Usual middleware here
const reverseReadable = outputTransformStream.readable
.pipeThrough(middleMiddleware.reverse) // Usual middleware here
.pipeThrough(jsonMessageToBinaryStream());

return {
forward: {
readable: forwardReadable,
writable: inputTransformStream.writable,
},
reverse: {
readable: reverseReadable,
writable: outputTransformStream.writable,
},
};
};
};

const defaultClientMiddlewareWrapper = (
middleware: MiddlewareFactory<
JsonRpcRequest,
JsonRpcRequest,
JsonRpcResponse,
JsonRpcResponse
> = defaultMiddleware,
): MiddlewareFactory<
Uint8Array,
JsonRpcRequest,
JsonRpcResponse,
Uint8Array
> => {
return () => {
const outputTransformStream = binaryToJsonMessageStream(
rpcUtils.parseJsonRpcResponse,
undefined,
);
const inputTransformStream = new TransformStream<
JsonRpcRequest,
JsonRpcRequest
>();

const middleMiddleware = middleware();
const forwardReadable = inputTransformStream.readable
.pipeThrough(middleMiddleware.forward) // Usual middleware here
.pipeThrough(jsonMessageToBinaryStream());
const reverseReadable = outputTransformStream.readable.pipeThrough(
middleMiddleware.reverse,
); // Usual middleware here

return {
forward: {
readable: forwardReadable,
writable: inputTransformStream.writable,
},
reverse: {
readable: reverseReadable,
writable: outputTransformStream.writable,
},
};
};
};

export {
binaryToJsonMessageStream,
jsonMessageToBinaryStream,
defaultMiddleware,
defaultServerMiddlewareWrapper,
defaultClientMiddlewareWrapper,
};
Loading

0 comments on commit 1d88f5d

Please sign in to comment.