From e16b8ebf9074512952d35f7eb0fd9a80b8cf4ff3 Mon Sep 17 00:00:00 2001 From: Pavel Jankoski Date: Tue, 21 Nov 2023 15:13:23 +0100 Subject: [PATCH 1/2] console: Rollback old http stream --- .../containers/device-importer/index.js | 4 +- sdk/js/src/api/http.js | 9 +- sdk/js/src/api/index.js | 3 +- sdk/js/src/api/index.test.js | 20 +++ .../src/api/stream/subscribeToHttpStream.js | 145 ++++++++++++++++++ ...tream.js => subscribeToWebSocketStream.js} | 41 ++--- sdk/js/src/service/applications.js | 4 +- sdk/js/src/service/devices/index.js | 6 +- sdk/js/src/service/gateways.js | 4 +- sdk/js/src/service/organizations.js | 4 +- 10 files changed, 208 insertions(+), 32 deletions(-) create mode 100644 sdk/js/src/api/stream/subscribeToHttpStream.js rename sdk/js/src/api/stream/{subscribeToStream.js => subscribeToWebSocketStream.js} (82%) diff --git a/pkg/webui/console/containers/device-importer/index.js b/pkg/webui/console/containers/device-importer/index.js index 85a94377c0..b6b24bad15 100644 --- a/pkg/webui/console/containers/device-importer/index.js +++ b/pkg/webui/console/containers/device-importer/index.js @@ -155,7 +155,7 @@ const DeviceImporter = () => { devices = await new Promise((resolve, reject) => { const chunks = [] - templateStream.on('chunk', message => { + templateStream.on('message', message => { appendToLog(message) chunks.push(message) }) @@ -252,7 +252,7 @@ const DeviceImporter = () => { createStream.current = tts.Applications.Devices.bulkCreate(appId, devices) await new Promise(resolve => { - createStream.current.on('chunk', handleCreationSuccess) + createStream.current.on('message', handleCreationSuccess) createStream.current.on('error', handleCreationError) createStream.current.on('close', resolve) diff --git a/sdk/js/src/api/http.js b/sdk/js/src/api/http.js index bfde30408c..67dc7fbd88 100644 --- a/sdk/js/src/api/http.js +++ b/sdk/js/src/api/http.js @@ -24,6 +24,8 @@ import { RATE_LIMIT_RETRIES, } from '../util/constants' +import subscribeToHttpStream from './stream/subscribeToHttpStream' + /** * Http Class is a connector for the API that uses the HTTP bridge to connect. */ @@ -85,7 +87,7 @@ class Http { } } - async handleRequest(method, endpoint, component, payload = {}) { + async handleRequest(method, endpoint, component, payload = {}, isStream) { const parsedComponent = component || this._parseStackComponent(endpoint) if (!this._stackConfig.isComponentAvailable(parsedComponent)) { // If the component has not been defined in The Things Stack config, make no @@ -96,6 +98,11 @@ class Http { } try { + if (isStream) { + const url = this._stackConfig.getComponentUrlByName(parsedComponent) + endpoint + return subscribeToHttpStream(payload, url) + } + const config = { method, url: endpoint, diff --git a/sdk/js/src/api/index.js b/sdk/js/src/api/index.js index 853ab988c6..aefcfa1dfb 100644 --- a/sdk/js/src/api/index.js +++ b/sdk/js/src/api/index.js @@ -62,12 +62,13 @@ Signature tried: ${paramSignature}`) } let route = endpoint.pattern + const isStream = Boolean(endpoint.stream) for (const parameter of endpoint.parameters) { route = route.replace(`{${parameter}}`, routeParams[parameter]) } - return connector.handleRequest(endpoint.method, route, component, payload) + return connector.handleRequest(endpoint.method, route, component, payload, isStream) } this[serviceName][`${rpcName}AllowedFieldMaskPaths`] = rpc.allowedFieldMaskPaths diff --git a/sdk/js/src/api/index.test.js b/sdk/js/src/api/index.test.js index bc3ecf366b..084699352c 100644 --- a/sdk/js/src/api/index.test.js +++ b/sdk/js/src/api/index.test.js @@ -92,6 +92,7 @@ describe('API class', () => { '/users/test/applications', undefined, { name: 'test-name' }, + false, ) }) @@ -103,8 +104,27 @@ describe('API class', () => { it('respects the search query', () => { api.ApplicationRegistry.List(undefined, { limit: 2, page: 1 }) + expect(api._connector.handleRequest).toHaveBeenCalledTimes(1) + expect(api._connector.handleRequest).toHaveBeenCalledWith( + 'get', + '/applications', + undefined, + { limit: 2, page: 1 }, + false, + ) + }) + + it('sets stream value to true for streaming endpoints', () => { + api.ApplicationRegistry.Events() expect(api._connector.handleRequest).toHaveBeenCalledTimes(1) + expect(api._connector.handleRequest).toHaveBeenCalledWith( + 'get', + '/events', + undefined, + undefined, + true, + ) expect(api._connector.handleRequest).toHaveBeenCalledWith('get', '/applications', undefined, { limit: 2, page: 1, diff --git a/sdk/js/src/api/stream/subscribeToHttpStream.js b/sdk/js/src/api/stream/subscribeToHttpStream.js new file mode 100644 index 0000000000..b57dd13eee --- /dev/null +++ b/sdk/js/src/api/stream/subscribeToHttpStream.js @@ -0,0 +1,145 @@ +// Copyright © 2019 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import ArrayBufferToString from 'arraybuffer-to-string' + +import Token from '../../util/token' + +import { notify, EVENTS } from './shared' +import 'web-streams-polyfill/dist/polyfill' + +/** + * Opens a new stream. + * + * @async + * @param {object} payload - - The body of the initial request. + * @param {string} url - The stream endpoint. + * + * @example + * (async () => { + * const stream = await stream( + * { identifiers: [{ application_ids: { application_id: 'my-app' }}]}, + * '/api/v3/events', + * ) + * + * // Add listeners to the stream. + * stream + * .on('start', () => console.log('conn opened')) + * .on('message', message => console.log('received message', message)) + * .on('error', error => console.log(error)) + * .on('close', wasClientRequest => console.log(wasClientRequest ? 'conn closed by client' : 'conn closed by server')) + * + * // Start the stream after attaching listerners. + * stream.open() + * + * // Close the stream after 20 s. + * setTimeout(() => stream.close(), 20000) + * })() + * + * @returns {object} The stream subscription object with the `on` function for + * attaching listeners and the `close` function to close the stream. + */ +export default async (payload, url) => { + const initialListeners = Object.values(EVENTS).reduce( + (acc, curr) => ({ ...acc, [curr]: null }), + {}, + ) + let listeners = initialListeners + let closeRequested = false + const token = new Token().get() + + let Authorization = null + if (typeof token === 'function') { + Authorization = `Bearer ${(await token()).access_token}` + } else { + Authorization = `Bearer ${token}` + } + + const abortController = new AbortController() + const response = await fetch(url, { + body: JSON.stringify(payload), + method: 'POST', + signal: abortController.signal, + headers: { + Authorization, + Accept: 'text/event-stream', + }, + }) + + if (response.status !== 200) { + const err = await response.json() + + throw 'error' in err ? err.error : err + } + + let buffer = '' + const reader = response.body.getReader() + const onMessage = ({ done, value }) => { + if (done) { + notify(listeners[EVENTS.CLOSE], closeRequested) + listeners = initialListeners + return + } + + const parsed = ArrayBufferToString(value) + buffer += parsed + const lines = buffer.split(/\n\n/) + buffer = lines.pop() + for (const line of lines) { + notify(listeners[EVENTS.MESSAGE], JSON.parse(line).result) + } + + return reader.read().then(onMessage) + } + + return { + open: () => { + reader + .read() + .then(data => { + notify(listeners[EVENTS.START]) + + return data + }) + .then(onMessage) + .catch(error => { + notify(listeners[EVENTS.ERROR], error) + listeners = initialListeners + }) + }, + on(eventName, callback) { + if (listeners[eventName] === undefined) { + throw new Error( + `${eventName} event is not supported. Should be one of: start, error, message or close`, + ) + } + + listeners[eventName] = callback + + return this + }, + close: () => { + closeRequested = true + + reader + .cancel() + .then(() => { + abortController.abort() + }) + .catch(error => { + notify(listeners[EVENTS.ERROR], error) + }) + }, + } +} diff --git a/sdk/js/src/api/stream/subscribeToStream.js b/sdk/js/src/api/stream/subscribeToWebSocketStream.js similarity index 82% rename from sdk/js/src/api/stream/subscribeToStream.js rename to sdk/js/src/api/stream/subscribeToWebSocketStream.js index bbc2610ffc..6587b42a49 100644 --- a/sdk/js/src/api/stream/subscribeToStream.js +++ b/sdk/js/src/api/stream/subscribeToWebSocketStream.js @@ -27,12 +27,14 @@ const initialListeners = Object.values(EVENTS).reduce((acc, curr) => ({ ...acc, * @async * @param {object} payload - - The body of the initial request. * @param {string} baseUrl - The stream baseUrl. + * @param {string} endpoint - The stream endpoint. * * @example * (async () => { * const stream = await stream( * { identifiers: [{ application_ids: { application_id: 'my-app' }}]}, - * 'http://localhost:8080/api/v3', + * 'http://localhost:8080', + * '/api/v3', * ) * * // Add listeners to the stream. @@ -49,7 +51,7 @@ const initialListeners = Object.values(EVENTS).reduce((acc, curr) => ({ ...acc, * @returns {object} The stream subscription object with the `on` function for * attaching listeners and the `close` function to close the stream. */ -export default async (payload, baseUrl) => { +export default async (payload, baseUrl, endpoint = '/console/internal/events/') => { const subscriptionId = Date.now() const subscriptionPayload = JSON.stringify({ type: MESSAGE_TYPES.SUBSCRIBE, @@ -61,6 +63,7 @@ export default async (payload, baseUrl) => { id: subscriptionId, }) let closeRequested = false + const url = baseUrl + endpoint await new Promise(async resolve => { // Add the new subscription to the subscriptions object. @@ -68,7 +71,7 @@ export default async (payload, baseUrl) => { // to resolve the promise after the subscription confirmation message. subscriptions = { ...subscriptions, - [subscriptionId]: { ...initialListeners, url: baseUrl, _resolver: resolve }, + [subscriptionId]: { ...initialListeners, url, _resolver: resolve }, } const token = new Token().get() @@ -76,33 +79,33 @@ export default async (payload, baseUrl) => { const baseUrlParsed = baseUrl.replace('http', 'ws') // Open up the WebSocket connection if it doesn't exist. - if (!wsInstances[baseUrl]) { - wsInstances[baseUrl] = new WebSocket(`${baseUrlParsed}/console/internal/events/`, [ + if (!wsInstances[url]) { + wsInstances[url] = new WebSocket(`${baseUrlParsed}${endpoint}`, [ 'ttn.lorawan.v3.console.internal.events.v1', `ttn.lorawan.v3.header.authorization.bearer.${tokenParsed}`, ]) // Event listener for 'open' - wsInstances[baseUrl].addEventListener('open', () => { - wsInstances[baseUrl].send(subscriptionPayload) + wsInstances[url].addEventListener('open', () => { + wsInstances[url].send(subscriptionPayload) }) // Broadcast connection errors to all listeners. - wsInstances[baseUrl].addEventListener('error', error => { + wsInstances[url].addEventListener('error', error => { Object.values(subscriptions) - .filter(s => s.url === baseUrl) + .filter(s => s.url === url) .forEach(s => notify(s[EVENTS.ERROR], error)) resolve() }) // Event listener for 'close' - wsInstances[baseUrl].addEventListener('close', () => { - delete wsInstances[baseUrl] + wsInstances[url].addEventListener('close', () => { + delete wsInstances[url] }) // After the WebSocket connection is open, add the event listeners. // Wait for the subscription confirmation message before resolving. - wsInstances[baseUrl].addEventListener('message', ({ data }) => { + wsInstances[url].addEventListener('message', ({ data }) => { const dataParsed = JSON.parse(data) const listeners = subscriptions[dataParsed.id] @@ -130,14 +133,14 @@ export default async (payload, baseUrl) => { notify(listeners[EVENTS.CLOSE], closeRequested) // Remove the subscription. delete subscriptions[dataParsed.id] - if (!Object.values(subscriptions).some(s => s.url === baseUrl)) { - wsInstances[baseUrl].close() + if (!Object.values(subscriptions).some(s => s.url === url)) { + wsInstances[url].close() } } }) - } else if (wsInstances[baseUrl] && wsInstances[baseUrl].readyState === WebSocket.OPEN) { + } else if (wsInstances[url] && wsInstances[url].readyState === WebSocket.OPEN) { // If the WebSocket connection is already open, only add the subscription. - wsInstances[baseUrl].send(subscriptionPayload) + wsInstances[url].send(subscriptionPayload) } }) @@ -155,13 +158,13 @@ export default async (payload, baseUrl) => { return this }, close: () => { - if (wsInstances[baseUrl]) { + if (wsInstances[url]) { closeRequested = true - wsInstances[baseUrl].send(unsubscribePayload) + wsInstances[url].send(unsubscribePayload) // Wait for the server to confirm the unsubscribe. return new Promise(resolve => { - wsInstances[baseUrl].addEventListener('message', ({ data }) => { + wsInstances[url].addEventListener('message', ({ data }) => { const { type, id } = JSON.parse(data) if (id === subscriptionId && type === MESSAGE_TYPES.UNSUBSCRIBE) { resolve() diff --git a/sdk/js/src/service/applications.js b/sdk/js/src/service/applications.js index b262e24053..42cde4598e 100644 --- a/sdk/js/src/service/applications.js +++ b/sdk/js/src/service/applications.js @@ -15,7 +15,7 @@ import autoBind from 'auto-bind' import Marshaler from '../util/marshaler' -import subscribeToStream from '../api/stream/subscribeToStream' +import subscribeToWebSocketStream from '../api/stream/subscribeToWebSocketStream' import combineStreams from '../util/combine-streams' import { STACK_COMPONENTS_MAP } from '../util/constants' @@ -236,7 +236,7 @@ class Applications { distinctComponents.map(component => this._stackConfig.getComponentUrlByName(component)), ) - const streams = [...baseUrls].map(baseUrl => subscribeToStream(payload, baseUrl)) + const streams = [...baseUrls].map(baseUrl => subscribeToWebSocketStream(payload, baseUrl)) // Combine all stream sources to one subscription generator. return combineStreams(streams) diff --git a/sdk/js/src/service/devices/index.js b/sdk/js/src/service/devices/index.js index 0d7c90b39c..5b8e0021d4 100644 --- a/sdk/js/src/service/devices/index.js +++ b/sdk/js/src/service/devices/index.js @@ -19,7 +19,7 @@ import traverse from 'traverse' import { notify, EVENTS } from '../../api/stream/shared' import Marshaler from '../../util/marshaler' -import subscribeToStream from '../../api/stream/subscribeToStream' +import subscribeToWebSocketStream from '../../api/stream/subscribeToWebSocketStream' import deviceEntityMap from '../../../generated/device-entity-map.json' import DownlinkQueue from '../downlink-queue' import { STACK_COMPONENTS_MAP } from '../../util/constants' @@ -664,7 +664,7 @@ class Devices { on(eventName, callback) { if (listeners[eventName] === undefined) { throw new Error( - `${eventName} event is not supported. Should be one of: start, error, chunk or close`, + `${eventName} event is not supported. Should be one of: open, error, message or close`, ) } @@ -699,7 +699,7 @@ class Devices { distinctComponents.map(component => this._stackConfig.getComponentUrlByName(component)), ) - const streams = [...baseUrls].map(baseUrl => subscribeToStream(payload, baseUrl)) + const streams = [...baseUrls].map(baseUrl => subscribeToWebSocketStream(payload, baseUrl)) // Combine all stream sources to one subscription generator. return combineStreams(streams) diff --git a/sdk/js/src/service/gateways.js b/sdk/js/src/service/gateways.js index 5478e958dd..41f5d820a7 100644 --- a/sdk/js/src/service/gateways.js +++ b/sdk/js/src/service/gateways.js @@ -15,7 +15,7 @@ import autoBind from 'auto-bind' import Marshaler from '../util/marshaler' -import subscribeToStream from '../api/stream/subscribeToStream' +import subscribeToWebSocketStream from '../api/stream/subscribeToWebSocketStream' import { STACK_COMPONENTS_MAP } from '../util/constants' import combineStreams from '../util/combine-streams' @@ -262,7 +262,7 @@ class Gateways { distinctComponents.map(component => this._stackConfig.getComponentUrlByName(component)), ) - const streams = [...baseUrls].map(baseUrl => subscribeToStream(payload, baseUrl)) + const streams = [...baseUrls].map(baseUrl => subscribeToWebSocketStream(payload, baseUrl)) // Combine all stream sources to one subscription generator. return combineStreams(streams) diff --git a/sdk/js/src/service/organizations.js b/sdk/js/src/service/organizations.js index 07549d7511..5469acd031 100644 --- a/sdk/js/src/service/organizations.js +++ b/sdk/js/src/service/organizations.js @@ -15,7 +15,7 @@ import autoBind from 'auto-bind' import Marshaler from '../util/marshaler' -import subscribeToStream from '../api/stream/subscribeToStream' +import subscribeToWebSocketStream from '../api/stream/subscribeToWebSocketStream' import { STACK_COMPONENTS_MAP } from '../util/constants' import ApiKeys from './api-keys' @@ -168,7 +168,7 @@ class Organizations { const baseUrl = this._stackConfig.getComponentUrlByName(STACK_COMPONENTS_MAP.is) - return subscribeToStream(payload, baseUrl) + return subscribeToWebSocketStream(payload, baseUrl) } } From 66ebf060688a7cfcc1eba1df3d6bbb46ac363e3b Mon Sep 17 00:00:00 2001 From: Pavel Jankoski Date: Tue, 21 Nov 2023 15:29:58 +0100 Subject: [PATCH 2/2] console: Fix unit tests --- sdk/js/src/api/index.test.js | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sdk/js/src/api/index.test.js b/sdk/js/src/api/index.test.js index 084699352c..2747549797 100644 --- a/sdk/js/src/api/index.test.js +++ b/sdk/js/src/api/index.test.js @@ -104,6 +104,7 @@ describe('API class', () => { it('respects the search query', () => { api.ApplicationRegistry.List(undefined, { limit: 2, page: 1 }) + expect(api._connector.handleRequest).toHaveBeenCalledTimes(1) expect(api._connector.handleRequest).toHaveBeenCalledWith( 'get', @@ -125,9 +126,5 @@ describe('API class', () => { undefined, true, ) - expect(api._connector.handleRequest).toHaveBeenCalledWith('get', '/applications', undefined, { - limit: 2, - page: 1, - }) }) })