-
Notifications
You must be signed in to change notification settings - Fork 53
/
Copy pathsnapshot.sql
99 lines (69 loc) · 3.59 KB
/
snapshot.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
{% materialization snapshot, default %}
{%- set config = model['config'] -%}
{%- set target_table = model.get('alias', model.get('name')) -%}
{%- set strategy_name = config.get('strategy') -%}
{%- set unique_key = config.get('unique_key') %}
-- grab current tables grants config for comparision later on
{%- set grant_config = config.get('grants') -%}
{% set target_relation_exists, target_relation = get_or_create_relation(
database=model.database,
schema=model.schema,
identifier=target_table,
type='table') -%}
{%- if not target_relation.is_table -%}
{% do exceptions.relation_wrong_type(target_relation, 'table') %}
{%- endif -%}
{{ run_hooks(pre_hooks, inside_transaction=False) }}
{{ run_hooks(pre_hooks, inside_transaction=True) }}
{% set strategy_macro = strategy_dispatch(strategy_name) %}
{% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %}
{% if not target_relation_exists %}
{% set build_sql = build_snapshot_table(strategy, model['compiled_code']) %}
{% set final_sql = create_table_as(False, target_relation, build_sql) %}
{% else %}
{{ adapter.valid_snapshot_target(target_relation) }}
{% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %}
-- this may no-op if the database does not require column expansion
{% do adapter.expand_target_column_types(from_relation=staging_table,
to_relation=target_relation) %}
{% set missing_columns = adapter.get_missing_columns(staging_table, target_relation)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| list %}
{% do create_columns(target_relation, missing_columns) %}
{% set source_columns = adapter.get_columns_in_relation(staging_table)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| list %}
{% set quoted_source_columns = [] %}
{% for column in source_columns %}
{% do quoted_source_columns.append(adapter.quote(column.name)) %}
{% endfor %}
{% set final_sql = snapshot_merge_sql(
target = target_relation,
source = staging_table,
insert_cols = quoted_source_columns
)
%}
{% endif %}
{% call statement('main') %}
{{ final_sql }}
{% endcall %}
{% set should_revoke = should_revoke(target_relation_exists, full_refresh_mode=False) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
{% do persist_docs(target_relation, model) %}
{% if not target_relation_exists %}
{% do create_indexes(target_relation) %}
{% endif %}
{{ run_hooks(post_hooks, inside_transaction=True) }}
{{ adapter.commit() }}
{% if staging_table is defined %}
{% do post_snapshot(staging_table) %}
{% endif %}
{{ run_hooks(post_hooks, inside_transaction=False) }}
{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}