
Enable trigger logging in webserver #27758

Merged
merged 100 commits into from
Feb 4, 2023
Changes from 75 commits
Commits
100 commits
e6d2b94
Enable trigger logging
dstandish Jan 10, 2023
666cc42
Don't emit end_of_log for task deferral
dstandish Jan 11, 2023
d36667f
add support for ES
dstandish Jan 11, 2023
c15d88a
get ES working again
dstandish Jan 12, 2023
db346d0
don't say 'starting' when resuming from deferral
dstandish Jan 12, 2023
52ece7e
fixup! get ES working again
dstandish Jan 12, 2023
be77d4e
make it optional to have logs in sep pane
dstandish Jan 12, 2023
e34a815
less joinedload
dstandish Jan 12, 2023
66e9a98
docstring
dstandish Jan 12, 2023
7aea462
use enum for reader mode
dstandish Jan 13, 2023
65018f0
use-enum
dstandish Jan 13, 2023
301907d
handle odd error
dstandish Jan 13, 2023
e63ebb5
enum
dstandish Jan 13, 2023
6409cda
no extra code
dstandish Jan 13, 2023
eeb66a6
add support for cloudwatch
dstandish Jan 13, 2023
4efce54
add support for gcp
dstandish Jan 13, 2023
933add0
--wip-- [skip ci]
dstandish Jan 14, 2023
3bf09e4
forward shut down logs arg to task runner
dstandish Jan 16, 2023
4f92a09
ensure that deferral return code propagates to local task job
dstandish Jan 16, 2023
81202bc
start the process of switching to single pane -- WIP!
dstandish Jan 16, 2023
4fd4f6d
revert changes to UI
dstandish Jan 16, 2023
32076b5
s3 ... untested
dstandish Jan 16, 2023
e37b449
fixups
dstandish Jan 17, 2023
ae387f0
fixups
dstandish Jan 17, 2023
3c05dbd
remove some removed stuff
dstandish Jan 17, 2023
5227dd3
fixup remove log type
dstandish Jan 17, 2023
abd39fc
Update airflow/cli/commands/task_command.py
dstandish Jan 17, 2023
dbe4467
top import
dstandish Jan 17, 2023
6284dee
pluggable timestamp parser
dstandish Jan 17, 2023
e17420c
fix handling when mult messages same timestamp
dstandish Jan 17, 2023
a597a5c
comment
dstandish Jan 17, 2023
f034542
no read local if fin
dstandish Jan 17, 2023
f99e1de
no needed
dstandish Jan 17, 2023
fc5fc39
not needed
dstandish Jan 18, 2023
d4f78c1
docstring
dstandish Jan 18, 2023
41d55d6
docstring
dstandish Jan 18, 2023
524f1bb
support wasb
dstandish Jan 19, 2023
837c8db
todo
dstandish Jan 19, 2023
d0b980d
fix es eol
dstandish Jan 19, 2023
85bb418
fix ui logs load bug
dstandish Jan 19, 2023
a3f0e05
simplif
dstandish Jan 19, 2023
5d691dd
can disable trigger logging
dstandish Jan 21, 2023
b13f833
no queues for cloudwatch -- avoids many threads
dstandish Jan 23, 2023
3306bf1
wip -- got "native" trigger handlers working with stackdriver
dstandish Jan 25, 2023
3fc9ab8
fixup
dstandish Jan 25, 2023
a5fac73
fixup
dstandish Jan 25, 2023
ec9c6e4
Merge branch 'main' into enable-trigger-logging
dstandish Jan 25, 2023
a3c8ecf
documentation
dstandish Jan 26, 2023
5302139
docs
dstandish Jan 26, 2023
bf98112
fix some tests
dstandish Jan 26, 2023
1f3cca2
fix tests
dstandish Jan 27, 2023
47ce171
fix static checks
dstandish Jan 27, 2023
8598ade
fix static checks
dstandish Jan 27, 2023
66f4def
provider future compat
dstandish Jan 27, 2023
ce271f0
fix tests
dstandish Jan 28, 2023
c533c40
Merge SQLAlchemy query options
uranusjr Jan 30, 2023
ef6568b
Simplify is_trigger_log_context declaration
uranusjr Jan 30, 2023
8f5ccb8
Simplify typing when Path is involved
uranusjr Jan 30, 2023
5766454
Log message format
uranusjr Jan 30, 2023
6c5bcbf
Fix task command tests
uranusjr Jan 30, 2023
7bf84c4
cache executor method
dstandish Jan 30, 2023
0186d9f
Merge branch 'main' into enable-trigger-logging
dstandish Jan 30, 2023
2338501
stackdriver fix
dstandish Jan 30, 2023
fdb5d16
--wip-- [skip ci]
dstandish Jan 30, 2023
376c328
Revert "--wip-- [skip ci]"
dstandish Jan 30, 2023
c6c3ed9
Revert "forward shut down logs arg to task runner"
dstandish Jan 30, 2023
f931a4f
fix tests
dstandish Jan 31, 2023
7102ace
fix tests
dstandish Jan 31, 2023
1e86a5e
fix tests
dstandish Jan 31, 2023
3dfdcbd
fix tests
dstandish Jan 31, 2023
d974d22
fix tests
dstandish Jan 31, 2023
74bff9b
fix tests
dstandish Jan 31, 2023
5a0cf5f
Merge branch 'main' into enable-trigger-logging
dstandish Jan 31, 2023
2b626f5
fix tests
dstandish Feb 1, 2023
549542a
fix tests
dstandish Feb 1, 2023
9308787
Update tests/charts/test_extra_env_env_from.py
dstandish Feb 2, 2023
fa86951
Update docs/apache-airflow/administration-and-deployment/logging-moni…
dstandish Feb 2, 2023
e02c2ba
Update docs/apache-airflow/administration-and-deployment/logging-moni…
dstandish Feb 2, 2023
9b0c177
Update docs/apache-airflow/administration-and-deployment/logging-moni…
dstandish Feb 2, 2023
2b54b39
Update docs/apache-airflow/administration-and-deployment/logging-moni…
dstandish Feb 2, 2023
64ecb1a
Update docs/apache-airflow/administration-and-deployment/logging-moni…
dstandish Feb 2, 2023
e657b65
Update docs/apache-airflow/administration-and-deployment/logging-moni…
dstandish Feb 2, 2023
0628fd3
Update chart/templates/triggerer/triggerer-service.yaml
dstandish Feb 2, 2023
74a12c1
restore just some of the original phrasing re however vs but 😛
dstandish Feb 2, 2023
36c7e8c
Revert "Update chart/templates/triggerer/triggerer-service.yaml"
dstandish Feb 2, 2023
1020340
Revert "Revert "Update chart/templates/triggerer/triggerer-service.ya…
dstandish Feb 2, 2023
76a75c5
add ver check for service name
dstandish Feb 2, 2023
d65fcc1
version guard networkpolicty
dstandish Feb 2, 2023
d1f387e
format
dstandish Feb 2, 2023
69c0688
no version guard in triggerer sts
dstandish Feb 2, 2023
3bd4877
no STS in triggerer dep if < 2.6
dstandish Feb 3, 2023
31a88cb
no port if < 2.6
dstandish Feb 3, 2023
4799f1e
no comment
dstandish Feb 3, 2023
ccd82ea
template comment
dstandish Feb 3, 2023
1a090f3
Merge branch 'main' into enable-trigger-logging
dstandish Feb 3, 2023
611ae28
update chart default airflow version
dstandish Feb 3, 2023
566814d
Revert "update chart default airflow version"
dstandish Feb 3, 2023
522524c
revert chart tests
dstandish Feb 3, 2023
3b3d1bf
fix tests
dstandish Feb 3, 2023
d097d5b
fix tests
dstandish Feb 3, 2023
8 changes: 6 additions & 2 deletions airflow/api_connexion/endpoints/log_endpoint.py
@@ -21,6 +21,7 @@
from flask import Response, request
from itsdangerous.exc import BadSignature
from itsdangerous.url_safe import URLSafeSerializer
from sqlalchemy.orm import joinedload
from sqlalchemy.orm.session import Session

from airflow.api_connexion import security
@@ -73,9 +74,10 @@ def get_log(
metadata["download_logs"] = False

task_log_reader = TaskLogReader()

if not task_log_reader.supports_read:
raise BadRequest("Task log handler does not support read logs.")
ti = (
query = (
session.query(TaskInstance)
.filter(
TaskInstance.task_id == task_id,
@@ -84,8 +86,10 @@
TaskInstance.map_index == map_index,
)
.join(TaskInstance.dag_run)
.one_or_none()
.options(joinedload("trigger"))
.options(joinedload("trigger.triggerer_job"))
)
ti = query.one_or_none()
if ti is None:
metadata["end_of_log"] = True
raise NotFound(title="TaskInstance not found")
1 change: 1 addition & 0 deletions airflow/cli/cli_parser.py
@@ -2072,6 +2072,7 @@ class GroupCommand(NamedTuple):
ARG_LOG_FILE,
ARG_CAPACITY,
ARG_VERBOSE,
ARG_SKIP_SERVE_LOGS,
),
),
ActionCommand(
30 changes: 19 additions & 11 deletions airflow/cli/commands/task_command.py
@@ -44,6 +44,7 @@
from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun
from airflow.models.operator import needs_expansion
from airflow.models.taskinstance import TaskReturnCode
from airflow.settings import IS_K8S_EXECUTOR_POD
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
@@ -58,6 +59,7 @@
suppress_logs_and_warning,
)
from airflow.utils.dates import timezone
from airflow.utils.log.file_task_handler import _set_task_deferred_context_var
from airflow.utils.log.logging_mixin import StreamLogWriter
from airflow.utils.log.secrets_masker import RedactedIO
from airflow.utils.net import get_hostname
@@ -182,7 +184,7 @@ def _get_ti(
return ti, dr_created


def _run_task_by_selected_method(args, dag: DAG, ti: TaskInstance) -> None:
def _run_task_by_selected_method(args, dag: DAG, ti: TaskInstance) -> None | TaskReturnCode:
"""
Runs the task based on a mode.

@@ -193,11 +195,11 @@ def _run_task_by_selected_method(args, dag: DAG, ti: TaskInstance) -> None:
- by executor
"""
if args.local:
_run_task_by_local_task_job(args, ti)
return _run_task_by_local_task_job(args, ti)
elif args.raw:
_run_raw_task(args, ti)
return _run_raw_task(args, ti)
else:
_run_task_by_executor(args, dag, ti)
return _run_task_by_executor(args, dag, ti)


def _run_task_by_executor(args, dag, ti):
@@ -239,7 +241,7 @@ def _run_task_by_executor(args, dag, ti):
executor.end()


def _run_task_by_local_task_job(args, ti):
def _run_task_by_local_task_job(args, ti) -> TaskReturnCode | None:
"""Run LocalTaskJob, which monitors the raw task execution process."""
run_job = LocalTaskJob(
task_instance=ti,
@@ -254,11 +256,14 @@ def _run_task_by_local_task_job(args, ti):
external_executor_id=_extract_external_executor_id(args),
)
try:
run_job.run()
ret = run_job.run()

finally:
if args.shut_down_logging:
logging.shutdown()
with suppress(ValueError):
return TaskReturnCode(ret)
return None


RAW_TASK_UNSUPPORTED_OPTION = [
@@ -269,9 +274,9 @@ def _run_task_by_local_task_job(args, ti):
]


def _run_raw_task(args, ti: TaskInstance) -> None:
def _run_raw_task(args, ti: TaskInstance) -> None | TaskReturnCode:
"""Runs the main task handling code."""
ti._run_raw_task(
return ti._run_raw_task(
mark_success=args.mark_success,
job_id=args.job_id,
pool=args.pool,
@@ -407,18 +412,21 @@ def task_run(args, dag=None):
# this should be last thing before running, to reduce likelihood of an open session
# which can cause trouble if running process in a fork.
settings.reconfigure_orm(disable_connection_pool=True)

task_return_code = None
try:
if args.interactive:
_run_task_by_selected_method(args, dag, ti)
task_return_code = _run_task_by_selected_method(args, dag, ti)
else:
with _move_task_handlers_to_root(ti), _redirect_stdout_to_ti_log(ti):
_run_task_by_selected_method(args, dag, ti)
task_return_code = _run_task_by_selected_method(args, dag, ti)
if task_return_code == TaskReturnCode.DEFERRED:
_set_task_deferred_context_var()
finally:
try:
get_listener_manager().hook.before_stopping(component=TaskCommandMarker())
except Exception:
pass
return task_return_code


@cli_utils.action_cli(check_db=False)
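The task_command.py changes above make `task_run` propagate a `TaskReturnCode` so the CLI can tell a deferral apart from a normal exit. A minimal sketch of that flow, with the enum value and helper behavior assumed rather than taken from the PR:

```python
from __future__ import annotations

from enum import Enum


class TaskReturnCode(Enum):
    # A distinct non-zero code for "task deferred, not failed";
    # the concrete value here is an assumption for illustration.
    DEFERRED = 100


def run_task(deferred: bool) -> TaskReturnCode | None:
    """Stand-in for _run_task_by_selected_method: None on a normal exit."""
    return TaskReturnCode.DEFERRED if deferred else None


task_return_code = run_task(deferred=True)
if task_return_code == TaskReturnCode.DEFERRED:
    # In the PR this is where _set_task_deferred_context_var() runs, so the
    # log handler knows not to emit an end-of-log marker for a deferral.
    print("deferred")
```

The point of returning an enum instead of raising or exiting is that the caller (`_run_task_by_local_task_job` wraps the raw exit code with `TaskReturnCode(ret)` under `suppress(ValueError)`) can treat "deferred" as one more normal outcome.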
29 changes: 25 additions & 4 deletions airflow/cli/commands/triggerer_command.py
@@ -18,14 +18,35 @@
from __future__ import annotations

import signal
from contextlib import contextmanager
from functools import partial
from multiprocessing import Process
from typing import Generator

import daemon
from daemon.pidfile import TimeoutPIDLockFile

from airflow import settings
from airflow.configuration import conf
from airflow.jobs.triggerer_job import TriggererJob
from airflow.utils import cli as cli_utils
from airflow.utils.cli import setup_locations, setup_logging, sigint_handler, sigquit_handler
from airflow.utils.serve_logs import serve_logs


@contextmanager
def _serve_logs(skip_serve_logs: bool = False) -> Generator[None, None, None]:
"""Starts serve_logs sub-process"""
sub_proc = None
if skip_serve_logs is False:
port = conf.getint("logging", "trigger_log_server_port", fallback=8794)
sub_proc = Process(target=partial(serve_logs, port=port))
sub_proc.start()
try:
yield
finally:
if sub_proc:
sub_proc.terminate()


@cli_utils.action_cli
@@ -44,18 +65,18 @@ def triggerer(args):
stdout_handle.truncate(0)
stderr_handle.truncate(0)

ctx = daemon.DaemonContext(
daemon_context = daemon.DaemonContext(
pidfile=TimeoutPIDLockFile(pid, -1),
files_preserve=[handle],
stdout=stdout_handle,
stderr=stderr_handle,
umask=int(settings.DAEMON_UMASK, 8),
)
with ctx:
with daemon_context, _serve_logs(args.skip_serve_logs):
job.run()

else:
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
signal.signal(signal.SIGQUIT, sigquit_handler)
job.run()
with _serve_logs(args.skip_serve_logs):
job.run()
18 changes: 18 additions & 0 deletions airflow/config_templates/config.yml
@@ -788,6 +788,24 @@ logging:
type: string
example: ~
default: "8793"
trigger_log_server_port:
description: |
Port to serve logs from for triggerer. See worker_log_server_port description
for more info.
version_added: 2.6.0
type: string
example: ~
default: "8794"
interleave_timestamp_parser:
description: |
We must parse timestamps to interleave logs between trigger and task. To do so,
we need to parse timestamps in log files. In case your log format is non-standard,
you may provide import path to callable which takes a string log line and returns
the timestamp (datetime.datetime compatible).
version_added: 2.6.0
type: string
example: path.to.my_func
default: ~
metrics:
description: |
StatsD (https://github.com/etsy/statsd) integration settings.
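Since the new `interleave_timestamp_parser` option takes an import path to a callable that maps one raw log line to a timestamp, a user-supplied parser might look like the sketch below. The bracketed ISO-8601 prefix is an assumption about the log format, not something the config mandates:

```python
from __future__ import annotations

from datetime import datetime


def parse_log_timestamp(line: str) -> datetime | None:
    """Return the timestamp of one log line, or None if it has none."""
    # Assumption: lines begin with a bracketed ISO-8601 timestamp, e.g.
    # "[2023-01-17T12:34:56.789+0000] {taskinstance.py:55} INFO - done"
    if not line.startswith("[") or "]" not in line:
        return None
    raw = line[1 : line.index("]")]
    try:
        return datetime.strptime(raw, "%Y-%m-%dT%H:%M:%S.%f%z")
    except ValueError:
        return None
```

Returning a timezone-aware `datetime` keeps the interleaving stable when task and trigger logs were written on hosts in different timezones.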
11 changes: 11 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -434,6 +434,17 @@ extra_logger_names =
# visible from the main web server to connect into the workers.
worker_log_server_port = 8793

# Port to serve logs from for triggerer. See worker_log_server_port description
# for more info.
trigger_log_server_port = 8794

# We must parse timestamps to interleave logs between trigger and task. To do so,
# we need to parse timestamps in log files. In case your log format is non-standard,
# you may provide import path to callable which takes a string log line and returns
# the timestamp (datetime.datetime compatible).
# Example: interleave_timestamp_parser = path.to.my_func
# interleave_timestamp_parser =

[metrics]

# StatsD (https://github.com/etsy/statsd) integration settings.
2 changes: 1 addition & 1 deletion airflow/example_dags/example_time_delta_sensor_async.py
@@ -36,6 +36,6 @@
catchup=False,
tags=["example"],
) as dag:
wait = TimeDeltaSensorAsync(task_id="wait", delta=datetime.timedelta(seconds=10))
wait = TimeDeltaSensorAsync(task_id="wait", delta=datetime.timedelta(seconds=30))
finish = EmptyOperator(task_id="finish")
wait >> finish
3 changes: 2 additions & 1 deletion airflow/executors/base_executor.py
@@ -355,14 +355,15 @@ def execute_async(
"""
raise NotImplementedError()

def get_task_log(self, ti: TaskInstance, log: str = "") -> None | str | tuple[str, dict[str, bool]]:
def get_task_log(self, ti: TaskInstance) -> tuple[list[str], list[str]]:
"""
This method can be implemented by any child class to return the task logs.

:param ti: A TaskInstance object
:param log: log str
:return: logs or tuple of logs and meta dict
"""
return [], []

def end(self) -> None: # pragma: no cover
"""Wait synchronously for the previously submitted job to complete."""
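The new `get_task_log` contract in `BaseExecutor` returns a `(messages, logs)` pair: human-readable status notes for the log reader, plus the fetched log chunks. A hedged sketch of how a custom executor might satisfy it — the fetch itself is a placeholder, and the function name is invented:

```python
from __future__ import annotations


def get_task_log_example(hostname: str) -> tuple[list[str], list[str]]:
    """Toy implementation of the (messages, logs) contract."""
    messages: list[str] = []
    logs: list[str] = []
    messages.append(f"Trying to fetch logs from worker {hostname}")
    try:
        # Placeholder: a real executor would read from its backend here,
        # the way kubernetes_executor streams pod logs in this diff.
        fetched = ["INFO - task started", "INFO - task finished"]
        logs.append("\n".join(fetched))
    except Exception as e:
        # Failures become messages rather than exceptions, so the
        # webserver can still render whatever was retrieved.
        messages.append(f"Reading logs failed: {e}")
    return messages, logs
```

Separating messages from log content replaces the old pattern of splicing `***`-decorated status strings into the log text itself.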
6 changes: 3 additions & 3 deletions airflow/executors/celery_kubernetes_executor.py
@@ -141,11 +141,11 @@ def queue_task_instance(
cfg_path=cfg_path,
)

def get_task_log(self, ti: TaskInstance, log: str = "") -> None | str | tuple[str, dict[str, bool]]:
def get_task_log(self, ti: TaskInstance) -> tuple[list[str], list[str]]:
"""Fetch task log from Kubernetes executor"""
if ti.queue == self.kubernetes_executor.kubernetes_queue:
return self.kubernetes_executor.get_task_log(ti=ti, log=log)
return None
return self.kubernetes_executor.get_task_log(ti=ti)
return [], []

def has_task(self, task_instance: TaskInstance) -> bool:
"""
19 changes: 9 additions & 10 deletions airflow/executors/kubernetes_executor.py
@@ -781,14 +781,16 @@ def _get_pod_namespace(ti: TaskInstance):
namespace = pod_override.metadata.namespace
return namespace or conf.get("kubernetes_executor", "namespace", fallback="default")

def get_task_log(self, ti: TaskInstance, log: str = "") -> str | tuple[str, dict[str, bool]]:

def get_task_log(self, ti: TaskInstance) -> tuple[list[str], list[str]]:
messages = []
log = []
try:
from airflow.kubernetes.kube_client import get_kube_client
from airflow.kubernetes.pod_generator import PodGenerator

client = get_kube_client()

log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
messages.append(f"Trying to get logs (last 100 lines) from worker pod {ti.hostname}")
selector = PodGenerator.build_selector_for_k8s_executor_pod(
dag_id=ti.dag_id,
task_id=ti.task_id,
@@ -816,13 +818,10 @@ def get_task_log(self, ti: TaskInstance, log: str = "") -> str | tuple[str, dict
)

for line in res:
log += line.decode()

return log

except Exception as f:
log += f"*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n"
return log, {"end_of_log": True}
log.append(line.decode())
except Exception as e:
messages.append(f"Reading from k8s pod logs failed: {str(e)}")
return messages, ["\n".join(log)]

def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
7 changes: 3 additions & 4 deletions airflow/executors/local_kubernetes_executor.py
@@ -142,12 +142,11 @@ def queue_task_instance(
cfg_path=cfg_path,
)

def get_task_log(self, ti: TaskInstance, log: str = "") -> None | str | tuple[str, dict[str, bool]]:
def get_task_log(self, ti: TaskInstance) -> tuple[list[str], list[str]]:
"""Fetch task log from kubernetes executor"""
if ti.queue == self.kubernetes_executor.kubernetes_queue:
return self.kubernetes_executor.get_task_log(ti=ti, log=log)

return None
return self.kubernetes_executor.get_task_log(ti=ti)
return [], []

def has_task(self, task_instance: TaskInstance) -> bool:
"""
4 changes: 3 additions & 1 deletion airflow/jobs/base_job.py
@@ -249,14 +249,15 @@ def run(self):
"""Starts the job."""
Stats.incr(self.__class__.__name__.lower() + "_start", 1, 1)
# Adding an entry in the DB
ret = None
with create_session() as session:
self.state = State.RUNNING
session.add(self)
session.commit()
make_transient(self)

try:
self._execute()
ret = self._execute()
# In case of max runs or max duration
self.state = State.SUCCESS
except SystemExit:
@@ -272,6 +273,7 @@
session.commit()

Stats.incr(self.__class__.__name__.lower() + "_end", 1, 1)
return ret

def _execute(self):
raise NotImplementedError("This method needs to be overridden")