Skip to content

Commit

Permalink
Refactor telemetry to collect events during DAG run and not during DA…
Browse files Browse the repository at this point in the history
…G parsing (#300)

DAG Factory 0.20 started collecting telemetry as part of the PR
#250. However, one
limitation of this initial implementation is that it emitted telemetry
every time DAGs were parsed. This means that the data collected did not
represent the actual usage and was proportional to the number of times a
DAG was parsed. This PR aims to address this limitation by changing DAG
Factory to emit telemetry during DAG runs.

This implementation leverages Airflow listeners to only emit events
after a Factory-Built DAG is run.

Closes: #282

With this data, we can get the following insights:
- Number of failed DagRuns
- Number of successful DagRuns
- Total tasks associated with each DagRun
- DagRun hash

**Airflow Version**
<img width="1380" alt="Screenshot 2024-12-03 at 8 00 14 PM"
src="https://github.com/user-attachments/assets/b67968a2-171a-4f4b-83cc-e3bba13ef35b">

**DAG Hash**
<img width="1386" alt="Screenshot 2024-12-03 at 8 01 28 PM"
src="https://github.com/user-attachments/assets/fea64d40-7e71-4714-b0e4-2fdb09331962">

**DAG Factory Version**
<img width="1382" alt="Screenshot 2024-12-03 at 8 02 08 PM"
src="https://github.com/user-attachments/assets/4b9de161-32f3-4c85-a740-814d86526f60">

**Event Type**
<img width="1392" alt="Screenshot 2024-12-03 at 8 02 46 PM"
src="https://github.com/user-attachments/assets/e7a5b795-f54a-4d12-9676-274b98584b4a">

**Platform Machine**
<img width="1384" alt="Screenshot 2024-12-03 at 8 03 18 PM"
src="https://github.com/user-attachments/assets/35ede730-e87a-4d6e-acf9-47e34147e331">

**Platform System**
<img width="1380" alt="Screenshot 2024-12-03 at 8 04 23 PM"
src="https://github.com/user-attachments/assets/9f59dc92-51ae-4c9d-9958-1cc36c4a9149">

**Python Version**
<img width="1389" alt="Screenshot 2024-12-03 at 8 05 05 PM"
src="https://github.com/user-attachments/assets/085f51ad-1e0a-4785-9744-a0b713d9a267">


**DAG Run Status**
<img width="1408" alt="Screenshot 2024-12-03 at 8 06 45 PM"
src="https://github.com/user-attachments/assets/1c47bb61-cfeb-49d9-9d93-5a43240c9b51">

**Task Count in DAG run**
<img width="1394" alt="Screenshot 2024-12-03 at 8 07 25 PM"
src="https://github.com/user-attachments/assets/22c011de-34b4-48cb-8c09-78571d3b6229">

**Telemetry Version**
<img width="1395" alt="Screenshot 2024-12-03 at 8 07 58 PM"
src="https://github.com/user-attachments/assets/4c2d6006-72c1-47a2-9cde-6802923dedba">
  • Loading branch information
pankajastro authored Dec 3, 2024
1 parent 9d2b8f5 commit 72bc85b
Show file tree
Hide file tree
Showing 13 changed files with 100 additions and 62 deletions.
7 changes: 4 additions & 3 deletions PRIVACY_NOTICE.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ In addition to Scarf's default data collection, DAG Factory collects the followi
- Python version
- Operating system & machine architecture
- Event type
- Number of DAGs
- Number of TaskGroups
- Number of Tasks
- Number of failed DagRuns
- Number of successful DagRuns
- Total tasks associated to each DagRun
- Dag hash

No user-identifiable information (IP included) is stored in Scarf.
4 changes: 2 additions & 2 deletions dagfactory/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/dag-factory/{telemetry_version}/{dagfactory_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}?{query_string}"
TELEMETRY_VERSION = "v1"
TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/dag-factory/{telemetry_version}/{dagfactory_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}/{event_type}/{status}/{dag_hash}/{task_count}"
TELEMETRY_VERSION = "v2"
TELEMETRY_TIMEOUT = 5.0
11 changes: 4 additions & 7 deletions dagfactory/dagbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ def __init__(
self.dag_name: str = dag_name
self.dag_config: Dict[str, Any] = deepcopy(dag_config)
self.default_config: Dict[str, Any] = deepcopy(default_config)
self.tasks_count: int = 0
self.taskgroups_count: int = 0
self._yml_dag = yml_dag

# pylint: disable=too-many-branches,too-many-statements
Expand Down Expand Up @@ -807,19 +805,18 @@ def build(self) -> Dict[str, Union[str, DAG]]:
else:
dag.doc_md += f"\n{subtitle}\n```yaml\n{self._yml_dag}\n```"

# tags parameter introduced in Airflow 1.10.8
if version.parse(AIRFLOW_VERSION) >= version.parse("1.10.8"):
dag.tags = dag_params.get("tags", None)
tags = dag_params.get("tags", [])
if "dagfactory" not in tags:
tags.append("dagfactory")
dag.tags = tags

tasks: Dict[str, Dict[str, Any]] = dag_params["tasks"]
self.tasks_count = len(tasks)

# add a property to mark this dag as an auto-generated on
dag.is_dagfactory_auto_generated = True

# create dictionary of task groups
task_groups_dict: Dict[str, "TaskGroup"] = self.make_task_groups(dag_params.get("task_groups", {}), dag)
self.taskgroups_count = len(task_groups_dict)

# create dictionary to track tasks and set dependencies
tasks_dict: Dict[str, BaseOperator] = {}
Expand Down
22 changes: 0 additions & 22 deletions dagfactory/dagfactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from airflow.configuration import conf as airflow_conf
from airflow.models import DAG

from dagfactory import telemetry
from dagfactory.dagbuilder import DagBuilder
from dagfactory.exceptions import DagFactoryConfigException, DagFactoryException

Expand All @@ -30,9 +29,6 @@ class DagFactory:
"""

def __init__(self, config_filepath: Optional[str] = None, config: Optional[dict] = None) -> None:
self.dags_count: int = 0
self.tasks_count: int = 0
self.taskgroups_count: int = 0
assert bool(config_filepath) ^ bool(config), "Either `config_filepath` or `config` should be provided"
if config_filepath:
DagFactory._validate_config_filepath(config_filepath=config_filepath)
Expand Down Expand Up @@ -132,21 +128,9 @@ def build_dags(self) -> Dict[str, DAG]:
dags[dag["dag_id"]]: DAG = dag["dag"]
except Exception as err:
raise DagFactoryException(f"Failed to generate dag {dag_name}. verify config is correct") from err
else:
self.dags_count += 1
self.taskgroups_count += dag_builder.taskgroups_count
self.tasks_count += dag_builder.tasks_count

return dags

def emit_telemetry(self, event_type: str) -> None:
additional_telemetry_metrics = {
"dags_count": self.dags_count,
"tasks_count": self.tasks_count,
"taskgroups_count": self.taskgroups_count,
}
telemetry.emit_usage_metrics_if_enabled(event_type, additional_telemetry_metrics)

# pylint: disable=redefined-builtin
@staticmethod
def register_dags(dags: Dict[str, DAG], globals: Dict[str, Any]) -> None:
Expand All @@ -168,7 +152,6 @@ def generate_dags(self, globals: Dict[str, Any]) -> None:
"""
dags: Dict[str, Any] = self.build_dags()
self.register_dags(dags, globals)
self.emit_telemetry("generate_dags")

def clean_dags(self, globals: Dict[str, Any]) -> None:
"""
Expand All @@ -192,10 +175,6 @@ def clean_dags(self, globals: Dict[str, Any]) -> None:
for dag_to_remove in dags_to_remove:
del globals[dag_to_remove]

self.emit_telemetry("clean_dags")

# pylint: enable=redefined-builtin


def load_yaml_dags(
globals_dict: Dict[str, Any],
Expand Down Expand Up @@ -229,5 +208,4 @@ def load_yaml_dags(
except Exception: # pylint: disable=broad-except
logging.exception("Failed to load dag from %s", config_file_path)
else:
factory.emit_telemetry("load_yaml_dags")
logging.info("DAG loaded: %s", config_file_path)
Empty file.
49 changes: 49 additions & 0 deletions dagfactory/listeners/runtime_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from __future__ import annotations

from airflow.listeners import hookimpl
from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun

from dagfactory import telemetry


class EventStatus:
    """Terminal states of a DAG run as reported in telemetry events."""

    # Reported when a DAG run finishes successfully.
    SUCCESS = "success"
    # Reported when a DAG run ends in failure.
    FAILED = "failed"


# Event type name used for both the success and failure listeners below.
DAG_RUN = "dag_run"


def is_dagfactory_dag(dag: DAG | None = None) -> bool:
    """Return True if *dag* was built by DAG Factory.

    DAG Factory appends the ``dagfactory`` tag to every DAG it builds, so the
    presence of that tag identifies a Factory-built DAG.

    :param dag: the DAG to inspect; may be ``None`` (e.g. if the DAG could
        not be resolved from the DagRun), in which case ``False`` is returned.
    """
    # Guard against a missing DAG: the parameter defaults to None, and the
    # original unconditional `dag.tags` access would raise AttributeError.
    if dag is None:
        return False
    return "dagfactory" in dag.tags


@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str):
    """Airflow listener hook: emit a telemetry event for a successful DAG run.

    Only fires for DAGs built by DAG Factory (identified via the
    ``dagfactory`` tag); runs of other DAGs are ignored.
    """
    run_dag = dag_run.get_dag()
    # Report only runs of DAGs that DAG Factory itself generated.
    if is_dagfactory_dag(run_dag):
        metrics = {
            "dag_hash": dag_run.dag_hash,
            "status": EventStatus.SUCCESS,
            "task_count": len(run_dag.task_ids),
        }
        telemetry.emit_usage_metrics_if_enabled(DAG_RUN, metrics)


@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
    """Airflow listener hook: emit a telemetry event for a failed DAG run.

    Only fires for DAGs built by DAG Factory (identified via the
    ``dagfactory`` tag); runs of other DAGs are ignored.
    """
    run_dag = dag_run.get_dag()
    # Report only runs of DAGs that DAG Factory itself generated.
    if is_dagfactory_dag(run_dag):
        metrics = {
            "dag_hash": dag_run.dag_hash,
            "status": EventStatus.FAILED,
            "task_count": len(run_dag.task_ids),
        }
        telemetry.emit_usage_metrics_if_enabled(DAG_RUN, metrics)
11 changes: 11 additions & 0 deletions dagfactory/plugin/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from airflow.plugins_manager import AirflowPlugin

from dagfactory.listeners import runtime_event


class DagFactoryPlugin(AirflowPlugin):
    """Airflow plugin that registers DAG Factory's runtime event listeners.

    Registering the ``runtime_event`` module as a listener lets DAG Factory
    emit telemetry when Factory-built DAG runs finish, instead of at DAG
    parse time.
    """

    name = "Dag Factory Plugin"
    listeners = [runtime_event]


# Instance discovered by Airflow via the `airflow.plugins` entry point.
dagfactory_plugin = DagFactoryPlugin()
8 changes: 5 additions & 3 deletions dagfactory/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
import platform
from urllib import parse
from urllib.parse import urlencode

import httpx
Expand All @@ -24,7 +25,7 @@ def collect_standard_usage_metrics() -> dict[str, object]:
"""
metrics = {
"dagfactory_version": dagfactory.__version__,
"airflow_version": airflow_version,
"airflow_version": parse.quote(airflow_version),
"python_version": platform.python_version(),
"platform_system": platform.system(),
"platform_machine": platform.machine(),
Expand All @@ -44,7 +45,7 @@ def emit_usage_metrics(metrics: dict[str, object]) -> bool:
**metrics, telemetry_version=constants.TELEMETRY_VERSION, query_string=query_string
)
logging.debug("Telemetry is enabled. Emitting the following usage metrics to %s: %s", telemetry_url, metrics)
response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT)
response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT, follow_redirects=True)
if not response.is_success:
logging.warning(
"Unable to emit usage metrics to %s. Status code: %s. Message: %s",
Expand All @@ -64,8 +65,9 @@ def emit_usage_metrics_if_enabled(event_type: str, additional_metrics: dict[str,
"""
if should_emit():
metrics = collect_standard_usage_metrics()
metrics["type"] = event_type
metrics["event_type"] = event_type
metrics["variables"].update(additional_metrics)
metrics.update(additional_metrics)
is_success = emit_usage_metrics(metrics)
return is_success
else:
Expand Down
6 changes: 3 additions & 3 deletions dev/dags/example_dag_factory.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
default:
default_args:
catchup: false,
owner: "default_owner"
start_date: 2018-03-01
end_date: 2018-03-05
start_date: 2024-11-11
retries: 1
retry_delay_sec: 300
retry_delay_sec: 30
on_success_callback_name: print_hello_from_callback
on_success_callback_file: $CONFIG_ROOT_DIR/print_hello.py
concurrency: 1
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ Source = "https://github.com/astronomer/dag-factory"
[tool.hatch.version]
path = "dagfactory/__init__.py"

[project.entry-points."airflow.plugins"]
dagfactory = "dagfactory.plugin:DagFactoryPlugin"

[tool.hatch.build]
sources = ["."]

Expand Down
2 changes: 1 addition & 1 deletion tests/test_dagbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ def test_build():
assert len(actual["dag"].tasks) == 3
assert actual["dag"].task_dict["task_1"].downstream_task_ids == {"task_2", "task_3"}
if version.parse(AIRFLOW_VERSION) >= version.parse("1.10.8"):
assert actual["dag"].tags == ["tag1", "tag2"]
assert actual["dag"].tags == ["tag1", "tag2", "dagfactory"]


def test_get_dag_params_dag_with_task_group():
Expand Down
15 changes: 0 additions & 15 deletions tests/test_dagfactory.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import datetime
import logging
import os
from unittest.mock import patch

import pytest
from airflow import __version__ as AIRFLOW_VERSION
Expand Down Expand Up @@ -459,20 +458,6 @@ def test_load_invalid_yaml_logs_error(caplog):
assert caplog.messages == ["Failed to load dag from tests/fixtures/invalid_yaml.yml"]


@patch("dagfactory.telemetry.emit_usage_metrics_if_enabled")
def test_load_yaml_dags_succeed(mock_emit_usage_metrics_if_enabled):
load_yaml_dags(
globals_dict=globals(),
dags_folder="tests/fixtures",
suffix=["dag_factory_variables_as_arguments.yml"],
)

# Confirm the representative telemetry for all the DAGs defined in the desired YAML is being sent
args = mock_emit_usage_metrics_if_enabled.call_args.args
assert args[0] == "load_yaml_dags"
assert args[1] == {"dags_count": 2, "tasks_count": 4, "taskgroups_count": 0}


def test_load_yaml_dags_default_suffix_succeed(caplog):
caplog.set_level(logging.INFO)
load_yaml_dags(
Expand Down
24 changes: 18 additions & 6 deletions tests/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,19 @@ def test_emit_usage_metrics_fails(mock_httpx_get, caplog):
"python_version": "3.11",
"platform_system": "darwin",
"platform_machine": "amd64",
"variables": {"a": 1, "b": 2},
"event_type": "dag_run",
"status": "success",
"dag_hash": "d151d1fa2f03270ea116cc7494f2c591",
"task_count": 3,
}
is_success = telemetry.emit_usage_metrics(sample_metrics)
mock_httpx_get.assert_called_once_with(
"https://astronomer.gateway.scarf.sh/dag-factory/v1/0.2.0a1/2.10.1/3.11/darwin/amd64?dagfactory_version=0.2.0a1&airflow_version=2.10.1&python_version=3.11&platform_system=darwin&platform_machine=amd64&variables=%7B%27a%27%3A+1%2C+%27b%27%3A+2%7D",
f"""https://astronomer.gateway.scarf.sh/dag-factory/v2/0.2.0a1/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3""",
timeout=5.0,
follow_redirects=True,
)
assert not is_success
log_msg = "Unable to emit usage metrics to https://astronomer.gateway.scarf.sh/dag-factory/v1/0.2.0a1/2.10.1/3.11/darwin/amd64?dagfactory_version=0.2.0a1&airflow_version=2.10.1&python_version=3.11&platform_system=darwin&platform_machine=amd64&variables=%7B%27a%27%3A+1%2C+%27b%27%3A+2%7D. Status code: 404. Message: Non existent URL"
log_msg = f"""Unable to emit usage metrics to https://astronomer.gateway.scarf.sh/dag-factory/v2/0.2.0a1/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3. Status code: 404. Message: Non existent URL"""
assert caplog.text.startswith("WARNING")
assert log_msg in caplog.text

Expand All @@ -74,7 +78,10 @@ def test_emit_usage_metrics_succeeds(caplog):
"python_version": "3.11",
"platform_system": "darwin",
"platform_machine": "amd64",
"variables": {"a": 1, "b": 2},
"event_type": "dag_run",
"status": "success",
"dag_hash": "d151d1fa2f03270ea116cc7494f2c591",
"task_count": 3,
}
is_success = telemetry.emit_usage_metrics(sample_metrics)
assert is_success
Expand All @@ -91,11 +98,16 @@ def test_emit_usage_metrics_if_enabled_fails(mock_should_emit, caplog):


@patch("dagfactory.telemetry.should_emit", return_value=True)
@patch("dagfactory.telemetry.collect_standard_usage_metrics", return_value={"k1": "v1", "variables": {}})
@patch("dagfactory.telemetry.collect_standard_usage_metrics", return_value={"k1": "v1", "k2": "v2", "variables": {}})
@patch("dagfactory.telemetry.emit_usage_metrics")
def test_emit_usage_metrics_if_enabled_succeeds(
mock_emit_usage_metrics, mock_collect_standard_usage_metrics, mock_should_emit
):
assert telemetry.emit_usage_metrics_if_enabled("any", {"k2": "v2"})
mock_emit_usage_metrics.assert_called_once()
assert mock_emit_usage_metrics.call_args.args[0] == {"k1": "v1", "variables": {"k2": "v2"}, "type": "any"}
assert mock_emit_usage_metrics.call_args.args[0] == {
"k1": "v1",
"k2": "v2",
"event_type": "any",
"variables": {"k2": "v2"},
}

0 comments on commit 72bc85b

Please sign in to comment.