diff --git a/core/src/plugins/google/common.ts b/core/src/plugins/google/common.ts index ec44d2e8a5..1222316101 100644 --- a/core/src/plugins/google/common.ts +++ b/core/src/plugins/google/common.ts @@ -80,7 +80,7 @@ export async function prepareEnvironment({ status, log }: PrepareEnvironmentPara section: "google-cloud-functions", msg: `Initializing SDK...`, }) - await gcloud().call(["init"], { timeout: 600, tty: true }) + await gcloud().call(["init"], { timeoutSec: 600, tty: true }) } return { status: { ready: true, outputs: {} } } diff --git a/core/src/plugins/google/gcloud.ts b/core/src/plugins/google/gcloud.ts index 0114cf7f74..3ca05eaf15 100644 --- a/core/src/plugins/google/gcloud.ts +++ b/core/src/plugins/google/gcloud.ts @@ -36,9 +36,9 @@ export class GCloud { } async call(args: string[], opts: SpawnOpts = {}): Promise { - const { data, ignoreError = false, timeout = DEFAULT_TIMEOUT } = opts + const { data, ignoreError = false, timeoutSec: timeout = DEFAULT_TIMEOUT } = opts const preparedArgs = this.prepareArgs(args) - return spawn("gcloud", preparedArgs, { ignoreError, data, timeout }) + return spawn("gcloud", preparedArgs, { ignoreError, data, timeoutSec: timeout }) } async json(args: string[], opts: GCloudParams = {}): Promise { diff --git a/core/src/plugins/kubernetes/api.ts b/core/src/plugins/kubernetes/api.ts index 3842f1128d..2af81606c6 100644 --- a/core/src/plugins/kubernetes/api.ts +++ b/core/src/plugins/kubernetes/api.ts @@ -24,6 +24,9 @@ import { ApiextensionsV1beta1Api, PolicyV1beta1Api, KubernetesObject, + V1Status, + Exec, + Attach, } from "@kubernetes/client-node" import AsyncLock = require("async-lock") import request = require("request-promise") @@ -31,16 +34,26 @@ import requestErrors = require("request-promise/errors") import { safeLoad } from "js-yaml" import { readFile } from "fs-extra" -import { Omit, safeDumpYaml } from "../../util/util" +import { Omit, safeDumpYaml, StringCollector, sleep } from "../../util/util" import { omitBy, isObject, isPlainObject, keyBy } from "lodash" import { GardenBaseError, RuntimeError, ConfigurationError } from "../../exceptions" -import { KubernetesResource, KubernetesServerResource, KubernetesServerList } from "./types" +import { + KubernetesResource, + KubernetesServerResource, + KubernetesServerList, + KubernetesList, + KubernetesPod, +} from "./types" import { LogEntry } from "../../logger/log-entry" import { kubectl } from "./kubectl" import { urlJoin } from "../../util/string" import { KubernetesProvider } from "./config" import { StringMap } from "../../config/common" import { PluginContext } from "../../plugin-context" +import { Writable, Readable, PassThrough } from "stream" +import { WebSocketHandler } from "@kubernetes/client-node/dist/web-socket-handler" +import { getExecExitCode } from "./status/pod" +import { labelSelectorToString } from "./util" interface ApiGroupMap { [groupVersion: string]: V1APIGroup @@ -228,15 +241,7 @@ export class KubeApi { return group } - async getApiResourceInfo(log: LogEntry, manifest: KubernetesResource): Promise { - const apiVersion = manifest.apiVersion - - if (!apiVersion) { - throw new KubernetesError(`Missing apiVersion on resource`, { - manifest, - }) - } - + async getApiResourceInfo(log: LogEntry, apiVersion: string, kind: string): Promise { if (!cachedApiResourceInfo[this.context]) { cachedApiResourceInfo[this.context] = {} } @@ -261,11 +266,12 @@ export class KubeApi { return apiResources[apiVersion] })) - const resource = resourceMap[manifest.kind] + const resource = 
resourceMap[kind] if (!resource) { - throw new KubernetesError(`Unrecognized resource type ${manifest.apiVersion}/${manifest.kind}`, { - manifest, + throw new KubernetesError(`Unrecognized resource type ${apiVersion}/${kind}`, { + apiVersion, + kind, }) } @@ -307,12 +313,41 @@ export class KubeApi { async readBySpec({ log, namespace, manifest }: { log: LogEntry; namespace: string; manifest: KubernetesResource }) { log.silly(`Fetching Kubernetes resource ${manifest.apiVersion}/${manifest.kind}/${manifest.metadata.name}`) - const apiPath = await this.getApiPath({ manifest, log, namespace }) + const apiPath = await this.getResourceApiPath({ manifest, log, namespace }) const res = await this.request({ log, path: apiPath }) return res.body } + async listResources({ + log, + apiVersion, + kind, + namespace, + labelSelector, + }: { + log: LogEntry + apiVersion: string + kind: string + namespace: string + labelSelector?: { [label: string]: string } + }) { + const apiPath = await this.getResourceTypeApiPath({ log, apiVersion, kind, namespace }) + const labelSelectorString = labelSelector ? labelSelectorToString(labelSelector) : undefined + + const res = await this.request({ log, path: apiPath, opts: { qs: { labelSelector: labelSelectorString } } }) + const list = res.body as KubernetesList + + // This fixes an odd issue where apiVersion and kind are sometimes missing from list items coming from the API :/ + list.items = list.items.map((r) => ({ + ...r, + apiVersion: r.apiVersion || apiVersion, + kind: r.kind || kind, + })) + + return list + } + async replace({ log, resource, @@ -324,7 +359,7 @@ export class KubeApi { }) { log.silly(`Replacing Kubernetes resource ${resource.apiVersion}/${resource.kind}/${resource.metadata.name}`) - const apiPath = await this.getApiPath({ manifest: resource, log, namespace }) + const apiPath = await this.getResourceApiPath({ manifest: resource, log, namespace }) const res = await this.request({ log, path: apiPath, opts: { method: "put", body: resource } }) return res.body @@ -352,7 +387,7 @@ export class KubeApi { async deleteBySpec({ namespace, manifest, log }: { namespace: string; manifest: KubernetesResource; log: LogEntry }) { log.silly(`Deleting Kubernetes resource ${manifest.apiVersion}/${manifest.kind}/${manifest.metadata.name}`) - const apiPath = await this.getApiPath({ manifest, log, namespace }) + const apiPath = await this.getResourceApiPath({ manifest, log, namespace }) try { await this.request({ log, path: apiPath, opts: { method: "delete" } }) @@ -363,7 +398,26 @@ export class KubeApi { } } - private async getApiPath({ + private async getResourceTypeApiPath({ + apiVersion, + kind, + log, + namespace, + }: { + apiVersion: string + kind: string + log: LogEntry + namespace: string + }) { + const resourceInfo = await this.getApiResourceInfo(log, apiVersion, kind) + const basePath = getGroupBasePath(apiVersion) + + return resourceInfo.namespaced + ? `${basePath}/namespaces/${namespace}/${resourceInfo.name}` + : `${basePath}/${resourceInfo.name}` + } + + private async getResourceApiPath({ manifest, log, namespace, @@ -372,14 +426,32 @@ export class KubeApi { log: LogEntry namespace?: string }) { - const resourceInfo = await this.getApiResourceInfo(log, manifest) const apiVersion = manifest.apiVersion - const name = manifest.metadata.name - const basePath = getGroupBasePath(apiVersion) - return resourceInfo.namespaced - ? 
`${basePath}/namespaces/${namespace || manifest.metadata.namespace}/${resourceInfo.name}/${name}` - : `${basePath}/${resourceInfo.name}/${name}` + if (!apiVersion) { + throw new KubernetesError(`Missing apiVersion on resource`, { + manifest, + }) + } + + if (!namespace) { + namespace = manifest.metadata.namespace + } + + if (!namespace) { + throw new KubernetesError(`Missing namespace on resource and no namespace specified`, { + manifest, + }) + } + + const typePath = await this.getResourceTypeApiPath({ + log, + apiVersion, + kind: manifest.kind, + namespace, + }) + + return typePath + "/" + manifest.metadata.name } async upsert>({ @@ -468,6 +540,151 @@ export class KubeApi { }, }) } + + /** + * Exec a command in the specified Pod container. + * + * Warning: Do not use tty=true unless you're actually attaching to a terminal, since collecting output will not work. + */ + async execInPod({ + namespace, + podName, + containerName, + command, + stdout, + stderr, + stdin, + tty, + timeoutSec, + }: { + namespace: string + podName: string + containerName: string + command: string[] + stdout?: Writable + stderr?: Writable + stdin?: Readable + tty: boolean + timeoutSec?: number + }): Promise<{ exitCode?: number; allLogs: string; stdout: string; stderr: string; timedOut: boolean }> { + const stdoutCollector = new StringCollector() + const stderrCollector = new StringCollector() + const combinedCollector = new StringCollector() + + let _stdout: Writable = stdoutCollector + let _stderr: Writable = stderrCollector + + // Unless we're attaching a TTY to the output streams, we multiplex the outputs to both a StringCollector, + // and whatever stream the caller provided. + if (!tty) { + _stdout = new PassThrough() + _stdout.pipe(stdoutCollector) + _stdout.pipe(combinedCollector) + + if (stdout) { + _stdout.pipe(stdout) + } + + _stderr = new PassThrough() + _stderr.pipe(stderrCollector) + _stderr.pipe(combinedCollector) + + if (stderr) { + _stderr.pipe(stderr) + } + } + + const execHandler = new Exec(this.config, new WebSocketHandler(this.config)) + let status: V1Status + + const ws = await execHandler.exec( + namespace, + podName, + containerName, + command, + _stdout, + _stderr, + stdin || null, + tty, + (_status) => { + status = _status + } + ) + + return new Promise((resolve, reject) => { + let done = false + + const finish = (timedOut: boolean, exitCode?: number) => { + !done && + resolve({ + allLogs: combinedCollector.getString(), + stdout: stdoutCollector.getString(), + stderr: stderrCollector.getString(), + timedOut, + exitCode, + }) + done = true + } + + if (timeoutSec) { + setTimeout(() => { + !done && finish(true) + }, timeoutSec * 1000) + } + + ws.on("error", (err) => { + !done && reject(err) + done = true + }) + + ws.on("close", () => { + finish(false, getExecExitCode(status)) + }) + }) + } + + /** + * Attach to the specified Pod and container. + * + * Warning: Do not use tty=true unless you're actually attaching to a terminal, since collecting output will not work. + */ + async attachToPod({ + namespace, + podName, + containerName, + stdout, + stderr, + stdin, + tty, + }: { + namespace: string + podName: string + containerName: string + stdout?: Writable + stderr?: Writable + stdin?: Readable + tty: boolean + }) { + const handler = new Attach(this.config, new WebSocketHandler(this.config)) + return handler.attach(namespace, podName, containerName, stdout || null, stderr || null, stdin || null, tty) + } + + /** + * Create an ad-hoc Pod. 
Use this method to handle race-condition cases when creating Pods. + */ + async createPod(namespace: string, pod: KubernetesPod) { + try { + await this.core.createNamespacedPod(namespace, pod) + } catch (error) { + // This can occur in laggy environments, just need to retry + if (error.message.includes("No API token found for service account")) { + await sleep(500) + return this.createPod(namespace, pod) + } else { + throw new KubernetesError(`Failed to create Pod ${pod.metadata.name}: ${error.message}`, { error }) + } + } + } } function getGroupBasePath(apiVersion: string) { @@ -515,13 +732,14 @@ async function getContextConfig(log: LogEntry, ctx: PluginContext, provider: Kub } function wrapError(err: any) { - if (!err.message) { + if (!err.message || err.name === "HttpError") { const response = err.response || {} const body = response.body || err.body const wrapped = new KubernetesError(`Got error from Kubernetes API - ${body.message}`, { body, request: omitBy(response.request, (v, k) => isObject(v) || k[0] === "_"), }) + wrapped.statusCode = err.statusCode return wrapped } else { return err diff --git a/core/src/plugins/kubernetes/commands/cleanup-cluster-registry.ts b/core/src/plugins/kubernetes/commands/cleanup-cluster-registry.ts index 79053d2813..f7d929a351 100644 --- a/core/src/plugins/kubernetes/commands/cleanup-cluster-registry.ts +++ b/core/src/plugins/kubernetes/commands/cleanup-cluster-registry.ts @@ -19,21 +19,17 @@ import { queryRegistry } from "../container/util" import { splitFirst, splitLast } from "../../../util/util" import { LogEntry } from "../../../logger/log-entry" import Bluebird from "bluebird" -import { - CLUSTER_REGISTRY_DEPLOYMENT_NAME, - inClusterRegistryHostname, - dockerDaemonDeploymentName, - dockerDaemonContainerName, -} from "../constants" +import { CLUSTER_REGISTRY_DEPLOYMENT_NAME, inClusterRegistryHostname, dockerDaemonContainerName } from "../constants" import { PluginError } from "../../../exceptions" -import { apply, kubectl } from "../kubectl" +import { apply } from "../kubectl" import { waitForResources } from "../status/status" import { execInWorkload } from "../container/exec" import { dedent, deline } from "../../../util/string" -import { execInPod, BuilderExecParams, buildSyncDeploymentName } from "../container/build" -import { getDeploymentPodName } from "../util" +import { buildSyncDeploymentName, getDockerDaemonPodRunner } from "../container/build" +import { getDeploymentPod } from "../util" import { getSystemNamespace } from "../namespace" import { PluginContext } from "../../../plugin-context" +import { PodRunner } from "../run" const workspaceSyncDirTtl = 0.5 * 86400 // 2 days @@ -57,6 +53,8 @@ export const cleanupClusterRegistry: PluginCommand = { // Scan through all Pods in cluster const api = await KubeApi.factory(log, ctx, provider) + const systemNamespace = await getSystemNamespace(ctx, provider, log) + const imagesInUse = await getImagesInUse(api, provider, log) // Get images in registry @@ -82,11 +80,11 @@ export const cleanupClusterRegistry: PluginCommand = { } if (provider.config.buildMode === "cluster-docker") { - await deleteImagesFromDaemon({ ctx, provider, log, imagesInUse }) + await deleteImagesFromDaemon({ api, ctx, provider, log, imagesInUse, systemNamespace }) } // Clean old directories from build sync volume - await cleanupBuildSyncVolume(ctx, provider, log) + await cleanupBuildSyncVolume({ api, ctx, provider, log, systemNamespace }) log.info({ msg: chalk.green("\nDone!"), status: "success" }) @@ -237,7 +235,7 @@ async 
function runRegistryGarbageCollection(ctx: KubernetesPluginContext, api: K const modifiedDeployment: KubernetesDeployment = sanitizeResource(registryDeployment) - modifiedDeployment.spec.template.spec.containers[0].env.push({ + modifiedDeployment.spec!.template.spec!.containers[0].env!.push({ name: "REGISTRY_STORAGE_MAINTENANCE", // This needs to be YAML because of issue https://github.com/docker/distribution/issues/1736 value: dedent` @@ -286,9 +284,9 @@ async function runRegistryGarbageCollection(ctx: KubernetesPluginContext, api: K registryDeployment = await api.apps.readNamespacedDeployment(CLUSTER_REGISTRY_DEPLOYMENT_NAME, systemNamespace) const writableRegistry = sanitizeResource(registryDeployment) // -> Remove the maintenance flag - writableRegistry.spec.template.spec.containers[0].env = writableRegistry.spec.template.spec.containers[0].env.filter( - (e) => e.name !== "REGISTRY_STORAGE_MAINTENANCE" - ) + writableRegistry.spec.template.spec!.containers[0].env = + writableRegistry.spec?.template.spec?.containers[0].env?.filter((e) => e.name !== "REGISTRY_STORAGE_MAINTENANCE") || + [] await apply({ ctx, @@ -320,15 +318,19 @@ function sanitizeResource(resource: T): T { } async function deleteImagesFromDaemon({ + api, ctx, provider, log, imagesInUse, + systemNamespace, }: { + api: KubeApi ctx: PluginContext provider: KubernetesProvider log: LogEntry imagesInUse: string[] + systemNamespace: string }) { log = log.info({ msg: chalk.white(`Cleaning images from Docker daemon...`), @@ -336,19 +338,17 @@ async function deleteImagesFromDaemon({ }) log.info("Getting list of images from daemon...") - const podName = await getDeploymentPodName(dockerDaemonDeploymentName, ctx, provider, log) - const listArgs = ["docker", "images", "--format", "{{.Repository}}:{{.Tag}}"] - const res = await execInPod({ - ctx, - provider, + const runner = await getDockerDaemonPodRunner({ api, systemNamespace, ctx, provider }) + + const res = await runner.exec({ log, - args: listArgs, - podName, + command: ["docker", "images", "--format", "{{.Repository}}:{{.Tag}}"], containerName: dockerDaemonContainerName, - timeout: 300, + timeoutSec: 300, }) - const imagesInDaemon = res.stdout + + const imagesInDaemon = res.log .split("\n") .filter(Boolean) // Not sure why we see some of these @@ -374,8 +374,12 @@ async function deleteImagesFromDaemon({ await Bluebird.map( imagesBatches, async (images) => { - const args = ["docker", "rmi", ...images] - await execInPod({ ctx, provider, log, args, podName, containerName: dockerDaemonContainerName, timeout: 300 }) + await runner.exec({ + log, + command: ["docker", "rmi", ...images], + containerName: dockerDaemonContainerName, + timeoutSec: 300, + }) log.setState(deline` Deleting images: ${pluralize("batch", counter, true)} of ${imagesBatches.length} left...`) @@ -387,42 +391,54 @@ async function deleteImagesFromDaemon({ // Run a prune operation log.info(`Pruning with \`docker image prune -f\`...`) - await execInPod({ - ctx, - provider, + await runner.exec({ log, - args: ["docker", "image", "prune", "-f"], - podName, + command: ["docker", "image", "prune", "-f"], containerName: dockerDaemonContainerName, - timeout: 300, + timeoutSec: 300, }) log.setSuccess() } -async function cleanupBuildSyncVolume(ctx: PluginContext, provider: KubernetesProvider, log: LogEntry) { +async function cleanupBuildSyncVolume({ + api, + ctx, + provider, + log, + systemNamespace, +}: { + api: KubeApi + ctx: PluginContext + provider: KubernetesProvider + log: LogEntry + systemNamespace: string +}) { 
log = log.info({ msg: chalk.white(`Cleaning up old workspaces from build sync volume...`), status: "active", }) - const podName = await getDeploymentPodName(buildSyncDeploymentName, ctx, provider, log) + const pod = await getDeploymentPod({ api, deploymentName: buildSyncDeploymentName, namespace: systemNamespace }) - const statArgs = ["sh", "-c", 'stat /data/* -c "%n %X"'] - const stat = await execInBuildSync({ + const runner = new PodRunner({ + api, ctx, provider, + namespace: systemNamespace, + pod, + }) + + const stat = await runner.exec({ log, - args: statArgs, - timeout: 30, - podName, - containerName: dockerDaemonContainerName, + command: ["sh", "-c", 'stat /data/* -c "%n %X"'], + timeoutSec: 30, }) // Remove directories last accessed more than workspaceSyncDirTtl ago const minTimestamp = new Date().getTime() / 1000 - workspaceSyncDirTtl - const outdatedDirs = stat.stdout + const outdatedDirs = stat.log .split("\n") .filter(Boolean) .map((line) => { @@ -434,32 +450,14 @@ async function cleanupBuildSyncVolume(ctx: PluginContext, provider: KubernetesPr const dirsToDelete = ["/data/tmp/*", ...outdatedDirs] - // Delete the director + // Delete the directories log.info(`Deleting ${dirsToDelete.length} workspace directories.`) - const deleteArgs = ["rm", "-rf", ...dirsToDelete] - await execInBuildSync({ - ctx, - provider, + + await runner.exec({ log, - args: deleteArgs, - timeout: 300, - podName, - containerName: dockerDaemonContainerName, + command: ["rm", "-rf", ...dirsToDelete], + timeoutSec: 30, }) log.setSuccess() } - -async function execInBuildSync({ ctx, provider, log, args, timeout, podName }: BuilderExecParams) { - const execCmd = ["exec", "-i", podName, "--", ...args] - const systemNamespace = await getSystemNamespace(ctx, provider, log) - - log.verbose(`Running: kubectl ${execCmd.join(" ")}`) - - return kubectl(ctx, provider).exec({ - args: execCmd, - log, - namespace: systemNamespace, - timeoutSec: timeout, - }) -} diff --git a/core/src/plugins/kubernetes/commands/pull-image.ts b/core/src/plugins/kubernetes/commands/pull-image.ts index 5c6ff687cf..1e6b317c3e 100644 --- a/core/src/plugins/kubernetes/commands/pull-image.ts +++ b/core/src/plugins/kubernetes/commands/pull-image.ts @@ -22,9 +22,10 @@ import { RuntimeError } from "../../../exceptions" import { PodRunner } from "../run" import { inClusterRegistryHostname } from "../constants" import { getAppNamespace, getSystemNamespace } from "../namespace" -import { makePodName, skopeoImage, getSkopeoContainer, getDockerAuthVolume } from "../util" +import { makePodName, getSkopeoContainer, getDockerAuthVolume } from "../util" import { getRegistryPortForward } from "../container/util" import { PluginContext } from "../../../plugin-context" +import { KubernetesPod } from "../types" export const pullImage: PluginCommand = { name: "pull-image", @@ -160,7 +161,6 @@ async function pullFromExternalRegistry( api, podName, systemNamespace, - module, log, }) @@ -199,10 +199,9 @@ async function importImage({ await tmp.withFile(async ({ path }) => { let writeStream = fs.createWriteStream(path) - await runner.spawn({ + await runner.exec({ command: getOutputCommand, - container: "skopeo", - ignoreError: false, + containerName: "skopeo", log, stdout: writeStream, }) @@ -216,10 +215,9 @@ async function pullImageFromRegistry(runner: PodRunner, command: string, log: Lo // TODO: make this timeout configurable await runner.exec({ command: ["sh", "-c", command], - container: "skopeo", - ignoreError: false, + containerName: "skopeo", log, - timeout: 60 * 
1000 * 5, // 5 minutes, + timeoutSec: 60 * 1000 * 5, // 5 minutes, }) } @@ -229,7 +227,6 @@ async function launchSkopeoContainer({ api, podName, systemNamespace, - module, log, }: { ctx: PluginContext @@ -237,18 +234,17 @@ async function launchSkopeoContainer({ api: KubeApi podName: string systemNamespace: string - module: GardenModule log: LogEntry }): Promise { const sleepCommand = "sleep 86400" - const runner = new PodRunner({ - ctx, - api, - podName, - provider, - image: skopeoImage, - module, - namespace: systemNamespace, + + const pod: KubernetesPod = { + apiVersion: "v1", + kind: "Pod", + metadata: { + name: podName, + namespace: systemNamespace, + }, spec: { shareProcessNamespace: true, volumes: [ @@ -257,18 +253,23 @@ async function launchSkopeoContainer({ ], containers: [getSkopeoContainer(sleepCommand)], }, + } + + const runner = new PodRunner({ + ctx, + api, + pod, + provider, + namespace: systemNamespace, }) - const { pod, state, debugLog } = await runner.start({ + const { status } = await runner.start({ log, - ignoreError: false, }) - if (state !== "ready") { - throw new RuntimeError("Failed to start skopeo contaer", { - pod, - state, - debugLog, + if (status.state !== "ready") { + throw new RuntimeError("Failed to start skopeo container", { + status, }) } diff --git a/core/src/plugins/kubernetes/container/build.ts b/core/src/plugins/kubernetes/container/build.ts index ce03fc08f2..961f086874 100644 --- a/core/src/plugins/kubernetes/container/build.ts +++ b/core/src/plugins/kubernetes/container/build.ts @@ -15,7 +15,7 @@ import { containerHelpers } from "../../container/helpers" import { buildContainerModule, getContainerBuildStatus, getDockerBuildFlags } from "../../container/build" import { GetBuildStatusParams, BuildStatus } from "../../../types/plugin/module/getBuildStatus" import { BuildModuleParams, BuildResult } from "../../../types/plugin/module/build" -import { millicpuToString, megabytesToString, getDeploymentPodName, makePodName } from "../util" +import { millicpuToString, megabytesToString, getDeploymentPod, makePodName } from "../util" import { RSYNC_PORT, dockerAuthSecretName, @@ -27,7 +27,6 @@ import { } from "../constants" import { posix, resolve } from "path" import { KubeApi } from "../api" -import { kubectl } from "../kubectl" import { LogEntry } from "../../../logger/log-entry" import { getDockerAuthVolume } from "../util" import { KubernetesProvider, ContainerBuildMode, KubernetesPluginContext, DEFAULT_KANIKO_IMAGE } from "../config" @@ -40,13 +39,14 @@ import { Writable } from "stream" import { LogLevel } from "../../../logger/log-node" import { exec, renderOutputStream } from "../../../util/util" import { loadImageToKind } from "../local/kind" -import { getSystemNamespace, getAppNamespace } from "../namespace" +import { getSystemNamespace } from "../namespace" import { dedent } from "../../../util/string" import chalk = require("chalk") import { loadImageToMicrok8s, getMicrok8sImageStatus } from "../local/microk8s" import { RunResult } from "../../../types/plugin/base" import { ContainerProvider } from "../../container/container" import { PluginContext } from "../../../plugin-context" +import { KubernetesPod } from "../types" const registryPort = 5000 @@ -114,6 +114,7 @@ const buildStatusHandlers: { [mode in ContainerBuildMode]: BuildStatusHandler } const k8sCtx = ctx as KubernetesPluginContext const provider = k8sCtx.provider const deploymentRegistry = provider.config.deploymentRegistry + const api = await KubeApi.factory(log, ctx, provider) if 
(!deploymentRegistry) { // This is validated in the provider configure handler, so this is an internal error if it happens @@ -123,25 +124,28 @@ const buildStatusHandlers: { [mode in ContainerBuildMode]: BuildStatusHandler } const args = await getManifestInspectArgs(module, deploymentRegistry) const pushArgs = ["/bin/sh", "-c", "DOCKER_CLI_EXPERIMENTAL=enabled docker " + args.join(" ")] - const podName = await getDeploymentPodName(dockerDaemonDeploymentName, ctx, provider, log) - const res = await execInPod({ - ctx, - provider, - log, - args: pushArgs, - timeout: 300, - podName, - containerName: dockerDaemonContainerName, - ignoreError: true, - }) + const systemNamespace = await getSystemNamespace(ctx, provider, log) + const runner = await getDockerDaemonPodRunner({ api, systemNamespace, ctx, provider }) - // Non-zero exit code can both mean the manifest is not found, and any other unexpected error - if (res.exitCode !== 0 && !res.stderr.includes("no such manifest")) { - const detail = res.all || `docker manifest inspect exited with code ${res.exitCode}` - log.warn(chalk.yellow(`Unable to query registry for image status: ${detail}`)) - } + try { + await runner.exec({ + log, + command: pushArgs, + timeoutSec: 300, + containerName: dockerDaemonContainerName, + }) + return { ready: true } + } catch (err) { + const res = err.detail.result + + // Non-zero exit code can both mean the manifest is not found, and any other unexpected error + if (res.exitCode !== 0 && !res.stderr.includes("no such manifest")) { + const detail = res.all || `docker manifest inspect exited with code ${res.exitCode}` + log.warn(chalk.yellow(`Unable to query registry for image status: ${detail}`)) + } - return { ready: res.exitCode === 0 } + return { ready: false } + } }, "kaniko": async (params) => { @@ -166,27 +170,29 @@ const buildStatusHandlers: { [mode in ContainerBuildMode]: BuildStatusHandler } skopeoCommand.push(`docker://${remoteId}`) const podCommand = ["sh", "-c", skopeoCommand.join(" ")] - const podName = await getDeploymentPodName(gardenUtilDaemonDeploymentName, ctx, provider, log) - const res = await execInPod({ - ctx, - provider, - log, - args: podCommand, - timeout: 300, - podName, - containerName: skopeoDaemonContainerName, - ignoreError: true, - }) + const api = await KubeApi.factory(log, ctx, provider) + const systemNamespace = await getSystemNamespace(ctx, provider, log) + const runner = await getUtilDaemonPodRunner({ api, systemNamespace, ctx, provider }) - // Non-zero exit code can both mean the manifest is not found, and any other unexpected error - if (res.exitCode !== 0 && !res.stderr.includes("manifest unknown")) { - throw new RuntimeError(`Unable to query registry for image status: ${res.all}`, { - command: skopeoCommand, - output: res.all, + try { + await runner.exec({ + log, + command: podCommand, + timeoutSec: 300, + containerName: skopeoDaemonContainerName, }) + return { ready: true } + } catch (err) { + const res = err.detail.result + // Non-zero exit code can both mean the manifest is not found, and any other unexpected error + if (res.exitCode !== 0 && !res.stderr.includes("manifest unknown")) { + throw new RuntimeError(`Unable to query registry for image status: ${res.all}`, { + command: skopeoCommand, + output: res.all, + }) + } + return { ready: false } } - - return { ready: res.exitCode === 0 } }, } @@ -226,14 +232,18 @@ const localBuild: BuildHandler = async (params) => { const remoteBuild: BuildHandler = async (params) => { const { ctx, module, log } = params const provider = 
ctx.provider - const namespace = await getAppNamespace(ctx, log, provider) const systemNamespace = await getSystemNamespace(ctx, provider, log) + const api = await KubeApi.factory(log, ctx, provider) if (!(await containerHelpers.hasDockerfile(module))) { return {} } - const buildSyncPod = await getDeploymentPodName(buildSyncDeploymentName, ctx, provider, log) + const buildSyncPod = await getDeploymentPod({ + api, + deploymentName: buildSyncDeploymentName, + namespace: systemNamespace, + }) // Sync the build context to the remote sync service // -> Get a tunnel to the service log.setState("Syncing sources to cluster...") @@ -241,7 +251,7 @@ const remoteBuild: BuildHandler = async (params) => { ctx, log, namespace: systemNamespace, - targetResource: `Pod/${buildSyncPod}`, + targetResource: `Pod/${buildSyncPod.metadata.name}`, port: RSYNC_PORT, }) @@ -310,7 +320,6 @@ const remoteBuild: BuildHandler = async (params) => { ] // Execute the build - const podName = await getDeploymentPodName(dockerDaemonDeploymentName, ctx, provider, log) const containerName = dockerDaemonContainerName const buildTimeout = module.spec.build.timeout @@ -318,17 +327,17 @@ const remoteBuild: BuildHandler = async (params) => { args = ["/bin/sh", "-c", "DOCKER_BUILDKIT=1 " + args.join(" ")] } - const buildRes = await execInPod({ - ctx, - provider, + const runner = await getDockerDaemonPodRunner({ api, ctx, provider, systemNamespace }) + + const buildRes = await runner.exec({ log, - args, - timeout: buildTimeout, - podName, + command: args, + timeoutSec: buildTimeout, containerName, stdout, }) - buildLog = buildRes.stdout + buildRes.stderr + + buildLog = buildRes.log // Push the image to the registry log.setState({ msg: `Pushing image ${localId} to registry...` }) @@ -336,17 +345,15 @@ const remoteBuild: BuildHandler = async (params) => { const dockerCmd = ["docker", "push", deploymentImageId] const pushArgs = ["/bin/sh", "-c", dockerCmd.join(" ")] - const pushRes = await execInPod({ - ctx, - provider, + const pushRes = await runner.exec({ log, - args: pushArgs, - timeout: 300, - podName, + command: pushArgs, + timeoutSec: 300, containerName, stdout, }) - buildLog += pushRes.stdout + pushRes.stderr + + buildLog += pushRes.log } else if (provider.config.buildMode === "kaniko") { // build with Kaniko const args = [ @@ -367,7 +374,15 @@ const remoteBuild: BuildHandler = async (params) => { args.push(...getDockerBuildFlags(module)) // Execute the build - const buildRes = await runKaniko({ ctx, provider, namespace, log, module, args, outputStream: stdout }) + const buildRes = await runKaniko({ + ctx, + provider, + log, + namespace: systemNamespace, + module, + args, + outputStream: stdout, + }) buildLog = buildRes.log if (kanikoBuildFailed(buildRes)) { @@ -387,18 +402,52 @@ const remoteBuild: BuildHandler = async (params) => { } } -export interface BuilderExecParams { +export async function getDockerDaemonPodRunner({ + api, + systemNamespace, + ctx, + provider, +}: { + api: KubeApi + systemNamespace: string ctx: PluginContext provider: KubernetesProvider - log: LogEntry - args: string[] - env?: { [key: string]: string } - ignoreError?: boolean - timeout: number - podName: string - containerName: string - stdout?: Writable - stderr?: Writable +}) { + const pod = await getDeploymentPod({ api, deploymentName: dockerDaemonDeploymentName, namespace: systemNamespace }) + + return new PodRunner({ + api, + ctx, + provider, + namespace: systemNamespace, + pod, + }) +} + +export async function getUtilDaemonPodRunner({ + api, + 
systemNamespace, + ctx, + provider, +}: { + api: KubeApi + systemNamespace: string + ctx: PluginContext + provider: KubernetesProvider +}) { + const pod = await getDeploymentPod({ + api, + deploymentName: gardenUtilDaemonDeploymentName, + namespace: systemNamespace, + }) + + return new PodRunner({ + api, + ctx, + provider, + namespace: systemNamespace, + pod, + }) } export const DEFAULT_KANIKO_FLAGS = ["--cache=true"] @@ -407,7 +456,7 @@ export const getKanikoFlags = (flags?: string[], topLevelFlags?: string[]): stri if (!flags && !topLevelFlags) { return DEFAULT_KANIKO_FLAGS } - const flagToKey = (flag) => { + const flagToKey = (flag: string) => { const found = flag.match(/--([a-zA-Z]*)/) if (found === null) { throw new ConfigurationError(`Invalid format for a kaniko flag`, { flag }) @@ -435,35 +484,6 @@ const buildHandlers: { [mode in ContainerBuildMode]: BuildHandler } = { "kaniko": remoteBuild, } -// TODO: we should make a simple service around this instead of execing into containers -export async function execInPod({ - ctx, - provider, - log, - args, - ignoreError, - timeout, - podName, - containerName, - stdout, - stderr, -}: BuilderExecParams) { - const execCmd = ["exec", "-i", podName, "-c", containerName, "--", ...args] - const systemNamespace = await getSystemNamespace(ctx, provider, log) - - log.verbose(`Running: kubectl ${execCmd.join(" ")}`) - - return kubectl(ctx, provider).exec({ - args: execCmd, - ignoreError, - log, - namespace: systemNamespace, - timeoutSec: timeout, - stdout, - stderr, - }) -} - interface RunKanikoParams { ctx: PluginContext provider: KubernetesProvider @@ -474,9 +494,16 @@ interface RunKanikoParams { outputStream: Writable } -async function runKaniko({ ctx, provider, namespace, log, module, args, outputStream }: RunKanikoParams) { +async function runKaniko({ + ctx, + provider, + namespace, + log, + module, + args, + outputStream, +}: RunKanikoParams): Promise { const api = await KubeApi.factory(log, ctx, provider) - const systemNamespace = await getSystemNamespace(ctx, provider, log) const podName = makePodName("kaniko", namespace, module.name) const registryHostname = getRegistryHostname(provider.config) @@ -601,27 +628,36 @@ async function runKaniko({ ctx, provider, namespace, log, module, args, outputSt ]) } + const pod: KubernetesPod = { + apiVersion: "v1", + kind: "Pod", + metadata: { + name: podName, + namespace, + }, + spec, + } + const runner = new PodRunner({ ctx, api, - podName, + pod, provider, - image: kanikoImage, - module, - namespace: systemNamespace, - spec, + namespace, }) - try { - return runner.startAndWait({ - ignoreError: true, - interactive: false, - log, - timeout: module.spec.build.timeout, - stdout: outputStream, - }) - } finally { - await runner.stop() + const result = await runner.runAndWait({ + log, + remove: true, + timeoutSec: module.spec.build.timeout, + stdout: outputStream, + tty: false, + }) + + return { + ...result, + moduleName: module.name, + version: module.version.versionString, } } diff --git a/core/src/plugins/kubernetes/container/deployment.ts b/core/src/plugins/kubernetes/container/deployment.ts index 2f52476ade..0c5601238a 100644 --- a/core/src/plugins/kubernetes/container/deployment.ts +++ b/core/src/plugins/kubernetes/container/deployment.ts @@ -362,7 +362,7 @@ export async function createWorkloadManifest({ }, } - workload.spec.template.spec.containers = [container] + workload.spec.template.spec!.containers = [container] if (service.spec.command && service.spec.command.length > 0) { container.command = 
service.spec.command @@ -377,7 +377,7 @@ export async function createWorkloadManifest({ } if (spec.volumes && spec.volumes.length) { - configureVolumes(service.module, workload.spec.template.spec, spec.volumes) + configureVolumes(service.module, workload.spec.template.spec!, spec.volumes) } const ports = spec.ports @@ -408,9 +408,11 @@ export async function createWorkloadManifest({ }) } } else { - workload.spec.replicas = configuredReplicas + const deployment = workload + deployment.spec!.replicas = configuredReplicas - workload.spec.strategy = { + // Need the any cast because the library types are busted + deployment.spec!.strategy = { type: "RollingUpdate", rollingUpdate: { // This is optimized for fast re-deployment. @@ -423,7 +425,7 @@ export async function createWorkloadManifest({ if (provider.config.imagePullSecrets.length > 0) { // add any configured imagePullSecrets - workload.spec.template.spec.imagePullSecrets = await prepareImagePullSecrets({ api, provider, namespace, log }) + workload.spec.template.spec!.imagePullSecrets = await prepareImagePullSecrets({ api, provider, namespace, log }) } // this is important for status checks to work correctly, because how K8s normalizes resources @@ -465,8 +467,8 @@ export async function createWorkloadManifest({ fsGroup: 2000, } - workload.spec.template.spec.affinity = affinity - workload.spec.template.spec.securityContext = securityContext + workload.spec.template.spec!.affinity = affinity + workload.spec.template.spec!.securityContext = securityContext } if (enableHotReload) { @@ -484,9 +486,9 @@ export async function createWorkloadManifest({ }) } - if (!workload.spec.template.spec.volumes.length) { + if (!workload.spec.template.spec?.volumes?.length) { // this is important for status checks to work correctly - delete workload.spec.template.spec.volumes + delete workload.spec.template.spec?.volumes } return workload diff --git a/core/src/plugins/kubernetes/container/exec.ts b/core/src/plugins/kubernetes/container/exec.ts index 51e2a7259b..b4f4ad7988 100644 --- a/core/src/plugins/kubernetes/container/exec.ts +++ b/core/src/plugins/kubernetes/container/exec.ts @@ -11,7 +11,6 @@ import { DeploymentError } from "../../../exceptions" import { ContainerModule } from "../../container/config" import { KubeApi } from "../api" import { getAppNamespace } from "../namespace" -import { kubectl } from "../kubectl" import { getContainerServiceStatus } from "./status" import { KubernetesPluginContext, KubernetesProvider } from "../config" import { ExecInServiceParams } from "../../../types/plugin/service/execInService" @@ -19,6 +18,7 @@ import { LogEntry } from "../../../logger/log-entry" import { getCurrentWorkloadPods } from "../util" import { KubernetesWorkload } from "../types" import { PluginContext } from "../../../plugin-context" +import { PodRunner } from "../run" export async function execInService(params: ExecInServiceParams) { const { ctx, log, service, command, interactive } = params @@ -75,22 +75,20 @@ export async function execInWorkload({ }) } - // exec in the pod via kubectl - const opts: string[] = [] - - if (interactive) { - opts.push("-it") - } + const runner = new PodRunner({ + api, + ctx, + provider, + namespace, + pod, + }) - const kubecmd = ["exec", ...opts, pod.metadata.name, "--", ...command] - const res = await kubectl(ctx, provider).spawnAndWait({ + const res = await runner.exec({ log, - namespace, - args: kubecmd, - ignoreError: true, + command, timeoutSec: 999999, tty: interactive, }) - return { code: res.code, output: res.all 
} + return { code: res.exitCode, output: res.log } } diff --git a/core/src/plugins/kubernetes/container/logs.ts b/core/src/plugins/kubernetes/container/logs.ts index 81ccd10349..1339785ba1 100644 --- a/core/src/plugins/kubernetes/container/logs.ts +++ b/core/src/plugins/kubernetes/container/logs.ts @@ -9,7 +9,7 @@ import { GetServiceLogsParams } from "../../../types/plugin/service/getServiceLogs" import { ContainerModule } from "../../container/config" import { getAppNamespace } from "../namespace" -import { getAllLogs } from "../logs" +import { streamK8sLogs } from "../logs" import { KubernetesPluginContext } from "../config" import { createWorkloadManifest } from "./deployment" import { emptyRuntimeContext } from "../../../runtime-context" @@ -37,5 +37,5 @@ export async function getServiceLogs(params: GetServiceLogsParams s.resource) }, } } diff --git a/core/src/plugins/kubernetes/helm/logs.ts b/core/src/plugins/kubernetes/helm/logs.ts index 569b0f5804..5de65738f3 100644 --- a/core/src/plugins/kubernetes/helm/logs.ts +++ b/core/src/plugins/kubernetes/helm/logs.ts @@ -7,7 +7,7 @@ */ import { GetServiceLogsParams } from "../../../types/plugin/service/getServiceLogs" -import { getAllLogs } from "../logs" +import { streamK8sLogs } from "../logs" import { HelmModule } from "./config" import { KubernetesPluginContext } from "../config" import { getChartResources } from "./common" @@ -26,5 +26,5 @@ export async function getServiceLogs(params: GetServiceLogsParams) { const resources = await getChartResources(k8sCtx, module, false, log) - return getAllLogs({ ...params, provider, defaultNamespace: namespace, resources }) + return streamK8sLogs({ ...params, provider, defaultNamespace: namespace, resources }) } diff --git a/core/src/plugins/kubernetes/helm/run.ts b/core/src/plugins/kubernetes/helm/run.ts index 338e638b23..66d9c8d406 100644 --- a/core/src/plugins/kubernetes/helm/run.ts +++ b/core/src/plugins/kubernetes/helm/run.ts @@ -18,10 +18,10 @@ import { RunResult } from "../../../types/plugin/base" import { RunTaskParams, RunTaskResult } from "../../../types/plugin/task/runTask" import { uniqByName } from "../../../util/util" import { prepareEnvVars } from "../util" -import { V1PodSpec } from "@kubernetes/client-node" import { KubeApi } from "../api" import { getModuleNamespace } from "../namespace" import { DEFAULT_TASK_TIMEOUT } from "../../../constants" +import { KubernetesPod } from "../types" export async function runHelmModule({ ctx, @@ -66,36 +66,47 @@ export async function runHelmModule({ // Apply overrides const env = uniqByName([...prepareEnvVars(runtimeContext.envVars), ...(container.env || [])]) - const spec: V1PodSpec = { - containers: [ - { - ...container, - ...(command && { command }), - ...(args && { args }), - env, - }, - ], - } - const api = await KubeApi.factory(log, ctx, provider) - const podName = makePodName("run", module.name) + + const pod: KubernetesPod = { + apiVersion: "v1", + kind: "Pod", + metadata: { + name: makePodName("run", module.name), + namespace, + }, + spec: { + containers: [ + { + ...container, + ...(command && { command }), + ...(args && { args }), + env, + }, + ], + }, + } const runner = new PodRunner({ ctx, api, - podName, + pod, provider, - image: container.image, - module, namespace, - spec, }) - return runner.startAndWait({ - interactive, + const result = await runner.runAndWait({ log, - timeout, + remove: true, + timeoutSec: timeout, + tty: !!interactive, }) + + return { + ...result, + moduleName: module.name, + version: 
module.version.versionString, + } } export async function runHelmTask(params: RunTaskParams): Promise { @@ -130,7 +141,7 @@ export async function runHelmTask(params: RunTaskParams): Promise): Prom const testName = testConfig.name const { command, args } = testConfig.spec - const image = container.image + const image = container.image! const timeout = testConfig.timeout || DEFAULT_TEST_TIMEOUT const result = await runAndCopy({ diff --git a/core/src/plugins/kubernetes/hot-reload.ts b/core/src/plugins/kubernetes/hot-reload.ts index 8c0bf9d980..ee320c7cc8 100644 --- a/core/src/plugins/kubernetes/hot-reload.ts +++ b/core/src/plugins/kubernetes/hot-reload.ts @@ -23,11 +23,9 @@ import { RSYNC_PORT } from "./constants" import { getAppNamespace } from "./namespace" import { KubernetesPluginContext } from "./config" import { HotReloadServiceParams, HotReloadServiceResult } from "../../types/plugin/service/hotReloadService" -import { KubernetesResource, KubernetesWorkload, KubernetesList } from "./types" +import { KubernetesResource, KubernetesWorkload } from "./types" import { normalizeLocalRsyncPath, normalizeRelativePath } from "../../util/fs" import { createWorkloadManifest } from "./container/deployment" -import { kubectl } from "./kubectl" -import { labelSelectorToString } from "./util" import { KubeApi } from "./api" import { syncWithOptions } from "../../util/sync" import { GardenModule } from "../../types/module" @@ -154,21 +152,21 @@ export function configureHotReload({ } // These any casts are necessary because of flaws in the TS definitions in the client library. - if (!target.spec.template.spec.volumes) { - target.spec.template.spec.volumes = [] + if (!target.spec.template.spec!.volumes) { + target.spec.template.spec!.volumes = [] } - target.spec.template.spec.volumes.push({ + target.spec.template.spec!.volumes.push({ name: syncVolumeName, emptyDir: {}, }) - if (!target.spec.template.spec.initContainers) { - target.spec.template.spec.initContainers = [] + if (!target.spec.template.spec!.initContainers) { + target.spec.template.spec!.initContainers = [] } - target.spec.template.spec.initContainers.push(initContainer) + target.spec.template.spec!.initContainers.push(initContainer) - target.spec.template.spec.containers.push(rsyncContainer) + target.spec.template.spec!.containers.push(rsyncContainer) } /** @@ -206,15 +204,17 @@ export async function hotReloadContainer({ log, blueGreen: provider.config.deploymentStrategy === "blue-green", }) - const selector = labelSelectorToString({ - [gardenAnnotationKey("service")]: service.name, - }) - // TODO: make and use a KubeApi method for this - const res: KubernetesList = await kubectl(ctx, provider).json({ - args: ["get", manifest.kind, "-l", selector], + + const res = await api.listResources({ log, + apiVersion: manifest.apiVersion, + kind: manifest.kind, namespace, + labelSelector: { + [gardenAnnotationKey("service")]: service.name, + }, }) + const list = res.items.filter((r) => r.metadata.annotations![gardenAnnotationKey("hot-reload")] === "true") if (list.length === 0) { diff --git a/core/src/plugins/kubernetes/kubernetes-module/common.ts b/core/src/plugins/kubernetes/kubernetes-module/common.ts index 6225ddcbea..9329996c6b 100644 --- a/core/src/plugins/kubernetes/kubernetes-module/common.ts +++ b/core/src/plugins/kubernetes/kubernetes-module/common.ts @@ -40,7 +40,7 @@ export async function getManifests({ return Bluebird.map(manifests, async (manifest) => { // Ensure a namespace is set, if not already set, and if required by the resource 
type if (!manifest.metadata.namespace) { - const info = await api.getApiResourceInfo(log, manifest) + const info = await api.getApiResourceInfo(log, manifest.apiVersion, manifest.kind) if (info.namespaced) { manifest.metadata.namespace = defaultNamespace diff --git a/core/src/plugins/kubernetes/kubernetes-module/handlers.ts b/core/src/plugins/kubernetes/kubernetes-module/handlers.ts index 36825b94d9..e8f996bf1d 100644 --- a/core/src/plugins/kubernetes/kubernetes-module/handlers.ts +++ b/core/src/plugins/kubernetes/kubernetes-module/handlers.ts @@ -16,7 +16,7 @@ import { ServiceStatus } from "../../../types/service" import { compareDeployedResources, waitForResources } from "../status/status" import { KubeApi } from "../api" import { ModuleAndRuntimeActionHandlers } from "../../../types/plugin/plugin" -import { getAllLogs } from "../logs" +import { streamK8sLogs } from "../logs" import { deleteObjectsBySelector, apply } from "../kubectl" import { BuildModuleParams, BuildResult } from "../../../types/plugin/module/build" import { GetServiceStatusParams } from "../../../types/plugin/service/getServiceStatus" @@ -218,7 +218,7 @@ async function getServiceLogs(params: GetServiceLogsParams) { const api = await KubeApi.factory(log, ctx, provider) const manifests = await getManifests({ api, log, module, defaultNamespace: namespace }) - return getAllLogs({ ...params, provider, defaultNamespace: namespace, resources: manifests }) + return streamK8sLogs({ ...params, provider, defaultNamespace: namespace, resources: manifests }) } function getSelector(service: KubernetesService) { diff --git a/core/src/plugins/kubernetes/kubernetes-module/run.ts b/core/src/plugins/kubernetes/kubernetes-module/run.ts index a57f581be9..a95c0fa558 100644 --- a/core/src/plugins/kubernetes/kubernetes-module/run.ts +++ b/core/src/plugins/kubernetes/kubernetes-module/run.ts @@ -49,7 +49,7 @@ export async function runKubernetesTask(params: RunTaskParams) args, artifacts: task.spec.artifacts, envVars: task.spec.env, - image: container.image, + image: container.image!, namespace, podName: makePodName("task", module.name, task.name), description: `Task '${task.name}' in container module '${module.name}'`, diff --git a/core/src/plugins/kubernetes/kubernetes-module/test.ts b/core/src/plugins/kubernetes/kubernetes-module/test.ts index af209d1d90..a08a249672 100644 --- a/core/src/plugins/kubernetes/kubernetes-module/test.ts +++ b/core/src/plugins/kubernetes/kubernetes-module/test.ts @@ -37,7 +37,7 @@ export async function testKubernetesModule(params: TestModuleParams follow: boolean tail: number -} - -interface GetPodLogsParams extends GetLogsBaseParams { - pods: KubernetesPod[] -} - -interface GetAllLogsParams extends GetLogsBaseParams { resources: KubernetesResource[] } -interface GetLogsParams extends GetLogsBaseParams { - pod: KubernetesPod -} - -export const sternSpec: PluginToolSpec = { - name: "stern", - description: "Utility CLI for streaming logs from Kubernetes.", - type: "binary", - builds: [ - { - platform: "darwin", - architecture: "amd64", - url: "https://github.com/wercker/stern/releases/download/1.11.0/stern_darwin_amd64", - sha256: "7aea3b6691d47b3fb844dfc402905790665747c1e6c02c5cabdd41994533d7e9", - }, - { - platform: "linux", - architecture: "amd64", - url: "https://github.com/wercker/stern/releases/download/1.11.0/stern_linux_amd64", - sha256: "e0b39dc26f3a0c7596b2408e4fb8da533352b76aaffdc18c7ad28c833c9eb7db", - }, - { - platform: "windows", - architecture: "amd64", - url: 
"https://github.com/wercker/stern/releases/download/1.11.0/stern_windows_amd64.exe", - sha256: "75708b9acf6ef0eeffbe1f189402adc0405f1402e6b764f1f5152ca288e3109e", - }, - ], -} - -/** - * Stream all logs for the given pod names and service. - */ -export async function getPodLogs(params: GetPodLogsParams) { - const procs = await Bluebird.map(params.pods, (pod) => getLogs({ ...omit(params, "pods"), pod })) - - return new Promise((resolve, reject) => { - // Make sure to resolve if no processes get created - if (procs.length === 0) { - return resolve({}) - } - for (const proc of procs) { - proc.on("error", () => reject) - - proc.on("exit", () => resolve({})) - } - }) -} - /** * Stream all logs for the given resources and service. */ -export async function getAllLogs(params: GetAllLogsParams) { +export async function streamK8sLogs(params: GetAllLogsParams) { const api = await KubeApi.factory(params.log, params.ctx, params.provider) const pods = await getAllPods(api, params.defaultNamespace, params.resources) - return getPodLogs({ ...params, pods }) -} -async function getLogs({ ctx, log, provider, service, stream, tail, follow, pod }: GetLogsParams) { - if (follow) { - return followLogs({ ctx, log, provider, service, stream, tail, pod }) - } + if (params.follow) { + const procs = await Bluebird.map(pods, (pod) => followLogs({ ...omit(params, "pods"), pod })).filter(Boolean) + + return new Promise((resolve, reject) => { + // Make sure to resolve if no processes get created + if (procs.length === 0) { + return resolve({}) + } + for (const proc of procs) { + proc.on("error", () => reject) - return readLogs({ log, ctx, provider, service, stream, tail, pod }) + proc.on("exit", () => resolve({})) + } + }) + } else { + await Bluebird.map(pods, (pod) => readLogsFromApi({ ...omit(params, "pods"), pod })) + return {} + } } -async function readLogs({ +async function readLogsFromApi({ log, ctx, provider, @@ -118,28 +72,50 @@ async function readLogs({ stream, tail, pod, + defaultNamespace, }: { log: LogEntry ctx: PluginContext provider: KubernetesProvider service: Service stream: Stream - tail: number + tail?: number pod: KubernetesPod + defaultNamespace: string }) { - const kubectlArgs = ["logs", "--tail", String(tail), "--timestamps=true", "--all-containers=true"] - - kubectlArgs.push(`pod/${pod.metadata.name}`) + const api = await KubeApi.factory(log, ctx, provider) + + const logs = await getPodLogs({ + api, + namespace: pod.metadata.namespace || defaultNamespace, + pod, + lineLimit: tail === -1 ? 
undefined : tail, + timestamps: true, + }) - const proc = await kubectl(ctx, provider).spawn({ - args: kubectlArgs, - log, - namespace: pod.metadata.namespace, + const serviceName = service.name + + const allLines = logs.flatMap(({ containerName, log: _log }) => { + return _log.split("\n").map((line) => { + try { + const [timestampStr, msg] = splitFirst(line, " ") + const timestamp = moment(timestampStr).toDate() + return { serviceName, timestamp, msg: formatLine(containerName, msg) } + } catch { + return { serviceName, msg: formatLine(containerName, line) } + } + }) }) - handleLogMessageStreamFromProcess(proc, stream, service) - return proc + for (const line of sortBy(allLines, "timestamp")) { + void stream.write(line) + } } + +function formatLine(containerName: string, line: string) { + return chalk.gray(containerName + " → ") + line.trimEnd() +} + async function followLogs({ ctx, log, @@ -148,6 +124,7 @@ async function followLogs({ stream, tail, pod, + defaultNamespace, }: { ctx: PluginContext log: LogEntry @@ -156,10 +133,11 @@ async function followLogs({ stream: Stream tail: number pod: KubernetesPod + defaultNamespace: string }) { const sternArgs = [ `--context=${provider.config.context}`, - `--namespace=${pod.metadata.namespace}`, + `--namespace=${pod.metadata.namespace || defaultNamespace}`, `--exclude-container=garden-*`, "--tail", String(tail), @@ -186,52 +164,56 @@ async function followLogs({ log, }) - handleLogMessageStreamFromProcess(proc, stream, service, true) - return proc -} - -function handleLogMessageStreamFromProcess( - proc: ChildProcess, - stream: Stream, - service: Service, - json?: boolean -) { - let timestamp: Date - - proc.stdout!.pipe(split()).on("data", (s) => { + proc.stdout!.pipe(split()).on("data", (s: Buffer) => { if (!s) { return } - let timestampStr: string + let timestamp: Date | undefined = undefined let msg: string try { - const parsed = json ? parseSternLogMessage(s) : splitFirst(s, " ") - timestampStr = parsed[0] - msg = parsed[1] + const parsed = JSON.parse(s.toString()) + let [timestampStr, line] = splitFirst(parsed.message, " ") + msg = formatLine(parsed.containerName, line) timestamp = moment(timestampStr).toDate() } catch (err) { /** * If the message was supposed to be JSON but parsing failed, we stream the message unparsed. It may contain * error information useful for debugging. 
*/ - msg = s + msg = s.toString() } void stream.write({ serviceName: service.name, timestamp, - msg: `${msg}`, + msg, }) }) + + return proc } -function parseSternLogMessage(message: string): string[] { - let log = JSON.parse(message) - const logMessageChunks = log.message.split(" ") - return [ - logMessageChunks[0], - logMessageChunks - .slice(1, logMessageChunks.length) - .join(" ") - .trimEnd(), - ] +export const sternSpec: PluginToolSpec = { + name: "stern", + description: "Utility CLI for streaming logs from Kubernetes.", + type: "binary", + builds: [ + { + platform: "darwin", + architecture: "amd64", + url: "https://github.com/wercker/stern/releases/download/1.11.0/stern_darwin_amd64", + sha256: "7aea3b6691d47b3fb844dfc402905790665747c1e6c02c5cabdd41994533d7e9", + }, + { + platform: "linux", + architecture: "amd64", + url: "https://github.com/wercker/stern/releases/download/1.11.0/stern_linux_amd64", + sha256: "e0b39dc26f3a0c7596b2408e4fb8da533352b76aaffdc18c7ad28c833c9eb7db", + }, + { + platform: "windows", + architecture: "amd64", + url: "https://github.com/wercker/stern/releases/download/1.11.0/stern_windows_amd64.exe", + sha256: "75708b9acf6ef0eeffbe1f189402adc0405f1402e6b764f1f5152ca288e3109e", + }, + ], } diff --git a/core/src/plugins/kubernetes/namespace.ts b/core/src/plugins/kubernetes/namespace.ts index 56081cb5d3..d634f98eaf 100644 --- a/core/src/plugins/kubernetes/namespace.ts +++ b/core/src/plugins/kubernetes/namespace.ts @@ -15,7 +15,7 @@ import { KubernetesProvider, KubernetesPluginContext } from "./config" import { DeploymentError, TimeoutError } from "../../exceptions" import { getPackageVersion, sleep } from "../../util/util" import { GetEnvironmentStatusParams } from "../../types/plugin/provider/getEnvironmentStatus" -import { kubectl, KUBECTL_DEFAULT_TIMEOUT } from "./kubectl" +import { KUBECTL_DEFAULT_TIMEOUT } from "./kubectl" import { LogEntry } from "../../logger/log-entry" import { gardenAnnotationKey } from "../../util/string" import dedent from "dedent" @@ -141,8 +141,8 @@ export async function prepareNamespaces({ ctx, log }: GetEnvironmentStatusParams const k8sCtx = ctx try { - // TODO: use API instead of kubectl (I just couldn't find which API call to make) - await kubectl(k8sCtx, k8sCtx.provider).exec({ log, args: ["version"] }) + const api = await KubeApi.factory(log, ctx, ctx.provider as KubernetesProvider) + await api.request({ path: "/version", log }) } catch (err) { log.setError("Error") diff --git a/core/src/plugins/kubernetes/run.ts b/core/src/plugins/kubernetes/run.ts index cc9ce5a3dd..c12f0550a5 100644 --- a/core/src/plugins/kubernetes/run.ts +++ b/core/src/plugins/kubernetes/run.ts @@ -10,20 +10,23 @@ import { resolve } from "path" import tar from "tar" import tmp from "tmp-promise" import { V1PodSpec, V1Pod, V1Container } from "@kubernetes/client-node" -import { tailString } from "../../util/string" import { RunResult } from "../../types/plugin/base" -import { kubectl } from "./kubectl" import { GardenModule } from "../../types/module" import { LogEntry } from "../../logger/log-entry" -import { PluginError, GardenBaseError, TimeoutError, RuntimeError, ConfigurationError } from "../../exceptions" +import { + PluginError, + GardenBaseError, + TimeoutError, + RuntimeError, + ConfigurationError, + ParameterError, +} from "../../exceptions" import { KubernetesProvider } from "./config" -import { Writable } from "stream" -import { ChildProcess } from "child_process" -import { sleep, uniqByName } from "../../util/util" +import { 
Writable, Readable } from "stream" +import { uniqByName, sleep } from "../../util/util" import { KubeApi } from "./api" -import { checkPodStatus, getPodLogs } from "./status/pod" -import { KubernetesServerResource } from "./types" -import { ServiceState } from "../../types/service" +import { getPodLogs, checkPodStatus } from "./status/pod" +import { KubernetesResource, KubernetesPod } from "./types" import { RunModuleParams } from "../../types/plugin/module/runModule" import { ContainerEnvVars, ContainerVolumeSpec } from "../container/config" import { prepareEnvVars, makePodName } from "./util" @@ -33,9 +36,13 @@ import cpy from "cpy" import { prepareImagePullSecrets } from "./secrets" import { configureVolumes } from "./container/deployment" import { PluginContext } from "../../plugin-context" +import { waitForResources, ResourceStatus } from "./status/status" +import { cloneDeep } from "lodash" -const MAX_BUFFER_SIZE = 1024 * 1024 +// Default timeout for individual run/exec operations +const defaultTimeout = 600 +// TODO: break this function up export async function runAndCopy({ ctx, log, @@ -137,38 +144,32 @@ export async function runAndCopy({ podName = makePodName("run", module.name) } + const pod: KubernetesResource = { + apiVersion: "v1", + kind: "Pod", + metadata: { + name: podName, + namespace, + }, + spec, + } + const runner = new PodRunner({ ctx, api, - podName, + pod, provider, - image, - module, namespace, - spec, }) let result: RunResult const startedAt = new Date() - // Need to retrieve the logs explicitly, because kubectl exec/run sometimes fail to capture them - const getLogs = async () => { - const containerLogs = await getPodLogs({ - api, - namespace, - podName: runner.podName, - containerNames: [mainContainerName], - }) - return containerLogs[0]?.log || "" - } - - const timedOutResult = async () => { - const logs = (await getLogs()).trim() - + const timedOutResult = async (logs: string) => { return { command: runner.getFullCommand(), completedAt: new Date(), - log: "Command timed out." + (logs ? ` Here are the logs until the timeout occurred:\n\n${logs.trim()}` : ""), + log: "Command timed out." + (logs ? ` Here are the logs until the timeout occurred:\n\n${logs}` : ""), moduleName: module.name, startedAt, success: false, @@ -176,23 +177,61 @@ export async function runAndCopy({ } } - if (getArtifacts) { + if (!getArtifacts) { try { - // Start the Pod - const { pod, state, debugLog } = await runner.start({ - ignoreError: true, + const res = await runner.runAndWait({ log, - stdout, - stderr, + remove: true, + timeoutSec: timeout || defaultTimeout, + tty: !!interactive, }) + result = { + ...res, + moduleName: module.name, + version: module.version.versionString, + } + } catch (err) { + if (err.type === "timeout") { + result = await timedOutResult(err.detail.logs) + } else if (err.type === "pod-runner") { + // Command exited with non-zero code + result = { + log: err.detail.logs || err.message, + moduleName: module.name, + version: module.version.versionString, + success: false, + startedAt, + completedAt: new Date(), + command: [...(command || []), ...(args || [])], + } + } else { + throw err + } + } + + return result + } - errorMetadata.pod = pod - errorMetadata.state = state - errorMetadata.debugLog = debugLog + const timeoutSec = timeout || defaultTimeout - if (state !== "ready") { - // Specifically look for error indicating `sh` is missing, and report with helpful message. 
- const containerStatus = pod!.status.containerStatuses![0] + try { + errorMetadata.pod = pod + + // Start the Pod + try { + await runner.start({ log, timeoutSec }) + } catch (err) { + if (err.type !== "deployment") { + throw err + } + + // Specifically look for deployment error indicating `sh` is missing, and report with more helpful message. + const status = err.detail.status + + errorMetadata.status = status + + if (status.state !== "ready") { + const containerStatus = status.resource.status.containerStatuses![0] if (containerStatus?.state?.terminated?.message?.includes("not found")) { throw new ConfigurationError( @@ -203,158 +242,160 @@ export async function runAndCopy({ errorMetadata ) } else { - throw new RuntimeError(`Failed to start Pod ${runner.podName}: ${debugLog}`, errorMetadata) + throw new RuntimeError( + `Failed to start Pod ${runner.podName}: ${JSON.stringify(status.resource.status, null, 2)}`, + errorMetadata + ) } } + } - try { - await runner.exec({ - command: ["sh", "-c", "tar --help"], - container: mainContainerName, - ignoreError: false, - log, - stdout, - stderr, - }) - } catch (err) { - // TODO: fall back to copying `arc` (https://github.com/mholt/archiver) or similar into the container and - // using that (tar is not statically compiled so we can't copy that directly). Keeping this snippet around - // for that: - // await runner.exec({ - // command: ["sh", "-c", `sed -n 'w ${arcPath}'; chmod +x ${arcPath}`], - // container: containerName, - // ignoreError: false, - // input: , - // log, - // stdout, - // stderr, - // }) - throw new ConfigurationError( - deline` - ${description} specifies artifacts to export, but the image doesn't - contain the tar binary. In order to copy artifacts out of Kubernetes containers, both sh and tar need to - be installed in the image.`, - errorMetadata - ) - } + try { + await runner.exec({ + command: ["sh", "-c", "tar --help"], + containerName: mainContainerName, + log, + stdout, + stderr, + // Anything above two minutes for this would be unusual + timeoutSec: 120, + }) + } catch (err) { + // TODO: fall back to copying `arc` (https://github.com/mholt/archiver) or similar into the container and + // using that (tar is not statically compiled so we can't copy that directly). Keeping this snippet around + // for that: + // await runner.exec({ + // command: ["sh", "-c", `sed -n 'w ${arcPath}'; chmod +x ${arcPath}`], + // container: containerName, + // ignoreError: false, + // input: , + // log, + // stdout, + // stderr, + // }) + throw new ConfigurationError( + deline` + ${description} specifies artifacts to export, but the image doesn't + contain the tar binary. In order to copy artifacts out of Kubernetes containers, both sh and tar need to + be installed in the image.`, + errorMetadata + ) + } - // Escape the command, so that we can safely pass it as a single string - const cmd = [...command!, ...(args || [])].map((s) => JSON.stringify(s)).join(" ") + // Escape the command, so that we can safely pass it as a single string + const cmd = [...command!, ...(args || [])].map((s) => JSON.stringify(s)) - try { - result = await runner.exec({ - // Pipe the output from the command to the /tmp/output pipe, including stderr. Some shell voodoo happening - // here, but this was the only working approach I could find after a lot of trial and error. 
- command: ["sh", "-c", `exec >/tmp/output; ${cmd}`], - container: mainContainerName, - ignoreError: true, - log, - stdout, - stderr, - timeout, - }) - result.log = (await getLogs()).trim() || result.log - } catch (err) { - if (err.type === "timeout") { - result = await timedOutResult() - } else { - throw err + try { + const res = await runner.exec({ + // Pipe the output from the command to the /tmp/output pipe, including stderr. Some shell voodoo happening + // here, but this was the only working approach I could find after a lot of trial and error. + command: ["sh", "-c", `exec >/tmp/output; ${cmd.join(" ")}`], + containerName: mainContainerName, + log, + stdout, + stderr, + timeoutSec, + }) + result = { + ...res, + log: (await runner.getMainContainerLogs()).trim() || res.log, + moduleName: module.name, + version: module.version.versionString, + } + } catch (err) { + const res = err.detail.result + + if (err.type === "timeout") { + // Command timed out + result = await timedOutResult((await runner.getMainContainerLogs()).trim()) + } else if (err.type === "pod-runner" && res && res.exitCode) { + // Command exited with non-zero code + result = { + log: (await runner.getMainContainerLogs()).trim() || err.message, + moduleName: module.name, + version: module.version.versionString, + success: false, + startedAt, + completedAt: new Date(), + command: cmd, } + } else { + throw err } + } - // Copy the artifacts - await Promise.all( - artifacts.map(async (artifact) => { - const tmpDir = await tmp.dir({ unsafeCleanup: true }) - // Remove leading slash (which is required in the schema) - const sourcePath = artifact.source.slice(1) - const targetPath = resolve(artifactsPath!, artifact.target || ".") - - const tarCmd = [ - "tar", - "-c", // create an archive - "-f", - "-", // pipe to stdout - // Files to match. The .DS_Store file is a trick to avoid errors when no files are matched. The file is - // ignored later when copying from the temp directory. See https://github.com/sindresorhus/cpy#ignorejunk - `$(ls ${sourcePath} 2>/dev/null) .DS_Store`, - ] + // Copy the artifacts + await Promise.all( + artifacts.map(async (artifact) => { + const tmpDir = await tmp.dir({ unsafeCleanup: true }) + // Remove leading slash (which is required in the schema) + const sourcePath = artifact.source.slice(1) + const targetPath = resolve(artifactsPath!, artifact.target || ".") + + const tarCmd = [ + "tar", + "-c", // create an archive + "-f", + "-", // pipe to stdout + // Files to match. The .DS_Store file is a trick to avoid errors when no files are matched. The file is + // ignored later when copying from the temp directory. See https://github.com/sindresorhus/cpy#ignorejunk + `$(ls ${sourcePath} 2>/dev/null) .DS_Store`, + ] + + try { + await new Promise((_resolve, reject) => { + // Create an extractor to receive the tarball we will stream from the container + // and extract to the artifacts directory. + let done = 0 + + const extractor = tar.x({ + cwd: tmpDir.path, + strict: true, + onentry: (entry) => log.debug("tar: got file " + entry.path), + }) - try { - await new Promise((_resolve, reject) => { - // Create an extractor to receive the tarball we will stream from the container - // and extract to the artifacts directory. 
- let done = 0 - - const extractor = tar.x({ - cwd: tmpDir.path, - strict: true, - onentry: (entry) => log.debug("tar: got file " + entry.path), - }) + extractor.on("end", () => { + // Need to make sure both processes are complete before resolving (may happen in either order) + done++ + done === 2 && _resolve() + }) + extractor.on("error", (err) => { + reject(err) + }) - extractor.on("end", () => { + // Tarball the requested files and stream to the above extractor. + runner + .exec({ + command: ["sh", "-c", "cd / && touch .DS_Store && " + tarCmd.join(" ")], + containerName: mainContainerName, + log, + stdout: extractor, + timeoutSec, + }) + .then(() => { // Need to make sure both processes are complete before resolving (may happen in either order) done++ done === 2 && _resolve() }) - extractor.on("error", (err) => { - reject(err) - }) - - // Tarball the requested files and stream to the above extractor. - runner - .exec({ - command: ["sh", "-c", "cd / && touch .DS_Store && " + tarCmd.join(" ")], - container: mainContainerName, - ignoreError: false, - log, - stdout: extractor, - }) - .then(() => { - // Need to make sure both processes are complete before resolving (may happen in either order) - done++ - done === 2 && _resolve() - }) - .catch(reject) - }) + .catch(reject) + }) - // Copy the resulting files to the artifacts directory - try { - await cpy("**/*", targetPath, { cwd: tmpDir.path, ignoreJunk: true }) - } catch (err) { - // Ignore error thrown when the directory is empty - if (err.name !== "CpyError" || !err.message.includes("the file doesn't exist")) { - throw err - } + // Copy the resulting files to the artifacts directory + try { + await cpy("**/*", targetPath, { cwd: tmpDir.path, ignoreJunk: true }) + } catch (err) { + // Ignore error thrown when the directory is empty + if (err.name !== "CpyError" || !err.message.includes("the file doesn't exist")) { + throw err } - } finally { - await tmpDir.cleanup() } - }) - ) - } finally { - await runner.stop() - } - } else { - try { - result = await runner.startAndWait({ - interactive, - ignoreError: true, - log, - remove: false, - timeout, + } finally { + await tmpDir.cleanup() + } }) - result.log = (await getLogs()).trim() || result.log - } catch (err) { - if (err.type === "timeout") { - result = await timedOutResult() - } else { - throw err - } - } finally { - // Make sure Pod is cleaned up - await runner.stop() - } + ) + } finally { + await runner.stop() } return result @@ -364,45 +405,53 @@ class PodRunnerParams { ctx: PluginContext annotations?: { [key: string]: string } api: KubeApi - image: string - module: GardenModule + pod: KubernetesPod namespace: string - podName: string provider: KubernetesProvider - spec: V1PodSpec } interface StartParams { - ignoreError?: boolean - input?: Buffer | string log: LogEntry - stdout?: Writable - stderr?: Writable - timeout?: number + timeoutSec?: number } type ExecParams = StartParams & { command: string[] - container?: string - ignoreError?: boolean + containerName?: string + stdout?: Writable + stderr?: Writable + stdin?: Readable + tty?: boolean } -type StartAndWaitParams = StartParams & { - interactive: boolean - remove?: boolean +type RunParams = StartParams & { + stdout?: Writable + stderr?: Writable + stdin?: Readable + remove: boolean + tty: boolean } class PodRunnerError extends GardenBaseError { - type = "PodRunner" + type = "pod-runner" +} + +interface RunAndWaitResult { + command: string[] + startedAt: Date + completedAt: Date + log: string + success: boolean } export class 
PodRunner extends PodRunnerParams { - proc: ChildProcess + podName: string + running: boolean constructor(params: PodRunnerParams) { super() - const spec = params.spec + const spec = params.pod.spec if (!spec.containers || spec.containers.length === 0) { throw new PluginError(`Pod spec for PodRunner must contain at least one container`, { @@ -410,250 +459,241 @@ export class PodRunner extends PodRunnerParams { }) } + params.pod.metadata.annotations = { + ...(params.pod.metadata.annotations || {}), + // Workaround to make sure sidecars are not injected, + // due to https://github.com/kubernetes/kubernetes/issues/25908 + "sidecar.istio.io/inject": "false", + } + Object.assign(this, params) + + this.podName = this.pod.metadata.name } getFullCommand() { - return [...(this.spec.containers[0].command || []), ...(this.spec.containers[0].args || [])] + return [...(this.pod.spec.containers[0].command || []), ...(this.pod.spec.containers[0].args || [])] + } + + getMainContainerName() { + return this.pod.spec.containers[0].name } /** - * Starts the Pod, attaches to it, and waits for a result. Use this if you just need to start a Pod - * and get its output, and for interactive sessions. + * Runs the Pod, waits for it to terminate, and returns the result. Throws if the Pod never successfully starts. + * Returns the logs for the first container in the Pod. Returns success=false if Pod exited with non-zero code. + * + * If tty=true, we attach to the process stdio during execution. */ - async startAndWait({ - log, - ignoreError, - interactive, - stdout, - remove = true, - timeout, - }: StartAndWaitParams): Promise { - const { module, spec } = this - - if (interactive) { - spec.containers[0].stdin = true - spec.containers[0].stdinOnce = true - spec.containers[0].tty = true - } - - const kubecmd = [...this.getBaseRunArgs(), interactive ? "--tty" : "--quiet"] - - if (remove) { - kubecmd.push("--rm") - } - - const command = this.getFullCommand() - log.verbose(`Running '${command.join(" ")}' in Pod ${this.podName}`) + async runAndWait(params: RunParams): Promise { + const { log, remove, timeoutSec, tty } = params + let { stdout, stderr, stdin } = params + const { namespace, podName } = this const startedAt = new Date() + let success = true + let attached = false + let mainContainerLogs = "" + const mainContainerName = this.getMainContainerName() + + if (tty) { + if (stdout || stderr || stdin) { + throw new ParameterError(`Cannot set both tty and stdout/stderr/stdin streams`, { params }) + } - // TODO: use API library - const res = await kubectl(this.ctx, this.provider).spawnAndWait({ - log, - namespace: this.namespace, - ignoreError, - args: kubecmd, - stdout, - timeoutSec: timeout, - tty: interactive, - }) - - return { - moduleName: module.name, - command, - version: module.version.versionString, - startedAt, - completedAt: new Date(), - log: res.all, - success: res.code === 0, + stdout = process.stdout + stderr = process.stderr + stdin = process.stdin } - } - /** - * Starts the Pod and leaves it running. Use this along with the `exec()` method when you need to run multiple - * commands in the same Pod. Note that you *must manually call `stop()`* when you're done. Otherwise the Pod will - * stay running in the cluster until the process exits. 
- */ - async start({ log, ignoreError, stdout, stderr, timeout }: StartParams) { - const { spec } = this - - const kubecmd = [...this.getBaseRunArgs(), "--quiet"] - - const command = [...(spec.containers[0].command || []), ...(spec.containers[0].args || [])] - log.verbose(`Starting Pod ${this.podName} with command '${command.join(" ")}'`) + const getDebugLogs = async () => { + try { + return this.getMainContainerLogs() + } catch (err) { + return "" + } + } - // TODO: use API directly - this.proc = await kubectl(this.ctx, this.provider).spawn({ - log, - namespace: this.namespace, - args: kubecmd, - stdout, - stderr, - }) + try { + await this.createPod({ log, tty }) + + // Wait until Pod terminates + while (true) { + const serverPod = await this.api.core.readNamespacedPodStatus(podName, namespace) + const state = checkPodStatus(serverPod) + + const mainContainerStatus = (serverPod.status.containerStatuses || []).find((s) => s.name === mainContainerName) + const terminated = mainContainerStatus?.state?.terminated + const exitReason = terminated?.reason + const exitCode = terminated?.exitCode + + if (state === "unhealthy") { + if ( + exitCode !== undefined && + exitCode < 127 && + exitReason !== "ContainerCannotRun" && + exitReason !== "StartError" + ) { + // Successfully ran the command in the main container, but returned non-zero exit code + success = false + break + } - let debugLog = "" - this.proc.stdout!.on("data", (data) => (debugLog += data)) - this.proc.stderr!.on("data", (data) => (debugLog += data)) + const statusStr = terminated + ? `${terminated.reason} - ${terminated.message}` + : "Status:\n" + JSON.stringify(serverPod.status, null, 2) - const start = new Date().getTime() - let pod: KubernetesServerResource | undefined - let state: ServiceState = "missing" + throw new PodRunnerError(`Failed to start Pod ${podName}. ${statusStr}`, { + logs: statusStr, + exitCode, + pod: serverPod, + }) + } - // Wait for Pod to be ready - while (true) { - await sleep(250) + if (state === "stopped") { + success = exitCode === 0 + break + } - try { - pod = await this.api.core.readNamespacedPod(this.podName, this.namespace) - } catch (err) { - if (err.statusCode === 404) { - if (this.proc.killed) { - if (ignoreError) { - break - } - throw new RuntimeError(`Failed to start Pod ${this.podName}: ${debugLog}`, { - podName: this.podName, - log: debugLog, + if (!attached && (tty || stdout || stderr)) { + // Try to attach to Pod to stream logs + try { + await this.api.attachToPod({ + namespace, + podName, + containerName: mainContainerName, + stdout, + stderr, + stdin, + tty, }) + attached = true + } catch (err) { + // Ignore errors when attaching, we'll just keep trying } - // Pod isn't ready - continue } - } - state = checkPodStatus(pod!) 
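A minimal, illustrative sketch of the caller-side contract for the new `PodRunner.runAndWait()`, mirroring the error handling already used in `runAndCopy()` above. It assumes `ctx`, `api`, `provider`, `namespace`, `pod` and `log` are in scope (as they are in `runAndCopy()`) and runs inside an async function; the variable names are illustrative only, not part of this patch.

// Caller-side sketch of runAndWait(): a successful run returns the main container's log,
// while timeouts and start failures surface as typed errors with logs attached to err.detail.
const runner = new PodRunner({ ctx, api, provider, namespace, pod })

let output: string

try {
  const res = await runner.runAndWait({ log, remove: true, timeoutSec: 600, tty: false })
  // res matches RunAndWaitResult: { command, startedAt, completedAt, log, success }
  output = res.log
} catch (err) {
  if (err.type === "timeout") {
    // Ran past timeoutSec; whatever logs were collected so far are attached to the error detail
    output = err.detail.logs
  } else if (err.type === "pod-runner") {
    // The Pod failed to start, or the main container hit one of the start-failure exit conditions checked above
    output = err.detail.logs || err.message
  } else {
    throw err
  }
}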
+ const elapsed = (new Date().getTime() - startedAt.getTime()) / 1000 - if (state === "ready") { - break - } else if (state === "unhealthy") { - if (ignoreError) { - break + if (timeoutSec && elapsed > timeoutSec) { + const msg = `Command timed out after ${timeoutSec} seconds.` + throw new TimeoutError(msg, { + logs: (await getDebugLogs()) || msg, + serverPod, + }) } - throw new RuntimeError(`Failed to start Pod ${this.podName}: ${debugLog}`, { pod }) + + await sleep(200) } - if (timeout && new Date().getTime() - start > timeout) { - throw new TimeoutError(`Timed out waiting for Pod ${this.podName} to start: ${debugLog}`, { - podName: this.podName, - log: debugLog, - }) + // Retrieve logs after run + mainContainerLogs = await this.getMainContainerLogs() + } finally { + if (remove) { + await this.stop() } } - return { proc: this.proc, pod, state, debugLog } - } - - async spawn(params: ExecParams) { - const { log, command, container, ignoreError, input, stdout, stderr, timeout } = params - - if (!this.proc) { - throw new PodRunnerError(`Attempting to spawn a command in Pod before starting it`, { command }) + return { + command: this.getFullCommand(), + startedAt, + completedAt: new Date(), + log: mainContainerLogs, + success, } + } - // TODO: use API library - const args = ["exec", "-i", this.podName, "-c", container || this.spec.containers[0].name, "--", ...command] - - const startedAt = new Date() - - const proc = await kubectl(this.ctx, this.provider).spawn({ - args, - namespace: this.namespace, - ignoreError, - input, - log, - stdout, - stderr, - timeoutSec: timeout, - }) - - let result: string = "" - - return new Promise((_resolve, reject) => { - proc.on("close", (code) => { - if (code === 0) { - _resolve({ - moduleName: this.module.name, - command, - version: this.module.version.versionString, - startedAt, - completedAt: new Date(), - log: result, - success: code === 0, - }) - } - - reject( - new RuntimeError(`Failed to spawn kubectl process with code ${code}`, { - code, - }) - ) - }) + /** + * Starts the Pod and leaves it running. Use this along with the `exec()` method when you need to run multiple + * commands in the same Pod. Note that you *must manually call `stop()`* when you're done. Otherwise the Pod will + * stay running in the cluster until the process exits. + */ + async start({ log, timeoutSec }: StartParams) { + const { ctx, provider, pod, namespace } = this - proc.on("error", (err) => { - !proc.killed && proc.kill() - throw err - }) + await this.createPod({ log, tty: false }) - proc.stdout!.on("data", (s) => { - result = tailString(result + s, MAX_BUFFER_SIZE, true) - }) + // Wait for Pod to be ready + const statuses = await waitForResources({ namespace, ctx, provider, resources: [pod], log, timeoutSec }) - stdout && proc.stdout?.pipe(stdout) - stderr && proc.stderr?.pipe(stderr) - }) + return { status: statuses[0] as ResourceStatus } } /** * Executes a command in the running Pod. Must be called after `start()`. 
*/ async exec(params: ExecParams) { - const { log, command, container, ignoreError, input, stdout, stderr, timeout } = params + const { command, containerName: container, timeoutSec, tty = false } = params + let { stdout, stderr, stdin } = params - if (!this.proc) { - throw new PodRunnerError(`Attempting to exec a command in Pod before starting it`, { command }) - } + if (tty) { + if (stdout || stderr || stdin) { + throw new ParameterError(`Cannot set both tty and stdout/stderr/stdin streams`, { params }) + } - // TODO: use API library - const args = ["exec", "-i", this.podName, "-c", container || this.spec.containers[0].name, "--", ...command] + stdout = process.stdout + stderr = process.stderr + stdin = process.stdin + } const startedAt = new Date() - const res = await kubectl(this.ctx, this.provider).exec({ - args, + const result = await this.api.execInPod({ namespace: this.namespace, - ignoreError, - input, - log, + podName: this.podName, + containerName: container || this.pod.spec.containers[0].name, + command, stdout, stderr, - timeoutSec: timeout, + stdin, + tty, + timeoutSec, }) - if (res.timedOut) { - throw new TimeoutError("Command timed out.", { error: res }) + if (result.timedOut) { + throw new TimeoutError(`Command timed out after ${timeoutSec} seconds.`, { + result, + logs: result.allLogs, + }) + } + + if (result.exitCode !== 0) { + throw new PodRunnerError(`Command exited with code ${result.exitCode}:\n${result.allLogs}`, { + result, + logs: result.allLogs, + }) } return { - moduleName: this.module.name, command, - version: this.module.version.versionString, startedAt, completedAt: new Date(), - log: res.stdout + res.stderr, - success: res.exitCode === 0, + log: result.stdout + result.stderr, + exitCode: result.exitCode, + success: result.exitCode === 0, } } + async getLogs() { + const { api, namespace, pod } = this + + return getPodLogs({ + api, + namespace, + pod, + }) + } + + async getMainContainerLogs() { + const allLogs = await this.getLogs() + return allLogs.find((l) => l.containerName === this.getMainContainerName())?.log || "" + } + /** - * Disconnects from a connected Pod (if any) and removes it from the cluster. You can safely call this even + * Removes the Pod from the cluster, if it's running. You can safely call this even * if the process is no longer active. */ async stop() { - if (this.proc) { - delete this.proc - } - try { await this.api.core.deleteNamespacedPod(this.podName, this.namespace, undefined, undefined, 0) } catch (err) { @@ -663,31 +703,26 @@ export class PodRunner extends PodRunnerParams { } } - private getBaseRunArgs() { - const { spec } = this - - const overrides: any = { - metadata: { - annotations: { - // Workaround to make sure sidecars are not injected, - // due to https://github.com/kubernetes/kubernetes/issues/25908 - "sidecar.istio.io/inject": "false", - ...(this.annotations || {}), - }, - }, - spec, + private async createPod({ log, tty }: { log: LogEntry; tty: boolean }) { + const command = this.getFullCommand() + log.verbose(`Starting Pod ${this.podName} with command '${command.join(" ")}'`) + + const pod = cloneDeep(this.pod) + + if (tty) { + // Need to be sure container is attachable + pod.spec.containers[0].stdin = true + pod.spec.containers[0].stdinOnce = true + pod.spec.containers[0].tty = true } - return [ - "run", - this.podName || makePodName("run", this.module.name), - `--image=${this.image}`, - "--restart=Never", - // Need to attach to get the log output and exit code. 
- "-i", - // This is a little messy, but it works... - "--overrides", - `${JSON.stringify(overrides)}`, - ] + // We never want to restart containers in these ephemeral pods + pod.spec.restartPolicy = "Never" + + try { + await this.api.createPod(this.namespace, pod) + } catch (error) { + throw new PodRunnerError(`Failed to create Pod ${this.podName}: ${error.message}`, { error }) + } } } diff --git a/core/src/plugins/kubernetes/status/pod.ts b/core/src/plugins/kubernetes/status/pod.ts index fc800c702e..30a87b9131 100644 --- a/core/src/plugins/kubernetes/status/pod.ts +++ b/core/src/plugins/kubernetes/status/pod.ts @@ -8,8 +8,8 @@ import { KubeApi, KubernetesError } from "../api" import Bluebird from "bluebird" -import { KubernetesServerResource } from "../types" -import { V1Pod } from "@kubernetes/client-node" +import { KubernetesServerResource, KubernetesPod } from "../types" +import { V1Pod, V1Status } from "@kubernetes/client-node" import { ResourceStatus } from "./status" import chalk from "chalk" import { ServiceState, combineStates } from "../../../types/service" @@ -25,7 +25,11 @@ export function checkPodStatus(pod: KubernetesServerResource): ServiceSta if (containerStatuses) { for (const c of containerStatuses) { - if (c.state && c.state.waiting && c.state.waiting.reason === "ImageInspectError") { + if ( + c.state && + c.state.waiting && + (c.state.waiting.reason === "ImageInspectError" || c.state.waiting.reason === "ErrImagePull") + ) { return "unhealthy" } if (c.state && c.state.terminated) { @@ -43,7 +47,7 @@ export function checkPodStatus(pod: KubernetesServerResource): ServiceSta return "unhealthy" } else if (phase === "Running") { return "ready" - } else if (phase === "Succeeded") { + } else if (phase === "Succeeded" || phase === "Completed") { return "stopped" } else { return "unknown" @@ -60,31 +64,21 @@ export function checkWorkloadPodStatus( export async function getPodLogs({ api, namespace, - podName, + pod, containerNames, byteLimit, lineLimit, + timestamps, }: { api: KubeApi namespace: string - podName: string + pod: V1Pod containerNames?: string[] byteLimit?: number lineLimit?: number + timestamps?: boolean }) { - let podRes: V1Pod - - try { - podRes = await api.core.readNamespacedPod(podName, namespace) - } catch (err) { - if (err.statusCode === 404) { - return [] - } else { - throw err - } - } - - let podContainers = podRes.spec!.containers.map((c) => c.name).filter((n) => !n.match(/garden-/)) + let podContainers = pod.spec!.containers.map((c) => c.name).filter((n) => !n.match(/garden-/)) if (containerNames) { podContainers = podContainers.filter((name) => containerNames.includes(name)) @@ -93,21 +87,42 @@ export async function getPodLogs({ return Bluebird.map(podContainers, async (containerName) => { let log = "" + const follow = false + const insecureSkipTLSVerify = false + const pretty = undefined + const sinceSeconds = undefined + try { log = await api.core.readNamespacedPodLog( - podName, + pod.metadata!.name!, namespace, containerName, - false, // follow - false, // insecureSkipTLSVerify + follow, + insecureSkipTLSVerify, byteLimit, - undefined, // pretty + pretty, false, // previous - undefined, // sinceSeconds - lineLimit + sinceSeconds, + lineLimit, + timestamps ) } catch (err) { - if (err instanceof KubernetesError && err.message.includes("waiting to start")) { + if (err.statusCode === 404) { + // Couldn't find pod/container, try requesting a previously terminated one + log = await api.core.readNamespacedPodLog( + pod.metadata!.name!, + namespace, + 
containerName, + follow, + insecureSkipTLSVerify, + byteLimit, + pretty, + true, // previous + sinceSeconds, + lineLimit, + timestamps + ) + } else if (err instanceof KubernetesError && err.message.includes("waiting to start")) { log = "" } else { throw err @@ -126,13 +141,13 @@ export async function getPodLogs({ /** * Get a formatted list of log tails for each of the specified pods. Used for debugging and error logs. */ -export async function getFormattedPodLogs(api: KubeApi, namespace: string, podNames: string[]): Promise { - const allLogs = await Bluebird.map(podNames, async (podName) => { +export async function getFormattedPodLogs(api: KubeApi, namespace: string, pods: KubernetesPod[]): Promise { + const allLogs = await Bluebird.map(pods, async (pod) => { return { - podName, + podName: pod.metadata.name, // Putting 5000 bytes as a length limit in addition to the line limit, just as a precaution in case someone // accidentally logs a binary file or something. - containers: await getPodLogs({ api, namespace, podName, byteLimit: 5000, lineLimit: podLogLines }), + containers: await getPodLogs({ api, namespace, pod, byteLimit: 5000, lineLimit: podLogLines }), } }) @@ -147,3 +162,20 @@ export async function getFormattedPodLogs(api: KubeApi, namespace: string, podNa }) .join("\n\n") } + +export function getExecExitCode(status: V1Status) { + let exitCode = 0 + + if (status.status !== "Success") { + exitCode = 1 + + const causes = status.details?.causes || [] + const exitCodeCause = causes.find((c) => c.reason === "ExitCode") + + if (exitCodeCause && exitCodeCause.message) { + exitCode = parseInt(exitCodeCause.message, 10) + } + } + + return exitCode +} diff --git a/core/src/plugins/kubernetes/status/status.ts b/core/src/plugins/kubernetes/status/status.ts index cb995853ef..8e24c0595c 100644 --- a/core/src/plugins/kubernetes/status/status.ts +++ b/core/src/plugins/kubernetes/status/status.ts @@ -15,7 +15,7 @@ import { KubeApi } from "../api" import { KUBECTL_DEFAULT_TIMEOUT } from "../kubectl" import { getAppNamespace } from "../namespace" import Bluebird from "bluebird" -import { KubernetesResource, KubernetesServerResource } from "../types" +import { KubernetesResource, KubernetesServerResource, BaseResource } from "../types" import { zip, isArray, isPlainObject, pickBy, mapValues, flatten, cloneDeep } from "lodash" import { KubernetesProvider, KubernetesPluginContext } from "../config" import { isSubset } from "../../../util/is-subset" @@ -33,24 +33,24 @@ import { checkWorkloadStatus } from "./workload" import { checkWorkloadPodStatus } from "./pod" import { gardenAnnotationKey, stableStringify } from "../../../util/string" -export interface ResourceStatus { +export interface ResourceStatus { state: ServiceState - resource: KubernetesServerResource + resource: KubernetesServerResource lastMessage?: string warning?: true logs?: string } -export interface StatusHandlerParams { +export interface StatusHandlerParams { api: KubeApi namespace: string - resource: KubernetesServerResource + resource: KubernetesServerResource log: LogEntry resourceVersion?: number } -interface ObjHandler { - (params: StatusHandlerParams): Promise +interface StatusHandler { + (params: StatusHandlerParams): Promise> } const pvcPhaseMap: { [key: string]: ServiceState } = { @@ -62,36 +62,33 @@ const pvcPhaseMap: { [key: string]: ServiceState } = { // Handlers to check the rollout status for K8s objects where that applies. 
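A quick illustration of the `getExecExitCode()` helper added to `status/pod.ts` above. The `V1Status` literals approximate the shape returned over the exec websocket; the import path and the exact field values are assumptions for the sketch, not part of this patch.

import { V1Status } from "@kubernetes/client-node"
import { getExecExitCode } from "./status/pod"

// Approximate shape of the V1Status returned when the exec'd command exits non-zero
const failed: V1Status = {
  status: "Failure",
  details: { causes: [{ reason: "ExitCode", message: "2" }] },
}

getExecExitCode(failed) // => 2, parsed from the "ExitCode" cause
getExecExitCode({ status: "Success" }) // => 0
getExecExitCode({ status: "Failure" }) // => 1, failure without an explicit ExitCode cause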
// Using https://github.com/kubernetes/helm/blob/master/pkg/kube/wait.go as a reference here. -const objHandlers: { [kind: string]: ObjHandler } = { +const objHandlers: { [kind: string]: StatusHandler } = { DaemonSet: checkWorkloadStatus, Deployment: checkWorkloadStatus, StatefulSet: checkWorkloadStatus, - PersistentVolumeClaim: async ({ resource }) => { + PersistentVolumeClaim: async ({ resource }: StatusHandlerParams) => { const pvc = >resource const state: ServiceState = pvcPhaseMap[pvc.status.phase!] || "unknown" return { state, resource } }, - Pod: async ({ resource }) => { + Pod: async ({ resource }: StatusHandlerParams) => { return checkWorkloadPodStatus(resource, [>resource]) }, - ReplicaSet: async ({ api, namespace, resource }) => { + ReplicaSet: async ({ api, namespace, resource }: StatusHandlerParams) => { return checkWorkloadPodStatus( resource, await getPods(api, namespace, (>resource).spec.selector!.matchLabels!) ) }, - ReplicationController: async ({ api, namespace, resource }) => { - return checkWorkloadPodStatus( - resource, - await getPods(api, namespace, (>resource).spec.selector) - ) + ReplicationController: async ({ api, namespace, resource }: StatusHandlerParams) => { + return checkWorkloadPodStatus(resource, await getPods(api, namespace, resource.spec!.selector!)) }, - Service: async ({ resource }) => { + Service: async ({ resource }: StatusHandlerParams) => { if (resource.spec.type === "ExternalName") { return { state: "ready", resource } } @@ -165,19 +162,32 @@ interface WaitParams { namespace: string ctx: PluginContext provider: KubernetesProvider - serviceName: string + serviceName?: string resources: KubernetesResource[] log: LogEntry + timeoutSec?: number } /** * Wait until the rollout is complete for each of the given Kubernetes objects */ -export async function waitForResources({ namespace, ctx, provider, serviceName, resources, log }: WaitParams) { +export async function waitForResources({ + namespace, + ctx, + provider, + serviceName, + resources, + log, + timeoutSec, +}: WaitParams) { let loops = 0 let lastMessage: string | undefined const startTime = new Date().getTime() + if (!timeoutSec) { + timeoutSec = KUBECTL_DEFAULT_TIMEOUT + } + const statusLine = log.info({ symbol: "info", section: serviceName, @@ -195,11 +205,12 @@ export async function waitForResources({ namespace, ctx, provider, serviceName, for (const status of statuses) { const resource = status.resource + const statusMessage = `${resource.kind} ${resource.metadata.name} is "${status.state}"` - log.debug(`Status of ${resource.kind} ${resource.metadata.name} is "${status.state}"`) + log.debug(`Status of ${statusMessage}`) if (status.state === "unhealthy") { - let msg = `Error deploying ${serviceName}: ${status.lastMessage}` + let msg = `Error deploying ${serviceName || "resources"}: ${status.lastMessage || statusMessage}` if (status.logs) { msg += "\n\n" + status.logs @@ -221,20 +232,23 @@ export async function waitForResources({ namespace, ctx, provider, serviceName, } } - if (combineStates(statuses.map((s) => s.state)) === "ready") { + const combinedStates = combineStates(statuses.map((s) => s.state)) + + // Note: "stopped" is a normal state for Pods, which run to completion + if (combinedStates === "ready" || combinedStates === "stopped") { break } const now = new Date().getTime() - if (now - startTime > KUBECTL_DEFAULT_TIMEOUT * 1000) { - throw new DeploymentError(`Timed out waiting for ${serviceName} to deploy`, { statuses }) + if (now - startTime > timeoutSec * 1000) { + throw 
new DeploymentError(`Timed out waiting for ${serviceName || "resources"} to deploy`, { statuses }) } } statusLine.setState({ symbol: "info", section: serviceName, msg: `Resources ready` }) - return statuses.map((s) => s.resource) + return statuses } interface ComparisonResult { diff --git a/core/src/plugins/kubernetes/status/workload.ts b/core/src/plugins/kubernetes/status/workload.ts index 6057f3b3e3..aeefb4934d 100644 --- a/core/src/plugins/kubernetes/status/workload.ts +++ b/core/src/plugins/kubernetes/status/workload.ts @@ -82,8 +82,8 @@ export async function checkWorkloadStatus({ api, namespace, resource }: StatusHa } // Attach pod logs for debug output - const podNames = (await getPods()).map((pod) => pod.metadata.name) - const podLogs = (await getFormattedPodLogs(api, namespace, podNames)) || undefined + const pods = await getPods() + const podLogs = (await getFormattedPodLogs(api, namespace, pods)) || undefined if (podLogs) { logs += chalk.white("\n\n━━━ Pod logs ━━━\n") diff --git a/core/src/plugins/kubernetes/types.ts b/core/src/plugins/kubernetes/types.ts index 54f942dd5f..fa4cbc89b4 100644 --- a/core/src/plugins/kubernetes/types.ts +++ b/core/src/plugins/kubernetes/types.ts @@ -25,6 +25,7 @@ export interface BaseResource { metadata: Partial & { name: string } + [key: string]: any } // Because the Kubernetes API library types currently list all keys as optional, we use this type to wrap the @@ -37,8 +38,6 @@ export type KubernetesResource & { name: string } - // We add this here for convenience because it's so frequently checked on untyped resources - spec?: any } & Omit & // Make sure these are required if they're on the provided type { @@ -77,8 +76,4 @@ export type KubernetesReplicaSet = KubernetesResource export type KubernetesStatefulSet = KubernetesResource export type KubernetesPod = KubernetesResource -export type KubernetesWorkload = - | KubernetesResource - | KubernetesResource - | KubernetesResource - | KubernetesResource +export type KubernetesWorkload = KubernetesResource diff --git a/core/src/plugins/kubernetes/util.ts b/core/src/plugins/kubernetes/util.ts index 886732b65d..f0fb917495 100644 --- a/core/src/plugins/kubernetes/util.ts +++ b/core/src/plugins/kubernetes/util.ts @@ -20,14 +20,13 @@ import { gardenAnnotationKey, base64, deline, stableStringify } from "../../util import { MAX_CONFIGMAP_DATA_SIZE, dockerAuthSecretName, dockerAuthSecretKey } from "./constants" import { ContainerEnvVars } from "../container/config" import { ConfigurationError, PluginError } from "../../exceptions" -import { KubernetesProvider, ServiceResourceSpec } from "./config" +import { ServiceResourceSpec } from "./config" import { LogEntry } from "../../logger/log-entry" import { PluginContext } from "../../plugin-context" import { HelmModule } from "./helm/config" import { KubernetesModule } from "./kubernetes-module/config" import { getChartPath, renderHelmTemplateString } from "./helm/common" import { HotReloadableResource } from "./hot-reload" -import { getSystemNamespace } from "./namespace" export const skopeoImage = "gardendev/skopeo:1.41.0-1" @@ -140,7 +139,7 @@ export async function getWorkloadPods(api: KubeApi, namespace: string, resource: selectorString // labelSelector ) - const replicaSets = replicaSetRes.items.filter((r) => r.spec.replicas > 0) + const replicaSets = replicaSetRes.items.filter((r) => (r.spec.replicas || 0) > 0) if (replicaSets.length === 0) { return [] @@ -364,7 +363,7 @@ export function prepareEnvVars(env: ContainerEnvVars): V1EnvVar[] { * * @param 
manifest any Kubernetes manifest */ -export function convertDeprecatedManifestVersion(manifest: KubernetesResource): KubernetesResource { +export function convertDeprecatedManifestVersion(manifest: KubernetesWorkload): KubernetesWorkload { const { apiVersion, kind } = manifest if (workloadTypes.includes(kind)) { @@ -386,7 +385,7 @@ export function convertDeprecatedManifestVersion(manifest: KubernetesResource): if (manifest.spec && !manifest.spec.selector) { manifest.spec.selector = { // This resolves to an empty object if both of these are (for whatever reason) undefined - ...{ matchLabels: manifest.spec.template.metadata.labels || manifest.metadata.labels }, + ...{ matchLabels: manifest.spec.template?.metadata?.labels || manifest.metadata.labels }, } } } @@ -394,26 +393,26 @@ export function convertDeprecatedManifestVersion(manifest: KubernetesResource): return manifest } -export async function getDeploymentPodName( - deploymentName: string, - ctx: PluginContext, - provider: KubernetesProvider, - log: LogEntry -) { - const api = await KubeApi.factory(log, ctx, provider) - const systemNamespace = await getSystemNamespace(ctx, provider, log) - - const status = await api.apps.readNamespacedDeployment(deploymentName, systemNamespace) - const pods = await getPods(api, systemNamespace, status.spec.selector.matchLabels) +export async function getDeploymentPod({ + api, + deploymentName, + namespace, +}: { + api: KubeApi + deploymentName: string + namespace: string +}) { + const status = await api.apps.readNamespacedDeployment(deploymentName, namespace) + const pods = await getPods(api, namespace, status.spec.selector?.matchLabels || {}) const pod = sample(pods) if (!pod) { throw new PluginError(`Could not a running pod in a deployment: ${deploymentName}`, { deploymentName, - systemNamespace, + namespace, }) } - return pod.metadata.name + return pod } export function getStaticLabelsFromPod(pod: KubernetesPod): { [key: string]: string } { @@ -556,7 +555,7 @@ export function getResourceContainer(resource: HotReloadableResource, containerN const kind = resource.kind const name = resource.metadata.name - const containers = resource.spec.template.spec.containers || [] + const containers = resource.spec.template.spec?.containers || [] if (containers.length === 0) { throw new ConfigurationError(`${kind} ${resource.metadata.name} has no containers configured.`, { resource }) diff --git a/core/src/plugins/openfaas/openfaas.ts b/core/src/plugins/openfaas/openfaas.ts index aed7865ed6..f7721063c3 100644 --- a/core/src/plugins/openfaas/openfaas.ts +++ b/core/src/plugins/openfaas/openfaas.ts @@ -17,7 +17,7 @@ import { waitForResources } from "../kubernetes/status/status" import { checkWorkloadStatus } from "../kubernetes/status/workload" import { createGardenPlugin } from "../../types/plugin/plugin" import { faasCliSpec } from "./faas-cli" -import { getAllLogs } from "../kubernetes/logs" +import { streamK8sLogs } from "../kubernetes/logs" import { DeployServiceParams } from "../../types/plugin/service/deployService" import { GetServiceStatusParams } from "../../types/plugin/service/getServiceStatus" import { GetServiceLogsParams } from "../../types/plugin/service/getServiceLogs" @@ -253,7 +253,7 @@ async function getServiceLogs(params: GetServiceLogsParams) { const api = await KubeApi.factory(log, ctx, k8sProvider) const resources = await getResources(api, service, namespace) - return getAllLogs({ ...params, provider: k8sProvider, defaultNamespace: namespace, resources }) + return streamK8sLogs({ ...params, 
provider: k8sProvider, defaultNamespace: namespace, resources }) } const faasNetesInitTimeout = 10000 @@ -398,8 +398,8 @@ async function getServiceStatus({ } } - const container: any = findByName(deployment.spec.template.spec.containers, service.name) - const envVersion = findByName(container.env, "GARDEN_VERSION") + const container = findByName(deployment.spec.template?.spec?.containers || [], service.name) + const envVersion = findByName(container?.env || [], "GARDEN_VERSION") const version = envVersion ? envVersion.value : undefined const resourceVersion = parseInt(deployment.metadata.resourceVersion!, 10) const status = await checkWorkloadStatus({ diff --git a/core/src/types/plugin/base.ts b/core/src/types/plugin/base.ts index 9e5fd2ad62..28806a6041 100644 --- a/core/src/types/plugin/base.ts +++ b/core/src/types/plugin/base.ts @@ -77,6 +77,7 @@ export const runBaseParams = { .description("If set, how long to run the command before timing out."), } +// TODO: update this schema in 0.13 export interface RunResult { // FIXME: this field can always be inferred moduleName: string @@ -85,6 +86,7 @@ export interface RunResult { // FIXME: this field can always be inferred version: string success: boolean + exitCode?: number // FIXME: we should avoid native Date objects startedAt: Date completedAt: Date @@ -107,6 +109,10 @@ export const runResultSchema = () => .boolean() .required() .description("Whether the module was successfully run."), + exitCode: joi + .number() + .integer() + .description("The exit code of the run (if applicable)."), startedAt: joi .date() .required() diff --git a/core/src/util/ext-tools.ts b/core/src/util/ext-tools.ts index fad8794eb7..bee77c54a3 100644 --- a/core/src/util/ext-tools.ts +++ b/core/src/util/ext-tools.ts @@ -190,7 +190,7 @@ export class PluginTool { log.debug(`Spawning '${path} ${args.join(" ")}' in ${cwd}`) return spawn(path, args || [], { cwd, - timeout: timeoutSec, + timeoutSec, ignoreError, env, rawMode, diff --git a/core/src/util/util.ts b/core/src/util/util.ts index d5e35092fb..4f6ab8c54a 100644 --- a/core/src/util/util.ts +++ b/core/src/util/util.ts @@ -22,7 +22,7 @@ import chalk from "chalk" import { safeDump, safeLoad, DumpOptions } from "js-yaml" import { createHash } from "crypto" import { tailString, dedent } from "./string" -import { Writable } from "stream" +import { Writable, Readable } from "stream" import { LogEntry } from "../logger/log-entry" import execa = require("execa") import { v4 } from "uuid" @@ -191,7 +191,7 @@ export async function exec(cmd: string, args: string[], opts: ExecOpts = {}) { } export interface SpawnOpts { - timeout?: number + timeoutSec?: number cwd?: string data?: Buffer ignoreError?: boolean @@ -213,7 +213,18 @@ export interface SpawnOutput { // TODO Dump output to a log file if it exceeds the MAX_BUFFER_SIZE export function spawn(cmd: string, args: string[], opts: SpawnOpts = {}) { - const { timeout = 0, cwd, data, ignoreError = false, env, rawMode = true, stdout, stderr, tty, wait = true } = opts + const { + timeoutSec: timeout = 0, + cwd, + data, + ignoreError = false, + env, + rawMode = true, + stdout, + stderr, + tty, + wait = true, + } = opts const stdio = tty ? 
"inherit" : "pipe" const proc = _spawn(cmd, args, { cwd, env, stdio }) @@ -642,3 +653,44 @@ export async function runScript(log: LogEntry, cwd: string, script: string) { await proc } + +export async function streamToString(stream: Readable) { + const chunks: Buffer[] = [] + return new Promise((resolve, reject) => { + stream.on("data", (chunk) => chunks.push(Buffer.from(chunk))) + stream.on("error", reject) + stream.on("end", () => resolve(Buffer.concat(chunks).toString("utf8"))) + }) +} + +/** + * A writable stream that collects all data written, and features a `getString()` method. + */ +export class StringCollector extends Writable { + private chunks: Buffer[] + private error: Error + + constructor() { + super() + + this.chunks = [] + + this.on("data", (chunk) => this.chunks.push(Buffer.from(chunk))) + this.on("error", (err) => { + this.error = err + }) + } + + // tslint:disable-next-line: function-name + _write(chunk: Buffer, _: string, callback: () => void) { + this.chunks.push(Buffer.from(chunk)) + callback() + } + + getString() { + if (this.error) { + throw this.error + } + return Buffer.concat(this.chunks).toString("utf8") + } +} diff --git a/core/test/integ/src/plugins/kubernetes/api.ts b/core/test/integ/src/plugins/kubernetes/api.ts index f9e2f9f90b..c329afa506 100644 --- a/core/test/integ/src/plugins/kubernetes/api.ts +++ b/core/test/integ/src/plugins/kubernetes/api.ts @@ -12,32 +12,58 @@ import { KubernetesConfig } from "../../../../../src/plugins/kubernetes/config" import { KubeApi } from "../../../../../src/plugins/kubernetes/api" import { getDataDir, makeTestGarden } from "../../../../helpers" import { getAppNamespace } from "../../../../../src/plugins/kubernetes/namespace" -import { randomString } from "../../../../../src/util/string" +import { randomString, dedent, gardenAnnotationKey } from "../../../../../src/util/string" import { V1ConfigMap } from "@kubernetes/client-node" -import { KubernetesResource } from "../../../../../src/plugins/kubernetes/types" +import { KubernetesResource, KubernetesPod } from "../../../../../src/plugins/kubernetes/types" import { expect } from "chai" +import { waitForResources } from "../../../../../src/plugins/kubernetes/status/status" +import { PluginContext } from "../../../../../src/plugin-context" +import { StringCollector } from "../../../../../src/util/util" describe("KubeApi", () => { let garden: Garden + let ctx: PluginContext let provider: Provider let api: KubeApi + let namespace: string + + const containerName = "main" before(async () => { const root = getDataDir("test-projects", "container") garden = await makeTestGarden(root) provider = (await garden.resolveProvider(garden.log, "local-kubernetes")) as Provider - const ctx = await garden.getPluginContext(provider) + ctx = await garden.getPluginContext(provider) api = await KubeApi.factory(garden.log, ctx, provider) + namespace = await getAppNamespace(ctx, garden.log, provider) }) after(async () => { await garden.close() }) + function makePod(command: string[], image = "busybox"): KubernetesPod { + return { + apiVersion: "v1", + kind: "Pod", + metadata: { + name: "api-test-" + randomString(8), + namespace, + }, + spec: { + containers: [ + { + name: containerName, + image, + command, + }, + ], + }, + } + } + describe("replace", () => { it("should replace an existing resource in the cluster", async () => { - const ctx = await garden.getPluginContext(provider) - const namespace = await getAppNamespace(ctx, garden.log, provider) const name = randomString() const configMap: 
KubernetesResource = { @@ -65,4 +91,213 @@ describe("KubeApi", () => { } }) }) + + describe("execInPod", () => { + it("should exec a command in a Pod and return the output", async () => { + const pod = makePod(["/bin/sh", "-c", "sleep 600"]) + const podName = pod.metadata.name + + await api.createPod(namespace, pod) + await waitForResources({ namespace, ctx, provider, serviceName: "exec-test", resources: [pod], log: garden.log }) + + try { + const res = await api.execInPod({ + namespace, + podName, + containerName, + command: ["/bin/sh", "-c", "echo some output"], + tty: false, + }) + expect(res.stdout).to.equal("some output\n") + expect(res.stderr).to.equal("") + expect(res.exitCode).to.equal(0) + expect(res.timedOut).to.be.false + } finally { + await api.core.deleteNamespacedPod(podName, namespace) + } + }) + + it("should correctly return an error exit code", async () => { + const pod = makePod(["/bin/sh", "-c", "sleep 600"]) + const podName = pod.metadata.name + + await api.createPod(namespace, pod) + await waitForResources({ namespace, ctx, provider, serviceName: "exec-test", resources: [pod], log: garden.log }) + + try { + const res = await api.execInPod({ + namespace, + podName, + containerName, + command: ["/bin/sh", "-c", "exit 2"], + tty: false, + }) + expect(res.stdout).to.equal("") + expect(res.stderr).to.equal("") + expect(res.exitCode).to.equal(2) + expect(res.timedOut).to.be.false + } finally { + await api.core.deleteNamespacedPod(podName, namespace) + } + }) + + it("should optionally time out", async () => { + const pod = makePod(["/bin/sh", "-c", "sleep 600"]) + const podName = pod.metadata.name + + await api.createPod(namespace, pod) + await waitForResources({ namespace, ctx, provider, serviceName: "exec-test", resources: [pod], log: garden.log }) + + try { + const res = await api.execInPod({ + namespace, + podName, + containerName: "main", + command: ["/bin/sh", "-c", "echo foo && sleep 100"], + tty: false, + timeoutSec: 2, + }) + expect(res.stdout).to.equal("foo\n") + expect(res.stderr).to.equal("") + expect(res.exitCode).to.be.undefined + expect(res.timedOut).to.be.true + } finally { + await api.core.deleteNamespacedPod(podName, namespace) + } + }) + }) + + describe("attachToPod", () => { + it("should attach to a running Pod and stream the output", async () => { + const pod = makePod([ + "/bin/sh", + "-c", + dedent` + for i in 1 2 3 4 5 + do + echo "Log line $i" + sleep 1 + done + `, + ]) + const podName = pod.metadata.name + + await api.createPod(namespace, pod) + await waitForResources({ namespace, ctx, provider, serviceName: "exec-test", resources: [pod], log: garden.log }) + + const stdout = new StringCollector() + + try { + const ws = await api.attachToPod({ + namespace, + podName, + containerName, + stdout, + tty: false, + }) + + await new Promise((resolve, reject) => { + ws.onerror = ({ error }) => { + reject(error) + } + + ws.onclose = () => { + resolve() + } + }) + + const output = stdout.getString() + expect(output).to.include("Log line") + } finally { + await api.core.deleteNamespacedPod(podName, namespace) + } + }) + }) + + describe("listResources", () => { + it("should list all resources of specified kind", async () => { + const name = randomString() + + const configMap: KubernetesResource = { + apiVersion: "v1", + kind: "ConfigMap", + metadata: { + name, + namespace, + }, + data: { + something: "whatever", + }, + } + + await api.core.createNamespacedConfigMap(namespace, configMap) + + try { + const list = await api.listResources({ + log: garden.log, + 
apiVersion: "v1", + kind: "ConfigMap", + namespace, + }) + expect(list.kind).to.equal("ConfigMapList") + expect(list.items.find((r) => r.metadata.name === name)).to.exist + } finally { + await api.deleteBySpec({ namespace, manifest: configMap, log: garden.log }) + } + }) + + it("should list resources with a label selector", async () => { + const nameA = randomString() + const nameB = randomString() + const serviceName = randomString() + + const labels = { + [gardenAnnotationKey("service")]: serviceName, + } + + const configMapA: KubernetesResource = { + apiVersion: "v1", + kind: "ConfigMap", + metadata: { + name: nameA, + namespace, + labels, + }, + data: { + something: "whatever", + }, + } + const configMapB: KubernetesResource = { + apiVersion: "v1", + kind: "ConfigMap", + metadata: { + name: nameB, + namespace, + // No labels on this one + }, + data: { + something: "whatever", + }, + } + + await api.core.createNamespacedConfigMap(namespace, configMapA) + await api.core.createNamespacedConfigMap(namespace, configMapB) + + try { + const list = await api.listResources({ + log: garden.log, + apiVersion: "v1", + kind: "ConfigMap", + namespace, + labelSelector: labels, + }) + expect(list.kind).to.equal("ConfigMapList") + expect(list.items.length).to.equal(1) + expect(list.items.find((r) => r.metadata.name === nameA)).to.exist + } finally { + await api.deleteBySpec({ namespace, manifest: configMapA, log: garden.log }) + await api.deleteBySpec({ namespace, manifest: configMapB, log: garden.log }) + } + }) + }) }) diff --git a/core/test/integ/src/plugins/kubernetes/commands/pull-image.ts b/core/test/integ/src/plugins/kubernetes/commands/pull-image.ts index ef645902cb..1d02664909 100644 --- a/core/test/integ/src/plugins/kubernetes/commands/pull-image.ts +++ b/core/test/integ/src/plugins/kubernetes/commands/pull-image.ts @@ -56,7 +56,7 @@ describe("pull-image plugin command", () => { before(async () => { await init("cluster-docker-remote-registry") - module = await graph.getModule("remote-registry-test") + module = graph.getModule("remote-registry-test") // build the image await garden.buildDir.syncFromSrc(module, garden.log) @@ -80,7 +80,7 @@ describe("pull-image plugin command", () => { before(async () => { await init("cluster-docker") - module = await graph.getModule("simple-service") + module = graph.getModule("simple-service") // build the image await garden.buildDir.syncFromSrc(module, garden.log) diff --git a/core/test/integ/src/plugins/kubernetes/container/build.ts b/core/test/integ/src/plugins/kubernetes/container/build.ts index 53c415c8ba..4746e7d546 100644 --- a/core/test/integ/src/plugins/kubernetes/container/build.ts +++ b/core/test/integ/src/plugins/kubernetes/container/build.ts @@ -12,24 +12,23 @@ import { ConfigGraph } from "../../../../../../src/config-graph" import { k8sBuildContainer, k8sGetContainerBuildStatus, - execInPod, + getDockerDaemonPodRunner, } from "../../../../../../src/plugins/kubernetes/container/build" -import { getDeploymentPodName } from "../../../../../../src/plugins/kubernetes/util" import { PluginContext } from "../../../../../../src/plugin-context" import { KubernetesProvider } from "../../../../../../src/plugins/kubernetes/config" import { expect } from "chai" import { getContainerTestGarden } from "./container" import { containerHelpers } from "../../../../../../src/plugins/container/helpers" -import { - dockerDaemonDeploymentName, - dockerDaemonContainerName, -} from "../../../../../../src/plugins/kubernetes/constants" +import { dockerDaemonContainerName 
} from "../../../../../../src/plugins/kubernetes/constants" +import { KubeApi } from "../../../../../../src/plugins/kubernetes/api" +import { getSystemNamespace } from "../../../../../../src/plugins/kubernetes/namespace" describe("kubernetes build flow", () => { let garden: Garden let graph: ConfigGraph let provider: KubernetesProvider let ctx: PluginContext + let systemNamespace: string after(async () => { if (garden) { @@ -42,6 +41,7 @@ describe("kubernetes build flow", () => { graph = await garden.getConfigGraph(garden.log) provider = await garden.resolveProvider(garden.log, "local-kubernetes") ctx = await garden.getPluginContext(provider) + systemNamespace = await getSystemNamespace(ctx, provider, garden.log) } context("local mode", () => { @@ -170,10 +170,16 @@ describe("kubernetes build flow", () => { // Clear the image tag from the in-cluster builder const remoteId = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) - const podName = await getDeploymentPodName(dockerDaemonDeploymentName, ctx, provider, garden.log) - const containerName = dockerDaemonContainerName - const args = ["docker", "rmi", remoteId] - await execInPod({ ctx, provider, log: garden.log, args, timeout: 300, podName, containerName }) + const api = await KubeApi.factory(garden.log, ctx, provider) + + const runner = await getDockerDaemonPodRunner({ api, systemNamespace, ctx, provider }) + + await runner.exec({ + log: garden.log, + command: ["docker", "rmi", remoteId], + timeoutSec: 300, + containerName: dockerDaemonContainerName, + }) // This should still report the build as ready, because it's in the registry const status = await k8sGetContainerBuildStatus({ @@ -229,10 +235,16 @@ describe("kubernetes build flow", () => { // Clear the image tag from the in-cluster builder const remoteId = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) - const podName = await getDeploymentPodName(dockerDaemonDeploymentName, ctx, provider, garden.log) - const containerName = dockerDaemonContainerName - const args = ["docker", "rmi", remoteId] - await execInPod({ ctx, provider, log: garden.log, args, timeout: 300, podName, containerName }) + const api = await KubeApi.factory(garden.log, ctx, provider) + + const runner = await getDockerDaemonPodRunner({ api, systemNamespace, ctx, provider }) + + await runner.exec({ + log: garden.log, + command: ["docker", "rmi", remoteId], + timeoutSec: 300, + containerName: dockerDaemonContainerName, + }) // This should still report the build as ready, because it's in the registry const status = await k8sGetContainerBuildStatus({ diff --git a/core/test/integ/src/plugins/kubernetes/container/container.ts b/core/test/integ/src/plugins/kubernetes/container/container.ts index 15f59e1f14..e1f33c6b13 100644 --- a/core/test/integ/src/plugins/kubernetes/container/container.ts +++ b/core/test/integ/src/plugins/kubernetes/container/container.ts @@ -6,7 +6,6 @@ * file, You can obtain one at http://mozilla.org/MPL/2.0/. 
*/ -import tmp from "tmp-promise" import { getDataDir, makeTestGarden, expectError } from "../../../../../helpers" import { TestTask } from "../../../../../../src/tasks/test" import { emptyDir, pathExists } from "fs-extra" @@ -16,13 +15,10 @@ import { Garden } from "../../../../../../src/garden" import { ConfigGraph } from "../../../../../../src/config-graph" import { findByName } from "../../../../../../src/util/util" import { deline } from "../../../../../../src/util/string" -import { runAndCopy } from "../../../../../../src/plugins/kubernetes/run" -import { containerHelpers } from "../../../../../../src/plugins/container/helpers" import { runContainerService } from "../../../../../../src/plugins/kubernetes/container/run" import { prepareRuntimeContext } from "../../../../../../src/runtime-context" import { KubeApi } from "../../../../../../src/plugins/kubernetes/api" import { KubernetesProvider } from "../../../../../../src/plugins/kubernetes/config" -import { makePodName } from "../../../../../../src/plugins/kubernetes/util" import { decryptSecretFile } from "../../../../helpers" import { GARDEN_SERVICE_ROOT } from "../../../../../../src/constants" import { KubernetesResource } from "../../../../../../src/plugins/kubernetes/types" @@ -103,12 +99,10 @@ describe("kubernetes container module handlers", () => { let garden: Garden let graph: ConfigGraph let provider: KubernetesProvider - let namespace: string before(async () => { garden = await makeTestGarden(root) provider = await garden.resolveProvider(garden.log, "local-kubernetes") - namespace = provider.config.namespace! }) beforeEach(async () => { @@ -119,366 +113,6 @@ describe("kubernetes container module handlers", () => { await garden.close() }) - describe("runAndCopy", () => { - let tmpDir: tmp.DirectoryResult - - beforeEach(async () => { - tmpDir = await tmp.dir({ unsafeCleanup: true }) - }) - - afterEach(async () => { - await tmpDir.cleanup() - }) - - it("should run a basic module", async () => { - const module = graph.getModule("simple") - const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) - - const result = await runAndCopy({ - ctx: await garden.getPluginContext(provider), - log: garden.log, - command: ["sh", "-c", "echo ok"], - args: [], - interactive: false, - module, - namespace, - runtimeContext: { envVars: {}, dependencies: [] }, - image, - }) - - expect(result.log.trim()).to.equal("ok") - }) - - it("should clean up the created container", async () => { - const module = graph.getModule("simple") - const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) - const podName = makePodName("test", module.name) - - await runAndCopy({ - ctx: await garden.getPluginContext(provider), - log: garden.log, - command: ["sh", "-c", "echo ok"], - args: [], - interactive: false, - module, - namespace: provider.config.namespace!, - podName, - runtimeContext: { envVars: {}, dependencies: [] }, - image, - }) - - const api = await KubeApi.factory(garden.log, await garden.getPluginContext(provider), provider) - - await expectError( - () => api.core.readNamespacedPod(podName, namespace), - (err) => expect(err.statusCode).to.equal(404) - ) - }) - - it("should return with success=false when command exceeds timeout", async () => { - const task = graph.getTask("artifacts-task") - const module = task.module - const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) - - const result = await runAndCopy({ - ctx: 
await garden.getPluginContext(provider), - log: garden.log, - command: ["sh", "-c", "echo banana && sleep 10"], - args: [], - interactive: false, - module, - namespace, - runtimeContext: { envVars: {}, dependencies: [] }, - image, - timeout: 4, - }) - - // Note: Kubernetes doesn't always return the logs when commands time out. - expect(result.log.trim()).to.include("Command timed out.") - expect(result.success).to.be.false - }) - - context("artifacts are specified", () => { - it("should copy artifacts out of the container", async () => { - const task = graph.getTask("artifacts-task") - const module = task.module - const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) - - const result = await runAndCopy({ - ctx: await garden.getPluginContext(provider), - log: garden.log, - command: task.spec.command, - args: [], - interactive: false, - module, - namespace, - runtimeContext: { envVars: {}, dependencies: [] }, - artifacts: task.spec.artifacts, - artifactsPath: tmpDir.path, - image, - }) - - expect(result.log.trim()).to.equal("ok") - expect(await pathExists(join(tmpDir.path, "task.txt"))).to.be.true - expect(await pathExists(join(tmpDir.path, "subdir", "task.txt"))).to.be.true - }) - - it("should clean up the created Pod", async () => { - const task = graph.getTask("artifacts-task") - const module = task.module - const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) - const podName = makePodName("test", module.name) - - await runAndCopy({ - ctx: await garden.getPluginContext(provider), - log: garden.log, - command: task.spec.command, - args: [], - interactive: false, - module, - namespace, - podName, - runtimeContext: { envVars: {}, dependencies: [] }, - artifacts: task.spec.artifacts, - artifactsPath: tmpDir.path, - image, - }) - - const api = await KubeApi.factory(garden.log, await garden.getPluginContext(provider), provider) - - await expectError( - () => api.core.readNamespacedPod(podName, namespace), - (err) => expect(err.statusCode).to.equal(404) - ) - }) - - it("should handle globs when copying artifacts out of the container", async () => { - const task = graph.getTask("globs-task") - const module = task.module - const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) - - await runAndCopy({ - ctx: await garden.getPluginContext(provider), - log: garden.log, - command: task.spec.command, - args: [], - interactive: false, - module, - namespace, - runtimeContext: { envVars: {}, dependencies: [] }, - artifacts: task.spec.artifacts, - artifactsPath: tmpDir.path, - image, - }) - - expect(await pathExists(join(tmpDir.path, "subdir", "task.txt"))).to.be.true - expect(await pathExists(join(tmpDir.path, "output.txt"))).to.be.true - }) - - it("should not throw when an artifact is missing", async () => { - const task = graph.getTask("artifacts-task") - const module = task.module - const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) - - await runAndCopy({ - ctx: await garden.getPluginContext(provider), - log: garden.log, - command: ["sh", "-c", "echo ok"], - args: [], - interactive: false, - module, - namespace, - runtimeContext: { envVars: {}, dependencies: [] }, - artifacts: task.spec.artifacts, - artifactsPath: tmpDir.path, - image, - }) - }) - - it("should correctly copy a whole directory", async () => { - const task = graph.getTask("dir-task") - const module = task.module - const image = await 
containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) - - await runAndCopy({ - ctx: await garden.getPluginContext(provider), - log: garden.log, - command: task.spec.command, - args: [], - interactive: false, - module, - namespace, - runtimeContext: { envVars: {}, dependencies: [] }, - artifacts: task.spec.artifacts, - artifactsPath: tmpDir.path, - image, - }) - - expect(await pathExists(join(tmpDir.path, "my-task-report"))).to.be.true - expect(await pathExists(join(tmpDir.path, "my-task-report", "output.txt"))).to.be.true - }) - - it("should return with logs and success=false when command exceeds timeout", async () => { - const task = graph.getTask("artifacts-task") - const module = task.module - const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) - - const result = await runAndCopy({ - ctx: await garden.getPluginContext(provider), - log: garden.log, - command: ["sh", "-c", "echo banana && sleep 10"], - args: [], - interactive: false, - module, - namespace, - runtimeContext: { envVars: {}, dependencies: [] }, - artifacts: task.spec.artifacts, - artifactsPath: tmpDir.path, - image, - timeout: 3, - }) - - expect(result.log.trim()).to.equal("Command timed out. Here are the logs until the timeout occurred:\n\nbanana") - expect(result.success).to.be.false - }) - - it("should copy artifacts out of the container even when task times out", async () => { - const task = graph.getTask("artifacts-task") - const module = task.module - const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) - - const result = await runAndCopy({ - ctx: await garden.getPluginContext(provider), - log: garden.log, - command: ["sh", "-c", "touch /task.txt && sleep 10"], - args: [], - interactive: false, - module, - namespace, - runtimeContext: { envVars: {}, dependencies: [] }, - artifacts: task.spec.artifacts, - artifactsPath: tmpDir.path, - image, - timeout: 3, - }) - - expect(result.log.trim()).to.equal("Command timed out.") - expect(await pathExists(join(tmpDir.path, "task.txt"))).to.be.true - expect(result.success).to.be.false - }) - - it("should throw when container doesn't contain sh", async () => { - const task = graph.getTask("missing-sh-task") - const module = task.module - const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) - - const actions = await garden.getActionRouter() - await garden.buildDir.syncFromSrc(module, garden.log) - await actions.build({ - module, - log: garden.log, - }) - - await expectError( - async () => - runAndCopy({ - ctx: await garden.getPluginContext(provider), - log: garden.log, - command: ["sh", "-c", "echo ok"], - args: [], - interactive: false, - module, - namespace, - runtimeContext: { envVars: {}, dependencies: [] }, - artifacts: task.spec.artifacts, - artifactsPath: tmpDir.path, - description: "Foo", - image, - timeout: 20000, - stdout: process.stdout, - stderr: process.stderr, - }), - (err) => - expect(err.message).to.equal(deline` - Foo specifies artifacts to export, but the image doesn't - contain the sh binary. In order to copy artifacts out of Kubernetes containers, both sh and tar need - to be installed in the image. 
- `) - ) - }) - - it("should throw when container doesn't contain tar", async () => { - const task = graph.getTask("missing-tar-task") - const module = task.module - const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) - - const actions = await garden.getActionRouter() - await garden.buildDir.syncFromSrc(module, garden.log) - await actions.build({ - module, - log: garden.log, - }) - - await expectError( - async () => - runAndCopy({ - ctx: await garden.getPluginContext(provider), - log: garden.log, - command: ["sh", "-c", "echo ok"], - args: [], - interactive: false, - module, - namespace, - runtimeContext: { envVars: {}, dependencies: [] }, - artifacts: task.spec.artifacts, - artifactsPath: tmpDir.path, - description: "Foo", - image, - timeout: 20000, - stdout: process.stdout, - stderr: process.stderr, - }), - (err) => - expect(err.message).to.equal(deline` - Foo specifies artifacts to export, but the image doesn't - contain the tar binary. In order to copy artifacts out of Kubernetes containers, both sh and tar need - to be installed in the image. - `) - ) - }) - - it("should throw when no command is specified", async () => { - const task = graph.getTask("missing-tar-task") - const module = task.module - const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) - - await expectError( - async () => - runAndCopy({ - ctx: await garden.getPluginContext(provider), - log: garden.log, - args: [], - interactive: false, - module, - namespace, - runtimeContext: { envVars: {}, dependencies: [] }, - artifacts: task.spec.artifacts, - artifactsPath: tmpDir.path, - description: "Foo", - image, - }), - (err) => - expect(err.message).to.equal(deline` - Foo specifies artifacts to export, but doesn't explicitly set a \`command\`. - The kubernetes provider currently requires an explicit command to be set for tests and tasks that - export artifacts, because the image's entrypoint cannot be inferred in that execution mode. - Please set the \`command\` field and try again. 
- `) - ) - }) - }) - }) - describe("runContainerService", () => { it("should run a service", async () => { const service = graph.getService("echo-service") @@ -583,7 +217,9 @@ describe("kubernetes container module handlers", () => { await expectError( async () => await garden.processTasks([testTask], { throwOnError: true }), - (err) => expect(err.message).to.match(/bork/) + (err) => { + expect(err.message).to.match(/bork/) + } ) const actions = await garden.getActionRouter() @@ -624,7 +260,7 @@ describe("kubernetes container module handlers", () => { }) it("should fail if an error occurs, but copy the artifacts out of the container", async () => { - const module = await graph.getModule("simple") + const module = graph.getModule("simple") const testTask = new TestTask({ garden, diff --git a/core/test/integ/src/plugins/kubernetes/container/deployment.ts b/core/test/integ/src/plugins/kubernetes/container/deployment.ts index ebc2a829cc..e1863d7de5 100644 --- a/core/test/integ/src/plugins/kubernetes/container/deployment.ts +++ b/core/test/integ/src/plugins/kubernetes/container/deployment.ts @@ -178,7 +178,7 @@ describe("kubernetes container deployment handlers", () => { const copiedSecret = await api.core.readNamespacedSecret(secretName, namespace) expect(copiedSecret).to.exist - expect(resource.spec.template.spec.imagePullSecrets).to.eql([{ name: secretName }]) + expect(resource.spec.template?.spec?.imagePullSecrets).to.eql([{ name: secretName }]) }) it("should copy and reference imagePullSecrets with docker credential helper", async () => { @@ -217,7 +217,7 @@ describe("kubernetes container deployment handlers", () => { const copiedSecret = await api.core.readNamespacedSecret(secretName, namespace) expect(copiedSecret).to.exist - expect(resource.spec.template.spec.imagePullSecrets).to.eql([{ name: secretName }]) + expect(resource.spec.template?.spec?.imagePullSecrets).to.eql([{ name: secretName }]) }) it("should correctly mount a referenced PVC module", async () => { @@ -236,10 +236,10 @@ describe("kubernetes container deployment handlers", () => { blueGreen: false, }) - expect(resource.spec.template.spec.volumes).to.eql([ + expect(resource.spec.template?.spec?.volumes).to.eql([ { name: "test", persistentVolumeClaim: { claimName: "volume-module" } }, ]) - expect(resource.spec.template.spec.containers[0].volumeMounts).to.eql([{ name: "test", mountPath: "/volume" }]) + expect(resource.spec.template?.spec?.containers[0].volumeMounts).to.eql([{ name: "test", mountPath: "/volume" }]) }) it("should throw if incompatible module is specified as a volume module", async () => { diff --git a/core/test/integ/src/plugins/kubernetes/container/run.ts b/core/test/integ/src/plugins/kubernetes/container/run.ts index a2c58e3d7a..b77e395adb 100644 --- a/core/test/integ/src/plugins/kubernetes/container/run.ts +++ b/core/test/integ/src/plugins/kubernetes/container/run.ts @@ -161,7 +161,7 @@ describe("runContainerTask", () => { }) it("should fail if an error occurs, but copy the artifacts out of the container", async () => { - const task = await graph.getTask("artifacts-task-fail") + const task = graph.getTask("artifacts-task-fail") const testTask = new TaskTask({ garden, diff --git a/core/test/integ/src/plugins/kubernetes/helm/hot-reload.ts b/core/test/integ/src/plugins/kubernetes/helm/hot-reload.ts index 4a29af7da9..3b739dff27 100644 --- a/core/test/integ/src/plugins/kubernetes/helm/hot-reload.ts +++ b/core/test/integ/src/plugins/kubernetes/helm/hot-reload.ts @@ -124,7 +124,7 @@ describe("configureHotReload", () 
=> { hotReloadSpec, target: hotReloadTarget, }) - const containers: any[] = hotReloadTarget.spec.template.spec.containers + const containers: any[] = hotReloadTarget.spec.template.spec?.containers || [] // This is a second, non-main/resource container included by the Helm chart, which should not mount the sync volume. const secondContainer = containers.find((c) => c.name === "second-container") diff --git a/core/test/integ/src/plugins/kubernetes/run.ts b/core/test/integ/src/plugins/kubernetes/run.ts new file mode 100644 index 0000000000..399e9dae2d --- /dev/null +++ b/core/test/integ/src/plugins/kubernetes/run.ts @@ -0,0 +1,754 @@ +/* + * Copyright (C) 2018-2020 Garden Technologies, Inc. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +import tmp from "tmp-promise" +import { expectError } from "../../../../helpers" +import { pathExists } from "fs-extra" +import { expect } from "chai" +import { join } from "path" +import { Garden } from "../../../../../src/garden" +import { ConfigGraph } from "../../../../../src/config-graph" +import { deline, randomString, dedent } from "../../../../../src/util/string" +import { runAndCopy, PodRunner } from "../../../../../src/plugins/kubernetes/run" +import { containerHelpers } from "../../../../../src/plugins/container/helpers" +import { KubeApi } from "../../../../../src/plugins/kubernetes/api" +import { KubernetesProvider } from "../../../../../src/plugins/kubernetes/config" +import { makePodName } from "../../../../../src/plugins/kubernetes/util" +import { getContainerTestGarden } from "./container/container" +import { KubernetesPod } from "../../../../../src/plugins/kubernetes/types" +import { PluginContext } from "../../../../../src/plugin-context" +import { LogEntry } from "../../../../../src/logger/log-entry" +import { sleep, StringCollector } from "../../../../../src/util/util" + +describe("kubernetes Pod runner functions", () => { + let garden: Garden + let ctx: PluginContext + let graph: ConfigGraph + let provider: KubernetesProvider + let namespace: string + let api: KubeApi + let log: LogEntry + + before(async () => { + garden = await getContainerTestGarden() + provider = await garden.resolveProvider(garden.log, "local-kubernetes") + ctx = await garden.getPluginContext(provider) + namespace = provider.config.namespace! 
+ api = await KubeApi.factory(garden.log, ctx, provider) + log = garden.log + }) + + beforeEach(async () => { + graph = await garden.getConfigGraph(garden.log) + }) + + after(async () => { + await garden.close() + }) + + function makePod(command: string[], image = "busybox"): KubernetesPod { + return { + apiVersion: "v1", + kind: "Pod", + metadata: { + name: "runner-test-" + randomString(8), + namespace, + }, + spec: { + containers: [ + { + name: "main", + image, + command, + }, + ], + }, + } + } + + describe("PodRunner", () => { + let runner: PodRunner + + afterEach(async () => { + if (runner) { + await runner.stop() + } + }) + + describe("start", () => { + it("creates a Pod and waits for it to start", async () => { + const pod = makePod(["sh", "-c", "sleep 600"]) + + runner = new PodRunner({ + ctx, + pod, + namespace, + api, + provider, + }) + + const res = await runner.start({ log }) + expect(res.status.state).to.equal("ready") + }) + + it("throws if the Pod fails to start before timeout", async () => { + const badImage = randomString(16) + const pod = makePod(["foo"], badImage) + + runner = new PodRunner({ + ctx, + pod, + namespace, + api, + provider, + }) + + await expectError(() => runner.start({ log, timeoutSec: 2 })) + }) + }) + + describe("exec", () => { + it("runs the specified command in the Pod", async () => { + const pod = makePod(["sh", "-c", "sleep 600"]) + + runner = new PodRunner({ + ctx, + pod, + namespace, + api, + provider, + }) + + await runner.start({ log }) + + const res = await runner.exec({ + log, + command: ["echo", "foo"], + }) + + expect(res.log.trim()).to.equal("foo") + }) + + it("throws if execution times out", async () => { + const pod = makePod(["sh", "-c", "sleep 600"]) + + runner = new PodRunner({ + ctx, + pod, + namespace, + api, + provider, + }) + + await runner.start({ log }) + await expectError( + () => runner.exec({ log, command: ["sh", "-c", "sleep 100"], timeoutSec: 1 }), + (err) => expect(err.message).to.equal("Command timed out after 1 seconds.") + ) + }) + + it("throws if command returns non-zero exit code", async () => { + const pod = makePod(["sh", "-c", "sleep 600"]) + + runner = new PodRunner({ + ctx, + pod, + namespace, + api, + provider, + }) + + await runner.start({ log }) + await expectError( + () => runner.exec({ log, command: ["sh", "-c", "echo foo && exit 2"] }), + (err) => expect(err.message.trim()).to.equal("Command exited with code 2:\nfoo") + ) + }) + }) + + describe("getLogs", () => { + it("retrieves the logs from the Pod", async () => { + const pod = makePod(["sh", "-c", "echo foo && sleep 600"]) + + runner = new PodRunner({ + ctx, + pod, + namespace, + api, + provider, + }) + + await runner.start({ log }) + const logs = await runner.getLogs() + expect(logs).to.eql([ + { + containerName: "main", + log: "foo\n", + }, + ]) + }) + + it("retrieves the logs from the Pod after it terminates", async () => { + const pod = makePod(["sh", "-c", "echo foo"]) + + runner = new PodRunner({ + ctx, + pod, + namespace, + api, + provider, + }) + + await runner.start({ log }) + await sleep(500) + + const logs = await runner.getLogs() + expect(logs).to.eql([ + { + containerName: "main", + log: "foo\n", + }, + ]) + }) + }) + + describe("runAndWait", () => { + it("creates a Pod and waits for it to complete before returning the run result", async () => { + const pod = makePod(["sh", "-c", "echo foo"]) + + runner = new PodRunner({ + ctx, + pod, + namespace, + api, + provider, + }) + + const res = await runner.runAndWait({ log, remove: true, tty: false 
}) + + expect(res.log.trim()).to.equal("foo") + expect(res.success).to.be.true + }) + + it("returns success=false if Pod returns with non-zero exit code", async () => { + const pod = makePod(["sh", "-c", "echo foo && exit 1"]) + + runner = new PodRunner({ + ctx, + pod, + namespace, + api, + provider, + }) + + const res = await runner.runAndWait({ log, remove: true, tty: false }) + + expect(res.log.trim()).to.equal("foo") + expect(res.success).to.be.false + }) + + it("can attach to the Pod and stream outputs", async () => { + const pod = makePod([ + "/bin/sh", + "-c", + dedent` + for i in 1 2 3 4 5 + do + echo "Log line $i" + sleep 1 + done + `, + ]) + + runner = new PodRunner({ + ctx, + pod, + namespace, + api, + provider, + }) + + const stdout = new StringCollector() + + const res = await runner.runAndWait({ log, remove: true, stdout, tty: false }) + + const output = stdout.getString() + + expect(output).to.include("Log line") + expect(res.log.trim()).to.equal(dedent` + Log line 1 + Log line 2 + Log line 3 + Log line 4 + Log line 5 + `) + expect(res.success).to.be.true + }) + + it("throws if Pod is invalid", async () => { + const pod = { + apiVersion: "v1", + kind: "Pod", + metadata: { + name: "!&/$/%#/", + namespace, + }, + spec: { + containers: [ + { + name: "main", + image: "busybox", + command: ["sh", "-c", "echo foo"], + }, + ], + }, + } + + runner = new PodRunner({ + ctx, + pod, + namespace, + api, + + provider, + }) + + await expectError( + () => runner.runAndWait({ log, remove: true, tty: false }), + (err) => expect(err.message).to.include("Failed to create Pod") + ) + }) + + it("throws if Pod cannot start", async () => { + const badImage = randomString(16) + const pod = makePod(["sh", "-c", "echo foo"], badImage) + + runner = new PodRunner({ + ctx, + pod, + namespace, + api, + provider, + }) + + await expectError( + () => runner.runAndWait({ log, remove: true, tty: false }), + (err) => expect(err.message).to.include("Failed to start Pod") + ) + }) + + context("tty=true", () => { + it("attaches to the process stdio during execution", async () => { + const pod = makePod([ + "/bin/sh", + "-c", + dedent` + for i in 1 2 3 4 5 + do + echo "Log line $i" + sleep 1 + done + `, + ]) + + runner = new PodRunner({ + ctx, + pod, + namespace, + api, + provider, + }) + + const res = await runner.runAndWait({ log, remove: true, tty: true }) + + expect(res.log.trim().replace(/\r\n/g, "\n")).to.equal(dedent` + Log line 1 + Log line 2 + Log line 3 + Log line 4 + Log line 5 + `) + expect(res.success).to.be.true + }) + + it("throws if also specifying stdout or stderr", async () => { + const pod = makePod(["sh", "-c", "echo foo"]) + + runner = new PodRunner({ + ctx, + pod, + namespace, + api, + provider, + }) + + await expectError( + () => runner.runAndWait({ log, remove: true, tty: true, stdout: new StringCollector() }), + (err) => expect(err.message).to.equal("Cannot set both tty and stdout/stderr/stdin streams") + ) + }) + }) + }) + }) + + describe("runAndCopy", () => { + let tmpDir: tmp.DirectoryResult + + beforeEach(async () => { + tmpDir = await tmp.dir({ unsafeCleanup: true }) + }) + + afterEach(async () => { + await tmpDir.cleanup() + }) + + it("should run a basic module", async () => { + const module = graph.getModule("simple") + const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) + + const result = await runAndCopy({ + ctx: await garden.getPluginContext(provider), + log: garden.log, + command: ["sh", "-c", "echo ok"], + args: [], + 
interactive: false, + module, + namespace, + runtimeContext: { envVars: {}, dependencies: [] }, + image, + }) + + expect(result.log.trim()).to.equal("ok") + }) + + it("should clean up the created container", async () => { + const module = graph.getModule("simple") + const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) + const podName = makePodName("test", module.name) + + await runAndCopy({ + ctx: await garden.getPluginContext(provider), + log: garden.log, + command: ["sh", "-c", "echo ok"], + args: [], + interactive: false, + module, + namespace: provider.config.namespace!, + podName, + runtimeContext: { envVars: {}, dependencies: [] }, + image, + }) + + await expectError( + () => api.core.readNamespacedPod(podName, namespace), + (err) => expect(err.statusCode).to.equal(404) + ) + }) + + it("should return with success=false when command exceeds timeout", async () => { + const task = graph.getTask("artifacts-task") + const module = task.module + const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) + + const result = await runAndCopy({ + ctx: await garden.getPluginContext(provider), + log: garden.log, + command: ["sh", "-c", "echo banana && sleep 10"], + args: [], + interactive: false, + module, + namespace, + runtimeContext: { envVars: {}, dependencies: [] }, + image, + timeout: 4, + }) + + // Note: Kubernetes doesn't always return the logs when commands time out. + expect(result.log.trim()).to.include("Command timed out.") + expect(result.success).to.be.false + }) + + context("artifacts are specified", () => { + it("should copy artifacts out of the container", async () => { + const task = graph.getTask("artifacts-task") + const module = task.module + const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) + + const result = await runAndCopy({ + ctx: await garden.getPluginContext(provider), + log: garden.log, + command: task.spec.command, + args: [], + interactive: false, + module, + namespace, + runtimeContext: { envVars: {}, dependencies: [] }, + artifacts: task.spec.artifacts, + artifactsPath: tmpDir.path, + image, + }) + + expect(result.log.trim()).to.equal("ok") + expect(await pathExists(join(tmpDir.path, "task.txt"))).to.be.true + expect(await pathExists(join(tmpDir.path, "subdir", "task.txt"))).to.be.true + }) + + it("should clean up the created Pod", async () => { + const task = graph.getTask("artifacts-task") + const module = task.module + const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) + const podName = makePodName("test", module.name) + + await runAndCopy({ + ctx: await garden.getPluginContext(provider), + log: garden.log, + command: task.spec.command, + args: [], + interactive: false, + module, + namespace, + podName, + runtimeContext: { envVars: {}, dependencies: [] }, + artifacts: task.spec.artifacts, + artifactsPath: tmpDir.path, + image, + }) + + await expectError( + () => api.core.readNamespacedPod(podName, namespace), + (err) => expect(err.statusCode).to.equal(404) + ) + }) + + it("should handle globs when copying artifacts out of the container", async () => { + const task = graph.getTask("globs-task") + const module = task.module + const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) + + await runAndCopy({ + ctx: await garden.getPluginContext(provider), + log: garden.log, + command: task.spec.command, + args: [], + interactive: 
false, + module, + namespace, + runtimeContext: { envVars: {}, dependencies: [] }, + artifacts: task.spec.artifacts, + artifactsPath: tmpDir.path, + image, + }) + + expect(await pathExists(join(tmpDir.path, "subdir", "task.txt"))).to.be.true + expect(await pathExists(join(tmpDir.path, "output.txt"))).to.be.true + }) + + it("should not throw when an artifact is missing", async () => { + const task = graph.getTask("artifacts-task") + const module = task.module + const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) + + await runAndCopy({ + ctx: await garden.getPluginContext(provider), + log: garden.log, + command: ["sh", "-c", "echo ok"], + args: [], + interactive: false, + module, + namespace, + runtimeContext: { envVars: {}, dependencies: [] }, + artifacts: task.spec.artifacts, + artifactsPath: tmpDir.path, + image, + }) + }) + + it("should correctly copy a whole directory", async () => { + const task = graph.getTask("dir-task") + const module = task.module + const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) + + await runAndCopy({ + ctx: await garden.getPluginContext(provider), + log: garden.log, + command: task.spec.command, + args: [], + interactive: false, + module, + namespace, + runtimeContext: { envVars: {}, dependencies: [] }, + artifacts: task.spec.artifacts, + artifactsPath: tmpDir.path, + image, + }) + + expect(await pathExists(join(tmpDir.path, "my-task-report"))).to.be.true + expect(await pathExists(join(tmpDir.path, "my-task-report", "output.txt"))).to.be.true + }) + + it("should return with logs and success=false when command exceeds timeout", async () => { + const task = graph.getTask("artifacts-task") + const module = task.module + const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) + + const result = await runAndCopy({ + ctx: await garden.getPluginContext(provider), + log: garden.log, + command: ["sh", "-c", "echo banana && sleep 10"], + args: [], + interactive: false, + module, + namespace, + runtimeContext: { envVars: {}, dependencies: [] }, + artifacts: task.spec.artifacts, + artifactsPath: tmpDir.path, + image, + timeout: 3, + }) + + expect(result.log.trim()).to.equal("Command timed out. 
Here are the logs until the timeout occurred:\n\nbanana") + expect(result.success).to.be.false + }) + + it("should copy artifacts out of the container even when task times out", async () => { + const task = graph.getTask("artifacts-task") + const module = task.module + const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) + + const result = await runAndCopy({ + ctx: await garden.getPluginContext(provider), + log: garden.log, + command: ["sh", "-c", "touch /task.txt && sleep 10"], + args: [], + interactive: false, + module, + namespace, + runtimeContext: { envVars: {}, dependencies: [] }, + artifacts: task.spec.artifacts, + artifactsPath: tmpDir.path, + image, + timeout: 3, + }) + + expect(result.log.trim()).to.equal("Command timed out.") + expect(await pathExists(join(tmpDir.path, "task.txt"))).to.be.true + expect(result.success).to.be.false + }) + + it("should throw when container doesn't contain sh", async () => { + const task = graph.getTask("missing-sh-task") + const module = task.module + const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) + + const actions = await garden.getActionRouter() + await garden.buildDir.syncFromSrc(module, garden.log) + await actions.build({ + module, + log: garden.log, + }) + + await expectError( + async () => + runAndCopy({ + ctx: await garden.getPluginContext(provider), + log: garden.log, + command: ["sh", "-c", "echo ok"], + args: [], + interactive: false, + module, + namespace, + runtimeContext: { envVars: {}, dependencies: [] }, + artifacts: task.spec.artifacts, + artifactsPath: tmpDir.path, + description: "Foo", + image, + timeout: 20000, + stdout: process.stdout, + stderr: process.stderr, + }), + (err) => + expect(err.message).to.equal(deline` + Foo specifies artifacts to export, but the image doesn't + contain the sh binary. In order to copy artifacts out of Kubernetes containers, both sh and tar need + to be installed in the image. + `) + ) + }) + + it("should throw when container doesn't contain tar", async () => { + const task = graph.getTask("missing-tar-task") + const module = task.module + const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) + + const actions = await garden.getActionRouter() + await garden.buildDir.syncFromSrc(module, garden.log) + await actions.build({ + module, + log: garden.log, + }) + + await expectError( + async () => + runAndCopy({ + ctx: await garden.getPluginContext(provider), + log: garden.log, + command: ["sh", "-c", "echo ok"], + args: [], + interactive: false, + module, + namespace, + runtimeContext: { envVars: {}, dependencies: [] }, + artifacts: task.spec.artifacts, + artifactsPath: tmpDir.path, + description: "Foo", + image, + timeout: 20000, + stdout: process.stdout, + stderr: process.stderr, + }), + (err) => + expect(err.message).to.equal(deline` + Foo specifies artifacts to export, but the image doesn't + contain the tar binary. In order to copy artifacts out of Kubernetes containers, both sh and tar need + to be installed in the image. 
+ `) + ) + }) + + it("should throw when no command is specified", async () => { + const task = graph.getTask("missing-tar-task") + const module = task.module + const image = await containerHelpers.getDeploymentImageId(module, provider.config.deploymentRegistry) + + await expectError( + async () => + runAndCopy({ + ctx: await garden.getPluginContext(provider), + log: garden.log, + args: [], + interactive: false, + module, + namespace, + runtimeContext: { envVars: {}, dependencies: [] }, + artifacts: task.spec.artifacts, + artifactsPath: tmpDir.path, + description: "Foo", + image, + }), + (err) => + expect(err.message).to.equal(deline` + Foo specifies artifacts to export, but doesn't explicitly set a \`command\`. + The kubernetes provider currently requires an explicit command to be set for tests and tasks that + export artifacts, because the image's entrypoint cannot be inferred in that execution mode. + Please set the \`command\` field and try again. + `) + ) + }) + }) + }) +}) diff --git a/core/test/integ/src/plugins/kubernetes/util.ts b/core/test/integ/src/plugins/kubernetes/util.ts index 1feef736d1..7ddcac3333 100644 --- a/core/test/integ/src/plugins/kubernetes/util.ts +++ b/core/test/integ/src/plugins/kubernetes/util.ts @@ -286,19 +286,19 @@ describe("util", () => { it("should get the first container on the resource if no name is specified", async () => { const deployment = await getDeployment() - const expected = deployment.spec.template.spec.containers[0] + const expected = deployment.spec.template.spec!.containers[0] expect(getResourceContainer(deployment)).to.equal(expected) }) it("should pick the container by name if specified", async () => { const deployment = await getDeployment() - const expected = deployment.spec.template.spec.containers[0] + const expected = deployment.spec.template.spec!.containers[0] expect(getResourceContainer(deployment, "api")).to.equal(expected) }) it("should throw if no containers are in resource", async () => { const deployment = await getDeployment() - deployment.spec.template.spec.containers = [] + deployment.spec.template.spec!.containers = [] await expectError( () => getResourceContainer(deployment), (err) => expect(err.message).to.equal("Deployment api-release has no containers configured.") diff --git a/docs/reference/commands.md b/docs/reference/commands.md index 28a812639d..49a485d8fb 100644 --- a/docs/reference/commands.md +++ b/docs/reference/commands.md @@ -191,6 +191,9 @@ tests: # The command that was run in the module. command: + # The exit code of the run (if applicable). + exitCode: + # When the module run was started. startedAt: @@ -773,6 +776,9 @@ tests: # The command that was run in the module. command: + # The exit code of the run (if applicable). + exitCode: + # When the module run was started. startedAt: @@ -1575,6 +1581,9 @@ command: # Whether the module was successfully run. success: +# The exit code of the run (if applicable). +exitCode: + # When the module run was started. startedAt: @@ -1989,6 +1998,9 @@ tests: # The command that was run in the module. command: + # The exit code of the run (if applicable). + exitCode: + # When the module run was started. startedAt: @@ -2240,6 +2252,9 @@ result: # The command that was run in the module. command: + # The exit code of the run (if applicable). + exitCode: + # When the module run was started. startedAt: @@ -2507,6 +2522,9 @@ tests: # The command that was run in the module. command: + # The exit code of the run (if applicable). + exitCode: + # When the module run was started. 
startedAt:
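
For orientation, here is a minimal sketch of the PodRunner lifecycle that the new `core/test/integ/src/plugins/kubernetes/run.ts` tests exercise: build a Pod spec, start it, exec a command inside it, read its logs, and clean up. It reuses only names and call signatures that appear in those tests (`PodRunner`, `KubeApi.factory`, `runner.start`/`exec`/`getLogs`/`stop`), and it assumes the same harness the tests use (`getContainerTestGarden` and a `local-kubernetes` provider); anything beyond that is an assumption, not part of the change itself.

```ts
// Sketch only — mirrors the setup in the PodRunner tests above.
import { PodRunner } from "../../../../../src/plugins/kubernetes/run"
import { KubeApi } from "../../../../../src/plugins/kubernetes/api"
import { KubernetesPod } from "../../../../../src/plugins/kubernetes/types"
import { getContainerTestGarden } from "./container/container"

async function podRunnerSketch() {
  const garden = await getContainerTestGarden()
  const provider = await garden.resolveProvider(garden.log, "local-kubernetes")
  const ctx = await garden.getPluginContext(provider)
  const namespace = provider.config.namespace!
  const api = await KubeApi.factory(garden.log, ctx, provider)
  const log = garden.log

  // A long-running Pod we can exec into.
  const pod: KubernetesPod = {
    apiVersion: "v1",
    kind: "Pod",
    metadata: { name: "runner-demo", namespace },
    spec: {
      containers: [{ name: "main", image: "busybox", command: ["sh", "-c", "sleep 600"] }],
    },
  }

  const runner = new PodRunner({ ctx, pod, namespace, api, provider })

  try {
    // Create the Pod and wait for it to become ready.
    await runner.start({ log })

    // Run a command inside the running Pod; timeoutSec is in seconds.
    const execRes = await runner.exec({ log, command: ["echo", "foo"], timeoutSec: 30 })
    console.log(execRes.log.trim()) // "foo"

    // Fetch the container logs collected so far.
    const logs = await runner.getLogs()
    console.log(logs) // [{ containerName: "main", log: "..." }]
  } finally {
    // Delete the Pod when done, as the tests do in afterEach.
    await runner.stop()
  }
}
```

For task- and test-style flows that should run the Pod to completion rather than exec into it, the tests instead call the one-shot variant, `runner.runAndWait({ log, remove: true, tty: false })`, and read `log` and `success` off the result.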