Skip to content

Commit

Permalink
Add button to toggle datasets on/off in dag graph (#41200)
Browse files Browse the repository at this point in the history
  • Loading branch information
bbovenzi authored Aug 5, 2024
1 parent cc16f48 commit d6bc1f6
Showing 1 changed file with 76 additions and 59 deletions.
135 changes: 76 additions & 59 deletions airflow/www/static/js/dag/details/graph/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/

import React, { useRef, useState, useEffect, useMemo } from "react";
import { Box, useTheme, Select, Text } from "@chakra-ui/react";
import { Box, useTheme, Select, Text, Switch, Flex } from "@chakra-ui/react";
import ReactFlow, {
ReactFlowProvider,
Controls,
Expand Down Expand Up @@ -138,6 +138,7 @@ const Graph = ({ openGroupIds, onToggleGroups, hoveredTaskState }: Props) => {
const [hasRendered, setHasRendered] = useState(false);
const [isZoomedOut, setIsZoomedOut] = useState(false);
const { selected } = useSelection();
const [showDatasets, setShowDatasets] = useState(true);

const {
data: { dagRuns, groups },
Expand All @@ -149,89 +150,91 @@ const Graph = ({ openGroupIds, onToggleGroups, hoveredTaskState }: Props) => {
setArrange(data?.arrange || "LR");
}, [data?.arrange]);

const { nodes: datasetsNodes, edges: datasetEdges } = getUpstreamDatasets(
dagDetails.datasetExpression as DatasetExpression,
data?.nodes?.children ? data.nodes.children[0].id : ""
);
const { data: datasetsCollection } = useDatasets({
dagIds: [dagId],
});

let datasetNodes: DepNode[] = [];
let datasetEdges: WebserverEdge[] = [];

const { nodes: upstreamDatasetNodes, edges: upstreamDatasetEdges } =
getUpstreamDatasets(
dagDetails.datasetExpression as DatasetExpression,
data?.nodes?.children ? data.nodes.children[0].id : ""
);

const {
data: { datasetEvents: upstreamDatasetEvents = [] },
} = useUpstreamDatasetEvents({
dagId,
dagRunId: selected.runId || "",
options: { enabled: !!datasetsNodes.length && !!selected.runId },
options: {
enabled:
!!upstreamDatasetNodes.length && !!selected.runId && showDatasets,
},
});

const {
data: { datasetEvents: downstreamDatasetEvents = [] },
} = useDatasetEvents({
sourceDagId: dagId,
sourceRunId: selected.runId || undefined,
options: { enabled: !!selected.runId },
options: { enabled: !!selected.runId && showDatasets },
});

const { data: datasetsCollection } = useDatasets({
dagIds: [dagId],
});

const downstreamDatasetNodes: DepNode[] = [];
const downstreamDatasetEdges: WebserverEdge[] = [];
if (showDatasets) {
datasetNodes = [...upstreamDatasetNodes];
datasetEdges = [...upstreamDatasetEdges];
datasetsCollection?.datasets?.forEach((dataset) => {
const producingTask = dataset?.producingTasks?.find(
(t) => t.dagId === dagId
);
if (dataset.uri) {
// check that the task is in the graph
if (
producingTask?.taskId &&
getTask({ taskId: producingTask?.taskId, task: groups })
) {
datasetEdges.push({
sourceId: producingTask.taskId,
targetId: dataset.uri,
});
datasetNodes.push({
id: dataset.uri,
value: {
class: "dataset",
label: dataset.uri,
},
});
}
}
});

datasetsCollection?.datasets?.forEach((dataset) => {
const producingTask = dataset?.producingTasks?.find(
(t) => t.dagId === dagId
);
if (dataset.uri) {
// check that the task is in the graph
if (
producingTask?.taskId &&
getTask({ taskId: producingTask?.taskId, task: groups })
) {
downstreamDatasetEdges.push({
sourceId: producingTask.taskId,
targetId: dataset.uri,
// Check if there is a dataset event even though we did not find a dataset
downstreamDatasetEvents.forEach((de) => {
const hasNode = datasetNodes.find((node) => node.id === de.datasetUri);
if (!hasNode && de.sourceTaskId && de.datasetUri) {
datasetEdges.push({
sourceId: de.sourceTaskId,
targetId: de.datasetUri,
});
downstreamDatasetNodes.push({
id: dataset.uri,
datasetNodes.push({
id: de.datasetUri,
value: {
class: "dataset",
label: dataset.uri,
label: de.datasetUri,
},
});
}
}
});

// Check if there is a dataset event even though we did not find a dataset
downstreamDatasetEvents.forEach((de) => {
const hasNode = downstreamDatasetNodes.find(
(node) => node.id === de.datasetUri
);
if (!hasNode && de.sourceTaskId && de.datasetUri) {
downstreamDatasetEdges.push({
sourceId: de.sourceTaskId,
targetId: de.datasetUri,
});
downstreamDatasetNodes.push({
id: de.datasetUri,
value: {
class: "dataset",
label: de.datasetUri,
},
});
}
});
});
}

const { data: graphData } = useGraphLayout({
edges: [...(data?.edges || []), ...datasetEdges, ...downstreamDatasetEdges],
edges: [...(data?.edges || []), ...datasetEdges],
nodes: data?.nodes
? {
...data.nodes,
children: [
...(data?.nodes.children || []),
...datasetsNodes,
...downstreamDatasetNodes,
],
children: [...(data?.nodes.children || []), ...datasetNodes],
}
: data?.nodes,
openGroupIds,
Expand Down Expand Up @@ -331,11 +334,25 @@ const Graph = ({ openGroupIds, onToggleGroups, hoveredTaskState }: Props) => {
}}
>
<Panel position="top-right">
<Box bg="#ffffffdd" p={1}>
<Text>Layout:</Text>
<Box bg={colors.whiteAlpha[800]} p={1}>
{!!datasetsCollection?.datasets?.length && (
<Flex display="flex" alignItems="center">
<Text fontSize="sm" mr={1}>
Show datasets:
</Text>
<Switch
id="show-datasets"
isChecked={showDatasets}
onChange={() => setShowDatasets(!showDatasets)}
/>
</Flex>
)}
<Text fontSize="sm">Layout:</Text>
<Select
value={arrange}
onChange={(e) => setArrange(e.target.value)}
fontSize="sm"
size="sm"
>
<option value="LR">Left -&gt; Right</option>
<option value="RL">Right -&gt; Left</option>
Expand Down

0 comments on commit d6bc1f6

Please sign in to comment.