Skip to content

Commit

Permalink
bug fix feature engineering pipeline (GoogleCloudPlatform#43)
Browse files Browse the repository at this point in the history
Co-authored-by: Carlos Timoteo <[email protected]>
  • Loading branch information
chmstimoteo and Carlos Timoteo authored Sep 14, 2023
1 parent a1cd54a commit 2909706
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 87 deletions.
110 changes: 25 additions & 85 deletions config/config.yaml.tftpl
Original file line number Diff line number Diff line change
Expand Up @@ -89,89 +89,29 @@ vertex_ai:
project_id: "${project_id}"
location: "${location}"
query_customer_lifetime_value_label: "
DECLARE input_date DATE;
DECLARE end_date DATE;
DECLARE rows_added INT64 DEFAULT NULL;
SET end_date= IFNULL(@input_date, CURRENT_DATE('{date_timezone}'));
SET input_date= (SELECT DATE_SUB(end_date, INTERVAL 180 DAY));
CALL `{customer_lifetime_value_label_procedure_name}`(input_date, end_date, rows_added);"
CALL `{customer_lifetime_value_label_procedure_name}`();"
query_purchase_propensity_label: "
DECLARE input_date DATE;
DECLARE end_date DATE;
DECLARE rows_added INT64 DEFAULT NULL;
SET end_date= IFNULL(@input_date, CURRENT_DATE('{date_timezone}'));
SET input_date= (SELECT DATE_SUB(end_date, INTERVAL 180 DAY));
CALL `{purchase_propensity_label_procedure_name}`(input_date, end_date, rows_added);"
CALL `{purchase_propensity_label_procedure_name}`();"
query_user_dimensions: "
DECLARE input_date DATE;
DECLARE end_date DATE;
DECLARE rows_added INT64 DEFAULT NULL;
SET input_date= IFNULL(@input_date, CURRENT_DATE('{date_timezone}'));
SET end_date= (SELECT DATE_SUB(input_date, INTERVAL 30 DAY));
CALL `{user_dimensions_procedure_name}`(input_date, end_date, rows_added);"
CALL `{user_dimensions_procedure_name}`();"
query_user_lifetime_dimensions: "
DECLARE input_date DATE;
DECLARE end_date DATE;
DECLARE rows_added INT64 DEFAULT NULL;
SET input_date= IFNULL(@input_date, CURRENT_DATE('{date_timezone}'));
SET end_date= (SELECT DATE_SUB(input_date, INTERVAL 180 DAY));
CALL `{user_lifetime_dimensions_procedure_name}`(input_date, end_date, rows_added);"
CALL `{user_lifetime_dimensions_procedure_name}`();"
query_user_lookback_metrics: "
DECLARE input_date DATE;
DECLARE end_date DATE;
DECLARE rows_added INT64 DEFAULT NULL;
SET input_date= IFNULL(@input_date, CURRENT_DATE('{date_timezone}'));
SET end_date= (SELECT DATE_SUB(input_date, INTERVAL 15 DAY));
CALL `{user_lookback_metrics_procedure_name}`(input_date, end_date, rows_added);"
CALL `{user_lookback_metrics_procedure_name}`();"
query_user_rolling_window_lifetime_metrics: "
DECLARE input_date DATE;
DECLARE end_date DATE;
DECLARE rows_added INT64 DEFAULT NULL;
SET input_date= IFNULL(@input_date, CURRENT_DATE('{date_timezone}'));
SET end_date= (SELECT DATE_SUB(input_date, INTERVAL 180 DAY));
CALL `{user_rolling_window_lifetime_metrics_procedure_name}`(input_date, end_date, rows_added);"
CALL `{user_rolling_window_lifetime_metrics_procedure_name}`();"
query_user_rolling_window_metrics: "
DECLARE input_date DATE;
DECLARE end_date DATE;
DECLARE rows_added INT64 DEFAULT NULL;
SET input_date= IFNULL(@input_date, CURRENT_DATE('{date_timezone}'));
SET end_date= (SELECT DATE_SUB(input_date, INTERVAL 30 DAY));
CALL `{user_rolling_window_metrics_procedure_name}`(input_date, end_date, rows_added);"
CALL `{user_rolling_window_metrics_procedure_name}`();"
query_user_scoped_lifetime_metrics: "
DECLARE input_date DATE;
DECLARE end_date DATE;
DECLARE rows_added INT64 DEFAULT NULL;
SET input_date= IFNULL(@input_date, CURRENT_DATE('{date_timezone}'));
SET end_date= (SELECT DATE_SUB(input_date, INTERVAL 180 DAY));
CALL `{user_scoped_lifetime_metrics_procedure_name}`(input_date, end_date, rows_added);"
CALL `{user_scoped_lifetime_metrics_procedure_name}`();"
query_user_scoped_metrics: "
DECLARE input_date DATE;
DECLARE end_date DATE;
DECLARE rows_added INT64 DEFAULT NULL;
SET input_date= IFNULL(@input_date, CURRENT_DATE('{date_timezone}'));
SET end_date= (SELECT DATE_SUB(input_date, INTERVAL 30 DAY));
CALL `{user_scoped_metrics_procedure_name}`(input_date, end_date, rows_added);"
CALL `{user_scoped_metrics_procedure_name}`();"
query_user_scoped_segmentation_metrics: "
DECLARE input_date DATE;
DECLARE end_date DATE;
DECLARE rows_added INT64 DEFAULT NULL;
SET input_date= IFNULL(@input_date, CURRENT_DATE('{date_timezone}'));
SET end_date= (SELECT DATE_SUB(input_date, INTERVAL 15 DAY));
CALL `{user_scoped_segmentation_metrics_procedure_name}`(input_date, end_date, rows_added);"
CALL `{user_scoped_segmentation_metrics_procedure_name}`();"
query_user_segmentation_dimensions: "
DECLARE input_date DATE;
DECLARE end_date DATE;
DECLARE rows_added INT64 DEFAULT NULL;
SET input_date= IFNULL(@input_date, CURRENT_DATE('{date_timezone}'));
SET end_date= (SELECT DATE_SUB(input_date, INTERVAL 15 DAY));
CALL `{user_segmentation_dimensions_procedure_name}`(input_date, end_date, rows_added);"
CALL `{user_segmentation_dimensions_procedure_name}`();"
query_user_session_event_aggregated_metrics: "
DECLARE input_date DATE;
DECLARE end_date DATE;
DECLARE rows_added INT64 DEFAULT NULL;
SET input_date= IFNULL(@input_date, CURRENT_DATE('{date_timezone}'));
SET end_date= (SELECT DATE_SUB(input_date, INTERVAL 30 DAY));
CALL `{user_session_event_aggregated_metrics_procedure_name}`(input_date, end_date, rows_added);"
CALL `{user_session_event_aggregated_metrics_procedure_name}`();"

query_purchase_propensity_inference_preparation: "
CALL `{purchase_propensity_inference_preparation_procedure_name}`();"
Expand All @@ -190,20 +130,20 @@ vertex_ai:
query_parameters:
- { name: "input_date", type: "DATE", value: None } # If no value is defined, current_date() is assumed.
#INT64
timeout: 600.0
timeout: 1800.0
pipeline_parameters_substitutions: # Substitutions are applied to the parameters before compilation
customer_lifetime_value_label_procedure_name: "${project_id}.feature_store.customer_lifetime_value_label"
purchase_propensity_label_procedure_name: "${project_id}.feature_store.purchase_propensity_label"
user_dimensions_procedure_name: "${project_id}.feature_store.user_dimensions"
user_lifetime_dimensions_procedure_name: "${project_id}.feature_store.user_lifetime_dimensions"
user_lookback_metrics_procedure_name: "${project_id}.feature_store.user_lookback_metrics"
user_rolling_window_lifetime_metrics_procedure_name: "${project_id}.feature_store.user_rolling_window_lifetime_metrics"
user_rolling_window_metrics_procedure_name: "${project_id}.feature_store.user_rolling_window_metrics"
user_scoped_lifetime_metrics_procedure_name: "${project_id}.feature_store.user_scoped_lifetime_metrics"
user_scoped_metrics_procedure_name: "${project_id}.feature_store.user_scoped_metrics"
user_scoped_segmentation_metrics_procedure_name: "${project_id}.feature_store.user_scoped_segmentation_metrics"
user_segmentation_dimensions_procedure_name: "${project_id}.feature_store.user_segmentation_dimensions"
user_session_event_aggregated_metrics_procedure_name: "${project_id}.feature_store.user_session_event_aggregated_metrics"
customer_lifetime_value_label_procedure_name: "${project_id}.feature_store.invoke_customer_lifetime_value_label"
purchase_propensity_label_procedure_name: "${project_id}.feature_store.invoke_purchase_propensity_label"
user_dimensions_procedure_name: "${project_id}.feature_store.invoke_user_dimensions"
user_lifetime_dimensions_procedure_name: "${project_id}.feature_store.invoke_user_lifetime_dimensions"
user_lookback_metrics_procedure_name: "${project_id}.feature_store.invoke_user_lookback_metrics"
user_rolling_window_lifetime_metrics_procedure_name: "${project_id}.feature_store.invoke_user_rolling_window_lifetime_metrics"
user_rolling_window_metrics_procedure_name: "${project_id}.feature_store.invoke_user_rolling_window_metrics"
user_scoped_lifetime_metrics_procedure_name: "${project_id}.feature_store.invoke_user_scoped_lifetime_metrics"
user_scoped_metrics_procedure_name: "${project_id}.feature_store.invoke_user_scoped_metrics"
user_scoped_segmentation_metrics_procedure_name: "${project_id}.feature_store.invoke_user_scoped_segmentation_metrics"
user_segmentation_dimensions_procedure_name: "${project_id}.feature_store.invoke_user_segmentation_dimensions"
user_session_event_aggregated_metrics_procedure_name: "${project_id}.feature_store.invoke_user_session_event_aggregated_metrics"
date_timezone: "UTC" # Used to get the current date when input_date is None.

purchase_propensity_inference_preparation_procedure_name: "${project_id}.purchase_propensity.invoke_purchase_propensity_inference_preparation"
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ docker = "^6.0.1"
ma-components = {path = "python/base_component_image/", develop = true}
google-cloud-pubsub = "2.15.0"
google-analytics-admin = "0.17.0"
google-analytics-data = "^0.17.1"

[tool.poetry.group.component_vertex.dependencies]
google-cloud-aiplatform = "1.22.0"
Expand Down
2 changes: 1 addition & 1 deletion python/pipelines/components/bigquery/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def bq_stored_procedure_exec(
location: str,
query: str,
query_parameters: Optional[list],
timeout: Optional[float] = None
timeout: Optional[float] = 1800
) -> None:

from google.cloud import bigquery
Expand Down
2 changes: 1 addition & 1 deletion python/pipelines/feature_engineering_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def pipeline(
query_audience_segmentation_training_preparation: str,

query_parameters: Optional[list] = None,
timeout: Optional[float] = 600.0
timeout: Optional[float] = 1800.0
):

phase_1 = list()
Expand Down

0 comments on commit 2909706

Please sign in to comment.