diff --git a/airflow/kubernetes/__init__.py b/airflow/kubernetes/__init__.py deleted file mode 100644 index 4cea2a1877607..0000000000000 --- a/airflow/kubernetes/__init__.py +++ /dev/null @@ -1,154 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -from airflow.utils.deprecation_tools import add_deprecated_classes - -__deprecated_classes: dict[str, dict[str, str]] = { - "kubernetes_helper_functions": { - "add_pod_suffix": "airflow.providers.cncf.kubernetes.kubernetes_helper_functions.add_pod_suffix.", - "annotations_for_logging_task_metadata": "airflow.providers.cncf.kubernetes." - "kubernetes_helper_functions." - "annotations_for_logging_task_metadata.", - "annotations_to_key": "airflow.providers.cncf.kubernetes." - "kubernetes_helper_functions.annotations_to_key", - "create_pod_id": "airflow.providers.cncf.kubernetes.kubernetes_helper_functions.create_pod_id", - "get_logs_task_metadata": "airflow.providers.cncf.kubernetes." - "kubernetes_helper_functions.get_logs_task_metadata", - "rand_str": "airflow.providers.cncf.kubernetes.kubernetes_helper_functions.rand_str", - }, - "pod": { - "Port": "airflow.providers.cncf.kubernetes.backcompat.pod.Port", - "Resources": "airflow.providers.cncf.kubernetes.backcompat.pod.Resources", - }, - "pod_launcher": { - "PodLauncher": "airflow.providers.cncf.kubernetes.pod_launcher.PodLauncher", - "PodStatus": "airflow.providers.cncf.kubernetes.pod_launcher.PodStatus", - }, - "pod_launcher_deprecated": { - "PodLauncher": "airflow.providers.cncf.kubernetes.pod_launcher_deprecated.PodLauncher", - "PodStatus": "airflow.providers.cncf.kubernetes.pod_launcher_deprecated.PodStatus", - # imports of imports from other kubernetes modules (in case they are imported from here) - "get_kube_client": "airflow.providers.cncf.kubernetes.kube_client.get_kube_client", - "PodDefaults": "airflow.providers.cncf.kubernetes.pod_generator_deprecated.PodDefaults", - }, - "pod_runtime_info_env": { - "PodRuntimeInfoEnv": "airflow.providers.cncf.kubernetes.backcompat." - "pod_runtime_info_env.PodRuntimeInfoEnv", - }, - "volume": { - "Volume": "airflow.providers.cncf.kubernetes.backcompat.volume.Volume", - }, - "volume_mount": { - "VolumeMount": "airflow.providers.cncf.kubernetes.backcompat.volume_mount.VolumeMount", - }, - # the below classes are not served from provider but from internal pre_7_4_0_compatibility package - "k8s_model": { - "K8SModel": "airflow.kubernetes.pre_7_4_0_compatibility.k8s_model.K8SModel", - "append_to_pod": "airflow.kubernetes.pre_7_4_0_compatibility.k8s_model.append_to_pod", - }, - "kube_client": { - "_disable_verify_ssl": "airflow.kubernetes.pre_7_4_0_compatibility.kube_client._disable_verify_ssl", - "_enable_tcp_keepalive": "airflow.kubernetes.pre_7_4_0_compatibility.kube_client." - "_enable_tcp_keepalive", - "get_kube_client": "airflow.kubernetes.pre_7_4_0_compatibility.kube_client.get_kube_client", - }, - "pod_generator": { - "datetime_to_label_safe_datestring": "airflow.kubernetes.pre_7_4_0_compatibility.pod_generator" - ".datetime_to_label_safe_datestring", - "extend_object_field": "airflow.kubernetes.pre_7_4_0_compatibility.pod_generator." - "extend_object_field", - "label_safe_datestring_to_datetime": "airflow.kubernetes.pre_7_4_0_compatibility.pod_generator." - "label_safe_datestring_to_datetime", - "make_safe_label_value": "airflow.kubernetes.pre_7_4_0_compatibility.pod_generator." - "make_safe_label_value", - "merge_objects": "airflow.kubernetes.pre_7_4_0_compatibility.pod_generator.merge_objects", - "PodGenerator": "airflow.kubernetes.pre_7_4_0_compatibility.pod_generator.PodGenerator", - # imports of imports from other kubernetes modules (in case they are imported from here) - "PodGeneratorDeprecated": "airflow.kubernetes.pre_7_4_0_compatibility." - "pod_generator_deprecated.PodGenerator", - "PodDefaults": "airflow.kubernetes.pre_7_4_0_compatibility.pod_generator_deprecated.PodDefaults", - # those two are inlined in kubernetes.pre_7_4_0_compatibility.pod_generator even if - # originally they were imported in airflow.kubernetes.pod_generator - "add_pod_suffix": "airflow.kubernetes.pre_7_4_0_compatibility.pod_generator.add_pod_suffix", - "rand_str": "airflow.kubernetes.pre_7_4_0_compatibility.pod_generator.rand_str", - }, - "pod_generator_deprecated": { - "make_safe_label_value": "airflow.kubernetes.pre_7_4_0_compatibility.pod_generator_deprecated." - "make_safe_label_value", - "PodDefaults": "airflow.kubernetes.pre_7_4_0_compatibility.pod_generator_deprecated.PodDefaults", - "PodGenerator": "airflow.kubernetes.pre_7_4_0_compatibility.pod_generator_deprecated.PodGenerator", - }, - "secret": { - "Secret": "airflow.kubernetes.pre_7_4_0_compatibility.secret.Secret", - # imports of imports from other kubernetes modules (in case they are imported from here) - "K8SModel": "airflow.kubernetes.pre_7_4_0_compatibility.k8s_model.K8SModel", - }, -} - -__override_deprecated_names: dict[str, dict[str, str]] = { - "pod": { - "Port": "kubernetes.client.models.V1ContainerPort", - "Resources": "kubernetes.client.models.V1ResourceRequirements", - }, - "pod_runtime_info_env": { - "PodRuntimeInfoEnv": "kubernetes.client.models.V1EnvVar", - }, - "volume": { - "Volume": "kubernetes.client.models.V1Volume", - }, - "volume_mount": { - "VolumeMount": "kubernetes.client.models.V1VolumeMount", - }, - "k8s_model": { - "K8SModel": "airflow.airflow.providers.cncf.kubernetes.k8s_model.K8SModel", - "append_to_pod": "airflow.airflow.providers.cncf.kubernetes.k8s_model.append_to_pod", - }, - "kube_client": { - "_disable_verify_ssl": "airflow.kubernetes.airflow.providers.cncf.kubernetes." - "kube_client._disable_verify_ssl", - "_enable_tcp_keepalive": "airflow.kubernetes.airflow.providers.cncf.kubernetes.kube_client." - "_enable_tcp_keepalive", - "get_kube_client": "airflow.kubernetes.airflow.providers.cncf.kubernetes.kube_client.get_kube_client", - }, - "pod_generator": { - "datetime_to_label_safe_datestring": "airflow.providers.cncf.kubernetes.pod_generator" - ".datetime_to_label_safe_datestring", - "extend_object_field": "airflow.kubernetes.airflow.providers.cncf.kubernetes.pod_generator." - "extend_object_field", - "label_safe_datestring_to_datetime": "airflow.providers.cncf.kubernetes.pod_generator." - "label_safe_datestring_to_datetime", - "make_safe_label_value": "airflow.providers.cncf.kubernetes.pod_generator.make_safe_label_value", - "merge_objects": "airflow.providers.cncf.kubernetes.pod_generator.merge_objects", - "PodGenerator": "airflow.providers.cncf.kubernetes.pod_generator.PodGenerator", - }, - "pod_generator_deprecated": { - "make_safe_label_value": "airflow.providers.cncf.kubernetes.pod_generator_deprecated." - "make_safe_label_value", - "PodDefaults": "airflow.providers.cncf.kubernetes.pod_generator_deprecated.PodDefaults", - "PodGenerator": "airflow.providers.cncf.kubernetes.pod_generator_deprecated.PodGenerator", - }, - "secret": { - "Secret": "airflow.providers.cncf.kubernetes.secret.Secret", - }, -} -add_deprecated_classes( - __deprecated_classes, - __name__, - __override_deprecated_names, - "The `cncf.kubernetes` provider must be >= 7.4.0 for that.", -) diff --git a/airflow/kubernetes/pre_7_4_0_compatibility/__init__.py b/airflow/kubernetes/pre_7_4_0_compatibility/__init__.py deleted file mode 100644 index 18c84b6d03e51..0000000000000 --- a/airflow/kubernetes/pre_7_4_0_compatibility/__init__.py +++ /dev/null @@ -1,31 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -# All the classes in this module should only be kept for backwards-compatibility reasons. -# old cncf.kubernetes providers will use those in their frozen version for pre-7.4.0 release -import warnings - -warnings.warn( - "This module is deprecated. The `cncf.kubernetes` provider before version 7.4.0 uses this module - " - "you should migrate to a newer version of `cncf.kubernetes` to get rid of this warning. If you " - "import the module via `airflow.kubernetes` import, please use `cncf.kubernetes' " - "provider 7.4.0+ and switch all your imports to use `apache.airflow.providers.cncf.kubernetes` " - "to get rid of the warning.", - DeprecationWarning, - stacklevel=2, -) diff --git a/airflow/kubernetes/pre_7_4_0_compatibility/k8s_model.py b/airflow/kubernetes/pre_7_4_0_compatibility/k8s_model.py deleted file mode 100644 index 930b194a2dbea..0000000000000 --- a/airflow/kubernetes/pre_7_4_0_compatibility/k8s_model.py +++ /dev/null @@ -1,62 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -"""Classes for interacting with Kubernetes API.""" - -from __future__ import annotations - -from abc import ABC, abstractmethod -from functools import reduce -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from kubernetes.client import models as k8s - - -class K8SModel(ABC): - """ - Airflow Kubernetes models are here for backwards compatibility reasons only. - - Ideally clients should use the kubernetes API - and the process of - - client input -> Airflow k8s models -> k8s models - - can be avoided. All of these models implement the - `attach_to_pod` method so that they integrate with the kubernetes client. - """ - - @abstractmethod - def attach_to_pod(self, pod: k8s.V1Pod) -> k8s.V1Pod: - """ - Attaches to pod. - - :param pod: A pod to attach this Kubernetes object to - :return: The pod with the object attached - """ - - -def append_to_pod(pod: k8s.V1Pod, k8s_objects: list[K8SModel] | None): - """ - Attach additional specs to an existing pod object. - - :param pod: A pod to attach a list of Kubernetes objects to - :param k8s_objects: a potential None list of K8SModels - :return: pod with the objects attached if they exist - """ - if not k8s_objects: - return pod - return reduce(lambda p, o: o.attach_to_pod(p), k8s_objects, pod) diff --git a/airflow/kubernetes/pre_7_4_0_compatibility/kube_client.py b/airflow/kubernetes/pre_7_4_0_compatibility/kube_client.py deleted file mode 100644 index 982f0da439bc5..0000000000000 --- a/airflow/kubernetes/pre_7_4_0_compatibility/kube_client.py +++ /dev/null @@ -1,145 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -"""Client for kubernetes communication.""" - -from __future__ import annotations - -import logging - -import urllib3.util - -from airflow.configuration import conf - -log = logging.getLogger(__name__) - -try: - from kubernetes import client, config - from kubernetes.client import Configuration - from kubernetes.client.rest import ApiException - - has_kubernetes = True - - def _get_default_configuration() -> Configuration: - if hasattr(Configuration, "get_default_copy"): - return Configuration.get_default_copy() - return Configuration() - - def _disable_verify_ssl() -> None: - configuration = _get_default_configuration() - configuration.verify_ssl = False - Configuration.set_default(configuration) - -except ImportError as e: - # We need an exception class to be able to use it in ``except`` elsewhere - # in the code base - ApiException = BaseException - has_kubernetes = False - _import_err = e - - -def _enable_tcp_keepalive() -> None: - """ - Enable TCP keepalive mechanism. - - This prevents urllib3 connection to hang indefinitely when idle connection - is time-outed on services like cloud load balancers or firewalls. - - See https://github.com/apache/airflow/pull/11406 for detailed explanation. - Please ping @michalmisiewicz or @dimberman in the PR if you want to modify this function. - """ - import socket - - from urllib3.connection import HTTPConnection, HTTPSConnection - - tcp_keep_idle = conf.getint("kubernetes_executor", "tcp_keep_idle") - tcp_keep_intvl = conf.getint("kubernetes_executor", "tcp_keep_intvl") - tcp_keep_cnt = conf.getint("kubernetes_executor", "tcp_keep_cnt") - - socket_options = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)] - - if hasattr(socket, "TCP_KEEPIDLE"): - socket_options.append((socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, tcp_keep_idle)) - else: - log.debug("Unable to set TCP_KEEPIDLE on this platform") - - if hasattr(socket, "TCP_KEEPINTVL"): - socket_options.append((socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, tcp_keep_intvl)) - else: - log.debug("Unable to set TCP_KEEPINTVL on this platform") - - if hasattr(socket, "TCP_KEEPCNT"): - socket_options.append((socket.IPPROTO_TCP, socket.TCP_KEEPCNT, tcp_keep_cnt)) - else: - log.debug("Unable to set TCP_KEEPCNT on this platform") - - HTTPSConnection.default_socket_options = HTTPSConnection.default_socket_options + socket_options - HTTPConnection.default_socket_options = HTTPConnection.default_socket_options + socket_options - - -def get_kube_client( - in_cluster: bool = conf.getboolean("kubernetes_executor", "in_cluster"), - cluster_context: str | None = None, - config_file: str | None = None, -) -> client.CoreV1Api: - """ - Retrieve Kubernetes client. - - :param in_cluster: whether we are in cluster - :param cluster_context: context of the cluster - :param config_file: configuration file - :return kubernetes client - :rtype client.CoreV1Api - """ - if not has_kubernetes: - raise _import_err - - if conf.getboolean("kubernetes_executor", "enable_tcp_keepalive"): - _enable_tcp_keepalive() - - configuration = _get_default_configuration() - api_client_retry_configuration = conf.getjson( - "kubernetes_executor", "api_client_retry_configuration", fallback={} - ) - - if not conf.getboolean("kubernetes_executor", "verify_ssl"): - _disable_verify_ssl() - - if isinstance(api_client_retry_configuration, dict): - configuration.retries = urllib3.util.Retry(**api_client_retry_configuration) - else: - raise ValueError("api_client_retry_configuration should be a dictionary") - - if in_cluster: - config.load_incluster_config(client_configuration=configuration) - else: - if cluster_context is None: - cluster_context = conf.get("kubernetes_executor", "cluster_context", fallback=None) - if config_file is None: - config_file = conf.get("kubernetes_executor", "config_file", fallback=None) - config.load_kube_config( - config_file=config_file, context=cluster_context, client_configuration=configuration - ) - - if not conf.getboolean("kubernetes_executor", "verify_ssl"): - configuration.verify_ssl = False - - ssl_ca_cert = conf.get("kubernetes_executor", "ssl_ca_cert") - if ssl_ca_cert: - configuration.ssl_ca_cert = ssl_ca_cert - - api_client = client.ApiClient(configuration=configuration) - return client.CoreV1Api(api_client) diff --git a/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator.py b/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator.py deleted file mode 100644 index aec5602744a25..0000000000000 --- a/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator.py +++ /dev/null @@ -1,680 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -""" -Pod generator compatible with cncf-providers released before 2.7.0 of airflow. - -Compatible with pre-7.4.0 of the cncf.kubernetes provider. - -This module provides an interface between the previous Pod -API and outputs a kubernetes.client.models.V1Pod. -The advantage being that the full Kubernetes API -is supported and no serialization need be written. -""" - -from __future__ import annotations - -import copy -import logging -import os -import secrets -import string -import warnings -from functools import reduce -from typing import TYPE_CHECKING - -import re2 -from dateutil import parser -from kubernetes.client import models as k8s -from kubernetes.client.api_client import ApiClient - -from airflow.exceptions import ( - AirflowConfigException, - PodMutationHookException, - PodReconciliationError, - RemovedInAirflow3Warning, -) -from airflow.kubernetes.pre_7_4_0_compatibility.pod_generator_deprecated import ( - PodDefaults, - PodGenerator as PodGeneratorDeprecated, -) -from airflow.utils import yaml -from airflow.utils.hashlib_wrapper import md5 -from airflow.version import version as airflow_version - -if TYPE_CHECKING: - import datetime - -log = logging.getLogger(__name__) - -MAX_LABEL_LEN = 63 - -alphanum_lower = string.ascii_lowercase + string.digits - - -def rand_str(num): - """ - Generate random lowercase alphanumeric string of length num. - - :meta private: - """ - return "".join(secrets.choice(alphanum_lower) for _ in range(num)) - - -def add_pod_suffix(pod_name: str, rand_len: int = 8, max_len: int = 80) -> str: - """ - Add random string to pod name while staying under max length. - - :param pod_name: name of the pod - :param rand_len: length of the random string to append - :max_len: maximum length of the pod name - :meta private: - """ - suffix = "-" + rand_str(rand_len) - return pod_name[: max_len - len(suffix)].strip("-.") + suffix - - -def make_safe_label_value(string: str) -> str: - """ - Normalize a provided label to be of valid length and characters. - - Valid label values must be 63 characters or less and must be empty or begin and - end with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_), - dots (.), and alphanumerics between. - - If the label value is greater than 63 chars once made safe, or differs in any - way from the original value sent to this function, then we need to truncate to - 53 chars, and append it with a unique hash. - """ - safe_label = re2.sub(r"^[^a-z0-9A-Z]*|[^a-zA-Z0-9_\-\.]|[^a-z0-9A-Z]*$", "", string) - - if len(safe_label) > MAX_LABEL_LEN or string != safe_label: - safe_hash = md5(string.encode()).hexdigest()[:9] - safe_label = safe_label[: MAX_LABEL_LEN - len(safe_hash) - 1] + "-" + safe_hash - - return safe_label - - -def datetime_to_label_safe_datestring(datetime_obj: datetime.datetime) -> str: - """ - Transform a datetime string to use as a label. - - Kubernetes doesn't like ":" in labels, since ISO datetime format uses ":" but - not "_" let's - replace ":" with "_" - - :param datetime_obj: datetime.datetime object - :return: ISO-like string representing the datetime - """ - return datetime_obj.isoformat().replace(":", "_").replace("+", "_plus_") - - -def label_safe_datestring_to_datetime(string: str) -> datetime.datetime: - """ - Transform a label back to a datetime object. - - Kubernetes doesn't permit ":" in labels. ISO datetime format uses ":" but not - "_", let's - replace ":" with "_" - - :param string: str - :return: datetime.datetime object - """ - return parser.parse(string.replace("_plus_", "+").replace("_", ":")) - - -class PodGenerator: - """ - Contains Kubernetes Airflow Worker configuration logic. - - Represents a kubernetes pod and manages execution of a single pod. - Any configuration that is container specific gets applied to - the first container in the list of containers. - - :param pod: The fully specified pod. Mutually exclusive with `pod_template_file` - :param pod_template_file: Path to YAML file. Mutually exclusive with `pod` - :param extract_xcom: Whether to bring up a container for xcom - """ - - def __init__( - self, - pod: k8s.V1Pod | None = None, - pod_template_file: str | None = None, - extract_xcom: bool = True, - ): - if not pod_template_file and not pod: - raise AirflowConfigException( - "Podgenerator requires either a `pod` or a `pod_template_file` argument" - ) - if pod_template_file and pod: - raise AirflowConfigException("Cannot pass both `pod` and `pod_template_file` arguments") - - if pod_template_file: - self.ud_pod = self.deserialize_model_file(pod_template_file) - else: - self.ud_pod = pod - - # Attach sidecar - self.extract_xcom = extract_xcom - - def gen_pod(self) -> k8s.V1Pod: - """Generate pod.""" - warnings.warn("This function is deprecated. ", RemovedInAirflow3Warning, stacklevel=2) - result = self.ud_pod - - result.metadata.name = add_pod_suffix(pod_name=result.metadata.name) - - if self.extract_xcom: - result = self.add_xcom_sidecar(result) - - return result - - @staticmethod - def add_xcom_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod: - """Add sidecar.""" - warnings.warn( - "This function is deprecated. " - "Please use airflow.providers.cncf.kubernetes.utils.xcom_sidecar.add_xcom_sidecar instead", - RemovedInAirflow3Warning, - stacklevel=2, - ) - pod_cp = copy.deepcopy(pod) - pod_cp.spec.volumes = pod.spec.volumes or [] - pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME) - pod_cp.spec.containers[0].volume_mounts = pod_cp.spec.containers[0].volume_mounts or [] - pod_cp.spec.containers[0].volume_mounts.insert(0, PodDefaults.VOLUME_MOUNT) - pod_cp.spec.containers.append(PodDefaults.SIDECAR_CONTAINER) - - return pod_cp - - @staticmethod - def from_obj(obj) -> dict | k8s.V1Pod | None: - """Convert to pod from obj.""" - if obj is None: - return None - - k8s_legacy_object = obj.get("KubernetesExecutor", None) - k8s_object = obj.get("pod_override", None) - - if k8s_legacy_object and k8s_object: - raise AirflowConfigException( - "Can not have both a legacy and new" - "executor_config object. Please delete the KubernetesExecutor" - "dict and only use the pod_override kubernetes.client.models.V1Pod" - "object." - ) - if not k8s_object and not k8s_legacy_object: - return None - - if isinstance(k8s_object, k8s.V1Pod): - return k8s_object - elif isinstance(k8s_legacy_object, dict): - warnings.warn( - "Using a dictionary for the executor_config is deprecated and will soon be removed." - 'please use a `kubernetes.client.models.V1Pod` class with a "pod_override" key' - " instead. ", - category=RemovedInAirflow3Warning, - stacklevel=2, - ) - return PodGenerator.from_legacy_obj(obj) - else: - raise TypeError( - "Cannot convert a non-kubernetes.client.models.V1Pod object into a KubernetesExecutorConfig" - ) - - @staticmethod - def from_legacy_obj(obj) -> k8s.V1Pod | None: - """Convert to pod from obj.""" - if obj is None: - return None - - # We do not want to extract constant here from ExecutorLoader because it is just - # A name in dictionary rather than executor selection mechanism and it causes cyclic import - namespaced = obj.get("KubernetesExecutor", {}) - - if not namespaced: - return None - - resources = namespaced.get("resources") - - if resources is None: - requests = { - "cpu": namespaced.pop("request_cpu", None), - "memory": namespaced.pop("request_memory", None), - "ephemeral-storage": namespaced.get("ephemeral-storage"), # We pop this one in limits - } - limits = { - "cpu": namespaced.pop("limit_cpu", None), - "memory": namespaced.pop("limit_memory", None), - "ephemeral-storage": namespaced.pop("ephemeral-storage", None), - } - all_resources = list(requests.values()) + list(limits.values()) - if all(r is None for r in all_resources): - resources = None - else: - # remove None's so they don't become 0's - requests = {k: v for k, v in requests.items() if v is not None} - limits = {k: v for k, v in limits.items() if v is not None} - resources = k8s.V1ResourceRequirements(requests=requests, limits=limits) - namespaced["resources"] = resources - return PodGeneratorDeprecated(**namespaced).gen_pod() - - @staticmethod - def reconcile_pods(base_pod: k8s.V1Pod, client_pod: k8s.V1Pod | None) -> k8s.V1Pod: - """ - Merge Kubernetes Pod objects. - - :param base_pod: has the base attributes which are overwritten if they exist - in the client pod and remain if they do not exist in the client_pod - :param client_pod: the pod that the client wants to create. - :return: the merged pods - - This can't be done recursively as certain fields are overwritten and some are concatenated. - """ - if client_pod is None: - return base_pod - - client_pod_cp = copy.deepcopy(client_pod) - client_pod_cp.spec = PodGenerator.reconcile_specs(base_pod.spec, client_pod_cp.spec) - client_pod_cp.metadata = PodGenerator.reconcile_metadata(base_pod.metadata, client_pod_cp.metadata) - client_pod_cp = merge_objects(base_pod, client_pod_cp) - - return client_pod_cp - - @staticmethod - def reconcile_metadata(base_meta, client_meta): - """ - Merge Kubernetes Metadata objects. - - :param base_meta: has the base attributes which are overwritten if they exist - in the client_meta and remain if they do not exist in the client_meta - :param client_meta: the spec that the client wants to create. - :return: the merged specs - """ - if base_meta and not client_meta: - return base_meta - if not base_meta and client_meta: - return client_meta - elif client_meta and base_meta: - client_meta.labels = merge_objects(base_meta.labels, client_meta.labels) - client_meta.annotations = merge_objects(base_meta.annotations, client_meta.annotations) - extend_object_field(base_meta, client_meta, "managed_fields") - extend_object_field(base_meta, client_meta, "finalizers") - extend_object_field(base_meta, client_meta, "owner_references") - return merge_objects(base_meta, client_meta) - - return None - - @staticmethod - def reconcile_specs( - base_spec: k8s.V1PodSpec | None, client_spec: k8s.V1PodSpec | None - ) -> k8s.V1PodSpec | None: - """ - Merge Kubernetes PodSpec objects. - - :param base_spec: has the base attributes which are overwritten if they exist - in the client_spec and remain if they do not exist in the client_spec - :param client_spec: the spec that the client wants to create. - :return: the merged specs - """ - if base_spec and not client_spec: - return base_spec - if not base_spec and client_spec: - return client_spec - elif client_spec and base_spec: - client_spec.containers = PodGenerator.reconcile_containers( - base_spec.containers, client_spec.containers - ) - merged_spec = extend_object_field(base_spec, client_spec, "init_containers") - merged_spec = extend_object_field(base_spec, merged_spec, "volumes") - return merge_objects(base_spec, merged_spec) - - return None - - @staticmethod - def reconcile_containers( - base_containers: list[k8s.V1Container], client_containers: list[k8s.V1Container] - ) -> list[k8s.V1Container]: - """ - Merge Kubernetes Container objects. - - :param base_containers: has the base attributes which are overwritten if they exist - in the client_containers and remain if they do not exist in the client_containers - :param client_containers: the containers that the client wants to create. - :return: the merged containers - - The runs recursively over the list of containers. - """ - if not base_containers: - return client_containers - if not client_containers: - return base_containers - - client_container = client_containers[0] - base_container = base_containers[0] - client_container = extend_object_field(base_container, client_container, "volume_mounts") - client_container = extend_object_field(base_container, client_container, "env") - client_container = extend_object_field(base_container, client_container, "env_from") - client_container = extend_object_field(base_container, client_container, "ports") - client_container = extend_object_field(base_container, client_container, "volume_devices") - client_container = merge_objects(base_container, client_container) - - return [ - client_container, - *PodGenerator.reconcile_containers(base_containers[1:], client_containers[1:]), - ] - - @classmethod - def construct_pod( - cls, - dag_id: str, - task_id: str, - pod_id: str, - try_number: int, - kube_image: str, - date: datetime.datetime | None, - args: list[str], - pod_override_object: k8s.V1Pod | None, - base_worker_pod: k8s.V1Pod, - namespace: str, - scheduler_job_id: str, - run_id: str | None = None, - map_index: int = -1, - *, - with_mutation_hook: bool = False, - ) -> k8s.V1Pod: - """ - Create a Pod. - - Construct a pod by gathering and consolidating the configuration from 3 places: - - airflow.cfg - - executor_config - - dynamic arguments - """ - if len(pod_id) > 253: - warnings.warn( - "pod_id supplied is longer than 253 characters; truncating and adding unique suffix.", - category=UserWarning, - stacklevel=2, - ) - pod_id = add_pod_suffix(pod_name=pod_id, max_len=253) - try: - image = pod_override_object.spec.containers[0].image # type: ignore - if not image: - image = kube_image - except Exception: - image = kube_image - - annotations = { - "dag_id": dag_id, - "task_id": task_id, - "try_number": str(try_number), - } - if map_index >= 0: - annotations["map_index"] = str(map_index) - if date: - annotations["execution_date"] = date.isoformat() - if run_id: - annotations["run_id"] = run_id - - dynamic_pod = k8s.V1Pod( - metadata=k8s.V1ObjectMeta( - namespace=namespace, - annotations=annotations, - name=pod_id, - labels=cls.build_labels_for_k8s_executor_pod( - dag_id=dag_id, - task_id=task_id, - try_number=try_number, - airflow_worker=scheduler_job_id, - map_index=map_index, - execution_date=date, - run_id=run_id, - ), - ), - spec=k8s.V1PodSpec( - containers=[ - k8s.V1Container( - name="base", - args=args, - image=image, - env=[k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value="True")], - ) - ] - ), - ) - - # Reconcile the pods starting with the first chronologically, - # Pod from the pod_template_File -> Pod from executor_config arg -> Pod from the K8s executor - pod_list = [base_worker_pod, pod_override_object, dynamic_pod] - - try: - pod = reduce(PodGenerator.reconcile_pods, pod_list) - except Exception as e: - raise PodReconciliationError from e - - if with_mutation_hook: - from airflow.settings import pod_mutation_hook - - try: - pod_mutation_hook(pod) - except Exception as e: - raise PodMutationHookException from e - - return pod - - @classmethod - def build_selector_for_k8s_executor_pod( - cls, - *, - dag_id, - task_id, - try_number, - map_index=None, - execution_date=None, - run_id=None, - airflow_worker=None, - ): - """ - Generate selector for kubernetes executor pod. - - :meta private: - """ - labels = cls.build_labels_for_k8s_executor_pod( - dag_id=dag_id, - task_id=task_id, - try_number=try_number, - map_index=map_index, - execution_date=execution_date, - run_id=run_id, - airflow_worker=airflow_worker, - ) - label_strings = [f"{label_id}={label}" for label_id, label in sorted(labels.items())] - if not airflow_worker: # this filters out KPO pods even when we don't know the scheduler job id - label_strings.append("airflow-worker") - selector = ",".join(label_strings) - return selector - - @classmethod - def build_labels_for_k8s_executor_pod( - cls, - *, - dag_id, - task_id, - try_number, - airflow_worker=None, - map_index=None, - execution_date=None, - run_id=None, - ): - """ - Generate labels for kubernetes executor pod. - - :meta private: - """ - labels = { - "dag_id": make_safe_label_value(dag_id), - "task_id": make_safe_label_value(task_id), - "try_number": str(try_number), - "kubernetes_executor": "True", - "airflow_version": airflow_version.replace("+", "-"), - } - if airflow_worker is not None: - labels["airflow-worker"] = make_safe_label_value(str(airflow_worker)) - if map_index is not None and map_index >= 0: - labels["map_index"] = str(map_index) - if execution_date: - labels["execution_date"] = datetime_to_label_safe_datestring(execution_date) - if run_id: - labels["run_id"] = make_safe_label_value(run_id) - return labels - - @staticmethod - def serialize_pod(pod: k8s.V1Pod) -> dict: - """ - Convert a k8s.V1Pod into a json serializable dictionary. - - :param pod: k8s.V1Pod object - :return: Serialized version of the pod returned as dict - """ - api_client = ApiClient() - return api_client.sanitize_for_serialization(pod) - - @staticmethod - def deserialize_model_file(path: str) -> k8s.V1Pod: - """ - Generate a Pod from a file. - - :param path: Path to the file - :return: a kubernetes.client.models.V1Pod - """ - if os.path.exists(path): - with open(path) as stream: - pod = yaml.safe_load(stream) - else: - pod = None - log.warning("Model file %s does not exist", path) - - return PodGenerator.deserialize_model_dict(pod) - - @staticmethod - def deserialize_model_dict(pod_dict: dict | None) -> k8s.V1Pod: - """ - Deserializes a Python dictionary to k8s.V1Pod. - - Unfortunately we need access to the private method - ``_ApiClient__deserialize_model`` from the kubernetes client. - This issue is tracked here; https://github.com/kubernetes-client/python/issues/977. - - :param pod_dict: Serialized dict of k8s.V1Pod object - :return: De-serialized k8s.V1Pod - """ - api_client = ApiClient() - return api_client._ApiClient__deserialize_model(pod_dict, k8s.V1Pod) - - @staticmethod - def make_unique_pod_id(pod_id: str) -> str | None: - r""" - Generate a unique Pod name. - - Kubernetes pod names must consist of one or more lowercase - rfc1035/rfc1123 labels separated by '.' with a maximum length of 253 - characters. - - Name must pass the following regex for validation - ``^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$`` - - For more details, see: - https://github.com/kubernetes/kubernetes/blob/release-1.1/docs/design/identifiers.md - - :param pod_id: requested pod name - :return: ``str`` valid Pod name of appropriate length - """ - warnings.warn( - "This function is deprecated. Use `add_pod_suffix` in `kubernetes_helper_functions`.", - RemovedInAirflow3Warning, - stacklevel=2, - ) - - if not pod_id: - return None - - max_pod_id_len = 100 # arbitrarily chosen - suffix = rand_str(8) # 8 seems good enough - base_pod_id_len = max_pod_id_len - len(suffix) - 1 # -1 for separator - trimmed_pod_id = pod_id[:base_pod_id_len].rstrip("-.") - return f"{trimmed_pod_id}-{suffix}" - - -def merge_objects(base_obj, client_obj): - """ - Merge objects. - - :param base_obj: has the base attributes which are overwritten if they exist - in the client_obj and remain if they do not exist in the client_obj - :param client_obj: the object that the client wants to create. - :return: the merged objects - """ - if not base_obj: - return client_obj - if not client_obj: - return base_obj - - client_obj_cp = copy.deepcopy(client_obj) - - if isinstance(base_obj, dict) and isinstance(client_obj_cp, dict): - base_obj_cp = copy.deepcopy(base_obj) - base_obj_cp.update(client_obj_cp) - return base_obj_cp - - for base_key in base_obj.to_dict().keys(): - base_val = getattr(base_obj, base_key, None) - if not getattr(client_obj, base_key, None) and base_val: - if not isinstance(client_obj_cp, dict): - setattr(client_obj_cp, base_key, base_val) - else: - client_obj_cp[base_key] = base_val - return client_obj_cp - - -def extend_object_field(base_obj, client_obj, field_name): - """ - Add field values to existing objects. - - :param base_obj: an object which has a property `field_name` that is a list - :param client_obj: an object which has a property `field_name` that is a list. - A copy of this object is returned with `field_name` modified - :param field_name: the name of the list field - :return: the client_obj with the property `field_name` being the two properties appended - """ - client_obj_cp = copy.deepcopy(client_obj) - base_obj_field = getattr(base_obj, field_name, None) - client_obj_field = getattr(client_obj, field_name, None) - - if (not isinstance(base_obj_field, list) and base_obj_field is not None) or ( - not isinstance(client_obj_field, list) and client_obj_field is not None - ): - raise ValueError("The chosen field must be a list.") - - if not base_obj_field: - return client_obj_cp - if not client_obj_field: - setattr(client_obj_cp, field_name, base_obj_field) - return client_obj_cp - - appended_fields = base_obj_field + client_obj_field - setattr(client_obj_cp, field_name, appended_fields) - return client_obj_cp diff --git a/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator_deprecated.py b/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator_deprecated.py deleted file mode 100644 index 8dc56331b5bb6..0000000000000 --- a/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator_deprecated.py +++ /dev/null @@ -1,309 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -""" -Backwards compatibility for Pod generation. - -This module provides an interface between the previous Pod -API and outputs a kubernetes.client.models.V1Pod. -The advantage being that the full Kubernetes API -is supported and no serialization need be written. -""" - -from __future__ import annotations - -import copy -import uuid - -import re2 -from kubernetes.client import models as k8s - -from airflow.utils.hashlib_wrapper import md5 - -MAX_POD_ID_LEN = 253 - -MAX_LABEL_LEN = 63 - - -class PodDefaults: - """Static defaults for Pods.""" - - XCOM_MOUNT_PATH = "/airflow/xcom" - SIDECAR_CONTAINER_NAME = "airflow-xcom-sidecar" - XCOM_CMD = 'trap "exit 0" INT; while true; do sleep 30; done;' - VOLUME_MOUNT = k8s.V1VolumeMount(name="xcom", mount_path=XCOM_MOUNT_PATH) - VOLUME = k8s.V1Volume(name="xcom", empty_dir=k8s.V1EmptyDirVolumeSource()) - SIDECAR_CONTAINER = k8s.V1Container( - name=SIDECAR_CONTAINER_NAME, - command=["sh", "-c", XCOM_CMD], - image="alpine", - volume_mounts=[VOLUME_MOUNT], - resources=k8s.V1ResourceRequirements( - requests={ - "cpu": "1m", - } - ), - ) - - -def make_safe_label_value(string): - """ - Normalize a provided label to be of valid length and characters. - - Valid label values must be 63 characters or less and must be empty or begin and - end with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_), - dots (.), and alphanumerics between. - - If the label value is greater than 63 chars once made safe, or differs in any - way from the original value sent to this function, then we need to truncate to - 53 chars, and append it with a unique hash. - """ - safe_label = re2.sub(r"^[^a-z0-9A-Z]*|[^a-zA-Z0-9_\-\.]|[^a-z0-9A-Z]*$", "", string) - - if len(safe_label) > MAX_LABEL_LEN or string != safe_label: - safe_hash = md5(string.encode()).hexdigest()[:9] - safe_label = safe_label[: MAX_LABEL_LEN - len(safe_hash) - 1] + "-" + safe_hash - - return safe_label - - -class PodGenerator: - """ - Contains Kubernetes Airflow Worker configuration logic. - - Represents a kubernetes pod and manages execution of a single pod. - Any configuration that is container specific gets applied to - the first container in the list of containers. - - :param image: The docker image - :param name: name in the metadata section (not the container name) - :param namespace: pod namespace - :param volume_mounts: list of kubernetes volumes mounts - :param envs: A dict containing the environment variables - :param cmds: The command to be run on the first container - :param args: The arguments to be run on the pod - :param labels: labels for the pod metadata - :param node_selectors: node selectors for the pod - :param ports: list of ports. Applies to the first container. - :param volumes: Volumes to be attached to the first container - :param image_pull_policy: Specify a policy to cache or always pull an image - :param restart_policy: The restart policy of the pod - :param image_pull_secrets: Any image pull secrets to be given to the pod. - If more than one secret is required, provide a comma separated list: - secret_a,secret_b - :param init_containers: A list of init containers - :param service_account_name: Identity for processes that run in a Pod - :param resources: Resource requirements for the first containers - :param annotations: annotations for the pod - :param affinity: A dict containing a group of affinity scheduling rules - :param hostnetwork: If True enable host networking on the pod - :param tolerations: A list of kubernetes tolerations - :param security_context: A dict containing the security context for the pod - :param configmaps: Any configmap refs to envfrom. - If more than one configmap is required, provide a comma separated list - configmap_a,configmap_b - :param dnspolicy: Specify a dnspolicy for the pod - :param schedulername: Specify a schedulername for the pod - :param pod: The fully specified pod. Mutually exclusive with `path_or_string` - :param extract_xcom: Whether to bring up a container for xcom - :param priority_class_name: priority class name for the launched Pod - """ - - def __init__( - self, - image: str | None = None, - name: str | None = None, - namespace: str | None = None, - volume_mounts: list[k8s.V1VolumeMount | dict] | None = None, - envs: dict[str, str] | None = None, - cmds: list[str] | None = None, - args: list[str] | None = None, - labels: dict[str, str] | None = None, - node_selectors: dict[str, str] | None = None, - ports: list[k8s.V1ContainerPort | dict] | None = None, - volumes: list[k8s.V1Volume | dict] | None = None, - image_pull_policy: str | None = None, - restart_policy: str | None = None, - image_pull_secrets: str | None = None, - init_containers: list[k8s.V1Container] | None = None, - service_account_name: str | None = None, - resources: k8s.V1ResourceRequirements | dict | None = None, - annotations: dict[str, str] | None = None, - affinity: dict | None = None, - hostnetwork: bool = False, - tolerations: list | None = None, - security_context: k8s.V1PodSecurityContext | dict | None = None, - configmaps: list[str] | None = None, - dnspolicy: str | None = None, - schedulername: str | None = None, - extract_xcom: bool = False, - priority_class_name: str | None = None, - ): - self.pod = k8s.V1Pod() - self.pod.api_version = "v1" - self.pod.kind = "Pod" - - # Pod Metadata - self.metadata = k8s.V1ObjectMeta() - self.metadata.labels = labels - self.metadata.name = name - self.metadata.namespace = namespace - self.metadata.annotations = annotations - - # Pod Container - self.container = k8s.V1Container(name="base") - self.container.image = image - self.container.env = [] - - if envs: - if isinstance(envs, dict): - for key, val in envs.items(): - self.container.env.append(k8s.V1EnvVar(name=key, value=val)) - elif isinstance(envs, list): - self.container.env.extend(envs) - - configmaps = configmaps or [] - self.container.env_from = [] - for configmap in configmaps: - self.container.env_from.append( - k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name=configmap)) - ) - - self.container.command = cmds or [] - self.container.args = args or [] - if image_pull_policy: - self.container.image_pull_policy = image_pull_policy - self.container.ports = ports or [] - self.container.resources = resources - self.container.volume_mounts = volume_mounts or [] - - # Pod Spec - self.spec = k8s.V1PodSpec(containers=[]) - self.spec.security_context = security_context - self.spec.tolerations = tolerations - if dnspolicy: - self.spec.dns_policy = dnspolicy - self.spec.scheduler_name = schedulername - self.spec.host_network = hostnetwork - self.spec.affinity = affinity - self.spec.service_account_name = service_account_name - self.spec.init_containers = init_containers - self.spec.volumes = volumes or [] - self.spec.node_selector = node_selectors - if restart_policy: - self.spec.restart_policy = restart_policy - self.spec.priority_class_name = priority_class_name - - self.spec.image_pull_secrets = [] - - if image_pull_secrets: - for image_pull_secret in image_pull_secrets.split(","): - self.spec.image_pull_secrets.append(k8s.V1LocalObjectReference(name=image_pull_secret)) - - # Attach sidecar - self.extract_xcom = extract_xcom - - def gen_pod(self) -> k8s.V1Pod: - """Generate pod.""" - result = None - - if result is None: - result = self.pod - result.spec = self.spec - result.metadata = self.metadata - result.spec.containers = [self.container] - - result.metadata.name = self.make_unique_pod_id(result.metadata.name) - - if self.extract_xcom: - result = self.add_sidecar(result) - - return result - - @staticmethod - def add_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod: - """Add sidecar.""" - pod_cp = copy.deepcopy(pod) - pod_cp.spec.volumes = pod.spec.volumes or [] - pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME) - pod_cp.spec.containers[0].volume_mounts = pod_cp.spec.containers[0].volume_mounts or [] - pod_cp.spec.containers[0].volume_mounts.insert(0, PodDefaults.VOLUME_MOUNT) - pod_cp.spec.containers.append(PodDefaults.SIDECAR_CONTAINER) - - return pod_cp - - @staticmethod - def from_obj(obj) -> k8s.V1Pod | None: - """Convert to pod from obj.""" - if obj is None: - return None - - if isinstance(obj, PodGenerator): - return obj.gen_pod() - - if not isinstance(obj, dict): - raise TypeError( - "Cannot convert a non-dictionary or non-PodGenerator " - "object into a KubernetesExecutorConfig" - ) - - # We do not want to extract constant here from ExecutorLoader because it is just - # A name in dictionary rather than executor selection mechanism and it causes cyclic import - namespaced = obj.get("KubernetesExecutor", {}) - - if not namespaced: - return None - - resources = namespaced.get("resources") - - if resources is None: - requests = { - "cpu": namespaced.get("request_cpu"), - "memory": namespaced.get("request_memory"), - "ephemeral-storage": namespaced.get("ephemeral-storage"), - } - limits = { - "cpu": namespaced.get("limit_cpu"), - "memory": namespaced.get("limit_memory"), - "ephemeral-storage": namespaced.get("ephemeral-storage"), - } - all_resources = list(requests.values()) + list(limits.values()) - if all(r is None for r in all_resources): - resources = None - else: - resources = k8s.V1ResourceRequirements(requests=requests, limits=limits) - namespaced["resources"] = resources - return PodGenerator(**namespaced).gen_pod() - - @staticmethod - def make_unique_pod_id(dag_id): - r""" - Generate a unique Pod name. - - Kubernetes pod names must be <= 253 chars and must pass the following regex for - validation - ``^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$`` - - :param dag_id: a dag_id with only alphanumeric characters - :return: ``str`` valid Pod name of appropriate length - """ - if not dag_id: - return None - - safe_uuid = uuid.uuid4().hex - safe_pod_id = dag_id[: MAX_POD_ID_LEN - len(safe_uuid) - 1] + "-" + safe_uuid - - return safe_pod_id diff --git a/airflow/kubernetes/pre_7_4_0_compatibility/secret.py b/airflow/kubernetes/pre_7_4_0_compatibility/secret.py deleted file mode 100644 index d4d6a8aa044c5..0000000000000 --- a/airflow/kubernetes/pre_7_4_0_compatibility/secret.py +++ /dev/null @@ -1,125 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -"""Classes for interacting with Kubernetes API.""" - -from __future__ import annotations - -import copy -import uuid - -from kubernetes.client import models as k8s - -from airflow.exceptions import AirflowConfigException -from airflow.kubernetes.pre_7_4_0_compatibility.k8s_model import K8SModel - - -class Secret(K8SModel): - """Defines Kubernetes Secret Volume.""" - - def __init__(self, deploy_type, deploy_target, secret, key=None, items=None): - """ - Initialize a Kubernetes Secret Object. - - Used to track requested secrets from the user. - - :param deploy_type: The type of secret deploy in Kubernetes, either `env` or - `volume` - :param deploy_target: (Optional) The environment variable when - `deploy_type` `env` or file path when `deploy_type` `volume` where - expose secret. If `key` is not provided deploy target should be None. - :param secret: Name of the secrets object in Kubernetes - :param key: (Optional) Key of the secret within the Kubernetes Secret - if not provided in `deploy_type` `env` it will mount all secrets in object - :param items: (Optional) items that can be added to a volume secret for specifying projects of - secret keys to paths - https://kubernetes.io/docs/concepts/configuration/secret/#projection-of-secret-keys-to-specific-paths - """ - if deploy_type not in ("env", "volume"): - raise AirflowConfigException("deploy_type must be env or volume") - - self.deploy_type = deploy_type - self.deploy_target = deploy_target - self.items = items or [] - - if deploy_target is not None and deploy_type == "env": - # if deploying to env, capitalize the deploy target - self.deploy_target = deploy_target.upper() - - if key is not None and deploy_target is None: - raise AirflowConfigException("If `key` is set, `deploy_target` should not be None") - - self.secret = secret - self.key = key - - def to_env_secret(self) -> k8s.V1EnvVar: - """Store es environment secret.""" - return k8s.V1EnvVar( - name=self.deploy_target, - value_from=k8s.V1EnvVarSource( - secret_key_ref=k8s.V1SecretKeySelector(name=self.secret, key=self.key) - ), - ) - - def to_env_from_secret(self) -> k8s.V1EnvFromSource: - """Read from environment to secret.""" - return k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(name=self.secret)) - - def to_volume_secret(self) -> tuple[k8s.V1Volume, k8s.V1VolumeMount]: - """Convert to volume secret.""" - vol_id = f"secretvol{uuid.uuid4()}" - volume = k8s.V1Volume(name=vol_id, secret=k8s.V1SecretVolumeSource(secret_name=self.secret)) - if self.items: - volume.secret.items = self.items - return (volume, k8s.V1VolumeMount(mount_path=self.deploy_target, name=vol_id, read_only=True)) - - def attach_to_pod(self, pod: k8s.V1Pod) -> k8s.V1Pod: - """Attaches to pod.""" - cp_pod = copy.deepcopy(pod) - - if self.deploy_type == "volume": - volume, volume_mount = self.to_volume_secret() - if cp_pod.spec.volumes is None: - cp_pod.spec.volumes = [] - cp_pod.spec.volumes.append(volume) - if cp_pod.spec.containers[0].volume_mounts is None: - cp_pod.spec.containers[0].volume_mounts = [] - cp_pod.spec.containers[0].volume_mounts.append(volume_mount) - - if self.deploy_type == "env" and self.key is not None: - env = self.to_env_secret() - if cp_pod.spec.containers[0].env is None: - cp_pod.spec.containers[0].env = [] - cp_pod.spec.containers[0].env.append(env) - - if self.deploy_type == "env" and self.key is None: - env_from = self.to_env_from_secret() - if cp_pod.spec.containers[0].env_from is None: - cp_pod.spec.containers[0].env_from = [] - cp_pod.spec.containers[0].env_from.append(env_from) - - return cp_pod - - def __eq__(self, other): - return ( - self.deploy_type == other.deploy_type - and self.deploy_target == other.deploy_target - and self.secret == other.secret - and self.key == other.key - ) - - def __repr__(self): - return f"Secret({self.deploy_type}, {self.deploy_target}, {self.secret}, {self.key})" diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 54542a79c6a9c..71bc64c61aea6 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -1822,12 +1822,7 @@ def _has_kubernetes() -> bool: try: from kubernetes.client import models as k8s - try: - from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator - except ImportError: - from airflow.kubernetes.pre_7_4_0_compatibility.pod_generator import ( # type: ignore[assignment] - PodGenerator, - ) + from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator globals()["k8s"] = k8s globals()["PodGenerator"] = PodGenerator diff --git a/airflow/serialization/serializers/kubernetes.py b/airflow/serialization/serializers/kubernetes.py index f095400ee926a..faa2312ac7a81 100644 --- a/airflow/serialization/serializers/kubernetes.py +++ b/airflow/serialization/serializers/kubernetes.py @@ -44,12 +44,7 @@ def serialize(o: object) -> tuple[U, str, int, bool]: return "", "", 0, False if isinstance(o, (k8s.V1Pod, k8s.V1ResourceRequirements)): - try: - from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator - except ImportError: - from airflow.kubernetes.pre_7_4_0_compatibility.pod_generator import ( # type: ignore[assignment] - PodGenerator, - ) + from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator # We're running this in an except block, so we don't want it to fail # under any circumstances, e.g. accessing a non-existing attribute. diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py index e7e44ef254725..33822216f4db5 100644 --- a/airflow/utils/sqlalchemy.py +++ b/airflow/utils/sqlalchemy.py @@ -206,12 +206,8 @@ def ensure_pod_is_valid_after_unpickling(pod: V1Pod) -> V1Pod | None: if not isinstance(pod, V1Pod): return None try: - try: - from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator - except ImportError: - from airflow.kubernetes.pre_7_4_0_compatibility.pod_generator import ( # type: ignore[assignment] - PodGenerator, - ) + from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator + # now we actually reserialize / deserialize the pod pod_dict = sanitize_for_serialization(pod) return PodGenerator.deserialize_model_dict(pod_dict) diff --git a/newsfragments/41735.significant.rst b/newsfragments/41735.significant.rst new file mode 100644 index 0000000000000..5e6c717f0596f --- /dev/null +++ b/newsfragments/41735.significant.rst @@ -0,0 +1 @@ +Removed deprecated module ``airflow.kubernetes``.