Skip to content

Commit

Permalink
Merge branch 'master' into gcp_iqe_local_update
Browse files Browse the repository at this point in the history
  • Loading branch information
Douglas Curtis authored Mar 23, 2021
2 parents 019076f + 30bad14 commit cac5e64
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 103 deletions.
32 changes: 28 additions & 4 deletions koku/api/report/test/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,13 @@ def load_openshift_data(self, customer, static_data_file, cluster_id):
self.process_report(report, "PLAIN", provider_type, provider, manifest)
with patch("masu.processor.tasks.chain"):
update_summary_tables(
self.schema, provider_type, provider.uuid, start_date, end_date, manifest_id=manifest.id
self.schema,
provider_type,
provider.uuid,
start_date,
end_date,
manifest_id=manifest.id,
synchronous=True,
)
update_cost_model_costs.s(
self.schema, provider.uuid, self.dh.last_month_start, self.dh.today, synchronous=True
Expand Down Expand Up @@ -213,7 +219,13 @@ def load_aws_data(self, customer, static_data_file, account_id=None, role_arn=No
self.process_report(report, "GZIP", provider_type, provider, manifest)
with patch("masu.processor.tasks.chain"), patch.object(settings, "AUTO_DATA_INGEST", False):
update_summary_tables(
self.schema, provider_type, provider.uuid, start_date, end_date, manifest_id=manifest.id
self.schema,
provider_type,
provider.uuid,
start_date,
end_date,
manifest_id=manifest.id,
synchronous=True,
)
update_cost_model_costs.s(
self.schema, provider.uuid, self.dh.last_month_start, self.dh.today, synchronous=True
Expand Down Expand Up @@ -274,7 +286,13 @@ def load_azure_data(self, customer, static_data_file, credentials=None, data_sou
self.process_report(report, "PLAIN", provider_type, provider, manifest)
with patch("masu.processor.tasks.chain"), patch.object(settings, "AUTO_DATA_INGEST", False):
update_summary_tables(
self.schema, provider_type, provider.uuid, start_date, end_date, manifest_id=manifest.id
self.schema,
provider_type,
provider.uuid,
start_date,
end_date,
manifest_id=manifest.id,
synchronous=True,
)
update_cost_model_costs.s(
self.schema, provider.uuid, self.dh.last_month_start, self.dh.today, synchronous=True
Expand Down Expand Up @@ -319,7 +337,13 @@ def load_gcp_data(self, customer, static_data_file):
self.process_report(report, "PLAIN", provider_type, provider, manifest)
with patch("masu.processor.tasks.chain"), patch.object(settings, "AUTO_DATA_INGEST", False):
update_summary_tables(
self.schema, provider_type, provider.uuid, start_date, end_date, manifest_id=manifest.id
self.schema,
provider_type,
provider.uuid,
start_date,
end_date,
manifest_id=manifest.id,
synchronous=True,
)
update_cost_model_costs.s(
self.schema, provider.uuid, self.dh.last_month_start, self.dh.today, synchronous=True
Expand Down
57 changes: 48 additions & 9 deletions koku/masu/processor/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import datetime
import json
import os
import time
from decimal import Decimal
from decimal import InvalidOperation

Expand Down Expand Up @@ -304,7 +303,14 @@ def summarize_reports(reports_to_summarize, queue_name=None):

@app.task(name="masu.processor.tasks.update_summary_tables", queue_name="reporting")
def update_summary_tables(
schema_name, provider, provider_uuid, start_date, end_date=None, manifest_id=None, queue_name=None
schema_name,
provider,
provider_uuid,
start_date,
end_date=None,
manifest_id=None,
queue_name=None,
synchronous=False,
):
"""Populate the summary tables for reporting.
Expand All @@ -321,6 +327,19 @@ def update_summary_tables(
"""
worker_stats.REPORT_SUMMARY_ATTEMPTS_COUNTER.labels(provider_type=provider).inc()
task_name = "masu.processor.tasks.update_summary_tables"
cache_args = [schema_name]

if not synchronous:
worker_cache = WorkerCache()
if worker_cache.single_task_is_running(task_name, cache_args):
msg = f"Task {task_name} already running for {cache_args}. Requeuing."
LOG.info(msg)
update_summary_tables.delay(
schema_name, provider, provider_uuid, start_date, end_date=end_date, manifest_id=manifest_id
)
return
worker_cache.lock_single_task(task_name, cache_args, timeout=3600)

stmt = (
f"update_summary_tables called with args:\n"
Expand Down Expand Up @@ -394,6 +413,8 @@ def update_summary_tables(
).set(queue=queue_name or REMOVE_EXPIRED_DATA_QUEUE)

chain(linked_tasks).apply_async()
if not synchronous:
worker_cache.release_single_task(task_name, cache_args)


@app.task(name="masu.processor.tasks.update_all_summary_tables", queue_name="reporting")
Expand Down Expand Up @@ -451,9 +472,19 @@ def update_cost_model_costs(
cache_args = [schema_name, provider_uuid, start_date, end_date]
if not synchronous:
worker_cache = WorkerCache()
while worker_cache.single_task_is_running(task_name, cache_args):
time.sleep(5)
worker_cache.lock_single_task(task_name, cache_args, timeout=300)
if worker_cache.single_task_is_running(task_name, cache_args):
msg = f"Task {task_name} already running for {cache_args}. Requeuing."
LOG.info(msg)
update_cost_model_costs.delay(
schema_name,
provider_uuid,
start_date=start_date,
end_date=end_date,
provider_type=provider_uuid,
synchronous=synchronous,
)
return
worker_cache.lock_single_task(task_name, cache_args, timeout=600)

worker_stats.COST_MODEL_COST_UPDATE_ATTEMPTS_COUNTER.inc()

Expand Down Expand Up @@ -481,10 +512,18 @@ def refresh_materialized_views(schema_name, provider_type, manifest_id=None, pro
cache_args = [schema_name]
if not synchronous:
worker_cache = WorkerCache()
while worker_cache.single_task_is_running(task_name, cache_args):
time.sleep(5)

worker_cache.lock_single_task(task_name, cache_args)
if worker_cache.single_task_is_running(task_name, cache_args):
msg = f"Task {task_name} already running for {cache_args}. Requeuing."
LOG.info(msg)
refresh_materialized_views.delay(
schema_name,
provider_type,
manifest_id=manifest_id,
provider_uuid=provider_uuid,
synchronous=synchronous
)
return
worker_cache.lock_single_task(task_name, cache_args, timeout=600)
materialized_views = ()
if provider_type in (Provider.PROVIDER_AWS, Provider.PROVIDER_AWS_LOCAL):
materialized_views = (
Expand Down
Loading

0 comments on commit cac5e64

Please sign in to comment.