From 37c5daa98117945640136e717ada085aeb7efee7 Mon Sep 17 00:00:00 2001 From: Ben Francis Date: Tue, 10 Aug 2021 18:58:30 +0100 Subject: [PATCH 1/2] Server-Sent Events support - closes #2830 --- package-lock.json | 40 +++++++++ package.json | 2 + src/controllers/events_controller.ts | 119 +++++++++++++++++++++++++-- src/models/thing.ts | 4 + src/router.ts | 5 +- src/test/integration/things-test.ts | 80 +++++++++++++++++- 6 files changed, 240 insertions(+), 10 deletions(-) diff --git a/package-lock.json b/package-lock.json index 89d67c8ba..01a4f0e94 100644 --- a/package-lock.json +++ b/package-lock.json @@ -3584,6 +3584,12 @@ "@types/node": "*" } }, + "@types/eventsource": { + "version": "1.1.6", + "resolved": "https://registry.npmjs.org/@types/eventsource/-/eventsource-1.1.6.tgz", + "integrity": "sha512-y4xcLJ+lcoZ6mN9ndSdKOWg24Nj5uQc4Z/NRdy3HbiGGt5hfH3RLwAXr6V+RzGzOljAk48a09n6iY4BMNumEng==", + "dev": true + }, "@types/express": { "version": "4.17.11", "resolved": "https://registry.npmjs.org/@types/express/-/express-4.17.11.tgz", @@ -8286,6 +8292,15 @@ "integrity": "sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q==", "dev": true }, + "eventsource": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/eventsource/-/eventsource-1.1.0.tgz", + "integrity": "sha512-VSJjT5oCNrFvCS6igjzPAt5hBzQ2qPBFIbJ03zLI9SE0mxwZpMw6BfJrbFHm1a141AavMEB8JHmBhWAd66PfCg==", + "dev": true, + "requires": { + "original": "^1.0.0" + } + }, "exec-buffer": { "version": "3.2.0", "resolved": "https://registry.npmjs.org/exec-buffer/-/exec-buffer-3.2.0.tgz", @@ -13989,6 +14004,15 @@ "logalot": "^2.0.0" } }, + "original": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/original/-/original-1.0.2.tgz", + "integrity": "sha512-hyBVl6iqqUOJ8FqRe+l/gS8H+kKYjrEndd5Pm1MfBtsEKA038HkkdbAl/72EAXGyonD/PFsvmVG+EvcIpliMBg==", + "dev": true, + "requires": { + "url-parse": "^1.4.3" + } + }, "os-filter-obj": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/os-filter-obj/-/os-filter-obj-2.0.0.tgz", @@ -14763,6 +14787,12 @@ "strict-uri-encode": "^1.0.0" } }, + "querystringify": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz", + "integrity": "sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==", + "dev": true + }, "quick-lru": { "version": "5.1.1", "resolved": "https://registry.npmjs.org/quick-lru/-/quick-lru-5.1.1.tgz", @@ -17959,6 +17989,16 @@ "integrity": "sha1-2pN/emLiH+wf0Y1Js1wpNQZ6bHI=", "dev": true }, + "url-parse": { + "version": "1.5.3", + "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.3.tgz", + "integrity": "sha512-IIORyIQD9rvj0A4CLWsHkBBJuNqWpFQe224b6j9t/ABmquIS0qDU2pY6kl6AuOrL5OkCXHMCFNe1jBcuAggjvQ==", + "dev": true, + "requires": { + "querystringify": "^2.1.1", + "requires-port": "^1.0.0" + } + }, "url-parse-lax": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/url-parse-lax/-/url-parse-lax-1.0.0.tgz", diff --git a/package.json b/package.json index a8d1bb163..a43886353 100644 --- a/package.json +++ b/package.json @@ -87,6 +87,7 @@ "@types/config": "0.0.38", "@types/country-list": "^2.1.0", "@types/event-to-promise": "^0.7.1", + "@types/eventsource": "^1.1.6", "@types/express": "^4.17.11", "@types/express-fileupload": "^1.1.6", "@types/express-handlebars": "^3.1.0", @@ -133,6 +134,7 @@ "eslint-config-prettier": "^8.1.0", "eslint-plugin-html": "^6.1.1", "event-to-promise": "^0.8.0", + "eventsource": "^1.1.0", "highlight.js": "^10.6.0", "html-webpack-plugin": "^5.2.0", "image-minimizer-webpack-plugin": "^2.2.0", diff --git a/src/controllers/events_controller.ts b/src/controllers/events_controller.ts index 1af0dd9db..e3ffdc5ee 100644 --- a/src/controllers/events_controller.ts +++ b/src/controllers/events_controller.ts @@ -1,7 +1,7 @@ /** * Events Controller. * - * Manages the top level events queue for the gateway and things. + * Manages events endpoints for the gateway and things. * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this @@ -9,34 +9,139 @@ */ import express from 'express'; +import Things from '../models/things'; +import Thing from '../models/thing'; import Events from '../models/events'; +import Event from '../models/event'; function build(): express.Router { const controller = express.Router({ mergeParams: true }); /** - * Handle getting a list of events. + * Handle getting events of all types. */ controller.get('/', (request, response) => { - if (request.params.thingId) { - response.status(200).json(Events.getByThing(request.params.thingId)); + // Serve either an event stream or a log depending on requested content type + if (request.accepts('text/event-stream')) { + openEventStream(request, response); } else { - response.status(200).json(Events.getGatewayEvents()); + sendEventLog(request, response); } }); /** - * Handle getting a list of events. + * Handle getting events of a specific type. */ controller.get('/:eventName', (request, response) => { + // Serve either an event stream or a log depending on requested content type + if (request.accepts('text/event-stream')) { + openEventStream(request, response); + } else { + sendEventLog(request, response); + } + }); + + /** + * Open a Server-Sent Events event stream to push events to the client. + * + * @param {express.Request} request + * @param {express.Response} response + * @return {Promise} + */ + async function openEventStream( + request: express.Request, + response: express.Response + ): Promise { + const thingID = request.params.thingId; const eventName = request.params.eventName; + let thing: Thing | undefined; + + // Don't allow event streams for events not associated with a Thing + if (!thingID) { + response.status(406).send(); + return; + } + + // Check requested thing exists + try { + thing = await Things.getThing(thingID); + } catch (error: unknown) { + console.error(`Thing not found ${error}`); + response.status(404).send(); + return; + } + + // Check that requested event type (if any) exists + if (eventName && !thing.getEvents()[eventName]) { + response.status(404).send(); + return; + } + // Keep the socket open + request.socket.setKeepAlive(true); + // Prevent Nagle's algorithm from trying to optimise throughput + request.socket.setNoDelay(true); + // Disable inactivity timeout on the socket + request.socket.setTimeout(0); + + // Set event stream content type + response.setHeader('Content-Type', 'text/event-stream'); + // Disable caching and compression + response.setHeader('Cache-Control', 'no-cache,no-transform'); + // Tell client to keep the connection alive + response.setHeader('Connection', 'keep-alive'); + // Set 200 OK response + response.status(200); + // Send headers to complete the connection, but don't end the response + response.flushHeaders(); + + /** + * Handle an event emitted by a Thing + * + * @param {Event} event + */ + function onEvent(event: Event): void { + // If subscribed to a particular event, filter others out + if (eventName && eventName != event.getName()) { + return; + } + + // Generate an ID for the event + const eventId = Date.now(); + + // Push event to client via event stream + response.write(`id: ${eventId}\n`); + response.write(`event: ${event.getName()}\n`); + response.write(`data: ${JSON.stringify(event.getData())}\n\n`); + } + + // Subscribe to events from the specified Thing + if (thing) { + thing.addEventSubscription(onEvent); + } + + // Unsubscribe from events if the connection is closed + response.on('close', function () { + if (thing) { + thing.removeEventSubscription(onEvent); + } + }); + } + + /** + * Respond with a log of events. + * + * @param {express.Request} request + * @param {express.Response} response + */ + function sendEventLog(request: express.Request, response: express.Response): void { + const eventName = request.params.eventName; if (request.params.thingId) { response.status(200).json(Events.getByThing(request.params.thingId, eventName)); } else { response.status(200).json(Events.getGatewayEvents(eventName)); } - }); + } return controller; } diff --git a/src/models/thing.ts b/src/models/thing.ts index 15eeb2929..288cc23dc 100644 --- a/src/models/thing.ts +++ b/src/models/thing.ts @@ -324,6 +324,10 @@ export default class Thing extends EventEmitter { return this.properties; } + getEvents(): Record { + return this.events; + } + /** * Set the visibility of a Thing on the floorplan. * diff --git a/src/router.ts b/src/router.ts index 5d3d96aba..5c640dff7 100644 --- a/src/router.ts +++ b/src/router.ts @@ -113,11 +113,12 @@ class Router { request.url = APP_PREFIX + request.url; next(); - // If request won't accept HTML but will accept JSON, - // or is a WebSocket request, or is multipart/form-data + // If request won't accept HTML but will accept JSON or an event stream, + // or is a WebSocket request, or is multipart/form-data, // treat it as an API request } else if ( (!request.accepts('html') && request.accepts('json')) || + (!request.accepts('html') && request.accepts('text/event-stream')) || request.headers['content-type'] === 'application/json' || request.get('Upgrade') === 'websocket' || request.is('multipart/form-data') || diff --git a/src/test/integration/things-test.ts b/src/test/integration/things-test.ts index 2b670fa93..150b75d1e 100644 --- a/src/test/integration/things-test.ts +++ b/src/test/integration/things-test.ts @@ -1,8 +1,10 @@ -import { server, chai, mockAdapter } from '../common'; +import { server, httpServer, chai, mockAdapter } from '../common'; import { TEST_USER, createUser, headerAuth } from '../user'; import e2p from 'event-to-promise'; import { webSocketOpen, webSocketRead, webSocketSend, webSocketClose } from '../websocket-util'; import WebSocket from 'ws'; +import EventSource from 'eventsource'; +import { AddressInfo } from 'net'; import * as Constants from '../../constants'; import Event from '../../models/event'; import Events from '../../models/events'; @@ -57,6 +59,18 @@ const VALIDATION_THING = { }, }; +const EVENT_THING = { + id: 'event-thing1', + title: 'Event Thing', + '@context': 'https://webthings.io/schemas', + events: { + overheated: { + type: 'number', + unit: 'degree celsius', + }, + }, +}; + const piDescr = { id: 'pi-1', title: 'pi-1', @@ -1021,6 +1035,70 @@ describe('things/', function () { expect(res.body[0].a).toHaveProperty('timestamp'); }); + it('should be able to subscribe to events using EventSource', async () => { + await addDevice(EVENT_THING); + if (!httpServer.address()) { + httpServer.listen(); + await e2p(httpServer, 'listening'); + } + const addr = httpServer.address()!; + + // Test event subscription + let eventSourceURL = + `http://127.0.0.1:${addr.port}${Constants.THINGS_PATH}/` + + `${EVENT_THING.id}/events/overheated?jwt=${jwt}`; + const eventSource = new EventSource(eventSourceURL) as EventTarget & EventSource; + await e2p(eventSource, 'open'); + const overheatedEvent = new Event('overheated', 101, EVENT_THING.id); + const [, event] = await Promise.all([ + Events.add(overheatedEvent), + e2p(eventSource, 'overheated'), + ]); + expect(event.type).toEqual('overheated'); + expect(JSON.parse(event.data)).toEqual(101); + eventSource.close(); + + // Test events subscription + eventSourceURL = + `http://127.0.0.1:${addr.port}${Constants.THINGS_PATH}/` + + `${EVENT_THING.id}/events?jwt=${jwt}`; + const eventsSource = new EventSource(eventSourceURL) as EventTarget & EventSource; + await e2p(eventsSource, 'open'); + const overheatedEvent2 = new Event('overheated', 101, EVENT_THING.id); + const [, event2] = await Promise.all([ + Events.add(overheatedEvent2), + e2p(eventsSource, 'overheated'), + ]); + expect(event2.type).toEqual('overheated'); + expect(JSON.parse(event2.data)).toEqual(101); + eventsSource.close(); + + // Test non-existent thing errors + eventSourceURL = + `http://127.0.0.1:${addr.port}${Constants.THINGS_PATH}` + + `/non-existent-thing/events/overheated?jwt=${jwt}`; + const thinglessEventSource = new EventSource(eventSourceURL) as EventTarget & EventSource; + thinglessEventSource.onerror = jest.fn(); + thinglessEventSource.onopen = jest.fn(); + await e2p(thinglessEventSource, 'error'); + expect(thinglessEventSource.onopen).not.toBeCalled(); + expect(thinglessEventSource.onerror).toBeCalled(); + + // Test non-existent event errors + eventSourceURL = + `http://127.0.0.1:${addr.port}${Constants.THINGS_PATH}` + + `${EVENT_THING.id}/events/non-existentevent?jwt=${jwt}`; + const eventlessEventSource = new EventSource(eventSourceURL) as EventTarget & EventSource; + eventlessEventSource.onerror = jest.fn(); + eventlessEventSource.onopen = jest.fn(); + await e2p(eventlessEventSource, 'error'); + expect(eventlessEventSource.onopen).not.toBeCalled(); + expect(eventlessEventSource.onerror).toBeCalled(); + + // Clean up + httpServer.close(); + }); + // eslint-disable-next-line @typescript-eslint/quotes it("should receive thing's action status messages over websocket", async () => { await addDevice(); From a52a63b7bfabd73b55e67486d3f3a1fff3e4189a Mon Sep 17 00:00:00 2001 From: Ben Francis Date: Mon, 23 Aug 2021 13:06:06 +0100 Subject: [PATCH 2/2] Address review comments --- src/controllers/events_controller.ts | 8 ++--- src/test/integration/things-test.ts | 46 +++++++++++++++------------- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/src/controllers/events_controller.ts b/src/controllers/events_controller.ts index e3ffdc5ee..97744bc72 100644 --- a/src/controllers/events_controller.ts +++ b/src/controllers/events_controller.ts @@ -116,15 +116,11 @@ function build(): express.Router { } // Subscribe to events from the specified Thing - if (thing) { - thing.addEventSubscription(onEvent); - } + thing.addEventSubscription(onEvent); // Unsubscribe from events if the connection is closed response.on('close', function () { - if (thing) { - thing.removeEventSubscription(onEvent); - } + thing!.removeEventSubscription(onEvent); }); } diff --git a/src/test/integration/things-test.ts b/src/test/integration/things-test.ts index 150b75d1e..c5a52a97e 100644 --- a/src/test/integration/things-test.ts +++ b/src/test/integration/things-test.ts @@ -1035,17 +1035,12 @@ describe('things/', function () { expect(res.body[0].a).toHaveProperty('timestamp'); }); - it('should be able to subscribe to events using EventSource', async () => { + it('should be able to subscribe to an event using EventSource', async () => { await addDevice(EVENT_THING); - if (!httpServer.address()) { - httpServer.listen(); - await e2p(httpServer, 'listening'); - } - const addr = httpServer.address()!; + const address = httpServer.address(); - // Test event subscription - let eventSourceURL = - `http://127.0.0.1:${addr.port}${Constants.THINGS_PATH}/` + + const eventSourceURL = + `http://127.0.0.1:${address.port}${Constants.THINGS_PATH}/` + `${EVENT_THING.id}/events/overheated?jwt=${jwt}`; const eventSource = new EventSource(eventSourceURL) as EventTarget & EventSource; await e2p(eventSource, 'open'); @@ -1057,10 +1052,14 @@ describe('things/', function () { expect(event.type).toEqual('overheated'); expect(JSON.parse(event.data)).toEqual(101); eventSource.close(); + }); + + it('should be able to subscribe to all events on a thing using EventSource', async () => { + await addDevice(EVENT_THING); + const address = httpServer.address(); - // Test events subscription - eventSourceURL = - `http://127.0.0.1:${addr.port}${Constants.THINGS_PATH}/` + + const eventSourceURL = + `http://127.0.0.1:${address.port}${Constants.THINGS_PATH}/` + `${EVENT_THING.id}/events?jwt=${jwt}`; const eventsSource = new EventSource(eventSourceURL) as EventTarget & EventSource; await e2p(eventsSource, 'open'); @@ -1072,10 +1071,14 @@ describe('things/', function () { expect(event2.type).toEqual('overheated'); expect(JSON.parse(event2.data)).toEqual(101); eventsSource.close(); + }); - // Test non-existent thing errors - eventSourceURL = - `http://127.0.0.1:${addr.port}${Constants.THINGS_PATH}` + + it('should not be able to subscribe events on a thing that doesnt exist', async () => { + await addDevice(EVENT_THING); + const address = httpServer.address(); + + const eventSourceURL = + `http://127.0.0.1:${address.port}${Constants.THINGS_PATH}` + `/non-existent-thing/events/overheated?jwt=${jwt}`; const thinglessEventSource = new EventSource(eventSourceURL) as EventTarget & EventSource; thinglessEventSource.onerror = jest.fn(); @@ -1083,10 +1086,14 @@ describe('things/', function () { await e2p(thinglessEventSource, 'error'); expect(thinglessEventSource.onopen).not.toBeCalled(); expect(thinglessEventSource.onerror).toBeCalled(); + }); - // Test non-existent event errors - eventSourceURL = - `http://127.0.0.1:${addr.port}${Constants.THINGS_PATH}` + + it('should not be able to subscribe to an event that doesnt exist', async () => { + await addDevice(EVENT_THING); + const address = httpServer.address(); + + const eventSourceURL = + `http://127.0.0.1:${address.port}${Constants.THINGS_PATH}` + `${EVENT_THING.id}/events/non-existentevent?jwt=${jwt}`; const eventlessEventSource = new EventSource(eventSourceURL) as EventTarget & EventSource; eventlessEventSource.onerror = jest.fn(); @@ -1094,9 +1101,6 @@ describe('things/', function () { await e2p(eventlessEventSource, 'error'); expect(eventlessEventSource.onopen).not.toBeCalled(); expect(eventlessEventSource.onerror).toBeCalled(); - - // Clean up - httpServer.close(); }); // eslint-disable-next-line @typescript-eslint/quotes