Skip to content

Commit

Permalink
Fix open-metadata#9777 - Pass XLets as input parameters for lineage r…
Browse files Browse the repository at this point in the history
…unner (open-metadata#9780)

* Pass XLets as input parameters for lineage runner

* Format
  • Loading branch information
pmbrull authored Jan 18, 2023
1 parent a888516 commit 9c237d9
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
get_lineage_config,
)
from airflow_provider_openmetadata.lineage.runner import AirflowLineageRunner
from airflow_provider_openmetadata.lineage.xlets import XLets, get_xlets_from_dag
from metadata.ingestion.ometa.ometa_api import OpenMetadata


Expand Down Expand Up @@ -69,12 +70,13 @@ def send_lineage(
try:
config: AirflowLineageConfig = get_lineage_config()
metadata = OpenMetadata(config.metadata_config)
xlets: XLets = get_xlets_from_dag(context["dag"])

runner = AirflowLineageRunner(
metadata=metadata,
service_name=config.airflow_service_name,
dag=context["dag"],
context=context,
xlets=xlets,
only_keep_dag_lineage=config.only_keep_dag_lineage,
max_status=config.max_status,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from airflow.utils.context import Context

from airflow_provider_openmetadata.lineage.runner import AirflowLineageRunner
from airflow_provider_openmetadata.lineage.xlets import XLets, get_xlets_from_dag
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
Expand Down Expand Up @@ -56,12 +57,13 @@ def execute(self, context: Context) -> None:
and push it to OpenMetadata using the Python Client.
"""
try:
xlets: XLets = get_xlets_from_dag(self.dag)
metadata = OpenMetadata(self.server_config)
runner = AirflowLineageRunner(
metadata=metadata,
service_name=self.service_name,
dag=self.dag,
context=context,
xlets=xlets,
only_keep_dag_lineage=self.only_keep_dag_lineage,
max_status=self.max_status,
)
Expand Down
47 changes: 13 additions & 34 deletions ingestion/src/airflow_provider_openmetadata/lineage/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@
OpenMetadata Airflow Provider Lineage Runner
"""
from itertools import groupby
from typing import List, Set
from typing import List, Optional

from airflow.configuration import conf
from pydantic import BaseModel

from airflow_provider_openmetadata.lineage.utils import STATUS_MAP, get_xlets
from airflow_provider_openmetadata.lineage.status import STATUS_MAP
from airflow_provider_openmetadata.lineage.xlets import XLets
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.services.createPipelineService import (
Expand Down Expand Up @@ -49,15 +50,6 @@
from metadata.utils.helpers import datetime_to_ts


class XLets(BaseModel):
"""
Group inlets and outlets from all tasks in a DAG
"""

inlets: Set[str]
outlets: Set[str]


class SimpleEdge(BaseModel):
"""
Simple Edge representation with FQN and id
Expand Down Expand Up @@ -89,7 +81,7 @@ def __init__(
metadata: OpenMetadata,
service_name: str,
dag: "DAG",
context: "Context",
xlets: Optional[XLets] = None,
only_keep_dag_lineage: bool = False,
max_status: int = 10,
):
Expand All @@ -99,7 +91,7 @@ def __init__(
self.max_status = max_status

self.dag = dag
self.context = context
self.xlets = xlets

def get_or_create_pipeline_service(self) -> PipelineService:
"""
Expand Down Expand Up @@ -251,20 +243,6 @@ def add_all_pipeline_status(self, pipeline: Pipeline) -> None:
fqn=pipeline.fullyQualifiedName.__root__, status=status
)

def get_xlets(self) -> XLets:
"""
Fill the inlets and outlets of the Pipeline by iterating
over all its tasks
"""
_inlets = set()
_outlets = set()

for task in self.dag.tasks:
_inlets.update(get_xlets(operator=task, xlet_mode="_inlets") or [])
_outlets.update(get_xlets(operator=task, xlet_mode="_outlets") or [])

return XLets(inlets=_inlets, outlets=_outlets)

def add_lineage(self, pipeline: Pipeline, xlets: XLets) -> None:
"""
Add the lineage from inlets and outlets
Expand Down Expand Up @@ -365,10 +343,11 @@ def execute(self):
pipeline = self.create_pipeline_entity(pipeline_service)
self.add_all_pipeline_status(pipeline)

xlets = self.get_xlets()
self.add_lineage(pipeline, xlets)
if self.only_keep_dag_lineage:
self.dag.log.info(
"`only_keep_dag_lineage` is set to True. Cleaning lineage not in inlets or outlets..."
)
self.clean_lineage(pipeline, xlets)
if self.xlets:
self.dag.log.info("Got some xlet data. Processing lineage...")
self.add_lineage(pipeline, self.xlets)
if self.only_keep_dag_lineage:
self.dag.log.info(
"`only_keep_dag_lineage` is set to True. Cleaning lineage not in inlets or outlets..."
)
self.clean_lineage(pipeline, self.xlets)
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
OpenMetadata Airflow Provider utilities
"""

from typing import TYPE_CHECKING, Dict, List, Optional
from typing import TYPE_CHECKING, Dict, List

from metadata.generated.schema.entity.data.pipeline import (
Pipeline,
Expand All @@ -38,46 +38,6 @@
}


def parse_xlets(xlet: List[dict]) -> Optional[List[str]]:
"""
Parse airflow xlets for V1
:param xlet: airflow v2 xlet dict
:return: table list or None
"""
if len(xlet) and isinstance(xlet[0], dict):
tables = xlet[0].get("tables")
if tables and isinstance(tables, list):
return tables

return None


def get_xlets(
operator: "BaseOperator", xlet_mode: str = "_inlets"
) -> Optional[List[str]]:
"""
Given an Airflow DAG Task, obtain the tables
set in inlets or outlets.
We expect xlets to have the following structure:
[{'tables': ['FQN']}]
:param operator: task to get xlets from
:param xlet_mode: get inlet or outlet
:return: list of tables FQN
"""
xlet = getattr(operator, xlet_mode)
tables = parse_xlets(xlet)

if not tables:
operator.log.debug(f"Not finding proper {xlet_mode} in task {operator.task_id}")

else:
operator.log.info(f"Found {xlet_mode} {tables} in task {operator.task_id}")

return tables


def get_dag_status(all_tasks: List[str], task_status: List[TaskStatus]):
"""
Based on the task information and the total DAG tasks, cook the
Expand Down
85 changes: 85 additions & 0 deletions ingestion/src/airflow_provider_openmetadata/lineage/xlets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Handle Airflow inlets and outlets
"""
from typing import List, Optional, Set

from pydantic import BaseModel


class XLets(BaseModel):
"""
Group inlets and outlets from all tasks in a DAG
"""

inlets: Set[str]
outlets: Set[str]


def parse_xlets(xlet: List[dict]) -> Optional[List[str]]:
"""
Parse airflow xlets for V1
:param xlet: airflow v2 xlet dict
:return: table list or None
"""
if len(xlet) and isinstance(xlet[0], dict):
tables = xlet[0].get("tables")
if tables and isinstance(tables, list):
return tables

return None


def get_xlets_from_operator(
operator: "BaseOperator", xlet_mode: str = "_inlets"
) -> Optional[List[str]]:
"""
Given an Airflow DAG Task, obtain the tables
set in inlets or outlets.
We expect xlets to have the following structure:
[{'tables': ['FQN']}]
:param operator: task to get xlets from
:param xlet_mode: get inlet or outlet
:return: list of tables FQN
"""
xlet = getattr(operator, xlet_mode)
tables = parse_xlets(xlet)

if not tables:
operator.log.debug(f"Not finding proper {xlet_mode} in task {operator.task_id}")

else:
operator.log.info(f"Found {xlet_mode} {tables} in task {operator.task_id}")

return tables


def get_xlets_from_dag(dag: "DAG") -> XLets:
"""
Fill the inlets and outlets of the Pipeline by iterating
over all its tasks
"""
_inlets = set()
_outlets = set()

for task in dag.tasks:
_inlets.update(
get_xlets_from_operator(operator=task, xlet_mode="_inlets") or []
)
_outlets.update(
get_xlets_from_operator(operator=task, xlet_mode="_outlets") or []
)

return XLets(inlets=_inlets, outlets=_outlets)

0 comments on commit 9c237d9

Please sign in to comment.