Skip to content

Commit

Permalink
fix(k8s): automatic retry for failed API requests
Browse files Browse the repository at this point in the history
We now automatically retry failed Kubernetes API requests if the reason
for the failure matches certain conditions.

For example, timeouts or DNS-related errors will result in retries, but
not 404/not found errors (and so forth), which will be thrown without
retrying.

We can easily add more error codes and/or conditions to this logic if we
discover further error cases that should result in retries.
  • Loading branch information
thsig committed May 17, 2021
1 parent ea11bb6 commit 72165da
Showing 1 changed file with 106 additions and 33 deletions.
139 changes: 106 additions & 33 deletions core/src/plugins/kubernetes/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
// tslint:disable-next-line:no-unused
import { IncomingMessage } from "http"
import { Agent } from "https"
import chalk from "chalk"
import httpStatusCodes from "http-status-codes"
import { ReadStream } from "tty"
import {
KubeConfig,
Expand Down Expand Up @@ -51,7 +53,7 @@ import {
} from "./types"
import { LogEntry } from "../../logger/log-entry"
import { kubectl } from "./kubectl"
import { urlJoin } from "../../util/string"
import { deline, urlJoin } from "../../util/string"
import { KubernetesProvider } from "./config"
import { StringMap } from "../../config/common"
import { PluginContext } from "../../plugin-context"
Expand Down Expand Up @@ -178,7 +180,7 @@ export class KubeApi {
public policy: WrappedApi<PolicyV1beta1Api>
public rbac: WrappedApi<RbacAuthorizationV1Api>

constructor(public context: string, private config: KubeConfig) {
constructor(public log: LogEntry, public context: string, private config: KubeConfig) {
const cluster = this.config.getCurrentCluster()

if (!cluster) {
Expand All @@ -190,13 +192,13 @@ export class KubeApi {

for (const [name, cls] of Object.entries(apiTypes)) {
const api = new cls(cluster.server)
this[name] = this.wrapApi(api, this.config)
this[name] = this.wrapApi(log, api, this.config)
}
}

static async factory(log: LogEntry, ctx: PluginContext, provider: KubernetesProvider) {
const config = await getContextConfig(log, ctx, provider)
return new KubeApi(provider.config.context, config)
return new KubeApi(log, provider.config.context, config)
}

async getApiInfo(): Promise<ApiInfo> {
Expand Down Expand Up @@ -341,12 +343,14 @@ export class KubeApi {
// apply auth
await this.config.applyToRequest(requestOpts)

try {
log.silly(`${requestOpts.method.toUpperCase()} ${url}`)
return await request(requestOpts)
} catch (err) {
throw handleRequestPromiseError(err)
}
return await requestWithRetry(log, path, async () => {
try {
log.silly(`${requestOpts.method.toUpperCase()} ${url}`)
return await request(requestOpts)
} catch (err) {
throw handleRequestPromiseError(err)
}
})
}

/**
Expand Down Expand Up @@ -593,7 +597,7 @@ export class KubeApi {
/**
* Wrapping the API objects to deal with bugs.
*/
private wrapApi<T extends K8sApi>(api: T, config: KubeConfig): T {
private wrapApi<T extends K8sApi>(log: LogEntry, api: T, config: KubeConfig): T {
api.setDefaultAuthentication(config)

api.addInterceptor((opts) => {
Expand All @@ -615,28 +619,30 @@ export class KubeApi {
target["defaultHeaders"] = { ...defaultHeaders, "content-type": "application/merge-patch+json" }
}

const output = target[name](...args)
target["defaultHeaders"] = defaultHeaders

if (typeof output.then === "function") {
return (
output
// return the result body directly if applicable
.then((res: any) => {
if (isPlainObject(res) && res.hasOwnProperty("body")) {
return res["body"]
} else {
return res
}
})
// the API errors are not properly formed Error objects
.catch((err: Error) => {
throw wrapError(err)
})
)
}

return output
return requestWithRetry(log, name, () => {
const output = target[name](...args)
target["defaultHeaders"] = defaultHeaders

if (typeof output.then === "function") {
return (
output
// return the result body directly if applicable
.then((res: any) => {
if (isPlainObject(res) && res.hasOwnProperty("body")) {
return res["body"]
} else {
return res
}
})
// the API errors are not properly formed Error objects
.catch((err: Error) => {
throw wrapError(err)
})
)
}

return output
})
}
},
})
Expand Down Expand Up @@ -881,3 +887,70 @@ function handleRequestPromiseError(err: Error) {
return wrapError(err)
}
}

/**
* Helper function for retrying failed k8s API requests, using exponential backoff.
*
* Only retries the request when it fails with an error that matches certain status codes and/or error
* message contents (see the `shouldRetry` helper for details).
*
* The rationale here is that some errors occur because of network issues, intermittent timeouts etc.
* and should be retried automatically.
*/
async function requestWithRetry<R>(
log: LogEntry,
description: string,
req: () => Promise<R>,
opts?: { maxRetries?: number; minTimeoutMs?: number }
): Promise<R> {
const maxRetries = opts?.maxRetries || 5
const minTimeoutMs = opts?.minTimeoutMs || 500
let retryLog: LogEntry | undefined = undefined
const retry = async (usedRetries: number): Promise<R> => {
try {
return await req()
} catch (err) {
if (shouldRetry(err)) {
retryLog = retryLog || log.warn("")
if (usedRetries <= maxRetries) {
const sleepMsec = minTimeoutMs + usedRetries * minTimeoutMs
retryLog.setState(deline`
Kubernetes API: ${description} failed with error ${err.message}, retrying in ${sleepMsec}ms
(#${usedRetries}/${maxRetries})
`)
await sleep(sleepMsec)
return await retry(usedRetries + 1)
} else {
if (usedRetries === maxRetries) {
retryLog.setState(chalk.red(`Kubernetes API: Maximum retry count exceeded`))
}
throw err
}
} else {
throw err
}
}
}
const result = await retry(1)
return result
}

/**
* This helper determines whether an error thrown by a k8s API request should result in the request being retried.
*
* Add more error codes / regexes / filters etc. here as needed.
*/
function shouldRetry(err: any): boolean {
const code = err.statusCode
const msg = err.message || ""
return (code && statusCodesForRetry.includes(code)) || !!errorMessageRegexesForRetry.find((regex) => msg.match(regex))
}

const statusCodesForRetry: number[] = [
httpStatusCodes.BAD_GATEWAY,
httpStatusCodes.GATEWAY_TIMEOUT,
httpStatusCodes.REQUEST_TIMEOUT,
httpStatusCodes.SERVICE_UNAVAILABLE,
]

const errorMessageRegexesForRetry = [/getaddrinfo ENOTFOUND/, /getaddrinfo EAI_AGAIN/]

0 comments on commit 72165da

Please sign in to comment.