Skip to content

Commit

Permalink
🎉 Add labels to Kube jobs. Allow injection of Kube Image Pull Secrets. (
Browse files Browse the repository at this point in the history
#6368)

As title.

The labeling is intended to help differentiate between pods created for check, spec, discover, sync, and normalise jobs. The main impetus for this change is Cloud cost tracking, though it can also help with operational debugging.

The injection of a Kube image pull secret enables pulling job images from private Docker repositories that require authentication.
  • Loading branch information
davinchia authored Sep 28, 2021
1 parent 802a818 commit 3369237
Show file tree
Hide file tree
Showing 15 changed files with 80 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public interface Configs {

String getSubmitterNumThreads();

String getJobsImagePullSecret();

// Resources
String getCpuRequest();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class EnvConfigs implements Configs {
private static final String RESOURCE_CPU_LIMIT = "RESOURCE_CPU_LIMIT";
private static final String RESOURCE_MEMORY_REQUEST = "RESOURCE_MEMORY_REQUEST";
private static final String RESOURCE_MEMORY_LIMIT = "RESOURCE_MEMORY_LIMIT";
private static final String JOBS_IMAGE_PULL_SECRET = "JOBS_IMAGE_PULL_SECRET";

// defaults
private static final String DEFAULT_SPEC_CACHE_BUCKET = "io-airbyte-cloud-spec-cache";
Expand Down Expand Up @@ -382,6 +383,16 @@ public String getMemoryLimit() {
return getEnvOrDefault(RESOURCE_MEMORY_LIMIT, DEFAULT_RESOURCE_REQUIREMENT_MEMORY);
}

/**
 * Name of the secret used when pulling docker images for job pods, looked up under the
 * {@code JOBS_IMAGE_PULL_SECRET} key. The value is handed to the KubePodProcess class and ends up
 * in the job pod templates.
 *
 * @return the configured secret name, or the empty string default, which is a no-op value.
 */
@Override
public String getJobsImagePullSecret() {
  // The empty string is the "no pull secret" default.
  final String noSecret = "";
  return getEnvOrDefault(JOBS_IMAGE_PULL_SECRET, noSecret);
}

@Override
public String getS3LogBucket() {
return getEnvOrDefault(LogClientSingleton.S3_LOG_BUCKET, "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.airbyte.config.OperatorDbt;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.workers.normalization.NormalizationRunner;
import io.airbyte.workers.process.KubeProcessFactory;
import io.airbyte.workers.process.ProcessFactory;
import java.nio.file.Path;
import java.util.ArrayList;
Expand Down Expand Up @@ -72,7 +73,9 @@ public boolean transform(String jobId, int attempt, Path jobRoot, JsonNode confi
}
Collections.addAll(dbtArguments, Commandline.translateCommandline(dbtConfig.getDbtArguments()));
process =
processFactory.create(jobId, attempt, jobRoot, dbtConfig.getDockerImage(), false, files, "/bin/bash", resourceRequirements, dbtArguments);
processFactory.create(jobId, attempt, jobRoot, dbtConfig.getDockerImage(), false, files, "/bin/bash", resourceRequirements,
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.CUSTOM_STEP),
dbtArguments);

LineGobbler.gobble(process.getInputStream(), LOGGER::info);
LineGobbler.gobble(process.getErrorStream(), LOGGER::error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class WorkerUtils {
public static final List<WorkerPodToleration> DEFAULT_WORKER_POD_TOLERATIONS = initWorkerPodTolerations();
public static final Map<String, String> DEFAULT_WORKER_POD_NODE_SELECTORS = initWorkerPodNodeSelectors();
public static final ResourceRequirements DEFAULT_RESOURCE_REQUIREMENTS = initResourceRequirements();
public static final String DEFAULT_JOBS_IMAGE_PULL_SECRET = new EnvConfigs().getJobsImagePullSecret();

private static final Logger LOGGER = LoggerFactory.getLogger(WorkerUtils.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerException;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.process.KubeProcessFactory;
import io.airbyte.workers.process.ProcessFactory;
import java.nio.file.Path;
import java.util.Map;
Expand Down Expand Up @@ -102,7 +103,8 @@ private boolean runProcess(final String jobId,
final String... args)
throws Exception {
try {
process = processFactory.create(jobId, attempt, jobRoot, NORMALIZATION_IMAGE_NAME, false, files, null, resourceRequirements, args);
process = processFactory.create(jobId, attempt, jobRoot, NORMALIZATION_IMAGE_NAME, false, files, null, resourceRequirements,
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.NORMALISE_STEP), args);

LineGobbler.gobble(process.getInputStream(), LOGGER::info);
LineGobbler.gobble(process.getErrorStream(), LOGGER::error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public Process spec(final Path jobRoot) throws WorkerException {
Collections.emptyMap(),
null,
resourceRequirement,
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SPEC_JOB),
"spec");
}

Expand All @@ -72,6 +73,7 @@ public Process check(final Path jobRoot, final String configFilename, final Stri
ImmutableMap.of(configFilename, configContents),
null,
resourceRequirement,
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.CHECK_JOB),
"check",
"--config", configFilename);
}
Expand All @@ -87,6 +89,7 @@ public Process discover(final Path jobRoot, final String configFilename, final S
ImmutableMap.of(configFilename, configContents),
null,
resourceRequirement,
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.DISCOVER_JOB),
"discover",
"--config", configFilename);
}
Expand Down Expand Up @@ -126,6 +129,7 @@ public Process read(final Path jobRoot,
files,
null,
resourceRequirement,
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.READ_STEP),
arguments);
}

Expand All @@ -149,6 +153,7 @@ public Process write(final Path jobRoot,
files,
null,
resourceRequirement,
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.WRITE_STEP),
"write",
"--config", configFilename,
"--catalog", catalogFilename);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public Process create(String jobId,
final Map<String, String> files,
final String entrypoint,
final ResourceRequirements resourceRequirements,
final Map<String, String> labels,
final String... args)
throws WorkerException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.LocalObjectReference;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Quantity;
Expand Down Expand Up @@ -179,6 +180,7 @@ private static Container getMain(String image,
.withCommand("sh", "-c", mainCommand)
.withWorkingDir(CONFIG_DIR)
.withVolumeMounts(mainVolumeMounts);

final ResourceRequirementsBuilder resourceRequirementsBuilder = getResourceRequirementsBuilder(resourceRequirements);
if (resourceRequirementsBuilder != null) {
containerBuilder.withResources(resourceRequirementsBuilder.build());
Expand Down Expand Up @@ -251,6 +253,7 @@ public KubePodProcess(String processRunnerHost,
final Map<String, String> files,
final String entrypointOverride,
ResourceRequirements resourceRequirements,
String imagePullSecret,
List<WorkerPodToleration> tolerations,
Map<String, String> nodeSelectors,
Map<String, String> labels,
Expand Down Expand Up @@ -359,6 +362,7 @@ public KubePodProcess(String processRunnerHost,
.endMetadata()
.withNewSpec()
.withTolerations(buildPodTolerations(tolerations))
.withImagePullSecrets(new LocalObjectReference(imagePullSecret)) // An empty string turns this into a no-op setting.
.withNodeSelector(nodeSelectors.isEmpty() ? null : nodeSelectors)
.withRestartPolicy("Never")
.withInitContainers(init)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.kubernetes.client.openapi.ApiClient;
import java.net.InetAddress;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -24,6 +25,19 @@ public class KubeProcessFactory implements ProcessFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(KubeProcessFactory.class);

// Label key recording what kind of job a pod was created for. Call sites pair it with one of
// the *_JOB values below, e.g. Map.of(JOB_TYPE, SPEC_JOB).
public static final String JOB_TYPE = "job_type";
public static final String SYNC_JOB = "sync";
public static final String SPEC_JOB = "spec";
public static final String CHECK_JOB = "check";
public static final String DISCOVER_JOB = "discover";
// NOTE(review): no call site visible in this change uses NORMALIZATION_JOB (normalization pods are
// labelled SYNC_JOB + NORMALISE_STEP), and its value is spelled "normalize" while NORMALISE_STEP
// uses "normalise" - confirm whether this constant is needed and which spelling is intended.
public static final String NORMALIZATION_JOB = "normalize";

// Label key recording which step of a sync job a pod runs. Call sites pair it with one of the
// *_STEP values below, always alongside JOB_TYPE = SYNC_JOB.
public static final String SYNC_STEP = "sync_step";
public static final String READ_STEP = "read";
public static final String WRITE_STEP = "write";
public static final String NORMALISE_STEP = "normalise";
// Used for the dbt transformation process.
public static final String CUSTOM_STEP = "custom";

// NOTE(review): the doubled ";;" below is a redundant empty declaration; the second semicolon can be dropped.
private static final Pattern ALPHABETIC = Pattern.compile("[a-zA-Z]+");;
// Keys of the job id / attempt labels that create(...) adds to every pod on top of the caller-supplied labels.
private static final String JOB_LABEL_KEY = "job_id";
private static final String ATTEMPT_LABEL_KEY = "attempt_id";
Expand Down Expand Up @@ -77,11 +91,11 @@ public Process create(String jobId,
final Map<String, String> files,
final String entrypoint,
final ResourceRequirements resourceRequirements,
final Map<String, String> customLabels,
final String... args)
throws WorkerException {
try {
// used to differentiate source and destination processes with the same id and attempt

final String podName = createPodName(imageName, jobId, attempt);

final int stdoutLocalPort = KubePortManagerSingleton.take();
Expand All @@ -90,6 +104,13 @@ public Process create(String jobId,
final int stderrLocalPort = KubePortManagerSingleton.take();
LOGGER.info("{} stderrLocalPort = {}", podName, stderrLocalPort);

var allLabels = new HashMap<>(customLabels);
var generalKubeLabels = Map.of(
JOB_LABEL_KEY, jobId,
ATTEMPT_LABEL_KEY, String.valueOf(attempt),
WORKER_POD_LABEL_KEY, WORKER_POD_LABEL_VALUE);
allLabels.putAll(generalKubeLabels);

return new KubePodProcess(
processRunnerHost,
officialClient,
Expand All @@ -104,11 +125,10 @@ public Process create(String jobId,
files,
entrypoint,
resourceRequirements,
WorkerUtils.DEFAULT_JOBS_IMAGE_PULL_SECRET,
WorkerUtils.DEFAULT_WORKER_POD_TOLERATIONS,
WorkerUtils.DEFAULT_WORKER_POD_NODE_SELECTORS,
Map.of(JOB_LABEL_KEY, jobId,
ATTEMPT_LABEL_KEY, String.valueOf(attempt),
WORKER_POD_LABEL_KEY, WORKER_POD_LABEL_VALUE),
allLabels,
args);
} catch (Exception e) {
throw new WorkerException(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ public interface ProcessFactory {
*
* @param jobId job Id
* @param attempt attempt Id
* @param jobPath Workspace directory to run the process from
* @param imageName Docker image name to start the process from
* @param files file name to contents map that will be written into the working dir of the process
* prior to execution
* @param jobPath Workspace directory to run the process from.
* @param imageName Docker image name to start the process from.
* @param files File name to contents map that will be written into the working dir of the process
* prior to execution.
* @param entrypoint If not null, the default entrypoint program of the docker image can be changed
* by this argument
* @param args arguments to pass to the docker image being run in the new process
* @return the ProcessBuilder object to run the process
* by this argument.
* @param resourceRequirements CPU and RAM to assign to the created process.
* @param labels Labels to assign to the created Kube pod, if any. Ignored for docker.
* @param args Arguments to pass to the docker image being run in the new process.
* @return the created Process.
* @throws WorkerException
*/
Process create(String jobId,
Expand All @@ -35,6 +37,7 @@ Process create(String jobId,
final Map<String, String> files,
final String entrypoint,
final ResourceRequirements resourceRequirements,
final Map<String, String> labels,
final String... args)
throws WorkerException;

Expand All @@ -46,9 +49,10 @@ default Process create(String jobId,
final Map<String, String> files,
final String entrypoint,
final ResourceRequirements resourceRequirements,
final Map<String, String> labels,
final List<String> args)
throws WorkerException {
return create(jobId, attempt, jobPath, imageName, usesStdin, files, entrypoint, resourceRequirements, args.toArray(new String[0]));
return create(jobId, attempt, jobPath, imageName, usesStdin, files, entrypoint, resourceRequirements, labels, args.toArray(new String[0]));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public static String getEntrypointEnvVariable(ProcessFactory processFactory, Str
false,
Collections.emptyMap(),
"printenv",
WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS);
WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS,
Collections.emptyMap());

BufferedReader stdout = new BufferedReader(new InputStreamReader(process.getInputStream()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang.RandomStringUtils;
Expand Down Expand Up @@ -174,7 +175,7 @@ private Process getProcess(String entrypoint) throws WorkerException {
"busybox:latest",
false,
files,
entrypoint, WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS);
entrypoint, WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS, Map.of());
}

private static Set<Integer> getOpenPorts(int count) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.airbyte.workers.WorkerException;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.normalization.DefaultNormalizationRunner.DestinationType;
import io.airbyte.workers.process.KubeProcessFactory;
import io.airbyte.workers.process.ProcessFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -53,6 +54,7 @@ void setup() throws IOException, WorkerException {

when(processFactory.create(JOB_ID, JOB_ATTEMPT, jobRoot, DefaultNormalizationRunner.NORMALIZATION_IMAGE_NAME, false, files, null,
WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS,
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.NORMALISE_STEP),
"run",
"--integration-type", "bigquery",
"--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void spec() throws WorkerException {
launcher.spec(JOB_ROOT);

Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, Collections.emptyMap(), null,
WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS,
WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS, Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SPEC_JOB),
"spec");
}

Expand All @@ -54,7 +54,7 @@ void check() throws WorkerException {
launcher.check(JOB_ROOT, "config", "{}");

Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, CONFIG_FILES, null,
WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS,
WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS, Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.CHECK_JOB),
"check",
"--config", "config");
}
Expand All @@ -64,7 +64,7 @@ void discover() throws WorkerException {
launcher.discover(JOB_ROOT, "config", "{}");

Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, CONFIG_FILES, null,
WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS,
WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS, Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.DISCOVER_JOB),
"discover",
"--config", "config");
}
Expand All @@ -75,6 +75,7 @@ void read() throws WorkerException {

Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, CONFIG_CATALOG_STATE_FILES, null,
WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS,
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.READ_STEP),
Lists.newArrayList(
"read",
"--config", "config",
Expand All @@ -88,6 +89,7 @@ void write() throws WorkerException {

Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, true, CONFIG_CATALOG_FILES, null,
WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS,
Map.of(KubeProcessFactory.JOB_TYPE, KubeProcessFactory.SYNC_JOB, KubeProcessFactory.SYNC_STEP, KubeProcessFactory.WRITE_STEP),
"write",
"--config", "config",
"--catalog", "catalog");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -73,7 +74,7 @@ public void testFileWriting() throws IOException, WorkerException {

final DockerProcessFactory processFactory = new DockerProcessFactory(workspaceRoot, "", "", "");
processFactory.create("job_id", 0, jobRoot, "busybox", false, ImmutableMap.of("config.json", "{\"data\": 2}"), "echo hi",
WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS);
WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS, Map.of());

assertEquals(
Jsons.jsonNode(ImmutableMap.of("data", 2)),
Expand Down

0 comments on commit 3369237

Please sign in to comment.