Refactor Airflow operator (#191) (#3869)
masipauskas authored Aug 8, 2024
1 parent 407b849 commit d82fcfd
Showing 19 changed files with 431 additions and 1,623 deletions.
114 changes: 8 additions & 106 deletions docs/python_airflow_operator.md
@@ -36,19 +36,19 @@ and handles job cancellation if the Airflow task is killed.
* **job_request** (*JobSubmitRequestItem*) –


* **job_set_prefix** (*str** | **None*) –
* **job_set_prefix** (*Optional**[**str**]*) –


* **lookout_url_template** (*str** | **None*) –
* **lookout_url_template** (*Optional**[**str**]*) –


* **poll_interval** (*int*) –


* **container_logs** (*str** | **None*) –
* **container_logs** (*Optional**[**str**]*) –


* **k8s_token_retriever** (*TokenRetriever** | **None*) –
* **k8s_token_retriever** (*Optional**[**TokenRetriever**]*) –


* **deferrable** (*bool*) –
@@ -89,19 +89,7 @@ operator needs to be cleaned up, or it will leave ghost processes behind.



#### pod_manager(k8s_context)

* **Parameters**

**k8s_context** (*str*) –



* **Return type**

*PodLogManager*


#### _property_ pod_manager(_: KubernetesPodLogManager_ )

#### render_template_fields(context, jinja_env=None)
Template all attributes listed in self.template_fields.
Expand Down Expand Up @@ -147,7 +135,7 @@ Initializes a new ArmadaOperator.
* **job_request** (*JobSubmitRequestItem*) – The job to be submitted to Armada.


* **job_set_prefix** (*Optional**[**str**]*) – A string to prepend to the jobSet name
* **job_set_prefix** (*Optional**[**str**]*) – A string to prepend to the jobSet name.


* **lookout_url_template** – Template for creating lookout links. If not specified
@@ -170,95 +158,9 @@ acknowledged by Armada.
:type job_acknowledgement_timeout: int
:param kwargs: Additional keyword arguments to pass to the BaseOperator.

## armada.triggers.armada module


### _class_ armada.triggers.armada.ArmadaTrigger(job_id, armada_queue, job_set_id, poll_interval, tracking_message, job_acknowledgement_timeout, job_request_namespace, channel_args=None, channel_args_details=None, container_logs=None, k8s_token_retriever=None, k8s_token_retriever_details=None, last_log_time=None)
Bases: `BaseTrigger`

An Airflow Trigger that can asynchronously manage an Armada job.


* **Parameters**


* **job_id** (*str*) –


* **armada_queue** (*str*) –


* **job_set_id** (*str*) –


* **poll_interval** (*int*) –


* **tracking_message** (*str*) –


* **job_acknowledgement_timeout** (*int*) –


* **job_request_namespace** (*str*) –


* **channel_args** (*GrpcChannelArgs*) –


* **channel_args_details** (*Dict**[**str**, **Any**]*) –


* **container_logs** (*str** | **None*) –


* **k8s_token_retriever** (*TokenRetriever** | **None*) –


* **k8s_token_retriever_details** (*Tuple**[**str**, **Dict**[**str**, **Any**]**] **| **None*) –


* **last_log_time** (*DateTime** | **None*) –



#### _property_ client(_: ArmadaAsyncIOClient_ )

#### pod_manager(k8s_context)

* **Parameters**

**k8s_context** (*str*) –



* **Return type**

*PodLogManagerAsync*



#### _async_ run()
Run the Trigger asynchronously. This will poll Armada until the job reaches a
terminal state.


* **Return type**

*AsyncIterator*[*TriggerEvent*]



#### serialize()
Serialises the state of this Trigger.
When the Trigger is re-hydrated, these values will be passed to __init__() as kwargs.


* **Return type**

tuple

### armada.operators.armada.log_exceptions(method)
## armada.triggers.armada module

## armada.auth module

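The parameter list above documents the refactored ArmadaOperator's public surface. As a rough illustration of how it is meant to be used, here is a minimal DAG sketch. The queue name, gRPC target, and pod spec are hypothetical, and the `GrpcChannelArgs` import path and constructor, plus the `name` and `armada_queue` parameters, are assumptions drawn from the surrounding docs rather than confirmed by this diff.

```python
import pendulum
from airflow import DAG

from armada.model import GrpcChannelArgs  # import path assumed
from armada.operators.armada import ArmadaOperator
from armada_client.armada import submit_pb2
from armada_client.k8s.io.api.core.v1 import generated_pb2 as core_v1


def sleep_job() -> submit_pb2.JobSubmitRequestItem:
    # Minimal job: one busybox container that sleeps for ten seconds.
    pod = core_v1.PodSpec(
        containers=[
            core_v1.Container(name="sleep", image="busybox", args=["sleep", "10s"])
        ]
    )
    return submit_pb2.JobSubmitRequestItem(
        priority=1, namespace="default", pod_spec=pod
    )


with DAG(
    dag_id="armada_sleep",
    start_date=pendulum.datetime(2024, 8, 1, tz="UTC"),
    schedule=None,
) as dag:
    ArmadaOperator(
        task_id="sleep",
        name="sleep",
        armada_queue="default",  # hypothetical queue
        channel_args=GrpcChannelArgs(target="localhost:50051"),  # hypothetical server
        job_request=sleep_job(),
        container_logs="sleep",  # relay this container's logs into the task log
        deferrable=True,  # hand polling off to the trigger instead of the worker
    )
```

With `deferrable=True` the operator submits the job and then defers polling to a trigger, freeing the worker slot; `container_logs` names the container whose output should be streamed into the Airflow task log.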
3 changes: 1 addition & 2 deletions third_party/airflow/armada/auth.py
@@ -1,5 +1,4 @@
from typing import Dict, Any, Tuple, Protocol

from typing import Any, Dict, Protocol, Tuple

""" We use this interface for objects fetching Kubernetes auth tokens. Since
it's used within the Trigger, it must be serialisable."""
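For context, `armada/auth.py` defines the serialisable `TokenRetriever` protocol whose import order the hunk above tidies. Judging from the `Tuple` and `Dict` imports here and the `get_token()` call in the new log manager below, an implementation could look roughly like this sketch; the `serialize()` return shape (class path plus init kwargs) is an assumption inferred from the trigger's `k8s_token_retriever_details` parameter.

```python
from typing import Any, Dict, Tuple


class StaticTokenRetriever:
    """Serves a fixed bearer token; serialisable so a deferred Trigger can re-create it."""

    def __init__(self, token: str):
        self.token = token

    def get_token(self) -> str:
        return self.token

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        # Class path plus __init__ kwargs so the triggerer process can
        # re-hydrate this retriever (shape assumed, see note above).
        return "my_project.auth.StaticTokenRetriever", {"token": self.token}
```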
143 changes: 143 additions & 0 deletions third_party/airflow/armada/log_manager.py
@@ -0,0 +1,143 @@
from __future__ import annotations

import math
import threading
from http.client import HTTPResponse
from typing import Dict, List, Optional, Tuple, cast

import pendulum
from airflow.utils.log.logging_mixin import LoggingMixin
from armada.auth import TokenRetriever
from kubernetes import client, config
from pendulum import DateTime
from pendulum.parsing.exceptions import ParserError
from urllib3.exceptions import HTTPError


class KubernetesPodLogManager(LoggingMixin):
    """Monitor logs of Kubernetes pods asynchronously."""

    CLIENTS_LOCK = threading.Lock()
    CLIENTS: Dict[str, client.CoreV1Api] = {}

    def __init__(
        self,
        token_retriever: Optional[TokenRetriever] = None,
    ):
        """
        Create KubernetesPodLogManager.

        :param token_retriever: Retrieves auth tokens.
        """
        super().__init__()
        self._token_retriever = token_retriever

    def _k8s_client(self, k8s_context: str) -> client.CoreV1Api:
        """
        K8s clients are expensive to initialize (especially loading the
        configuration), so we cache them per context in a class-level cache.
        This method may be called from multiple threads.
        """
        if k8s_context not in KubernetesPodLogManager.CLIENTS:
            with KubernetesPodLogManager.CLIENTS_LOCK:
                # Re-check under the lock: another thread may have created the
                # client while this one was waiting.
                if k8s_context not in KubernetesPodLogManager.CLIENTS:
                    configuration = client.Configuration()
                    config.load_kube_config(
                        client_configuration=configuration, context=k8s_context
                    )
                    k8s_client = client.CoreV1Api(
                        api_client=client.ApiClient(configuration=configuration)
                    )
                    k8s_client.api_client.configuration.api_key_prefix[
                        "authorization"
                    ] = "Bearer"
                    KubernetesPodLogManager.CLIENTS[k8s_context] = k8s_client
        return KubernetesPodLogManager.CLIENTS[k8s_context]

    def _with_bearer_auth(self, client):
        if self._token_retriever is not None:
            client.api_client.configuration.api_key["authorization"] = (
                self._token_retriever.get_token()
            )

    def fetch_container_logs(
        self,
        *,
        k8s_context: str,
        namespace: str,
        pod: str,
        container: str,
        since_time: Optional[DateTime],
    ) -> Optional[DateTime]:
        """
        Fetch container logs since `since_time`; does not follow the log stream.
        """
        client = self._k8s_client(k8s_context)
        self._with_bearer_auth(client)
        since_seconds = (
            math.ceil((pendulum.now() - since_time).total_seconds())
            if since_time
            else None
        )
        try:
            logs = client.read_namespaced_pod_log(
                namespace=namespace,
                name=pod,
                container=container,
                follow=False,
                timestamps=True,
                since_seconds=since_seconds,
                _preload_content=False,
            )
            if logs.status == 404:
                self.log.warning(f"Unable to fetch logs - pod {pod} has been deleted.")
                return since_time
        except HTTPError as e:
            self.log.exception(f"There was an error reading the kubernetes API: {e}.")
            raise

        return self._stream_logs(container, since_time, logs)

    def _stream_logs(
        self, container: str, since_time: Optional[DateTime], logs: HTTPResponse
    ) -> Optional[DateTime]:
        messages: List[str] = []
        message_timestamp = None
        try:
            chunk = logs.read()
            lines = chunk.decode("utf-8", errors="backslashreplace").splitlines()
            for raw_line in lines:
                line_timestamp, message = self._parse_log_line(raw_line)

                if line_timestamp:  # detect new log-line (starts with timestamp)
                    if since_time and line_timestamp <= since_time:
                        continue
                    self._log_container_message(container, messages)
                    messages.clear()
                    message_timestamp = line_timestamp
                messages.append(message)
        except HTTPError as e:
            self.log.warning(
                f"Reading of logs interrupted for container {container} with error {e}."
            )

        self._log_container_message(container, messages)
        return message_timestamp

    def _log_container_message(self, container: str, messages: List[str]):
        if messages:
            self.log.info("[%s] %s", container, "\n".join(messages))

    def _parse_log_line(self, line: str) -> Tuple[DateTime | None, str]:
        """
        Parse a K8s log line into its timestamp and message.

        :param line: k8s log line
        :return: timestamp and log message
        """
        timestamp, sep, message = line.strip().partition(" ")
        if not sep:
            return None, line
        try:
            last_log_time = cast(DateTime, pendulum.parse(timestamp))
        except ParserError:
            return None, line
        return last_log_time, message
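To show how the new log manager is consumed, here is a hedged usage sketch: poll `fetch_container_logs` on an interval, feeding each returned timestamp back in as `since_time` so already-printed lines are skipped. The kubeconfig context, namespace, pod, and container names are hypothetical.

```python
import time

from armada.log_manager import KubernetesPodLogManager

manager = KubernetesPodLogManager()  # optionally pass token_retriever=...
since = None
for _ in range(10):  # in the operator this loops until the job terminates
    since = (
        manager.fetch_container_logs(
            k8s_context="my-cluster",  # hypothetical kubeconfig context
            namespace="default",
            pod="sleep-job-abc123",
            container="sleep",
            since_time=since,
        )
        or since  # keep the previous timestamp if nothing new was printed
    )
    time.sleep(30)
```

Because `_stream_logs` returns `None` when no new timestamped lines arrive, the `or since` keeps the cursor from resetting between polls.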