From 04c4a4ac51047e9038ef4b57f02957f07dec08aa Mon Sep 17 00:00:00 2001 From: Thorarinn Sigurdsson Date: Tue, 7 Mar 2023 16:46:17 +0100 Subject: [PATCH] fix(k8s): more stable & performant log streaming (#3730) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(k8s): more stable & performant log streaming Fixes an issue with very high upload bandwidth use when running Kubernetes based tests/tasks that produce a lot of log output. co-written by @stefreak and @thsig - Improved connection management and pod lifecycle logic, including more robust connection timeout enforcement. - Removed keepalive logic, since it doesn't work on all operating systems. - Improved deduplication logic to generate fewer false positives (and eliminate false-negatives). - Use sinceTime when fetching logs on retry to make sure we don't fetch any unnecessary logs. - When a runner pod terminates, we make sure to wait until the final logs have been fetched. - Default to using the tail option in conjunction with a "max log lines in memory" setting instead of limitBytes to avoid clipping / incomplete log lines while also avoiding the loading of too much log data into memory. - Only start one connection attempt at a time, to prevent multiple connections to the same container at once. - Make sure that we only call createConnections once it has finished, so that there is only one concurrent instance of the method running per LogFollower at a time. Fixes #3586. co-authored-by: thsig co-authored-by: Steffen Neubauer * fix(k8s): make sure LogFollower only connects once I noticed the following log message when I increased latency & packet loss in Network Link Conditioner: ``` [silly] ``` This means the connection is not established yet, but the LogFollower is connecting yet again (which causes a vicious cycle and makes the internet connection even worse). This is probably the root cause for the issue described in #3586. With this bug fixed, I am 100% certain this PR Fixes #3586 Co-authored-by: Thorarinn Sigurdsson co-authored-by: Steffen Neubauer * improvement(k8s): cap age of logs on retry attempt in `garden logs` When streaming logs from the k8s api using `garden logs`, we do not want to stream old log messages as the user might have been disconnected for a long time (e.g. when the laptop went to sleep) Co-authored-by: Eyþór Magnússon co-authored-by: Steffen Neubauer --------- Co-authored-by: Steffen Neubauer Co-authored-by: thsig Co-authored-by: Eyþór Magnússon --- core/src/commands/logs.ts | 8 +- core/src/plugins/kubernetes/logs.ts | 535 +++++++++++------- core/src/plugins/kubernetes/run.ts | 11 +- core/src/plugins/kubernetes/util.ts | 4 + .../src/plugins/kubernetes/container/logs.ts | 14 +- docs/reference/commands.md | 6 +- 6 files changed, 364 insertions(+), 214 deletions(-) diff --git a/core/src/commands/logs.ts b/core/src/commands/logs.ts index 8ec6f946f6..fe7513e540 100644 --- a/core/src/commands/logs.ts +++ b/core/src/commands/logs.ts @@ -42,7 +42,9 @@ const logsOpts = { `, }), "follow": new BooleanParameter({ - help: "Continuously stream new logs from the service(s).", + help: deline` + Continuously stream new logs from the service(s). + When the \`--follow\` option is set, we default to \`--since 1m\`.`, alias: "f", }), "tail": new IntegerParameter({ @@ -104,8 +106,8 @@ export class LogsCommand extends Command { help = "Retrieves the most recent logs for the specified service(s)." 
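For context on how these command options relate to the Kubernetes API calls made by the plugin code further down in this patch: the `--since` and `--tail` values end up as `sinceSeconds` and `tailLines` in the log request options built by `streamPodLogs`. A rough, hypothetical sketch of that mapping (the helper name below is illustrative only and not part of this patch; `parse-duration` is the same package the plugin imports):

```
import parseDuration from "parse-duration"

// Illustrative helper (not part of this patch): convert CLI-style options into the
// option shape passed to the Kubernetes log API.
function toK8sLogOptions({ since, tail }: { since?: string; tail?: number }) {
  return {
    follow: true,
    timestamps: true,
    // "1m" -> 60, "30s" -> 30; undefined if the duration string can't be parsed
    sinceSeconds: since ? parseDuration(since, "s") || undefined : undefined,
    tailLines: tail,
  }
}

// e.g. toK8sLogOptions({ since: "1m", tail: 100000 })
//      => { follow: true, timestamps: true, sinceSeconds: 60, tailLines: 100000 }
```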
description = dedent` - Outputs logs for all or specified services, and optionally waits for news logs to come in. Defaults - to getting logs from the last minute when in \`--follow\` mode. You can change this with the \`--since\` option. + Outputs logs for all or specified services, and optionally waits for news logs to come in. Defaults to getting logs + from the last minute when in \`--follow\` mode. You can change this with the \`--since\` or \`--tail\` options. Examples: diff --git a/core/src/plugins/kubernetes/logs.ts b/core/src/plugins/kubernetes/logs.ts index 0a7a50193f..8dcdeecef0 100644 --- a/core/src/plugins/kubernetes/logs.ts +++ b/core/src/plugins/kubernetes/logs.ts @@ -7,13 +7,12 @@ */ import { omit, sortBy } from "lodash" -import moment from "moment" import parseDuration from "parse-duration" import { ServiceLogEntry } from "../../types/plugin/service/getServiceLogs" import { KubernetesResource, KubernetesPod, BaseResource } from "./types" -import { getAllPods } from "./util" -import { KubeApi } from "./api" +import { getAllPods, summarize } from "./util" +import { KubeApi, KubernetesError } from "./api" import { GardenService } from "../../types/service" import Stream from "ts-stream" import { LogEntry } from "../../logger/log-entry" @@ -21,14 +20,17 @@ import Bluebird from "bluebird" import { KubernetesProvider } from "./config" import { PluginToolSpec } from "../../types/plugin/tools" import { PluginContext } from "../../plugin-context" -import { getPodLogs } from "./status/pod" -import { splitFirst, isValidDateInstance } from "../../util/util" +import { checkPodStatus, getPodLogs } from "./status/pod" +import { splitFirst, isValidDateInstance, sleep } from "../../util/util" import { Writable } from "stream" import request from "request" import { LogLevel } from "../../logger/logger" +import { clearTimeout } from "timers" +import { HttpError } from "@kubernetes/client-node" // When not following logs, the entire log is read into memory and sorted. // We therefore set a maximum on the number of lines we fetch. + const maxLogLinesInMemory = 100000 interface GetAllLogsParams { @@ -44,6 +46,11 @@ interface GetAllLogsParams { resources: KubernetesResource[] } +export interface LogEntryBase { + msg: string + timestamp?: Date +} + /** * Stream all logs for the given resources and service. */ @@ -58,7 +65,10 @@ export async function streamK8sLogs(params: GetAllLogsParams) { logsFollower.close() }) - await logsFollower.followLogs({ tail: params.tail, since: params.since, limitBytes: null }) + // We use sinceOnRetry 30s here, to cap the maximum age of log messages on retry attempts to max 30s + // because we don't want to spam users with old log messages if they were running `garden logs` and then + // disconnected for a long time, e.g. because the laptop was in sleep. + await logsFollower.followLogs({ tail: params.tail, since: params.since, sinceOnRetry: "30s" }) } else { const pods = await getAllPods(api, params.defaultNamespace, params.resources) let tail = params.tail @@ -70,34 +80,32 @@ export async function streamK8sLogs(params: GetAllLogsParams) { params.log.debug(`Tail parameter not set explicitly. 
Setting to ${tail} to prevent log overflow.`) } - await Bluebird.map(pods, (pod) => readLogs({ ...omit(params, "pods"), entryConverter, pod, tail })) + const { stream } = params + await Bluebird.map(pods, async (pod) => { + const serviceLogEntries = await readLogs({ ...omit(params, "pods", "stream"), entryConverter, pod, tail, api }) + for (const entry of sortBy(serviceLogEntries, "timestamp")) { + void stream.write(entry) + } + }) } return {} } -async function readLogs({ - log, - ctx, - provider, - stream, +async function readLogs({ + api, entryConverter, tail, pod, defaultNamespace, since, }: { - log: LogEntry - ctx: PluginContext - provider: KubernetesProvider - stream: Stream + api: KubeApi entryConverter: PodLogEntryConverter tail?: number pod: KubernetesPod defaultNamespace: string since?: string -}) { - const api = await KubeApi.factory(log, ctx, provider) - +}): Promise { const logs = await getPodLogs({ api, namespace: pod.metadata?.namespace || defaultNamespace, @@ -111,82 +119,67 @@ async function readLogs({ return _log.split("\n").map((line) => { line = line.trimEnd() const res = { containerName } - try { - const [timestampStr, msg] = splitFirst(line, " ") - const timestamp = moment(timestampStr).toDate() - return entryConverter({ ...res, timestamp, msg }) - } catch { - return entryConverter({ ...res, msg: line }) - } + const { timestamp, msg } = parseTimestampAndMessage(line) + return entryConverter({ ...res, timestamp, msg }) }) }) - for (const line of sortBy(allLines, "timestamp")) { - void stream.write(line) - } + return sortBy(allLines, "timestamp") } -type ConnectionStatus = "connected" | "error" | "closed" +type ConnectionStatus = "connecting" | "connected" | "timed-out" | "error" | "closed" +const disconnectedStatuses: ConnectionStatus[] = ["timed-out", "error", "closed"] +interface LastLogEntries { + messages: string[] + timestamp: Date +} interface LogConnection { pod: KubernetesPod containerName: string namespace: string - request: request.Request status: ConnectionStatus + shouldRetry: boolean + request?: request.Request + timeout?: NodeJS.Timeout + + // for reconnect & deduplication logic + lastLogEntries?: LastLogEntries + previousConnectionLastLogEntries?: LastLogEntries } interface LogOpts { tail?: number since?: string /** - * If set to null, does not limit the number of bytes. This parameter is made mandatory, so that the usage site - * makes a deliberate and informed choice about it. + * Maximum age of logs to fetch on retry attempts. * - * From the k8s javascript client library docs: + * Can be useful in case you don't want to fetch the complete history of logs on retry attempts, for example when the + * we don't care about completeness, for example in `garden logs --follow`. * - * "If set, the number of bytes to read from the server before terminating the log output. This may not display a - * complete final line of logging, and may return slightly more or slightly less than the specified limit."" + * By default the LogFollower will try to fetch all the logs (unless the amount the app logged between retries exceeds + * maxLogLinesInMemory). */ - limitBytes: number | null + sinceOnRetry?: string } const defaultRetryIntervalMs = 10000 -/** - * The maximum number of streamed entries to keep around to compare incoming entries against for deduplication - * purposes. - * - * One such buffer is maintained for each container for each resource the `K8sLogFollower` instance - * is following (and deduplication is performed separately for each followed container). 
- * - * Deduplication is needed e.g. when the connection with a container is lost and reestablished, and recent logs are - * re-fetched. Some of those log entries may have the same timestamp and message as recently streamed entries, - * and not re-streaming them if they match an entry in the deduplication buffer is usually the desired behavior - * (since it prevents duplicate log lines). - * - * The deduplication buffer size should be kept relatively small, since a large buffer adds a slight delay before - * entries are streamed. - */ -const defaultDeduplicationBufferSize = 500 - /** * A helper class for following logs and managing the logs connections. * * The class operates kind of like a control loop, fetching the state of all pods for a given service at * an interval, comparing the result against current active connections and attempting re-connects as needed. */ -export class K8sLogFollower { +export class K8sLogFollower { private connections: { [key: string]: LogConnection } private stream: Stream private entryConverter: PodLogEntryConverter private k8sApi: KubeApi private log: LogEntry - private deduplicationBufferSize: number - private deduplicationBuffers: { [key: string]: { msg: string; time: number }[] } private defaultNamespace: string private resources: KubernetesResource[] - private intervalId: NodeJS.Timer | null + private timeoutId?: NodeJS.Timer | null private resolve: ((val: unknown) => void) | null private retryIntervalMs: number @@ -196,7 +189,6 @@ export class K8sLogFollower { defaultNamespace, k8sApi, log, - deduplicationBufferSize = defaultDeduplicationBufferSize, resources, retryIntervalMs = defaultRetryIntervalMs, }: { @@ -204,7 +196,6 @@ export class K8sLogFollower { entryConverter: PodLogEntryConverter k8sApi: KubeApi log: LogEntry - deduplicationBufferSize?: number defaultNamespace: string resources: KubernetesResource[] retryIntervalMs?: number @@ -214,13 +205,10 @@ export class K8sLogFollower { this.connections = {} this.k8sApi = k8sApi this.log = log - this.deduplicationBufferSize = deduplicationBufferSize this.defaultNamespace = defaultNamespace this.resources = resources - this.intervalId = null this.resolve = null this.retryIntervalMs = retryIntervalMs - this.deduplicationBuffers = {} } /** @@ -228,11 +216,19 @@ export class K8sLogFollower { * until outside code calls the close method. */ public async followLogs(opts: LogOpts) { - await this.createConnections(opts) + // make sure that createConnections is never called concurrently (wait for it to finish, then wait retryIntervalMs) + const followLoop = async () => { + try { + await this.createConnections(opts) + } finally { + // if timeoutId is null, close() has been called and we should stop the loop. + if (this.timeoutId !== null) { + this.timeoutId = setTimeout(followLoop, this.retryIntervalMs) + } + } + } - this.intervalId = setInterval(async () => { - await this.createConnections(opts) - }, this.retryIntervalMs) + await followLoop() return new Promise((resolve, _reject) => { this.resolve = resolve @@ -240,39 +236,114 @@ export class K8sLogFollower { } /** - * Cleans up all active network requests and resolves the promise that was created - * when the logs following was started. + * Cleans up all active network requests and resolves the promise that was created when the logs following + * was started. 
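+ *
+ * Note: this does not wait for any remaining log messages to be streamed. Use `closeAndFlush` below when
+ * the final logs should be flushed before shutting down, e.g. after a runner Pod has terminated.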
*/ public close() { - if (this.intervalId) { - clearInterval(this.intervalId) - this.intervalId = null + this.clearConnections() + if (this.resolve) { + this.resolve({}) } - Object.values(this.connections).forEach((conn) => { + } + + /** + * Same as `close`, but also fetches the last several seconds of logs and streams any missing entries + * (in case any were missing). + */ + public async closeAndFlush() { + await this.flushFinalLogs() + this.close() + } + + private clearConnections() { + const conns = Object.values(this.connections) + if (this.timeoutId) { + clearTimeout(this.timeoutId) + this.timeoutId = null + } + conns.forEach((conn) => { try { - conn.request.abort() + conn.request?.abort() } catch {} }) - this.resolve && this.resolve({}) } - private handleConnectionClose(connectionId: string, status: ConnectionStatus, reason: string) { - const conn = this.connections[connectionId] - const prevStatus = conn.status - this.connections[connectionId] = { - ...conn, - status, + private async flushFinalLogs() { + this.log.debug("flushFinalLogs called...") + + // wait max 20 seconds + for (let i = 0; i < 20; i++) { + const allConnections = Object.values(this.connections) + + if (allConnections.length === 0) { + this.log.debug("flushFinalLogs: unexpectedly encountered empty list of connections") + } + + if (allConnections.every((c) => c.status === "closed" && c.shouldRetry === false)) { + this.log.debug("flushFinalLogs: all connections were finished. Success!") + return + } + await sleep(1000) } + this.log.warn( + "Failed to finish streaming logs: Timed out after 20 seconds. Some logs might be missing in the verbose log output, or in Garden Cloud." + ) + } + + private async handleConnectionClose(connection: LogConnection, status: ConnectionStatus, error: Error | string) { + clearTimeout(connection.timeout) + + const prevStatus = connection.status + connection.status = status + connection.previousConnectionLastLogEntries = connection.lastLogEntries + + const description = `container '${connection.containerName}' in Pod '${connection.pod.metadata.name}` + // There's no need to log the closed event that happens after an error event - if (!(prevStatus === "error" && status === "closed")) { - this.log.silly( - `` - ) + // Also no need to log the error event after a timed-out event + if (!(prevStatus === "error" && status === "closed") && !(prevStatus === "timed-out" && status === "error")) { + let reason = error + if (error instanceof HttpError) { + reason = `HTTP request failed with status ${error.statusCode}` + } + this.log.silly(``) + } + + /** + * Helper to stop retrying a connection. + * + * This means that we won't fetch logs again from this container, but createConnections will still + * be called and thus we will still notice when new Pods are added to the Deployment, for example when + * the user runs `garden deploy`. 
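+ *
+ * Retrying is disabled by setting `connection.shouldRetry` to `false`; `createConnections` checks that
+ * flag before attempting to reconnect to the container.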
+ */ + const stopRetrying = (why: string) => { + this.log.silly(``) + + connection.shouldRetry = false + } + + try { + const pod = await this.k8sApi.core.readNamespacedPodStatus(connection.pod.metadata.name, connection.namespace) + const podStatus = checkPodStatus(pod) + + const wasError = prevStatus === "error" || status === "error" + if (podStatus === "missing" && !wasError) { + stopRetrying("The pod was missing") + } else if (podStatus === "stopped" && !wasError) { + stopRetrying("The pod was stopped") + } else { + this.log.silly(``) + } + } catch (e) { + this.log.silly(``) + if (!(e instanceof KubernetesError)) { + throw e + } } } - private async createConnections({ tail, since, limitBytes }: LogOpts) { + private async createConnections({ tail, since, sinceOnRetry }: LogOpts) { let pods: KubernetesPod[] try { @@ -283,191 +354,239 @@ export class K8sLogFollower { return } const containers = pods.flatMap((pod) => { - const podContainers = pod.spec!.containers.map((c) => c.name).filter((n) => !n.match(/garden-/)) - return podContainers.map((containerName) => ({ + return containerNamesForLogging(pod).map((containerName) => ({ pod, containerName, })) }) if (containers.length === 0) { - this.log.debug(``) + this.log.debug( + `` + ) } await Bluebird.map(containers, async ({ pod, containerName }) => { - const connectionId = this.getConnectionId(pod, containerName) - // Cast type to make it explicit that it can be undefined - const conn = this.connections[connectionId] as LogConnection | undefined - const podName = pod.metadata.name + const connection = this.createConnectionIfMissing(pod, containerName) - if (conn && conn.status === "connected") { - // Nothing to do - return - } else if (conn) { + if (disconnectedStatuses.includes(connection.status) && connection.shouldRetry) { // The connection has been registered but is not active this.log.silly( - `` + `` ) + connection.status = "connecting" + } else { + // nothing to do + return } - const isRetry = !!conn?.status - const namespace = pod.metadata?.namespace || this.defaultNamespace + let req: request.Request + + const makeTimeout = () => { + const idleTimeout = 60000 + return setTimeout(async () => { + await this.handleConnectionClose( + connection, + "timed-out", + `Connection has been idle for ${idleTimeout / 1000} seconds.` + ) + req?.abort() + }, idleTimeout) + } const _self = this // The ts-stream library that we use for service logs entries doesn't properly implement // a writeable stream which the K8s API expects so we wrap it here. 
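+ // The wrapper below also resets the connection's idle timeout whenever a chunk arrives, so only
+ // genuinely idle connections are timed out and reconnected.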
const writableStream = new Writable({ - write(chunk, _encoding, next) { + write(chunk: Buffer | undefined, _encoding: BufferEncoding, next) { + // clear the timeout, as we have activity on the socket + clearTimeout(connection.timeout) + connection.timeout = makeTimeout() + + // we do not use the encoding parameter, because it is invalid + // we can assume that we receive utf-8 encoded strings from k8s const line = chunk?.toString()?.trimEnd() if (!line) { + next() return } - let timestamp: Date | undefined - // Fallback to printing the full line if we can't parse the timestamp - let msg = line - try { - const parts = splitFirst(line, " ") - const dateInstance = new Date(parts[0]) - if (isValidDateInstance(dateInstance)) { - timestamp = dateInstance - } - msg = parts[1] - } catch {} - if (_self.deduplicate({ msg, podName, containerName, timestamp })) { + const { timestamp, msg } = parseTimestampAndMessage(line) + + // If we can't parse the timestamp, we encountered a kubernetes error + if (!timestamp) { + _self.log.debug( + `Encountered a log message without timestamp. This is probably an error message from the Kubernetes API: ${line}` + ) + } else if (_self.isDuplicate({ connection, timestamp, msg })) { + _self.log.silly(`Dropping duplicate log message: ${line}`) + } else { + _self.updateLastLogEntries({ connection, timestamp, msg }) _self.write({ msg, containerName, timestamp, }) } + next() }, }) - let req: request.Request try { - req = await this.getPodLogs({ - namespace, - containerName, - podName: pod.metadata.name, + req = await this.streamPodLogs({ + connection, stream: writableStream, - limitBytes, - tail, - timestamps: true, - // If we're retrying, presunmably because the connection was cut, we only want the latest logs. - // Otherwise we might end up fetching logs that have already been rendered. - since: isRetry ? "10s" : since, + tail: tail || Math.floor(maxLogLinesInMemory / containers.length), + since, + sinceOnRetry, }) this.log.silly(``) } catch (err) { - // Log the error and keep trying. - // If the error is "HTTP request failed" most likely the pod is just not up yet - if (err.message !== "HTTP request failed") { - this.log.debug( - `` - ) - } + await this.handleConnectionClose(connection, "error", err) return } - this.connections[connectionId] = { - namespace, - pod, - request: req, - containerName, - status: "connected", - } + connection.request = req + connection.status = "connected" + connection.timeout = makeTimeout() - req.on("error", (error) => this.handleConnectionClose(connectionId, "error", error.message)) - req.on("close", () => this.handleConnectionClose(connectionId, "closed", "Request closed")) - req.on("socket", (socket) => { - // If the socket is idle for 30 seconds, we kill the connection and reconnect. 
- const socketTimeoutMs = 30000 - socket.setTimeout(socketTimeoutMs) - socket.setKeepAlive(true, socketTimeoutMs / 2) - socket.on("error", (err) => { - this.handleConnectionClose(connectionId, "error", `Socket error: ${err.message}`) - }) - socket.on("timeout", () => { - this.log.debug(``) - // This will trigger a "close" event which we handle separately - socket.destroy() - }) - }) + req.on("error", async (error) => await this.handleConnectionClose(connection, "error", error)) + req.on("close", async () => await this.handleConnectionClose(connection, "closed", "Request closed")) }) } - private async getPodLogs({ - namespace, - podName, - containerName, + private async streamPodLogs({ + connection, stream, - limitBytes, tail, since, - timestamps, + sinceOnRetry, }: { - namespace: string - podName: string - containerName: string + connection: LogConnection stream: Writable - limitBytes: null | number - tail?: number - timestamps?: boolean + tail: number since?: string + sinceOnRetry?: string }) { - const logger = this.k8sApi.getLogger() - const sinceSeconds = since ? parseDuration(since, "s") || undefined : undefined - const opts = { - follow: true, + follow: true, // only works with follow true, as we receive chunks with multiple messages in the stream otherwise pretty: false, previous: false, - sinceSeconds, + timestamps: true, tailLines: tail, - timestamps, } - if (limitBytes) { - opts["limitBytes"] = limitBytes + // Get timestamp of last seen message from previous connection attempt, and only fetch logs since this time. + // This is because we've already streamed all the previous logs. This helps avoid unnecessary data transfer. + let sinceTime = connection.lastLogEntries?.timestamp.toISOString() + + // If this is a retry attempt and the sinceOnRetry parameter is set, we don't want to fetch old logs + if (sinceTime && sinceOnRetry) { + opts["sinceSeconds"] = parseDuration(sinceOnRetry, "s") || undefined + } + + // This is a retry attempt + else if (sinceTime) { + opts["sinceTime"] = sinceTime } - return logger.log(namespace, podName, containerName, stream, opts) + // If this is not a retry attempt and the since parameter has been set + else if (since) { + opts["sinceSeconds"] = parseDuration(since, "s") || undefined + } + + return this.k8sApi + .getLogger() + .log(connection.namespace, connection.pod.metadata.name, connection.containerName, stream, opts) } - private getConnectionId(pod: KubernetesPod, containerName: string) { - return `${pod.metadata.name}-${containerName}` + private createConnectionIfMissing(pod: KubernetesPod, containerName: string): LogConnection { + const connectionId = `${pod.metadata.name}-${containerName}` + + if (this.connections[connectionId] === undefined) { + this.connections[connectionId] = { + namespace: pod.metadata.namespace || this.defaultNamespace, + pod, + containerName, + status: "closed", + shouldRetry: true, + } + } + + return this.connections[connectionId] } /** - * Returns `false` if an entry with the same message and timestamp has already been buffered for the given `podName` - * and `containerNamee`. Returns `true` otherwise. + * Returns `true` if the message is considered a duplicate, and `false` if otherwise. + * + * This works by comparing the message timestamp with the lastLogEntries of the previous connection attempt + * (`connection.previousConnectionLastLogEntries`), and if the timestamp is equal by comparing the messages + * themselves. 
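+ *
+ * For example, if the previous connection last saw the messages ["a", "b"] at timestamp T, then after
+ * reconnecting: any message older than T is a duplicate, "a" or "b" at exactly T are duplicates (while a
+ * new message "c" at T is not), and any message newer than T is not a duplicate.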
*/ - private deduplicate({ + private isDuplicate({ + connection, + timestamp, msg, - podName, - containerName, - timestamp = new Date(), }: { + connection: LogConnection + timestamp: Date msg: string - podName: string - containerName?: string - timestamp?: Date }): boolean { - const key = `${podName}.${containerName}` - const buffer = this.deduplicationBuffers[key] || [] - const time = timestamp ? timestamp.getTime() : 0 - const duplicate = !!buffer.find((e) => e.msg === msg && e.time === time) - if (duplicate) { + // get last messages from previous connection attempt + const beforeReconnect = connection.previousConnectionLastLogEntries + + if (!beforeReconnect) { + // This can't be a duplicate, because this is not a reconnect attempt return false } - buffer.push({ msg, time }) - if (buffer.length > this.deduplicationBufferSize) { - buffer.shift() + + // lastMessages is an Array, because there might be multiple messages for a given time stamp. + const lastMessages = beforeReconnect.messages + const lastTime = beforeReconnect.timestamp.getTime() + + const time = timestamp.getTime() + + // message is a duplicate because we've seen a more recent message in the previous connection already + if (time < lastTime) { + return true + } + + // This message is a duplicate if we've seen it in the previous connection already + if (time === lastTime) { + return lastMessages.includes(msg) + } + + // This message has a more recent timestamp than the last message seen in the previous connection + return false + } + + /** + * Maintains `connection.lastLogEntries` + * + * This method makes sure that the `lastLogEntries` of the `connection` always contains + * the log messages with the most recently seen timestamp. + */ + private updateLastLogEntries({ + connection, + timestamp, + msg, + }: { + connection: LogConnection + timestamp: Date + msg: string + }) { + const time = timestamp.getTime() + const lastTime = connection.lastLogEntries?.timestamp.getTime() + + if (!connection.lastLogEntries || time !== lastTime) { + connection.lastLogEntries = { messages: [msg], timestamp } + } else { + // we got another message for the same timestamp + connection.lastLogEntries.messages.push(msg) } - this.deduplicationBuffers[key] = buffer - return true } private write({ @@ -499,7 +618,29 @@ export interface PodLogEntryConverterParams { timestamp?: Date } -export type PodLogEntryConverter = (p: PodLogEntryConverterParams) => T +function parseTimestampAndMessage(line: string): { msg: string; timestamp?: Date } { + let timestamp: Date | null = null + // Fallback to printing the full line if we can't parse the timestamp + let msg = line + try { + const parts = splitFirst(line, " ") + const dateInstance = new Date(parts[0]) + if (isValidDateInstance(dateInstance)) { + timestamp = dateInstance + } + msg = parts[1] + } catch {} + return timestamp ? { msg, timestamp } : { msg } +} + +/** + * Returns a list of container names from which to fetch logs. Ignores sidecar containers injected by Garden. 
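+ *
+ * Garden-injected sidecar containers are identified by the `garden-` name prefix.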
+ */ +function containerNamesForLogging(pod: KubernetesPod): string[] { + return pod.spec!.containers.map((c) => c.name).filter((n) => !n.match(/^garden-/)) +} + +export type PodLogEntryConverter = (p: PodLogEntryConverterParams) => T export const makeServiceLogEntry: (serviceName: string) => PodLogEntryConverter = (serviceName) => { return ({ timestamp, msg, level, containerName }: PodLogEntryConverterParams) => ({ diff --git a/core/src/plugins/kubernetes/run.ts b/core/src/plugins/kubernetes/run.ts index e327ba3a70..1032a07ee3 100644 --- a/core/src/plugins/kubernetes/run.ts +++ b/core/src/plugins/kubernetes/run.ts @@ -819,7 +819,9 @@ export class PodRunner extends PodRunnerParams { }) return new K8sLogFollower({ defaultNamespace: this.namespace, - retryIntervalMs: 10, + // We use 1 second in the PodRunner, because the task / test will only finish once the LogFollower finished. + // If this is too low, we waste resources (network/cpu) – if it's too high we add extra time to the run execution. + retryIntervalMs: 1000, stream, log, entryConverter: makeRunLogEntry, @@ -845,8 +847,7 @@ export class PodRunner extends PodRunnerParams { const startedAt = new Date() const logsFollower = this.prepareLogsFollower(params) - const limitBytes = 1000 * 1024 // 1MB - logsFollower.followLogs({ limitBytes }).catch((_err) => { + logsFollower.followLogs({}).catch((_err) => { // Errors in `followLogs` are logged there, so all we need to do here is to ensure that the follower is closed. logsFollower.close() }) @@ -873,8 +874,10 @@ export class PodRunner extends PodRunnerParams { success: exitCode === undefined || exitCode === 0, } } finally { - logsFollower.close() + log.debug("Closing logsFollower...") + await logsFollower.closeAndFlush() if (remove) { + log.debug("Stopping PodRunner") await this.stop() } } diff --git a/core/src/plugins/kubernetes/util.ts b/core/src/plugins/kubernetes/util.ts index 8bbb2fa6e7..332b789afb 100644 --- a/core/src/plugins/kubernetes/util.ts +++ b/core/src/plugins/kubernetes/util.ts @@ -778,3 +778,7 @@ export function renderPodEvents(events: CoreV1Event[]): string { return text } + +export function summarize(resources: KubernetesResource[]) { + return resources.map((r) => `${r.kind} ${r.metadata.name}`).join(", ") +} diff --git a/core/test/integ/src/plugins/kubernetes/container/logs.ts b/core/test/integ/src/plugins/kubernetes/container/logs.ts index 3bcc5a2ff6..a487e2ca56 100644 --- a/core/test/integ/src/plugins/kubernetes/container/logs.ts +++ b/core/test/integ/src/plugins/kubernetes/container/logs.ts @@ -138,7 +138,7 @@ describe("kubernetes", () => { setTimeout(() => { logsFollower.close() }, 5000) - await logsFollower.followLogs({ limitBytes: null }) + await logsFollower.followLogs({}) expect(ctx.log.toString()).to.match(/Connected to container 'simple-service'/) @@ -215,7 +215,7 @@ describe("kubernetes", () => { // Start following logs even when no services is deployed // (we don't wait for the Promise since it won't resolve unless we close the connection) // tslint:disable-next-line: no-floating-promises - logsFollower.followLogs({ limitBytes: null }) + logsFollower.followLogs({}) await sleep(1500) // Deploy the service @@ -225,19 +225,19 @@ describe("kubernetes", () => { logsFollower.close() const missingContainerRegex = new RegExp( - `` + `` ) const connectedRegex = new RegExp("