Skip to content

Commit

Permalink
fix: refactor waitForPod and waitForPodConditions to stabilize code t…
Browse files Browse the repository at this point in the history
…imeouts during e2e tests (#317)

Signed-off-by: Jeromy Cannon <[email protected]>
  • Loading branch information
jeromy-cannon authored May 21, 2024
1 parent 2255374 commit ca7b21b
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 80 deletions.
4 changes: 2 additions & 2 deletions src/commands/network.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ export class NetworkCommand extends BaseCommand {
subTasks.push({
title: `Check Node: ${chalk.yellow(nodeId)}`,
task: () =>
self.k8.waitForPod(constants.POD_STATUS_RUNNING, [
self.k8.waitForPods([constants.POD_PHASE_RUNNING], [
'fullstack.hedera.com/type=network-node',
`fullstack.hedera.com/node-name=${nodeId}`
], 1, 60 * 15, 1000) // timeout 15 minutes
Expand Down Expand Up @@ -374,7 +374,7 @@ export class NetworkCommand extends BaseCommand {
{
title: 'Waiting for network pods to be ready',
task: async (ctx, _) => {
await this.k8.waitForPod(constants.POD_STATUS_RUNNING, [
await this.k8.waitForPods([constants.POD_PHASE_RUNNING], [
'fullstack.hedera.com/type=network-node'
], 1)
}
Expand Down
2 changes: 1 addition & 1 deletion src/commands/node.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ export class NodeCommand extends BaseCommand {
throw new FullstackTestingError(`failed to stop portForward for podName ${podName} with localPort ${localPort}: ${e.message}`, e)
}
try {
await this.k8.recyclePodByLabels(podLabels, 50)
await this.k8.recyclePodByLabels(podLabels)
} catch (e) {
throw new FullstackTestingError(`failed to recycle pod for podName ${podName} with localPort ${localPort}: ${e.message}`, e)
}
Expand Down
2 changes: 1 addition & 1 deletion src/commands/relay.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ export class RelayCommand extends BaseCommand {

await self.chartManager.install(namespace, releaseName, chartPath, '', valuesArg)

await self.k8.waitForPod(constants.POD_STATUS_RUNNING, [
await self.k8.waitForPods([constants.POD_PHASE_RUNNING], [
'app=hedera-json-rpc-relay',
`app.kubernetes.io/instance=${releaseName}`
], 1, 900, 1000)
Expand Down
2 changes: 1 addition & 1 deletion src/core/constants.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export const ACCOUNT_CREATE_BATCH_SIZE = process.env.ACCOUNT_CREATE_BATCH_SIZE |
export const NODE_PROXY_USER_ID = process.env.NODE_PROXY_USER_ID || 'admin'
export const NODE_PROXY_PASSWORD = process.env.NODE_PROXY_PASSWORD || 'adminpwd'

export const POD_STATUS_RUNNING = 'Running'
export const POD_PHASE_RUNNING = 'Running'

export const POD_CONDITION_INITIALIZED = 'Initialized'
export const POD_CONDITION_READY = 'Ready'
Expand Down
125 changes: 52 additions & 73 deletions src/core/k8.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ export class K8 {
}
}

async recyclePodByLabels (podLabels, maxAttempts = 50) {
async recyclePodByLabels (podLabels, maxAttempts = 30, delay = 2000, waitForPodMaxAttempts = 10, waitForPodDelay = 2000) {
const podArray = await this.getPodsByLabel(podLabels)
for (const pod of podArray) {
const podName = pod.metadata.name
Expand All @@ -776,65 +776,83 @@ export class K8 {

let attempts = 0
while (attempts++ < maxAttempts) {
// wait longer for pods to be deleted and recreated when running in CI with high loads of parallel runners
const status = await this.waitForPod(constants.POD_STATUS_RUNNING, podLabels, 1, 120, 2000)
if (status) {
const newPods = await this.getPodsByLabel(podLabels)
if (newPods.length === podArray.length) return newPods
try {
const pods = await this.waitForPods([constants.POD_PHASE_RUNNING],
podLabels, 1, waitForPodMaxAttempts, waitForPodDelay)
if (pods.length === podArray.length) {
return pods
}
} catch (e) {
this.logger.warn(`deleted pod still not running [${podLabels.join(',')}, attempt: ${attempts}/${maxAttempts}]`)
}

await sleep(2000)
await sleep(delay)
}

throw new FullstackTestingError(`pods are not running after deletion with labels [${podLabels.join(',')}]`)
}

/**
* Wait for pod
* @param status phase of the pod
* @param phases an array of acceptable phases of the pods
* @param labels pod labels
* @param podCount number of pod expected
* @param maxAttempts maximum attempts to check
* @param delay delay between checks in milliseconds
* @param podItemPredicate a predicate function to check the pod item
* @return a Promise that checks the status of an array of pods
*/
async waitForPod (status = 'Running', labels = [], podCount = 1, maxAttempts = 10, delay = 500) {
async waitForPods (phases = [constants.POD_PHASE_RUNNING], labels = [], podCount = 1, maxAttempts = 10, delay = 500, podItemPredicate) {
const ns = this._getNamespace()
const fieldSelector = `status.phase=${status}`
const labelSelector = labels.join(',')

this.logger.debug(`WaitForPod [namespace:${ns}, fieldSector(${fieldSelector}, labelSelector: ${labelSelector}], maxAttempts: ${maxAttempts}`)
this.logger.debug(`WaitForPod [namespace:${ns}, labelSelector: ${labelSelector}], maxAttempts: ${maxAttempts}`)

return new Promise((resolve, reject) => {
let attempts = 0

const check = async () => {
this.logger.debug(`Checking for pod [namespace:${ns}, fieldSector(${fieldSelector}, labelSelector: ${labelSelector}] [attempt: ${attempts}/${maxAttempts}]`)
const check = async (resolve, reject) => {
this.logger.debug(`Checking for pod [namespace:${ns}, labelSelector: ${labelSelector}] [attempt: ${attempts}/${maxAttempts}]`)

// wait for the pod to be available with the given status and labels
const resp = await this.kubeClient.listNamespacedPod(
ns,
false,
false,
undefined,
fieldSelector,
undefined,
labelSelector,
podCount
)

this.logger.debug(`${resp.body.items.length}/${podCount} pod found [namespace:${ns}, fieldSector(${fieldSelector}, labelSelector: ${labelSelector}] [attempt: ${attempts}/${maxAttempts}]`)
if (resp.body && resp.body.items && resp.body.items.length === podCount) {
return resolve(resp.body.items)
this.logger.debug(`${resp.body?.items?.length}/${podCount} pod found [namespace:${ns}, labelSelector: ${labelSelector}] [attempt: ${attempts}/${maxAttempts}]`)
if (resp.body?.items?.length === podCount) {
let phaseMatchCount = 0
let predicateMatchCount = 0

for (const item of resp.body.items) {
if (phases.includes(item.status?.phase)) {
phaseMatchCount++
}

if (podItemPredicate && podItemPredicate(item)) {
predicateMatchCount++
}
}

if (phaseMatchCount === podCount && (!podItemPredicate || (predicateMatchCount === podCount))) {
return resolve(resp.body.items)
}
}

if (attempts++ < maxAttempts) {
setTimeout(check, delay)
if (++attempts < maxAttempts) {
setTimeout(() => check(resolve, reject), delay)
} else {
return reject(new FullstackTestingError(`Expected number of pod (${podCount}) not found ${fieldSelector} ${labelSelector} [attempts = ${attempts}/${maxAttempts}]`))
return reject(new FullstackTestingError(`Expected number of pod (${podCount}) not found for labels: ${labelSelector}, phases: ${phases.join(',')} [attempts = ${attempts}/${maxAttempts}]`))
}
}

check()
check(resolve, reject)
})
}

Expand All @@ -848,7 +866,7 @@ export class K8 {
*/
async waitForPodReady (labels = [], podCount = 1, maxAttempts = 10, delay = 500) {
try {
return await this.waitForPodCondition(K8.PodReadyCondition, labels, podCount, maxAttempts, delay)
return await this.waitForPodConditions(K8.PodReadyCondition, labels, podCount, maxAttempts, delay)
} catch (e) {
throw new FullstackTestingError(`Pod not ready [maxAttempts = ${maxAttempts}]`, e)
}
Expand All @@ -864,67 +882,28 @@ export class K8 {
* @return {Promise<unknown>}
*/

async waitForPodCondition (
async waitForPodConditions (
conditionsMap,
labels = [],
podCount = 1, maxAttempts = 10, delay = 500) {
if (!conditionsMap || conditionsMap.size === 0) throw new MissingArgumentError('pod conditions are required')
const ns = this._getNamespace()
const labelSelector = labels.join(',')

this.logger.debug(`WaitForCondition [namespace:${ns}, conditions = ${conditionsMap.toString()} labelSelector: ${labelSelector}], maxAttempts: ${maxAttempts}`)

return new Promise((resolve, reject) => {
let attempts = 0

const check = async () => {
this.logger.debug(`Checking for pod ready [namespace:${ns}, labelSelector: ${labelSelector}] [attempt: ${attempts}/${maxAttempts}]`)

// wait for the pod to be available with the given status and labels
let pods
try {
pods = await this.waitForPod(constants.POD_STATUS_RUNNING, labels, podCount, maxAttempts, delay)
this.logger.debug(`${pods.length}/${podCount} pod found [namespace:${ns}, labelSelector: ${labelSelector}] [attempt: ${attempts}/${maxAttempts}]`)

if (pods.length >= podCount) {
const podWithMatchedCondition = []

// check conditions
for (const pod of pods) {
let matchedCondition = 0
for (const cond of pod.status.conditions) {
for (const entry of conditionsMap.entries()) {
const condType = entry[0]
const condStatus = entry[1]
if (cond.type === condType && cond.status === condStatus) {
this.logger.debug(`Pod condition met for ${pod.metadata.name} [type: ${cond.type} status: ${cond.status}]`)
matchedCondition++
}
}

if (matchedCondition >= conditionsMap.size) {
podWithMatchedCondition.push(pod)
break
}
}
}

if (podWithMatchedCondition.length >= podCount) {
return resolve(podWithMatchedCondition)
return await this.waitForPods([constants.POD_PHASE_RUNNING], labels, podCount, maxAttempts, delay, (pod) => {
if (pod.status?.conditions?.length > 0) {
for (const cond of pod.status.conditions) {
for (const entry of conditionsMap.entries()) {
const condType = entry[0]
const condStatus = entry[1]
if (cond.type === condType && cond.status === condStatus) {
this.logger.debug(`Pod condition met for ${pod.metadata.name} [type: ${cond.type} status: ${cond.status}]`)
return true
}
}
} catch (e) {
this.logger.error(`Pod not found with expected conditions [maxAttempts = ${maxAttempts}], ${e.message}`, e)
}

if (attempts++ < maxAttempts) {
setTimeout(check, delay)
} else {
return reject(new FullstackTestingError(`Pod not found with expected conditions [maxAttempts = ${maxAttempts}]`))
}
}

check()
// condition not found
return false
})
}

Expand Down
4 changes: 2 additions & 2 deletions test/e2e/core/k8_e2e.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ describe('K8', () => {
'fullstack.hedera.com/type=network-node'
]

const pods = await k8.waitForPod(constants.POD_STATUS_RUNNING, labels, 1)
const pods = await k8.waitForPods([constants.POD_PHASE_RUNNING], labels, 1)
expect(pods.length).toStrictEqual(1)
})

Expand All @@ -149,7 +149,7 @@ describe('K8', () => {
.set(constants.POD_CONDITION_INITIALIZED, constants.POD_CONDITION_STATUS_TRUE)
.set(constants.POD_CONDITION_POD_SCHEDULED, constants.POD_CONDITION_STATUS_TRUE)
.set(constants.POD_CONDITION_READY, constants.POD_CONDITION_STATUS_TRUE)
const pods = await k8.waitForPodCondition(conditions, labels, 1)
const pods = await k8.waitForPodConditions(conditions, labels, 1)
expect(pods.length).toStrictEqual(1)
})

Expand Down
Loading

0 comments on commit ca7b21b

Please sign in to comment.