Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Use ASOF JOIN in Snowflake offline store query #4850

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 35 additions & 94 deletions sdk/python/feast/infra/offline_stores/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -716,8 +716,8 @@ def _get_entity_df_event_timestamp_range(

MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """
/*
Compute a deterministic hash for the `left_table_query_string` that will be used throughout
all the logic as the field to GROUP BY the data
0. Compute a deterministic hash for the `left_table_query_string` that will be used throughout
all the logic as the field to GROUP BY the data.
*/
WITH "entity_dataframe" AS (
SELECT *,
Expand All @@ -739,6 +739,10 @@ def _get_entity_df_event_timestamp_range(

{% for featureview in featureviews %}

/*
1. Only select the required columns with entities of the featureview.
*/

"{{ featureview.name }}__entity_dataframe" AS (
SELECT
{{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
Expand All @@ -751,120 +755,57 @@ def _get_entity_df_event_timestamp_range(
"{{featureview.name}}__entity_row_unique_id"
),

/*
This query template performs the point-in-time correctness join for a single feature set table
to the provided entity table.

1. We first join the current feature_view to the entity dataframe that has been passed.
This JOIN has the following logic:
- For each row of the entity dataframe, only keep the rows where the `timestamp_field`
is less than the one provided in the entity dataframe
- If there a TTL for the current feature_view, also keep the rows where the `timestamp_field`
is higher the the one provided minus the TTL
- For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been
computed previously

The output of this CTE will contain all the necessary information and already filtered out most
of the data that is not relevant.
*/

"{{ featureview.name }}__subquery" AS (
SELECT
"{{ featureview.timestamp_field }}" as "event_timestamp",
{{'"' ~ featureview.created_timestamp_column ~ '" as "created_timestamp",' if featureview.created_timestamp_column else '' }}
{{featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
{% for feature in featureview.features %}
"{{ feature }}" as {% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }}
WHERE "{{ featureview.timestamp_field }}" <= '{{ featureview.max_event_timestamp }}'
{% if featureview.ttl == 0 %}{% else %}
AND "{{ featureview.timestamp_field }}" >= '{{ featureview.min_event_timestamp }}'
{% endif %}
),

"{{ featureview.name }}__base" AS (
SELECT
"subquery".*,
"entity_dataframe"."entity_timestamp",
"entity_dataframe"."{{featureview.name}}__entity_row_unique_id"
FROM "{{ featureview.name }}__subquery" AS "subquery"
INNER JOIN "{{ featureview.name }}__entity_dataframe" AS "entity_dataframe"
ON TRUE
AND "subquery"."event_timestamp" <= "entity_dataframe"."entity_timestamp"

{% if featureview.ttl == 0 %}{% else %}
AND "subquery"."event_timestamp" >= TIMESTAMPADD(second,-{{ featureview.ttl }},"entity_dataframe"."entity_timestamp")
{% endif %}

{% for entity in featureview.entities %}
AND "subquery"."{{ entity }}" = "entity_dataframe"."{{ entity }}"
{% endfor %}
),

/*
2. If the `created_timestamp_column` has been set, we need to
deduplicate the data first. This is done by calculating the
`MAX(created_at_timestamp)` for each event_timestamp.
We then join the data on the next CTE
Otherwise, the ASOF JOIN can have unstable side effects
https://docs.snowflake.com/en/sql-reference/constructs/asof-join#expected-behavior-when-ties-exist-in-the-right-table
*/

{% if featureview.created_timestamp_column %}
"{{ featureview.name }}__dedup" AS (
SELECT
"{{featureview.name}}__entity_row_unique_id",
{{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %},
"event_timestamp",
MAX("created_timestamp") AS "created_timestamp"
FROM "{{ featureview.name }}__base"
GROUP BY "{{featureview.name}}__entity_row_unique_id", "event_timestamp"
FROM "{{ featureview.table_subquery }}"
GROUP BY {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}, "event_timestamp"
),
{% endif %}

/*
3. The data has been filtered during the first CTE "*__base"
Thus we only need to compute the latest timestamp of each feature.
3. Make ASOF JOIN of deduplicated feature CTE on reduced entity dataframe.
*/
"{{ featureview.name }}__latest" AS (

"{{ featureview.name }}__asof_join" AS (
SELECT
"event_timestamp",
{% if featureview.created_timestamp_column %}"created_timestamp",{% endif %}
"{{featureview.name}}__entity_row_unique_id"
FROM
(
SELECT *,
ROW_NUMBER() OVER(
PARTITION BY "{{featureview.name}}__entity_row_unique_id"
ORDER BY "event_timestamp" DESC{% if featureview.created_timestamp_column %},"created_timestamp" DESC{% endif %}
) AS "row_number"
FROM "{{ featureview.name }}__base"
{% if featureview.created_timestamp_column %}
INNER JOIN "{{ featureview.name }}__dedup"
USING ("{{featureview.name}}__entity_row_unique_id", "event_timestamp", "created_timestamp")
{% endif %}
)
WHERE "row_number" = 1
e.*,
{% for feature in featureview.features %}
v."{{ feature }}" as {% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %},
v."{{ featureview.timestamp_field }}"
FROM "{{ featureview.name }}__entity_dataframe" e
ASOF JOIN {% if featureview.created_timestamp_column %}"{{ featureview.name }}__dedup"{% else %}{{ featureview.table_subquery }}{% endif %} v
MATCH_CONDITION (e."entity_timestamp" >= v."{{ featureview.timestamp_field }}")
USING({% for entity in featureview.entities %}{% if not loop.first %},{% endif %}"{{ entity }}"{% endfor %})
),

/*
4. Once we know the latest value of each feature for a given timestamp,
we can join again the data back to the original "base" dataset
4. If TTL is configured filter the CTE to remove rows where the feature values are older than the configured ttl.
*/
"{{ featureview.name }}__cleaned" AS (
SELECT "base".*
FROM "{{ featureview.name }}__base" AS "base"
INNER JOIN "{{ featureview.name }}__latest"
USING(
"{{featureview.name}}__entity_row_unique_id",
"event_timestamp"
{% if featureview.created_timestamp_column %}
,"created_timestamp"
{% endif %}
)
){% if loop.last %}{% else %}, {% endif %}

"{{ featureview.name }}__ttl" AS (
SELECT *
FROM "{{ featureview.name }}__asof_join"
{% if featureview.ttl == 0 %}{% else %}
WHERE "{{ featureview.timestamp_field }}" >= TIMESTAMPADD(second,-{{ featureview.ttl }},"entity_timestamp")
{% endif %}
){% if loop.last %}{% else %}, {% endif %}

{% endfor %}
/*
Joins the outputs of multiple time travel joins to a single table.
Join the outputs of multiple time travel joins to a single table.
The entity_dataframe dataset being our source of truth here.
*/

Expand All @@ -877,7 +818,7 @@ def _get_entity_df_event_timestamp_range(
{% for feature in featureview.features %}
,{% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}
{% endfor %}
FROM "{{ featureview.name }}__cleaned"
) "{{ featureview.name }}__cleaned" USING ("{{featureview.name}}__entity_row_unique_id")
FROM "{{ featureview.name }}__ttl"
) "{{ featureview.name }}__ttl" USING ("{{featureview.name}}__entity_row_unique_id")
{% endfor %}
"""
Loading