From 169aa3c1708697c74e9ec1ad85c1ed3dc88ef3c7 Mon Sep 17 00:00:00 2001 From: Jon Edvald Date: Mon, 10 Jun 2019 19:31:20 +0200 Subject: [PATCH] fix(k8s): avoid concurrency issues when creating port forwards --- garden-service/src/plugins/kubernetes/util.ts | 66 ++++++++++--------- 1 file changed, 36 insertions(+), 30 deletions(-) diff --git a/garden-service/src/plugins/kubernetes/util.ts b/garden-service/src/plugins/kubernetes/util.ts index 8e1814284d..03813ec606 100644 --- a/garden-service/src/plugins/kubernetes/util.ts +++ b/garden-service/src/plugins/kubernetes/util.ts @@ -11,6 +11,7 @@ import { get, flatten, uniqBy } from "lodash" import { ChildProcess } from "child_process" import { V1Pod } from "@kubernetes/client-node" import getPort = require("get-port") +const AsyncLock = require("async-lock") import { KubernetesResource } from "./types" import { splitLast } from "../../util/util" @@ -105,6 +106,7 @@ export interface PortForward { } const registeredPortForwards: { [key: string]: PortForward } = {} +const portForwardRegistrationLock = new AsyncLock() registerCleanupFunction("kill-port-forward-procs", () => { for (const { proc } of Object.values(registeredPortForwards)) { @@ -115,47 +117,51 @@ registerCleanupFunction("kill-port-forward-procs", () => { export async function getPortForward( ctx: PluginContext, log: LogEntry, namespace: string, targetDeployment: string, port: number, ): Promise { - let localPort: number - + // Using lock here to avoid concurrency issues (multiple parallel requests for same forward). const key = `${targetDeployment}:${port}` - const registered = registeredPortForwards[key] - if (registered && !registered.proc.killed) { - log.debug(`Reusing local port ${registered.localPort} for ${targetDeployment} container`) - return registered - } + return portForwardRegistrationLock.acquire("register-port-forward", (async () => { + let localPort: number - const k8sCtx = ctx + const registered = registeredPortForwards[key] - // Forward random free local port to the remote rsync container. - localPort = await getPort() - const portMapping = `${localPort}:${port}` + if (registered && !registered.proc.killed) { + log.debug(`Reusing local port ${registered.localPort} for ${targetDeployment} container`) + return registered + } - log.debug(`Forwarding local port ${localPort} to ${targetDeployment} container port ${port}`) + const k8sCtx = ctx - // TODO: use the API directly instead of kubectl (need to reverse engineer kubectl a bit to get how that works) - const portForwardArgs = ["port-forward", targetDeployment, portMapping] - log.silly(`Running 'kubectl ${portForwardArgs.join(" ")}'`) + // Forward random free local port to the remote rsync container. + localPort = await getPort() + const portMapping = `${localPort}:${port}` - const proc = await kubectl.spawn({ log, context: k8sCtx.provider.config.context, namespace, args: portForwardArgs }) + log.debug(`Forwarding local port ${localPort} to ${targetDeployment} container port ${port}`) - return new Promise((resolve) => { - proc.on("error", (error) => { - !proc.killed && proc.kill() - throw error - }) + // TODO: use the API directly instead of kubectl (need to reverse engineer kubectl a bit to get how that works) + const portForwardArgs = ["port-forward", targetDeployment, portMapping] + log.silly(`Running 'kubectl ${portForwardArgs.join(" ")}'`) - proc.stdout!.on("data", (line) => { - // This is unfortunately the best indication that we have that the connection is up... - log.silly(`[${targetDeployment} port forwarder] ${line}`) + const proc = await kubectl.spawn({ log, context: k8sCtx.provider.config.context, namespace, args: portForwardArgs }) - if (line.toString().includes("Forwarding from ")) { - const portForward = { proc, localPort } - registeredPortForwards[key] = portForward - resolve(portForward) - } + return new Promise((resolve) => { + proc.on("error", (error) => { + !proc.killed && proc.kill() + throw error + }) + + proc.stdout!.on("data", (line) => { + // This is unfortunately the best indication that we have that the connection is up... + log.silly(`[${targetDeployment} port forwarder] ${line}`) + + if (line.toString().includes("Forwarding from ")) { + const portForward = { proc, localPort } + registeredPortForwards[key] = portForward + resolve(portForward) + } + }) }) - }) + })) } /**