-
Notifications
You must be signed in to change notification settings - Fork 14.8k
/
Copy pathsecret.py
124 lines (102 loc) · 5.1 KB
/
secret.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Classes for interacting with Kubernetes API."""
from __future__ import annotations
import copy
import uuid
from kubernetes.client import models as k8s
from airflow.exceptions import AirflowConfigException
from airflow.kubernetes.pre_7_4_0_compatibility.k8s_model import K8SModel
class Secret(K8SModel):
    """Defines a Kubernetes Secret exposed to a pod as an env var, env source or volume."""

    def __init__(
        self,
        deploy_type: str,
        deploy_target: str | None,
        secret: str,
        key: str | None = None,
        items: list | None = None,
    ):
        """
        Initialize a Kubernetes Secret Object.

        Used to track requested secrets from the user.

        :param deploy_type: The type of secret deploy in Kubernetes, either `env` or
            `volume`
        :param deploy_target: (Optional) The environment variable when
            `deploy_type` `env` or file path when `deploy_type` `volume` where
            expose secret. Must not be None when `key` is set. For `deploy_type`
            `env` the value is upper-cased before being stored.
        :param secret: Name of the secrets object in Kubernetes
        :param key: (Optional) Key of the secret within the Kubernetes Secret
            if not provided in `deploy_type` `env` it will mount all secrets in object
        :param items: (Optional) items that can be added to a volume secret for specifying projects of
            secret keys to paths
            https://kubernetes.io/docs/concepts/configuration/secret/#projection-of-secret-keys-to-specific-paths
        :raises AirflowConfigException: if `deploy_type` is neither `env` nor `volume`,
            or if `key` is set while `deploy_target` is None
        """
        if deploy_type not in ("env", "volume"):
            raise AirflowConfigException("deploy_type must be env or volume")

        self.deploy_type = deploy_type
        self.deploy_target = deploy_target
        # Normalise a missing/None ``items`` to an empty list so callers can
        # rely on a list being present.
        self.items = items or []

        if deploy_target is not None and deploy_type == "env":
            # if deploying to env, capitalize the deploy target
            self.deploy_target = deploy_target.upper()

        if key is not None and deploy_target is None:
            raise AirflowConfigException("If `key` is set, `deploy_target` should not be None")

        self.secret = secret
        self.key = key

    def to_env_secret(self) -> k8s.V1EnvVar:
        """Store as environment secret: one env var referencing a single secret key."""
        return k8s.V1EnvVar(
            name=self.deploy_target,
            value_from=k8s.V1EnvVarSource(
                secret_key_ref=k8s.V1SecretKeySelector(name=self.secret, key=self.key)
            ),
        )

    def to_env_from_secret(self) -> k8s.V1EnvFromSource:
        """Build an ``envFrom`` source that references the whole secret object by name."""
        return k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(name=self.secret))

    def to_volume_secret(self) -> tuple[k8s.V1Volume, k8s.V1VolumeMount]:
        """
        Convert to volume secret.

        :return: a ``(volume, volume_mount)`` pair sharing a freshly generated
            volume name; the mount is read-only and placed at ``deploy_target``
        """
        # A random suffix is generated on every call, so each call yields a
        # differently named volume.
        vol_id = f"secretvol{uuid.uuid4()}"
        volume = k8s.V1Volume(name=vol_id, secret=k8s.V1SecretVolumeSource(secret_name=self.secret))
        if self.items:
            volume.secret.items = self.items
        return (volume, k8s.V1VolumeMount(mount_path=self.deploy_target, name=vol_id, read_only=True))

    def attach_to_pod(self, pod: k8s.V1Pod) -> k8s.V1Pod:
        """
        Attach this secret to a copy of ``pod``.

        The input pod is deep-copied and left unmodified. Only the first
        container of the pod spec receives the mount / env entry.

        :param pod: pod to attach the secret to
        :return: the modified copy of ``pod``
        """
        cp_pod = copy.deepcopy(pod)
        container = cp_pod.spec.containers[0]

        # ``deploy_type`` is validated in ``__init__`` to be "env" or "volume",
        # so exactly one of the branches below normally applies.
        if self.deploy_type == "volume":
            volume, volume_mount = self.to_volume_secret()
            if cp_pod.spec.volumes is None:
                cp_pod.spec.volumes = []
            cp_pod.spec.volumes.append(volume)
            if container.volume_mounts is None:
                container.volume_mounts = []
            container.volume_mounts.append(volume_mount)
        elif self.deploy_type == "env":
            if self.key is not None:
                # A specific key was requested: expose it as a single env var.
                if container.env is None:
                    container.env = []
                container.env.append(self.to_env_secret())
            else:
                # No key: reference the whole secret object via ``envFrom``.
                if container.env_from is None:
                    container.env_from = []
                container.env_from.append(self.to_env_from_secret())
        return cp_pod

    def __eq__(self, other):
        """
        Compare by ``deploy_type``, ``deploy_target``, ``secret`` and ``key``.

        ``items`` does not take part in the comparison. Objects lacking any of
        the compared attributes yield ``NotImplemented`` (so ``==`` evaluates
        to False) rather than raising ``AttributeError``.
        """
        try:
            return (
                self.deploy_type == other.deploy_type
                and self.deploy_target == other.deploy_target
                and self.secret == other.secret
                and self.key == other.key
            )
        except AttributeError:
            return NotImplemented

    def __hash__(self):
        """Hash over the same fields as ``__eq__`` so equal secrets hash equally."""
        # Defining ``__eq__`` alone would set ``__hash__`` to None and make
        # instances unusable in sets and as dict keys.
        return hash((self.deploy_type, self.deploy_target, self.secret, self.key))

    def __repr__(self):
        return f"Secret({self.deploy_type}, {self.deploy_target}, {self.secret}, {self.key})"