From 56a15ba9a4267886566e27d0df4b08280be63c8b Mon Sep 17 00:00:00 2001 From: Jon Edvald Date: Thu, 28 Mar 2019 05:32:58 +0100 Subject: [PATCH] fix(k8s): handle logs properly for all module types and resources --- .../src/plugins/kubernetes/container/logs.ts | 8 +- .../src/plugins/kubernetes/helm/logs.ts | 10 +- garden-service/src/plugins/kubernetes/logs.ts | 113 +++++++++--------- .../src/plugins/kubernetes/status.ts | 14 +-- garden-service/src/plugins/kubernetes/util.ts | 80 ++++++++++++- .../src/plugins/openfaas/openfaas.ts | 19 ++- 6 files changed, 159 insertions(+), 85 deletions(-) diff --git a/garden-service/src/plugins/kubernetes/container/logs.ts b/garden-service/src/plugins/kubernetes/container/logs.ts index 5fce876b09..57f21751af 100644 --- a/garden-service/src/plugins/kubernetes/container/logs.ts +++ b/garden-service/src/plugins/kubernetes/container/logs.ts @@ -9,15 +9,17 @@ import { GetServiceLogsParams } from "../../../types/plugin/params" import { ContainerModule } from "../../container/config" import { getAppNamespace } from "../namespace" -import { getKubernetesLogs } from "../logs" +import { getAllLogs } from "../logs" import { KubernetesPluginContext } from "../kubernetes" +import { createDeployment } from "./deployment" export async function getServiceLogs(params: GetServiceLogsParams) { const { ctx, service } = params const k8sCtx = ctx const context = k8sCtx.provider.config.context const namespace = await getAppNamespace(k8sCtx, k8sCtx.provider) - const selector = `service=${service.name}` - return getKubernetesLogs({ ...params, context, namespace, selector }) + const resources = [await createDeployment(k8sCtx.provider, service, params.runtimeContext, namespace, false)] + + return getAllLogs({ ...params, context, namespace, resources }) } diff --git a/garden-service/src/plugins/kubernetes/helm/logs.ts b/garden-service/src/plugins/kubernetes/helm/logs.ts index 728bfcebc6..54eb2cd26d 100644 --- a/garden-service/src/plugins/kubernetes/helm/logs.ts +++ b/garden-service/src/plugins/kubernetes/helm/logs.ts @@ -8,16 +8,18 @@ import { GetServiceLogsParams } from "../../../types/plugin/params" import { getAppNamespace } from "../namespace" -import { getKubernetesLogs } from "../logs" +import { getAllLogs } from "../logs" import { HelmModule } from "./config" import { KubernetesPluginContext } from "../kubernetes" +import { getChartResources } from "./common" export async function getServiceLogs(params: GetServiceLogsParams) { - const { ctx, service } = params + const { ctx, module, log } = params const k8sCtx = ctx const context = k8sCtx.provider.config.context const namespace = await getAppNamespace(k8sCtx, k8sCtx.provider) - const selector = `app.kubernetes.io/name=${service.name}` - return getKubernetesLogs({ ...params, context, namespace, selector }) + const resources = await getChartResources(k8sCtx, module, log) + + return getAllLogs({ ...params, context, namespace, resources }) } diff --git a/garden-service/src/plugins/kubernetes/logs.ts b/garden-service/src/plugins/kubernetes/logs.ts index c0d850529a..44d65f7ad4 100644 --- a/garden-service/src/plugins/kubernetes/logs.ts +++ b/garden-service/src/plugins/kubernetes/logs.ts @@ -7,71 +7,80 @@ */ import * as split from "split" +import { omit } from "lodash" import moment = require("moment") -import JSONStream = require("JSONStream") -import { GetServiceLogsResult } from "../../types/plugin/outputs" -import { GetServiceLogsParams } from "../../types/plugin/params" +import { GetServiceLogsResult, ServiceLogEntry } from "../../types/plugin/outputs" import { splitFirst } from "../../util/util" -import { BinaryCmd } from "../../util/ext-tools" import { kubectl } from "./kubectl" +import { KubernetesResource } from "./types" +import { getAllPodNames } from "./util" +import { KubeApi } from "./api" +import { Service } from "../../types/service" +import Stream from "ts-stream" -interface GetKubernetesLogsParams extends GetServiceLogsParams { +interface GetLogsBaseParams { context: string namespace: string - selector: string + service: Service + stream: Stream + follow: boolean + tail: number } -export async function getKubernetesLogs(params: GetKubernetesLogsParams) { - // Currently Stern doesn't support just returning the logs and exiting, it can only follow - const proc = params.follow - ? await followLogs(params) - : await getLogs(params) +interface GetPodLogsParams extends GetLogsBaseParams { + podNames: string[] +} - return new Promise((resolve, reject) => { - proc.on("error", reject) +interface GetAllLogsParams extends GetLogsBaseParams { + resources: KubernetesResource[] +} - proc.on("exit", () => { - resolve({}) - }) - }) +interface GetLogsParams extends GetLogsBaseParams { + podName: string } -async function followLogs({ context, namespace, service, selector, stream, log, tail }: GetKubernetesLogsParams) { - const args = [ - "--color", "never", - "--context", context, - "--namespace", namespace, - "--output", "json", - "--selector", selector, - "--tail", String(tail), - "--timestamps", - ] +/** + * Stream all logs for the given pod names and service. + */ +export async function getPodLogs(params: GetPodLogsParams) { + const procs = params.podNames.map(podName => getLogs({ ...omit(params, "podNames"), podName })) - const proc = await stern.spawn({ args, log }) - let timestamp: Date | undefined + return new Promise((resolve, reject) => { + for (const proc of procs) { + proc.on("error", reject) - proc.stdout! - .pipe(JSONStream.parse(["message"], (message) => { - const [timestampStr, msg] = splitFirst(message, " ") - try { - timestamp = moment(timestampStr).toDate() - } catch { } - void stream.write({ serviceName: service.name, timestamp, msg: msg.trimRight() }) - })) + proc.on("exit", () => { + resolve({}) + }) + } + }) +} - return proc +/** + * Stream all logs for the given resources and service. + */ +export async function getAllLogs(params: GetAllLogsParams) { + const api = new KubeApi(params.context) + const podNames = await getAllPodNames(api, params.namespace, params.resources) + return getPodLogs({ ...params, podNames }) } -async function getLogs({ context, namespace, service, selector, stream, tail }: GetKubernetesLogsParams) { +function getLogs({ context, namespace, service, stream, tail, follow, podName }: GetLogsParams) { // TODO: do this via API instead of kubectl const kubectlArgs = [ "logs", - "--selector", selector, "--tail", String(tail), "--timestamps=true", + "--all-containers=true", ] + if (follow) { + kubectlArgs.push("--follow=true") + } + + kubectlArgs.push(`pod/${podName}`) + const proc = kubectl(context, namespace).spawn(kubectlArgs) let timestamp: Date @@ -85,26 +94,12 @@ async function getLogs({ context, namespace, service, selector, stream, tail }: try { timestamp = moment(timestampStr).toDate() } catch { } - void stream.write({ serviceName: service.name, timestamp, msg }) + void stream.write({ + serviceName: service.name, + timestamp, + msg: `${podName} ${msg}`, + }) }) return proc } - -const stern = new BinaryCmd({ - name: "stern", - specs: { - darwin: { - url: "https://github.com/wercker/stern/releases/download/1.10.0/stern_darwin_amd64", - sha256: "b91dbcfd3bbda69cd7a7abd80a225ce5f6bb9d6255b7db192de84e80e4e547b7", - }, - linux: { - url: "https://github.com/wercker/stern/releases/download/1.10.0/stern_linux_amd64", - sha256: "a0335b298f6a7922c35804bffb32a68508077b2f35aaef44d9eb116f36bc7eda", - }, - win32: { - url: "https://github.com/wercker/stern/releases/download/1.10.0/stern_windows_amd64.exe", - sha256: "8cb94d3f47c831f2b0a59286336b41569ab38cb1528755545cb490536274f885", - }, - }, -}) diff --git a/garden-service/src/plugins/kubernetes/status.ts b/garden-service/src/plugins/kubernetes/status.ts index 36cfa62d0c..e9f6e4f387 100644 --- a/garden-service/src/plugins/kubernetes/status.ts +++ b/garden-service/src/plugins/kubernetes/status.ts @@ -31,6 +31,7 @@ import { isSubset } from "../../util/is-subset" import { LogEntry } from "../../logger/log-entry" import { V1ReplicationController, V1ReplicaSet } from "@kubernetes/client-node" import dedent = require("dedent") +import { getWorkloadPods, getPods } from "./util" interface WorkloadStatus { state: ServiceState @@ -196,7 +197,7 @@ export async function checkWorkloadStatus( ` + logs } } else { - const pods = await getPods(api, namespace, statusRes.spec.selector.matchLabels) + const pods = await getWorkloadPods(api, namespace, statusRes) const logs = await getPodLogs(api, namespace, pods.map(pod => pod.metadata.name)) if (logs) { @@ -591,17 +592,6 @@ function removeNull(value: T | Iterable): T | Iterable | { [K in keyof } } -/** - * Retrieve a list of pods based on the provided label selector. - */ -async function getPods(api: KubeApi, namespace: string, selector: { [key: string]: string }): Promise { - const selectorString = Object.entries(selector).map(([k, v]) => `${k}=${v}`).join(",") - const res = await api.core.listNamespacedPod( - namespace, undefined, undefined, undefined, true, selectorString, - ) - return res.body.items -} - /** * Get a formatted list of log tails for each of the specified pods. Used for debugging and error logs. */ diff --git a/garden-service/src/plugins/kubernetes/util.ts b/garden-service/src/plugins/kubernetes/util.ts index 0b2d8d9b2c..ccdf9fc781 100644 --- a/garden-service/src/plugins/kubernetes/util.ts +++ b/garden-service/src/plugins/kubernetes/util.ts @@ -6,9 +6,87 @@ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -import { get } from "lodash" +import * as Bluebird from "bluebird" +import { get, flatten, uniqBy } from "lodash" +import { V1Pod } from "@kubernetes/client-node" import { KubernetesResource } from "./types" +import { splitLast } from "../../util/util" +import { KubeApi } from "./api" + +export const workloadTypes = ["Deployment", "DaemonSet", "ReplicaSet", "StatefulSet"] export function getAnnotation(obj: KubernetesResource, key: string): string | null { return get(obj, ["metadata", "annotations", key]) } + +/** + * Given a list of resources, get all the associated pods. + */ +export async function getAllPods(api: KubeApi, namespace: string, resources: KubernetesResource[]): Promise { + const pods = flatten(await Bluebird.map(resources, async (resource) => { + if (resource.apiVersion === "v1" && resource.kind === "Pod") { + return [resource] + } + + if (isWorkload(resource)) { + return getWorkloadPods(api, namespace, resource) + } + + return [] + })) + + return deduplicateResources(pods) +} + +/** + * Given a list of resources, get the names of all the associated pod. + */ +export async function getAllPodNames(api: KubeApi, namespace: string, resources: KubernetesResource[]) { + return (await getAllPods(api, namespace, resources)).map(p => p.metadata.name) +} + +/** + * Retrieve a list of pods based on the provided label selector. + */ +export async function getWorkloadPods(api: KubeApi, namespace: string, resource: KubernetesResource): Promise { + const selector = resource.spec.selector.matchLabels + return getPods(api, resource.metadata.namespace || namespace, selector) +} + +/** + * Retrieve a list of pods based on the provided label selector. + */ +export async function getPods(api: KubeApi, namespace: string, selector: { [key: string]: string }): Promise { + const selectorString = Object.entries(selector).map(([k, v]) => `${k}=${v}`).join(",") + const res = await api.core.listNamespacedPod( + namespace, undefined, undefined, undefined, true, selectorString, + ) + return res.body.items +} + +/** + * Returns the API group of the resource. Returns empty string for "v1" objects. + */ +export function getApiGroup(resource: KubernetesResource) { + const split = splitLast(resource.apiVersion, "/") + return split.length === 1 ? "" : split[0] +} + +/** + * Returns true if the resource is a built-in Kubernetes workload type. + */ +export function isWorkload(resource: KubernetesResource) { + return isBuiltIn(resource) && workloadTypes.includes(resource.kind) +} + +/** + * Returns true if the resource is a built-in Kubernetes type (e.g. v1, apps/*, *.k8s.io/*) + */ +export function isBuiltIn(resource: KubernetesResource) { + const apiGroup = getApiGroup(resource) + return apiGroup.endsWith("k8s.io") || !apiGroup.includes(".") +} + +export function deduplicateResources(resources: KubernetesResource[]) { + return uniqBy(resources, r => `${r.apiVersion}/${r.kind}`) +} diff --git a/garden-service/src/plugins/openfaas/openfaas.ts b/garden-service/src/plugins/openfaas/openfaas.ts index 17c57fd4e9..a095d06191 100644 --- a/garden-service/src/plugins/openfaas/openfaas.ts +++ b/garden-service/src/plugins/openfaas/openfaas.ts @@ -53,7 +53,7 @@ import { Provider, providerConfigBaseSchema } from "../../config/project" import { faasCli } from "./faas-cli" import { CleanupEnvironmentParams } from "../../types/plugin/params" import dedent = require("dedent") -import { getKubernetesLogs } from "../kubernetes/logs" +import { getAllLogs } from "../kubernetes/logs" import { installTiller, checkTillerStatus } from "../kubernetes/helm/tiller" const systemProjectPath = join(STATIC_DIR, "openfaas", "system") @@ -230,8 +230,11 @@ export function gardenPlugin(): GardenPlugin { const k8sProvider = getK8sProvider(ctx) const context = k8sProvider.config.context const namespace = await getAppNamespace(ctx, k8sProvider) - const selector = `faas_function=${service.name}` - return getKubernetesLogs({ ...params, context, namespace, selector }) + + const api = new KubeApi(k8sProvider.config.context) + const resources = await getResources(api, service, namespace) + + return getAllLogs({ ...params, context, namespace, resources }) }, async deployService(params: DeployServiceParams): Promise { @@ -253,15 +256,14 @@ export function gardenPlugin(): GardenPlugin { const k8sProvider = getK8sProvider(openFaasCtx) const namespace = await getAppNamespace(openFaasCtx, k8sProvider) const api = new KubeApi(k8sProvider.config.context) - - const deployment = (await api.apps.readNamespacedDeployment(service.name, namespace)).body + const resources = await getResources(api, service, namespace) await waitForResources({ ctx: openFaasCtx, provider: k8sProvider, serviceName: service.name, log, - resources: [deployment], + resources, }) // TODO: avoid duplicate work here @@ -330,6 +332,11 @@ async function writeStackFile( }) } +async function getResources(api: KubeApi, service: OpenFaasService, namespace: string) { + const deployment = (await api.apps.readNamespacedDeployment(service.name, namespace)).body + return [deployment] +} + async function getServiceStatus({ ctx, module, service, log }: GetServiceStatusParams) { const openFaasCtx = ctx const k8sProvider = getK8sProvider(openFaasCtx)