Skip to content

Commit

Permalink
Filter dataset dependency data on webserver (#27046)
Browse files Browse the repository at this point in the history
* filter dataset dependency data on webserver

* rename u,v to source,target

(cherry picked from commit 1c9a87e)
  • Loading branch information
bbovenzi authored and ephraimbuddy committed Oct 18, 2022
1 parent 28a0d00 commit f5a60b7
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 24 deletions.
13 changes: 2 additions & 11 deletions airflow/www/static/js/api/useDatasetDependencies.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ const generateGraph = ({ nodes, edges, font }: GenerateProps) => ({
height: 40,
value,
})),
edges: edges.map((e) => ({ id: `${e.u}-${e.v}`, sources: [e.u], targets: [e.v] })),
edges: edges.map((e) => ({ id: `${e.source}-${e.target}`, sources: [e.source], targets: [e.target] })),
});

const formatDependencies = async ({ edges, nodes }: DatasetDependencies) => {
Expand All @@ -81,17 +81,8 @@ const formatDependencies = async ({ edges, nodes }: DatasetDependencies) => {
// get computed style to calculate how large each node should be
const font = `bold ${16}px ${window.getComputedStyle(document.body).fontFamily}`;

// Make sure we only show edges that are connected to two nodes.
const newEdges = edges.filter((e) => {
const edgeNodes = nodes.filter((n) => n.id === e.u || n.id === e.v);
return edgeNodes.length === 2;
});

// Then filter out any nodes without an edge.
const newNodes = nodes.filter((n) => newEdges.some((e) => e.u === n.id || e.v === n.id));

// Finally generate the graph data with elk
const data = await elk.layout(generateGraph({ nodes: newNodes, edges: newEdges, font }));
const data = await elk.layout(generateGraph({ nodes, edges, font }));
return data as Data;
};

Expand Down
4 changes: 2 additions & 2 deletions airflow/www/static/js/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ interface DepNode {
}

interface DepEdge {
u: string;
v: string;
source: string;
target: string;
}

interface DatasetListItem extends API.Dataset {
Expand Down
24 changes: 13 additions & 11 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3526,19 +3526,21 @@ def dataset_dependencies(self):

for dag, dependencies in SerializedDagModel.get_dag_dependencies().items():
dag_node_id = f"dag:{dag}"
if dag_node_id not in nodes_dict:
nodes_dict[dag_node_id] = node_dict(dag_node_id, dag, "dag")

for dep in dependencies:
if dep.node_id not in nodes_dict and (
dep.dependency_type == 'dag' or dep.dependency_type == 'dataset'
):
nodes_dict[dep.node_id] = node_dict(dep.node_id, dep.dependency_id, dep.dependency_type)
edge_tuples.add((f"dag:{dep.source}", dep.node_id))
edge_tuples.add((dep.node_id, f"dag:{dep.target}"))
if dag_node_id not in nodes_dict and len(dependencies) > 0:
for dep in dependencies:
if dep.dependency_type == 'dag' or dep.dependency_type == 'dataset':
nodes_dict[dag_node_id] = node_dict(dag_node_id, dag, 'dag')
if dep.node_id not in nodes_dict:
nodes_dict[dep.node_id] = node_dict(
dep.node_id, dep.dependency_id, dep.dependency_type
)
if dep.source != 'dataset':
edge_tuples.add((f"dag:{dep.source}", dep.node_id))
if dep.target != 'dataset':
edge_tuples.add((dep.node_id, f"dag:{dep.target}"))

nodes = list(nodes_dict.values())
edges = [{"u": u, "v": v} for u, v in edge_tuples]
edges = [{"source": source, "target": target} for source, target in edge_tuples]

data = {
'nodes': nodes,
Expand Down

0 comments on commit f5a60b7

Please sign in to comment.