Skip to content

Commit

Permalink
Use binary message RPC protocol for plugin API (#11261)
Browse files Browse the repository at this point in the history
Refactors the plugin RPC protocol to make use of the new message-rpc introduced with #11011/#11228.
- Refactor plugin-ext RpcProtocol API to reuse the new message-rpc protocol
  - Remove custom RPC message encoding and handling reuse message-rpc
  - Implement `BatchingChannel` that queues messages and sends them accumulated on the next process.tick (replaces the old Multiplexer 
    implementation)
   - Refactors proxy handlers and remote target handlers
   - Use `Channel` instead of `MessageConnection` for creating new instances of RPCProtocol
   - Refactor `RpcMessageEncoder`/`RpcMessageDecoder` to enable overwritting of already registered value encoders/decoders. 
   - Add mode property to  base `RpcProtocol` to enable switching from a bidirectional RPC protocol to a client-only or server-only variant.
- Implement special message encoders and decoders for the plugin communication. (Replacement for the old `ObjectTransferrer` JSON replacers/revivers)
- Adapt `HostedPluginServer` and `HostedPluginClient` API to send/receive messages in binary format instead of strings. This enables direct writethrough of the binary messages received from the hosted plugin process.
- Adapt `hosted-plugin-process` and `plugin-host` to directly send binary messages via  `IpcChannel`/`BinaryMessagePipe`

- Remove incorrect (and unused) notification proxy identifiers and instantiation
  - NotificationExt was instantiated in the main context
  - There were unused notification proxy identifiers for main and ext in the wrong contexts

Part of #10684
Fixes #9514

Contributed on behalf of STMicroelectronics
  • Loading branch information
tortmayr authored Dec 5, 2022
1 parent a56dbfd commit c8914a5
Show file tree
Hide file tree
Showing 23 changed files with 590 additions and 574 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
<a name="breaking_changes_1.33.0">[Breaking Changes:](#breaking_changes_1.33.0)</a>

- [core] returns of many methods of `MenuModelRegistry` changed from `CompositeMenuNode` to `MutableCompoundMenuNode`. To mutate a menu, use the `updateOptions` method or add a check for `instanceof CompositeMenuNode`, which will be true in most cases.
- [plugin-ext] refactored the plugin RPC API - now also reuses the msgpackR based RPC protocol that is better suited for handling binary data and enables message tunneling [#11228](https://github.com/eclipse-theia/theia/pull/11261). All plugin protocol types now use `UInt8Array` as type for message parameters instead of `string` - Contributed on behalf of STMicroelectronics.

## v1.32.0 - 11/24/2022

Expand Down
25 changes: 20 additions & 5 deletions packages/core/src/common/message-rpc/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************

import { injectable } from '../../../shared/inversify';
import { Disposable, DisposableCollection } from '../disposable';
import { Emitter, Event } from '../event';
import { ReadBuffer, WriteBuffer } from './message-buffer';
Expand Down Expand Up @@ -72,7 +71,6 @@ export type MessageProvider = () => ReadBuffer;
* Reusable abstract {@link Channel} implementation that sets up
* the basic channel event listeners and offers a generic close method.
*/
@injectable()
export abstract class AbstractChannel implements Channel {

onCloseEmitter: Emitter<ChannelCloseEvent> = new Emitter();
Expand Down Expand Up @@ -101,7 +99,21 @@ export abstract class AbstractChannel implements Channel {
}

abstract getWriteBuffer(): WriteBuffer;
}

/**
* A very basic {@link AbstractChannel} implementation which takes a function
* for retrieving the {@link WriteBuffer} as constructor argument.
*/
export class BasicChannel extends AbstractChannel {

constructor(protected writeBufferProvider: () => WriteBuffer) {
super();
}

getWriteBuffer(): WriteBuffer {
return this.writeBufferProvider();
}
}

/**
Expand Down Expand Up @@ -194,7 +206,7 @@ export class ChannelMultiplexer implements Disposable {
return this.handleClose(id);
}
case MessageTypes.Data: {
return this.handleData(id, buffer.sliceAtReadPosition());
return this.handleData(id, buffer);
}
}
}
Expand All @@ -206,7 +218,7 @@ export class ChannelMultiplexer implements Disposable {
const channel = this.createChannel(id);
this.pendingOpen.delete(id);
this.openChannels.set(id, channel);
resolve!(channel);
resolve(channel);
this.onOpenChannelEmitter.fire({ id, channel });
}
}
Expand Down Expand Up @@ -236,7 +248,7 @@ export class ChannelMultiplexer implements Disposable {
protected handleData(id: string, data: ReadBuffer): void {
const channel = this.openChannels.get(id);
if (channel) {
channel.onMessageEmitter.fire(() => data);
channel.onMessageEmitter.fire(() => data.sliceAtReadPosition());
}
}

Expand All @@ -263,6 +275,9 @@ export class ChannelMultiplexer implements Disposable {
}

open(id: string): Promise<Channel> {
if (this.openChannels.has(id)) {
throw new Error(`Another channel with the id '${id}' is already open.`);
}
const result = new Promise<Channel>((resolve, reject) => {
this.pendingOpen.set(id, resolve);
});
Expand Down
8 changes: 6 additions & 2 deletions packages/core/src/common/message-rpc/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
export { RequestHandler, RpcProtocol, RpcProtocolOptions } from './rpc-protocol';
export { Channel, AbstractChannel, ChannelCloseEvent, MessageProvider } from './channel';
export { AbstractChannel, Channel, ChannelCloseEvent, MessageProvider } from './channel';
export { ReadBuffer, WriteBuffer } from './message-buffer';
export { RequestHandler, RpcProtocol, RpcProtocolOptions } from './rpc-protocol';

import { registerMsgPackExtensions } from './rpc-message-encoder';

registerMsgPackExtensions();
70 changes: 70 additions & 0 deletions packages/core/src/common/message-rpc/msg-pack-extension-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// *****************************************************************************
// Copyright (C) 2022 STMicroelectronics and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0.
//
// This Source Code may also be made available under the following Secondary
// Licenses when the conditions for such availability set forth in the Eclipse
// Public License v. 2.0 are satisfied: GNU General Public License, version 2
// with the GNU Classpath Exception which is available at
// https://www.gnu.org/software/classpath/license.html.
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************

import { addExtension } from 'msgpackr';

/**
* Handles the global registration of custom MsgPackR extensions
* required for the default RPC communication. MsgPackR extensions
* are installed globally on both ends of the communication channel.
* (frontend-backend, pluginExt-pluginMain).
* Is implemented as singleton as it is also used in plugin child processes which have no access to inversify.
*/
export class MsgPackExtensionManager {
private static readonly INSTANCE = new MsgPackExtensionManager();
public static getInstance(): MsgPackExtensionManager {
return this.INSTANCE;
}

private extensions = new Map<number, MsgPackExtension>();

private constructor() {
}

registerExtensions(...extensions: MsgPackExtension[]): void {
extensions.forEach(extension => {
if (extension.tag < 1 || extension.tag > 100) {
// MsgPackR reserves the tag range 1-100 for custom extensions.
throw new Error(`MsgPack extension tag should be a number from 1-100 but was '${extension.tag}'`);
}
if (this.extensions.has(extension.tag)) {
throw new Error(`Another MsgPack extension with the tag '${extension.tag}' is already registered`);
}
this.extensions.set(extension.tag, extension);
addExtension({
Class: extension.class,
type: extension.tag,
write: extension.serialize,
read: extension.deserialize
});
});
}

getExtension(tag: number): MsgPackExtension | undefined {
return this.extensions.get(tag);
}
}

export interface MsgPackExtension {
class: Function,
tag: number,
serialize(instance: unknown): unknown,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
deserialize(serialized: any): unknown
}

export type Constructor<T> = new (...params: unknown[]) => T;

46 changes: 24 additions & 22 deletions packages/core/src/common/message-rpc/rpc-message-encoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
// *****************************************************************************
/* eslint-disable @typescript-eslint/no-explicit-any */

import { addExtension, Packr as MsgPack } from 'msgpackr';
import { Packr as MsgPack } from 'msgpackr';
import { ReadBuffer, WriteBuffer } from './message-buffer';
import { MsgPackExtensionManager } from './msg-pack-extension-manager';

/**
* This code lets you encode rpc protocol messages (request/reply/notification/error/cancel)
Expand Down Expand Up @@ -121,27 +122,10 @@ export interface RpcMessageEncoder {
}

export const defaultMsgPack = new MsgPack({ moreTypes: true, encodeUndefinedAsNil: false, bundleStrings: false });
// Add custom msgpackR extension for ResponseErrors.
addExtension({
Class: ResponseError,
type: 1,
write: (instance: ResponseError) => {
const { code, data, message, name, stack } = instance;
return { code, data, message, name, stack };
},
read: data => {
const error = new ResponseError(data.code, data.message, data.data);
error.name = data.name;
error.stack = data.stack;
return error;
}
});

export class MsgPackMessageEncoder implements RpcMessageEncoder {

constructor(protected readonly msgPack: MsgPack = defaultMsgPack) {

}
constructor(protected readonly msgPack: MsgPack = defaultMsgPack) { }

cancel(buf: WriteBuffer, requestId: number): void {
this.encode<CancelMessage>(buf, { type: RpcMessageType.Cancel, id: requestId });
Expand Down Expand Up @@ -169,13 +153,11 @@ export class MsgPackMessageEncoder implements RpcMessageEncoder {
throw err;
}
}

}

export class MsgPackMessageDecoder implements RpcMessageDecoder {
constructor(protected readonly msgPack: MsgPack = defaultMsgPack) {
constructor(protected readonly msgPack: MsgPack = defaultMsgPack) { }

}
decode<T = any>(buf: ReadBuffer): T {
const bytes = buf.readBytes();
return this.msgPack.decode(bytes);
Expand All @@ -184,5 +166,25 @@ export class MsgPackMessageDecoder implements RpcMessageDecoder {
parse(buffer: ReadBuffer): RpcMessage {
return this.decode(buffer);
}
}

export function registerMsgPackExtensions(): void {
// Register custom msgPack extension for Errors.
MsgPackExtensionManager.getInstance().registerExtensions({
class: Error,
tag: 1,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
serialize: (error: any) => {
const { code, data, message, name } = error;
const stack = error.stacktrace ?? error.stack;
const isResponseError = error instanceof ResponseError;
return { code, data, message, name, stack, isResponseError };
},
deserialize: data => {
const error = data.isResponseError ? new ResponseError(data.code, data.message, data.data) : new Error(data.message);
error.name = data.name;
error.stack = data.stack;
return error;
}
});
}
Loading

0 comments on commit c8914a5

Please sign in to comment.