Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

console: Rollback old http stream #6718

Merged
merged 2 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/webui/console/containers/device-importer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ const DeviceImporter = () => {
devices = await new Promise((resolve, reject) => {
const chunks = []

templateStream.on('chunk', message => {
templateStream.on('message', message => {
appendToLog(message)
chunks.push(message)
})
Expand Down Expand Up @@ -252,7 +252,7 @@ const DeviceImporter = () => {
createStream.current = tts.Applications.Devices.bulkCreate(appId, devices)

await new Promise(resolve => {
createStream.current.on('chunk', handleCreationSuccess)
createStream.current.on('message', handleCreationSuccess)
createStream.current.on('error', handleCreationError)
createStream.current.on('close', resolve)

Expand Down
9 changes: 8 additions & 1 deletion sdk/js/src/api/http.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2019 The Things Network Foundation, The Things Industries B.V.

Check warning on line 1 in sdk/js/src/api/http.js

View workflow job for this annotation

GitHub Actions / Check Mergeability

sdk/js/src/api/http.js has a conflict when merging TheThingsIndustries/lorawan-stack:v3.28.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,8 @@
RATE_LIMIT_RETRIES,
} from '../util/constants'

import subscribeToHttpStream from './stream/subscribeToHttpStream'

/**
* Http Class is a connector for the API that uses the HTTP bridge to connect.
*/
Expand Down Expand Up @@ -85,7 +87,7 @@
}
}

async handleRequest(method, endpoint, component, payload = {}) {
async handleRequest(method, endpoint, component, payload = {}, isStream) {
const parsedComponent = component || this._parseStackComponent(endpoint)
if (!this._stackConfig.isComponentAvailable(parsedComponent)) {
// If the component has not been defined in The Things Stack config, make no
Expand All @@ -96,6 +98,11 @@
}

try {
if (isStream) {
const url = this._stackConfig.getComponentUrlByName(parsedComponent) + endpoint
return subscribeToHttpStream(payload, url)
}

const config = {
method,
url: endpoint,
Expand Down
3 changes: 2 additions & 1 deletion sdk/js/src/api/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,13 @@ Signature tried: ${paramSignature}`)
}

let route = endpoint.pattern
const isStream = Boolean(endpoint.stream)

for (const parameter of endpoint.parameters) {
route = route.replace(`{${parameter}}`, routeParams[parameter])
}

return connector.handleRequest(endpoint.method, route, component, payload)
return connector.handleRequest(endpoint.method, route, component, payload, isStream)
}

this[serviceName][`${rpcName}AllowedFieldMaskPaths`] = rpc.allowedFieldMaskPaths
Expand Down
25 changes: 21 additions & 4 deletions sdk/js/src/api/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ describe('API class', () => {
'/users/test/applications',
undefined,
{ name: 'test-name' },
false,
)
})

Expand All @@ -105,9 +106,25 @@ describe('API class', () => {
api.ApplicationRegistry.List(undefined, { limit: 2, page: 1 })

expect(api._connector.handleRequest).toHaveBeenCalledTimes(1)
expect(api._connector.handleRequest).toHaveBeenCalledWith('get', '/applications', undefined, {
limit: 2,
page: 1,
})
expect(api._connector.handleRequest).toHaveBeenCalledWith(
'get',
'/applications',
undefined,
{ limit: 2, page: 1 },
false,
)
})

it('sets stream value to true for streaming endpoints', () => {
api.ApplicationRegistry.Events()

expect(api._connector.handleRequest).toHaveBeenCalledTimes(1)
expect(api._connector.handleRequest).toHaveBeenCalledWith(
'get',
'/events',
undefined,
undefined,
true,
)
})
})
145 changes: 145 additions & 0 deletions sdk/js/src/api/stream/subscribeToHttpStream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright © 2019 The Things Network Foundation, The Things Industries B.V.

Check warning on line 1 in sdk/js/src/api/stream/subscribeToHttpStream.js

View workflow job for this annotation

GitHub Actions / Check Mergeability

sdk/js/src/api/stream/subscribeToHttpStream.js has a conflict when merging TheThingsIndustries/lorawan-stack:v3.28.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import ArrayBufferToString from 'arraybuffer-to-string'

import Token from '../../util/token'

import { notify, EVENTS } from './shared'
import 'web-streams-polyfill/dist/polyfill'

/**
* Opens a new stream.
*
* @async
* @param {object} payload - - The body of the initial request.
* @param {string} url - The stream endpoint.
*
* @example
* (async () => {
* const stream = await stream(
* { identifiers: [{ application_ids: { application_id: 'my-app' }}]},
* '/api/v3/events',
* )
*
* // Add listeners to the stream.
* stream
* .on('start', () => console.log('conn opened'))
* .on('message', message => console.log('received message', message))
* .on('error', error => console.log(error))
* .on('close', wasClientRequest => console.log(wasClientRequest ? 'conn closed by client' : 'conn closed by server'))
*
* // Start the stream after attaching listerners.
* stream.open()
*
* // Close the stream after 20 s.
* setTimeout(() => stream.close(), 20000)
* })()
*
* @returns {object} The stream subscription object with the `on` function for
* attaching listeners and the `close` function to close the stream.
*/
export default async (payload, url) => {
const initialListeners = Object.values(EVENTS).reduce(
(acc, curr) => ({ ...acc, [curr]: null }),
{},
)
let listeners = initialListeners
let closeRequested = false
const token = new Token().get()

let Authorization = null
if (typeof token === 'function') {
Authorization = `Bearer ${(await token()).access_token}`
} else {
Authorization = `Bearer ${token}`
}

const abortController = new AbortController()
const response = await fetch(url, {
body: JSON.stringify(payload),
method: 'POST',
signal: abortController.signal,
headers: {
Authorization,
Accept: 'text/event-stream',
},
})

if (response.status !== 200) {
const err = await response.json()

throw 'error' in err ? err.error : err
}

let buffer = ''
const reader = response.body.getReader()
const onMessage = ({ done, value }) => {
if (done) {
notify(listeners[EVENTS.CLOSE], closeRequested)
listeners = initialListeners
return
}

const parsed = ArrayBufferToString(value)
buffer += parsed
const lines = buffer.split(/\n\n/)
buffer = lines.pop()
for (const line of lines) {
notify(listeners[EVENTS.MESSAGE], JSON.parse(line).result)
}

return reader.read().then(onMessage)
}

return {
open: () => {
reader
.read()
.then(data => {
notify(listeners[EVENTS.START])

return data
})
.then(onMessage)
.catch(error => {
notify(listeners[EVENTS.ERROR], error)
listeners = initialListeners
})
},
on(eventName, callback) {
if (listeners[eventName] === undefined) {
throw new Error(
`${eventName} event is not supported. Should be one of: start, error, message or close`,
)
}

listeners[eventName] = callback

return this
},
close: () => {
closeRequested = true

reader
.cancel()
.then(() => {
abortController.abort()
})
.catch(error => {
notify(listeners[EVENTS.ERROR], error)
})
},
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ const initialListeners = Object.values(EVENTS).reduce((acc, curr) => ({ ...acc,
* @async
* @param {object} payload - - The body of the initial request.
* @param {string} baseUrl - The stream baseUrl.
* @param {string} endpoint - The stream endpoint.
*
* @example
* (async () => {
* const stream = await stream(
* { identifiers: [{ application_ids: { application_id: 'my-app' }}]},
* 'http://localhost:8080/api/v3',
* 'http://localhost:8080',
* '/api/v3',
* )
*
* // Add listeners to the stream.
Expand All @@ -49,7 +51,7 @@ const initialListeners = Object.values(EVENTS).reduce((acc, curr) => ({ ...acc,
* @returns {object} The stream subscription object with the `on` function for
* attaching listeners and the `close` function to close the stream.
*/
export default async (payload, baseUrl) => {
export default async (payload, baseUrl, endpoint = '/console/internal/events/') => {
const subscriptionId = Date.now()
const subscriptionPayload = JSON.stringify({
type: MESSAGE_TYPES.SUBSCRIBE,
Expand All @@ -61,48 +63,49 @@ export default async (payload, baseUrl) => {
id: subscriptionId,
})
let closeRequested = false
const url = baseUrl + endpoint

await new Promise(async resolve => {
// Add the new subscription to the subscriptions object.
// Also add the resolver function to the subscription object to be able
// to resolve the promise after the subscription confirmation message.
subscriptions = {
...subscriptions,
[subscriptionId]: { ...initialListeners, url: baseUrl, _resolver: resolve },
[subscriptionId]: { ...initialListeners, url, _resolver: resolve },
}

const token = new Token().get()
const tokenParsed = typeof token === 'function' ? (await token()).access_token : token
const baseUrlParsed = baseUrl.replace('http', 'ws')

// Open up the WebSocket connection if it doesn't exist.
if (!wsInstances[baseUrl]) {
wsInstances[baseUrl] = new WebSocket(`${baseUrlParsed}/console/internal/events/`, [
if (!wsInstances[url]) {
wsInstances[url] = new WebSocket(`${baseUrlParsed}${endpoint}`, [
'ttn.lorawan.v3.console.internal.events.v1',
`ttn.lorawan.v3.header.authorization.bearer.${tokenParsed}`,
])

// Event listener for 'open'
wsInstances[baseUrl].addEventListener('open', () => {
wsInstances[baseUrl].send(subscriptionPayload)
wsInstances[url].addEventListener('open', () => {
wsInstances[url].send(subscriptionPayload)
})

// Broadcast connection errors to all listeners.
wsInstances[baseUrl].addEventListener('error', error => {
wsInstances[url].addEventListener('error', error => {
Object.values(subscriptions)
.filter(s => s.url === baseUrl)
.filter(s => s.url === url)
.forEach(s => notify(s[EVENTS.ERROR], error))
resolve()
})

// Event listener for 'close'
wsInstances[baseUrl].addEventListener('close', () => {
delete wsInstances[baseUrl]
wsInstances[url].addEventListener('close', () => {
delete wsInstances[url]
})

// After the WebSocket connection is open, add the event listeners.
// Wait for the subscription confirmation message before resolving.
wsInstances[baseUrl].addEventListener('message', ({ data }) => {
wsInstances[url].addEventListener('message', ({ data }) => {
const dataParsed = JSON.parse(data)
const listeners = subscriptions[dataParsed.id]

Expand Down Expand Up @@ -130,14 +133,14 @@ export default async (payload, baseUrl) => {
notify(listeners[EVENTS.CLOSE], closeRequested)
// Remove the subscription.
delete subscriptions[dataParsed.id]
if (!Object.values(subscriptions).some(s => s.url === baseUrl)) {
wsInstances[baseUrl].close()
if (!Object.values(subscriptions).some(s => s.url === url)) {
wsInstances[url].close()
}
}
})
} else if (wsInstances[baseUrl] && wsInstances[baseUrl].readyState === WebSocket.OPEN) {
} else if (wsInstances[url] && wsInstances[url].readyState === WebSocket.OPEN) {
// If the WebSocket connection is already open, only add the subscription.
wsInstances[baseUrl].send(subscriptionPayload)
wsInstances[url].send(subscriptionPayload)
}
})

Expand All @@ -155,13 +158,13 @@ export default async (payload, baseUrl) => {
return this
},
close: () => {
if (wsInstances[baseUrl]) {
if (wsInstances[url]) {
closeRequested = true
wsInstances[baseUrl].send(unsubscribePayload)
wsInstances[url].send(unsubscribePayload)

// Wait for the server to confirm the unsubscribe.
return new Promise(resolve => {
wsInstances[baseUrl].addEventListener('message', ({ data }) => {
wsInstances[url].addEventListener('message', ({ data }) => {
const { type, id } = JSON.parse(data)
if (id === subscriptionId && type === MESSAGE_TYPES.UNSUBSCRIBE) {
resolve()
Expand Down
4 changes: 2 additions & 2 deletions sdk/js/src/service/applications.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 The Things Network Foundation, The Things Industries B.V.

Check warning on line 1 in sdk/js/src/service/applications.js

View workflow job for this annotation

GitHub Actions / Check Mergeability

sdk/js/src/service/applications.js has a conflict when merging TheThingsIndustries/lorawan-stack:v3.28.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -15,7 +15,7 @@
import autoBind from 'auto-bind'

import Marshaler from '../util/marshaler'
import subscribeToStream from '../api/stream/subscribeToStream'
import subscribeToWebSocketStream from '../api/stream/subscribeToWebSocketStream'
import combineStreams from '../util/combine-streams'
import { STACK_COMPONENTS_MAP } from '../util/constants'

Expand Down Expand Up @@ -236,7 +236,7 @@
distinctComponents.map(component => this._stackConfig.getComponentUrlByName(component)),
)

const streams = [...baseUrls].map(baseUrl => subscribeToStream(payload, baseUrl))
const streams = [...baseUrls].map(baseUrl => subscribeToWebSocketStream(payload, baseUrl))

// Combine all stream sources to one subscription generator.
return combineStreams(streams)
Expand Down
Loading
Loading