From 7ac2acf37dfe85dae5e5a0607bb43b085ffb6047 Mon Sep 17 00:00:00 2001 From: Pavel Jankoski Date: Mon, 20 Nov 2023 18:01:09 +0100 Subject: [PATCH] console: Replaced server side events with single socket connection using the native WebSocket API. --- CHANGELOG.md | 1 + config/webpack.config.babel.js | 1 + .../console/store/middleware/logics/events.js | 6 +- sdk/js/src/api/http.js | 9 +- sdk/js/src/api/index.js | 3 +- sdk/js/src/api/index.test.js | 25 +-- sdk/js/src/api/stream/shared.js | 11 +- sdk/js/src/api/stream/stream-node.js | 122 ------------ sdk/js/src/api/stream/stream.js | 145 --------------- sdk/js/src/api/stream/subscribeToStream.js | 174 ++++++++++++++++++ sdk/js/src/index.js | 5 +- sdk/js/src/service/applications.js | 20 +- sdk/js/src/service/devices/index.js | 24 +-- sdk/js/src/service/gateways.js | 11 +- sdk/js/src/service/organizations.js | 9 +- sdk/js/src/util/combine-streams.js | 7 +- 16 files changed, 231 insertions(+), 342 deletions(-) delete mode 100644 sdk/js/src/api/stream/stream-node.js delete mode 100644 sdk/js/src/api/stream/stream.js create mode 100644 sdk/js/src/api/stream/subscribeToStream.js diff --git a/CHANGELOG.md b/CHANGELOG.md index 56a1c8d91f..3b1a9932c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ For details about compatibility between different releases, see the **Commitment ### Added ### Changed +- Server side events replaced with single socket connection using the native WebSocket API. ### Deprecated diff --git a/config/webpack.config.babel.js b/config/webpack.config.babel.js index 2281c7e473..4530e40b95 100644 --- a/config/webpack.config.babel.js +++ b/config/webpack.config.babel.js @@ -175,6 +175,7 @@ export default { target: WEBPACK_DEV_SERVER_USE_TLS ? 'https://localhost:8885' : 'http://localhost:1885', changeOrigin: true, secure: false, + ws: true, }, ], historyApiFallback: true, diff --git a/pkg/webui/console/store/middleware/logics/events.js b/pkg/webui/console/store/middleware/logics/events.js index fe61572a04..ef797fbd7b 100644 --- a/pkg/webui/console/store/middleware/logics/events.js +++ b/pkg/webui/console/store/middleware/logics/events.js @@ -133,14 +133,14 @@ const createEventsConnectLogics = (reducerName, entityName, onEventsStart) => { try { channel = await onEventsStart([id], filterRegExp, EVENT_TAIL, after) - channel.on('start', () => dispatch(startEventsSuccess(id, { silent }))) - channel.on('chunk', message => dispatch(getEventSuccess(id, message))) + channel.on('open', () => dispatch(startEventsSuccess(id, { silent }))) + channel.on('message', message => dispatch(getEventSuccess(id, message))) channel.on('error', error => dispatch(getEventFailure(id, error))) channel.on('close', wasClientRequest => dispatch(closeEvents(id, { silent: wasClientRequest })), ) - channel.open() + channel.on('open', () => dispatch(startEventsSuccess(id, { silent }))) } catch (error) { if (isUnauthenticatedError(error)) { // The user is no longer authenticated; reinitiate the auth flow diff --git a/sdk/js/src/api/http.js b/sdk/js/src/api/http.js index d4df5501bf..bfde30408c 100644 --- a/sdk/js/src/api/http.js +++ b/sdk/js/src/api/http.js @@ -24,8 +24,6 @@ import { RATE_LIMIT_RETRIES, } from '../util/constants' -import stream from './stream/stream-node' - /** * Http Class is a connector for the API that uses the HTTP bridge to connect. */ @@ -87,7 +85,7 @@ class Http { } } - async handleRequest(method, endpoint, component, payload = {}, isStream) { + async handleRequest(method, endpoint, component, payload = {}) { const parsedComponent = component || this._parseStackComponent(endpoint) if (!this._stackConfig.isComponentAvailable(parsedComponent)) { // If the component has not been defined in The Things Stack config, make no @@ -98,11 +96,6 @@ class Http { } try { - if (isStream) { - const url = this._stackConfig.getComponentUrlByName(parsedComponent) + endpoint - return stream(payload, url) - } - const config = { method, url: endpoint, diff --git a/sdk/js/src/api/index.js b/sdk/js/src/api/index.js index aefcfa1dfb..853ab988c6 100644 --- a/sdk/js/src/api/index.js +++ b/sdk/js/src/api/index.js @@ -62,13 +62,12 @@ Signature tried: ${paramSignature}`) } let route = endpoint.pattern - const isStream = Boolean(endpoint.stream) for (const parameter of endpoint.parameters) { route = route.replace(`{${parameter}}`, routeParams[parameter]) } - return connector.handleRequest(endpoint.method, route, component, payload, isStream) + return connector.handleRequest(endpoint.method, route, component, payload) } this[serviceName][`${rpcName}AllowedFieldMaskPaths`] = rpc.allowedFieldMaskPaths diff --git a/sdk/js/src/api/index.test.js b/sdk/js/src/api/index.test.js index 2747549797..bc3ecf366b 100644 --- a/sdk/js/src/api/index.test.js +++ b/sdk/js/src/api/index.test.js @@ -92,7 +92,6 @@ describe('API class', () => { '/users/test/applications', undefined, { name: 'test-name' }, - false, ) }) @@ -106,25 +105,9 @@ describe('API class', () => { api.ApplicationRegistry.List(undefined, { limit: 2, page: 1 }) expect(api._connector.handleRequest).toHaveBeenCalledTimes(1) - expect(api._connector.handleRequest).toHaveBeenCalledWith( - 'get', - '/applications', - undefined, - { limit: 2, page: 1 }, - false, - ) - }) - - it('sets stream value to true for streaming endpoints', () => { - api.ApplicationRegistry.Events() - - expect(api._connector.handleRequest).toHaveBeenCalledTimes(1) - expect(api._connector.handleRequest).toHaveBeenCalledWith( - 'get', - '/events', - undefined, - undefined, - true, - ) + expect(api._connector.handleRequest).toHaveBeenCalledWith('get', '/applications', undefined, { + limit: 2, + page: 1, + }) }) }) diff --git a/sdk/js/src/api/stream/shared.js b/sdk/js/src/api/stream/shared.js index 643209a7a9..8d1b8b18ff 100644 --- a/sdk/js/src/api/stream/shared.js +++ b/sdk/js/src/api/stream/shared.js @@ -19,8 +19,15 @@ export const notify = (listener, ...args) => { } export const EVENTS = Object.freeze({ - START: 'start', - CHUNK: 'chunk', + OPEN: 'open', + MESSAGE: 'message', ERROR: 'error', CLOSE: 'close', }) + +export const MESSAGE_TYPES = Object.freeze({ + SUBSCRIBE: 'subscribe', + UNSUBSCRIBE: 'unsubscribe', + PUBLISH: 'publish', + ERROR: 'error', +}) diff --git a/sdk/js/src/api/stream/stream-node.js b/sdk/js/src/api/stream/stream-node.js deleted file mode 100644 index 7b41db8291..0000000000 --- a/sdk/js/src/api/stream/stream-node.js +++ /dev/null @@ -1,122 +0,0 @@ -// Copyright © 2019 The Things Network Foundation, The Things Industries B.V. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -import axios from 'axios' - -import Token from '../../util/token' - -import { notify, EVENTS } from './shared' - -/** - * Opens a new stream. - * - * @async - * @param {object} payload - - The body of the initial request. - * @param {string} url - The stream endpoint. - * - * @example - * (async () => { - * const stream = await stream( - * { identifiers: [{ application_ids: { application_id: 'my-app' }}]}, - * 'http://localhost:1885/api/v3/events', - * ) - * - * // Add listeners to the stream. - * stream - * .on('start', () => console.log('conn opened')) - * .on('chunk', chunk => console.log('received chunk', chunk)) - * .on('error', error => console.log(error)) - * .on('close', wasClientRequest => console.log(wasClientRequest ? 'conn closed by client' : 'conn closed by server')) - * - * // Start the stream after attaching the listeners. - * stream.open() - * - * // Close the stream after 20 s. - * setTimeout(() => stream.close(), 20000) - * })() - * - * @returns {object} The stream subscription object with the `on` function for - * attaching listeners and the `close` function to close the stream. - */ -export default async (payload, url) => { - let listeners = Object.values(EVENTS).reduce((acc, curr) => ({ ...acc, [curr]: null }), {}) - let reader = null - let closeRequested = false - - const token = new Token().get() - - let Authorization = null - if (typeof token === 'function') { - Authorization = `Bearer ${(await token()).access_token}` - } else { - Authorization = `Bearer ${token}` - } - - let buffer = '' - axios({ - url, - data: JSON.stringify(payload), - method: 'POST', - responseType: 'stream', - headers: { - Authorization, - Accept: 'text/event-stream', - }, - }) - .then(response => response.data) - .then(stream => { - reader = stream - }) - - return { - open: () => { - notify(listeners[EVENTS.START]) - - reader.on('data', data => { - const parsed = data.toString('utf8') - buffer += parsed - const lines = buffer.split(/\n\n/) - buffer = lines.pop() - for (const line of lines) { - notify(listeners[EVENTS.CHUNK], JSON.parse(line).result) - } - }) - reader.on('end', () => { - notify(listeners[EVENTS.CLOSE], closeRequested) - listeners = {} - }) - reader.on('error', error => { - notify(listeners[EVENTS.ERROR], error) - listeners = {} - }) - }, - on(eventName, callback) { - if (listeners[eventName] === undefined) { - throw new Error( - `${eventName} event is not supported. Should be one of: start, error, chunk or close`, - ) - } - - listeners[eventName] = callback - - return this - }, - close: () => { - if (reader) { - closeRequested = true - reader.cancel() - } - }, - } -} diff --git a/sdk/js/src/api/stream/stream.js b/sdk/js/src/api/stream/stream.js deleted file mode 100644 index 3fd4999574..0000000000 --- a/sdk/js/src/api/stream/stream.js +++ /dev/null @@ -1,145 +0,0 @@ -// Copyright © 2019 The Things Network Foundation, The Things Industries B.V. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -import ArrayBufferToString from 'arraybuffer-to-string' - -import Token from '../../util/token' - -import { notify, EVENTS } from './shared' -import 'web-streams-polyfill/dist/polyfill' - -/** - * Opens a new stream. - * - * @async - * @param {object} payload - - The body of the initial request. - * @param {string} url - The stream endpoint. - * - * @example - * (async () => { - * const stream = await stream( - * { identifiers: [{ application_ids: { application_id: 'my-app' }}]}, - * '/api/v3/events', - * ) - * - * // Add listeners to the stream. - * stream - * .on('start', () => console.log('conn opened')) - * .on('chunk', chunk => console.log('received chunk', chunk)) - * .on('error', error => console.log(error)) - * .on('close', wasClientRequest => console.log(wasClientRequest ? 'conn closed by client' : 'conn closed by server')) - * - * // Start the stream after attaching listerners. - * stream.open() - * - * // Close the stream after 20 s. - * setTimeout(() => stream.close(), 20000) - * })() - * - * @returns {object} The stream subscription object with the `on` function for - * attaching listeners and the `close` function to close the stream. - */ -export default async (payload, url) => { - const initialListeners = Object.values(EVENTS).reduce( - (acc, curr) => ({ ...acc, [curr]: null }), - {}, - ) - let listeners = initialListeners - let closeRequested = false - const token = new Token().get() - - let Authorization = null - if (typeof token === 'function') { - Authorization = `Bearer ${(await token()).access_token}` - } else { - Authorization = `Bearer ${token}` - } - - const abortController = new AbortController() - const response = await fetch(url, { - body: JSON.stringify(payload), - method: 'POST', - signal: abortController.signal, - headers: { - Authorization, - Accept: 'text/event-stream', - }, - }) - - if (response.status !== 200) { - const err = await response.json() - - throw 'error' in err ? err.error : err - } - - let buffer = '' - const reader = response.body.getReader() - const onChunk = ({ done, value }) => { - if (done) { - notify(listeners[EVENTS.CLOSE], closeRequested) - listeners = initialListeners - return - } - - const parsed = ArrayBufferToString(value) - buffer += parsed - const lines = buffer.split(/\n\n/) - buffer = lines.pop() - for (const line of lines) { - notify(listeners[EVENTS.CHUNK], JSON.parse(line).result) - } - - return reader.read().then(onChunk) - } - - return { - open: () => { - reader - .read() - .then(data => { - notify(listeners[EVENTS.START]) - - return data - }) - .then(onChunk) - .catch(error => { - notify(listeners[EVENTS.ERROR], error) - listeners = initialListeners - }) - }, - on(eventName, callback) { - if (listeners[eventName] === undefined) { - throw new Error( - `${eventName} event is not supported. Should be one of: start, error, chunk or close`, - ) - } - - listeners[eventName] = callback - - return this - }, - close: () => { - closeRequested = true - - reader - .cancel() - .then(() => { - abortController.abort() - }) - .catch(error => { - notify(listeners[EVENTS.ERROR], error) - }) - }, - } -} diff --git a/sdk/js/src/api/stream/subscribeToStream.js b/sdk/js/src/api/stream/subscribeToStream.js new file mode 100644 index 0000000000..bbc2610ffc --- /dev/null +++ b/sdk/js/src/api/stream/subscribeToStream.js @@ -0,0 +1,174 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import Token from '../../util/token' +import { warn } from '../../../../../pkg/webui/lib/log' + +import { notify, EVENTS, MESSAGE_TYPES } from './shared' + +const wsInstances = {} +let subscriptions = {} +const initialListeners = Object.values(EVENTS).reduce((acc, curr) => ({ ...acc, [curr]: {} }), {}) + +/** + * Opens a new stream. + * + * @async + * @param {object} payload - - The body of the initial request. + * @param {string} baseUrl - The stream baseUrl. + * + * @example + * (async () => { + * const stream = await stream( + * { identifiers: [{ application_ids: { application_id: 'my-app' }}]}, + * 'http://localhost:8080/api/v3', + * ) + * + * // Add listeners to the stream. + * stream + * .on('open', () => console.log('conn opened')) + * .on('message', ({ data }) => console.log('received data', JSON.parse(data))) + * .on('error', error => console.log(error)) + * .on('close', wasClientRequest => console.log(wasClientRequest ? 'conn closed by client' : 'conn closed by server')) + * + * // Close the stream after 20 s. + * setTimeout(() => stream.close(), 20000) + * })() + * + * @returns {object} The stream subscription object with the `on` function for + * attaching listeners and the `close` function to close the stream. + */ +export default async (payload, baseUrl) => { + const subscriptionId = Date.now() + const subscriptionPayload = JSON.stringify({ + type: MESSAGE_TYPES.SUBSCRIBE, + id: subscriptionId, + ...payload, + }) + const unsubscribePayload = JSON.stringify({ + type: MESSAGE_TYPES.UNSUBSCRIBE, + id: subscriptionId, + }) + let closeRequested = false + + await new Promise(async resolve => { + // Add the new subscription to the subscriptions object. + // Also add the resolver function to the subscription object to be able + // to resolve the promise after the subscription confirmation message. + subscriptions = { + ...subscriptions, + [subscriptionId]: { ...initialListeners, url: baseUrl, _resolver: resolve }, + } + + const token = new Token().get() + const tokenParsed = typeof token === 'function' ? (await token()).access_token : token + const baseUrlParsed = baseUrl.replace('http', 'ws') + + // Open up the WebSocket connection if it doesn't exist. + if (!wsInstances[baseUrl]) { + wsInstances[baseUrl] = new WebSocket(`${baseUrlParsed}/console/internal/events/`, [ + 'ttn.lorawan.v3.console.internal.events.v1', + `ttn.lorawan.v3.header.authorization.bearer.${tokenParsed}`, + ]) + + // Event listener for 'open' + wsInstances[baseUrl].addEventListener('open', () => { + wsInstances[baseUrl].send(subscriptionPayload) + }) + + // Broadcast connection errors to all listeners. + wsInstances[baseUrl].addEventListener('error', error => { + Object.values(subscriptions) + .filter(s => s.url === baseUrl) + .forEach(s => notify(s[EVENTS.ERROR], error)) + resolve() + }) + + // Event listener for 'close' + wsInstances[baseUrl].addEventListener('close', () => { + delete wsInstances[baseUrl] + }) + + // After the WebSocket connection is open, add the event listeners. + // Wait for the subscription confirmation message before resolving. + wsInstances[baseUrl].addEventListener('message', ({ data }) => { + const dataParsed = JSON.parse(data) + const listeners = subscriptions[dataParsed.id] + + if (!listeners) { + warn('Message received for closed or unknown subscription with ID', dataParsed.id) + + return + } + + if (dataParsed.type === MESSAGE_TYPES.SUBSCRIBE) { + notify(listeners[EVENTS.OPEN]) + // Resolve the promise after the subscription confirmation message. + listeners._resolver() + } + + if (dataParsed.type === MESSAGE_TYPES.ERROR) { + notify(listeners[EVENTS.ERROR], dataParsed) + } + + if (dataParsed.type === MESSAGE_TYPES.PUBLISH) { + notify(listeners[EVENTS.MESSAGE], dataParsed.event) + } + + if (dataParsed.type === MESSAGE_TYPES.UNSUBSCRIBE) { + notify(listeners[EVENTS.CLOSE], closeRequested) + // Remove the subscription. + delete subscriptions[dataParsed.id] + if (!Object.values(subscriptions).some(s => s.url === baseUrl)) { + wsInstances[baseUrl].close() + } + } + }) + } else if (wsInstances[baseUrl] && wsInstances[baseUrl].readyState === WebSocket.OPEN) { + // If the WebSocket connection is already open, only add the subscription. + wsInstances[baseUrl].send(subscriptionPayload) + } + }) + + // Return an observer object with the `on` and `close` functions for + // the current subscription. + return { + on(eventName, callback) { + if (!Object.values(EVENTS).includes(eventName)) { + throw new Error( + `${eventName} event is not supported. Should be one of: open, message, error or close`, + ) + } + subscriptions[subscriptionId][eventName] = callback + + return this + }, + close: () => { + if (wsInstances[baseUrl]) { + closeRequested = true + wsInstances[baseUrl].send(unsubscribePayload) + + // Wait for the server to confirm the unsubscribe. + return new Promise(resolve => { + wsInstances[baseUrl].addEventListener('message', ({ data }) => { + const { type, id } = JSON.parse(data) + if (id === subscriptionId && type === MESSAGE_TYPES.UNSUBSCRIBE) { + resolve() + } + }) + }) + } + }, + } +} diff --git a/sdk/js/src/index.js b/sdk/js/src/index.js index 20af399f98..248ab976ad 100644 --- a/sdk/js/src/index.js +++ b/sdk/js/src/index.js @@ -55,7 +55,10 @@ class TTS { this.Ns = new Ns(this.api.Ns) this.Is = new Is(this.api.Is) this.As = new As(this.api) - this.Organizations = new Organizations(this.api) + this.Organizations = new Organizations(this.api, { + defaultUserId, + stackConfig: stackConfiguration, + }) this.Users = new Users(this.api) this.Auth = new Auth(this.api.EntityAccess) this.Sessions = new Sessions(this.api) diff --git a/sdk/js/src/service/applications.js b/sdk/js/src/service/applications.js index 8204be0a54..b262e24053 100644 --- a/sdk/js/src/service/applications.js +++ b/sdk/js/src/service/applications.js @@ -15,6 +15,7 @@ import autoBind from 'auto-bind' import Marshaler from '../util/marshaler' +import subscribeToStream from '../api/stream/subscribeToStream' import combineStreams from '../util/combine-streams' import { STACK_COMPONENTS_MAP } from '../util/constants' @@ -26,8 +27,7 @@ import Webhooks from './webhooks' import PubSubs from './pubsubs' import Packages from './application-packages' -const { is: IS, as: AS, ns: NS, js: JS, dtc: DTC } = STACK_COMPONENTS_MAP - +const { is: IS, gs: GS } = STACK_COMPONENTS_MAP /** * Applications Class provides an abstraction on all applications and manages * data handling from different sources. It exposes an API to easily work with @@ -230,18 +230,14 @@ class Applications { // Event streams can come from multiple stack components. It is necessary to // check for stack components on different hosts and open distinct stream // connections for any distinct host if need be. - const distinctComponents = this._stackConfig.getComponentsWithDistinctBaseUrls([ - IS, - JS, - NS, - AS, - DTC, - ]) - - const streams = distinctComponents.map(component => - this._api.Events.Stream({ component }, payload), + const distinctComponents = this._stackConfig.getComponentsWithDistinctBaseUrls([IS, GS]) + + const baseUrls = new Set( + distinctComponents.map(component => this._stackConfig.getComponentUrlByName(component)), ) + const streams = [...baseUrls].map(baseUrl => subscribeToStream(payload, baseUrl)) + // Combine all stream sources to one subscription generator. return combineStreams(streams) } diff --git a/sdk/js/src/service/devices/index.js b/sdk/js/src/service/devices/index.js index f075d58849..0d7c90b39c 100644 --- a/sdk/js/src/service/devices/index.js +++ b/sdk/js/src/service/devices/index.js @@ -19,18 +19,18 @@ import traverse from 'traverse' import { notify, EVENTS } from '../../api/stream/shared' import Marshaler from '../../util/marshaler' -import combineStreams from '../../util/combine-streams' +import subscribeToStream from '../../api/stream/subscribeToStream' import deviceEntityMap from '../../../generated/device-entity-map.json' import DownlinkQueue from '../downlink-queue' import { STACK_COMPONENTS_MAP } from '../../util/constants' import DeviceClaim from '../claim' +import combineStreams from '../../util/combine-streams' import Repository from './repository' import { splitSetPaths, splitGetPaths, makeRequests } from './split' import mergeDevice from './merge' -const { is: IS, ns: NS, as: AS, js: JS, dtc: DTC } = STACK_COMPONENTS_MAP - +const { is: IS, ns: NS, as: AS, js: JS, gs: GS } = STACK_COMPONENTS_MAP /** * Devices Class provides an abstraction on all devices and manages data * handling from different sources. It exposes an API to easily work with @@ -645,7 +645,7 @@ class Devices { const result = await this.create(applicationId, end_device, paths) - notify(listeners[EVENTS.CHUNK], result) + notify(listeners[EVENTS.MESSAGE], result) finishedCount++ } catch (error) { notify(listeners[EVENTS.ERROR], error) @@ -693,18 +693,14 @@ class Devices { // Event streams can come from multiple stack components. It is necessary to // check for stack components on different hosts and open distinct stream // connections for any distinct host if need be. - const distinctComponents = this._stackConfig.getComponentsWithDistinctBaseUrls([ - IS, - JS, - NS, - AS, - DTC, - ]) - - const streams = distinctComponents.map(component => - this._api.Events.Stream({ component }, payload), + const distinctComponents = this._stackConfig.getComponentsWithDistinctBaseUrls([IS, GS]) + + const baseUrls = new Set( + distinctComponents.map(component => this._stackConfig.getComponentUrlByName(component)), ) + const streams = [...baseUrls].map(baseUrl => subscribeToStream(payload, baseUrl)) + // Combine all stream sources to one subscription generator. return combineStreams(streams) } diff --git a/sdk/js/src/service/gateways.js b/sdk/js/src/service/gateways.js index 88af373d31..5478e958dd 100644 --- a/sdk/js/src/service/gateways.js +++ b/sdk/js/src/service/gateways.js @@ -15,8 +15,9 @@ import autoBind from 'auto-bind' import Marshaler from '../util/marshaler' -import combineStreams from '../util/combine-streams' +import subscribeToStream from '../api/stream/subscribeToStream' import { STACK_COMPONENTS_MAP } from '../util/constants' +import combineStreams from '../util/combine-streams' import ApiKeys from './api-keys' import Collaborators from './collaborators' @@ -257,10 +258,12 @@ class Gateways { STACK_COMPONENTS_MAP.gs, ]) - const streams = distinctComponents.map(component => - this._api.Events.Stream({ component }, payload), + const baseUrls = new Set( + distinctComponents.map(component => this._stackConfig.getComponentUrlByName(component)), ) + const streams = [...baseUrls].map(baseUrl => subscribeToStream(payload, baseUrl)) + // Combine all stream sources to one subscription generator. return combineStreams(streams) } @@ -273,7 +276,7 @@ class Gateways { // https://github.com/TheThingsNetwork/lorawan-stack/issues/3280 const endpoint = `/gcs/gateways/${gatewayId}/semtechudp/global_conf.json` - const response = await this._api._connector.handleRequest('get', endpoint, 'gcs', false) + const response = await this._api._connector.handleRequest('get', endpoint, 'gcs') return Marshaler.payloadSingleResponse(response.data) } diff --git a/sdk/js/src/service/organizations.js b/sdk/js/src/service/organizations.js index 5f0981109a..07549d7511 100644 --- a/sdk/js/src/service/organizations.js +++ b/sdk/js/src/service/organizations.js @@ -15,13 +15,16 @@ import autoBind from 'auto-bind' import Marshaler from '../util/marshaler' +import subscribeToStream from '../api/stream/subscribeToStream' +import { STACK_COMPONENTS_MAP } from '../util/constants' import ApiKeys from './api-keys' import Collaborators from './collaborators' class Organizations { - constructor(api) { + constructor(api, { stackConfig }) { this._api = api + this._stackConfig = stackConfig this.ApiKeys = new ApiKeys(api.OrganizationAccess, { parentRoutes: { @@ -163,7 +166,9 @@ class Organizations { after, } - return this._api.Events.Stream(undefined, payload) + const baseUrl = this._stackConfig.getComponentUrlByName(STACK_COMPONENTS_MAP.is) + + return subscribeToStream(payload, baseUrl) } } diff --git a/sdk/js/src/util/combine-streams.js b/sdk/js/src/util/combine-streams.js index 92ab832595..75e20855b5 100644 --- a/sdk/js/src/util/combine-streams.js +++ b/sdk/js/src/util/combine-streams.js @@ -1,4 +1,4 @@ -// Copyright © 2020 The Things Network Foundation, The Things Industries B.V. +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -29,11 +29,6 @@ const combinedStream = async streams => { const subscribers = await Promise.all(streams) return { - open: () => { - for (const subscriber of subscribers) { - subscriber.open() - } - }, on: (eventName, callback) => { for (const subscriber of subscribers) { subscriber.on(eventName, callback)