Merge branch 'main' into dataflow-job-id-pattern
lukas-mi authored Aug 28, 2024
2 parents 9cea2fc + e55ecd5 commit 4c9ac32
Showing 14 changed files with 190 additions and 17 deletions.
9 changes: 9 additions & 0 deletions PROVIDERS.rst
@@ -123,6 +123,15 @@
the community. Also it is often easier to advertise and promote usage of the provider
themselves when they own, manage and release their provider, especially when they can synchronize releases
of their provider with new features that might get added to the service.

+Examples:
+
+Huawei Cloud provider - `Discussion <https://lists.apache.org/thread/f5tk9c734wlyv616vyy8r34ymth3dqbc>`_
+Cloudera provider - `Discussion <https://lists.apache.org/thread/2z0lvgj466ksxxrbvofx41qvn03jrwwb>`_, `Vote <https://lists.apache.org/thread/8b1jvld3npgzz2z0o3gv14lvtornbdrm>`_
+PgVector / Weaviate / OpenAI provider - `Discussion <https://lists.apache.org/thread/0d669fmy4hn29h5c0wj0ottdskd77ktp>`_, `Lazy Consensus vote <https://lists.apache.org/thread/zrq6554lwobhngtwyzp7tpgnyfsxxybh>`_
+Pinecone / OpenAI / Cohere provider - `Discussion <https://lists.apache.org/thread/0d669fmy4hn29h5c0wj0ottdskd77ktp>`_, `Vote <https://lists.apache.org/thread/skh32jksvcf4yx4fhhsfz8lq6w5nhfjc>`_, `VOTE Result <https://lists.apache.org/thread/oq7h2n88zfo3dzldy5w8xlp9kyngs7x8>`_
+
+Note that some providers were approved via a regular vote and others via lazy consensus; please refer to the sections above for an explanation of why the process is not the same for all providers.

Community providers release process
-----------------------------------

10 changes: 8 additions & 2 deletions airflow/dag_processing/manager.py
@@ -526,14 +526,20 @@ def deactivate_stale_dags(
        dags_parsed = session.execute(query)

        for dag in dags_parsed:
+            # When the DAG processor runs as part of the scheduler, and the user changes the DAGs folder,
+            # DAGs from the previous DAGs folder will be marked as stale. Note that this change has no impact
+            # on standalone DAG processors.
+            dag_not_in_current_dag_folder = os.path.commonpath([dag.fileloc, dag_directory]) != dag_directory
            # The largest valid difference between a DagFileStat's last_finished_time and a DAG's
            # last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is
            # no longer present in the file. We have a stale_dag_threshold configured to prevent a
            # significant delay in deactivation of stale dags when a large timeout is configured
-            if (
+            dag_removed_from_dag_folder_or_file = (
                dag.fileloc in last_parsed
                and (dag.last_parsed_time + timedelta(seconds=stale_dag_threshold)) < last_parsed[dag.fileloc]
-            ):
+            )
+
+            if dag_not_in_current_dag_folder or dag_removed_from_dag_folder_or_file:
                cls.logger().info("DAG %s is missing and will be deactivated.", dag.dag_id)
                to_deactivate.add(dag.dag_id)

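For readers of this hunk, a minimal, self-contained sketch of the two staleness conditions the new code combines; this is not Airflow's actual implementation, and the paths and threshold below are hypothetical.

from __future__ import annotations

import os
from datetime import datetime, timedelta


def dag_is_stale(
    fileloc: str,
    dag_directory: str,
    last_parsed_time: datetime,
    file_last_finished_time: datetime | None,
    stale_dag_threshold: int,
) -> bool:
    # Condition 1: the DAG file lives outside the currently configured DAGs
    # folder, e.g. after the user points the scheduler at a new folder.
    not_in_current_folder = os.path.commonpath([fileloc, dag_directory]) != dag_directory
    # Condition 2: the file finished a full re-parse well after the DAG's own
    # last_parsed_time, meaning the DAG no longer appears in that file.
    removed_from_file = (
        file_last_finished_time is not None
        and last_parsed_time + timedelta(seconds=stale_dag_threshold) < file_last_finished_time
    )
    return not_in_current_folder or removed_from_file


# A DAG left over from a previous DAGs folder is flagged; a current one is not.
now = datetime.now()
assert dag_is_stale("/old/dags/a.py", "/current/dags", now, None, 50)
assert not dag_is_stale("/current/dags/a.py", "/current/dags", now, None, 50)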
8 changes: 4 additions & 4 deletions airflow/providers/google/cloud/operators/gcs.py
@@ -795,11 +795,11 @@ def execute(self, context: Context) -> list[str]:
            orig_end = context["data_interval_end"]
        except KeyError:
            orig_start = pendulum.instance(context["execution_date"])
-            following_execution_date = context["dag"].following_schedule(context["execution_date"])
-            if following_execution_date is None:
-                orig_end = None
+            next_dagrun = context["dag"].next_dagrun_info(last_automated_dagrun=None, restricted=False)
+            if next_dagrun and next_dagrun.data_interval and next_dagrun.data_interval.end:
+                orig_end = next_dagrun.data_interval.end
            else:
-                orig_end = pendulum.instance(following_execution_date)
+                orig_end = None

        timespan_start = orig_start
        if orig_end is None:  # Only possible in Airflow before 2.2.
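DAG.following_schedule was removed in newer Airflow releases; next_dagrun_info(last_automated_dagrun=None, restricted=False) is its replacement here. A hedged sketch of how the returned DagRunInfo is consumed, substituting a mock for a real DAG (the tests later in this commit mock it the same way):

from unittest import mock

import pendulum

from airflow.timetables.base import DagRunInfo, DataInterval

start = pendulum.datetime(2015, 2, 1, 15, tz="UTC")
dag = mock.Mock()
dag.next_dagrun_info.return_value = DagRunInfo(
    run_after=start,
    data_interval=DataInterval(start=start, end=start.add(hours=1)),
)

# Mirror the operator's fallback: take the end of the next run's data
# interval, or None when nothing further is scheduled.
next_dagrun = dag.next_dagrun_info(last_automated_dagrun=None, restricted=False)
orig_end = next_dagrun.data_interval.end if next_dagrun and next_dagrun.data_interval else None
assert orig_end == start.add(hours=1)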
2 changes: 1 addition & 1 deletion airflow/triggers/base.py
@@ -203,7 +203,7 @@ def handle_submit(self, *, task_instance: TaskInstance, session: Session = NEW_S
        """
        # Mark the task with terminal state and prevent it from resuming on worker
        task_instance.trigger_id = None
-        task_instance.state = self.task_instance_state
+        task_instance.set_state(self.task_instance_state, session=session)
        self._submit_callback_if_necessary(task_instance=task_instance, session=session)
        self._push_xcoms_if_necessary(task_instance=task_instance)

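The one-line change matters because TaskInstance.set_state also stamps timing fields for terminal states, which a bare attribute assignment leaves unset; the new assertions in tests/models/test_trigger.py below check exactly that. A minimal, self-contained analogue (not Airflow's TaskInstance) of the difference:

from __future__ import annotations

from datetime import datetime, timezone


class MiniTaskInstance:
    """Toy stand-in for TaskInstance, assuming set_state stamps end_date/duration."""

    def __init__(self) -> None:
        self.state: str | None = None
        self.start_date = datetime.now(timezone.utc)
        self.end_date: datetime | None = None
        self.duration: float | None = None

    def set_state(self, state: str) -> None:
        # Besides the state itself, record when the task finished and how long it ran.
        self.state = state
        self.end_date = datetime.now(timezone.utc)
        self.duration = (self.end_date - self.start_date).total_seconds()


ti = MiniTaskInstance()
ti.state = "success"      # bare assignment: end_date and duration stay None
assert ti.end_date is None

ti.set_state("success")   # setter: timing fields are populated
assert ti.end_date is not None and ti.duration is not None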
3 changes: 3 additions & 0 deletions chart/templates/triggerer/triggerer-deployment.yaml
@@ -123,6 +123,9 @@ spec:
      {{- end }}
      tolerations: {{- toYaml $tolerations | nindent 8 }}
      topologySpreadConstraints: {{- toYaml $topologySpreadConstraints | nindent 8 }}
+      {{- if .Values.triggerer.hostAliases }}
+      hostAliases: {{- toYaml .Values.triggerer.hostAliases | nindent 8 }}
+      {{- end }}
      terminationGracePeriodSeconds: {{ .Values.triggerer.terminationGracePeriodSeconds }}
      restartPolicy: Always
      serviceAccountName: {{ include "triggerer.serviceAccountName" . }}
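Together with the values.schema.json and values.yaml changes below, this template block copies user-supplied aliases straight into the triggerer pod spec. An illustrative sketch (not a verbatim chart render) of the mapping:

# User-supplied values, e.g. passed via `helm template -f my-values.yaml`:
triggerer:
  hostAliases:
    - ip: "10.1.2.3"
      hostnames:
        - "foo.remote"
---
# Resulting fragment of the rendered triggerer Deployment:
spec:
  template:
    spec:
      hostAliases:
        - ip: "10.1.2.3"
          hostnames:
            - "foo.remote"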
22 changes: 22 additions & 0 deletions chart/values.schema.json
@@ -2806,6 +2806,28 @@
                    "type": "boolean",
                    "default": true
                },
+                "hostAliases": {
+                    "description": "HostAliases for the triggerer pod.",
+                    "items": {
+                        "$ref": "#/definitions/io.k8s.api.core.v1.HostAlias"
+                    },
+                    "type": "array",
+                    "default": [],
+                    "examples": [
+                        {
+                            "ip": "127.0.0.1",
+                            "hostnames": [
+                                "foo.local"
+                            ]
+                        },
+                        {
+                            "ip": "10.1.2.3",
+                            "hostnames": [
+                                "foo.remote"
+                            ]
+                        }
+                    ]
+                },
                "livenessProbe": {
                    "description": "Liveness probe configuration for triggerer.",
                    "type": "object",
9 changes: 9 additions & 0 deletions chart/values.yaml
@@ -1623,6 +1623,15 @@ triggerer:
  tolerations: []
  topologySpreadConstraints: []

+  # hostAliases for the triggerer pod
+  hostAliases: []
+  # - ip: "127.0.0.1"
+  #   hostnames:
+  #     - "foo.local"
+  # - ip: "10.1.2.3"
+  #   hostnames:
+  #     - "foo.remote"
+
  priorityClassName: ~

  # annotations for the triggerer deployment
20 changes: 20 additions & 0 deletions generated/provider_metadata.json
@@ -2167,6 +2167,10 @@
    "3.8.0": {
      "associated_airflow_version": "2.10.0",
      "date_released": "2024-08-22T10:37:58Z"
+    },
+    "3.8.1": {
+      "associated_airflow_version": "2.10.0",
+      "date_released": "2024-08-28T10:31:24Z"
    }
  },
  "cloudant": {
@@ -2491,6 +2495,10 @@
    "8.4.0": {
      "associated_airflow_version": "2.10.0",
      "date_released": "2024-08-22T10:37:58Z"
+    },
+    "8.4.1": {
+      "associated_airflow_version": "2.10.0",
+      "date_released": "2024-08-28T10:31:24Z"
    }
  },
  "cohere": {
@@ -6085,6 +6093,10 @@
    "1.10.0": {
      "associated_airflow_version": "2.10.0",
      "date_released": "2024-08-06T20:34:43Z"
+    },
+    "1.11.0": {
+      "associated_airflow_version": "2.10.0",
+      "date_released": "2024-08-28T10:31:24Z"
    }
  },
  "opensearch": {
@@ -8221,6 +8233,10 @@
    "3.13.0": {
      "associated_airflow_version": "2.10.0",
      "date_released": "2024-08-22T10:37:57Z"
+    },
+    "3.13.1": {
+      "associated_airflow_version": "2.10.0",
+      "date_released": "2024-08-28T10:31:24Z"
    }
  },
  "tableau": {
@@ -8373,6 +8389,10 @@
    "1.6.0": {
      "associated_airflow_version": "2.9.2",
      "date_released": "2024-08-22T10:37:58Z"
+    },
+    "1.6.1": {
+      "associated_airflow_version": "2.9.2",
+      "date_released": "2024-08-28T10:31:24Z"
    }
  },
  "telegram": {
13 changes: 13 additions & 0 deletions helm_tests/airflow_core/test_triggerer.py
@@ -638,6 +638,19 @@ def test_should_add_component_specific_annotations(self):
        assert "annotations" in jmespath.search("metadata", docs[0])
        assert jmespath.search("metadata.annotations", docs[0])["test_annotation"] == "test_annotation_value"

+    def test_triggerer_pod_hostaliases(self):
+        docs = render_chart(
+            values={
+                "triggerer": {
+                    "hostAliases": [{"ip": "127.0.0.1", "hostnames": ["foo.local"]}],
+                },
+            },
+            show_only=["templates/triggerer/triggerer-deployment.yaml"],
+        )
+
+        assert "127.0.0.1" == jmespath.search("spec.template.spec.hostAliases[0].ip", docs[0])
+        assert "foo.local" == jmespath.search("spec.template.spec.hostAliases[0].hostnames[0]", docs[0])
+
    def test_triggerer_template_storage_class_name(self):
        docs = render_chart(
            values={"triggerer": {"persistence": {"storageClassName": "{{ .Release.Name }}-storage-class"}}},
71 changes: 67 additions & 4 deletions tests/dag_processing/test_job_runner.py
@@ -26,6 +26,7 @@
import random
import socket
import sys
+import tempfile
import textwrap
import threading
import time
@@ -638,7 +639,7 @@ def test_scan_stale_dags(self):
        manager = DagProcessorJobRunner(
            job=Job(),
            processor=DagFileProcessorManager(
-                dag_directory="directory",
+                dag_directory=str(TEST_DAG_FOLDER),
                max_runs=1,
                processor_timeout=timedelta(minutes=10),
                signal_conn=MagicMock(),
@@ -712,11 +713,11 @@ def test_scan_stale_dags_standalone_mode(self):
        """
        Ensure only DAGs from the current dag_directory are updated
        """
-        dag_directory = "directory"
+        dag_directory = str(TEST_DAG_FOLDER)
        manager = DagProcessorJobRunner(
            job=Job(),
            processor=DagFileProcessorManager(
-                dag_directory=dag_directory,
+                dag_directory=TEST_DAG_FOLDER,
                max_runs=1,
                processor_timeout=timedelta(minutes=10),
                signal_conn=MagicMock(),
@@ -740,7 +741,7 @@ def test_scan_stale_dags_standalone_mode(self):
            # Add stale DAG to the DB
            other_dag = other_dagbag.get_dag("test_start_date_scheduling")
            other_dag.last_parsed_time = timezone.utcnow()
-            other_dag.sync_to_db(processor_subdir="other")
+            other_dag.sync_to_db(processor_subdir="/other")

            # Add DAG to the file_parsing_stats
            stat = DagFileStat(
@@ -762,6 +763,68 @@ def test_scan_stale_dags_standalone_mode(self):
            active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
            assert active_dag_count == 1

+    def test_scan_stale_dags_when_dag_folder_change(self):
+        """
+        Ensure DAGs from the old DAG folder are marked as stale when the DAG processor
+        runs as part of the scheduler.
+        """
+
+        def get_dag_string(filename) -> str:
+            return open(TEST_DAG_FOLDER / filename).read()
+
+        with tempfile.TemporaryDirectory() as tmpdir:
+            old_dag_home = tempfile.mkdtemp(dir=tmpdir)
+            old_dag_file = tempfile.NamedTemporaryFile(dir=old_dag_home, suffix=".py")
+            old_dag_file.write(get_dag_string("test_example_bash_operator.py").encode())
+            old_dag_file.flush()
+            new_dag_home = tempfile.mkdtemp(dir=tmpdir)
+            new_dag_file = tempfile.NamedTemporaryFile(dir=new_dag_home, suffix=".py")
+            new_dag_file.write(get_dag_string("test_scheduler_dags.py").encode())
+            new_dag_file.flush()
+
+            manager = DagProcessorJobRunner(
+                job=Job(),
+                processor=DagFileProcessorManager(
+                    dag_directory=new_dag_home,
+                    max_runs=1,
+                    processor_timeout=timedelta(minutes=10),
+                    signal_conn=MagicMock(),
+                    dag_ids=[],
+                    pickle_dags=False,
+                    async_mode=True,
+                ),
+            )
+
+            dagbag = DagBag(old_dag_file.name, read_dags_from_db=False)
+            other_dagbag = DagBag(new_dag_file.name, read_dags_from_db=False)
+
+            with create_session() as session:
+                # Add DAG from the old DAG home to the DB
+                dag = dagbag.get_dag("test_example_bash_operator")
+                dag.fileloc = old_dag_file.name
+                dag.last_parsed_time = timezone.utcnow()
+                dag.sync_to_db(processor_subdir=old_dag_home)
+
+                # Add DAG from the new DAG home to the DB
+                other_dag = other_dagbag.get_dag("test_start_date_scheduling")
+                other_dag.fileloc = new_dag_file.name
+                other_dag.last_parsed_time = timezone.utcnow()
+                other_dag.sync_to_db(processor_subdir=new_dag_home)
+
+                manager.processor._file_paths = [new_dag_file.name]
+
+                active_dag_count = (
+                    session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
+                )
+                assert active_dag_count == 2
+
+                manager.processor._scan_stale_dags()
+
+                active_dag_count = (
+                    session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
+                )
+                assert active_dag_count == 1

    @mock.patch(
        "airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle", new_callable=PropertyMock
    )
1 change: 1 addition & 0 deletions tests/jobs/test_scheduler_job.py
@@ -3568,6 +3568,7 @@ def test_retry_still_in_executor(self, dag_maker):
            dag_id="test_retry_still_in_executor",
            schedule="@once",
            session=session,
+            fileloc=os.devnull + "/test_retry_still_in_executor.py",
        ):
            dag_task1 = BashOperator(
                task_id="test_retry_handling_op",
10 changes: 9 additions & 1 deletion tests/models/test_trigger.py
@@ -19,7 +19,9 @@
import datetime
import json
from typing import Any, AsyncIterator
+from unittest.mock import patch

+import pendulum
import pytest
import pytz
from cryptography.fernet import Fernet
@@ -161,11 +163,15 @@ def test_submit_failure(session, create_task_instance):
        (TaskSkippedEvent, "skipped"),
    ],
)
-def test_submit_event_task_end(session, create_task_instance, event_cls, expected):
+@patch("airflow.utils.timezone.utcnow")
+def test_submit_event_task_end(mock_utcnow, session, create_task_instance, event_cls, expected):
    """
    Tests that events inheriting BaseTaskEndEvent *don't* re-wake their dependents
    but mark them in the appropriate terminal state and send XComs
    """
+    now = pendulum.now("UTC")
+    mock_utcnow.return_value = now
+
    # Make a trigger
    trigger = Trigger(classpath="does.not.matter", kwargs={})
    trigger.id = 1
@@ -199,6 +205,8 @@ def get_xcoms(ti):
    ti = session.query(TaskInstance).one()
    assert ti.state == expected
    assert ti.next_kwargs is None
+    assert ti.end_date == now
+    assert ti.duration is not None
    actual_xcoms = {x.key: x.value for x in get_xcoms(ti)}
    assert actual_xcoms == {"return_value": "xcomret", "a": "b", "c": "d"}
22 changes: 20 additions & 2 deletions tests/providers/google/cloud/operators/test_gcs.py
@@ -21,6 +21,7 @@
from pathlib import Path
from unittest import mock

+import pendulum
import pytest

from airflow.providers.common.compat.openlineage.facet import (
@@ -40,6 +41,7 @@
    GCSSynchronizeBucketsOperator,
    GCSTimeSpanFileTransformOperator,
)
+from airflow.timetables.base import DagRunInfo, DataInterval

TASK_ID = "test-gcs-operator"
TEST_BUCKET = "test-bucket"
@@ -395,7 +397,15 @@ def test_execute(self, mock_hook, mock_subprocess, mock_tempdir):
        timespan_start = datetime(2015, 2, 1, 15, 16, 17, 345, tzinfo=timezone.utc)
        timespan_end = timespan_start + timedelta(hours=1)
        mock_dag = mock.Mock()
-        mock_dag.following_schedule = lambda x: x + timedelta(hours=1)
+        mock_dag.next_dagrun_info.side_effect = [
+            DagRunInfo(
+                run_after=pendulum.instance(timespan_start),
+                data_interval=DataInterval(
+                    start=pendulum.instance(timespan_start),
+                    end=pendulum.instance(timespan_end),
+                ),
+            ),
+        ]
        mock_ti = mock.Mock()
        context = dict(
            execution_date=timespan_start,
@@ -575,7 +585,15 @@ def test_get_openlineage_facets_on_complete(

        timespan_start = datetime(2015, 2, 1, 15, 16, 17, 345, tzinfo=timezone.utc)
        mock_dag = mock.Mock()
-        mock_dag.following_schedule = lambda x: x + timedelta(hours=1)
+        mock_dag.next_dagrun_info.side_effect = [
+            DagRunInfo(
+                run_after=pendulum.instance(timespan_start),
+                data_interval=DataInterval(
+                    start=pendulum.instance(timespan_start),
+                    end=None,
+                ),
+            ),
+        ]
        context = dict(
            execution_date=timespan_start,
            dag=mock_dag,