Skip to content

Commit

Permalink
[AIRFLOW-3937] KubernetesPodOperator support for envFrom configMapRef…
Browse files Browse the repository at this point in the history
… and secretRef (apache#4772)
  • Loading branch information
galuszkak authored and andriisoldatenko committed Jul 26, 2019
1 parent 308c5b8 commit 81fa3cd
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,21 @@ def extract_volume_secrets(pod, req):

@staticmethod
def extract_env_and_secrets(pod, req):
env_secrets = [s for s in pod.secrets if s.deploy_type == 'env']
if len(pod.envs) > 0 or len(env_secrets) > 0:
envs_from_key_secrets = [
env for env in pod.secrets if env.deploy_type == 'env' and hasattr(env, 'key')
]

if len(pod.envs) > 0 or len(envs_from_key_secrets) > 0:
env = []
for k in pod.envs.keys():
env.append({'name': k, 'value': pod.envs[k]})
for secret in env_secrets:
for secret in envs_from_key_secrets:
KubernetesRequestFactory.add_secret_to_env(env, secret)

req['spec']['containers'][0]['env'] = env

KubernetesRequestFactory._apply_env_from(pod, req)

@staticmethod
def extract_resources(pod, req):
if not pod.resources or pod.resources.is_empty_resource_request():
Expand Down Expand Up @@ -196,3 +202,30 @@ def extract_tolerations(pod, req):
def extract_security_context(pod, req):
if pod.security_context:
req['spec']['securityContext'] = pod.security_context

@staticmethod
def _apply_env_from(pod, req):
envs_from_secrets = [
env for env in pod.secrets if env.deploy_type == 'env' and not hasattr(env, 'key')
]

if pod.configmaps or envs_from_secrets:
req['spec']['containers'][0]['envFrom'] = []

for secret in envs_from_secrets:
req['spec']['containers'][0]['envFrom'].append(
{
'secretRef': {
'name': secret.secret
}
}
)

for configmap in pod.configmaps:
req['spec']['containers'][0]['envFrom'].append(
{
'configMapRef': {
'name': configmap
}
}
)
5 changes: 5 additions & 0 deletions airflow/contrib/kubernetes/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ class Pod:
:type tolerations: list
:param security_context: A dict containing the security context for the pod
:type security_context: dict
:param configmaps: A list containing names of configmaps object
mounting env variables to the pod
:type configmaps: list[str]
"""
def __init__(
self,
Expand All @@ -91,6 +94,7 @@ def __init__(
hostnetwork=False,
tolerations=None,
security_context=None,
configmaps=None
):
self.image = image
self.envs = envs or {}
Expand All @@ -114,3 +118,4 @@ def __init__(
self.hostnetwork = hostnetwork or False
self.tolerations = tolerations or []
self.security_context = security_context
self.configmaps = configmaps or []
25 changes: 19 additions & 6 deletions airflow/contrib/kubernetes/secret.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,41 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from airflow.exceptions import AirflowConfigException


class Secret:
"""Defines Kubernetes Secret Volume"""

def __init__(self, deploy_type, deploy_target, secret, key):
def __init__(self, deploy_type, deploy_target, secret, key=None):
"""Initialize a Kubernetes Secret Object. Used to track requested secrets from
the user.
:param deploy_type: The type of secret deploy in Kubernetes, either `env` or
`volume`
:type deploy_type: str
:param deploy_target: The environment variable when `deploy_type` `env` or
file path when `deploy_type` `volume` where expose secret
file path when `deploy_type` `volume` where expose secret.
If `key` is not provided deploy target should be None.
:type deploy_target: str
:param secret: Name of the secrets object in Kubernetes
:type secret: str
:param key: Key of the secret within the Kubernetes Secret
:type key: str
:param key: (Optional) Key of the secret within the Kubernetes Secret
if not provided in `deploy_type` `env` it will mount all secrets in object
:type key: str or None
"""
self.deploy_type = deploy_type
self.deploy_target = deploy_target.upper()

if deploy_target:
self.deploy_target = deploy_target.upper()

if deploy_type == 'volume':
self.deploy_target = deploy_target

if not deploy_type == 'env' and key is None:
raise AirflowConfigException(
'In deploy_type different than `env` parameter `key` is mandatory'
)

self.secret = secret
self.key = key
if key:
self.key = key
6 changes: 6 additions & 0 deletions airflow/contrib/operators/kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ class KubernetesPodOperator(BaseOperator):
:type hostnetwork: bool
:param tolerations: A list of kubernetes tolerations
:type tolerations: list tolerations
:param configmaps: A list of configmap names objects that we
want mount as env variables
:type configmaps: list[str]
"""
template_fields = ('cmds', 'arguments', 'env_vars', 'config_file')

Expand Down Expand Up @@ -120,6 +123,7 @@ def execute(self, context):
pod.node_selectors = self.node_selectors
pod.hostnetwork = self.hostnetwork
pod.tolerations = self.tolerations
pod.configmaps = self.configmaps

launcher = pod_launcher.PodLauncher(kube_client=client,
extract_xcom=self.do_xcom_push)
Expand Down Expand Up @@ -169,6 +173,7 @@ def __init__(self,
is_delete_operator_pod=False,
hostnetwork=False,
tolerations=None,
configmaps=None,
*args,
**kwargs):
super(KubernetesPodOperator, self).__init__(*args, **kwargs)
Expand Down Expand Up @@ -201,3 +206,4 @@ def __init__(self,
self.is_delete_operator_pod = is_delete_operator_pod
self.hostnetwork = hostnetwork
self.tolerations = tolerations or []
self.configmaps = configmaps or []
8 changes: 6 additions & 2 deletions docs/kubernetes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,14 @@ Kubernetes Operator
secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn')
secret_all_keys = Secret('env', None, 'airflow-secrets-2')
volume_mount = VolumeMount('test-volume',
mount_path='/root/mount_file',
sub_path=None,
read_only=True)
configmaps = ['test-configmap-1', 'test-configmap-2']
volume_config= {
'persistentVolumeClaim':
{
Expand Down Expand Up @@ -128,15 +131,16 @@ Kubernetes Operator
cmds=["bash", "-cx"],
arguments=["echo", "10"],
labels={"foo": "bar"},
secrets=[secret_file,secret_env]
secrets=[secret_file, secret_env, secret_all_keys],
volumes=[volume],
volume_mounts=[volume_mount]
name="test",
task_id="task",
affinity=affinity,
is_delete_operator_pod=True,
hostnetwork=False,
tolerations=tolerations
tolerations=tolerations,
configmaps=configmaps
)
Expand Down
45 changes: 45 additions & 0 deletions tests/contrib/minikube/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import os
import shutil
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.contrib.kubernetes.secret import Secret
from airflow import AirflowException
from kubernetes.client.rest import ApiException
from subprocess import check_call
Expand Down Expand Up @@ -326,6 +327,50 @@ def test_xcom_push(self):
)
self.assertEqual(k.execute(None), json.loads(return_value))

@mock.patch("airflow.contrib.kubernetes.pod_launcher.PodLauncher.run_pod")
@mock.patch("airflow.contrib.kubernetes.kube_client.get_kube_client")
def test_envs_from_configmaps(self, client_mock, launcher_mock):
# GIVEN
from airflow.utils.state import State
configmaps = ['test-configmap']
# WHEN
k = KubernetesPodOperator(
namespace='default',
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
labels={"foo": "bar"},
name="test",
task_id="task",
configmaps=configmaps
)
# THEN
launcher_mock.return_value = (State.SUCCESS, None)
k.execute(None)
self.assertEqual(launcher_mock.call_args[0][0].configmaps, configmaps)

@mock.patch("airflow.contrib.kubernetes.pod_launcher.PodLauncher.run_pod")
@mock.patch("airflow.contrib.kubernetes.kube_client.get_kube_client")
def test_envs_from_secrets(self, client_mock, launcher_mock):
# GIVEN
from airflow.utils.state import State
secrets = [Secret('env', None, "secret_name")]
# WHEN
k = KubernetesPodOperator(
namespace='default',
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
secrets=secrets,
labels={"foo": "bar"},
name="test",
task_id="task",
)
# THEN
launcher_mock.return_value = (State.SUCCESS, None)
k.execute(None)
self.assertEqual(launcher_mock.call_args[0][0].secrets, secrets)


if __name__ == '__main__':
unittest.main()

0 comments on commit 81fa3cd

Please sign in to comment.