diff --git a/core/src/plugins/kubernetes/api.ts b/core/src/plugins/kubernetes/api.ts index 207d5380a9..a8835c7edd 100644 --- a/core/src/plugins/kubernetes/api.ts +++ b/core/src/plugins/kubernetes/api.ts @@ -10,6 +10,8 @@ // tslint:disable-next-line:no-unused import { IncomingMessage } from "http" import { Agent } from "https" +import chalk from "chalk" +import httpStatusCodes from "http-status-codes" import { ReadStream } from "tty" import { KubeConfig, @@ -51,7 +53,7 @@ import { } from "./types" import { LogEntry } from "../../logger/log-entry" import { kubectl } from "./kubectl" -import { urlJoin } from "../../util/string" +import { deline, urlJoin } from "../../util/string" import { KubernetesProvider } from "./config" import { StringMap } from "../../config/common" import { PluginContext } from "../../plugin-context" @@ -178,7 +180,7 @@ export class KubeApi { public policy: WrappedApi public rbac: WrappedApi - constructor(public context: string, private config: KubeConfig) { + constructor(public log: LogEntry, public context: string, private config: KubeConfig) { const cluster = this.config.getCurrentCluster() if (!cluster) { @@ -190,13 +192,13 @@ export class KubeApi { for (const [name, cls] of Object.entries(apiTypes)) { const api = new cls(cluster.server) - this[name] = this.wrapApi(api, this.config) + this[name] = this.wrapApi(log, api, this.config) } } static async factory(log: LogEntry, ctx: PluginContext, provider: KubernetesProvider) { const config = await getContextConfig(log, ctx, provider) - return new KubeApi(provider.config.context, config) + return new KubeApi(log, provider.config.context, config) } async getApiInfo(): Promise { @@ -341,12 +343,14 @@ export class KubeApi { // apply auth await this.config.applyToRequest(requestOpts) - try { - log.silly(`${requestOpts.method.toUpperCase()} ${url}`) - return await request(requestOpts) - } catch (err) { - throw handleRequestPromiseError(err) - } + return await requestWithRetry(log, path, async () => { + try { + log.silly(`${requestOpts.method.toUpperCase()} ${url}`) + return await request(requestOpts) + } catch (err) { + throw handleRequestPromiseError(err) + } + }) } /** @@ -593,7 +597,7 @@ export class KubeApi { /** * Wrapping the API objects to deal with bugs. */ - private wrapApi(api: T, config: KubeConfig): T { + private wrapApi(log: LogEntry, api: T, config: KubeConfig): T { api.setDefaultAuthentication(config) api.addInterceptor((opts) => { @@ -615,28 +619,30 @@ export class KubeApi { target["defaultHeaders"] = { ...defaultHeaders, "content-type": "application/merge-patch+json" } } - const output = target[name](...args) - target["defaultHeaders"] = defaultHeaders - - if (typeof output.then === "function") { - return ( - output - // return the result body directly if applicable - .then((res: any) => { - if (isPlainObject(res) && res.hasOwnProperty("body")) { - return res["body"] - } else { - return res - } - }) - // the API errors are not properly formed Error objects - .catch((err: Error) => { - throw wrapError(err) - }) - ) - } - - return output + return requestWithRetry(log, name, () => { + const output = target[name](...args) + target["defaultHeaders"] = defaultHeaders + + if (typeof output.then === "function") { + return ( + output + // return the result body directly if applicable + .then((res: any) => { + if (isPlainObject(res) && res.hasOwnProperty("body")) { + return res["body"] + } else { + return res + } + }) + // the API errors are not properly formed Error objects + .catch((err: Error) => { + throw wrapError(err) + }) + ) + } + + return output + }) } }, }) @@ -881,3 +887,70 @@ function handleRequestPromiseError(err: Error) { return wrapError(err) } } + +/** + * Helper function for retrying failed k8s API requests, using exponential backoff. + * + * Only retries the request when it fails with an error that matches certain status codes and/or error + * message contents (see the `shouldRetry` helper for details). + * + * The rationale here is that some errors occur because of network issues, intermittent timeouts etc. + * and should be retried automatically. + */ +async function requestWithRetry( + log: LogEntry, + description: string, + req: () => Promise, + opts?: { maxRetries?: number; minTimeoutMs?: number } +): Promise { + const maxRetries = opts?.maxRetries || 5 + const minTimeoutMs = opts?.minTimeoutMs || 500 + let retryLog: LogEntry | undefined = undefined + const retry = async (usedRetries: number): Promise => { + try { + return await req() + } catch (err) { + if (shouldRetry(err)) { + retryLog = retryLog || log.warn("") + if (usedRetries <= maxRetries) { + const sleepMsec = minTimeoutMs + usedRetries * minTimeoutMs + retryLog.setState(deline` + Kubernetes API: ${description} failed with error ${err.message}, retrying in ${sleepMsec}ms + (#${usedRetries}/${maxRetries}) + `) + await sleep(sleepMsec) + return await retry(usedRetries + 1) + } else { + if (usedRetries === maxRetries) { + retryLog.setState(chalk.red(`Kubernetes API: Maximum retry count exceeded`)) + } + throw err + } + } else { + throw err + } + } + } + const result = await retry(1) + return result +} + +/** + * This helper determines whether an error thrown by a k8s API request should result in the request being retried. + * + * Add more error codes / regexes / filters etc. here as needed. + */ +function shouldRetry(err: any): boolean { + const code = err.statusCode + const msg = err.message || "" + return (code && statusCodesForRetry.includes(code)) || !!errorMessageRegexesForRetry.find((regex) => msg.match(regex)) +} + +const statusCodesForRetry: number[] = [ + httpStatusCodes.BAD_GATEWAY, + httpStatusCodes.GATEWAY_TIMEOUT, + httpStatusCodes.REQUEST_TIMEOUT, + httpStatusCodes.SERVICE_UNAVAILABLE, +] + +const errorMessageRegexesForRetry = [/getaddrinfo ENOTFOUND/, /getaddrinfo EAI_AGAIN/]