Skip to content

Commit

Permalink
fix(k8s): make sure LogFollower only connects once
Browse files Browse the repository at this point in the history
I noticed the following log message when I increased latency and packet
loss using Network Link Conditioner:

```
[silly] <Not connected to container vault in Pod vault-0. Connection status is connecting>
```

This means a connection attempt is still in progress (status `connecting`),
yet the LogFollower starts another attempt on the next retry tick. The
redundant attempts pile up, which causes a vicious cycle and makes the
internet connection even worse.

This is probably the root cause of the issue described in #3586.

With this bug fixed, I am 100% certain this PR Fixes #3586.

Co-authored-by: Thorarinn Sigurdsson <[email protected]>
  • Loading branch information
stefreak and thsig committed Mar 7, 2023
1 parent 931d19b commit 5c753ff
Showing 1 changed file with 32 additions and 28 deletions.
60 changes: 32 additions & 28 deletions core/src/plugins/kubernetes/logs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { splitFirst, isValidDateInstance, sleep } from "../../util/util"
import { Writable } from "stream"
import request from "request"
import { LogLevel } from "../../logger/logger"
import { clearTimeout } from "timers"

// When not following logs, the entire log is read into memory and sorted.
// We therefore set a maximum on the number of lines we fetch.
Expand Down Expand Up @@ -123,6 +124,7 @@ async function readLogs<T extends LogEntryBase>({
}

type ConnectionStatus = "connecting" | "connected" | "timed-out" | "error" | "closed"
const disconnectedStatuses: ConnectionStatus[] = ["timed-out", "error", "closed"]

interface LastLogEntries {
messages: string[]
Expand Down Expand Up @@ -163,7 +165,7 @@ export class K8sLogFollower<T extends LogEntryBase> {
private log: LogEntry
private defaultNamespace: string
private resources: KubernetesResource<BaseResource>[]
private intervalId: NodeJS.Timer | null
private timeoutId: NodeJS.Timer | null
private resolve: ((val: unknown) => void) | null
private retryIntervalMs: number

Expand Down Expand Up @@ -191,7 +193,7 @@ export class K8sLogFollower<T extends LogEntryBase> {
this.log = log
this.defaultNamespace = defaultNamespace
this.resources = resources
this.intervalId = null
this.timeoutId = null
this.resolve = null
this.retryIntervalMs = retryIntervalMs
}
Expand All @@ -201,11 +203,16 @@ export class K8sLogFollower<T extends LogEntryBase> {
* until outside code calls the close method.
*/
public async followLogs(opts: LogOpts) {
await this.createConnections(opts)
// make sure that createConnections is never called concurrently (wait for it to finish, then wait retryIntervalMs)
const followLoop = async () => {
try {
await this.createConnections(opts)
} finally {
this.timeoutId = setTimeout(followLoop, this.retryIntervalMs)
}
}

this.intervalId = setInterval(async () => {
await this.createConnections(opts)
}, this.retryIntervalMs)
await followLoop()

return new Promise((resolve, _reject) => {
this.resolve = resolve
Expand All @@ -218,7 +225,9 @@ export class K8sLogFollower<T extends LogEntryBase> {
*/
public close() {
this.clearConnections()
this.resolve && this.resolve({})
if (this.resolve) {
this.resolve({})
}
}

/**
Expand All @@ -232,9 +241,9 @@ export class K8sLogFollower<T extends LogEntryBase> {

private clearConnections() {
const conns = Object.values(this.connections)
if (this.intervalId) {
clearInterval(this.intervalId)
this.intervalId = null
if (this.timeoutId) {
clearTimeout(this.timeoutId)
this.timeoutId = null
}
conns.forEach((conn) => {
try {
Expand Down Expand Up @@ -338,16 +347,17 @@ export class K8sLogFollower<T extends LogEntryBase> {
await Bluebird.map(containers, async ({ pod, containerName }) => {
const connection = this.createConnectionIfMissing(pod, containerName)

if (connection && (connection.shouldRetry === false || connection.status === "connected")) {
// Nothing to do
if (disconnectedStatuses.includes(connection.status) && connection.shouldRetry) {
// The connection has been registered but is not active
this.log.silly(
`<Connecting to container ${connection.containerName} in Pod ${connection.pod.metadata.name}, because current connection status is ${connection.status}>`
)
connection.status = "connecting"
} else {
// nothing to do
return
}

// The connection has been registered but is not active
this.log.silly(
`<Not connected to container ${connection.containerName} in Pod ${connection.pod.metadata.name}. Connection status is ${connection.status}>`
)

let req: request.Request

const makeTimeout = () => {
Expand Down Expand Up @@ -411,13 +421,7 @@ export class K8sLogFollower<T extends LogEntryBase> {
})
this.log.silly(`<Connected to container '${containerName}' in Pod '${pod.metadata.name}'>`)
} catch (err) {
// Log the error and keep trying.
// If the error is "HTTP request failed" most likely the pod is just not up yet
if (err.message !== "HTTP request failed") {
this.log.debug(
`<Getting logs for container '${containerName}' in Pod '${pod.metadata.name}' failed with error: ${err?.message}>`
)
}
await this.handleConnectionClose(connection, "error", err.message)
return
}
connection.request = req
Expand All @@ -440,8 +444,6 @@ export class K8sLogFollower<T extends LogEntryBase> {
tail: number
sinceSeconds?: string
}) {
const logger = this.k8sApi.getLogger()

const opts = {
follow: true, // only works with follow true, as we receive chunks with multiple messages in the stream otherwise
pretty: false,
Expand All @@ -463,7 +465,9 @@ export class K8sLogFollower<T extends LogEntryBase> {
opts["sinceSeconds"] = parseDuration(sinceSeconds, "s") || undefined
}

return logger.log(connection.namespace, connection.pod.metadata.name, connection.containerName, stream, opts)
return this.k8sApi
.getLogger()
.log(connection.namespace, connection.pod.metadata.name, connection.containerName, stream, opts)
}

private createConnectionIfMissing(pod: KubernetesPod, containerName: string): LogConnection {
Expand All @@ -474,7 +478,7 @@ export class K8sLogFollower<T extends LogEntryBase> {
namespace: pod.metadata.namespace || this.defaultNamespace,
pod,
containerName,
status: "connecting",
status: "closed",
shouldRetry: true,
}
}
Expand Down

0 comments on commit 5c753ff

Please sign in to comment.