Skip to content

Commit

Permalink
fix(k8s): avoid concurrency issues when creating port forwards
Browse files Browse the repository at this point in the history
  • Loading branch information
edvald committed Jun 13, 2019
1 parent 78e03b7 commit 169aa3c
Showing 1 changed file with 36 additions and 30 deletions.
66 changes: 36 additions & 30 deletions garden-service/src/plugins/kubernetes/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { get, flatten, uniqBy } from "lodash"
import { ChildProcess } from "child_process"
import { V1Pod } from "@kubernetes/client-node"
import getPort = require("get-port")
const AsyncLock = require("async-lock")

import { KubernetesResource } from "./types"
import { splitLast } from "../../util/util"
Expand Down Expand Up @@ -105,6 +106,7 @@ export interface PortForward {
}

const registeredPortForwards: { [key: string]: PortForward } = {}
const portForwardRegistrationLock = new AsyncLock()

registerCleanupFunction("kill-port-forward-procs", () => {
for (const { proc } of Object.values(registeredPortForwards)) {
Expand All @@ -115,47 +117,51 @@ registerCleanupFunction("kill-port-forward-procs", () => {
export async function getPortForward(
ctx: PluginContext, log: LogEntry, namespace: string, targetDeployment: string, port: number,
): Promise<PortForward> {
let localPort: number

// Using lock here to avoid concurrency issues (multiple parallel requests for same forward).
const key = `${targetDeployment}:${port}`
const registered = registeredPortForwards[key]

if (registered && !registered.proc.killed) {
log.debug(`Reusing local port ${registered.localPort} for ${targetDeployment} container`)
return registered
}
return portForwardRegistrationLock.acquire("register-port-forward", (async () => {
let localPort: number

const k8sCtx = <KubernetesPluginContext>ctx
const registered = registeredPortForwards[key]

// Forward random free local port to the remote rsync container.
localPort = await getPort()
const portMapping = `${localPort}:${port}`
if (registered && !registered.proc.killed) {
log.debug(`Reusing local port ${registered.localPort} for ${targetDeployment} container`)
return registered
}

log.debug(`Forwarding local port ${localPort} to ${targetDeployment} container port ${port}`)
const k8sCtx = <KubernetesPluginContext>ctx

// TODO: use the API directly instead of kubectl (need to reverse engineer kubectl a bit to get how that works)
const portForwardArgs = ["port-forward", targetDeployment, portMapping]
log.silly(`Running 'kubectl ${portForwardArgs.join(" ")}'`)
// Forward random free local port to the remote rsync container.
localPort = await getPort()
const portMapping = `${localPort}:${port}`

const proc = await kubectl.spawn({ log, context: k8sCtx.provider.config.context, namespace, args: portForwardArgs })
log.debug(`Forwarding local port ${localPort} to ${targetDeployment} container port ${port}`)

return new Promise((resolve) => {
proc.on("error", (error) => {
!proc.killed && proc.kill()
throw error
})
// TODO: use the API directly instead of kubectl (need to reverse engineer kubectl a bit to get how that works)
const portForwardArgs = ["port-forward", targetDeployment, portMapping]
log.silly(`Running 'kubectl ${portForwardArgs.join(" ")}'`)

proc.stdout!.on("data", (line) => {
// This is unfortunately the best indication that we have that the connection is up...
log.silly(`[${targetDeployment} port forwarder] ${line}`)
const proc = await kubectl.spawn({ log, context: k8sCtx.provider.config.context, namespace, args: portForwardArgs })

if (line.toString().includes("Forwarding from ")) {
const portForward = { proc, localPort }
registeredPortForwards[key] = portForward
resolve(portForward)
}
return new Promise((resolve) => {
proc.on("error", (error) => {
!proc.killed && proc.kill()
throw error
})

proc.stdout!.on("data", (line) => {
// This is unfortunately the best indication that we have that the connection is up...
log.silly(`[${targetDeployment} port forwarder] ${line}`)

if (line.toString().includes("Forwarding from ")) {
const portForward = { proc, localPort }
registeredPortForwards[key] = portForward
resolve(portForward)
}
})
})
})
}))
}

/**
Expand Down

0 comments on commit 169aa3c

Please sign in to comment.