Skip to content

Commit

Permalink
[Streaming] create NodeWebSocketFactory, refactor code, new tests (#1331
Browse files Browse the repository at this point in the history
)

* create NodeWebSocketFactory, refactor code, new tests

* Set connected to true after connecting.
  • Loading branch information
stevengum authored Oct 19, 2019
1 parent 0efce8d commit cf497da
Show file tree
Hide file tree
Showing 14 changed files with 424 additions and 177 deletions.
42 changes: 35 additions & 7 deletions libraries/botbuilder/src/botFrameworkAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,19 @@

import { Activity, ActivityTypes, BotAdapter, BotCallbackHandlerKey, ChannelAccount, ConversationAccount, ConversationParameters, ConversationReference, ConversationsResult, IUserTokenProvider, ResourceResponse, TokenResponse, TurnContext } from 'botbuilder-core';
import { AuthenticationConstants, ChannelValidation, ConnectorClient, EmulatorApiClient, GovernmentConstants, GovernmentChannelValidation, JwtTokenValidation, MicrosoftAppCredentials, SimpleCredentialProvider, TokenApiClient, TokenStatus, TokenApiModels } from 'botframework-connector';
import { IncomingMessage } from 'http';
import * as os from 'os';
import { TokenResolver } from './tokenResolver';
import { IStreamingTransportServer, IReceiveRequest, StreamingResponse, NamedPipeServer, ISocket, WebSocketServer, NodeWebSocket } from 'botframework-streaming';
import { Watershed } from 'watershed';
import {
IReceiveRequest,
ISocket,
IStreamingTransportServer,
NamedPipeServer,
NodeWebSocketFactory,
NodeWebSocketFactoryBase,
StreamingResponse,
WebSocketServer,
} from 'botframework-streaming';
import { StreamingHttpClient } from './streamingHttpClient';

export enum StatusCodes {
Expand Down Expand Up @@ -137,6 +146,11 @@ export interface BotFrameworkAdapterSettings {
* Optional. The option to determine if this adapter accepts WebSocket connections
*/
enableWebSockets?: boolean;

/**
* Optional. Used to pass in a NodeWebSocketFactoryBase instance. Allows bot to accept WebSocket connections.
*/
webSocketFactory?: NodeWebSocketFactoryBase;
}

/**
Expand Down Expand Up @@ -218,8 +232,8 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide

private logic: (context: TurnContext) => Promise<void>;
private streamingServer: IStreamingTransportServer;
private isEmulatingOAuthCards: boolean;

private isEmulatingOAuthCards: boolean;
private webSocketFactory: NodeWebSocketFactoryBase;

/**
* Creates a new instance of the [BotFrameworkAdapter](xref:botbuilder.BotFrameworkAdapter) class.
Expand All @@ -244,6 +258,16 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide
this.credentialsProvider = new SimpleCredentialProvider(this.credentials.appId, this.credentials.appPassword);
this.isEmulatingOAuthCards = false;

// If the developer wants to use WebSockets, but didn't provide a WebSocketFactory,
// create a NodeWebSocketFactory.
if (this.settings.enableWebSockets && !this.settings.webSocketFactory) {
this.webSocketFactory = new NodeWebSocketFactory();
}

if (this.settings.webSocketFactory) {
this.webSocketFactory = this.settings.webSocketFactory;
}

// If no channelService or openIdMetadata values were passed in the settings, check the process' Environment Variables for values.
// These values may be set when a bot is provisioned on Azure and if so are required for the bot to properly work in Public Azure or a National Cloud.
this.settings.channelService = this.settings.channelService || process.env[AuthenticationConstants.ChannelService];
Expand Down Expand Up @@ -1174,8 +1198,13 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide
throw new Error('Streaming logic needs to be provided to `useWebSocket`');
}

if (!this.webSocketFactory || !this.webSocketFactory.createWebSocket) {
throw new Error('BotFrameworkAdapter must have a WebSocketFactory in order to support streaming.');
}

this.logic = logic;

// Restify-specific check.
if (typeof((res as any).claimUpgrade) !== 'function') {
throw new Error("ClaimUpgrade is required for creating WebSocket connection.");
}
Expand All @@ -1187,10 +1216,9 @@ export class BotFrameworkAdapter extends BotAdapter implements IUserTokenProvide
}

const upgrade = (res as any).claimUpgrade();
const ws = new Watershed();
const socket = ws.accept(req, upgrade.socket, upgrade.head);
const socket = this.webSocketFactory.createWebSocket(req as IncomingMessage, upgrade.socket, upgrade.head);

await this.startWebSocket(new NodeWebSocket(socket));
await this.startWebSocket(socket);
}

/**
Expand Down
35 changes: 25 additions & 10 deletions libraries/botbuilder/tests/botFrameworkStreamingAdapter.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class TestAdapterSettings {
constructor(appId = undefined, appPassword = undefined, channelAuthTenant, oAuthEndpoint, openIdMetadata, channelServce) {
this.appId = appId;
this.appPassword = appPassword;
this.enableWebSockets = true;
}
}

Expand Down Expand Up @@ -131,21 +132,35 @@ describe('BotFrameworkStreamingAdapter tests', () => {
expect(handler.streamingServer.disconnect()).to.not.throw;
});

it('starts and stops a websocket server', () => {
let bot = new ActivityHandler();
let handler = new BotFrameworkAdapter();
let request = new TestRequest();
let response = new TestResponse({ claimUpgrade: 'anything' });
it('starts and stops a websocket server', async () => {
const bot = new ActivityHandler();
const handler = new BotFrameworkAdapter(new TestAdapterSettings());
const request = new TestRequest();
request.setIsUpgradeRequest(true);
request.headers = [];
request.headers['upgrade'] = 'websocket';
request.headers['sec-websocket-key'] = 'BFlat';
request.headers['sec-websocket-version'] = '13';
request.headers['sec-websocket-protocol'] = '';

expect(handler.useWebSocket(request, response, async (context) => {
const response = new TestResponse({ claimUpgrade: 'anything' });
const fakeSocket = {
unshift: function () { return true; },
write: function (value) { },
on: function (value) { },
read: function () { return new Buffer.from('data', 'utf8'); },
end: function () { return; },
};
response.setClaimUpgrade({ socket: fakeSocket, head: 'websocket' });
await handler.useWebSocket(request, response, async (context) => {
// Route to bot
await bot.run(context);
})).to.not.throw;
});
});

it('returns a connector client', async () => {
let bot = new ActivityHandler();
let handler = new BotFrameworkAdapter();
let handler = new BotFrameworkAdapter(new TestAdapterSettings());
let request = new TestRequest();
request.setIsUpgradeRequest(true);
request.headers = [];
Expand Down Expand Up @@ -175,7 +190,7 @@ describe('BotFrameworkStreamingAdapter tests', () => {
describe('useWebSocket()', () => {
it('connects', async () => {
let bot = new ActivityHandler();
let handler = new BotFrameworkAdapter();
let handler = new BotFrameworkAdapter(new TestAdapterSettings());
let request = new TestRequest();
request.setIsUpgradeRequest(true);
request.headers = [];
Expand Down Expand Up @@ -424,7 +439,7 @@ describe('BotFrameworkStreamingAdapter tests', () => {

it('sends a request', async () => {
let bot = new ActivityHandler();
let handler = new BotFrameworkAdapter();
let handler = new BotFrameworkAdapter(new TestAdapterSettings());
let request = new TestRequest();
request.setIsUpgradeRequest(true);
request.headers = [];
Expand Down
11 changes: 9 additions & 2 deletions libraries/botframework-streaming/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,17 @@
*/
export { ContentStream } from './contentStream';
export { HttpContent } from './httpContentStream';
export { IStreamingTransportServer, IStreamingTransportClient, ISocket, IReceiveRequest, IReceiveResponse } from './Interfaces'
export { IStreamingTransportServer, IStreamingTransportClient, ISocket, IReceiveRequest, IReceiveResponse } from './interfaces';
export { NamedPipeClient, NamedPipeServer } from './namedPipe';
export { RequestHandler } from './requestHandler';
export { StreamingRequest } from './streamingRequest';
export { StreamingResponse } from './streamingResponse';
export { SubscribableStream } from './subscribableStream';
export { BrowserWebSocket, NodeWebSocket, WebSocketClient, WebSocketServer } from './webSocket';
export {
BrowserWebSocket,
NodeWebSocket,
NodeWebSocketFactory,
NodeWebSocketFactoryBase,
WebSocketClient,
WebSocketServer,
} from './webSocket';
10 changes: 10 additions & 0 deletions libraries/botframework-streaming/src/webSocket/factories/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/**
* @module botframework-streaming
*/
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/

export * from './nodeWebSocketFactory';
export * from './nodeWebSocketFactoryBase';
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* @module botframework-streaming
*/
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/

import { IncomingMessage } from 'http';
import { Socket } from 'net';

import { NodeWebSocket } from '../nodeWebSocket';
import { NodeWebSocketFactoryBase } from './nodeWebSocketFactoryBase';

export class NodeWebSocketFactory extends NodeWebSocketFactoryBase {
constructor() {
super();
}

/**
* Creates a NodeWebSocket instance.
* @param req
* @param socket
* @param head
*/
public createWebSocket(req: IncomingMessage, socket: Socket, head: Buffer): NodeWebSocket {
const s = new NodeWebSocket();
s.create(req, socket, head);

return s;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/**
* @module botframework-streaming
*/
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/

import { IncomingMessage } from 'http';
import { Socket } from 'net';
import { ISocket } from '../../interfaces';

export abstract class NodeWebSocketFactoryBase {
public abstract createWebSocket(req: IncomingMessage, socket: Socket, head: Buffer): ISocket;
}
1 change: 1 addition & 0 deletions libraries/botframework-streaming/src/webSocket/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/

export * from './browserWebSocket';
export * from './factories';
export * from '../interfaces/ISocket';
export * from './nodeWebSocket';
export * from './webSocketClient';
Expand Down
30 changes: 22 additions & 8 deletions libraries/botframework-streaming/src/webSocket/nodeWebSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/
import * as http from 'http';
import * as WaterShed from 'watershed';

import { IncomingMessage, request } from 'http';
import { Socket } from 'net';
import { Watershed } from 'watershed';
import { ISocket } from '../interfaces/ISocket';

const SHED = new Watershed();

export class NodeWebSocket implements ISocket {
private readonly waterShedSocket: any;
private waterShedSocket: any;
private connected: boolean;

/**
Expand All @@ -23,6 +27,17 @@ export class NodeWebSocket implements ISocket {
this.connected = !!waterShedSocket;
}

/**
* Create and set a WaterShed WebSocket with an HTTP Request, Socket and Buffer.
* @param req IncomingMessage
* @param socket Socket
* @param head Buffer
*/
public create(req: IncomingMessage, socket: Socket, head: Buffer): void {
this.waterShedSocket = SHED.accept(req, socket, head);
this.connected = true;
}

/**
* True if the socket is currently connected.
*/
Expand All @@ -47,9 +62,8 @@ export class NodeWebSocket implements ISocket {
*/
public async connect(serverAddress, port = 8082): Promise<void> {
// Following template from https://github.com/joyent/node-watershed#readme
let shed = new WaterShed.Watershed();
let wskey = shed.generateKey();
let options = {
const wskey = SHED.generateKey();
const options = {
port: port,
hostname: serverAddress,
headers: {
Expand All @@ -58,10 +72,10 @@ export class NodeWebSocket implements ISocket {
'Sec-WebSocket-Version': '13'
}
};
let req = http.request(options);
const req = request(options);
req.end();
req.on('upgrade', function(res, socket, head): void {
shed.connect(res, socket, head, wskey);
SHED.connect(res, socket, head, wskey);
});

this.connected = true;
Expand Down
77 changes: 77 additions & 0 deletions libraries/botframework-streaming/tests/NodeWebSocket.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
const { NodeWebSocket } = require('../');
const { expect } = require('chai');
const { FauxSock, TestRequest } = require('./helpers');

describe('NodeSocket', () => {
it('creates a new NodeSocket', () => {
const ns = new NodeWebSocket(new FauxSock);
expect(ns).to.be.instanceOf(NodeWebSocket);
expect(ns.close()).to.not.be.undefined;
});

it('requires a valid URL', () => {
try {
const ns = new NodeWebSocket(new FauxSock);
} catch (error) {
expect(error.message).to.equal('Invalid URL: fakeURL');
}
});

it('starts out connected', () => {
const ns = new NodeWebSocket(new FauxSock);
expect(ns.isConnected()).to.be.true;
});

it('writes to the socket', () => {
const ns = new NodeWebSocket(new FauxSock);
const buff = Buffer.from('hello');
expect(ns.write(buff)).to.not.throw;
});

it('attempts to open a connection', () => {
const ns = new NodeWebSocket(new FauxSock);
expect(ns.connect().catch((error) => {
expect(error.message).to.equal('connect ECONNREFUSED 127.0.0.1:8082');
}));
});

it('can set message handlers on the socket', () => {
const sock = new FauxSock();
const ns = new NodeWebSocket(sock);
expect(sock.textHandler).to.be.undefined;
expect(sock.binaryHandler).to.be.undefined;
expect(ns.setOnMessageHandler(() => { })).to.not.throw;
expect(sock.textHandler).to.not.be.undefined;
expect(sock.binaryHandler).to.not.be.undefined;
});

it('can set error handler on the socket', () => {
const sock = new FauxSock();
const ns = new NodeWebSocket(sock);
expect(sock.errorHandler).to.be.undefined;
expect(ns.setOnErrorHandler(() => { })).to.not.throw;
expect(sock.errorHandler).to.not.be.undefined;
});

it('can set end handler on the socket', () => {
const sock = new FauxSock();
const ns = new NodeWebSocket(sock);
expect(sock.endHandler).to.be.undefined;
expect(ns.setOnCloseHandler(() => { })).to.not.throw;
expect(sock.endHandler).to.not.be.undefined;
});

it('create() should be successful and set a WebSocket', () => {
const sock = new FauxSock();
const nodeSocket = new NodeWebSocket();
const request = new TestRequest();
request.setIsUpgradeRequest(true);
request.headers = [];
request.headers['upgrade'] = 'websocket';
request.headers['sec-websocket-key'] = 'BFlat';
request.headers['sec-websocket-version'] = '13';
request.headers['sec-websocket-protocol'] = '';
nodeSocket.create(request, sock, Buffer.from([]));
nodeSocket.waterShedSocket.destroy();
});
});
Loading

0 comments on commit cf497da

Please sign in to comment.