forked from amundsen-io/amundsendatabuilder
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tableau_dashboard_table_extractor.py
160 lines (136 loc) · 6.87 KB
/
tableau_dashboard_table_extractor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import logging
from typing import (
Any, Dict, Iterator,
)
from pyhocon import ConfigFactory, ConfigTree
import databuilder.extractor.dashboard.tableau.tableau_dashboard_constants as const
from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.dashboard.tableau.tableau_dashboard_utils import (
TableauDashboardUtils, TableauGraphQLApiExtractor,
)
from databuilder.extractor.restapi.rest_api_extractor import STATIC_RECORD_DICT
from databuilder.models.table_metadata import TableMetadata
from databuilder.transformer.base_transformer import ChainedTransformer
from databuilder.transformer.dict_to_model import MODEL_CLASS, DictToModel
LOGGER = logging.getLogger(__name__)
class TableauGraphQLDashboardTableExtractor(TableauGraphQLApiExtractor):
    """
    Implements the extraction-time logic for parsing the GraphQL result and transforming into a dict
    that fills the DashboardTable model. Allows workbooks to be excluded based on their project.
    """

    CLUSTER = const.CLUSTER
    DATABASE = const.DATABASE
    EXCLUDED_PROJECTS = const.EXCLUDED_PROJECTS
    EXTERNAL_CLUSTER_NAME = const.EXTERNAL_CLUSTER_NAME

    def execute(self) -> Iterator[Dict[str, Any]]:
        """
        Runs the GraphQL query and yields one dict per non-excluded workbook.

        Each yielded dict has the keys 'dashboard_group_id', 'dashboard_id', 'cluster'
        and 'table_ids', where 'table_ids' holds one key (built with
        TableMetadata.TABLE_KEY_FORMAT) per entry of the workbook's 'upstreamTables'.

        :return: An iterator over the dashboard <> table dicts.
        """
        response = self.execute_query()

        # The exclusion list does not depend on the workbook, so it is read once
        # instead of once per workbook.
        excluded_projects = self._conf.get_list(TableauGraphQLDashboardTableExtractor.EXCLUDED_PROJECTS, [])
        workbooks_data = [workbook for workbook in response['workbooks']
                          if workbook['projectName'] not in excluded_projects]

        for workbook in workbooks_data:
            data = {
                'dashboard_group_id': workbook['projectName'],
                'dashboard_id': TableauDashboardUtils.sanitize_workbook_name(workbook['name']),
                'cluster': self._conf.get_string(TableauGraphQLDashboardTableExtractor.CLUSTER),
                'table_ids': [self._build_table_id(table) for table in workbook['upstreamTables']]
            }

            yield data

    def _build_table_id(self, table: Dict[str, Any]) -> str:
        """
        Builds the table key for a single entry of a workbook's 'upstreamTables'.

        :param table: A dict with the keys 'name', 'schema' and 'database'
            (the latter with 'name' and 'connectionType'), as requested by the GraphQL query.
        :return: The table key formatted with TableMetadata.TABLE_KEY_FORMAT.
        """
        # external tables have no schema, so they must be parsed differently
        # see TableauExternalTableExtractor for more specifics
        if table['schema'] != '':
            cluster = self._conf.get_string(TableauGraphQLDashboardTableExtractor.CLUSTER)
            database = self._conf.get_string(TableauGraphQLDashboardTableExtractor.DATABASE)

            # Tableau sometimes incorrectly assigns the "schema" value
            # based on how the datasource connection is used in a workbook.
            # It will hide the real schema inside the table name, like "real_schema.real_table",
            # and set the "schema" value to "wrong_schema". In every case discovered so far, the schema
            # key is incorrect, so the "inner" schema from the table name is used instead.
            if '.' in table['name']:
                # A name can contain more than one dot (e.g. "db.schema.table"); unpacking a plain
                # split('.') into two names would raise ValueError there and abort the extraction.
                # Only the last two components are kept.
                # NOTE(review): assumes the trailing components are schema and table - confirm.
                schema, name = table['name'].rsplit('.', 2)[-2:]
            else:
                schema, name = table['schema'], table['name']
            schema = TableauDashboardUtils.sanitize_schema_name(schema)
            name = TableauDashboardUtils.sanitize_table_name(name)
        else:
            cluster = self._conf.get_string(TableauGraphQLDashboardTableExtractor.EXTERNAL_CLUSTER_NAME)
            database = TableauDashboardUtils.sanitize_database_name(
                table['database']['connectionType']
            )
            schema = TableauDashboardUtils.sanitize_schema_name(table['database']['name'])
            name = TableauDashboardUtils.sanitize_table_name(table['name'])

        return TableMetadata.TABLE_KEY_FORMAT.format(
            db=database,
            cluster=cluster,
            schema=schema,
            tbl=name,
        )
class TableauDashboardTableExtractor(Extractor):
    """
    Extracts the relationships between Tableau workbooks and the tables they use.
    Both "regular" database tables and "external" tables are supported
    (see TableauExternalTableExtractor for more info on external tables).
    Assumes that all the nodes for both the dashboards and the tables have already been created.
    """

    API_BASE_URL = const.API_BASE_URL
    API_VERSION = const.API_VERSION
    CLUSTER = const.CLUSTER
    DATABASE = const.DATABASE
    EXCLUDED_PROJECTS = const.EXCLUDED_PROJECTS
    EXTERNAL_CLUSTER_NAME = const.EXTERNAL_CLUSTER_NAME
    SITE_NAME = const.SITE_NAME
    TABLEAU_ACCESS_TOKEN_NAME = const.TABLEAU_ACCESS_TOKEN_NAME
    TABLEAU_ACCESS_TOKEN_SECRET = const.TABLEAU_ACCESS_TOKEN_SECRET
    VERIFY_REQUEST = const.VERIFY_REQUEST

    def init(self, conf: ConfigTree) -> None:
        """
        Stores the config, defines the GraphQL query, and builds the inner extractor and the
        transformer chain that converts each extracted dict into a DashboardTable model.

        :param conf: The configuration for this extractor.
        """
        self._conf = conf
        self.query = """query {
workbooks {
name
projectName
upstreamTables {
name
schema
database {
name
connectionType
}
}
}
}"""
        self._extractor = self._build_extractor()

        # A single transformer: dict -> DashboardTable. The model class is only a fallback,
        # so a value under the transformer's own scope takes precedence.
        to_model = DictToModel()
        scoped_model_conf = Scoped.get_scoped_conf(self._conf, to_model.get_scope())
        model_defaults = ConfigFactory.from_dict(
            {MODEL_CLASS: 'databuilder.models.dashboard.dashboard_table.DashboardTable'})
        to_model.init(conf=scoped_model_conf.with_fallback(model_defaults))

        self._transformer = ChainedTransformer(transformers=[to_model])

    def extract(self) -> Any:
        """
        Pulls the next record from the inner extractor and passes it through the transformer chain.

        :return: None when the inner extractor returns a falsy record, otherwise the first
            item of the transformer output (or None if it has no items).
        """
        raw_record = self._extractor.extract()
        if raw_record:
            return next(self._transformer.transform(record=raw_record), None)
        return None

    def get_scope(self) -> str:
        """
        :return: The configuration scope of this extractor.
        """
        return 'extractor.tableau_dashboard_table'

    def _build_extractor(self) -> TableauGraphQLDashboardTableExtractor:
        """
        Builds a TableauGraphQLDashboardTableExtractor. All data required can be retrieved with a single GraphQL call.
        :return: A TableauGraphQLDashboardTableExtractor that creates dashboard <> table relationships.
        """
        graphql_extractor = TableauGraphQLDashboardTableExtractor()

        # Lookup order: the inner extractor's scope, then this extractor's config, then the defaults.
        scoped_conf = Scoped.get_scoped_conf(self._conf, graphql_extractor.get_scope())
        defaults = ConfigFactory.from_dict({
            TableauGraphQLApiExtractor.QUERY: self.query,
            STATIC_RECORD_DICT: {'product': 'tableau'},
        })
        graphql_extractor.init(
            conf=scoped_conf.with_fallback(self._conf).with_fallback(defaults)
        )
        return graphql_extractor