Skip to content

Commit

Permalink
fix(k8s): handle logs properly for all module types and resources
Browse files Browse the repository at this point in the history
  • Loading branch information
edvald authored and thsig committed Mar 28, 2019
1 parent 68b27aa commit 56a15ba
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 85 deletions.
8 changes: 5 additions & 3 deletions garden-service/src/plugins/kubernetes/container/logs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@
import { GetServiceLogsParams } from "../../../types/plugin/params"
import { ContainerModule } from "../../container/config"
import { getAppNamespace } from "../namespace"
import { getKubernetesLogs } from "../logs"
import { getAllLogs } from "../logs"
import { KubernetesPluginContext } from "../kubernetes"
import { createDeployment } from "./deployment"

export async function getServiceLogs(params: GetServiceLogsParams<ContainerModule>) {
const { ctx, service } = params
const k8sCtx = <KubernetesPluginContext>ctx
const context = k8sCtx.provider.config.context
const namespace = await getAppNamespace(k8sCtx, k8sCtx.provider)
const selector = `service=${service.name}`

return getKubernetesLogs({ ...params, context, namespace, selector })
const resources = [await createDeployment(k8sCtx.provider, service, params.runtimeContext, namespace, false)]

return getAllLogs({ ...params, context, namespace, resources })
}
10 changes: 6 additions & 4 deletions garden-service/src/plugins/kubernetes/helm/logs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@

import { GetServiceLogsParams } from "../../../types/plugin/params"
import { getAppNamespace } from "../namespace"
import { getKubernetesLogs } from "../logs"
import { getAllLogs } from "../logs"
import { HelmModule } from "./config"
import { KubernetesPluginContext } from "../kubernetes"
import { getChartResources } from "./common"

export async function getServiceLogs(params: GetServiceLogsParams<HelmModule>) {
const { ctx, service } = params
const { ctx, module, log } = params
const k8sCtx = <KubernetesPluginContext>ctx
const context = k8sCtx.provider.config.context
const namespace = await getAppNamespace(k8sCtx, k8sCtx.provider)
const selector = `app.kubernetes.io/name=${service.name}`

return getKubernetesLogs({ ...params, context, namespace, selector })
const resources = await getChartResources(k8sCtx, module, log)

return getAllLogs({ ...params, context, namespace, resources })
}
113 changes: 54 additions & 59 deletions garden-service/src/plugins/kubernetes/logs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,71 +7,80 @@
*/

import * as split from "split"
import { omit } from "lodash"
import moment = require("moment")
import JSONStream = require("JSONStream")

import { GetServiceLogsResult } from "../../types/plugin/outputs"
import { GetServiceLogsParams } from "../../types/plugin/params"
import { GetServiceLogsResult, ServiceLogEntry } from "../../types/plugin/outputs"
import { splitFirst } from "../../util/util"
import { BinaryCmd } from "../../util/ext-tools"
import { kubectl } from "./kubectl"
import { KubernetesResource } from "./types"
import { getAllPodNames } from "./util"
import { KubeApi } from "./api"
import { Service } from "../../types/service"
import Stream from "ts-stream"

interface GetKubernetesLogsParams extends GetServiceLogsParams {
interface GetLogsBaseParams {
context: string
namespace: string
selector: string
service: Service
stream: Stream<ServiceLogEntry>
follow: boolean
tail: number
}

export async function getKubernetesLogs(params: GetKubernetesLogsParams) {
// Currently Stern doesn't support just returning the logs and exiting, it can only follow
const proc = params.follow
? await followLogs(params)
: await getLogs(params)
interface GetPodLogsParams extends GetLogsBaseParams {
podNames: string[]
}

return new Promise<GetServiceLogsResult>((resolve, reject) => {
proc.on("error", reject)
interface GetAllLogsParams extends GetLogsBaseParams {
resources: KubernetesResource[]
}

proc.on("exit", () => {
resolve({})
})
})
interface GetLogsParams extends GetLogsBaseParams {
podName: string
}

async function followLogs({ context, namespace, service, selector, stream, log, tail }: GetKubernetesLogsParams) {
const args = [
"--color", "never",
"--context", context,
"--namespace", namespace,
"--output", "json",
"--selector", selector,
"--tail", String(tail),
"--timestamps",
]
/**
* Stream all logs for the given pod names and service.
*/
export async function getPodLogs(params: GetPodLogsParams) {
const procs = params.podNames.map(podName => getLogs({ ...omit(params, "podNames"), podName }))

const proc = await stern.spawn({ args, log })
let timestamp: Date | undefined
return new Promise<GetServiceLogsResult>((resolve, reject) => {
for (const proc of procs) {
proc.on("error", reject)

proc.stdout!
.pipe(JSONStream.parse(["message"], (message) => {
const [timestampStr, msg] = splitFirst(message, " ")
try {
timestamp = moment(timestampStr).toDate()
} catch { }
void stream.write({ serviceName: service.name, timestamp, msg: msg.trimRight() })
}))
proc.on("exit", () => {
resolve({})
})
}
})
}

return proc
/**
* Stream all logs for the given resources and service.
*/
export async function getAllLogs(params: GetAllLogsParams) {
const api = new KubeApi(params.context)
const podNames = await getAllPodNames(api, params.namespace, params.resources)
return getPodLogs({ ...params, podNames })
}

async function getLogs({ context, namespace, service, selector, stream, tail }: GetKubernetesLogsParams) {
function getLogs({ context, namespace, service, stream, tail, follow, podName }: GetLogsParams) {
// TODO: do this via API instead of kubectl
const kubectlArgs = [
"logs",
"--selector", selector,
"--tail", String(tail),
"--timestamps=true",
"--all-containers=true",
]

if (follow) {
kubectlArgs.push("--follow=true")
}

kubectlArgs.push(`pod/${podName}`)

const proc = kubectl(context, namespace).spawn(kubectlArgs)
let timestamp: Date

Expand All @@ -85,26 +94,12 @@ async function getLogs({ context, namespace, service, selector, stream, tail }:
try {
timestamp = moment(timestampStr).toDate()
} catch { }
void stream.write({ serviceName: service.name, timestamp, msg })
void stream.write({
serviceName: service.name,
timestamp,
msg: `${podName} ${msg}`,
})
})

return proc
}

const stern = new BinaryCmd({
name: "stern",
specs: {
darwin: {
url: "https://github.com/wercker/stern/releases/download/1.10.0/stern_darwin_amd64",
sha256: "b91dbcfd3bbda69cd7a7abd80a225ce5f6bb9d6255b7db192de84e80e4e547b7",
},
linux: {
url: "https://github.com/wercker/stern/releases/download/1.10.0/stern_linux_amd64",
sha256: "a0335b298f6a7922c35804bffb32a68508077b2f35aaef44d9eb116f36bc7eda",
},
win32: {
url: "https://github.com/wercker/stern/releases/download/1.10.0/stern_windows_amd64.exe",
sha256: "8cb94d3f47c831f2b0a59286336b41569ab38cb1528755545cb490536274f885",
},
},
})
14 changes: 2 additions & 12 deletions garden-service/src/plugins/kubernetes/status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import { isSubset } from "../../util/is-subset"
import { LogEntry } from "../../logger/log-entry"
import { V1ReplicationController, V1ReplicaSet } from "@kubernetes/client-node"
import dedent = require("dedent")
import { getWorkloadPods, getPods } from "./util"

interface WorkloadStatus {
state: ServiceState
Expand Down Expand Up @@ -196,7 +197,7 @@ export async function checkWorkloadStatus(
` + logs
}
} else {
const pods = await getPods(api, namespace, statusRes.spec.selector.matchLabels)
const pods = await getWorkloadPods(api, namespace, statusRes)
const logs = await getPodLogs(api, namespace, pods.map(pod => pod.metadata.name))

if (logs) {
Expand Down Expand Up @@ -591,17 +592,6 @@ function removeNull<T>(value: T | Iterable<T>): T | Iterable<T> | { [K in keyof
}
}

/**
* Retrieve a list of pods based on the provided label selector.
*/
async function getPods(api: KubeApi, namespace: string, selector: { [key: string]: string }): Promise<V1Pod[]> {
const selectorString = Object.entries(selector).map(([k, v]) => `${k}=${v}`).join(",")
const res = await api.core.listNamespacedPod(
namespace, undefined, undefined, undefined, true, selectorString,
)
return res.body.items
}

/**
* Get a formatted list of log tails for each of the specified pods. Used for debugging and error logs.
*/
Expand Down
80 changes: 79 additions & 1 deletion garden-service/src/plugins/kubernetes/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,87 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

import { get } from "lodash"
import * as Bluebird from "bluebird"
import { get, flatten, uniqBy } from "lodash"
import { V1Pod } from "@kubernetes/client-node"
import { KubernetesResource } from "./types"
import { splitLast } from "../../util/util"
import { KubeApi } from "./api"

export const workloadTypes = ["Deployment", "DaemonSet", "ReplicaSet", "StatefulSet"]

export function getAnnotation(obj: KubernetesResource, key: string): string | null {
return get(obj, ["metadata", "annotations", key])
}

/**
* Given a list of resources, get all the associated pods.
*/
export async function getAllPods(api: KubeApi, namespace: string, resources: KubernetesResource[]): Promise<V1Pod[]> {
const pods = flatten(await Bluebird.map(resources, async (resource) => {
if (resource.apiVersion === "v1" && resource.kind === "Pod") {
return [<V1Pod>resource]
}

if (isWorkload(resource)) {
return getWorkloadPods(api, namespace, resource)
}

return []
}))

return <V1Pod[]>deduplicateResources(pods)
}

/**
* Given a list of resources, get the names of all the associated pod.
*/
export async function getAllPodNames(api: KubeApi, namespace: string, resources: KubernetesResource[]) {
return (await getAllPods(api, namespace, resources)).map(p => p.metadata.name)
}

/**
* Retrieve a list of pods based on the provided label selector.
*/
export async function getWorkloadPods(api: KubeApi, namespace: string, resource: KubernetesResource): Promise<V1Pod[]> {
const selector = resource.spec.selector.matchLabels
return getPods(api, resource.metadata.namespace || namespace, selector)
}

/**
* Retrieve a list of pods based on the provided label selector.
*/
export async function getPods(api: KubeApi, namespace: string, selector: { [key: string]: string }): Promise<V1Pod[]> {
const selectorString = Object.entries(selector).map(([k, v]) => `${k}=${v}`).join(",")
const res = await api.core.listNamespacedPod(
namespace, undefined, undefined, undefined, true, selectorString,
)
return res.body.items
}

/**
* Returns the API group of the resource. Returns empty string for "v1" objects.
*/
export function getApiGroup(resource: KubernetesResource) {
const split = splitLast(resource.apiVersion, "/")
return split.length === 1 ? "" : split[0]
}

/**
* Returns true if the resource is a built-in Kubernetes workload type.
*/
export function isWorkload(resource: KubernetesResource) {
return isBuiltIn(resource) && workloadTypes.includes(resource.kind)
}

/**
* Returns true if the resource is a built-in Kubernetes type (e.g. v1, apps/*, *.k8s.io/*)
*/
export function isBuiltIn(resource: KubernetesResource) {
const apiGroup = getApiGroup(resource)
return apiGroup.endsWith("k8s.io") || !apiGroup.includes(".")
}

export function deduplicateResources(resources: KubernetesResource[]) {
return uniqBy(resources, r => `${r.apiVersion}/${r.kind}`)
}
19 changes: 13 additions & 6 deletions garden-service/src/plugins/openfaas/openfaas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import { Provider, providerConfigBaseSchema } from "../../config/project"
import { faasCli } from "./faas-cli"
import { CleanupEnvironmentParams } from "../../types/plugin/params"
import dedent = require("dedent")
import { getKubernetesLogs } from "../kubernetes/logs"
import { getAllLogs } from "../kubernetes/logs"
import { installTiller, checkTillerStatus } from "../kubernetes/helm/tiller"

const systemProjectPath = join(STATIC_DIR, "openfaas", "system")
Expand Down Expand Up @@ -230,8 +230,11 @@ export function gardenPlugin(): GardenPlugin {
const k8sProvider = getK8sProvider(<OpenFaasPluginContext>ctx)
const context = k8sProvider.config.context
const namespace = await getAppNamespace(ctx, k8sProvider)
const selector = `faas_function=${service.name}`
return getKubernetesLogs({ ...params, context, namespace, selector })

const api = new KubeApi(k8sProvider.config.context)
const resources = await getResources(api, service, namespace)

return getAllLogs({ ...params, context, namespace, resources })
},

async deployService(params: DeployServiceParams<OpenFaasModule>): Promise<ServiceStatus> {
Expand All @@ -253,15 +256,14 @@ export function gardenPlugin(): GardenPlugin {
const k8sProvider = getK8sProvider(openFaasCtx)
const namespace = await getAppNamespace(openFaasCtx, k8sProvider)
const api = new KubeApi(k8sProvider.config.context)

const deployment = (await api.apps.readNamespacedDeployment(service.name, namespace)).body
const resources = await getResources(api, service, namespace)

await waitForResources({
ctx: openFaasCtx,
provider: k8sProvider,
serviceName: service.name,
log,
resources: [deployment],
resources,
})

// TODO: avoid duplicate work here
Expand Down Expand Up @@ -330,6 +332,11 @@ async function writeStackFile(
})
}

async function getResources(api: KubeApi, service: OpenFaasService, namespace: string) {
const deployment = (await api.apps.readNamespacedDeployment(service.name, namespace)).body
return [deployment]
}

async function getServiceStatus({ ctx, module, service, log }: GetServiceStatusParams<OpenFaasModule>) {
const openFaasCtx = <OpenFaasPluginContext>ctx
const k8sProvider = getK8sProvider(openFaasCtx)
Expand Down

0 comments on commit 56a15ba

Please sign in to comment.