Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server-Sent Events support - closes #2830 #2863

Merged
merged 2 commits into from
Aug 24, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
"@types/config": "0.0.38",
"@types/country-list": "^2.1.0",
"@types/event-to-promise": "^0.7.1",
"@types/eventsource": "^1.1.6",
"@types/express": "^4.17.11",
"@types/express-fileupload": "^1.1.6",
"@types/express-handlebars": "^3.1.0",
Expand Down Expand Up @@ -133,6 +134,7 @@
"eslint-config-prettier": "^8.1.0",
"eslint-plugin-html": "^6.1.1",
"event-to-promise": "^0.8.0",
"eventsource": "^1.1.0",
"highlight.js": "^10.6.0",
"html-webpack-plugin": "^5.2.0",
"image-minimizer-webpack-plugin": "^2.2.0",
Expand Down
119 changes: 112 additions & 7 deletions src/controllers/events_controller.ts
Original file line number Diff line number Diff line change
@@ -1,42 +1,147 @@
/**
* Events Controller.
*
* Manages the top level events queue for the gateway and things.
* Manages events endpoints for the gateway and things.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

import express from 'express';
import Things from '../models/things';
import Thing from '../models/thing';
import Events from '../models/events';
import Event from '../models/event';

function build(): express.Router {
const controller = express.Router({ mergeParams: true });

/**
* Handle getting a list of events.
* Handle getting events of all types.
*/
controller.get('/', (request, response) => {
if (request.params.thingId) {
response.status(200).json(Events.getByThing(request.params.thingId));
// Serve either an event stream or a log depending on requested content type
if (request.accepts('text/event-stream')) {
openEventStream(request, response);
} else {
response.status(200).json(Events.getGatewayEvents());
sendEventLog(request, response);
}
});

/**
* Handle getting a list of events.
* Handle getting events of a specific type.
*/
controller.get('/:eventName', (request, response) => {
// Serve either an event stream or a log depending on requested content type
if (request.accepts('text/event-stream')) {
openEventStream(request, response);
} else {
sendEventLog(request, response);
}
});

/**
* Open a Server-Sent Events event stream to push events to the client.
*
* @param {express.Request} request
* @param {express.Response} response
* @return {Promise}
*/
async function openEventStream(
request: express.Request,
response: express.Response
): Promise<void> {
const thingID = request.params.thingId;
const eventName = request.params.eventName;
let thing: Thing | undefined;

// Don't allow event streams for events not associated with a Thing
if (!thingID) {
response.status(406).send();
return;
}

// Check requested thing exists
try {
thing = await Things.getThing(thingID);
} catch (error: unknown) {
console.error(`Thing not found ${error}`);
response.status(404).send();
return;
}

// Check that requested event type (if any) exists
if (eventName && !thing.getEvents()[eventName]) {
response.status(404).send();
return;
}

// Keep the socket open
request.socket.setKeepAlive(true);
// Prevent Nagle's algorithm from trying to optimise throughput
request.socket.setNoDelay(true);
// Disable inactivity timeout on the socket
request.socket.setTimeout(0);

// Set event stream content type
response.setHeader('Content-Type', 'text/event-stream');
// Disable caching and compression
response.setHeader('Cache-Control', 'no-cache,no-transform');
// Tell client to keep the connection alive
response.setHeader('Connection', 'keep-alive');
// Set 200 OK response
response.status(200);
// Send headers to complete the connection, but don't end the response
response.flushHeaders();

/**
* Handle an event emitted by a Thing
*
* @param {Event} event
*/
function onEvent(event: Event): void {
// If subscribed to a particular event, filter others out
if (eventName && eventName != event.getName()) {
return;
}

// Generate an ID for the event
const eventId = Date.now();

// Push event to client via event stream
response.write(`id: ${eventId}\n`);
response.write(`event: ${event.getName()}\n`);
response.write(`data: ${JSON.stringify(event.getData())}\n\n`);
}

// Subscribe to events from the specified Thing
if (thing) {
benfrancis marked this conversation as resolved.
Show resolved Hide resolved
thing.addEventSubscription(onEvent);
}

// Unsubscribe from events if the connection is closed
response.on('close', function () {
if (thing) {
benfrancis marked this conversation as resolved.
Show resolved Hide resolved
thing.removeEventSubscription(onEvent);
}
});
}

/**
* Respond with a log of events.
*
* @param {express.Request} request
* @param {express.Response} response
*/
function sendEventLog(request: express.Request, response: express.Response): void {
const eventName = request.params.eventName;
if (request.params.thingId) {
response.status(200).json(Events.getByThing(request.params.thingId, eventName));
} else {
response.status(200).json(Events.getGatewayEvents(eventName));
}
});
}

return controller;
}
Expand Down
4 changes: 4 additions & 0 deletions src/models/thing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,10 @@ export default class Thing extends EventEmitter {
return this.properties;
}

getEvents(): Record<string, EventSchema> {
return this.events;
}

/**
* Set the visibility of a Thing on the floorplan.
*
Expand Down
5 changes: 3 additions & 2 deletions src/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,12 @@ class Router {
request.url = APP_PREFIX + request.url;
next();

// If request won't accept HTML but will accept JSON,
// or is a WebSocket request, or is multipart/form-data
// If request won't accept HTML but will accept JSON or an event stream,
// or is a WebSocket request, or is multipart/form-data,
// treat it as an API request
} else if (
(!request.accepts('html') && request.accepts('json')) ||
(!request.accepts('html') && request.accepts('text/event-stream')) ||
request.headers['content-type'] === 'application/json' ||
request.get('Upgrade') === 'websocket' ||
request.is('multipart/form-data') ||
Expand Down
80 changes: 79 additions & 1 deletion src/test/integration/things-test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { server, chai, mockAdapter } from '../common';
import { server, httpServer, chai, mockAdapter } from '../common';
import { TEST_USER, createUser, headerAuth } from '../user';
import e2p from 'event-to-promise';
import { webSocketOpen, webSocketRead, webSocketSend, webSocketClose } from '../websocket-util';
import WebSocket from 'ws';
import EventSource from 'eventsource';
import { AddressInfo } from 'net';
import * as Constants from '../../constants';
import Event from '../../models/event';
import Events from '../../models/events';
Expand Down Expand Up @@ -57,6 +59,18 @@ const VALIDATION_THING = {
},
};

const EVENT_THING = {
id: 'event-thing1',
title: 'Event Thing',
'@context': 'https://webthings.io/schemas',
events: {
overheated: {
type: 'number',
unit: 'degree celsius',
},
},
};

const piDescr = {
id: 'pi-1',
title: 'pi-1',
Expand Down Expand Up @@ -1021,6 +1035,70 @@ describe('things/', function () {
expect(res.body[0].a).toHaveProperty('timestamp');
});

it('should be able to subscribe to events using EventSource', async () => {
await addDevice(EVENT_THING);
if (!httpServer.address()) {
benfrancis marked this conversation as resolved.
Show resolved Hide resolved
httpServer.listen();
await e2p(httpServer, 'listening');
}
const addr = <AddressInfo>httpServer.address()!;

// Test event subscription
let eventSourceURL =
`http://127.0.0.1:${addr.port}${Constants.THINGS_PATH}/` +
`${EVENT_THING.id}/events/overheated?jwt=${jwt}`;
const eventSource = new EventSource(eventSourceURL) as EventTarget & EventSource;
await e2p(eventSource, 'open');
const overheatedEvent = new Event('overheated', 101, EVENT_THING.id);
const [, event] = await Promise.all([
Events.add(overheatedEvent),
e2p(eventSource, 'overheated'),
]);
expect(event.type).toEqual('overheated');
expect(JSON.parse(event.data)).toEqual(101);
eventSource.close();

// Test events subscription
benfrancis marked this conversation as resolved.
Show resolved Hide resolved
eventSourceURL =
`http://127.0.0.1:${addr.port}${Constants.THINGS_PATH}/` +
`${EVENT_THING.id}/events?jwt=${jwt}`;
const eventsSource = new EventSource(eventSourceURL) as EventTarget & EventSource;
await e2p(eventsSource, 'open');
const overheatedEvent2 = new Event('overheated', 101, EVENT_THING.id);
const [, event2] = await Promise.all([
Events.add(overheatedEvent2),
e2p(eventsSource, 'overheated'),
]);
expect(event2.type).toEqual('overheated');
expect(JSON.parse(event2.data)).toEqual(101);
eventsSource.close();

// Test non-existent thing errors
benfrancis marked this conversation as resolved.
Show resolved Hide resolved
eventSourceURL =
`http://127.0.0.1:${addr.port}${Constants.THINGS_PATH}` +
`/non-existent-thing/events/overheated?jwt=${jwt}`;
const thinglessEventSource = new EventSource(eventSourceURL) as EventTarget & EventSource;
thinglessEventSource.onerror = jest.fn();
thinglessEventSource.onopen = jest.fn();
await e2p(thinglessEventSource, 'error');
expect(thinglessEventSource.onopen).not.toBeCalled();
expect(thinglessEventSource.onerror).toBeCalled();

// Test non-existent event errors
benfrancis marked this conversation as resolved.
Show resolved Hide resolved
eventSourceURL =
`http://127.0.0.1:${addr.port}${Constants.THINGS_PATH}` +
`${EVENT_THING.id}/events/non-existentevent?jwt=${jwt}`;
const eventlessEventSource = new EventSource(eventSourceURL) as EventTarget & EventSource;
eventlessEventSource.onerror = jest.fn();
eventlessEventSource.onopen = jest.fn();
await e2p(eventlessEventSource, 'error');
expect(eventlessEventSource.onopen).not.toBeCalled();
expect(eventlessEventSource.onerror).toBeCalled();

// Clean up
httpServer.close();
benfrancis marked this conversation as resolved.
Show resolved Hide resolved
});

// eslint-disable-next-line @typescript-eslint/quotes
it("should receive thing's action status messages over websocket", async () => {
await addDevice();
Expand Down