diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index c342763ca059d..29e8606609ac5 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -628,6 +628,14 @@ dags_volume_host = # Useful in local environment, discouraged in production logs_volume_host = +# A list of configMapsRefs to envFrom. If more than one configMap is +# specified, provide a comma separated list: configmap_a,configmap_b +env_from_configmap_ref = + +# A list of secretRefs to envFrom. If more than one secret is +# specified, provide a comma separated list: secret_a,secret_b +env_from_secret_ref = + # Git credentials and repository for DAGs mounted via Git (mutually exclusive with volume claim) git_repo = git_branch = @@ -737,12 +745,12 @@ tolerations = # scheduler. You may define as many secrets as needed and the kubernetes launcher will parse the # defined secrets and mount them as secret environment variables in the launched workers. # Secrets in this section are defined as follows -# = : +# = = # # For example if you wanted to mount a kubernetes secret key named `postgres_password` from the # kubernetes secret object `airflow-secret` as the environment variable `POSTGRES_PASSWORD` into # your workers you would follow the following format: -# POSTGRES_PASSWORD = airflow-secret:postgres_credentials +# POSTGRES_PASSWORD = airflow-secret=postgres_credentials # # Additionally you may override worker airflow settings with the AIRFLOW__
__ # formatting as supported by airflow normally. diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index 38abc5ed85a08..83babdafa44a2 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -125,6 +125,10 @@ def __init__(self): self.core_configuration = configuration_dict['core'] self.kube_secrets = configuration_dict.get('kubernetes_secrets', {}) self.kube_env_vars = configuration_dict.get('kubernetes_environment_variables', {}) + self.env_from_configmap_ref = configuration.get(self.kubernetes_section, + 'env_from_configmap_ref') + self.env_from_secret_ref = configuration.get(self.kubernetes_section, + 'env_from_secret_ref') self.airflow_home = settings.AIRFLOW_HOME self.dags_folder = configuration.get(self.core_section, 'dags_folder') self.parallelism = configuration.getint(self.core_section, 'PARALLELISM') diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py index ea46e1bcd385d..d1b6d4eaf4dca 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py @@ -75,6 +75,12 @@ def extract_affinity(pod, req): for k, v in six.iteritems(pod.affinity): req['spec']['affinity'][k] = v + @staticmethod + def extract_node_selector(pod, req): + req['spec']['nodeSelector'] = req['spec'].get('nodeSelector', {}) + for k, v in six.iteritems(pod.node_selectors): + req['spec']['nodeSelector'][k] = v + @staticmethod def extract_cmds(pod, req): req['spec']['containers'][0]['command'] = pod.cmds @@ -83,12 +89,6 @@ def extract_cmds(pod, req): def extract_args(pod, req): req['spec']['containers'][0]['args'] = pod.args - @staticmethod - def extract_node_selector(pod, req): - req['spec']['nodeSelector'] = req['spec'].get('nodeSelector', {}) - for k, v in six.iteritems(pod.node_selectors): - req['spec']['nodeSelector'][k] = v - @staticmethod def attach_volumes(pod, req): req['spec']['volumes'] = ( @@ -132,7 +132,7 @@ def extract_volume_secrets(pod, req): @staticmethod def extract_env_and_secrets(pod, req): envs_from_key_secrets = [ - env for env in pod.secrets if env.deploy_type == 'env' and hasattr(env, 'key') + env for env in pod.secrets if env.deploy_type == 'env' and env.key is not None ] if len(pod.envs) > 0 or len(envs_from_key_secrets) > 0: @@ -206,7 +206,7 @@ def extract_security_context(pod, req): @staticmethod def _apply_env_from(pod, req): envs_from_secrets = [ - env for env in pod.secrets if env.deploy_type == 'env' and not hasattr(env, 'key') + env for env in pod.secrets if env.deploy_type == 'env' and env.key is None ] if pod.configmaps or envs_from_secrets: diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py index 3d7c138b7e179..f8c0bdffaee08 100644 --- a/airflow/contrib/kubernetes/pod.py +++ b/airflow/contrib/kubernetes/pod.py @@ -50,13 +50,13 @@ class Pod: :param secrets: Secrets to be launched to the pod :type secrets: list[airflow.contrib.kubernetes.secret.Secret] :param result: The result that will be returned to the operator after - successful execution of the pod + successful execution of the pod :type result: any :param image_pull_policy: Specify a policy to cache or always pull an image :type image_pull_policy: str :param image_pull_secrets: Any image pull secrets to be given to the pod. - If more than one secret is required, provide a - comma separated list: secret_a,secret_b + If more than one secret is required, provide a comma separated list: + secret_a,secret_b :type image_pull_secrets: str :param affinity: A dict containing a group of affinity scheduling rules :type affinity: dict diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/contrib/kubernetes/secret.py index d69f507717682..73c51e900acf9 100644 --- a/airflow/contrib/kubernetes/secret.py +++ b/airflow/contrib/kubernetes/secret.py @@ -17,7 +17,7 @@ from airflow.exceptions import AirflowConfigException -class Secret: +class Secret(object): """Defines Kubernetes Secret Volume""" def __init__(self, deploy_type, deploy_target, secret, key=None): @@ -26,10 +26,10 @@ def __init__(self, deploy_type, deploy_target, secret, key=None): :param deploy_type: The type of secret deploy in Kubernetes, either `env` or `volume` :type deploy_type: str - :param deploy_target: The environment variable when `deploy_type` `env` or - file path when `deploy_type` `volume` where expose secret. - If `key` is not provided deploy target should be None. - :type deploy_target: str + :param deploy_target: (Optional) The environment variable when + `deploy_type` `env` or file path when `deploy_type` `volume` where + expose secret. If `key` is not provided deploy target should be None. + :type deploy_target: str or None :param secret: Name of the secrets object in Kubernetes :type secret: str :param key: (Optional) Key of the secret within the Kubernetes Secret @@ -37,18 +37,32 @@ def __init__(self, deploy_type, deploy_target, secret, key=None): :type key: str or None """ self.deploy_type = deploy_type + self.deploy_target = deploy_target - if deploy_target: + if deploy_target is not None and deploy_type == 'env': + # if deploying to env, capitalize the deploy target self.deploy_target = deploy_target.upper() - if deploy_type == 'volume': - self.deploy_target = deploy_target - - if not deploy_type == 'env' and key is None: + if key is not None and deploy_target is None: raise AirflowConfigException( - 'In deploy_type different than `env` parameter `key` is mandatory' + 'If `key` is set, `deploy_target` should not be None' ) self.secret = secret - if key: - self.key = key + self.key = key + + def __eq__(self, other): + return ( + self.deploy_type == other.deploy_type and + self.deploy_target == other.deploy_target and + self.secret == other.secret and + self.key == other.key + ) + + def __repr__(self): + return 'Secret({}, {}, {}, {})'.format( + self.deploy_type, + self.deploy_target, + self.secret, + self.key + ) diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py index e8bc7384a0f82..60128f1479e81 100644 --- a/airflow/contrib/kubernetes/worker_configuration.py +++ b/airflow/contrib/kubernetes/worker_configuration.py @@ -155,13 +155,28 @@ def _get_environment(self): env['AIRFLOW__CORE__DAGS_FOLDER'] = dag_volume_mount_path return env + def _get_configmaps(self): + """Extracts any configmapRefs to envFrom""" + if not self.kube_config.env_from_configmap_ref: + return [] + return self.kube_config.env_from_configmap_ref.split(',') + def _get_secrets(self): """Defines any necessary secrets for the pod executor""" worker_secrets = [] + for env_var_name, obj_key_pair in six.iteritems(self.kube_config.kube_secrets): k8s_secret_obj, k8s_secret_key = obj_key_pair.split('=') worker_secrets.append( - Secret('env', env_var_name, k8s_secret_obj, k8s_secret_key)) + Secret('env', env_var_name, k8s_secret_obj, k8s_secret_key) + ) + + if self.kube_config.env_from_secret_ref: + for secret_ref in self.kube_config.env_from_secret_ref.split(','): + worker_secrets.append( + Secret('env', None, secret_ref) + ) + return worker_secrets def _get_image_pull_secrets(self): @@ -331,4 +346,5 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da affinity=affinity, tolerations=tolerations, security_context=self._get_security_context(), + configmaps=self._get_configmaps() ) diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py index 7fac05b8baed5..9e969f390e896 100644 --- a/tests/contrib/executors/test_kubernetes_executor.py +++ b/tests/contrib/executors/test_kubernetes_executor.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- -# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -37,6 +35,7 @@ from airflow.contrib.executors.kubernetes_executor import KubernetesExecutorConfig from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration from airflow.exceptions import AirflowConfigException + from airflow.contrib.kubernetes.secret import Secret except ImportError: AirflowKubernetesScheduler = None @@ -162,11 +161,8 @@ def setUp(self): self.resources = mock.patch( 'airflow.contrib.kubernetes.worker_configuration.Resources' ) - self.secret = mock.patch( - 'airflow.contrib.kubernetes.worker_configuration.Secret' - ) - for patcher in [self.resources, self.secret]: + for patcher in [self.resources]: self.mock_foo = patcher.start() self.addCleanup(patcher.stop) @@ -590,6 +586,46 @@ def test_kubernetes_environment_variables(self): env = worker_config._get_environment() self.assertEqual(env[core_executor], 'LocalExecutor') + def test_get_secrets(self): + # Test when secretRef is None and kube_secrets is not empty + self.kube_config.kube_secrets = { + 'AWS_SECRET_KEY': 'airflow-secret=aws_secret_key', + 'POSTGRES_PASSWORD': 'airflow-secret=postgres_credentials' + } + self.kube_config.env_from_secret_ref = None + worker_config = WorkerConfiguration(self.kube_config) + secrets = worker_config._get_secrets() + secrets.sort(key=lambda secret: secret.deploy_target) + expected = [ + Secret('env', 'AWS_SECRET_KEY', 'airflow-secret', 'aws_secret_key'), + Secret('env', 'POSTGRES_PASSWORD', 'airflow-secret', 'postgres_credentials') + ] + self.assertListEqual(expected, secrets) + + # Test when secret is not empty and kube_secrets is empty dict + self.kube_config.kube_secrets = {} + self.kube_config.env_from_secret_ref = 'secret_a,secret_b' + worker_config = WorkerConfiguration(self.kube_config) + secrets = worker_config._get_secrets() + expected = [ + Secret('env', None, 'secret_a'), + Secret('env', None, 'secret_b') + ] + self.assertListEqual(expected, secrets) + + def test_get_configmaps(self): + # Test when configmap is empty + self.kube_config.env_from_configmap_ref = '' + worker_config = WorkerConfiguration(self.kube_config) + configmaps = worker_config._get_configmaps() + self.assertListEqual([], configmaps) + + # test when configmap is not empty + self.kube_config.env_from_configmap_ref = 'configmap_a,configmap_b' + worker_config = WorkerConfiguration(self.kube_config) + configmaps = worker_config._get_configmaps() + self.assertListEqual(['configmap_a', 'configmap_b'], configmaps) + class TestKubernetesExecutor(unittest.TestCase): """ diff --git a/tests/contrib/kubernetes/__init__.py b/tests/contrib/kubernetes/__init__.py new file mode 100644 index 0000000000000..114d189da14ab --- /dev/null +++ b/tests/contrib/kubernetes/__init__.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/contrib/kubernetes/kubernetes_request_factory/__init__.py b/tests/contrib/kubernetes/kubernetes_request_factory/__init__.py new file mode 100644 index 0000000000000..114d189da14ab --- /dev/null +++ b/tests/contrib/kubernetes/kubernetes_request_factory/__init__.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/contrib/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py b/tests/contrib/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py new file mode 100644 index 0000000000000..e7f444abb14a1 --- /dev/null +++ b/tests/contrib/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py @@ -0,0 +1,306 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.contrib.kubernetes.kubernetes_request_factory.\ + kubernetes_request_factory import KubernetesRequestFactory +from airflow.contrib.kubernetes.pod import Pod, Resources +from airflow.contrib.kubernetes.secret import Secret +from parameterized import parameterized +import unittest +import copy + + +class TestKubernetesRequestFactory(unittest.TestCase): + + def setUp(self): + + self.expected = { + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': { + 'name': 'name' + }, + 'spec': { + 'containers': [{ + 'name': 'base', + 'image': 'airflow-worker:latest', + 'command': [ + "/usr/local/airflow/entrypoint.sh", + "/bin/bash sleep 25" + ], + }], + 'restartPolicy': 'Never' + } + } + self.input_req = copy.deepcopy(self.expected) + + def test_extract_image(self): + image = 'v3.14' + pod = Pod(image, {}, []) + KubernetesRequestFactory.extract_image(pod, self.input_req) + self.expected['spec']['containers'][0]['image'] = image + self.assertEqual(self.input_req, self.expected) + + def test_extract_image_pull_policy(self): + # Test when pull policy is not none + pull_policy = 'IfNotPresent' + pod = Pod('v3.14', {}, [], image_pull_policy=pull_policy) + + KubernetesRequestFactory.extract_image_pull_policy(pod, self.input_req) + self.expected['spec']['containers'][0]['imagePullPolicy'] = pull_policy + self.assertEqual(self.input_req, self.expected) + + def test_add_secret_to_env(self): + secret = Secret('env', 'target', 'my-secret', 'KEY') + secret_list = [] + self.expected = [{ + 'name': 'TARGET', + 'valueFrom': { + 'secretKeyRef': { + 'name': 'my-secret', + 'key': 'KEY' + } + } + }] + KubernetesRequestFactory.add_secret_to_env(secret_list, secret) + self.assertListEqual(secret_list, self.expected) + + def test_extract_labels(self): + # Test when labels are not empty + labels = {'label_a': 'val_a', 'label_b': 'val_b'} + pod = Pod('v3.14', {}, [], labels=labels) + self.expected['metadata']['labels'] = labels + KubernetesRequestFactory.extract_labels(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) + + def test_extract_annotations(self): + # Test when annotations are not empty + annotations = {'annot_a': 'val_a', 'annot_b': 'val_b'} + pod = Pod('v3.14', {}, [], annotations=annotations) + self.expected['metadata']['annotations'] = annotations + KubernetesRequestFactory.extract_annotations(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) + + def test_extract_affinity(self): + # Test when affinity is not empty + affinity = {'podAffinity': 'requiredDuringSchedulingIgnoredDuringExecution'} + pod = Pod('v3.14', {}, [], affinity=affinity) + self.expected['spec']['affinity'] = affinity + KubernetesRequestFactory.extract_affinity(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) + + def test_extract_node_selector(self): + # Test when affinity is not empty + node_selectors = {'disktype': 'ssd', 'accelerator': 'nvidia-tesla-p100'} + pod = Pod('v3.14', {}, [], node_selectors=node_selectors) + self.expected['spec']['nodeSelector'] = node_selectors + KubernetesRequestFactory.extract_node_selector(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) + + def test_extract_cmds(self): + cmds = ['test-cmd.sh'] + pod = Pod('v3.14', {}, cmds) + KubernetesRequestFactory.extract_cmds(pod, self.input_req) + self.expected['spec']['containers'][0]['command'] = cmds + self.assertEqual(self.input_req, self.expected) + + def test_extract_args(self): + args = ['test_arg.sh'] + pod = Pod('v3.14', {}, [], args=args) + KubernetesRequestFactory.extract_args(pod, self.input_req) + self.expected['spec']['containers'][0]['args'] = args + self.assertEqual(self.input_req, self.expected) + + def test_attach_volumes(self): + # Test when volumes is not empty list + volumes = ['vol_a', 'vol_b'] + pod = Pod('v3.14', {}, [], volumes=volumes) + self.expected['spec']['volumes'] = volumes + KubernetesRequestFactory.attach_volumes(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) + + def test_attach_volume_mounts(self): + # Test when volumes is not empty list + volume_mounts = ['vol_a', 'vol_b'] + pod = Pod('v3.14', {}, [], volume_mounts=volume_mounts) + self.expected['spec']['containers'][0]['volumeMounts'] = volume_mounts + KubernetesRequestFactory.attach_volume_mounts(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) + + def test_extract_name(self): + name = 'pod-name' + pod = Pod('v3.14', {}, [], name=name) + self.expected['metadata']['name'] = name + KubernetesRequestFactory.extract_name(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) + + def test_extract_volume_secrets(self): + # Test when secrets is not empty + secrets = [ + Secret('volume', 'KEY1', 's1', 'key-1'), + Secret('env', 'KEY2', 's2'), + Secret('volume', 'KEY3', 's3', 'key-2') + ] + pod = Pod('v3.14', {}, [], secrets=secrets) + self.expected['spec']['containers'][0]['volumeMounts'] = [{ + 'mountPath': 'KEY1', + 'name': 'secretvol0', + 'readOnly': True + }, { + 'mountPath': 'KEY3', + 'name': 'secretvol1', + 'readOnly': True + }] + self.expected['spec']['volumes'] = [{ + 'name': 'secretvol0', + 'secret': { + 'secretName': 's1' + } + }, { + 'name': 'secretvol1', + 'secret': { + 'secretName': 's3' + } + }] + KubernetesRequestFactory.extract_volume_secrets(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) + + def test_extract_env_and_secrets(self): + # Test when secrets and envs are not empty + secrets = [ + Secret('env', None, 's1'), + Secret('volume', 'KEY2', 's2', 'key-2'), + Secret('env', None, 's3') + ] + envs = { + 'ENV1': 'val1', + 'ENV2': 'val2' + } + configmaps = ['configmap_a', 'configmap_b'] + pod = Pod('v3.14', envs, [], secrets=secrets, configmaps=configmaps) + self.expected['spec']['containers'][0]['env'] = [ + {'name': 'ENV1', 'value': 'val1'}, + {'name': 'ENV2', 'value': 'val2'}, + ] + self.expected['spec']['containers'][0]['envFrom'] = [{ + 'secretRef': { + 'name': 's1' + } + }, { + 'secretRef': { + 'name': 's3' + } + }, { + 'configMapRef': { + 'name': 'configmap_a' + } + }, { + 'configMapRef': { + 'name': 'configmap_b' + } + }] + + KubernetesRequestFactory.extract_env_and_secrets(pod, self.input_req) + self.input_req['spec']['containers'][0]['env'].sort(key=lambda x: x['name']) + self.assertEqual(self.input_req, self.expected) + + def test_extract_resources(self): + # Test when resources is not empty + resources = Resources('1Gi', 1, '2Gi', 2) + pod = Pod('v3.14', {}, [], resources=resources) + self.expected['spec']['containers'][0]['resources'] = { + 'requests': { + 'memory': '1Gi', + 'cpu': 1 + }, + 'limits': { + 'memory': '2Gi', + 'cpu': 2 + }, + } + KubernetesRequestFactory.extract_resources(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) + + def test_extract_init_containers(self): + init_container = 'init_container' + pod = Pod('v3.14', {}, [], init_containers=init_container) + self.expected['spec']['initContainers'] = init_container + KubernetesRequestFactory.extract_init_containers(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) + + def test_extract_service_account_name(self): + service_account_name = 'service_account_name' + pod = Pod('v3.14', {}, [], service_account_name=service_account_name) + self.expected['spec']['serviceAccountName'] = service_account_name + KubernetesRequestFactory.extract_service_account_name(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) + + def test_extract_hostnetwork(self): + hostnetwork = True + pod = Pod('v3.14', {}, [], hostnetwork=hostnetwork) + self.expected['spec']['hostNetwork'] = hostnetwork + KubernetesRequestFactory.extract_hostnetwork(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) + + def test_extract_image_pull_secrets(self): + image_pull_secrets = 'secret_a,secret_b,secret_c' + pod = Pod('v3.14', {}, [], image_pull_secrets=image_pull_secrets) + self.expected['spec']['imagePullSecrets'] = [ + {'name': 'secret_a'}, + {'name': 'secret_b'}, + {'name': 'secret_c'}, + ] + KubernetesRequestFactory.extract_image_pull_secrets(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) + + def test_extract_tolerations(self): + tolerations = [{ + 'key': 'key', + 'operator': 'Equal', + 'value': 'value', + 'effect': 'NoSchedule' + }] + pod = Pod('v3.14', {}, [], tolerations=tolerations) + self.expected['spec']['tolerations'] = tolerations + KubernetesRequestFactory.extract_tolerations(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) + + def test_security_context(self): + security_context = { + 'runAsUser': 1000, + 'fsGroup': 2000 + } + pod = Pod('v3.14', {}, [], security_context=security_context) + self.expected['spec']['securityContext'] = security_context + KubernetesRequestFactory.extract_security_context(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) + + @parameterized.expand([ + 'extract_resources', + 'extract_init_containers', + 'extract_service_account_name', + 'extract_hostnetwork', + 'extract_image_pull_secrets', + 'extract_tolerations', + 'extract_security_context', + 'extract_volume_secrets' + ]) + def test_identity(self, name): + kube_request_factory_func = getattr(KubernetesRequestFactory, name) + pod = Pod('v3.14', {}, []) + kube_request_factory_func(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) diff --git a/tests/contrib/kubernetes/kubernetes_request_factory/test_pod_request_factory.py b/tests/contrib/kubernetes/kubernetes_request_factory/test_pod_request_factory.py new file mode 100644 index 0000000000000..ff835ed0c9ec4 --- /dev/null +++ b/tests/contrib/kubernetes/kubernetes_request_factory/test_pod_request_factory.py @@ -0,0 +1,159 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.contrib.kubernetes.kubernetes_request_factory.\ + pod_request_factory import SimplePodRequestFactory, \ + ExtractXcomPodRequestFactory +from airflow.contrib.kubernetes.pod import Pod +from airflow.contrib.kubernetes.secret import Secret +from airflow.exceptions import AirflowConfigException +import unittest + +XCOM_CMD = """import time +while True: + try: + time.sleep(3600) + except KeyboardInterrupt: + exit(0) +""" + + +class TestPodRequestFactory(unittest.TestCase): + + def setUp(self): + self.simple_pod_request_factory = SimplePodRequestFactory() + self.xcom_pod_request_factory = ExtractXcomPodRequestFactory() + self.pod = Pod( + image='busybox', + envs={ + 'ENVIRONMENT': 'prod', + 'LOG_LEVEL': 'warning' + }, + name='myapp-pod', + cmds=['sh', '-c', 'echo Hello Kubernetes!'], + labels={'app': 'myapp'}, + image_pull_secrets='pull_secret_a,pull_secret_b', + configmaps=['configmap_a', 'configmap_b'], + secrets=[ + # This should be a secretRef + Secret('env', None, 'secret_a'), + # This should be a single secret mounted in volumeMounts + Secret('volume', '/etc/foo', 'secret_b'), + # This should produce a single secret mounted in env + Secret('env', 'TARGET', 'secret_b', 'source_b'), + ] + ) + self.maxDiff = None + self.expected = { + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': { + 'name': 'myapp-pod', + 'labels': {'app': 'myapp'}, + 'annotations': {}}, + 'spec': { + 'containers': [{ + 'name': 'base', + 'image': 'busybox', + 'command': [ + 'sh', '-c', 'echo Hello Kubernetes!' + ], + 'imagePullPolicy': 'IfNotPresent', + 'args': [], + 'env': [{ + 'name': 'ENVIRONMENT', + 'value': 'prod' + }, { + 'name': 'LOG_LEVEL', + 'value': 'warning' + }, { + 'name': 'TARGET', + 'valueFrom': { + 'secretKeyRef': { + 'name': 'secret_b', + 'key': 'source_b' + } + } + }], + 'envFrom': [{ + 'secretRef': { + 'name': 'secret_a' + } + }, { + 'configMapRef': { + 'name': 'configmap_a' + } + }, { + 'configMapRef': { + 'name': 'configmap_b' + } + }], + 'volumeMounts': [{ + 'mountPath': '/etc/foo', + 'name': 'secretvol0', + 'readOnly': True + }] + }], + 'restartPolicy': 'Never', + 'nodeSelector': {}, + 'volumes': [{ + 'name': 'secretvol0', + 'secret': { + 'secretName': 'secret_b' + } + }], + 'imagePullSecrets': [ + {'name': 'pull_secret_a'}, + {'name': 'pull_secret_b'} + ], + 'affinity': {} + } + } + + def test_secret_throws(self): + with self.assertRaises(AirflowConfigException): + Secret('volume', None, 'secret_a', 'key') + + def test_simple_pod_request_factory_create(self): + result = self.simple_pod_request_factory.create(self.pod) + # sort + result['spec']['containers'][0]['env'].sort(key=lambda x: x['name']) + self.assertEqual(result, self.expected) + + def test_xcom_pod_request_factory_create(self): + result = self.xcom_pod_request_factory.create(self.pod) + container_two = { + 'name': 'airflow-xcom-sidecar', + 'image': 'python:3.5-alpine', + 'command': ['python', '-c', XCOM_CMD], + 'volumeMounts': [ + { + 'name': 'xcom', + 'mountPath': '/airflow/xcom' + } + ] + } + self.expected['spec']['containers'].append(container_two) + self.expected['spec']['containers'][0]['volumeMounts'].insert(0, { + 'name': 'xcom', + 'mountPath': '/airflow/xcom' + }) + self.expected['spec']['volumes'].insert(0, { + 'name': 'xcom', 'emptyDir': {} + }) + result['spec']['containers'][0]['env'].sort(key=lambda x: x['name']) + self.assertEqual(result, self.expected)