-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathcreate_dataset.sql
77 lines (66 loc) · 2.41 KB
/
create_dataset.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
{% macro create_dataset(
label,
features
) %}
{% if not execute %}
{{ return('') }}
{% endif %}
{# Load and assert we have necessary attributes #}
{% do feature_store.__f__load_meta(label) %}
{% for feature in features %}
{% do feature_store.__f__load_meta(feature) %}
{% endfor %}
WITH __f__label AS (
SELECT
{% for column in label.columns %}
{{ column }},
{% endfor %}
{{ label.entity_column }} AS __f__entity_column,
{{ label.timestamp_column }} AS __f__timestamp_column
FROM {{ label.table }}
)
{# Set internal CTE names for future usage #}
{% for feature in features %}
{% set lookalike = dbt_utils.slugify(feature.table) | reverse | truncate(30, False, '', 0) | reverse %}
{# HACK: making sure CTE names are different based on current timestamp's seconds and microseconds #}
{% set millis = dbt_utils.pretty_time(format='%S%f') %}
{% do feature.update({'__f__cte_name': '__f__cte_' + lookalike + '_' + millis}) %}
{% endfor %}
{% for feature in features %}
, {{ feature.__f__cte_name }} AS (
SELECT
{% for column in feature.columns %}
{{ column }},
{% endfor %}
{{ feature.entity_column }} AS __f__entity_column,
{{ feature.timestamp_column }} AS __f__timestamp_column,
{{ feature_store.next_timestamp(
feature.entity_column,
feature.timestamp_column
) }} AS __f__next_timestamp
FROM {{ feature.table }}
)
{% endfor %}
SELECT
{% for feature in features %}
{% for column in feature.columns %}
{{ feature.__f__cte_name }}.{{ column }},
{% endfor %}
{% endfor %}
{% for column in label.columns | reject("in", [label.entity_column, label.timestamp_column]) %}
__f__label.{{ column }},
{% endfor %}
__f__label.__f__entity_column AS {{ label.entity_column }},
__f__label.__f__timestamp_column AS {{ label.timestamp_column }}
FROM __f__label
{% for feature in features %}
LEFT JOIN {{ feature.__f__cte_name }}
ON {{ feature_store.label_feature_join(
'__f__label.__f__entity_column',
'__f__label.__f__timestamp_column',
feature.__f__cte_name + '.__f__entity_column',
feature.__f__cte_name + '.__f__timestamp_column',
feature.__f__cte_name + '.__f__next_timestamp'
) }}
{% endfor %}
{% endmacro %}