-
Notifications
You must be signed in to change notification settings - Fork 208
/
neo4j_search_data_extractor.py
175 lines (159 loc) · 8.3 KB
/
neo4j_search_data_extractor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import textwrap
from typing import Any
from pyhocon import ConfigTree
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.publisher.neo4j_csv_publisher import JOB_PUBLISH_TAG
class Neo4jSearchDataExtractor(Extractor):
"""
Extractor to fetch data required to support search from Neo4j graph database
Use Neo4jExtractor extractor class
"""
CYPHER_QUERY_CONFIG_KEY = 'cypher_query'
ENTITY_TYPE = 'entity_type'
DEFAULT_NEO4J_TABLE_CYPHER_QUERY = textwrap.dedent(
"""
MATCH (db:Database)<-[:CLUSTER_OF]-(cluster:Cluster)
<-[:SCHEMA_OF]-(schema:Schema)<-[:TABLE_OF]-(table:Table)
{publish_tag_filter}
OPTIONAL MATCH (table)-[:DESCRIPTION]->(table_description:Description)
OPTIONAL MATCH (schema)-[:DESCRIPTION]->(schema_description:Description)
OPTIONAL MATCH (table)-[:DESCRIPTION]->(prog_descs:Programmatic_Description)
WITH db, cluster, schema, schema_description, table, table_description,
COLLECT(prog_descs.description) as programmatic_descriptions
OPTIONAL MATCH (table)-[:TAGGED_BY]->(tags:Tag) WHERE tags.tag_type='default'
WITH db, cluster, schema, schema_description, table, table_description, programmatic_descriptions,
COLLECT(DISTINCT tags.key) as tags
OPTIONAL MATCH (table)-[:HAS_BADGE]->(badges:Badge)
WITH db, cluster, schema, schema_description, table, table_description, programmatic_descriptions, tags,
COLLECT(DISTINCT badges.key) as badges
OPTIONAL MATCH (table)-[read:READ_BY]->(user:User)
WITH db, cluster, schema, schema_description, table, table_description, programmatic_descriptions, tags, badges,
SUM(read.read_count) AS total_usage,
COUNT(DISTINCT user.email) as unique_usage
OPTIONAL MATCH (table)-[:COLUMN]->(col:Column)
OPTIONAL MATCH (col)-[:DESCRIPTION]->(col_description:Description)
WITH db, cluster, schema, schema_description, table, table_description, tags, badges, total_usage, unique_usage,
programmatic_descriptions,
COLLECT(col.name) AS column_names, COLLECT(col_description.description) AS column_descriptions
OPTIONAL MATCH (table)-[:LAST_UPDATED_AT]->(time_stamp:Timestamp)
RETURN db.name as database, cluster.name AS cluster, schema.name AS schema,
schema_description.description AS schema_description,
table.name AS name, table.key AS key, table_description.description AS description,
time_stamp.last_updated_timestamp AS last_updated_timestamp,
column_names,
column_descriptions,
total_usage,
unique_usage,
tags,
badges,
programmatic_descriptions
ORDER BY table.name;
"""
)
DEFAULT_NEO4J_USER_CYPHER_QUERY = textwrap.dedent(
"""
MATCH (user:User)
OPTIONAL MATCH (user)-[read:READ]->(a)
OPTIONAL MATCH (user)-[own:OWNER_OF]->(b)
OPTIONAL MATCH (user)-[follow:FOLLOWED_BY]->(c)
OPTIONAL MATCH (user)-[manage_by:MANAGE_BY]->(manager)
{publish_tag_filter}
with user, a, b, c, read, own, follow, manager
where user.full_name is not null
return user.email as email, user.first_name as first_name, user.last_name as last_name,
user.full_name as full_name, user.github_username as github_username, user.team_name as team_name,
user.employee_type as employee_type, manager.email as manager_email,
user.slack_id as slack_id, user.is_active as is_active, user.role_name as role_name,
REDUCE(sum_r = 0, r in COLLECT(DISTINCT read)| sum_r + r.read_count) AS total_read,
count(distinct b) as total_own,
count(distinct c) AS total_follow
order by user.email
"""
)
DEFAULT_NEO4J_DASHBOARD_CYPHER_QUERY = textwrap.dedent(
"""
MATCH (dashboard:Dashboard)
{publish_tag_filter}
MATCH (dashboard)-[:DASHBOARD_OF]->(dbg:Dashboardgroup)
MATCH (dbg)-[:DASHBOARD_GROUP_OF]->(cluster:Cluster)
OPTIONAL MATCH (dashboard)-[:DESCRIPTION]->(db_descr:Description)
OPTIONAL MATCH (dbg)-[:DESCRIPTION]->(dbg_descr:Description)
OPTIONAL MATCH (dashboard)-[:EXECUTED]->(last_exec:Execution)
WHERE split(last_exec.key, '/')[5] = '_last_successful_execution'
OPTIONAL MATCH (dashboard)-[read:READ_BY]->(user:User)
WITH dashboard, dbg, db_descr, dbg_descr, cluster, last_exec, SUM(read.read_count) AS total_usage
OPTIONAL MATCH (dashboard)-[:HAS_QUERY]->(query:Query)-[:HAS_CHART]->(chart:Chart)
WITH dashboard, dbg, db_descr, dbg_descr, cluster, last_exec, COLLECT(DISTINCT query.name) as query_names,
COLLECT(DISTINCT chart.name) as chart_names,
total_usage
OPTIONAL MATCH (dashboard)-[:TAGGED_BY]->(tags:Tag) WHERE tags.tag_type='default'
WITH dashboard, dbg, db_descr, dbg_descr, cluster, last_exec, query_names, chart_names, total_usage,
COLLECT(DISTINCT tags.key) as tags
OPTIONAL MATCH (dashboard)-[:HAS_BADGE]->(badges:Badge)
WITH dashboard, dbg, db_descr, dbg_descr, cluster, last_exec, query_names, chart_names, total_usage, tags,
COLLECT(DISTINCT badges.key) as badges
RETURN dbg.name as group_name, dashboard.name as name, cluster.name as cluster,
coalesce(db_descr.description, '') as description,
coalesce(dbg.description, '') as group_description, dbg.dashboard_group_url as group_url,
dashboard.dashboard_url as url, dashboard.key as uri,
split(dashboard.key, '_')[0] as product, toInteger(last_exec.timestamp) as last_successful_run_timestamp,
query_names, chart_names, total_usage, tags, badges
order by dbg.name
"""
)
# todo: we will add more once we add more entities
DEFAULT_QUERY_BY_ENTITY = {
'table': DEFAULT_NEO4J_TABLE_CYPHER_QUERY,
'user': DEFAULT_NEO4J_USER_CYPHER_QUERY,
'dashboard': DEFAULT_NEO4J_DASHBOARD_CYPHER_QUERY
}
def init(self, conf: ConfigTree) -> None:
"""
Initialize Neo4jExtractor object from configuration and use that for extraction
"""
self.conf = conf
self.entity = conf.get_string(Neo4jSearchDataExtractor.ENTITY_TYPE, default='table').lower()
# extract cypher query from conf, if specified, else use default query
if Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY in conf:
self.cypher_query = conf.get_string(Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY)
else:
default_query = Neo4jSearchDataExtractor.DEFAULT_QUERY_BY_ENTITY[self.entity]
self.cypher_query = self._add_publish_tag_filter(conf.get_string(JOB_PUBLISH_TAG, ''),
cypher_query=default_query)
self.neo4j_extractor = Neo4jExtractor()
# write the cypher query in configs in Neo4jExtractor scope
key = self.neo4j_extractor.get_scope() + '.' + Neo4jExtractor.CYPHER_QUERY_CONFIG_KEY
self.conf.put(key, self.cypher_query)
# initialize neo4j_extractor from configs
self.neo4j_extractor.init(Scoped.get_scoped_conf(self.conf, self.neo4j_extractor.get_scope()))
def close(self) -> None:
"""
Use close() method specified by neo4j_extractor
to close connection to neo4j cluster
"""
self.neo4j_extractor.close()
def extract(self) -> Any:
"""
Invoke extract() method defined by neo4j_extractor
"""
return self.neo4j_extractor.extract()
def get_scope(self) -> str:
return 'extractor.search_data'
def _add_publish_tag_filter(self, publish_tag: str, cypher_query: str) -> str:
"""
Adds publish tag filter into Cypher query
:param publish_tag: value of publish tag.
:param cypher_query:
:return:
"""
if not publish_tag:
publish_tag_filter = ''
else:
if not hasattr(self, 'entity'):
self.entity = 'table'
publish_tag_filter = f"WHERE {self.entity}.published_tag = '{publish_tag}'"
return cypher_query.format(publish_tag_filter=publish_tag_filter)