forked from ScalefreeCOM/turbovault4dbt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathturbovault_snowflake.py
153 lines (117 loc) · 4.83 KB
/
turbovault_snowflake.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import os
from configparser import ConfigParser
from procs.sqlite3 import stage
from procs.sqlite3 import satellite
from procs.sqlite3 import hub
from procs.sqlite3 import link
from logging import Logger
import pandas as pd
import sqlite3
from gooey import Gooey
from gooey import GooeyParser
from datetime import datetime
import snowflake.connector
import time
# Folder of GUI artwork handed to Gooey through ``image_dir``; it lives in an
# "images" directory beside this script.
image_path = os.path.join(os.path.split(__file__)[0], "images")
def connect_snowflake():
    """Copy the TurboVault metadata tables from Snowflake into in-memory SQLite.

    Connection settings (account, database, warehouse, role, meta schema) are
    read from the ``[Snowflake]`` section of ``config.ini`` next to this
    script. The user name and password are read from the ``[main]`` section
    of the file named by that section's ``credential_path`` option.

    Returns:
        A cursor on a fresh in-memory SQLite database holding the tables
        ``source_data``, ``hub_entities``, ``hub_satellites``,
        ``link_entities`` and ``link_satellites``. Each is a full copy of the
        Snowflake table of the same name.

    Raises:
        FileNotFoundError: if the credential file cannot be read.
    """
    config = ConfigParser()
    config.read(os.path.join(os.path.dirname(__file__), "config.ini"))

    credential_path = config.get('Snowflake', 'credential_path')
    snowflake_credentials = ConfigParser()
    # ConfigParser.read silently skips files it cannot open. Fail here with a
    # clear message instead of a confusing NoSectionError('main') below.
    if not snowflake_credentials.read(credential_path):
        raise FileNotFoundError(
            "Snowflake credential file not found or unreadable: " + credential_path)

    ctx = snowflake.connector.connect(
        user=snowflake_credentials.get('main', 'SNOWFLAKE_USER_NAME'),
        password=snowflake_credentials.get('main', 'SNOWFLAKE_PASSWORD'),
        account=config.get('Snowflake', 'account_identifier'),
        database=config.get('Snowflake', 'database'),
        warehouse=config.get('Snowflake', 'warehouse'),
        role=config.get('Snowflake', 'role'),
        schema=config.get('Snowflake', 'meta_schema'),
    )

    # Fixed, code-defined table names (not user input), so building the
    # query by concatenation is safe here.
    table_names = ("source_data", "hub_entities", "hub_satellites",
                   "link_entities", "link_satellites")
    dfs = {}
    try:
        for table in table_names:
            # The cursor is closed on exit from the `with`, even on error.
            with ctx.cursor() as cursor:
                cursor.execute("SELECT * FROM " + table)
                dfs[table] = cursor.fetch_pandas_all()
    finally:
        # Always end the Snowflake session, even if a query or fetch failed.
        ctx.close()

    db = sqlite3.connect(':memory:')
    for table, df in dfs.items():
        # pandas' default index=True also stores the DataFrame index as a
        # column; kept as-is for compatibility with downstream queries.
        df.to_sql(table, db)
    return db.cursor()
@Gooey(
    navigation='TABBED',
    program_name='TurboVault',
    default_size=(800, 800),
    advanced=True,
    image_dir=image_path)
def main():
    """Show the TurboVault GUI and generate models for the selected sources.

    Reads ``model_path``, ``hashdiff_naming``, ``rdv_schema`` and
    ``stage_schema`` from the ``[Snowflake]`` section of ``config.ini`` next
    to this script. It then loads the metadata through
    ``connect_snowflake()`` and offers every distinct
    ``SOURCE_SYSTEM || '_' || SOURCE_OBJECT`` value from ``source_data`` as a
    choice. Finally it runs the selected stage / hub / link / satellite
    generators for each selected source.
    """
    config = ConfigParser()
    config.read(os.path.join(os.path.dirname(__file__), "config.ini"))
    model_path = config.get('Snowflake', 'model_path')
    hashdiff_naming = config.get('Snowflake', 'hashdiff_naming')
    rdv_default_schema = config.get('Snowflake', 'rdv_schema')
    stage_default_schema = config.get('Snowflake', 'stage_schema')

    cursor = connect_snowflake()
    try:
        cursor.execute("SELECT DISTINCT SOURCE_SYSTEM || '_' || SOURCE_OBJECT FROM source_data")
        available_sources = [row[0] for row in cursor.fetchall()]
        generated_timestamp = datetime.now().strftime("%Y%m%d%H%M%S")

        all_tasks = ['Stage', 'Hub', 'Satellite', 'Link']
        parser = GooeyParser(description='Config')
        parser.add_argument("--Tasks", help="Select the entities which You want to generate",
                            action="append", widget='Listbox', choices=all_tasks,
                            default=list(all_tasks), nargs='*', gooey_options={'height': 300})
        parser.add_argument("--Sources", action="append", nargs="+", widget='Listbox',
                            choices=available_sources, gooey_options={'height': 300},
                            help="Select the sources which You want to process")
        args = parser.parse_args()

        # With action="append", argparse starts args.Tasks from the default
        # list and appends each --Tasks occurrence after it as a nested list.
        # The explicit selection is therefore everything past the default
        # entries. This replaces the former hard-coded args.Tasks[4], which
        # only saw the first occurrence.
        todo = [task for group in args.Tasks[len(all_tasks):] for task in group]
        if not todo:
            print("Keine Entitäten ausgesucht.")  # "No entities selected."
            return

        # args.Sources is None when nothing was picked. Merge all --Sources
        # occurrences and drop duplicates while keeping their order.
        sources = list(dict.fromkeys(
            source for group in (args.Sources or []) for source in group))
        if not sources:
            print("No sources selected.")
            return

        for source in sources:
            if 'Stage' in todo:
                stage.generate_stage(cursor, source, generated_timestamp, stage_default_schema, model_path, hashdiff_naming)
            if 'Hub' in todo:
                hub.generate_hub(cursor, source, generated_timestamp, rdv_default_schema, model_path)
            if 'Link' in todo:
                link.generate_link(cursor, source, generated_timestamp, rdv_default_schema, model_path)
            if 'Satellite' in todo:
                satellite.generate_satellite(cursor, source, generated_timestamp, rdv_default_schema, model_path, hashdiff_naming)
    finally:
        # Release the in-memory SQLite database created by connect_snowflake().
        cursor.connection.close()
if __name__ == "__main__":
    print("Starting Script.")
    # perf_counter is monotonic, so the reported runtime is not distorted by
    # system clock adjustments during the run (unlike time.time()).
    start = time.perf_counter()
    main()
    end = time.perf_counter()
    print("Script ends.")
    print("Total Runtime: " + str(round(end - start, 2)) + "s")