Skip to content

Commit

Permalink
Use ASOF JOIN in Snowflake offline store query
Browse files Browse the repository at this point in the history
Signed-off-by: hkuepers <[email protected]>
  • Loading branch information
hkuepers committed Dec 13, 2024
1 parent 88a92cf commit da7b74a
Showing 1 changed file with 35 additions and 94 deletions.
129 changes: 35 additions & 94 deletions sdk/python/feast/infra/offline_stores/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -716,8 +716,8 @@ def _get_entity_df_event_timestamp_range(

MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """
/*
Compute a deterministic hash for the `left_table_query_string` that will be used throughout
all the logic as the field to GROUP BY the data
0. Compute a deterministic hash for the `left_table_query_string` that will be used throughout
all the logic as the field to GROUP BY the data.
*/
WITH "entity_dataframe" AS (
SELECT *,
Expand All @@ -739,6 +739,10 @@ def _get_entity_df_event_timestamp_range(
{% for featureview in featureviews %}
/*
1. Only select the required columns with entities of the featureview.
*/
"{{ featureview.name }}__entity_dataframe" AS (
SELECT
{{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
Expand All @@ -751,120 +755,57 @@ def _get_entity_df_event_timestamp_range(
"{{featureview.name}}__entity_row_unique_id"
),
/*
This query template performs the point-in-time correctness join for a single feature set table
to the provided entity table.
1. We first join the current feature_view to the entity dataframe that has been passed.
This JOIN has the following logic:
- For each row of the entity dataframe, only keep the rows where the `timestamp_field`
is less than or equal to the one provided in the entity dataframe
- If there is a TTL for the current feature_view, also keep the rows where the `timestamp_field`
is higher than or equal to the one provided minus the TTL
- For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been
computed previously
The output of this CTE will contain all the necessary information and already filtered out most
of the data that is not relevant.
*/
"{{ featureview.name }}__subquery" AS (
SELECT
"{{ featureview.timestamp_field }}" as "event_timestamp",
{{'"' ~ featureview.created_timestamp_column ~ '" as "created_timestamp",' if featureview.created_timestamp_column else '' }}
{{featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
{% for feature in featureview.features %}
"{{ feature }}" as {% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }}
WHERE "{{ featureview.timestamp_field }}" <= '{{ featureview.max_event_timestamp }}'
{% if featureview.ttl == 0 %}{% else %}
AND "{{ featureview.timestamp_field }}" >= '{{ featureview.min_event_timestamp }}'
{% endif %}
),
"{{ featureview.name }}__base" AS (
SELECT
"subquery".*,
"entity_dataframe"."entity_timestamp",
"entity_dataframe"."{{featureview.name}}__entity_row_unique_id"
FROM "{{ featureview.name }}__subquery" AS "subquery"
INNER JOIN "{{ featureview.name }}__entity_dataframe" AS "entity_dataframe"
ON TRUE
AND "subquery"."event_timestamp" <= "entity_dataframe"."entity_timestamp"
{% if featureview.ttl == 0 %}{% else %}
AND "subquery"."event_timestamp" >= TIMESTAMPADD(second,-{{ featureview.ttl }},"entity_dataframe"."entity_timestamp")
{% endif %}
{% for entity in featureview.entities %}
AND "subquery"."{{ entity }}" = "entity_dataframe"."{{ entity }}"
{% endfor %}
),
/*
2. If the `created_timestamp_column` has been set, we need to
deduplicate the data first. This is done by calculating the
`MAX("created_timestamp")` for each event_timestamp.
We then join the data on the next CTE
Otherwise, the ASOF JOIN can return non-deterministic results when several feature rows tie on the same timestamp
https://docs.snowflake.com/en/sql-reference/constructs/asof-join#expected-behavior-when-ties-exist-in-the-right-table
*/
{% if featureview.created_timestamp_column %}
"{{ featureview.name }}__dedup" AS (
SELECT
"{{featureview.name}}__entity_row_unique_id",
{{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %},
"event_timestamp",
MAX("created_timestamp") AS "created_timestamp"
FROM "{{ featureview.name }}__base"
GROUP BY "{{featureview.name}}__entity_row_unique_id", "event_timestamp"
FROM "{{ featureview.table_subquery }}"
GROUP BY {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}, "event_timestamp"
),
{% endif %}
/*
3. The data has been filtered during the first CTE "*__base"
Thus we only need to compute the latest timestamp of each feature.
3. ASOF JOIN the feature data (deduplicated when a created timestamp column is set) onto the reduced entity dataframe.
*/
"{{ featureview.name }}__latest" AS (
"{{ featureview.name }}__asof_join" AS (
SELECT
"event_timestamp",
{% if featureview.created_timestamp_column %}"created_timestamp",{% endif %}
"{{featureview.name}}__entity_row_unique_id"
FROM
(
SELECT *,
ROW_NUMBER() OVER(
PARTITION BY "{{featureview.name}}__entity_row_unique_id"
ORDER BY "event_timestamp" DESC{% if featureview.created_timestamp_column %},"created_timestamp" DESC{% endif %}
) AS "row_number"
FROM "{{ featureview.name }}__base"
{% if featureview.created_timestamp_column %}
INNER JOIN "{{ featureview.name }}__dedup"
USING ("{{featureview.name}}__entity_row_unique_id", "event_timestamp", "created_timestamp")
{% endif %}
)
WHERE "row_number" = 1
e.*,
{% for feature in featureview.features %}
v."{{ feature }}" as {% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %},
v."{{ featureview.timestamp_field }}"
FROM "{{ featureview.name }}__entity_dataframe" e
ASOF JOIN {% if featureview.created_timestamp_column %}"{{ featureview.name }}__dedup"{% else %}{{ featureview.table_subquery }}{% endif %} v
MATCH_CONDITION (e."entity_timestamp" >= v."{{ featureview.timestamp_field }}")
USING({% for entity in featureview.entities %}{% if not loop.first %},{% endif %}"{{ entity }}"{% endfor %})
),
/*
4. Once we know the latest value of each feature for a given timestamp,
we can join again the data back to the original "base" dataset
4. If a TTL is configured, filter the CTE to remove rows where the feature values are older than the configured TTL.
*/
"{{ featureview.name }}__cleaned" AS (
SELECT "base".*
FROM "{{ featureview.name }}__base" AS "base"
INNER JOIN "{{ featureview.name }}__latest"
USING(
"{{featureview.name}}__entity_row_unique_id",
"event_timestamp"
{% if featureview.created_timestamp_column %}
,"created_timestamp"
{% endif %}
)
){% if loop.last %}{% else %}, {% endif %}
"{{ featureview.name }}__ttl" AS (
SELECT *
FROM "{{ featureview.name }}__asof_join"
{% if featureview.ttl == 0 %}{% else %}
WHERE "{{ featureview.timestamp_field }}" >= TIMESTAMPADD(second,-{{ featureview.ttl }},"entity_timestamp")
{% endif %}
){% if loop.last %}{% else %}, {% endif %}
{% endfor %}
/*
Joins the outputs of multiple time travel joins to a single table.
Join the outputs of multiple time travel joins to a single table.
The entity_dataframe dataset is our source of truth here.
*/
Expand All @@ -877,7 +818,7 @@ def _get_entity_df_event_timestamp_range(
{% for feature in featureview.features %}
,{% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}
{% endfor %}
FROM "{{ featureview.name }}__cleaned"
) "{{ featureview.name }}__cleaned" USING ("{{featureview.name}}__entity_row_unique_id")
FROM "{{ featureview.name }}__ttl"
) "{{ featureview.name }}__ttl" USING ("{{featureview.name}}__entity_row_unique_id")
{% endfor %}
"""

0 comments on commit da7b74a

Please sign in to comment.