Skip to content
This repository has been archived by the owner on Jul 26, 2022. It is now read-only.

fix: use getObjectStream to address deprecation warning in kubernetes-client #664

Merged
merged 3 commits into from
Mar 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 7 additions & 13 deletions lib/external-secret.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
'use strict'

const JSONStream = require('json-stream')

/**
* Creates an FIFO queue which you can put to and take from.
* If theres nothing to take it will wait with resolving until
Expand Down Expand Up @@ -37,14 +35,11 @@ async function startWatcher ({
while (true) {
logger.debug('Starting watch stream for namespace %s', loggedNamespaceName)

const stream = kubeClient
const stream = await kubeClient
.apis[customResourceManifest.spec.group]
.v1.watch
.namespaces(namespace)[customResourceManifest.spec.names.plural]
.getStream()

const jsonStream = new JSONStream()
stream.pipe(jsonStream)
.getObjectStream()

let timeout
const restartTimeout = () => {
Expand All @@ -55,23 +50,23 @@ async function startWatcher ({
const timeMs = watchTimeout
timeout = setTimeout(() => {
logger.info(`No watch event for ${timeMs} ms, restarting watcher for ${loggedNamespaceName}`)
stream.abort()
stream.end()
}, timeMs)
timeout.unref()
}

jsonStream.on('data', (evt) => {
stream.on('data', (evt) => {
eventQueue.put(evt)
restartTimeout()
})

jsonStream.on('error', (err) => {
stream.on('error', (err) => {
logger.warn(err, 'Got error on stream for namespace %s', loggedNamespaceName)
deathQueue.put('ERROR')
clearTimeout(timeout)
})

jsonStream.on('end', () => {
stream.on('end', () => {
deathQueue.put('END')
clearTimeout(timeout)
})
Expand All @@ -80,8 +75,7 @@ async function startWatcher ({

logger.info('Stopping watch stream for namespace %s due to event: %s', loggedNamespaceName, deathEvent)
eventQueue.put({ type: 'DELETED_ALL' })

stream.abort()
stream.end()
}
} catch (err) {
logger.error(err, 'Watcher for namespace %s crashed', loggedNamespaceName)
Expand Down
55 changes: 26 additions & 29 deletions lib/external-secret.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ const { getExternalSecretEvents } = require('./external-secret')
describe('getExternalSecretEvents', () => {
let kubeClientMock
let watchedNamespaces
let externalSecretsApiMock
let fakeCustomResourceManifest
let loggerMock
let mockedStream
let externalsecrets

beforeEach(() => {
fakeCustomResourceManifest = {
Expand All @@ -24,12 +23,10 @@ describe('getExternalSecretEvents', () => {
}
}
}
externalSecretsApiMock = sinon.mock()

mockedStream = new Readable()
mockedStream._read = () => { }

externalSecretsApiMock.get = sinon.stub()
externalsecrets = {
getObjectStream: () => undefined
}

kubeClientMock = {
apis: {
Expand All @@ -38,9 +35,7 @@ describe('getExternalSecretEvents', () => {
watch: {
namespaces: () => {
return {
externalsecrets: {
getStream: () => mockedStream
}
externalsecrets
}
}
}
Expand Down Expand Up @@ -69,6 +64,27 @@ describe('getExternalSecretEvents', () => {
spec: { backendType: 'secretsManager', data: [] }
}

const fakeStream = Readable.from([
{
type: 'MODIFIED',
object: fakeExternalSecretObject
},
{
type: 'ADDED',
object: fakeExternalSecretObject
},
{
type: 'DELETED',
object: fakeExternalSecretObject
},
{
type: 'DELETED_ALL'
}
])

fakeStream.end = sinon.stub()
externalsecrets.getObjectStream = () => fakeStream

const events = getExternalSecretEvents({
kubeClient: kubeClientMock,
watchedNamespaces: watchedNamespaces,
Expand All @@ -77,25 +93,6 @@ describe('getExternalSecretEvents', () => {
watchTimeout: 5000
})

mockedStream.push(`${JSON.stringify({
type: 'MODIFIED',
object: fakeExternalSecretObject
})}\n`)

mockedStream.push(`${JSON.stringify({
type: 'ADDED',
object: fakeExternalSecretObject
})}\n`)

mockedStream.push(`${JSON.stringify({
type: 'DELETED',
object: fakeExternalSecretObject
})}\n`)

mockedStream.push(`${JSON.stringify({
type: 'DELETED_ALL'
})}\n`)

const modifiedEvent = await events.next()
expect(modifiedEvent.value.type).is.equal('MODIFIED')
expect(modifiedEvent.value.object).is.deep.equal(fakeExternalSecretObject)
Expand Down
3 changes: 1 addition & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
"aws-sdk": "^2.628.0",
"express": "^4.17.1",
"js-yaml": "^3.14.1",
"json-stream": "^1.0.0",
"kubernetes-client": "^9.0.0",
"lodash.clonedeep": "^4.5.0",
"lodash.mapvalues": "^4.6.0",
Expand Down