Patches for Tabular Workflow Training and Inference pipeline (GoogleCloudPlatform#51)

* Fix the table schema location, the table lifecycle, and the pipeline bucket lifecycle

* Fix the location for the Tabular Workflow BigQuery jobs (see the sketch below)

* Optimize the scalability of the training and inference preparation stored procedures (see the SQL sketch after the diffs)

---------

Co-authored-by: Carlos Timoteo <[email protected]>
chmstimoteo and Carlos Timoteo authored Oct 11, 2023
1 parent a506012 commit 9d8e9a0
Showing 8 changed files with 1,146 additions and 270 deletions.
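The second bullet in the commit message is applied as one recurring change across the components below: pin the BigQuery job location on both the client and each query call instead of relying on the default. A minimal sketch of that pattern follows; the helper name and the project, location, and query values are placeholders for illustration, not values taken from this repository.

from google.cloud import bigquery

def run_located_query(project_id: str, location: str, query: str) -> list:
    # Create the client with an explicit location rather than the library default.
    client = bigquery.Client(project=project_id, location=location)
    # Pass the location again on the query call so the job runs in the same
    # region as the tables it reads; this is the change repeated in component.py.
    query_job = client.query(query=query, location=location)
    return list(query_job.result())

# Hypothetical usage with placeholder values:
# rows = run_located_query("my-project", "us-central1", "SELECT 1 AS x")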
65 changes: 39 additions & 26 deletions python/pipelines/components/bigquery/component.py
@@ -61,6 +61,7 @@ def bq_stored_procedure_exec(

query_job = client.query(
query=query,
location=location,
job_config=job_config)

query_job.result(timeout=timeout)
@@ -129,7 +130,9 @@ def bq_clustering_exec(
)

query_job = client.query(
query=query)
query=query,
location=location
)

r = query_job.result()

@@ -161,7 +164,9 @@ def bq_evaluate(
)

query_job = client.query(
query=query)
query=query,
location=location
)

r = query_job.result()
r = list(r)
@@ -262,7 +267,10 @@ def list(cls):
query = f"""
SELECT * FROM ML.EVALUATE(MODEL `{model_bq_name}`)
"""
query_job = client.query(query=query)
query_job = client.query(
query=query,
location=location
)

r = list(query_job.result())[0]

@@ -316,15 +324,19 @@ def bq_clustering_predictions(
destination_table.metadata["table_id"] = f"{bigquery_destination_prefix}_{timestamp}"
model_uri = f"{model.metadata['projectId']}.{model.metadata['datasetId']}.{model.metadata['modelId']}"

client = bigquery.Client(project=project_id, location=location)
client = bigquery.Client(
project=project_id,
location=location
)

query = f"""
SELECT * FROM ML.PREDICT(MODEL `{model_uri}`,
TABLE `{bigquery_source}`)
"""

query_job = client.query(
query,
query=query,
location=location,
job_config=bigquery.QueryJobConfig(
destination=destination_table.metadata["table_id"])
)
@@ -368,8 +380,8 @@ def bq_flatten_tabular_binary_prediction_table(

# View table properties
logging.info(
"Got table '{}.{}.{}'.".format(
bq_table.project, bq_table.dataset_id, bq_table.table_id)
"Got table '{}.{}.{} located at {}'.".format(
bq_table.project, bq_table.dataset_id, bq_table.table_id, bq_table.location)
)

predictions_column = None
@@ -400,15 +412,15 @@ def bq_flatten_tabular_binary_prediction_table(

job_config = bigquery.QueryJobConfig()
job_config.write_disposition = 'WRITE_TRUNCATE'
"""
# Make an API request to create the view.
view = bigquery.Table(f"{table.metadata['table_id']}_view")
view.view_query = query
view = client.create_table(table = view)
logging.info(f"Created {view.table_type}: {str(view.reference)}")
"""

# Reconstruct a BigQuery client object.
client = bigquery.Client(
project=project_id,
location=bq_table.location
)
query_job = client.query(
query
query=query,
location=bq_table.location
)

results = query_job.result()
@@ -451,8 +463,8 @@ def bq_flatten_tabular_regression_table(

# View table properties
logging.info(
"Got table '{}.{}.{}'.".format(
bq_table.project, bq_table.dataset_id, bq_table.table_id)
"Got table '{}.{}.{} located at {}'.".format(
bq_table.project, bq_table.dataset_id, bq_table.table_id, bq_table.location)
)

predictions_column = None
@@ -473,15 +485,15 @@ def bq_flatten_tabular_regression_table(
"""
job_config = bigquery.QueryJobConfig()
job_config.write_disposition = 'WRITE_TRUNCATE'
"""
# Make an API request to create the view.
view = bigquery.Table(f"{table.metadata['table_id']}_view")
view.view_query = query
view = client.create_table(table = view)
logging.info(f"Created {view.table_type}: {str(view.reference)}")
"""

# Reconstruct a BigQuery client object.
client = bigquery.Client(
project=project_id,
location=bq_table.location
)
query_job = client.query(
query
query=query,
location=bq_table.location,
)

results = query_job.result()
@@ -548,7 +560,8 @@ def bq_flatten_kmeans_prediction_table(
logging.info(f"Created {view.table_type}: {str(view.reference)}")
"""
query_job = client.query(
query
query=query,
location=location
)

results = query_job.result()
8 changes: 3 additions & 5 deletions python/pipelines/tabular_pipelines.py
@@ -83,7 +83,7 @@ def prediction_binary_classification_pl(
location=location,
source_table=bigquery_source,
predictions_table=predictions.outputs['destination_table'],
bq_unique_key = bq_unique_key,
bq_unique_key=bq_unique_key,
threashold=threashold,
positive_label=positive_label
)
@@ -96,8 +96,6 @@ def prediction_binary_classification_pl(
)




@dsl.pipeline()
def prediction_regression_pl(
project_id: str,
@@ -114,7 +112,7 @@ def prediction_regression_pl(
bigquery_source: str,
bigquery_destination_prefix: str,
bq_unique_key: str,

job_name_prefix: str,
machine_type: str = "n1-standard-4",
max_replica_count: int = 10,
@@ -151,7 +149,7 @@ def prediction_regression_pl(
location=location,
source_table=bigquery_source,
predictions_table=predictions.outputs['destination_table'],
bq_unique_key = bq_unique_key
bq_unique_key=bq_unique_key
)

send_pubsub_activation_msg(
141 changes: 124 additions & 17 deletions sql/procedure/audience_segmentation_inference_preparation.sqlx
@@ -13,9 +13,18 @@
-- limitations under the License.
-- Setting procedure to lookback from the day before `inference_date`

SET inference_date = DATE_SUB(inference_date, INTERVAL 1 DAY);
DECLARE lastest_processed_time_ud TIMESTAMP;
DECLARE lastest_processed_time_uwm TIMESTAMP;
DECLARE lastest_processed_time_um TIMESTAMP;

CREATE TEMP TABLE inference_preparation AS (
-- Setting procedure to lookback from the day before `inference_date`
SET inference_date = DATE_SUB(inference_date, INTERVAL 1 DAY);

SET lastest_processed_time_ud = (SELECT MAX(processed_timestamp) FROM `{{feature_store_project_id}}.{{feature_store_dataset}}.user_segmentation_dimensions` WHERE feature_date = inference_date LIMIT 1);
SET lastest_processed_time_uwm = (SELECT MAX(processed_timestamp) FROM `{{feature_store_project_id}}.{{feature_store_dataset}}.user_lookback_metrics` WHERE feature_date = inference_date LIMIT 1);
SET lastest_processed_time_um = (SELECT MAX(processed_timestamp) FROM `{{feature_store_project_id}}.{{feature_store_dataset}}.user_scoped_segmentation_metrics` WHERE feature_date = inference_date LIMIT 1);

CREATE OR REPLACE TEMP TABLE inference_preparation_ud as (
SELECT DISTINCT
UD.user_pseudo_id,
MAX(UD.user_id) OVER(user_segmentation_dimensions_window) AS user_id,
@@ -43,7 +52,21 @@ CREATE TEMP TABLE inference_preparation AS (
MAX(UD.first_traffic_source_medium) OVER(user_segmentation_dimensions_window) AS first_traffic_source_medium,
MAX(UD.first_traffic_source_name) OVER(user_segmentation_dimensions_window) AS first_traffic_source_name,
MAX(UD.first_traffic_source_source) OVER(user_segmentation_dimensions_window) AS first_traffic_source_source,
MAX(UD.has_signed_in_with_user_id) OVER(user_segmentation_dimensions_window) AS has_signed_in_with_user_id,
MAX(UD.has_signed_in_with_user_id) OVER(user_segmentation_dimensions_window) AS has_signed_in_with_user_id
FROM
`{{feature_store_project_id}}.{{feature_store_dataset}}.user_segmentation_dimensions` UD
WHERE
-- Define the training+validation subset interval
UD.feature_date = inference_date
AND UD.processed_timestamp = lastest_processed_time_ud
WINDOW
user_segmentation_dimensions_window AS (PARTITION BY UD.user_pseudo_id ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
);

CREATE TEMP TABLE inference_preparation_uwm AS (
SELECT DISTINCT
UWM.user_pseudo_id,
UWM.feature_date,
MAX(UWM.active_users_past_1_7_day) OVER(user_lookback_metrics_window) AS active_users_past_1_7_day,
MAX(UWM.active_users_past_8_14_day) OVER(user_lookback_metrics_window) AS active_users_past_8_14_day,
MAX(UWM.purchases_past_1_7_day) OVER(user_lookback_metrics_window) AS purchases_past_1_7_day,
@@ -57,7 +80,22 @@ CREATE TEMP TABLE inference_preparation AS (
MAX(UWM.checkouts_past_1_7_day) OVER(user_lookback_metrics_window) AS checkouts_past_1_7_day,
MAX(UWM.checkouts_past_8_14_day) OVER(user_lookback_metrics_window) AS checkouts_past_8_14_day,
MAX(UWM.ltv_revenue_past_1_7_day) OVER(user_lookback_metrics_window) AS ltv_revenue_past_1_7_day,
MAX(UWM.ltv_revenue_past_7_15_day) OVER(user_lookback_metrics_window) AS ltv_revenue_past_7_15_day,
MAX(UWM.ltv_revenue_past_7_15_day) OVER(user_lookback_metrics_window) AS ltv_revenue_past_7_15_day
FROM
`{{feature_store_project_id}}.{{feature_store_dataset}}.user_lookback_metrics` UWM
WHERE
-- Define the training+validation subset interval
UWM.feature_date = inference_date
AND UWM.processed_timestamp = lastest_processed_time_uwm
WINDOW
user_lookback_metrics_window AS (PARTITION BY UWM.user_pseudo_id ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
);



CREATE TEMP TABLE inference_preparation_um AS (
SELECT DISTINCT
UM.feature_date,
MAX(UM.purchasers_users) OVER(user_scoped_segmentation_metrics_window) AS purchasers_users,
MAX(UM.average_daily_purchasers) OVER(user_scoped_segmentation_metrics_window) AS average_daily_purchasers,
MAX(UM.active_users) OVER(user_scoped_segmentation_metrics_window) AS active_users,
@@ -84,26 +122,95 @@ CREATE TEMP TABLE inference_preparation AS (
MAX(UM.avg_user_conversion_rate) OVER(user_scoped_segmentation_metrics_window) AS avg_user_conversion_rate,
MAX(UM.avg_session_conversion_rate) OVER(user_scoped_segmentation_metrics_window) AS avg_session_conversion_rate
FROM
`{{feature_store_project_id}}.{{feature_store_dataset}}.user_segmentation_dimensions` UD
`{{feature_store_project_id}}.{{feature_store_dataset}}.user_scoped_segmentation_metrics` UM
WHERE
-- Define the training+validation subset interval
UM.feature_date = inference_date
AND UM.processed_timestamp = lastest_processed_time_um
WINDOW
user_scoped_segmentation_metrics_window AS (PARTITION BY UM.feature_date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
);


CREATE TEMP TABLE inference_preparation AS (
SELECT DISTINCT
UD.user_pseudo_id,
UD.user_id,
UD.feature_date,
UD.month_of_the_year,
UD.week_of_the_year,
UD.day_of_the_month,
UD.day_of_week,
UD.device_category,
UD.device_mobile_brand_name,
UD.device_mobile_model_name,
UD.device_os,
UD.device_os_version,
UD.device_language,
UD.device_web_browser,
UD.device_web_browser_version,
UD.geo_sub_continent,
UD.geo_country,
UD.geo_region,
UD.geo_city,
UD.geo_metro,
UD.last_traffic_source_medium,
UD.last_traffic_source_name,
UD.last_traffic_source_source,
UD.first_traffic_source_medium,
UD.first_traffic_source_name,
UD.first_traffic_source_source,
UD.has_signed_in_with_user_id,
UWM.active_users_past_1_7_day,
UWM.active_users_past_8_14_day,
UWM.purchases_past_1_7_day,
UWM.purchases_past_8_14_day,
UWM.visits_past_1_7_day,
UWM.visits_past_8_14_day,
UWM.view_items_past_1_7_day,
UWM.view_items_past_8_14_day,
UWM.add_to_carts_past_1_7_day,
UWM.add_to_carts_past_8_14_day,
UWM.checkouts_past_1_7_day,
UWM.checkouts_past_8_14_day,
UWM.ltv_revenue_past_1_7_day,
UWM.ltv_revenue_past_7_15_day,
UM.purchasers_users,
UM.average_daily_purchasers,
UM.active_users,
UM.DAU,
UM.MAU,
UM.WAU,
UM.dau_per_mau,
UM.dau_per_wau,
UM.wau_per_mau,
UM.users_engagement_duration_seconds,
UM.average_engagement_time,
UM.average_engagement_time_per_session,
UM.average_sessions_per_user,
UM.ARPPU,
UM.ARPU,
UM.average_daily_revenue,
UM.max_daily_revenue,
UM.min_daily_revenue,
UM.new_users,
UM.returning_users,
UM.first_time_purchasers,
UM.first_time_purchaser_conversion,
UM.first_time_purchasers_per_new_user,
UM.avg_user_conversion_rate,
UM.avg_session_conversion_rate
FROM
inference_preparation_ud UD
INNER JOIN
`{{feature_store_project_id}}.{{feature_store_dataset}}.user_lookback_metrics` UWM
inference_preparation_uwm UWM
ON
UWM.user_pseudo_id = UD.user_pseudo_id
AND UWM.feature_date = UD.feature_date
INNER JOIN
`{{feature_store_project_id}}.{{feature_store_dataset}}.user_scoped_segmentation_metrics` UM
inference_preparation_um UM
ON
UM.feature_date = UD.feature_date
WHERE
-- Define the training+validation subset interval
UD.feature_date = inference_date
AND UD.processed_timestamp = (SELECT MAX(processed_timestamp) FROM `{{feature_store_project_id}}.{{feature_store_dataset}}.user_segmentation_dimensions` WHERE feature_date = inference_date LIMIT 1)
AND UWM.processed_timestamp = (SELECT MAX(processed_timestamp) FROM `{{feature_store_project_id}}.{{feature_store_dataset}}.user_lookback_metrics` WHERE feature_date = inference_date LIMIT 1)
AND UM.processed_timestamp = (SELECT MAX(processed_timestamp) FROM `{{feature_store_project_id}}.{{feature_store_dataset}}.user_scoped_segmentation_metrics` WHERE feature_date = inference_date LIMIT 1)
WINDOW
user_segmentation_dimensions_window AS (PARTITION BY UD.user_pseudo_id, UD.feature_date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING),
user_lookback_metrics_window AS (PARTITION BY UWM.user_pseudo_id, UWM.feature_date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING),
user_scoped_segmentation_metrics_window AS (PARTITION BY UM.feature_date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
);

DELETE FROM `{{project_id}}.{{dataset}}.{{insert_table}}` WHERE TRUE;
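For the third bullet in the commit message, the procedure above was restructured for scalability: the latest processed_timestamp for each feature table is computed once into a declared variable, each feature table is staged as a temp table filtered to that snapshot, and only the staged tables are joined, instead of joining the raw feature tables with repeated MAX(processed_timestamp) subqueries in the WHERE clause. A stripped-down sketch of that shape follows; the table and column names (feature_a, feature_b, metric_a, metric_b) are placeholders, not the repository's schema.

-- Sketch only: placeholder tables and columns, not the repository's schema.
DECLARE inference_date DATE DEFAULT DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY);
DECLARE latest_a TIMESTAMP;
DECLARE latest_b TIMESTAMP;

-- Compute each snapshot timestamp once, rather than repeating a
-- MAX(processed_timestamp) subquery inside the WHERE clause of one large join.
SET latest_a = (SELECT MAX(processed_timestamp) FROM `project.dataset.feature_a` WHERE feature_date = inference_date);
SET latest_b = (SELECT MAX(processed_timestamp) FROM `project.dataset.feature_b` WHERE feature_date = inference_date);

-- Stage each feature table filtered to the chosen date and snapshot.
CREATE OR REPLACE TEMP TABLE prep_a AS (
  SELECT user_pseudo_id, feature_date, metric_a
  FROM `project.dataset.feature_a`
  WHERE feature_date = inference_date
    AND processed_timestamp = latest_a
);

CREATE OR REPLACE TEMP TABLE prep_b AS (
  SELECT user_pseudo_id, feature_date, metric_b
  FROM `project.dataset.feature_b`
  WHERE feature_date = inference_date
    AND processed_timestamp = latest_b
);

-- Join the small staged tables rather than the raw feature tables.
CREATE OR REPLACE TEMP TABLE inference_preparation AS (
  SELECT a.user_pseudo_id, a.feature_date, a.metric_a, b.metric_b
  FROM prep_a AS a
  INNER JOIN prep_b AS b
    ON b.user_pseudo_id = a.user_pseudo_id
    AND b.feature_date = a.feature_date
);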
Diffs for the remaining 5 changed files are not shown here.
