From febeb521cef0851434b797dd121f8bf0504f5067 Mon Sep 17 00:00:00 2001 From: Mario Molina Date: Wed, 6 Oct 2021 02:04:50 -0500 Subject: [PATCH] :tada: Configurable job pull image policy in k8s (#6827) --- .../main/java/io/airbyte/config/Configs.java | 2 + .../java/io/airbyte/config/EnvConfigs.java | 7 ++++ .../java/io/airbyte/workers/WorkerUtils.java | 39 ++++++++----------- .../workers/process/KubePodProcess.java | 4 ++ .../workers/process/KubeProcessFactory.java | 1 + kube/overlays/dev-integration-test/.env | 3 ++ kube/overlays/dev/.env | 3 ++ .../overlays/stable-with-resource-limits/.env | 3 ++ kube/overlays/stable/.env | 3 ++ kube/resources/worker.yaml | 5 +++ 10 files changed, 47 insertions(+), 23 deletions(-) diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java index ac45bb2258992..33ffd0e16e719 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java @@ -67,6 +67,8 @@ public interface Configs { WorkspaceRetentionConfig getWorkspaceRetentionConfig(); + String getJobImagePullPolicy(); + List getWorkerPodTolerations(); Map getWorkerNodeSelectors(); diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java index f1eb954221fd0..5ce2d5b73159c 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -48,6 +48,7 @@ public class EnvConfigs implements Configs { public static final String CONFIG_DATABASE_URL = "CONFIG_DATABASE_URL"; public static final String RUN_DATABASE_MIGRATION_ON_STARTUP = "RUN_DATABASE_MIGRATION_ON_STARTUP"; public static final String WEBAPP_URL = "WEBAPP_URL"; + public static final String JOB_IMAGE_PULL_POLICY = "JOB_IMAGE_PULL_POLICY"; public static final String WORKER_POD_TOLERATIONS = "WORKER_POD_TOLERATIONS"; public static final String WORKER_POD_NODE_SELECTORS = "WORKER_POD_NODE_SELECTORS"; public static final String MAX_SYNC_JOB_ATTEMPTS = "MAX_SYNC_JOB_ATTEMPTS"; @@ -76,6 +77,7 @@ public class EnvConfigs implements Configs { private static final String DEFAULT_KUBE_NAMESPACE = "default"; private static final String DEFAULT_RESOURCE_REQUIREMENT_CPU = null; private static final String DEFAULT_RESOURCE_REQUIREMENT_MEMORY = null; + private static final String DEFAULT_JOB_IMAGE_PULL_POLICY = "IfNotPresent"; private static final String SECRET_STORE_GCP_PROJECT_ID = "SECRET_STORE_GCP_PROJECT_ID"; private static final String SECRET_STORE_GCP_CREDENTIALS = "SECRET_STORE_GCP_CREDENTIALS"; private static final long DEFAULT_MINIMUM_WORKSPACE_RETENTION_DAYS = 1; @@ -278,6 +280,11 @@ private WorkerPodToleration workerPodToleration(final String tolerationStr) { } } + @Override + public String getJobImagePullPolicy() { + return getEnvOrDefault(JOB_IMAGE_PULL_POLICY, DEFAULT_JOB_IMAGE_PULL_POLICY); + } + /** * Returns worker pod tolerations parsed from its own environment variable. The value of the env is * a string that represents one or more tolerations. diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java index 2175281cfa813..0901552abe028 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java @@ -5,6 +5,7 @@ package io.airbyte.workers; import com.google.common.annotations.VisibleForTesting; +import io.airbyte.config.Configs; import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.EnvConfigs; import io.airbyte.config.ResourceRequirements; @@ -28,19 +29,22 @@ // TODO:(Issue-4824): Figure out how to log Docker process information. public class WorkerUtils { - public static final List DEFAULT_WORKER_POD_TOLERATIONS = initWorkerPodTolerations(); - public static final Map DEFAULT_WORKER_POD_NODE_SELECTORS = initWorkerPodNodeSelectors(); + private static final Logger LOGGER = LoggerFactory.getLogger(WorkerUtils.class); + private static final Configs CONFIGS = new EnvConfigs(); + public static final ResourceRequirements DEFAULT_RESOURCE_REQUIREMENTS = initResourceRequirements(); - public static final String DEFAULT_JOBS_IMAGE_PULL_SECRET = new EnvConfigs().getJobsImagePullSecret(); + public static final List DEFAULT_WORKER_POD_TOLERATIONS = CONFIGS.getWorkerPodTolerations(); + public static final Map DEFAULT_WORKER_POD_NODE_SELECTORS = CONFIGS.getWorkerNodeSelectors(); + public static final String DEFAULT_JOBS_IMAGE_PULL_SECRET = CONFIGS.getJobsImagePullSecret(); + public static final String DEFAULT_JOB_IMAGE_PULL_POLICY = CONFIGS.getJobImagePullPolicy(); - private static final Logger LOGGER = LoggerFactory.getLogger(WorkerUtils.class); public static void gentleClose(final Process process, final long timeout, final TimeUnit timeUnit) { if (process == null) { return; } - if (new EnvConfigs().getWorkerEnvironment().equals(WorkerEnvironment.KUBERNETES)) { + if (CONFIGS.getWorkerEnvironment().equals(WorkerEnvironment.KUBERNETES)) { LOGGER.debug("Gently closing process {}", process.info().commandLine().get()); } @@ -96,7 +100,7 @@ static void gentleCloseWithHeartbeat(final Process process, final BiConsumer forceShutdown) { while (process.isAlive() && heartbeatMonitor.isBeating()) { try { - if (new EnvConfigs().getWorkerEnvironment().equals(WorkerEnvironment.KUBERNETES)) { + if (CONFIGS.getWorkerEnvironment().equals(WorkerEnvironment.KUBERNETES)) { LOGGER.debug("Gently closing process {} with heartbeat..", process.info().commandLine().get()); } @@ -108,7 +112,7 @@ static void gentleCloseWithHeartbeat(final Process process, if (process.isAlive()) { try { - if (new EnvConfigs().getWorkerEnvironment().equals(WorkerEnvironment.KUBERNETES)) { + if (CONFIGS.getWorkerEnvironment().equals(WorkerEnvironment.KUBERNETES)) { LOGGER.debug("Gently closing process {} without heartbeat..", process.info().commandLine().get()); } @@ -120,7 +124,7 @@ static void gentleCloseWithHeartbeat(final Process process, // if we were unable to exist gracefully, force shutdown... if (process.isAlive()) { - if (new EnvConfigs().getWorkerEnvironment().equals(WorkerEnvironment.KUBERNETES)) { + if (CONFIGS.getWorkerEnvironment().equals(WorkerEnvironment.KUBERNETES)) { LOGGER.debug("Force shutdown process {}..", process.info().commandLine().get()); } @@ -211,23 +215,12 @@ public static Path getJobRoot(Path workspaceRoot, String jobId, int attemptId) { .resolve(String.valueOf(attemptId)); } - private static List initWorkerPodTolerations() { - final EnvConfigs configs = new EnvConfigs(); - return configs.getWorkerPodTolerations(); - } - - private static Map initWorkerPodNodeSelectors() { - final EnvConfigs configs = new EnvConfigs(); - return configs.getWorkerNodeSelectors(); - } - private static ResourceRequirements initResourceRequirements() { - final EnvConfigs configs = new EnvConfigs(); return new ResourceRequirements() - .withCpuRequest(configs.getCpuRequest()) - .withCpuLimit(configs.getCpuLimit()) - .withMemoryRequest(configs.getMemoryRequest()) - .withMemoryLimit(configs.getMemoryLimit()); + .withCpuRequest(CONFIGS.getCpuRequest()) + .withCpuLimit(CONFIGS.getCpuLimit()) + .withMemoryRequest(CONFIGS.getMemoryRequest()) + .withMemoryLimit(CONFIGS.getMemoryLimit()); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java index a74a15c52a3b7..f3b83475c3357 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -153,6 +153,7 @@ private static Container getInit(boolean usesStdin, List mainVolume } private static Container getMain(String image, + String imagePullPolicy, boolean usesStdin, String entrypointOverride, List mainVolumeMounts, @@ -177,6 +178,7 @@ private static Container getMain(String image, final ContainerBuilder containerBuilder = new ContainerBuilder() .withName("main") .withImage(image) + .withImagePullPolicy(imagePullPolicy) .withCommand("sh", "-c", mainCommand) .withWorkingDir(CONFIG_DIR) .withVolumeMounts(mainVolumeMounts); @@ -246,6 +248,7 @@ public KubePodProcess(String processRunnerHost, String podName, String namespace, String image, + String imagePullPolicy, int stdoutLocalPort, int stderrLocalPort, String kubeHeartbeatUrl, @@ -309,6 +312,7 @@ public KubePodProcess(String processRunnerHost, Container init = getInit(usesStdin, List.of(pipeVolumeMount, configVolumeMount)); Container main = getMain( image, + imagePullPolicy, usesStdin, entrypointOverride, List.of(pipeVolumeMount, configVolumeMount, terminationVolumeMount), diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java index 96fbf7fe4dbe6..7167a6c5e20a9 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java @@ -118,6 +118,7 @@ public Process create(String jobId, podName, namespace, imageName, + WorkerUtils.DEFAULT_JOB_IMAGE_PULL_POLICY, stdoutLocalPort, stderrLocalPort, kubeHeartbeatUrl, diff --git a/kube/overlays/dev-integration-test/.env b/kube/overlays/dev-integration-test/.env index 33387554e72b6..ebea6835214e1 100644 --- a/kube/overlays/dev-integration-test/.env +++ b/kube/overlays/dev-integration-test/.env @@ -56,3 +56,6 @@ RESOURCE_MEMORY_LIMIT= # Worker pod tolerations and node selectors WORKER_POD_TOLERATIONS= WORKER_POD_NODE_SELECTORS= + +# Job image pull policy +JOB_IMAGE_PULL_POLICY= diff --git a/kube/overlays/dev/.env b/kube/overlays/dev/.env index 515eee87e3207..01cdbfe3926c4 100644 --- a/kube/overlays/dev/.env +++ b/kube/overlays/dev/.env @@ -58,3 +58,6 @@ RESOURCE_MEMORY_LIMIT= # Worker pod tolerations and node selectors WORKER_POD_TOLERATIONS= WORKER_POD_NODE_SELECTORS= + +# Job image pull policy +JOB_IMAGE_PULL_POLICY= diff --git a/kube/overlays/stable-with-resource-limits/.env b/kube/overlays/stable-with-resource-limits/.env index 71ace4cd8ab75..e8ddc8af5b778 100644 --- a/kube/overlays/stable-with-resource-limits/.env +++ b/kube/overlays/stable-with-resource-limits/.env @@ -58,3 +58,6 @@ RESOURCE_MEMORY_LIMIT= # Worker pod tolerations and node selectors WORKER_POD_TOLERATIONS= WORKER_POD_NODE_SELECTORS= + +# Job image pull policy +JOB_IMAGE_PULL_POLICY= diff --git a/kube/overlays/stable/.env b/kube/overlays/stable/.env index 71ace4cd8ab75..e8ddc8af5b778 100644 --- a/kube/overlays/stable/.env +++ b/kube/overlays/stable/.env @@ -58,3 +58,6 @@ RESOURCE_MEMORY_LIMIT= # Worker pod tolerations and node selectors WORKER_POD_TOLERATIONS= WORKER_POD_NODE_SELECTORS= + +# Job image pull policy +JOB_IMAGE_PULL_POLICY= diff --git a/kube/resources/worker.yaml b/kube/resources/worker.yaml index e2d0fd4125d4a..347bfd8a97090 100644 --- a/kube/resources/worker.yaml +++ b/kube/resources/worker.yaml @@ -179,6 +179,11 @@ spec: configMapKeyRef: name: airbyte-env key: WORKER_POD_NODE_SELECTORS + - name: JOB_IMAGE_PULL_POLICY + valueFrom: + configMapKeyRef: + name: airbyte-env + key: JOB_IMAGE_PULL_POLICY ports: - containerPort: 9000 # for heartbeat server - containerPort: 9001 # start temporal worker port pool