Skip to content

Commit

Permalink
[COST-5657] subs: use subquery in Trino to join from Postgres (#5381)
Browse files Browse the repository at this point in the history
* join postgres table in trino sql
  • Loading branch information
maskarb authored Nov 14, 2024
1 parent 6eea791 commit 52a10c7
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 68 deletions.
74 changes: 26 additions & 48 deletions koku/subs/subs_data_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,24 +79,20 @@ def determine_latest_processed_time_for_provider(self, rid, year, month):

def determine_ids_for_provider(self, year, month):
"""Determine the relevant IDs to process data for this provider."""
sql_file = f"trino_sql/{self.provider_type.lower()}/determine_ids_for_provider.sql"
sql = pkgutil.get_data("subs", sql_file)
sql = sql.decode("utf-8")
sql_params = {
"schema": self.schema,
"source_uuid": self.provider_uuid,
"year": year,
"month": month,
}
ids = self._execute_trino_raw_sql_query(
sql, sql_params=sql_params, context=self.context, log_ref="subs_determine_ids_for_provider"
)

with schema_context(self.schema):
# get a list of IDs to exclude from this source processing
excluded_ids = list(
SubsIDMap.objects.exclude(source_uuid=self.provider_uuid).values_list("usage_id", flat=True)
)
sql_file = f"trino_sql/{self.provider_type.lower()}/determine_ids_for_provider.sql"
sql = pkgutil.get_data("subs", sql_file)
sql = sql.decode("utf-8")
sql_params = {
"schema": self.schema,
"source_uuid": self.provider_uuid,
"year": year,
"month": month,
"excluded_ids": excluded_ids,
}
ids = self._execute_trino_raw_sql_query(
sql, sql_params=sql_params, context=self.context, log_ref="subs_determine_ids_for_provider"
)
id_list = []
bulk_maps = []
for id in ids:
Expand All @@ -115,37 +111,19 @@ def determine_row_count(self, sql_params):

def get_resource_ids_for_usage_account(self, usage_account, year, month):
"""Determine the relevant resource ids and end time to process to for each resource id."""
with schema_context(self.schema):
# get a list of IDs to exclude from this source processing
excluded_ids = list(
SubsLastProcessed.objects.exclude(source_uuid=self.provider_uuid).values_list("resource_id", flat=True)
)
sql_file = f"trino_sql/{self.provider_type.lower()}/determine_resource_ids_for_usage_account.sql"
sql = pkgutil.get_data("subs", sql_file)
sql = sql.decode("utf-8")
sql_params = {
"schema": self.schema,
"source_uuid": self.provider_uuid,
"year": year,
"month": month,
"excluded_ids": excluded_ids,
"usage_account": usage_account,
}
LOG.info(
log_json(
self.tracing_id,
msg="get_resource_ids_for_usage_account: number of exclude_ids and text length",
context=self.context
| {
"excluded_ids_length": len(excluded_ids),
"excluded_ids_text_length": sum(len(x) for x in excluded_ids),
},
)
)
ids = self._execute_trino_raw_sql_query(
sql, sql_params=sql_params, context=self.context, log_ref="subs_determine_rids_for_provider"
)
return ids
sql_file = f"trino_sql/{self.provider_type.lower()}/determine_resource_ids_for_usage_account.sql"
sql = pkgutil.get_data("subs", sql_file)
sql = sql.decode("utf-8")
sql_params = {
"schema": self.schema,
"source_uuid": self.provider_uuid,
"year": year,
"month": month,
"usage_account": usage_account,
}
return self._execute_trino_raw_sql_query(
sql, sql_params=sql_params, context=self.context, log_ref="subs_determine_rids_for_provider"
)

def gather_and_upload_for_resource_batch(self, year, month, batch, base_filename):
"""Gather the data and upload it to S3 for a batch of resource ids"""
Expand Down
12 changes: 8 additions & 4 deletions koku/subs/trino_sql/aws/determine_ids_for_provider.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
SELECT DISTINCT lineitem_usageaccountid
SELECT
DISTINCT lineitem_usageaccountid
FROM hive.{{schema | sqlsafe}}.aws_line_items
WHERE source={{source_uuid}}
AND year={{year}}
AND month={{month}}
AND lineitem_productcode = 'AmazonEC2'
AND strpos(lower(resourcetags), 'com_redhat_rhel') > 0
{% if excluded_ids %}
AND lineitem_usageaccountid NOT IN {{excluded_ids | inclause}}
{% endif %}
AND lineitem_usageaccountid NOT IN (
SELECT
DISTINCT usage_id
FROM postgres.{{schema | sqlsafe}}.reporting_subs_id_map
WHERE source_uuid!=cast({{source_uuid}} as uuid)
)
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
SELECT lineitem_resourceid, max(lineitem_usagestartdate)
SELECT
lineitem_resourceid,
max(lineitem_usagestartdate)
FROM hive.{{schema | sqlsafe}}.aws_line_items
WHERE source={{source_uuid}}
AND year={{year}}
AND month={{month}}
AND lineitem_productcode = 'AmazonEC2'
AND strpos(lower(resourcetags), 'com_redhat_rhel') > 0
AND lineitem_usageaccountid = {{usage_account}}
{% if excluded_ids %}
AND lineitem_resourceid NOT IN {{excluded_ids | inclause}}
{% endif %}
AND year={{year}}
AND month={{month}}
AND lineitem_productcode = 'AmazonEC2'
AND strpos(lower(resourcetags), 'com_redhat_rhel') > 0
AND lineitem_usageaccountid = {{usage_account}}
AND lineitem_resourceid NOT IN (
SELECT
DISTINCT resource_id
FROM postgres.{{schema | sqlsafe}}.reporting_subs_last_processed_time
WHERE source_uuid != cast({{source_uuid}} as uuid)
AND year={{year}}
AND month={{month}}
)
GROUP BY lineitem_resourceid
12 changes: 8 additions & 4 deletions koku/subs/trino_sql/azure/determine_ids_for_provider.sql
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
SELECT DISTINCT COALESCE(NULLIF(subscriptionid, ''), subscriptionguid)
SELECT
DISTINCT COALESCE(NULLIF(subscriptionid, ''), subscriptionguid)
FROM hive.{{schema | sqlsafe}}.azure_line_items
WHERE source={{source_uuid}}
AND year={{year}}
AND month={{month}}
AND metercategory = 'Virtual Machines'
AND json_extract_scalar(lower(additionalinfo), '$.vcpus') IS NOT NULL
AND json_extract_scalar(lower(tags), '$.com_redhat_rhel') IS NOT NULL
{% if excluded_ids %}
AND COALESCE(NULLIF(subscriptionid, ''), subscriptionguid) NOT IN {{excluded_ids | inclause}}
{% endif %}
AND COALESCE(NULLIF(subscriptionid, ''), subscriptionguid) NOT IN (
SELECT
DISTINCT usage_id
FROM postgres.{{schema | sqlsafe}}.reporting_subs_id_map
WHERE source_uuid!=cast({{source_uuid}} as uuid)
)
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@ FROM (
AND json_extract_scalar(lower(additionalinfo), '$.vcpus') IS NOT NULL
AND json_extract_scalar(lower(tags), '$.com_redhat_rhel') IS NOT NULL
AND (subscriptionid = {{usage_account}} or subscriptionguid = {{usage_account}})
{% if excluded_ids %}
and resourceid NOT IN {{excluded_ids | inclause}}
{% endif %}
AND resourceid NOT IN (
SELECT
DISTINCT resource_id
FROM postgres.{{schema | sqlsafe}}.reporting_subs_last_processed_time
WHERE source_uuid != cast({{source_uuid}} as uuid)
AND year={{year}}
AND month={{month}}
)
GROUP BY resourceid, subscriptionguid, subscriptionid, resourcegroup, json_extract_scalar(lower(additionalinfo), '$.vmname')
)

0 comments on commit 52a10c7

Please sign in to comment.