Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-4008] add envFrom for Kubernetes Executor #4952

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,14 @@ dags_volume_host =
# Useful in local environment, discouraged in production
logs_volume_host =

# A list of configMapRefs to envFrom. If more than one configMap is
# specified, provide a comma separated list: configmap_a,configmap_b
env_from_configmap_ref =

# A list of secretRefs to envFrom. If more than one secret is
# specified, provide a comma separated list: secret_a,secret_b
env_from_secret_ref =

# Git credentials and repository for DAGs mounted via Git (mutually exclusive with volume claim)
git_repo =
git_branch =
Expand Down Expand Up @@ -737,12 +745,12 @@ tolerations =
# scheduler. You may define as many secrets as needed and the kubernetes launcher will parse the
# defined secrets and mount them as secret environment variables in the launched workers.
# Secrets in this section are defined as follows
# <environment_variable_mount> = <kubernetes_secret_object>:<kubernetes_secret_key>
# <environment_variable_mount> = <kubernetes_secret_object>=<kubernetes_secret_key>
#
# For example if you wanted to mount a kubernetes secret key named `postgres_password` from the
# kubernetes secret object `airflow-secret` as the environment variable `POSTGRES_PASSWORD` into
# your workers you would follow the following format:
# POSTGRES_PASSWORD = airflow-secret:postgres_credentials
# POSTGRES_PASSWORD = airflow-secret=postgres_credentials
#
# Additionally you may override worker airflow settings with the AIRFLOW__<SECTION>__<KEY>
# formatting as supported by airflow normally.
4 changes: 4 additions & 0 deletions airflow/contrib/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ def __init__(self):
self.core_configuration = configuration_dict['core']
self.kube_secrets = configuration_dict.get('kubernetes_secrets', {})
self.kube_env_vars = configuration_dict.get('kubernetes_environment_variables', {})
self.env_from_configmap_ref = configuration.get(self.kubernetes_section,
'env_from_configmap_ref')
self.env_from_secret_ref = configuration.get(self.kubernetes_section,
'env_from_secret_ref')
self.airflow_home = settings.AIRFLOW_HOME
self.dags_folder = configuration.get(self.core_section, 'dags_folder')
self.parallelism = configuration.getint(self.core_section, 'PARALLELISM')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ def extract_affinity(pod, req):
for k, v in six.iteritems(pod.affinity):
req['spec']['affinity'][k] = v

@staticmethod
def extract_node_selector(pod, req):
    """Merge the pod's node selectors into the request's ``spec.nodeSelector``."""
    # Keep any selectors already present on the request; create the
    # mapping on first use.
    current = req['spec'].setdefault('nodeSelector', {})
    for label, value in six.iteritems(pod.node_selectors):
        current[label] = value

@staticmethod
def extract_cmds(pod, req):
    """Copy the pod's command list onto the (single) worker container."""
    worker_container = req['spec']['containers'][0]
    worker_container['command'] = pod.cmds
Expand All @@ -83,12 +89,6 @@ def extract_cmds(pod, req):
def extract_args(pod, req):
req['spec']['containers'][0]['args'] = pod.args

@staticmethod
def extract_node_selector(pod, req):
req['spec']['nodeSelector'] = req['spec'].get('nodeSelector', {})
for k, v in six.iteritems(pod.node_selectors):
req['spec']['nodeSelector'][k] = v

@staticmethod
def attach_volumes(pod, req):
req['spec']['volumes'] = (
Expand Down Expand Up @@ -132,7 +132,7 @@ def extract_volume_secrets(pod, req):
@staticmethod
def extract_env_and_secrets(pod, req):
envs_from_key_secrets = [
env for env in pod.secrets if env.deploy_type == 'env' and hasattr(env, 'key')
env for env in pod.secrets if env.deploy_type == 'env' and env.key is not None
]

if len(pod.envs) > 0 or len(envs_from_key_secrets) > 0:
Expand Down Expand Up @@ -206,7 +206,7 @@ def extract_security_context(pod, req):
@staticmethod
def _apply_env_from(pod, req):
envs_from_secrets = [
env for env in pod.secrets if env.deploy_type == 'env' and not hasattr(env, 'key')
env for env in pod.secrets if env.deploy_type == 'env' and env.key is None
]

if pod.configmaps or envs_from_secrets:
Expand Down
6 changes: 3 additions & 3 deletions airflow/contrib/kubernetes/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ class Pod:
:param secrets: Secrets to be launched to the pod
:type secrets: list[airflow.contrib.kubernetes.secret.Secret]
:param result: The result that will be returned to the operator after
successful execution of the pod
successful execution of the pod
:type result: any
:param image_pull_policy: Specify a policy to cache or always pull an image
:type image_pull_policy: str
:param image_pull_secrets: Any image pull secrets to be given to the pod.
If more than one secret is required, provide a
comma separated list: secret_a,secret_b
If more than one secret is required, provide a comma separated list:
secret_a,secret_b
:type image_pull_secrets: str
:param affinity: A dict containing a group of affinity scheduling rules
:type affinity: dict
Expand Down
40 changes: 27 additions & 13 deletions airflow/contrib/kubernetes/secret.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from airflow.exceptions import AirflowConfigException


class Secret(object):
    """Defines Kubernetes Secret Volume"""

    def __init__(self, deploy_type, deploy_target, secret, key=None):
        """Initialize a Kubernetes Secret Object. Used to track requested
        secrets from the user.

        :param deploy_type: The type of secret deploy in Kubernetes, either
            `env` or `volume`
        :type deploy_type: str
        :param deploy_target: (Optional) The environment variable when
            `deploy_type` `env` or file path when `deploy_type` `volume` where
            expose secret. If `key` is not provided deploy target should be
            None.
        :type deploy_target: str or None
        :param secret: Name of the secrets object in Kubernetes
        :type secret: str
        :param key: (Optional) Key of the secret within the Kubernetes Secret
            if not provided in `deploy_type` `env` it will mount all secrets
            in object
        :type key: str or None
        :raises AirflowConfigException: if `key` is set while `deploy_target`
            is None (a single secret key has nowhere to be exposed)
        """
        self.deploy_type = deploy_type
        self.deploy_target = deploy_target

        if deploy_target is not None and deploy_type == 'env':
            # if deploying to env, capitalize the deploy target
            self.deploy_target = deploy_target.upper()

        if key is not None and deploy_target is None:
            raise AirflowConfigException(
                'If `key` is set, `deploy_target` should not be None'
            )

        self.secret = secret
        self.key = key

    def __eq__(self, other):
        # Field-wise equality; lets tests compare expected vs generated
        # Secret lists directly.
        return (
            self.deploy_type == other.deploy_type and
            self.deploy_target == other.deploy_target and
            self.secret == other.secret and
            self.key == other.key
        )

    def __repr__(self):
        return 'Secret({}, {}, {}, {})'.format(
            self.deploy_type,
            self.deploy_target,
            self.secret,
            self.key
        )
18 changes: 17 additions & 1 deletion airflow/contrib/kubernetes/worker_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,28 @@ def _get_environment(self):
env['AIRFLOW__CORE__DAGS_FOLDER'] = dag_volume_mount_path
return env

def _get_configmaps(self):
"""Extracts any configmapRefs to envFrom"""
if not self.kube_config.env_from_configmap_ref:
return []
return self.kube_config.env_from_configmap_ref.split(',')

def _get_secrets(self):
    """Defines any necessary secrets for the pod executor.

    Builds the list of Secret objects for the worker pod from two sources:
    * ``kubernetes_secrets`` entries, each exposing one key of a secret
      object as a single environment variable
      (``ENV_VAR = secret_object=secret_key``), and
    * ``env_from_secret_ref``, a comma separated list of secret names whose
      every key is exposed wholesale via ``envFrom``.

    :return: list of airflow.contrib.kubernetes.secret.Secret
    """
    worker_secrets = []

    # Single-key secrets mounted as individual environment variables.
    for env_var_name, obj_key_pair in six.iteritems(self.kube_config.kube_secrets):
        k8s_secret_obj, k8s_secret_key = obj_key_pair.split('=')
        worker_secrets.append(
            Secret('env', env_var_name, k8s_secret_obj, k8s_secret_key)
        )

    # Whole-object secrets (deploy_target=None -> envFrom secretRef).
    if self.kube_config.env_from_secret_ref:
        for secret_ref in self.kube_config.env_from_secret_ref.split(','):
            worker_secrets.append(
                Secret('env', None, secret_ref)
            )

    return worker_secrets

def _get_image_pull_secrets(self):
Expand Down Expand Up @@ -331,4 +346,5 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da
affinity=affinity,
tolerations=tolerations,
security_context=self._get_security_context(),
configmaps=self._get_configmaps()
)
48 changes: 42 additions & 6 deletions tests/contrib/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
Expand Down Expand Up @@ -37,6 +35,7 @@
from airflow.contrib.executors.kubernetes_executor import KubernetesExecutorConfig
from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
from airflow.exceptions import AirflowConfigException
from airflow.contrib.kubernetes.secret import Secret
except ImportError:
AirflowKubernetesScheduler = None

Expand Down Expand Up @@ -162,11 +161,8 @@ def setUp(self):
self.resources = mock.patch(
'airflow.contrib.kubernetes.worker_configuration.Resources'
)
self.secret = mock.patch(
'airflow.contrib.kubernetes.worker_configuration.Secret'
)

for patcher in [self.resources, self.secret]:
for patcher in [self.resources]:
self.mock_foo = patcher.start()
self.addCleanup(patcher.stop)

Expand Down Expand Up @@ -590,6 +586,46 @@ def test_kubernetes_environment_variables(self):
env = worker_config._get_environment()
self.assertEqual(env[core_executor], 'LocalExecutor')

def test_get_secrets(self):
    """_get_secrets should honour both kube_secrets and env_from_secret_ref."""
    # Case 1: kube_secrets populated, no secretRef list.
    self.kube_config.kube_secrets = {
        'AWS_SECRET_KEY': 'airflow-secret=aws_secret_key',
        'POSTGRES_PASSWORD': 'airflow-secret=postgres_credentials'
    }
    self.kube_config.env_from_secret_ref = None
    secrets = WorkerConfiguration(self.kube_config)._get_secrets()
    # Dict iteration order is not guaranteed on older Pythons; sort for a
    # deterministic comparison.
    secrets.sort(key=lambda s: s.deploy_target)
    self.assertListEqual(
        [
            Secret('env', 'AWS_SECRET_KEY', 'airflow-secret', 'aws_secret_key'),
            Secret('env', 'POSTGRES_PASSWORD', 'airflow-secret',
                   'postgres_credentials')
        ],
        secrets)

    # Case 2: secretRef list populated, kube_secrets empty.
    self.kube_config.kube_secrets = {}
    self.kube_config.env_from_secret_ref = 'secret_a,secret_b'
    secrets = WorkerConfiguration(self.kube_config)._get_secrets()
    self.assertListEqual(
        [Secret('env', None, 'secret_a'), Secret('env', None, 'secret_b')],
        secrets)

def test_get_configmaps(self):
    """_get_configmaps should split the comma separated configmap refs."""
    # An empty ref yields no configmaps.
    self.kube_config.env_from_configmap_ref = ''
    self.assertListEqual(
        [], WorkerConfiguration(self.kube_config)._get_configmaps())

    # A comma separated ref is split into individual configmap names.
    self.kube_config.env_from_configmap_ref = 'configmap_a,configmap_b'
    self.assertListEqual(
        ['configmap_a', 'configmap_b'],
        WorkerConfiguration(self.kube_config)._get_configmaps())


class TestKubernetesExecutor(unittest.TestCase):
"""
Expand Down
18 changes: 18 additions & 0 deletions tests/contrib/kubernetes/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
18 changes: 18 additions & 0 deletions tests/contrib/kubernetes/kubernetes_request_factory/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Loading