Skip to content

Commit

Permalink
Merge pull request #6718 from TheThingsNetwork/fix/rollback-http-stream
Browse files Browse the repository at this point in the history
console: Rollback old http stream
  • Loading branch information
PavelJankoski authored Nov 22, 2023
2 parents 69259b5 + 66ebf06 commit 323b9b2
Show file tree
Hide file tree
Showing 10 changed files with 209 additions and 36 deletions.
4 changes: 2 additions & 2 deletions pkg/webui/console/containers/device-importer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ const DeviceImporter = () => {
devices = await new Promise((resolve, reject) => {
const chunks = []

templateStream.on('chunk', message => {
templateStream.on('message', message => {
appendToLog(message)
chunks.push(message)
})
Expand Down Expand Up @@ -252,7 +252,7 @@ const DeviceImporter = () => {
createStream.current = tts.Applications.Devices.bulkCreate(appId, devices)

await new Promise(resolve => {
createStream.current.on('chunk', handleCreationSuccess)
createStream.current.on('message', handleCreationSuccess)
createStream.current.on('error', handleCreationError)
createStream.current.on('close', resolve)

Expand Down
9 changes: 8 additions & 1 deletion sdk/js/src/api/http.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import {
RATE_LIMIT_RETRIES,
} from '../util/constants'

import subscribeToHttpStream from './stream/subscribeToHttpStream'

/**
* Http Class is a connector for the API that uses the HTTP bridge to connect.
*/
Expand Down Expand Up @@ -85,7 +87,7 @@ class Http {
}
}

async handleRequest(method, endpoint, component, payload = {}) {
async handleRequest(method, endpoint, component, payload = {}, isStream) {
const parsedComponent = component || this._parseStackComponent(endpoint)
if (!this._stackConfig.isComponentAvailable(parsedComponent)) {
// If the component has not been defined in The Things Stack config, make no
Expand All @@ -96,6 +98,11 @@ class Http {
}

try {
if (isStream) {
const url = this._stackConfig.getComponentUrlByName(parsedComponent) + endpoint
return subscribeToHttpStream(payload, url)
}

const config = {
method,
url: endpoint,
Expand Down
3 changes: 2 additions & 1 deletion sdk/js/src/api/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,13 @@ Signature tried: ${paramSignature}`)
}

let route = endpoint.pattern
const isStream = Boolean(endpoint.stream)

for (const parameter of endpoint.parameters) {
route = route.replace(`{${parameter}}`, routeParams[parameter])
}

return connector.handleRequest(endpoint.method, route, component, payload)
return connector.handleRequest(endpoint.method, route, component, payload, isStream)
}

this[serviceName][`${rpcName}AllowedFieldMaskPaths`] = rpc.allowedFieldMaskPaths
Expand Down
25 changes: 21 additions & 4 deletions sdk/js/src/api/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ describe('API class', () => {
'/users/test/applications',
undefined,
{ name: 'test-name' },
false,
)
})

Expand All @@ -105,9 +106,25 @@ describe('API class', () => {
api.ApplicationRegistry.List(undefined, { limit: 2, page: 1 })

expect(api._connector.handleRequest).toHaveBeenCalledTimes(1)
expect(api._connector.handleRequest).toHaveBeenCalledWith('get', '/applications', undefined, {
limit: 2,
page: 1,
})
expect(api._connector.handleRequest).toHaveBeenCalledWith(
'get',
'/applications',
undefined,
{ limit: 2, page: 1 },
false,
)
})

it('sets stream value to true for streaming endpoints', () => {
api.ApplicationRegistry.Events()

expect(api._connector.handleRequest).toHaveBeenCalledTimes(1)
expect(api._connector.handleRequest).toHaveBeenCalledWith(
'get',
'/events',
undefined,
undefined,
true,
)
})
})
145 changes: 145 additions & 0 deletions sdk/js/src/api/stream/subscribeToHttpStream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright © 2019 The Things Network Foundation, The Things Industries B.V.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import ArrayBufferToString from 'arraybuffer-to-string'

import Token from '../../util/token'

import { notify, EVENTS } from './shared'
import 'web-streams-polyfill/dist/polyfill'

/**
* Opens a new stream.
*
* @async
* @param {object} payload - - The body of the initial request.
* @param {string} url - The stream endpoint.
*
* @example
* (async () => {
* const stream = await stream(
* { identifiers: [{ application_ids: { application_id: 'my-app' }}]},
* '/api/v3/events',
* )
*
* // Add listeners to the stream.
* stream
* .on('start', () => console.log('conn opened'))
* .on('message', message => console.log('received message', message))
* .on('error', error => console.log(error))
* .on('close', wasClientRequest => console.log(wasClientRequest ? 'conn closed by client' : 'conn closed by server'))
*
* // Start the stream after attaching listerners.
* stream.open()
*
* // Close the stream after 20 s.
* setTimeout(() => stream.close(), 20000)
* })()
*
* @returns {object} The stream subscription object with the `on` function for
* attaching listeners and the `close` function to close the stream.
*/
export default async (payload, url) => {
const initialListeners = Object.values(EVENTS).reduce(
(acc, curr) => ({ ...acc, [curr]: null }),
{},
)
let listeners = initialListeners
let closeRequested = false
const token = new Token().get()

let Authorization = null
if (typeof token === 'function') {
Authorization = `Bearer ${(await token()).access_token}`
} else {
Authorization = `Bearer ${token}`
}

const abortController = new AbortController()
const response = await fetch(url, {
body: JSON.stringify(payload),
method: 'POST',
signal: abortController.signal,
headers: {
Authorization,
Accept: 'text/event-stream',
},
})

if (response.status !== 200) {
const err = await response.json()

throw 'error' in err ? err.error : err
}

let buffer = ''
const reader = response.body.getReader()
const onMessage = ({ done, value }) => {
if (done) {
notify(listeners[EVENTS.CLOSE], closeRequested)
listeners = initialListeners
return
}

const parsed = ArrayBufferToString(value)
buffer += parsed
const lines = buffer.split(/\n\n/)
buffer = lines.pop()
for (const line of lines) {
notify(listeners[EVENTS.MESSAGE], JSON.parse(line).result)
}

return reader.read().then(onMessage)
}

return {
open: () => {
reader
.read()
.then(data => {
notify(listeners[EVENTS.START])

return data
})
.then(onMessage)
.catch(error => {
notify(listeners[EVENTS.ERROR], error)
listeners = initialListeners
})
},
on(eventName, callback) {
if (listeners[eventName] === undefined) {
throw new Error(
`${eventName} event is not supported. Should be one of: start, error, message or close`,
)
}

listeners[eventName] = callback

return this
},
close: () => {
closeRequested = true

reader
.cancel()
.then(() => {
abortController.abort()
})
.catch(error => {
notify(listeners[EVENTS.ERROR], error)
})
},
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ const initialListeners = Object.values(EVENTS).reduce((acc, curr) => ({ ...acc,
* @async
* @param {object} payload - - The body of the initial request.
* @param {string} baseUrl - The stream baseUrl.
* @param {string} endpoint - The stream endpoint.
*
* @example
* (async () => {
* const stream = await stream(
* { identifiers: [{ application_ids: { application_id: 'my-app' }}]},
* 'http://localhost:8080/api/v3',
* 'http://localhost:8080',
* '/api/v3',
* )
*
* // Add listeners to the stream.
Expand All @@ -49,7 +51,7 @@ const initialListeners = Object.values(EVENTS).reduce((acc, curr) => ({ ...acc,
* @returns {object} The stream subscription object with the `on` function for
* attaching listeners and the `close` function to close the stream.
*/
export default async (payload, baseUrl) => {
export default async (payload, baseUrl, endpoint = '/console/internal/events/') => {
const subscriptionId = Date.now()
const subscriptionPayload = JSON.stringify({
type: MESSAGE_TYPES.SUBSCRIBE,
Expand All @@ -61,48 +63,49 @@ export default async (payload, baseUrl) => {
id: subscriptionId,
})
let closeRequested = false
const url = baseUrl + endpoint

await new Promise(async resolve => {
// Add the new subscription to the subscriptions object.
// Also add the resolver function to the subscription object to be able
// to resolve the promise after the subscription confirmation message.
subscriptions = {
...subscriptions,
[subscriptionId]: { ...initialListeners, url: baseUrl, _resolver: resolve },
[subscriptionId]: { ...initialListeners, url, _resolver: resolve },
}

const token = new Token().get()
const tokenParsed = typeof token === 'function' ? (await token()).access_token : token
const baseUrlParsed = baseUrl.replace('http', 'ws')

// Open up the WebSocket connection if it doesn't exist.
if (!wsInstances[baseUrl]) {
wsInstances[baseUrl] = new WebSocket(`${baseUrlParsed}/console/internal/events/`, [
if (!wsInstances[url]) {
wsInstances[url] = new WebSocket(`${baseUrlParsed}${endpoint}`, [
'ttn.lorawan.v3.console.internal.events.v1',
`ttn.lorawan.v3.header.authorization.bearer.${tokenParsed}`,
])

// Event listener for 'open'
wsInstances[baseUrl].addEventListener('open', () => {
wsInstances[baseUrl].send(subscriptionPayload)
wsInstances[url].addEventListener('open', () => {
wsInstances[url].send(subscriptionPayload)
})

// Broadcast connection errors to all listeners.
wsInstances[baseUrl].addEventListener('error', error => {
wsInstances[url].addEventListener('error', error => {
Object.values(subscriptions)
.filter(s => s.url === baseUrl)
.filter(s => s.url === url)
.forEach(s => notify(s[EVENTS.ERROR], error))
resolve()
})

// Event listener for 'close'
wsInstances[baseUrl].addEventListener('close', () => {
delete wsInstances[baseUrl]
wsInstances[url].addEventListener('close', () => {
delete wsInstances[url]
})

// After the WebSocket connection is open, add the event listeners.
// Wait for the subscription confirmation message before resolving.
wsInstances[baseUrl].addEventListener('message', ({ data }) => {
wsInstances[url].addEventListener('message', ({ data }) => {
const dataParsed = JSON.parse(data)
const listeners = subscriptions[dataParsed.id]

Expand Down Expand Up @@ -130,14 +133,14 @@ export default async (payload, baseUrl) => {
notify(listeners[EVENTS.CLOSE], closeRequested)
// Remove the subscription.
delete subscriptions[dataParsed.id]
if (!Object.values(subscriptions).some(s => s.url === baseUrl)) {
wsInstances[baseUrl].close()
if (!Object.values(subscriptions).some(s => s.url === url)) {
wsInstances[url].close()
}
}
})
} else if (wsInstances[baseUrl] && wsInstances[baseUrl].readyState === WebSocket.OPEN) {
} else if (wsInstances[url] && wsInstances[url].readyState === WebSocket.OPEN) {
// If the WebSocket connection is already open, only add the subscription.
wsInstances[baseUrl].send(subscriptionPayload)
wsInstances[url].send(subscriptionPayload)
}
})

Expand All @@ -155,13 +158,13 @@ export default async (payload, baseUrl) => {
return this
},
close: () => {
if (wsInstances[baseUrl]) {
if (wsInstances[url]) {
closeRequested = true
wsInstances[baseUrl].send(unsubscribePayload)
wsInstances[url].send(unsubscribePayload)

// Wait for the server to confirm the unsubscribe.
return new Promise(resolve => {
wsInstances[baseUrl].addEventListener('message', ({ data }) => {
wsInstances[url].addEventListener('message', ({ data }) => {
const { type, id } = JSON.parse(data)
if (id === subscriptionId && type === MESSAGE_TYPES.UNSUBSCRIBE) {
resolve()
Expand Down
4 changes: 2 additions & 2 deletions sdk/js/src/service/applications.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import autoBind from 'auto-bind'

import Marshaler from '../util/marshaler'
import subscribeToStream from '../api/stream/subscribeToStream'
import subscribeToWebSocketStream from '../api/stream/subscribeToWebSocketStream'
import combineStreams from '../util/combine-streams'
import { STACK_COMPONENTS_MAP } from '../util/constants'

Expand Down Expand Up @@ -236,7 +236,7 @@ class Applications {
distinctComponents.map(component => this._stackConfig.getComponentUrlByName(component)),
)

const streams = [...baseUrls].map(baseUrl => subscribeToStream(payload, baseUrl))
const streams = [...baseUrls].map(baseUrl => subscribeToWebSocketStream(payload, baseUrl))

// Combine all stream sources to one subscription generator.
return combineStreams(streams)
Expand Down
Loading

0 comments on commit 323b9b2

Please sign in to comment.