Skip to content

Commit

Permalink
improvement(k8s): reduce usage of kubectl, use APIs directly
Browse files Browse the repository at this point in the history
Cleaning up a bit of tech debt here. This will make Garden processes
more lightweight, and hopefully reduce some test flakiness that
appears to be related to our use of kubectl when running test/task
pods.

The refactor of PodRunner is the most substantial change, so please
take a close look at that one. Tests should cover the changes pretty
well though.

I didn't go so far as to replace our use of `kubectl apply`, because
there's just too much magic that happens in that thing. The same goes for
`kubectl port-forward`; at the very least, that will be a PR of its own.

I also skipped a few instances where kubectl is being used infrequently.
  • Loading branch information
edvald committed Aug 18, 2020
1 parent dd2caf5 commit 5cdbcea
Show file tree
Hide file tree
Showing 40 changed files with 2,336 additions and 1,302 deletions.
2 changes: 1 addition & 1 deletion core/src/plugins/google/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ export async function prepareEnvironment({ status, log }: PrepareEnvironmentPara
section: "google-cloud-functions",
msg: `Initializing SDK...`,
})
await gcloud().call(["init"], { timeout: 600, tty: true })
await gcloud().call(["init"], { timeoutSec: 600, tty: true })
}

return { status: { ready: true, outputs: {} } }
Expand Down
4 changes: 2 additions & 2 deletions core/src/plugins/google/gcloud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ export class GCloud {
}

async call(args: string[], opts: SpawnOpts = {}): Promise<GCloudOutput> {
const { data, ignoreError = false, timeout = DEFAULT_TIMEOUT } = opts
const { data, ignoreError = false, timeoutSec: timeout = DEFAULT_TIMEOUT } = opts
const preparedArgs = this.prepareArgs(args)
return spawn("gcloud", preparedArgs, { ignoreError, data, timeout })
return spawn("gcloud", preparedArgs, { ignoreError, data, timeoutSec: timeout })
}

async json(args: string[], opts: GCloudParams = {}): Promise<any> {
Expand Down
268 changes: 243 additions & 25 deletions core/src/plugins/kubernetes/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,36 @@ import {
ApiextensionsV1beta1Api,
PolicyV1beta1Api,
KubernetesObject,
V1Status,
Exec,
Attach,
} from "@kubernetes/client-node"
import AsyncLock = require("async-lock")
import request = require("request-promise")
import requestErrors = require("request-promise/errors")
import { safeLoad } from "js-yaml"
import { readFile } from "fs-extra"

import { Omit, safeDumpYaml } from "../../util/util"
import { Omit, safeDumpYaml, StringCollector, sleep } from "../../util/util"
import { omitBy, isObject, isPlainObject, keyBy } from "lodash"
import { GardenBaseError, RuntimeError, ConfigurationError } from "../../exceptions"
import { KubernetesResource, KubernetesServerResource, KubernetesServerList } from "./types"
import {
KubernetesResource,
KubernetesServerResource,
KubernetesServerList,
KubernetesList,
KubernetesPod,
} from "./types"
import { LogEntry } from "../../logger/log-entry"
import { kubectl } from "./kubectl"
import { urlJoin } from "../../util/string"
import { KubernetesProvider } from "./config"
import { StringMap } from "../../config/common"
import { PluginContext } from "../../plugin-context"
import { Writable, Readable, PassThrough } from "stream"
import { WebSocketHandler } from "@kubernetes/client-node/dist/web-socket-handler"
import { getExecExitCode } from "./status/pod"
import { labelSelectorToString } from "./util"

interface ApiGroupMap {
[groupVersion: string]: V1APIGroup
Expand Down Expand Up @@ -228,15 +241,7 @@ export class KubeApi {
return group
}

async getApiResourceInfo(log: LogEntry, manifest: KubernetesResource): Promise<V1APIResource> {
const apiVersion = manifest.apiVersion

if (!apiVersion) {
throw new KubernetesError(`Missing apiVersion on resource`, {
manifest,
})
}

async getApiResourceInfo(log: LogEntry, apiVersion: string, kind: string): Promise<V1APIResource> {
if (!cachedApiResourceInfo[this.context]) {
cachedApiResourceInfo[this.context] = {}
}
Expand All @@ -261,11 +266,12 @@ export class KubeApi {
return apiResources[apiVersion]
}))

const resource = resourceMap[manifest.kind]
const resource = resourceMap[kind]

if (!resource) {
throw new KubernetesError(`Unrecognized resource type ${manifest.apiVersion}/${manifest.kind}`, {
manifest,
throw new KubernetesError(`Unrecognized resource type ${apiVersion}/${kind}`, {
apiVersion,
kind,
})
}

Expand Down Expand Up @@ -307,12 +313,41 @@ export class KubeApi {
async readBySpec({ log, namespace, manifest }: { log: LogEntry; namespace: string; manifest: KubernetesResource }) {
log.silly(`Fetching Kubernetes resource ${manifest.apiVersion}/${manifest.kind}/${manifest.metadata.name}`)

const apiPath = await this.getApiPath({ manifest, log, namespace })
const apiPath = await this.getResourceApiPath({ manifest, log, namespace })

const res = await this.request({ log, path: apiPath })
return res.body
}

/**
 * Lists resources of the given `apiVersion`/`kind`, optionally filtered by a label selector.
 *
 * @param log            Log entry passed through to the path lookup and the API request.
 * @param apiVersion     API version of the resource type (also used to back-fill list items).
 * @param kind           Kind of the resource type (also used to back-fill list items).
 * @param namespace      Namespace passed to the resource-type path lookup.
 * @param labelSelector  Optional label key/value pairs; serialized into the `labelSelector` query parameter.
 * @returns The list from the response body, with `apiVersion` and `kind` set on every item.
 */
async listResources<T extends KubernetesResource>({
  log,
  apiVersion,
  kind,
  namespace,
  labelSelector,
}: {
  log: LogEntry
  apiVersion: string
  kind: string
  namespace: string
  labelSelector?: { [label: string]: string }
}) {
  const path = await this.getResourceTypeApiPath({ log, apiVersion, kind, namespace })

  let selector: string | undefined = undefined
  if (labelSelector) {
    selector = labelSelectorToString(labelSelector)
  }

  const response = await this.request({ log, path, opts: { qs: { labelSelector: selector } } })
  const result = response.body as KubernetesList<T>

  // Items returned by the API sometimes lack apiVersion and kind, so fall back to the requested values.
  result.items = result.items.map((item) => {
    return {
      ...item,
      apiVersion: item.apiVersion || apiVersion,
      kind: item.kind || kind,
    }
  })

  return result
}

async replace({
log,
resource,
Expand All @@ -324,7 +359,7 @@ export class KubeApi {
}) {
log.silly(`Replacing Kubernetes resource ${resource.apiVersion}/${resource.kind}/${resource.metadata.name}`)

const apiPath = await this.getApiPath({ manifest: resource, log, namespace })
const apiPath = await this.getResourceApiPath({ manifest: resource, log, namespace })

const res = await this.request({ log, path: apiPath, opts: { method: "put", body: resource } })
return res.body
Expand Down Expand Up @@ -352,7 +387,7 @@ export class KubeApi {
async deleteBySpec({ namespace, manifest, log }: { namespace: string; manifest: KubernetesResource; log: LogEntry }) {
log.silly(`Deleting Kubernetes resource ${manifest.apiVersion}/${manifest.kind}/${manifest.metadata.name}`)

const apiPath = await this.getApiPath({ manifest, log, namespace })
const apiPath = await this.getResourceApiPath({ manifest, log, namespace })

try {
await this.request({ log, path: apiPath, opts: { method: "delete" } })
Expand All @@ -363,7 +398,26 @@ export class KubeApi {
}
}

private async getApiPath({
/**
 * Builds the API path for a resource *type* (i.e. the collection, without a resource name).
 *
 * The resource info is looked up for the given `apiVersion`/`kind`. If the type is namespaced, the
 * path includes `/namespaces/<namespace>`; otherwise `namespace` is not used.
 */
private async getResourceTypeApiPath({
  apiVersion,
  kind,
  log,
  namespace,
}: {
  apiVersion: string
  kind: string
  log: LogEntry
  namespace: string
}) {
  const { name: resourceName, namespaced } = await this.getApiResourceInfo(log, apiVersion, kind)
  const groupPath = getGroupBasePath(apiVersion)

  if (namespaced) {
    return `${groupPath}/namespaces/${namespace}/${resourceName}`
  }

  return `${groupPath}/${resourceName}`
}

private async getResourceApiPath({
manifest,
log,
namespace,
Expand All @@ -372,14 +426,32 @@ export class KubeApi {
log: LogEntry
namespace?: string
}) {
const resourceInfo = await this.getApiResourceInfo(log, manifest)
const apiVersion = manifest.apiVersion
const name = manifest.metadata.name
const basePath = getGroupBasePath(apiVersion)

return resourceInfo.namespaced
? `${basePath}/namespaces/${namespace || manifest.metadata.namespace}/${resourceInfo.name}/${name}`
: `${basePath}/${resourceInfo.name}/${name}`
if (!apiVersion) {
throw new KubernetesError(`Missing apiVersion on resource`, {
manifest,
})
}

if (!namespace) {
namespace = manifest.metadata.namespace
}

if (!namespace) {
throw new KubernetesError(`Missing namespace on resource and no namespace specified`, {
manifest,
})
}

const typePath = await this.getResourceTypeApiPath({
log,
apiVersion,
kind: manifest.kind,
namespace,
})

return typePath + "/" + manifest.metadata.name
}

async upsert<K extends keyof CrudMap, O extends KubernetesResource<CrudMapTypes[K]>>({
Expand Down Expand Up @@ -468,6 +540,151 @@ export class KubeApi {
},
})
}

/**
 * Exec a command in the specified Pod container.
 *
 * Warning: Do not use tty=true unless you're actually attaching to a terminal, since collecting output will not work.
 *
 * @param namespace      Namespace of the Pod.
 * @param podName        Name of the Pod.
 * @param containerName  Name of the container to run the command in.
 * @param command        The command and its arguments.
 * @param stdout         Optional stream that additionally receives stdout (only piped to when `tty` is false).
 * @param stderr         Optional stream that additionally receives stderr (only piped to when `tty` is false).
 * @param stdin          Optional input stream for the command.
 * @param tty            Whether to request a TTY.
 * @param timeoutSec     If set (and non-zero), resolve with `timedOut: true` after this many seconds.
 * @returns The collected output, plus the exit code derived from the exec status when the socket closes.
 *   When the timeout elapses first, `timedOut` is true and `exitCode` is undefined.
 *   The returned Promise rejects if the websocket emits an error before completion.
 */
async execInPod({
  namespace,
  podName,
  containerName,
  command,
  stdout,
  stderr,
  stdin,
  tty,
  timeoutSec,
}: {
  namespace: string
  podName: string
  containerName: string
  command: string[]
  stdout?: Writable
  stderr?: Writable
  stdin?: Readable
  tty: boolean
  timeoutSec?: number
}): Promise<{ exitCode?: number; allLogs: string; stdout: string; stderr: string; timedOut: boolean }> {
  const stdoutCollector = new StringCollector()
  const stderrCollector = new StringCollector()
  const combinedCollector = new StringCollector()

  let _stdout: Writable = stdoutCollector
  let _stderr: Writable = stderrCollector

  // Unless we're attaching a TTY to the output streams, we multiplex the outputs to both a StringCollector,
  // and whatever stream the caller provided.
  if (!tty) {
    _stdout = new PassThrough()
    _stdout.pipe(stdoutCollector)
    _stdout.pipe(combinedCollector)

    if (stdout) {
      _stdout.pipe(stdout)
    }

    _stderr = new PassThrough()
    _stderr.pipe(stderrCollector)
    _stderr.pipe(combinedCollector)

    if (stderr) {
      _stderr.pipe(stderr)
    }
  }

  const execHandler = new Exec(this.config, new WebSocketHandler(this.config))
  // Assigned by the status callback below; read when the socket closes.
  let status: V1Status

  const ws = await execHandler.exec(
    namespace,
    podName,
    containerName,
    command,
    _stdout,
    _stderr,
    stdin || null,
    tty,
    (_status) => {
      status = _status
    }
  )

  return new Promise((resolve, reject) => {
    let done = false
    let timer: ReturnType<typeof setTimeout> | undefined

    // Marks the call as settled and cancels the pending timeout (if any), so that a finished exec
    // doesn't leave a timer behind that keeps the event loop alive until `timeoutSec` elapses.
    const settle = () => {
      done = true
      if (timer !== undefined) {
        clearTimeout(timer)
        timer = undefined
      }
    }

    const finish = (timedOut: boolean, exitCode?: number) => {
      if (done) {
        return
      }
      resolve({
        allLogs: combinedCollector.getString(),
        stdout: stdoutCollector.getString(),
        stderr: stderrCollector.getString(),
        timedOut,
        exitCode,
      })
      settle()
    }

    if (timeoutSec) {
      // NOTE(review): the websocket is left open on timeout, as before — confirm whether callers clean it up.
      timer = setTimeout(() => {
        finish(true)
      }, timeoutSec * 1000)
    }

    ws.on("error", (err) => {
      if (!done) {
        reject(err)
      }
      settle()
    })

    ws.on("close", () => {
      finish(false, getExecExitCode(status))
    })
  })
}

/**
 * Attach to the specified Pod and container.
 *
 * Any of `stdout`, `stderr` and `stdin` that are not provided are passed to the attach handler as `null`.
 *
 * Warning: Do not use tty=true unless you're actually attaching to a terminal, since collecting output will not work.
 *
 * @returns Whatever the underlying `Attach#attach` call returns.
 */
async attachToPod({
  namespace,
  podName,
  containerName,
  stdout,
  stderr,
  stdin,
  tty,
}: {
  namespace: string
  podName: string
  containerName: string
  stdout?: Writable
  stderr?: Writable
  stdin?: Readable
  tty: boolean
}) {
  const socketHandler = new WebSocketHandler(this.config)
  const attachHandler = new Attach(this.config, socketHandler)

  const outStream = stdout || null
  const errStream = stderr || null
  const inStream = stdin || null

  return attachHandler.attach(namespace, podName, containerName, outStream, errStream, inStream, tty)
}

/**
 * Create an ad-hoc Pod. Use this method to handle race-condition cases when creating Pods.
 *
 * If creation fails with a "No API token found for service account" error, the call waits 500ms and
 * tries again. Any other failure is re-thrown as a KubernetesError that wraps the original error.
 *
 * @param namespace  Namespace to create the Pod in.
 * @param pod        The Pod manifest.
 * @throws KubernetesError if the Pod could not be created for any reason other than the retried case.
 */
async createPod(namespace: string, pod: KubernetesPod) {
  try {
    await this.core.createNamespacedPod(namespace, pod)
  } catch (error) {
    // The thrown value may lack a string `message`; fall back to its string form so that checking the
    // message doesn't itself throw a TypeError and hide the real failure.
    const message: string = error && typeof error.message === "string" ? error.message : String(error)

    // This can occur in laggy environments, just need to retry
    // NOTE(review): the retry is unbounded, as before — consider capping the number of attempts.
    if (message.includes("No API token found for service account")) {
      await sleep(500)
      return this.createPod(namespace, pod)
    } else {
      throw new KubernetesError(`Failed to create Pod ${pod.metadata.name}: ${message}`, { error })
    }
  }
}
}

function getGroupBasePath(apiVersion: string) {
Expand Down Expand Up @@ -515,13 +732,14 @@ async function getContextConfig(log: LogEntry, ctx: PluginContext, provider: Kub
}

function wrapError(err: any) {
if (!err.message) {
if (!err.message || err.name === "HttpError") {
const response = err.response || {}
const body = response.body || err.body
const wrapped = new KubernetesError(`Got error from Kubernetes API - ${body.message}`, {
body,
request: omitBy(response.request, (v, k) => isObject(v) || k[0] === "_"),
})
wrapped.statusCode = err.statusCode
return wrapped
} else {
return err
Expand Down
Loading

0 comments on commit 5cdbcea

Please sign in to comment.