# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

import uuid
from datetime import datetime, timedelta

from airflow import DAG  # noqa
from airflow import macros  # noqa
from airflow.operators.python_operator import PythonOperator  # noqa
from elasticsearch import Elasticsearch
from pyhocon import ConfigFactory

from databuilder.extractor.neo4j_extractor import Neo4jExtractor
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
from databuilder.extractor.postgres_metadata_extractor import PostgresMetadataExtractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.job.job import DefaultJob
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
from databuilder.publisher import neo4j_csv_publisher
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
from databuilder.task.task import DefaultTask
from databuilder.transformer.base_transformer import NoopTransformer
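
# This sample DAG runs two databuilder jobs in sequence: the first extracts
# table metadata from Postgres and publishes it to Neo4j; the second reads
# that metadata back out of Neo4j and publishes it to Elasticsearch so it
# becomes searchable in Amundsen.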

dag_args = {
    'concurrency': 10,
    # One dagrun at a time
    'max_active_runs': 1,
    # Once a day at 11:00 UTC (3AM PST / 4AM PDT)
    'schedule_interval': '0 11 * * *',
    'catchup': False
}

default_args = {
    'owner': 'amundsen',
    'start_date': datetime(2018, 6, 18),
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'priority_weight': 10,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(minutes=120)
}
# NEO4J cluster endpoints
NEO4J_ENDPOINT = 'bolt://neo4j:7687'
neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j'
neo4j_password = 'test'
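
# NOTE: the Neo4j endpoint/credentials above and the Elasticsearch host below
# are assumed to match the Amundsen quickstart docker-compose environment;
# adjust them for your own deployment.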
es = Elasticsearch([
    {'host': 'elasticsearch'},
])
# TODO: user provides a list of schema for indexing
SUPPORTED_SCHEMAS = ['public']
# String format - ('schema1', 'schema2', ..., 'schemaN')
SUPPORTED_SCHEMA_SQL_IN_CLAUSE = "('{schemas}')".format(schemas="', '".join(SUPPORTED_SCHEMAS))
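# e.g. SUPPORTED_SCHEMAS = ['public', 'sales'] produces "('public', 'sales')"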
OPTIONAL_TABLE_NAMES = ''
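
# The credentials below are placeholders for a local demo database; replace
# them with the connection details for your own Postgres instance.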
def connection_string():
    user = 'user'
    password = 'password'
    host = 'host.docker.internal'
    port = '5432'
    db = 'moviesdemo'
    return "postgresql://%s:%s@%s:%s/%s" % (user, password, host, port, db)


def create_table_extract_job():
    where_clause_suffix = f'where table_schema in {SUPPORTED_SCHEMA_SQL_IN_CLAUSE}'

    tmp_folder = '/var/tmp/amundsen/table_metadata'
    node_files_folder = f'{tmp_folder}/nodes/'
    relationship_files_folder = f'{tmp_folder}/relationships/'

    job_config = ConfigFactory.from_dict({
        f'extractor.postgres_metadata.{PostgresMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY}': where_clause_suffix,
        f'extractor.postgres_metadata.{PostgresMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME}': True,
        f'extractor.postgres_metadata.extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connection_string(),
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
        f'publisher.neo4j.{neo4j_csv_publisher.JOB_PUBLISH_TAG}': 'unique_tag',  # should use a unique tag here, e.g. {ds}
    })
    job = DefaultJob(conf=job_config,
                     task=DefaultTask(extractor=PostgresMetadataExtractor(), loader=FsNeo4jCSVLoader()),
                     publisher=Neo4jCsvPublisher())
    job.launch()
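
# A possible way to make the publish tag above unique per run (a sketch,
# assuming Airflow 1.x): pass the execution context into the callable with
# provide_context=True on the PythonOperator, and use the templated
# execution date as the tag, e.g.:
#
#     def create_table_extract_job(**kwargs):
#         publish_tag = kwargs['ds']  # e.g. '2018-06-18'
#         ...
#         f'publisher.neo4j.{neo4j_csv_publisher.JOB_PUBLISH_TAG}': publish_tag,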


def create_es_publisher_sample_job():
    # loader saves data to this location and publisher reads it from here
    extracted_search_data_path = '/var/tmp/amundsen/search_data.json'

    task = DefaultTask(loader=FSElasticsearchJSONLoader(),
                       extractor=Neo4jSearchDataExtractor(),
                       transformer=NoopTransformer())

    # Elasticsearch client instance
    elasticsearch_client = es
    # unique name of the new index in Elasticsearch
    elasticsearch_new_index_key = f'tables{uuid.uuid4()}'
    # related to the mapping type from /databuilder/publisher/elasticsearch_publisher.py#L38
    elasticsearch_new_index_key_type = 'table'
    # alias for Elasticsearch used in amundsensearchlibrary/search_service/config.py as an index
    elasticsearch_index_alias = 'table_search_index'

    job_config = ConfigFactory.from_dict({
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.GRAPH_URL_CONFIG_KEY}': neo4j_endpoint,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.MODEL_CLASS_CONFIG_KEY}':
            'databuilder.models.table_elasticsearch_document.TableESDocument',
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_USER}': neo4j_user,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_PW}': neo4j_password,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY}': 'w',
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_PATH_CONFIG_KEY}': extracted_search_data_path,
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_MODE_CONFIG_KEY}': 'r',
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY}': elasticsearch_client,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY}': elasticsearch_new_index_key,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY}': elasticsearch_new_index_key_type,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY}': elasticsearch_index_alias
    })

    job = DefaultJob(conf=job_config,
                     task=task,
                     publisher=ElasticsearchPublisher())
    job.launch()
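
# Each run writes to a fresh uuid-suffixed index and then repoints the
# `table_search_index` alias at it, so the search service is expected to
# always query the most recently published data.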


with DAG('amundsen_databuilder', default_args=default_args, **dag_args) as dag:
    postgres_table_extract_job = PythonOperator(
        task_id='postgres_table_extract_job',
        python_callable=create_table_extract_job
    )

    postgres_es_index_job = PythonOperator(
        task_id='postgres_es_publisher_sample_job',
        python_callable=create_es_publisher_sample_job
    )

    # the Elasticsearch index update runs only after the table metadata has been updated
    postgres_table_extract_job >> postgres_es_index_job