Skip to content

Commit

Permalink
[WIP] Integrate new message-rpc prototype into core messaging API (ex…
Browse files Browse the repository at this point in the history
…tensions)

Part of eclipse-theia#10684
  • Loading branch information
tortmayr committed Feb 25, 2022
1 parent 76595fb commit 2287c58
Show file tree
Hide file tree
Showing 41 changed files with 903 additions and 809 deletions.
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
"**/@types/node": "12"
},
"devDependencies": {
"@types/chai": "4.3.0",
"@types/chai-spies": "1.0.3",
"@types/chai-string": "^1.4.0",
"@types/jsdom": "^11.0.4",
"@types/node": "12",
Expand All @@ -20,6 +22,8 @@
"@typescript-eslint/eslint-plugin": "^4.8.1",
"@typescript-eslint/eslint-plugin-tslint": "^4.8.1",
"@typescript-eslint/parser": "^4.8.1",
"chai": "4.3.4",
"chai-spies": "1.0.0",
"chai-string": "^1.4.0",
"chalk": "4.0.0",
"concurrently": "^3.5.0",
Expand Down
84 changes: 57 additions & 27 deletions packages/core/src/browser/messaging/ws-connection-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************

import { injectable, interfaces, decorate, unmanaged } from 'inversify';
import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event } from '../../common';
import { decorate, injectable, interfaces, unmanaged } from 'inversify';
import { io, Socket } from 'socket.io-client';
import { Emitter, Event, JsonRpcProxy, JsonRpcProxyFactory } from '../../common';
import { ArrayBufferReadBuffer, ArrayBufferWriteBuffer } from '../../common/message-rpc/array-buffer-message-buffer';
import { Channel, ReadBufferConstructor } from '../../common/message-rpc/channel';
import { WriteBuffer } from '../../common/message-rpc/message-buffer';
import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider';
import { WebSocketChannel } from '../../common/messaging/web-socket-channel';
import { Endpoint } from '../endpoint';
import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider';
import { io, Socket } from 'socket.io-client';

decorate(injectable(), JsonRpcProxyFactory);
decorate(unmanaged(), JsonRpcProxyFactory, 0);
Expand All @@ -35,6 +38,8 @@ export interface WebSocketOptions {
export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebSocketOptions> {

protected readonly onSocketDidOpenEmitter: Emitter<void> = new Emitter();
// Socket that is used by the main channel
protected socket: Socket;
get onSocketDidOpen(): Event<void> {
return this.onSocketDidOpenEmitter.event;
}
Expand All @@ -48,31 +53,23 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
return container.get(WebSocketConnectionProvider).createProxy<T>(path, arg);
}

protected readonly socket: Socket;

constructor() {
super();
protected createMainChannel(): Channel {
const url = this.createWebSocketUrl(WebSocketChannel.wsPath);
const socket = this.createWebSocket(url);
const channel = new SocketIOChannel(socket);
socket.on('connect', () => {
this.fireSocketDidOpen();
});
socket.on('disconnect', reason => {
for (const channel of [...this.channels.values()]) {
channel.close(undefined, reason);
}
this.fireSocketDidClose();
});
socket.on('message', data => {
this.handleIncomingRawMessage(data);
});
channel.onClose(() => this.fireSocketDidClose());
socket.connect();
this.socket = socket;

return channel;
}

override openChannel(path: string, handler: (channel: WebSocketChannel) => void, options?: WebSocketOptions): void {
override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebSocketOptions): Promise<void> {
if (this.socket.connected) {
super.openChannel(path, handler, options);
return super.openChannel(path, handler, options);
} else {
const openChannel = () => {
this.socket.off('connect', openChannel);
Expand All @@ -82,14 +79,6 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
}
}

protected createChannel(id: number): WebSocketChannel {
return new WebSocketChannel(id, content => {
if (this.socket.connected) {
this.socket.send(content);
}
});
}

/**
* Creates a websocket URL to the current location
*/
Expand Down Expand Up @@ -128,3 +117,44 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
this.onSocketDidCloseEmitter.fire(undefined);
}
}

export class SocketIOChannel implements Channel {
protected readonly onCloseEmitter: Emitter<void> = new Emitter();
get onClose(): Event<void> {
return this.onCloseEmitter.event;
}

protected readonly onMessageEmitter: Emitter<ReadBufferConstructor> = new Emitter();
get onMessage(): Event<ReadBufferConstructor> {
return this.onMessageEmitter.event;
}

protected readonly onErrorEmitter: Emitter<unknown> = new Emitter();
get onError(): Event<unknown> {
return this.onErrorEmitter.event;
}

readonly id: string;

constructor(protected readonly socket: Socket) {
socket.on('error', error => this.onErrorEmitter.fire(error));
socket.on('disconnect', reason => this.onCloseEmitter.fire());
socket.on('message', buffer => this.onMessageEmitter.fire(() => new ArrayBufferReadBuffer(buffer)));
this.id = socket.id;
}

getWriteBuffer(): WriteBuffer {
const result = new ArrayBufferWriteBuffer();
if (this.socket.connected) {
result.onCommit(buffer => {
this.socket.emit('message', buffer);
});
}
return result;
}

close(): void {
this.socket.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
********************************************************************************/
import { expect } from 'chai';
import { ArrayBufferReadBuffer, ArrrayBufferWriteBuffer } from './array-buffer-message-buffer';
import { ArrayBufferReadBuffer, ArrayBufferWriteBuffer } from './array-buffer-message-buffer';

describe('array message buffer tests', () => {
it('basic read write test', () => {
const buffer = new ArrayBuffer(1024);
const writer = new ArrrayBufferWriteBuffer(buffer);
const writer = new ArrayBufferWriteBuffer(buffer);

writer.writeByte(8);
writer.writeInt(10000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,21 @@
import { Emitter, Event } from '../event';
import { ReadBuffer, WriteBuffer } from './message-buffer';

export class ArrrayBufferWriteBuffer implements WriteBuffer {
/**
* Converts the given node {@link Buffer} to an {@link ArrayBuffer}. The node buffer implementation is backed by an `Uint8Array`
* so the conversion can be efficiently achieved by slicing the section that is represented by the `Buffer` from the underlying
* array buffer.
* @param buffer The buffer that should be converted.
* @returns an `ArrayBuffer`representation of the given buffer.
*/
export function toArrayBuffer(buffer: Buffer): ArrayBuffer {
if (buffer.byteOffset === 0 && buffer.byteLength === buffer.buffer.byteLength) {
return buffer.buffer;
}
return buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength);
}

export class ArrayBufferWriteBuffer implements WriteBuffer {
constructor(private buffer: ArrayBuffer = new ArrayBuffer(1024), private offset: number = 0) {
}

Expand Down Expand Up @@ -85,7 +99,8 @@ export class ArrrayBufferWriteBuffer implements WriteBuffer {
export class ArrayBufferReadBuffer implements ReadBuffer {
private offset: number = 0;

constructor(private readonly buffer: ArrayBuffer) {
constructor(private readonly buffer: ArrayBuffer, readPosition = 0) {
this.offset = readPosition;
}

private get msg(): DataView {
Expand All @@ -97,9 +112,13 @@ export class ArrayBufferReadBuffer implements ReadBuffer {
}

readInt(): number {
const result = this.msg.getInt32(this.offset);
this.offset += 4;
return result;
try {
const result = this.msg.getInt32(this.offset);
this.offset += 4;
return result;
} catch (err) {
throw err;
}
}

readString(): string {
Expand All @@ -121,4 +140,8 @@ export class ArrayBufferReadBuffer implements ReadBuffer {
this.offset += length;
return result;
}

copy(): ReadBuffer {
return new ArrayBufferReadBuffer(this.buffer, this.offset);
}
}
14 changes: 6 additions & 8 deletions packages/core/src/common/message-rpc/channel.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
********************************************************************************/
import { assert, expect, spy, use } from 'chai';
import * as spies from 'chai-spies';

import { ChannelMultiplexer, ChannelPipe } from './channel';
import { ReadBuffer } from './message-buffer';
import { ChannelMultiplexer, ChannelPipe, ReadBufferConstructor } from './channel';

use(spies);

Expand All @@ -42,15 +40,15 @@ describe('multiplexer test', () => {
assert.isNotNull(rightFirst);
assert.isNotNull(rightSecond);

const leftSecondSpy = spy((buf: ReadBuffer) => {
const message = buf.readString();
const leftSecondSpy = spy((buf: ReadBufferConstructor) => {
const message = buf().readString();
expect(message).equal('message for second');
});

leftSecond.onMessage(leftSecondSpy);

const rightFirstSpy = spy((buf: ReadBuffer) => {
const message = buf.readString();
const rightFirstSpy = spy((buf: ReadBufferConstructor) => {
const message = buf().readString();
expect(message).equal('message for first');
});

Expand All @@ -63,5 +61,5 @@ describe('multiplexer test', () => {
expect(rightFirstSpy).to.be.called();

expect(openChannelSpy).to.be.called.exactly(4);
})
});
});
Loading

0 comments on commit 2287c58

Please sign in to comment.