Skip to content

Commit

Permalink
Fix Query Performance (#658)
Browse files Browse the repository at this point in the history
* Fix TWA performance and update tests for case insensitivity

Signed-off-by: rodalynbarce <[email protected]>

* Black formatting

Signed-off-by: rodalynbarce <[email protected]>

---------

Signed-off-by: rodalynbarce <[email protected]>
  • Loading branch information
rodalynbarce authored Feb 12, 2024
1 parent 88fb840 commit f997c1a
Show file tree
Hide file tree
Showing 12 changed files with 164 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,11 @@ def _interpolation_query(

interpolate_query = (
f"WITH resample AS ({sample_query})"
"{% if case_insensitivity_tag_search is defined and case_insensitivity_tag_search == true %}"
',date_array AS (SELECT DISTINCT explode(sequence(from_utc_timestamp(to_timestamp("{{ start_date }}"), "{{ time_zone }}"), from_utc_timestamp(to_timestamp("{{ end_date }}"), "{{ time_zone }}"), INTERVAL \'{{ time_interval_rate + \' \' + time_interval_unit }}\')) AS `{{ timestamp_column }}`, explode(array(`{{ tagname_column }}`)) AS `{{ tagname_column }}` FROM resample) '
"{% else %}"
",date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp(\"{{ start_date }}\"), \"{{ time_zone }}\"), from_utc_timestamp(to_timestamp(\"{{ end_date }}\"), \"{{ time_zone }}\"), INTERVAL '{{ time_interval_rate + ' ' + time_interval_unit }}')) AS `{{ timestamp_column }}`, explode(array('{{ tag_names | join('\\', \\'') }}')) AS `{{ tagname_column }}`) "
"{% endif %}"
'{% if (interpolation_method is defined) and (interpolation_method == "forward_fill" or interpolation_method == "backward_fill") %}'
",project AS (SELECT a.`{{ timestamp_column }}`, a.`{{ tagname_column }}`, {{ interpolation_options_0 }}(b.`{{ value_column }}`, true) OVER (PARTITION BY a.`{{ tagname_column }}` ORDER BY a.`{{ timestamp_column }}` ROWS BETWEEN {{ interpolation_options_1 }} AND {{ interpolation_options_2 }}) AS `{{ value_column }}` FROM date_array a LEFT OUTER JOIN resample b ON a.`{{ timestamp_column }}` = b.`{{ timestamp_column }}` AND a.`{{ tagname_column }}` = b.`{{ tagname_column }}`) "
'{% elif (interpolation_method is defined) and (interpolation_method == "linear") %}'
Expand Down Expand Up @@ -275,11 +279,19 @@ def _interpolation_at_time(parameters_dict: dict) -> str:
"AND `{{ tagname_column }}` IN ('{{ tag_names | join('\\', \\'') }}')"
"{% endif %} "
"{% if include_status is defined and include_status == true and include_bad_data is defined and include_bad_data == false %} AND `{{ status_column }}` = 'Good' {% endif %}) "
", date_array AS (SELECT DISTINCT explode(array( "
"{% if case_insensitivity_tag_search is defined and case_insensitivity_tag_search == true %}"
", date_array AS (SELECT DISTINCT explode(array("
"{% else %}"
", date_array AS (SELECT explode(array( "
"{% endif %} "
"{% for timestamp in timestamps -%} "
'from_utc_timestamp(to_timestamp("{{timestamp}}"), "{{time_zone}}") '
"{% if not loop.last %} , {% endif %} {% endfor %} )) AS `{{ timestamp_column }}`, "
"explode(array(`{{ tagname_column }}`)) AS `{{ tagname_column }}` FROM raw_events) "
"{% if case_insensitivity_tag_search is defined and case_insensitivity_tag_search == true %}"
"explode(array(`{{ tagname_column }}`)) AS `{{ tagname_column }}` FROM raw_events)"
"{% else %}"
"explode(array('{{ tag_names | join('\\', \\'') }}')) AS `{{ tagname_column }}`) "
"{% endif %} "
", interpolation_events AS (SELECT coalesce(a.`{{ tagname_column }}`, b.`{{ tagname_column }}`) AS `{{ tagname_column }}`, coalesce(a.`{{ timestamp_column }}`, b.`{{ timestamp_column }}`) AS `{{ timestamp_column }}`, a.`{{ timestamp_column }}` AS `Requested_{{ timestamp_column }}`, b.`{{ timestamp_column }}` AS `Found_{{ timestamp_column }}`, b.`{{ status_column }}`, b.`{{ value_column }}` FROM date_array a FULL OUTER JOIN raw_events b ON a.`{{ timestamp_column }}` = b.`{{ timestamp_column }}` AND a.`{{ tagname_column }}` = b.`{{ tagname_column }}`) "
", interpolation_calculations AS (SELECT *, lag(`{{ timestamp_column }}`) OVER (PARTITION BY `{{ tagname_column }}` ORDER BY `{{ timestamp_column }}`) AS `Previous_{{ timestamp_column }}`, lag(`{{ value_column }}`) OVER (PARTITION BY `{{ tagname_column }}` ORDER BY `{{ timestamp_column }}`) AS `Previous_{{ value_column }}`, lead(`{{ timestamp_column }}`) OVER (PARTITION BY `{{ tagname_column }}` ORDER BY `{{ timestamp_column }}`) AS `Next_{{ timestamp_column }}`, lead(`{{ value_column }}`) OVER (PARTITION BY `{{ tagname_column }}` ORDER BY `{{ timestamp_column }}`) AS `Next_{{ value_column }}`, "
"CASE WHEN `Requested_{{ timestamp_column }}` = `Found_{{ timestamp_column }}` THEN `{{ value_column }}` WHEN `Next_{{ timestamp_column }}` IS NULL THEN `Previous_{{ value_column }}` WHEN `Previous_{{ timestamp_column }}` IS NULL AND `Next_{{ timestamp_column }}` IS NULL THEN NULL "
Expand Down Expand Up @@ -460,7 +472,11 @@ def _time_weighted_average_query(parameters_dict: dict) -> str:
"WHERE to_date(`{{ timestamp_column }}`) BETWEEN date_sub(to_date(to_timestamp(\"{{ start_date }}\")), {{ window_length }}) AND date_add(to_date(to_timestamp(\"{{ end_date }}\")), {{ window_length }}) AND `{{ tagname_column }}` IN ('{{ tag_names | join('\\', \\'') }}') "
"{% endif %}"
"{% if include_status is defined and include_status == true and include_bad_data is defined and include_bad_data == false %} AND `{{ status_column }}` = 'Good' {% endif %}) "
"{% if case_insensitivity_tag_search is defined and case_insensitivity_tag_search == true %}"
',date_array AS (SELECT DISTINCT explode(sequence(from_utc_timestamp(to_timestamp("{{ start_date }}"), "{{ time_zone }}"), from_utc_timestamp(to_timestamp("{{ end_date }}"), "{{ time_zone }}"), INTERVAL \'{{ time_interval_rate + \' \' + time_interval_unit }}\')) AS `{{ timestamp_column }}`, explode(array(`{{ tagname_column }}`)) AS `{{ tagname_column }}` FROM raw_events) '
"{% else %}"
",date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp(\"{{ start_date }}\"), \"{{ time_zone }}\"), from_utc_timestamp(to_timestamp(\"{{ end_date }}\"), \"{{ time_zone }}\"), INTERVAL '{{ time_interval_rate + ' ' + time_interval_unit }}')) AS `{{ timestamp_column }}`, explode(array('{{ tag_names | join('\\', \\'') }}')) AS `{{ tagname_column }}`) "
"{% endif %}"
",boundary_events AS (SELECT coalesce(a.`{{ tagname_column }}`, b.`{{ tagname_column }}`) AS `{{ tagname_column }}`, coalesce(a.`{{ timestamp_column }}`, b.`{{ timestamp_column }}`) AS `{{ timestamp_column }}`, b.`{{ status_column }}`, b.`{{ value_column }}` FROM date_array a FULL OUTER JOIN raw_events b ON a.`{{ timestamp_column }}` = b.`{{ timestamp_column }}` AND a.`{{ tagname_column }}` = b.`{{ tagname_column }}`) "
",window_buckets AS (SELECT `{{ timestamp_column }}` AS window_start, LEAD(`{{ timestamp_column }}`) OVER (ORDER BY `{{ timestamp_column }}`) AS window_end FROM (SELECT distinct `{{ timestamp_column }}` FROM date_array) ) "
",window_events AS (SELECT /*+ RANGE_JOIN(b, {{ range_join_seconds }} ) */ b.`{{ tagname_column }}`, b.`{{ timestamp_column }}`, a.window_start AS `WindowEventTime`, b.`{{ status_column }}`, b.`{{ value_column }}` FROM boundary_events b LEFT OUTER JOIN window_buckets a ON a.window_start <= b.`{{ timestamp_column }}` AND a.window_end > b.`{{ timestamp_column }}`) "
Expand Down Expand Up @@ -571,7 +587,11 @@ def _circular_stats_query(parameters_dict: dict) -> str:
"WHERE `{{ timestamp_column }}` BETWEEN TO_TIMESTAMP(\"{{ start_date }}\") AND TO_TIMESTAMP(\"{{ end_date }}\") AND `{{ tagname_column }}` IN ('{{ tag_names | join('\\', \\'') }}') "
"{% endif %}"
"{% if include_status is defined and include_status == true and include_bad_data is defined and include_bad_data == false %} AND `{{ status_column }}` = 'Good' {% endif %}) "
"{% if case_insensitivity_tag_search is defined and case_insensitivity_tag_search == true %}"
',date_array AS (SELECT DISTINCT EXPLODE(SEQUENCE(FROM_UTC_TIMESTAMP(TO_TIMESTAMP("{{ start_date }}"), "{{ time_zone }}"), FROM_UTC_TIMESTAMP(TO_TIMESTAMP("{{ end_date }}"), "{{ time_zone }}"), INTERVAL \'{{ time_interval_rate + \' \' + time_interval_unit }}\')) AS `{{ timestamp_column }}`, EXPLODE(ARRAY(`{{ tagname_column }}`)) AS `{{ tagname_column }}` FROM raw_events) '
"{% else %}"
",date_array AS (SELECT EXPLODE(SEQUENCE(FROM_UTC_TIMESTAMP(TO_TIMESTAMP(\"{{ start_date }}\"), \"{{ time_zone }}\"), FROM_UTC_TIMESTAMP(TO_TIMESTAMP(\"{{ end_date }}\"), \"{{ time_zone }}\"), INTERVAL '{{ time_interval_rate + ' ' + time_interval_unit }}')) AS `{{ timestamp_column }}`, EXPLODE(ARRAY('{{ tag_names | join('\\', \\'') }}')) AS `{{ tagname_column }}`) "
"{% endif %}"
",window_events AS (SELECT COALESCE(a.`{{ tagname_column }}`, b.`{{ tagname_column }}`) AS `{{ tagname_column }}`, COALESCE(a.`{{ timestamp_column }}`, b.`{{ timestamp_column }}`) AS `{{ timestamp_column }}`, WINDOW(COALESCE(a.`{{ timestamp_column }}`, b.`{{ timestamp_column }}`), '{{ time_interval_rate + ' ' + time_interval_unit }}').START `Window{{ timestamp_column }}`, b.`{{ status_column }}`, b.`{{ value_column }}` FROM date_array a FULL OUTER JOIN raw_events b ON CAST(a.`{{ timestamp_column }}` AS LONG) = CAST(b.`{{ timestamp_column }}` AS LONG) AND a.`{{ tagname_column }}` = b.`{{ tagname_column }}`) "
",calculation_set_up AS (SELECT `{{ timestamp_column }}`, `Window{{ timestamp_column }}`, `{{ tagname_column }}`, `{{ value_column }}`, MOD(`{{ value_column }}` - {{ lower_bound }}, ({{ upper_bound }} - {{ lower_bound }}))*(2*pi()/({{ upper_bound }} - {{ lower_bound }})) AS `{{ value_column }}_in_Radians`, LAG(`{{ timestamp_column }}`) OVER (PARTITION BY `{{ tagname_column }}` ORDER BY `{{ timestamp_column }}`) AS `Previous_{{ timestamp_column }}`, (unix_millis(`{{ timestamp_column }}`) - unix_millis(`Previous_{{ timestamp_column }}`)) / 86400000 AS Time_Difference, COS(`{{ value_column }}_in_Radians`) AS Cos_Value, SIN(`{{ value_column }}_in_Radians`) AS Sin_Value FROM window_events) "
",circular_average_calculations AS (SELECT `Window{{ timestamp_column }}`, `{{ tagname_column }}`, Time_Difference, AVG(Cos_Value) OVER (PARTITION BY `{{ tagname_column }}` ORDER BY `{{ timestamp_column }}` ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS Average_Cos, AVG(Sin_Value) OVER (PARTITION BY `{{ tagname_column }}` ORDER BY `{{ timestamp_column }}` ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS Average_Sin, SQRT(POW(Average_Cos, 2) + POW(Average_Sin, 2)) AS Vector_Length, Average_Cos/Vector_Length AS Rescaled_Average_Cos, Average_Sin/Vector_Length AS Rescaled_Average_Sin, Time_Difference * Rescaled_Average_Cos AS Diff_Average_Cos, Time_Difference * Rescaled_Average_Sin AS Diff_Average_Sin FROM calculation_set_up) "
Expand Down
Loading

0 comments on commit f997c1a

Please sign in to comment.