From 3e7caf12d3a8353ecc212ac1ba57360314a2a132 Mon Sep 17 00:00:00 2001 From: David Lum Date: Wed, 20 Mar 2019 11:44:30 -0400 Subject: [PATCH 1/4] feat/AIRFLOW-4008/k8s-executor-env-from --- airflow/config_templates/default_airflow.cfg | 8 + .../contrib/executors/kubernetes_executor.py | 4 + .../kubernetes_request_factory.py | 15 ++ .../pod_request_factory.py | 2 + airflow/contrib/kubernetes/pod.py | 14 ++ .../kubernetes/worker_configuration.py | 2 + tests/contrib/kubernetes/__init__.py | 18 ++ .../kubernetes_request_factory/__init__.py | 18 ++ .../test_pod_request_factory.py | 154 ++++++++++++++++++ 9 files changed, 235 insertions(+) create mode 100644 tests/contrib/kubernetes/__init__.py create mode 100644 tests/contrib/kubernetes/kubernetes_request_factory/__init__.py create mode 100644 tests/contrib/kubernetes/kubernetes_request_factory/test_pod_request_factory.py diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index fcb58259ae638..9275478cc7c83 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -627,6 +627,14 @@ dags_volume_host = # Useful in local environment, discouraged in production logs_volume_host = +# A list of configMapsRefs to envFrom. If more than one configMap is +# specified, provide a comma separated list: configmap_a,configmap_b +env_from_configmap_ref = + +# A list of secretRefs to envFrom. If more than one secret is +# specified, provide a comma separated list: secret_a,secret_b +env_from_secret_ref = + # Git credentials and repository for DAGs mounted via Git (mutually exclusive with volume claim) git_repo = git_branch = diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index 7a348815fa6d3..09718a36841d2 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -123,6 +123,10 @@ def __init__(self): self.core_configuration = configuration_dict['core'] self.kube_secrets = configuration_dict.get('kubernetes_secrets', {}) self.kube_env_vars = configuration_dict.get('kubernetes_environment_variables', {}) + self.kube_env_from_configmap_ref = configuration.get(self.kubernetes_section, + 'env_from_configmap_ref') + self.kube_env_from_secret_ref = configuration.get(self.kubernetes_section, + 'env_from_secret_ref') self.airflow_home = configuration.get(self.core_section, 'airflow_home') self.dags_folder = configuration.get(self.core_section, 'dags_folder') self.parallelism = configuration.getint(self.core_section, 'PARALLELISM') diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py index 50959d276d6c5..c6ee3d6ef1e32 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py @@ -196,3 +196,18 @@ def extract_tolerations(pod, req): def extract_security_context(pod, req): if pod.security_context: req['spec']['securityContext'] = pod.security_context + + @staticmethod + def extract_env_from(pod, req): + if pod.env_from_configmap_ref or pod.env_from_secret_ref: + configmap_refs = [{ + 'configMapRef': { + 'name': configmap_ref + } + } for configmap_ref in pod.env_from_configmap_ref.split(',')] + secret_refs = [{ + 'secretRef': { + 'name': secret_ref + } + } for secret_ref in pod.env_from_secret_ref.split(',')] + req['spec']['containers'][0]['envFrom'] = configmap_refs + secret_refs diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py index afcbf93525257..5519f21178ccf 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py @@ -63,6 +63,7 @@ def create(self, pod): self.extract_hostnetwork(pod, req) self.extract_tolerations(pod, req) self.extract_security_context(pod, req) + self.extract_env_from(pod, req) return req @@ -130,4 +131,5 @@ def create(self, pod): self.extract_affinity(pod, req) self.extract_hostnetwork(pod, req) self.extract_tolerations(pod, req) + self.extract_env_from(pod, req) return req diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py index 889d3cd5972d5..c80adf1c16457 100644 --- a/airflow/contrib/kubernetes/pod.py +++ b/airflow/contrib/kubernetes/pod.py @@ -66,6 +66,16 @@ class Pod: :type tolerations: list :param security_context: A dict containing the security context for the pod :type security_context: dict + :param security_context: A dict containing the security context for the pod + :type security_context: dict + :param env_from_configmap_ref: Any configMapRef for the pod to envFrom. + If more than one configMapRef is required, provide a + comma separated list: configmap_a,configmap_b + :type env_from_configmap_ref: str + :param env_from_secret_ref: Any secretRef for the pod to envFrom. + If more than one secretRef is required, provide a + comma separated list: secret_a,secret_b + :type env_from_secret_ref: str """ def __init__( self, @@ -91,6 +101,8 @@ def __init__( hostnetwork=False, tolerations=None, security_context=None, + env_from_configmap_ref=None, + env_from_secret_ref=None ): self.image = image self.envs = envs or {} @@ -114,3 +126,5 @@ def __init__( self.hostnetwork = hostnetwork or False self.tolerations = tolerations or [] self.security_context = security_context + self.env_from_configmap_ref = env_from_configmap_ref + self.env_from_secret_ref = env_from_secret_ref diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py index 2a1d7e58c18d7..31a3b8eb83a77 100644 --- a/airflow/contrib/kubernetes/worker_configuration.py +++ b/airflow/contrib/kubernetes/worker_configuration.py @@ -331,4 +331,6 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da affinity=affinity, tolerations=tolerations, security_context=self._get_security_context(), + env_from_configmap_ref=self.kube_config.env_from_configmap_ref, + env_from_secret_ref=self.kube_config.env_from_secret_ref ) diff --git a/tests/contrib/kubernetes/__init__.py b/tests/contrib/kubernetes/__init__.py new file mode 100644 index 0000000000000..114d189da14ab --- /dev/null +++ b/tests/contrib/kubernetes/__init__.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/contrib/kubernetes/kubernetes_request_factory/__init__.py b/tests/contrib/kubernetes/kubernetes_request_factory/__init__.py new file mode 100644 index 0000000000000..114d189da14ab --- /dev/null +++ b/tests/contrib/kubernetes/kubernetes_request_factory/__init__.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/contrib/kubernetes/kubernetes_request_factory/test_pod_request_factory.py b/tests/contrib/kubernetes/kubernetes_request_factory/test_pod_request_factory.py new file mode 100644 index 0000000000000..51e33626cc731 --- /dev/null +++ b/tests/contrib/kubernetes/kubernetes_request_factory/test_pod_request_factory.py @@ -0,0 +1,154 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.contrib.kubernetes.kubernetes_request_factory.\ + pod_request_factory import SimplePodRequestFactory, \ + ExtractXcomPodRequestFactory +from airflow.contrib.kubernetes.pod import Pod +from mock import ANY +import unittest + + +class TestSimplePodRequestFactory(unittest.TestCase): + + def setUp(self): + self.simple_pod_request_factory = SimplePodRequestFactory() + self.xcom_pod_request_factory = ExtractXcomPodRequestFactory() + self.pod = Pod( + image='busybox', + envs={ + 'ENVIRONMENT': 'prod', + 'LOG_LEVEL': 'warning' + }, + name='myapp-pod', + cmds=['sh', '-c', 'echo Hello Kubernetes!'], + labels={'app': 'myapp'}, + env_from_configmap_ref='env_from_configmap', + env_from_secret_ref='env_from_secret_a,env_from_secret_b', + image_pull_secrets='pull_secret_a,pull_secret_b' + ) + self.maxDiff = None + + def test_simple_pod_request_factory_create(self): + expected_result = { + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': { + 'name': 'myapp-pod', + 'labels': {'app': 'myapp'}, + 'annotations': {}}, + 'spec': { + 'containers': [{ + 'name': 'base', + 'image': 'busybox', + 'command': [ + 'sh', '-c', 'echo Hello Kubernetes!' + ], + 'imagePullPolicy': 'IfNotPresent', + 'args': [], + 'env': [{ + 'name': 'ENVIRONMENT', + 'value': 'prod' + }, { + 'name': 'LOG_LEVEL', + 'value': 'warning' + }], + 'envFrom': [{ + 'configMapRef': { + 'name': 'env_from_configmap' + } + }, { + 'secretRef': { + 'name': 'env_from_secret_a' + } + }, { + 'secretRef': { + 'name': 'env_from_secret_b' + } + }] + }], + 'restartPolicy': 'Never', + 'nodeSelector': {}, + 'volumes': [], + 'imagePullSecrets': [ + {'name': 'pull_secret_a'}, + {'name': 'pull_secret_b'} + ], + 'affinity': {} + } + } + result = self.simple_pod_request_factory.create(self.pod) + self.assertDictEqual(result, expected_result) + + def test_xcom_pod_request_factory_create(self): + result = self.xcom_pod_request_factory.create(self.pod) + expected_result = { + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': { + 'name': 'myapp-pod', + 'labels': {'app': 'myapp'}, + 'annotations': {} + }, 'spec': { + 'volumes': [{'name': 'xcom', 'emptyDir': {}}], + 'containers': [{ + 'name': 'base', + 'image': 'busybox', + 'command': [ + 'sh', '-c', 'echo Hello Kubernetes!' + ], + 'volumeMounts': [{ + 'name': 'xcom', + 'mountPath': '/airflow/xcom' + }], + 'imagePullPolicy': 'IfNotPresent', + 'args': [], + 'env': [ + {'name': 'ENVIRONMENT', 'value': 'prod'}, + {'name': 'LOG_LEVEL', 'value': 'warning'} + ], 'envFrom': [{ + 'configMapRef': { + 'name': 'env_from_configmap' + } + }, { + 'secretRef': { + 'name': 'env_from_secret_a' + } + }, { + 'secretRef': { + 'name': 'env_from_secret_b' + } + }] + }, { + 'name': 'airflow-xcom-sidecar', + 'image': 'python:3.5-alpine', + 'command': ['python', '-c', ANY], + 'volumeMounts': [{ + 'name': 'xcom', 'mountPath': '/airflow/xcom' + }] + }], + 'restartPolicy': 'Never', + 'nodeSelector': {}, + 'imagePullSecrets': [ + {'name': 'pull_secret_a'}, + {'name': 'pull_secret_b'}], + 'affinity': {} + } + } + self.assertDictEqual(result, expected_result) From 93384db6ff94d06cf8d11ad35e3d8f5cefed82b6 Mon Sep 17 00:00:00 2001 From: David Lum Date: Thu, 28 Mar 2019 14:37:31 -0400 Subject: [PATCH 2/4] [AIRFLOW-4008] refactor Secrets --- .../kubernetes_request_factory.py | 4 +- airflow/contrib/kubernetes/secret.py | 21 +- .../kubernetes/worker_configuration.py | 3 +- .../executors/test_kubernetes_executor.py | 13 +- .../test_kubernetes_request_factory.py | 267 ++++++++---------- .../test_pod_request_factory.py | 64 ++++- 6 files changed, 183 insertions(+), 189 deletions(-) diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py index f4cbbf7d4818f..d1b6d4eaf4dca 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py @@ -132,7 +132,7 @@ def extract_volume_secrets(pod, req): @staticmethod def extract_env_and_secrets(pod, req): envs_from_key_secrets = [ - env for env in pod.secrets if env.deploy_type == 'env' and hasattr(env, 'key') + env for env in pod.secrets if env.deploy_type == 'env' and env.key is not None ] if len(pod.envs) > 0 or len(envs_from_key_secrets) > 0: @@ -206,7 +206,7 @@ def extract_security_context(pod, req): @staticmethod def _apply_env_from(pod, req): envs_from_secrets = [ - env for env in pod.secrets if env.deploy_type == 'env' and not hasattr(env, 'key') + env for env in pod.secrets if env.deploy_type == 'env' and env.key is None ] if pod.configmaps or envs_from_secrets: diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/contrib/kubernetes/secret.py index 41643bd4e59be..3c0943268d186 100644 --- a/airflow/contrib/kubernetes/secret.py +++ b/airflow/contrib/kubernetes/secret.py @@ -37,18 +37,23 @@ def __init__(self, deploy_type, deploy_target, secret, key=None): :type key: str or None """ self.deploy_type = deploy_type + self.deploy_target = deploy_target - if deploy_target: + if deploy_target is not None and deploy_type == 'env': + # if deploying to env, capitalize the deploy target self.deploy_target = deploy_target.upper() - if deploy_type == 'volume': - self.deploy_target = deploy_target - - if not deploy_type == 'env' and key is None: + if key is not None and deploy_target is None: raise AirflowConfigException( - 'If deploy_type is not `env` parameter `key` is mandatory' + 'If `key` is set, `deploy_target` should not be None' ) self.secret = secret - if key: - self.key = key + self.key = key + + def __eq__(self, other): + return \ + self.deploy_type == other.deploy_type and \ + self.deploy_target == other.deploy_target and \ + self.secret == other.secret and \ + self.key == other.key diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py index 7fb34ecc5b66b..60128f1479e81 100644 --- a/airflow/contrib/kubernetes/worker_configuration.py +++ b/airflow/contrib/kubernetes/worker_configuration.py @@ -168,7 +168,8 @@ def _get_secrets(self): for env_var_name, obj_key_pair in six.iteritems(self.kube_config.kube_secrets): k8s_secret_obj, k8s_secret_key = obj_key_pair.split('=') worker_secrets.append( - Secret('env', env_var_name, k8s_secret_obj, k8s_secret_key)) + Secret('env', env_var_name, k8s_secret_obj, k8s_secret_key) + ) if self.kube_config.env_from_secret_ref: for secret_ref in self.kube_config.env_from_secret_ref.split(','): diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py index 0b0581ad6c57b..8e8229c75ddc2 100644 --- a/tests/contrib/executors/test_kubernetes_executor.py +++ b/tests/contrib/executors/test_kubernetes_executor.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- -# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -163,11 +161,8 @@ def setUp(self): self.resources = mock.patch( 'airflow.contrib.kubernetes.worker_configuration.Resources' ) - self.secret = mock.patch( - 'airflow.contrib.kubernetes.worker_configuration.Secret' - ) - for patcher in [self.resources, self.secret]: + for patcher in [self.resources]: self.mock_foo = patcher.start() self.addCleanup(patcher.stop) @@ -593,20 +588,20 @@ def test_kubernetes_environment_variables(self): def test_get_secrets(self): # Test when secretRef is None and kube_secrets is not empty - self.kube_config.kube_config.kube_secrets = { + self.kube_config.kube_secrets = { 'POSTGRES_PASSWORD': 'airflow-secret=postgres_credentials', 'AWS_SECRET_KEY': 'airflow-secret=aws_secret_key' } self.kube_config.env_from_secret_ref = None worker_config = WorkerConfiguration(self.kube_config) secrets = worker_config._get_secrets() - self.assertListEqual([ + self.assertEqual([ Secret('env', 'POSTGRES_PASSWORD', 'airflow-secret', 'postgres_credentials'), Secret('env', 'AWS_SECRET_KEY', 'airflow-secret', 'aws_secret_key') ], secrets) # Test when secret is not empty and kube_secrets is empty dict - self.kube_config.kube_config.kube_secrets = {} + self.kube_config.kube_secrets = {} self.kube_config.env_from_secret_ref = 'secret_a,secret_b' worker_config = WorkerConfiguration(self.kube_config) secrets = worker_config._get_secrets() diff --git a/tests/contrib/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py b/tests/contrib/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py index a0ab39142f625..df2b6c926718a 100644 --- a/tests/contrib/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py +++ b/tests/contrib/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py @@ -19,60 +19,58 @@ kubernetes_request_factory import KubernetesRequestFactory from airflow.contrib.kubernetes.pod import Pod, Resources from airflow.contrib.kubernetes.secret import Secret -import yaml +from parameterized import parameterized import unittest class TestKubernetesRequestFactory(unittest.TestCase): - _yaml = """ -apiVersion: v1 -kind: Pod -metadata: - name: name -spec: - containers: - - name: base - image: airflow-worker:latest - command: ["/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25"] - restartPolicy: Never -""" - - def setUp(self) -> None: + def setUp(self): self.kubernetes_request_factory = KubernetesRequestFactory() - self.req = yaml.load(self._yaml) + self.req = { + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': { + 'name': 'name' + }, + 'spec': { + 'restartPolicy': 'Never', + 'containers': [{ + 'name': 'base', + 'image': 'airflow-worker:latest', + 'command': [ + "/usr/local/airflow/entrypoint.sh", + "/bin/bash sleep 25" + ], + }] + } + } def test_extract_image(self): input_req = self.req.copy() - reference = self.req.copy() - pod = Pod('v3.14', {}, []) + expected = self.req + image = 'v3.14' + pod = Pod(image, {}, []) self.kubernetes_request_factory.extract_image(pod, input_req) - reference['spec']['containers'][0]['image'] = pod.image - self.assertDictEqual(input_req, reference) + expected['spec']['containers'][0]['image'] = image + self.assertDictEqual(input_req, expected) def test_extract_image_pull_policy(self): - # Test when pull policy is none - input_req = self.req.copy() - reference = self.req.copy() - pod = Pod('v3.14', {}, []) - self.kubernetes_request_factory.extract_image_pull_policy(pod, input_req) - self.assertDictEqual(input_req, reference) - # Test when pull policy is not none pull_policy = 'IfNotPresent' pod = Pod('v3.14', {}, [], image_pull_policy=pull_policy) input_req = self.req.copy() - reference = self.req.copy() + expected = self.req self.kubernetes_request_factory.extract_image_pull_policy(pod, input_req) - reference['spec']['containers'][0]['imagePullPolicy'] = pull_policy - self.assertDictEqual(input_req, reference) + expected['spec']['containers'][0]['imagePullPolicy'] = pull_policy + self.assertDictEqual(input_req, expected) def test_add_secret_to_env(self): secret = Secret('env', 'target', 'my-secret', 'KEY') secret_list = [] - reference = [{ + expected = [{ 'name': 'TARGET', 'valueFrom': { 'secretKeyRef': { @@ -82,145 +80,96 @@ def test_add_secret_to_env(self): } }] self.kubernetes_request_factory.add_secret_to_env(secret_list, secret) - self.assertListEqual(secret_list, reference) + self.assertListEqual(secret_list, expected) def test_extract_labels(self): - # Test when labels are empty dict - input_req = self.req.copy() - reference = self.req.copy() - pod = Pod('v3.14', {}, []) - self.kubernetes_request_factory.extract_labels(pod, input_req) - self.assertDictEqual(input_req, reference) - # Test when labels are not empty labels = {'label_a': 'val_a', 'label_b': 'val_b'} pod = Pod('v3.14', {}, [], labels=labels) input_req = self.req.copy() - reference = self.req.copy() - reference['metadata']['labels'] = labels + expected = self.req + expected['metadata']['labels'] = labels self.kubernetes_request_factory.extract_labels(pod, input_req) - self.assertDictEqual(input_req, reference) + self.assertDictEqual(input_req, expected) def test_extract_annotations(self): - # Test when annotations are empty dict - input_req = self.req.copy() - reference = self.req.copy() - pod = Pod('v3.14', {}, []) - self.kubernetes_request_factory.extract_annotations(pod, input_req) - self.assertDictEqual(input_req, reference) - # Test when annotations are not empty annotations = {'annot_a': 'val_a', 'annot_b': 'val_b'} pod = Pod('v3.14', {}, [], annotations=annotations) input_req = self.req.copy() - reference = self.req.copy() - reference['metadata']['labels'] = annotations + expected = self.req + expected['metadata']['labels'] = annotations self.kubernetes_request_factory.extract_annotations(pod, input_req) - self.assertDictEqual(input_req, reference) + self.assertDictEqual(input_req, expected) def test_extract_affinity(self): - # Test when affinity is empty dict - input_req = self.req.copy() - reference = self.req.copy() - pod = Pod('v3.14', {}, []) - self.kubernetes_request_factory.extract_affinity(pod, input_req) - self.assertDictEqual(input_req, reference) - # Test when affinity is not empty - affinity = {'podAffinity': {'requiredDuringSchedulingIgnoredDuringExecution'}} + affinity = {'podAffinity': 'requiredDuringSchedulingIgnoredDuringExecution'} pod = Pod('v3.14', {}, [], affinity=affinity) input_req = self.req.copy() - reference = self.req.copy() - reference['spec']['affinity'] = affinity + expected = self.req + expected['spec']['affinity'] = affinity self.kubernetes_request_factory.extract_affinity(pod, input_req) - self.assertDictEqual(input_req, reference) + self.assertDictEqual(input_req, expected) def test_extract_node_selector(self): - # Test when affinity is empty dict - input_req = self.req.copy() - reference = self.req.copy() - pod = Pod('v3.14', {}, []) - self.kubernetes_request_factory.extract_node_selector(pod, input_req) - self.assertDictEqual(input_req, reference) - # Test when affinity is not empty - affinity = {'podAffinity': {'requiredDuringSchedulingIgnoredDuringExecution'}} - pod = Pod('v3.14', {}, [], affinity=affinity) + node_selectors = {'disktype': 'ssd', 'accelerator': 'nvidia-tesla-p100'} + pod = Pod('v3.14', {}, [], node_selectors=node_selectors) input_req = self.req.copy() - reference = self.req.copy() - reference['spec']['affinity'] = affinity - self.kubernetes_request_factory.extract_affinity(pod, input_req) - self.assertDictEqual(input_req, reference) + expected = self.req + expected['spec']['nodeSelector'] = node_selectors + self.kubernetes_request_factory.extract_node_selector(pod, input_req) + self.assertDictEqual(input_req, expected) def test_extract_cmds(self): input_req = self.req.copy() - reference = self.req.copy() + expected = self.req.copy() cmds = ['test-cmd.sh'] pod = Pod('v3.14', {}, cmds) self.kubernetes_request_factory.extract_cmds(pod, input_req) - reference['spec']['containers'][0]['command'] = cmds - self.assertDictEqual(input_req, reference) + expected['spec']['containers'][0]['command'] = cmds + self.assertDictEqual(input_req, expected) def test_extract_args(self): input_req = self.req.copy() - reference = self.req.copy() + expected = self.req args = ['test_arg.sh'] pod = Pod('v3.14', {}, [], args=args) self.kubernetes_request_factory.extract_args(pod, input_req) - reference['spec']['containers'][0]['args'] = args - self.assertDictEqual(input_req, reference) + expected['spec']['containers'][0]['args'] = args + self.assertDictEqual(input_req, expected) def test_attach_volumes(self): - # Test when volumes is empty list - input_req = self.req.copy() - reference = self.req.copy() - pod = Pod('v3.14', {}, []) - self.kubernetes_request_factory.attach_volumes(pod, input_req) - self.assertDictEqual(input_req, reference) - # Test when volumes is not empty list volumes = ['vol_a', 'vol_b'] pod = Pod('v3.14', {}, [], volumes=volumes) input_req = self.req.copy() - reference = self.req.copy() - reference['spec']['volumes'] = volumes + expected = self.req + expected['spec']['volumes'] = volumes self.kubernetes_request_factory.attach_volumes(pod, input_req) - self.assertDictEqual(input_req, reference) + self.assertDictEqual(input_req, expected) def test_attach_volume_mounts(self): - # Test when volumes is empty list - input_req = self.req.copy() - reference = self.req.copy() - pod = Pod('v3.14', {}, []) - self.kubernetes_request_factory.attach_volume_mounts(pod, input_req) - self.assertDictEqual(input_req, reference) - # Test when volumes is not empty list volume_mounts = ['vol_a', 'vol_b'] pod = Pod('v3.14', {}, [], volume_mounts=volume_mounts) input_req = self.req.copy() - reference = self.req.copy() - reference['spec']['containers'][0]['volumeMounts'] = volume_mounts + expected = self.req + expected['spec']['containers'][0]['volumeMounts'] = volume_mounts self.kubernetes_request_factory.attach_volume_mounts(pod, input_req) - self.assertDictEqual(input_req, reference) + self.assertDictEqual(input_req, expected) def test_extract_name(self): input_req = self.req.copy() - reference = self.req.copy() + expected = self.req name = 'pod-name' pod = Pod('v3.14', {}, [], name=name) - reference['metadata']['name'] = name + expected['metadata']['name'] = name self.kubernetes_request_factory.extract_name(pod, input_req) - self.assertDictEqual(input_req, reference) + self.assertDictEqual(input_req, expected) def test_extract_volume_secrets(self): - # Test when secrets is empty - input_req = self.req.copy() - reference = self.req.copy() - pod = Pod('v3.14', {}, []) - self.kubernetes_request_factory.extract_volume_secrets(pod, input_req) - self.assertDictEqual(input_req, reference) - # Test when secrets is not empty secrets = [ Secret('volume', 'KEY1', 's1', 'key-1'), @@ -229,8 +178,8 @@ def test_extract_volume_secrets(self): ] pod = Pod('v3.14', {}, [], secrets=secrets) input_req = self.req.copy() - reference = self.req.copy() - reference['spec']['containers'][0]['volumeMounts'] = [{ + expected = self.req + expected['spec']['containers'][0]['volumeMounts'] = [{ 'mountPath': 'KEY1', 'name': 'secretvol1', 'readOnly': True @@ -239,7 +188,7 @@ def test_extract_volume_secrets(self): 'name': 'secretvol2', 'readOnly': True }] - reference['spec']['volumes'] = [{ + expected['spec']['volumes'] = [{ 'name': 'secretvol1', 'secret': { 'secretName': 's1' @@ -251,16 +200,9 @@ def test_extract_volume_secrets(self): } }] self.kubernetes_request_factory.extract_volume_secrets(pod, input_req) - self.assertDictEqual(input_req, reference) + self.assertDictEqual(input_req, expected) def test_extract_env_and_secrets(self): - # Test when secret and envs is empty - input_req = self.req.copy() - reference = self.req.copy() - pod = Pod('v3.14', {}, []) - self.kubernetes_request_factory.extract_env_and_secrets(pod, input_req) - self.assertDictEqual(input_req, reference) - # Test when secrets and envs are not empty secrets = [ Secret('env', None, 's1'), @@ -274,9 +216,9 @@ def test_extract_env_and_secrets(self): configmaps = ['configmap_a', 'configmap_b'] pod = Pod('v3.14', envs, [], secrets=secrets, configmaps=configmaps) input_req = self.req.copy() - reference = self.req.copy() - reference['spec']['containers'][0]['env'] = envs - reference['spec']['containers'][0]['envFrom'] = [{ + expected = self.req + expected['spec']['containers'][0]['env'] = envs + expected['spec']['containers'][0]['envFrom'] = [{ 'secretRef': { 'secretName': 's1' } @@ -294,68 +236,61 @@ def test_extract_env_and_secrets(self): } }] self.kubernetes_request_factory.extract_volume_secrets(pod, input_req) - self.assertDictEqual(input_req, reference) + self.assertDictEqual(input_req, expected) def test_extract_resources(self): - # Test when resources is empty - input_req = self.req.copy() - reference = self.req.copy() - pod = Pod('v3.14', {}, []) - self.kubernetes_request_factory.extract_resources(pod, input_req) - self.assertDictEqual(input_req, reference) - # Test when resources is not empty resources = Resources('1Gi', 1, '2Gi', 2) pod = Pod('v3.14', {}, [], resources=resources) input_req = self.req.copy() - reference = self.req.copy() - reference['spec']['containers'][0]['resources'] = resources + expected = self.req + expected['spec']['containers'][0]['resources'] = resources self.kubernetes_request_factory.extract_volume_secrets(pod, input_req) - self.assertDictEqual(input_req, reference) + self.assertDictEqual(input_req, expected) def test_extract_init_containers(self): input_req = self.req.copy() - reference = self.req.copy() + expected = self.req init_container = 'init_container' pod = Pod('v3.14', {}, [], init_containers=init_container) - reference['spec']['initContainers'] = init_container + expected['spec']['initContainers'] = init_container self.kubernetes_request_factory.extract_resources(pod, input_req) - self.assertDictEqual(input_req, reference) + self.assertDictEqual(input_req, expected) def test_extract_service_account_name(self): input_req = self.req.copy() - reference = self.req.copy() + expected = self.req service_account_name = 'service_account_name' pod = Pod('v3.14', {}, [], service_account_name=service_account_name) - reference['spec']['serviceAccountName'] = service_account_name + expected['spec']['serviceAccountName'] = service_account_name self.kubernetes_request_factory.extract_resources(pod, input_req) - self.assertDictEqual(input_req, reference) + self.assertDictEqual(input_req, expected) def test_extract_hostnetwork(self): input_req = self.req.copy() - reference = self.req.copy() + expected = self.req hostnetwork = True pod = Pod('v3.14', {}, [], hostnetwork=hostnetwork) - reference['spec']['serviceAccountName'] = hostnetwork + expected['spec']['serviceAccountName'] = hostnetwork self.kubernetes_request_factory.extract_resources(pod, input_req) - self.assertDictEqual(input_req, reference) + self.assertDictEqual(input_req, expected) def test_extract_image_pull_secrets(self): input_req = self.req.copy() - reference = self.req.copy() + expected = self.req image_pull_secrets = 'secret_a,secret_b,secret_c' pod = Pod('v3.14', {}, [], image_pull_secrets=image_pull_secrets) - reference['spec']['imagePullSecrets'] = [ + expected['spec']['imagePullSecrets'] = [ {'name': 'secret_a'}, {'name': 'secret_b'}, {'name': 'secret_c'}, ] self.kubernetes_request_factory.extract_resources(pod, input_req) - self.assertDictEqual(input_req, reference) + self.assertDictEqual(input_req, expected) def test_extract_tolerations(self): input_req = self.req.copy() - reference = self.req.copy() + expected = self.req tolerations = [{ 'key': 'key', 'operator': 'Equal', @@ -363,18 +298,40 @@ def test_extract_tolerations(self): 'effect': 'NoSchedule' }] pod = Pod('v3.14', {}, [], tolerations=tolerations) - reference['spec']['tolerations'] = tolerations + expected['spec']['tolerations'] = tolerations self.kubernetes_request_factory.extract_resources(pod, input_req) - self.assertDictEqual(input_req, reference) + self.assertDictEqual(input_req, expected) def test_security_context(self): input_req = self.req.copy() - reference = self.req.copy() + expected = self.req security_context = { 'runAsUser': 1000, 'fsGroup': 2000 } pod = Pod('v3.14', {}, [], security_context=security_context) - reference['spec']['securityContext'] = security_context + expected['spec']['securityContext'] = security_context self.kubernetes_request_factory.extract_resources(pod, input_req) - self.assertDictEqual(input_req, reference) + self.assertDictEqual(input_req, expected) + + @parameterized.expand([ + 'extract_image_pull_policy', + 'extract_labels', + 'extract_annotations', + 'extract_affinity', + 'extract_node_selector', + 'attach_volumes', + 'attach_volume_mounts', + 'extract_volume_secrets', + 'extract_env_and_secrets', + 'extract_resources' + ]) + def test_identity(self, name): + input_req = self.req.copy() + expected = self.req + pod = Pod('v3.14', {}, []) + kubernetes_request_factory_method = getattr( + self.kubernetes_request_factory, name + ) + kubernetes_request_factory_method(pod, input_req) + self.assertDictEqual(input_req, expected) diff --git a/tests/contrib/kubernetes/kubernetes_request_factory/test_pod_request_factory.py b/tests/contrib/kubernetes/kubernetes_request_factory/test_pod_request_factory.py index 0ec45424ecf26..47a5be0962144 100644 --- a/tests/contrib/kubernetes/kubernetes_request_factory/test_pod_request_factory.py +++ b/tests/contrib/kubernetes/kubernetes_request_factory/test_pod_request_factory.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- -# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -21,6 +19,8 @@ pod_request_factory import SimplePodRequestFactory, \ ExtractXcomPodRequestFactory from airflow.contrib.kubernetes.pod import Pod +from airflow.contrib.kubernetes.secret import Secret +from airflow.exceptions import AirflowConfigException from mock import ANY import unittest @@ -41,6 +41,14 @@ def setUp(self): labels={'app': 'myapp'}, image_pull_secrets='pull_secret_a,pull_secret_b', configmaps=['configmap_a', 'configmap_b'], + secrets=[ + # This should be a secretRef + Secret('env', None, 'secret_a'), + # This should be a single secret mounted in volumeMounts + Secret('volume', '/etc/foo', 'secret_b'), + # This should produce a single secret mounted in env + Secret('env', 'TARGET', 'secret_b', 'source_b'), + ] ) self.maxDiff = None self.expected_result = { @@ -65,8 +73,20 @@ def setUp(self): }, { 'name': 'LOG_LEVEL', 'value': 'warning' + }, { + 'name': 'TARGET', + 'valueFrom': { + 'secretKeyRef': { + 'name': 'secret_b', + 'key': 'source_b' + } + } }], 'envFrom': [{ + 'secretRef': { + 'name': 'secret_a' + } + }, { 'configMapRef': { 'name': 'configmap_a' } @@ -74,11 +94,21 @@ def setUp(self): 'configMapRef': { 'name': 'configmap_b' } + }], + 'volumeMounts': [{ + 'mountPath': '/etc/foo', + 'name': 'secretvol0', + 'readOnly': True }] }], 'restartPolicy': 'Never', 'nodeSelector': {}, - 'volumes': [], + 'volumes': [{ + 'name': 'secretvol0', + 'secret': { + 'secretName': 'secret_b' + } + }], 'imagePullSecrets': [ {'name': 'pull_secret_a'}, {'name': 'pull_secret_b'} @@ -87,6 +117,10 @@ def setUp(self): } } + def test_secret_throws(self): + with self.assertRaises(AirflowConfigException): + Secret('volume', None, 'secret_a', 'key') + def test_simple_pod_request_factory_create(self): result = self.simple_pod_request_factory.create(self.pod) self.assertDictEqual(result, self.expected_result) @@ -97,18 +131,20 @@ def test_xcom_pod_request_factory_create(self): 'name': 'airflow-xcom-sidecar', 'image': 'python:3.5-alpine', 'command': ['python', '-c', ANY], - 'volumeMounts': [{ - 'name': 'xcom', 'mountPath': '/airflow/xcom' - }] + 'volumeMounts': [ + { + 'name': 'xcom', + 'mountPath': '/airflow/xcom' + } + ] } - volume_mount = [{ - 'name': 'xcom', - 'mountPath': '/airflow/xcom' - }] expected_result = self.expected_result.copy() expected_result['spec']['containers'].append(container_two) - expected_result['spec']['containers'][0]['volumeMounts'] = volume_mount - expected_result['spec']['volumes'] = [ - {'name': 'xcom', 'emptyDir': {}} - ] + expected_result['spec']['containers'][0]['volumeMounts'].insert(0, { + 'name': 'xcom', + 'mountPath': '/airflow/xcom' + }) + expected_result['spec']['volumes'].insert(0, { + 'name': 'xcom', 'emptyDir': {} + }) self.assertDictEqual(result, expected_result) From 26f8c637974ea9aaba177b51fc49f0f0d9ed2c12 Mon Sep 17 00:00:00 2001 From: David Lum Date: Fri, 29 Mar 2019 12:24:18 -0400 Subject: [PATCH 3/4] [AIRFLOW-4008] fix doc errors --- docs/concepts.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/concepts.rst b/docs/concepts.rst index 94a3fec4f63e1..00cc7b9818c35 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -477,9 +477,9 @@ passed, then a corresponding list of XCom values is returned. It is also possible to pull XCom directly in a template, here's an example of what this may look like: -.. code:: sql +.. code:: python - SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }} + "SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}" Note that XComs are similar to `Variables`_, but are specifically designed for inter-task communication rather than global settings. From 6e0068fd466869a49b2075b9a19379c7e349bc63 Mon Sep 17 00:00:00 2001 From: David Lum Date: Fri, 29 Mar 2019 13:33:56 -0400 Subject: [PATCH 4/4] [AIRFLOW-4008] don't initialize object, use static methods * deepcopy objects --- airflow/contrib/kubernetes/secret.py | 19 +- .../executors/test_kubernetes_executor.py | 19 +- .../test_kubernetes_request_factory.py | 235 ++++++++---------- .../test_pod_request_factory.py | 29 ++- 4 files changed, 146 insertions(+), 156 deletions(-) diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/contrib/kubernetes/secret.py index 3c0943268d186..73c51e900acf9 100644 --- a/airflow/contrib/kubernetes/secret.py +++ b/airflow/contrib/kubernetes/secret.py @@ -17,7 +17,7 @@ from airflow.exceptions import AirflowConfigException -class Secret: +class Secret(object): """Defines Kubernetes Secret Volume""" def __init__(self, deploy_type, deploy_target, secret, key=None): @@ -52,8 +52,17 @@ def __init__(self, deploy_type, deploy_target, secret, key=None): self.key = key def __eq__(self, other): - return \ - self.deploy_type == other.deploy_type and \ - self.deploy_target == other.deploy_target and \ - self.secret == other.secret and \ + return ( + self.deploy_type == other.deploy_type and + self.deploy_target == other.deploy_target and + self.secret == other.secret and self.key == other.key + ) + + def __repr__(self): + return 'Secret({}, {}, {}, {})'.format( + self.deploy_type, + self.deploy_target, + self.secret, + self.key + ) diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py index 8e8229c75ddc2..9e969f390e896 100644 --- a/tests/contrib/executors/test_kubernetes_executor.py +++ b/tests/contrib/executors/test_kubernetes_executor.py @@ -589,26 +589,29 @@ def test_kubernetes_environment_variables(self): def test_get_secrets(self): # Test when secretRef is None and kube_secrets is not empty self.kube_config.kube_secrets = { - 'POSTGRES_PASSWORD': 'airflow-secret=postgres_credentials', - 'AWS_SECRET_KEY': 'airflow-secret=aws_secret_key' + 'AWS_SECRET_KEY': 'airflow-secret=aws_secret_key', + 'POSTGRES_PASSWORD': 'airflow-secret=postgres_credentials' } self.kube_config.env_from_secret_ref = None worker_config = WorkerConfiguration(self.kube_config) secrets = worker_config._get_secrets() - self.assertEqual([ - Secret('env', 'POSTGRES_PASSWORD', 'airflow-secret', 'postgres_credentials'), - Secret('env', 'AWS_SECRET_KEY', 'airflow-secret', 'aws_secret_key') - ], secrets) + secrets.sort(key=lambda secret: secret.deploy_target) + expected = [ + Secret('env', 'AWS_SECRET_KEY', 'airflow-secret', 'aws_secret_key'), + Secret('env', 'POSTGRES_PASSWORD', 'airflow-secret', 'postgres_credentials') + ] + self.assertListEqual(expected, secrets) # Test when secret is not empty and kube_secrets is empty dict self.kube_config.kube_secrets = {} self.kube_config.env_from_secret_ref = 'secret_a,secret_b' worker_config = WorkerConfiguration(self.kube_config) secrets = worker_config._get_secrets() - self.assertListEqual([ + expected = [ Secret('env', None, 'secret_a'), Secret('env', None, 'secret_b') - ], secrets) + ] + self.assertListEqual(expected, secrets) def test_get_configmaps(self): # Test when configmap is empty diff --git a/tests/contrib/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py b/tests/contrib/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py index df2b6c926718a..e7f444abb14a1 100644 --- a/tests/contrib/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py +++ b/tests/contrib/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py @@ -21,21 +21,20 @@ from airflow.contrib.kubernetes.secret import Secret from parameterized import parameterized import unittest +import copy class TestKubernetesRequestFactory(unittest.TestCase): def setUp(self): - self.kubernetes_request_factory = KubernetesRequestFactory() - self.req = { + self.expected = { 'apiVersion': 'v1', 'kind': 'Pod', 'metadata': { 'name': 'name' }, 'spec': { - 'restartPolicy': 'Never', 'containers': [{ 'name': 'base', 'image': 'airflow-worker:latest', @@ -43,34 +42,32 @@ def setUp(self): "/usr/local/airflow/entrypoint.sh", "/bin/bash sleep 25" ], - }] + }], + 'restartPolicy': 'Never' } } + self.input_req = copy.deepcopy(self.expected) def test_extract_image(self): - input_req = self.req.copy() - expected = self.req image = 'v3.14' pod = Pod(image, {}, []) - self.kubernetes_request_factory.extract_image(pod, input_req) - expected['spec']['containers'][0]['image'] = image - self.assertDictEqual(input_req, expected) + KubernetesRequestFactory.extract_image(pod, self.input_req) + self.expected['spec']['containers'][0]['image'] = image + self.assertEqual(self.input_req, self.expected) def test_extract_image_pull_policy(self): # Test when pull policy is not none pull_policy = 'IfNotPresent' pod = Pod('v3.14', {}, [], image_pull_policy=pull_policy) - input_req = self.req.copy() - expected = self.req - self.kubernetes_request_factory.extract_image_pull_policy(pod, input_req) - expected['spec']['containers'][0]['imagePullPolicy'] = pull_policy - self.assertDictEqual(input_req, expected) + KubernetesRequestFactory.extract_image_pull_policy(pod, self.input_req) + self.expected['spec']['containers'][0]['imagePullPolicy'] = pull_policy + self.assertEqual(self.input_req, self.expected) def test_add_secret_to_env(self): secret = Secret('env', 'target', 'my-secret', 'KEY') secret_list = [] - expected = [{ + self.expected = [{ 'name': 'TARGET', 'valueFrom': { 'secretKeyRef': { @@ -79,95 +76,77 @@ def test_add_secret_to_env(self): } } }] - self.kubernetes_request_factory.add_secret_to_env(secret_list, secret) - self.assertListEqual(secret_list, expected) + KubernetesRequestFactory.add_secret_to_env(secret_list, secret) + self.assertListEqual(secret_list, self.expected) def test_extract_labels(self): # Test when labels are not empty labels = {'label_a': 'val_a', 'label_b': 'val_b'} pod = Pod('v3.14', {}, [], labels=labels) - input_req = self.req.copy() - expected = self.req - expected['metadata']['labels'] = labels - self.kubernetes_request_factory.extract_labels(pod, input_req) - self.assertDictEqual(input_req, expected) + self.expected['metadata']['labels'] = labels + KubernetesRequestFactory.extract_labels(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) def test_extract_annotations(self): # Test when annotations are not empty annotations = {'annot_a': 'val_a', 'annot_b': 'val_b'} pod = Pod('v3.14', {}, [], annotations=annotations) - input_req = self.req.copy() - expected = self.req - expected['metadata']['labels'] = annotations - self.kubernetes_request_factory.extract_annotations(pod, input_req) - self.assertDictEqual(input_req, expected) + self.expected['metadata']['annotations'] = annotations + KubernetesRequestFactory.extract_annotations(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) def test_extract_affinity(self): # Test when affinity is not empty affinity = {'podAffinity': 'requiredDuringSchedulingIgnoredDuringExecution'} pod = Pod('v3.14', {}, [], affinity=affinity) - input_req = self.req.copy() - expected = self.req - expected['spec']['affinity'] = affinity - self.kubernetes_request_factory.extract_affinity(pod, input_req) - self.assertDictEqual(input_req, expected) + self.expected['spec']['affinity'] = affinity + KubernetesRequestFactory.extract_affinity(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) def test_extract_node_selector(self): # Test when affinity is not empty node_selectors = {'disktype': 'ssd', 'accelerator': 'nvidia-tesla-p100'} pod = Pod('v3.14', {}, [], node_selectors=node_selectors) - input_req = self.req.copy() - expected = self.req - expected['spec']['nodeSelector'] = node_selectors - self.kubernetes_request_factory.extract_node_selector(pod, input_req) - self.assertDictEqual(input_req, expected) + self.expected['spec']['nodeSelector'] = node_selectors + KubernetesRequestFactory.extract_node_selector(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) def test_extract_cmds(self): - input_req = self.req.copy() - expected = self.req.copy() cmds = ['test-cmd.sh'] pod = Pod('v3.14', {}, cmds) - self.kubernetes_request_factory.extract_cmds(pod, input_req) - expected['spec']['containers'][0]['command'] = cmds - self.assertDictEqual(input_req, expected) + KubernetesRequestFactory.extract_cmds(pod, self.input_req) + self.expected['spec']['containers'][0]['command'] = cmds + self.assertEqual(self.input_req, self.expected) def test_extract_args(self): - input_req = self.req.copy() - expected = self.req args = ['test_arg.sh'] pod = Pod('v3.14', {}, [], args=args) - self.kubernetes_request_factory.extract_args(pod, input_req) - expected['spec']['containers'][0]['args'] = args - self.assertDictEqual(input_req, expected) + KubernetesRequestFactory.extract_args(pod, self.input_req) + self.expected['spec']['containers'][0]['args'] = args + self.assertEqual(self.input_req, self.expected) def test_attach_volumes(self): # Test when volumes is not empty list volumes = ['vol_a', 'vol_b'] pod = Pod('v3.14', {}, [], volumes=volumes) - input_req = self.req.copy() - expected = self.req - expected['spec']['volumes'] = volumes - self.kubernetes_request_factory.attach_volumes(pod, input_req) - self.assertDictEqual(input_req, expected) + self.expected['spec']['volumes'] = volumes + KubernetesRequestFactory.attach_volumes(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) def test_attach_volume_mounts(self): # Test when volumes is not empty list volume_mounts = ['vol_a', 'vol_b'] pod = Pod('v3.14', {}, [], volume_mounts=volume_mounts) - input_req = self.req.copy() - expected = self.req - expected['spec']['containers'][0]['volumeMounts'] = volume_mounts - self.kubernetes_request_factory.attach_volume_mounts(pod, input_req) - self.assertDictEqual(input_req, expected) + self.expected['spec']['containers'][0]['volumeMounts'] = volume_mounts + KubernetesRequestFactory.attach_volume_mounts(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) def test_extract_name(self): - input_req = self.req.copy() - expected = self.req name = 'pod-name' pod = Pod('v3.14', {}, [], name=name) - expected['metadata']['name'] = name - self.kubernetes_request_factory.extract_name(pod, input_req) - self.assertDictEqual(input_req, expected) + self.expected['metadata']['name'] = name + KubernetesRequestFactory.extract_name(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) def test_extract_volume_secrets(self): # Test when secrets is not empty @@ -177,30 +156,28 @@ def test_extract_volume_secrets(self): Secret('volume', 'KEY3', 's3', 'key-2') ] pod = Pod('v3.14', {}, [], secrets=secrets) - input_req = self.req.copy() - expected = self.req - expected['spec']['containers'][0]['volumeMounts'] = [{ + self.expected['spec']['containers'][0]['volumeMounts'] = [{ 'mountPath': 'KEY1', - 'name': 'secretvol1', + 'name': 'secretvol0', 'readOnly': True }, { 'mountPath': 'KEY3', - 'name': 'secretvol2', + 'name': 'secretvol1', 'readOnly': True }] - expected['spec']['volumes'] = [{ - 'name': 'secretvol1', + self.expected['spec']['volumes'] = [{ + 'name': 'secretvol0', 'secret': { 'secretName': 's1' } }, { - 'name': 'secretvol2', + 'name': 'secretvol1', 'secret': { 'secretName': 's3' } }] - self.kubernetes_request_factory.extract_volume_secrets(pod, input_req) - self.assertDictEqual(input_req, expected) + KubernetesRequestFactory.extract_volume_secrets(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) def test_extract_env_and_secrets(self): # Test when secrets and envs are not empty @@ -215,16 +192,17 @@ def test_extract_env_and_secrets(self): } configmaps = ['configmap_a', 'configmap_b'] pod = Pod('v3.14', envs, [], secrets=secrets, configmaps=configmaps) - input_req = self.req.copy() - expected = self.req - expected['spec']['containers'][0]['env'] = envs - expected['spec']['containers'][0]['envFrom'] = [{ + self.expected['spec']['containers'][0]['env'] = [ + {'name': 'ENV1', 'value': 'val1'}, + {'name': 'ENV2', 'value': 'val2'}, + ] + self.expected['spec']['containers'][0]['envFrom'] = [{ 'secretRef': { - 'secretName': 's1' + 'name': 's1' } }, { 'secretRef': { - 'secretName': 's3' + 'name': 's3' } }, { 'configMapRef': { @@ -232,65 +210,64 @@ def test_extract_env_and_secrets(self): } }, { 'configMapRef': { - 'name': 'configmap_a' + 'name': 'configmap_b' } }] - self.kubernetes_request_factory.extract_volume_secrets(pod, input_req) - self.assertDictEqual(input_req, expected) + + KubernetesRequestFactory.extract_env_and_secrets(pod, self.input_req) + self.input_req['spec']['containers'][0]['env'].sort(key=lambda x: x['name']) + self.assertEqual(self.input_req, self.expected) def test_extract_resources(self): # Test when resources is not empty resources = Resources('1Gi', 1, '2Gi', 2) pod = Pod('v3.14', {}, [], resources=resources) - input_req = self.req.copy() - expected = self.req - expected['spec']['containers'][0]['resources'] = resources - self.kubernetes_request_factory.extract_volume_secrets(pod, input_req) - self.assertDictEqual(input_req, expected) + self.expected['spec']['containers'][0]['resources'] = { + 'requests': { + 'memory': '1Gi', + 'cpu': 1 + }, + 'limits': { + 'memory': '2Gi', + 'cpu': 2 + }, + } + KubernetesRequestFactory.extract_resources(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) def test_extract_init_containers(self): - input_req = self.req.copy() - expected = self.req init_container = 'init_container' pod = Pod('v3.14', {}, [], init_containers=init_container) - expected['spec']['initContainers'] = init_container - self.kubernetes_request_factory.extract_resources(pod, input_req) - self.assertDictEqual(input_req, expected) + self.expected['spec']['initContainers'] = init_container + KubernetesRequestFactory.extract_init_containers(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) def test_extract_service_account_name(self): - input_req = self.req.copy() - expected = self.req service_account_name = 'service_account_name' pod = Pod('v3.14', {}, [], service_account_name=service_account_name) - expected['spec']['serviceAccountName'] = service_account_name - self.kubernetes_request_factory.extract_resources(pod, input_req) - self.assertDictEqual(input_req, expected) + self.expected['spec']['serviceAccountName'] = service_account_name + KubernetesRequestFactory.extract_service_account_name(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) def test_extract_hostnetwork(self): - input_req = self.req.copy() - expected = self.req hostnetwork = True pod = Pod('v3.14', {}, [], hostnetwork=hostnetwork) - expected['spec']['serviceAccountName'] = hostnetwork - self.kubernetes_request_factory.extract_resources(pod, input_req) - self.assertDictEqual(input_req, expected) + self.expected['spec']['hostNetwork'] = hostnetwork + KubernetesRequestFactory.extract_hostnetwork(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) def test_extract_image_pull_secrets(self): - input_req = self.req.copy() - expected = self.req image_pull_secrets = 'secret_a,secret_b,secret_c' pod = Pod('v3.14', {}, [], image_pull_secrets=image_pull_secrets) - expected['spec']['imagePullSecrets'] = [ + self.expected['spec']['imagePullSecrets'] = [ {'name': 'secret_a'}, {'name': 'secret_b'}, {'name': 'secret_c'}, ] - self.kubernetes_request_factory.extract_resources(pod, input_req) - self.assertDictEqual(input_req, expected) + KubernetesRequestFactory.extract_image_pull_secrets(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) def test_extract_tolerations(self): - input_req = self.req.copy() - expected = self.req tolerations = [{ 'key': 'key', 'operator': 'Equal', @@ -298,40 +275,32 @@ def test_extract_tolerations(self): 'effect': 'NoSchedule' }] pod = Pod('v3.14', {}, [], tolerations=tolerations) - expected['spec']['tolerations'] = tolerations - self.kubernetes_request_factory.extract_resources(pod, input_req) - self.assertDictEqual(input_req, expected) + self.expected['spec']['tolerations'] = tolerations + KubernetesRequestFactory.extract_tolerations(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) def test_security_context(self): - input_req = self.req.copy() - expected = self.req security_context = { 'runAsUser': 1000, 'fsGroup': 2000 } pod = Pod('v3.14', {}, [], security_context=security_context) - expected['spec']['securityContext'] = security_context - self.kubernetes_request_factory.extract_resources(pod, input_req) - self.assertDictEqual(input_req, expected) + self.expected['spec']['securityContext'] = security_context + KubernetesRequestFactory.extract_security_context(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) @parameterized.expand([ - 'extract_image_pull_policy', - 'extract_labels', - 'extract_annotations', - 'extract_affinity', - 'extract_node_selector', - 'attach_volumes', - 'attach_volume_mounts', - 'extract_volume_secrets', - 'extract_env_and_secrets', - 'extract_resources' + 'extract_resources', + 'extract_init_containers', + 'extract_service_account_name', + 'extract_hostnetwork', + 'extract_image_pull_secrets', + 'extract_tolerations', + 'extract_security_context', + 'extract_volume_secrets' ]) def test_identity(self, name): - input_req = self.req.copy() - expected = self.req + kube_request_factory_func = getattr(KubernetesRequestFactory, name) pod = Pod('v3.14', {}, []) - kubernetes_request_factory_method = getattr( - self.kubernetes_request_factory, name - ) - kubernetes_request_factory_method(pod, input_req) - self.assertDictEqual(input_req, expected) + kube_request_factory_func(pod, self.input_req) + self.assertEqual(self.input_req, self.expected) diff --git a/tests/contrib/kubernetes/kubernetes_request_factory/test_pod_request_factory.py b/tests/contrib/kubernetes/kubernetes_request_factory/test_pod_request_factory.py index 47a5be0962144..ff835ed0c9ec4 100644 --- a/tests/contrib/kubernetes/kubernetes_request_factory/test_pod_request_factory.py +++ b/tests/contrib/kubernetes/kubernetes_request_factory/test_pod_request_factory.py @@ -21,11 +21,18 @@ from airflow.contrib.kubernetes.pod import Pod from airflow.contrib.kubernetes.secret import Secret from airflow.exceptions import AirflowConfigException -from mock import ANY import unittest +XCOM_CMD = """import time +while True: + try: + time.sleep(3600) + except KeyboardInterrupt: + exit(0) +""" -class TestSimplePodRequestFactory(unittest.TestCase): + +class TestPodRequestFactory(unittest.TestCase): def setUp(self): self.simple_pod_request_factory = SimplePodRequestFactory() @@ -51,7 +58,7 @@ def setUp(self): ] ) self.maxDiff = None - self.expected_result = { + self.expected = { 'apiVersion': 'v1', 'kind': 'Pod', 'metadata': { @@ -123,14 +130,16 @@ def test_secret_throws(self): def test_simple_pod_request_factory_create(self): result = self.simple_pod_request_factory.create(self.pod) - self.assertDictEqual(result, self.expected_result) + # sort + result['spec']['containers'][0]['env'].sort(key=lambda x: x['name']) + self.assertEqual(result, self.expected) def test_xcom_pod_request_factory_create(self): result = self.xcom_pod_request_factory.create(self.pod) container_two = { 'name': 'airflow-xcom-sidecar', 'image': 'python:3.5-alpine', - 'command': ['python', '-c', ANY], + 'command': ['python', '-c', XCOM_CMD], 'volumeMounts': [ { 'name': 'xcom', @@ -138,13 +147,13 @@ def test_xcom_pod_request_factory_create(self): } ] } - expected_result = self.expected_result.copy() - expected_result['spec']['containers'].append(container_two) - expected_result['spec']['containers'][0]['volumeMounts'].insert(0, { + self.expected['spec']['containers'].append(container_two) + self.expected['spec']['containers'][0]['volumeMounts'].insert(0, { 'name': 'xcom', 'mountPath': '/airflow/xcom' }) - expected_result['spec']['volumes'].insert(0, { + self.expected['spec']['volumes'].insert(0, { 'name': 'xcom', 'emptyDir': {} }) - self.assertDictEqual(result, expected_result) + result['spec']['containers'][0]['env'].sort(key=lambda x: x['name']) + self.assertEqual(result, self.expected)