diff --git a/koku/api/report/test/utils.py b/koku/api/report/test/utils.py index cb30985d10..518f613313 100644 --- a/koku/api/report/test/utils.py +++ b/koku/api/report/test/utils.py @@ -144,7 +144,13 @@ def load_openshift_data(self, customer, static_data_file, cluster_id): self.process_report(report, "PLAIN", provider_type, provider, manifest) with patch("masu.processor.tasks.chain"): update_summary_tables( - self.schema, provider_type, provider.uuid, start_date, end_date, manifest_id=manifest.id + self.schema, + provider_type, + provider.uuid, + start_date, + end_date, + manifest_id=manifest.id, + synchronous=True, ) update_cost_model_costs.s( self.schema, provider.uuid, self.dh.last_month_start, self.dh.today, synchronous=True @@ -213,7 +219,13 @@ def load_aws_data(self, customer, static_data_file, account_id=None, role_arn=No self.process_report(report, "GZIP", provider_type, provider, manifest) with patch("masu.processor.tasks.chain"), patch.object(settings, "AUTO_DATA_INGEST", False): update_summary_tables( - self.schema, provider_type, provider.uuid, start_date, end_date, manifest_id=manifest.id + self.schema, + provider_type, + provider.uuid, + start_date, + end_date, + manifest_id=manifest.id, + synchronous=True, ) update_cost_model_costs.s( self.schema, provider.uuid, self.dh.last_month_start, self.dh.today, synchronous=True @@ -274,7 +286,13 @@ def load_azure_data(self, customer, static_data_file, credentials=None, data_sou self.process_report(report, "PLAIN", provider_type, provider, manifest) with patch("masu.processor.tasks.chain"), patch.object(settings, "AUTO_DATA_INGEST", False): update_summary_tables( - self.schema, provider_type, provider.uuid, start_date, end_date, manifest_id=manifest.id + self.schema, + provider_type, + provider.uuid, + start_date, + end_date, + manifest_id=manifest.id, + synchronous=True, ) update_cost_model_costs.s( self.schema, provider.uuid, self.dh.last_month_start, self.dh.today, synchronous=True @@ -319,7 +337,13 @@ def 
load_gcp_data(self, customer, static_data_file): self.process_report(report, "PLAIN", provider_type, provider, manifest) with patch("masu.processor.tasks.chain"), patch.object(settings, "AUTO_DATA_INGEST", False): update_summary_tables( - self.schema, provider_type, provider.uuid, start_date, end_date, manifest_id=manifest.id + self.schema, + provider_type, + provider.uuid, + start_date, + end_date, + manifest_id=manifest.id, + synchronous=True, ) update_cost_model_costs.s( self.schema, provider.uuid, self.dh.last_month_start, self.dh.today, synchronous=True diff --git a/koku/masu/processor/tasks.py b/koku/masu/processor/tasks.py index 00c690f758..e84696e12b 100644 --- a/koku/masu/processor/tasks.py +++ b/koku/masu/processor/tasks.py @@ -18,7 +18,6 @@ import datetime import json import os -import time from decimal import Decimal from decimal import InvalidOperation @@ -304,7 +303,14 @@ def summarize_reports(reports_to_summarize, queue_name=None): @app.task(name="masu.processor.tasks.update_summary_tables", queue_name="reporting") def update_summary_tables( - schema_name, provider, provider_uuid, start_date, end_date=None, manifest_id=None, queue_name=None + schema_name, + provider, + provider_uuid, + start_date, + end_date=None, + manifest_id=None, + queue_name=None, + synchronous=False, ): """Populate the summary tables for reporting. @@ -321,6 +327,19 @@ def update_summary_tables( """ worker_stats.REPORT_SUMMARY_ATTEMPTS_COUNTER.labels(provider_type=provider).inc() + task_name = "masu.processor.tasks.update_summary_tables" + cache_args = [schema_name] + + if not synchronous: + worker_cache = WorkerCache() + if worker_cache.single_task_is_running(task_name, cache_args): + msg = f"Task {task_name} already running for {cache_args}. Requeuing." 
+            LOG.info(msg)
+            update_summary_tables.delay(
+                schema_name, provider, provider_uuid, start_date, end_date=end_date, manifest_id=manifest_id
+            )
+            return
+        worker_cache.lock_single_task(task_name, cache_args, timeout=3600)
 
     stmt = (
         f"update_summary_tables called with args:\n"
@@ -394,6 +413,8 @@ def update_summary_tables(
     ).set(queue=queue_name or REMOVE_EXPIRED_DATA_QUEUE)
     chain(linked_tasks).apply_async()
+    if not synchronous:
+        worker_cache.release_single_task(task_name, cache_args)
 
 
 @app.task(name="masu.processor.tasks.update_all_summary_tables", queue_name="reporting")
@@ -451,9 +472,19 @@ def update_cost_model_costs(
     cache_args = [schema_name, provider_uuid, start_date, end_date]
     if not synchronous:
         worker_cache = WorkerCache()
-        while worker_cache.single_task_is_running(task_name, cache_args):
-            time.sleep(5)
-        worker_cache.lock_single_task(task_name, cache_args, timeout=300)
+        if worker_cache.single_task_is_running(task_name, cache_args):
+            msg = f"Task {task_name} already running for {cache_args}. Requeuing."
+            LOG.info(msg)
+            update_cost_model_costs.delay(
+                schema_name,
+                provider_uuid,
+                start_date=start_date,
+                end_date=end_date,
+                provider_type=provider_type,
+                synchronous=synchronous,
+            )
+            return
+        worker_cache.lock_single_task(task_name, cache_args, timeout=600)
 
     worker_stats.COST_MODEL_COST_UPDATE_ATTEMPTS_COUNTER.inc()
@@ -481,10 +512,18 @@ def refresh_materialized_views(schema_name, provider_type, manifest_id=None, pro
     cache_args = [schema_name]
     if not synchronous:
         worker_cache = WorkerCache()
-        while worker_cache.single_task_is_running(task_name, cache_args):
-            time.sleep(5)
-
-        worker_cache.lock_single_task(task_name, cache_args)
+        if worker_cache.single_task_is_running(task_name, cache_args):
+            msg = f"Task {task_name} already running for {cache_args}. Requeuing."
+ LOG.info(msg) + refresh_materialized_views.delay( + schema_name, + provider_type, + manifest_id=manifest_id, + provider_uuid=provider_uuid, + synchronous=synchronous + ) + return + worker_cache.lock_single_task(task_name, cache_args, timeout=600) materialized_views = () if provider_type in (Provider.PROVIDER_AWS, Provider.PROVIDER_AWS_LOCAL): materialized_views = ( diff --git a/koku/masu/test/processor/test_tasks.py b/koku/masu/test/processor/test_tasks.py index ee0ed15425..6bbef3274c 100644 --- a/koku/masu/test/processor/test_tasks.py +++ b/koku/masu/test/processor/test_tasks.py @@ -566,7 +566,7 @@ def test_update_summary_tables_aws(self, mock_charge_info, mock_views, mock_chai self.assertEqual(initial_daily_count, 0) self.assertEqual(initial_summary_count, 0) - update_summary_tables(self.schema, provider, provider_aws_uuid, start_date) + update_summary_tables(self.schema, provider, provider_aws_uuid, start_date, synchronous=True) with schema_context(self.schema): self.assertNotEqual(daily_query.count(), initial_daily_count) @@ -606,7 +606,7 @@ def test_update_summary_tables_aws_end_date(self, mock_charge_info): expected_end_date = min(end_date, ce_end_date) expected_end_date = expected_end_date.replace(hour=0, minute=0, second=0, microsecond=0) - update_summary_tables(self.schema, provider, provider_aws_uuid, start_date, end_date) + update_summary_tables(self.schema, provider, provider_aws_uuid, start_date, end_date, synchronous=True) with schema_context(self.schema): daily_entry = daily_table.objects.all().aggregate(Min("usage_start"), Max("usage_end")) @@ -661,7 +661,7 @@ def test_update_summary_tables_ocp( initial_daily_count = daily_query.count() self.assertEqual(initial_daily_count, 0) - update_summary_tables(self.schema, provider, provider_ocp_uuid, start_date, end_date) + update_summary_tables(self.schema, provider, provider_ocp_uuid, start_date, end_date, synchronous=True) with schema_context(self.schema): self.assertNotEqual(daily_query.count(), 
initial_daily_count) @@ -737,7 +737,7 @@ def test_update_summary_tables_ocp_end_date(self, mock_cpu_rate, mock_mem_rate, expected_start_date = max(start_date, ce_start_date) expected_end_date = min(end_date, ce_end_date) - update_summary_tables(self.schema, provider, provider_ocp_uuid, start_date, end_date) + update_summary_tables(self.schema, provider, provider_ocp_uuid, start_date, end_date, synchronous=True) with schema_context(self.schema): daily_entry = daily_table.objects.all().aggregate(Min("usage_start"), Max("usage_end")) result_start_date = daily_entry["usage_start__min"] @@ -757,7 +757,9 @@ def test_update_summary_tables_remove_expired_data(self, mock_accessor, mock_cha expected_end_date = end_date.strftime("%Y-%m-%d") manifest_id = 1 - update_summary_tables(self.schema, provider, provider_aws_uuid, start_date, end_date, manifest_id) + update_summary_tables( + self.schema, provider, provider_aws_uuid, start_date, end_date, manifest_id, synchronous=True + ) mock_chain.assert_called_once_with( update_cost_model_costs.s(self.schema, provider_aws_uuid, expected_start_date, expected_end_date).set( queue=UPDATE_COST_MODEL_COSTS_QUEUE @@ -902,91 +904,6 @@ def test_refresh_materialized_views_gcp(self, mock_cache): with ProviderDBAccessor(self.gcp_provider_uuid) as accessor: self.assertIsNotNone(accessor.provider.data_updated_timestamp) - @patch("masu.processor.tasks.WorkerCache.release_single_task") - @patch("masu.processor.tasks.WorkerCache.lock_single_task") - @patch("masu.processor.worker_cache.CELERY_INSPECT") - def test_update_cost_model_costs_throttled(self, mock_inspect, mock_lock, mock_release): - """Test that refresh materialized views runs with cache lock.""" - - def single_task_is_running(self, task_name, task_args=None): - """Check for a single task key in the cache.""" - cache = caches["worker"] - cache_str = create_single_task_cache_key(task_name, task_args) - return True if cache.get(cache_str) else False - - def lock_single_task(self, task_name, 
task_args=None, timeout=None): - """Add a cache entry for a single task to lock a specific task.""" - cache = caches["worker"] - cache_str = create_single_task_cache_key(task_name, task_args) - cache.add(cache_str, "true", 3) - - mock_lock.side_effect = lock_single_task - - start_date = DateHelper().last_month_start - relativedelta.relativedelta(months=1) - end_date = DateHelper().today - expected_start_date = start_date.strftime("%Y-%m-%d") - expected_end_date = end_date.strftime("%Y-%m-%d") - task_name = "masu.processor.tasks.update_cost_model_costs" - cache_args = [self.schema, self.aws_provider_uuid, expected_start_date, expected_end_date] - - manifest_dict = { - "assembly_id": "12345", - "billing_period_start_datetime": DateHelper().today, - "num_total_files": 2, - "provider_uuid": self.aws_provider_uuid, - } - - with ReportManifestDBAccessor() as manifest_accessor: - manifest = manifest_accessor.add(**manifest_dict) - manifest.save() - - update_cost_model_costs.s(self.schema, self.aws_provider_uuid, expected_start_date, expected_end_date).apply() - self.assertTrue(single_task_is_running(task_name, cache_args)) - # Let the cache entry expire - time.sleep(3) - self.assertFalse(single_task_is_running(task_name, cache_args)) - - @patch("masu.processor.tasks.WorkerCache.release_single_task") - @patch("masu.processor.tasks.WorkerCache.lock_single_task") - @patch("masu.processor.worker_cache.CELERY_INSPECT") - def test_refresh_materialized_views_throttled(self, mock_inspect, mock_lock, mock_release): - """Test that refresh materialized views runs with cache lock.""" - - def single_task_is_running(self, task_name, task_args=None): - """Check for a single task key in the cache.""" - cache = caches["worker"] - cache_str = create_single_task_cache_key(task_name, task_args) - return True if cache.get(cache_str) else False - - def lock_single_task(self, task_name, task_args=None, timeout=None): - """Add a cache entry for a single task to lock a specific task.""" - cache = 
caches["worker"] - cache_str = create_single_task_cache_key(task_name, task_args) - cache.add(cache_str, "true", 3) - - # mock_cache.return_value.single_task_is_running.side_effect = single_task_is_running - mock_lock.side_effect = lock_single_task - - task_name = "masu.processor.tasks.refresh_materialized_views" - cache_args = [self.schema] - - manifest_dict = { - "assembly_id": "12345", - "billing_period_start_datetime": DateHelper().today, - "num_total_files": 2, - "provider_uuid": self.aws_provider_uuid, - } - - with ReportManifestDBAccessor() as manifest_accessor: - manifest = manifest_accessor.add(**manifest_dict) - manifest.save() - - refresh_materialized_views.s(self.schema, Provider.PROVIDER_AWS, manifest_id=manifest.id).apply() - self.assertTrue(single_task_is_running(task_name, cache_args)) - # Let the cache entry expire - time.sleep(3) - self.assertFalse(single_task_is_running(task_name, cache_args)) - @patch("masu.processor.tasks.connection") def test_vacuum_schema(self, mock_conn): """Test that the vacuum schema task runs.""" @@ -1194,6 +1111,127 @@ def test_record_all_manifest_files_concurrent_writes(self): CostUsageReportStatus.objects.filter(report_name=report_file).exists() +class TestWorkerCacheThrottling(MasuTestCase): + """Tests for tasks that use the worker cache.""" + + def single_task_is_running(self, task_name, task_args=None): + """Check for a single task key in the cache.""" + cache = caches["worker"] + cache_str = create_single_task_cache_key(task_name, task_args) + return True if cache.get(cache_str) else False + + def lock_single_task(self, task_name, task_args=None, timeout=None): + """Add a cache entry for a single task to lock a specific task.""" + cache = caches["worker"] + cache_str = create_single_task_cache_key(task_name, task_args) + cache.add(cache_str, "true", 3) + + @patch("masu.processor.tasks.update_summary_tables.delay") + @patch("masu.processor.tasks.ReportSummaryUpdater.update_summary_tables") + 
@patch("masu.processor.tasks.ReportSummaryUpdater.update_daily_tables") + @patch("masu.processor.tasks.chain") + @patch("masu.processor.tasks.refresh_materialized_views") + @patch("masu.processor.tasks.update_cost_model_costs") + @patch("masu.processor.tasks.WorkerCache.release_single_task") + @patch("masu.processor.tasks.WorkerCache.lock_single_task") + @patch("masu.processor.worker_cache.CELERY_INSPECT") + def test_update_summary_tables_worker_throttled( + self, + mock_inspect, + mock_lock, + mock_release, + mock_update_cost, + mock_refresh, + mock_chain, + mock_daily, + mock_summary, + mock_delay, + ): + """Test that the worker cache is used.""" + task_name = "masu.processor.tasks.update_summary_tables" + cache_args = [self.schema] + mock_lock.side_effect = self.lock_single_task + + start_date = DateHelper().this_month_start + end_date = DateHelper().this_month_end + mock_daily.return_value = start_date, end_date + mock_summary.return_value = start_date, end_date + update_summary_tables(self.schema, Provider.PROVIDER_AWS, self.aws_provider_uuid, start_date, end_date) + mock_delay.assert_not_called() + update_summary_tables(self.schema, Provider.PROVIDER_AWS, self.aws_provider_uuid, start_date, end_date) + mock_delay.assert_called() + self.assertTrue(self.single_task_is_running(task_name, cache_args)) + # Let the cache entry expire + time.sleep(3) + self.assertFalse(self.single_task_is_running(task_name, cache_args)) + + @patch("masu.processor.tasks.update_cost_model_costs.delay") + @patch("masu.processor.tasks.WorkerCache.release_single_task") + @patch("masu.processor.tasks.WorkerCache.lock_single_task") + @patch("masu.processor.worker_cache.CELERY_INSPECT") + def test_update_cost_model_costs_throttled(self, mock_inspect, mock_lock, mock_release, mock_delay): + """Test that refresh materialized views runs with cache lock.""" + mock_lock.side_effect = self.lock_single_task + + start_date = DateHelper().last_month_start - relativedelta.relativedelta(months=1) + 
end_date = DateHelper().today + expected_start_date = start_date.strftime("%Y-%m-%d") + expected_end_date = end_date.strftime("%Y-%m-%d") + task_name = "masu.processor.tasks.update_cost_model_costs" + cache_args = [self.schema, self.aws_provider_uuid, expected_start_date, expected_end_date] + + manifest_dict = { + "assembly_id": "12345", + "billing_period_start_datetime": DateHelper().today, + "num_total_files": 2, + "provider_uuid": self.aws_provider_uuid, + } + + with ReportManifestDBAccessor() as manifest_accessor: + manifest = manifest_accessor.add(**manifest_dict) + manifest.save() + + update_cost_model_costs(self.schema, self.aws_provider_uuid, expected_start_date, expected_end_date) + mock_delay.assert_not_called() + update_cost_model_costs(self.schema, self.aws_provider_uuid, expected_start_date, expected_end_date) + mock_delay.assert_called() + self.assertTrue(self.single_task_is_running(task_name, cache_args)) + # Let the cache entry expire + time.sleep(3) + self.assertFalse(self.single_task_is_running(task_name, cache_args)) + + @patch("masu.processor.tasks.refresh_materialized_views.delay") + @patch("masu.processor.tasks.WorkerCache.release_single_task") + @patch("masu.processor.tasks.WorkerCache.lock_single_task") + @patch("masu.processor.worker_cache.CELERY_INSPECT") + def test_refresh_materialized_views_throttled(self, mock_inspect, mock_lock, mock_release, mock_delay): + """Test that refresh materialized views runs with cache lock.""" + mock_lock.side_effect = self.lock_single_task + + task_name = "masu.processor.tasks.refresh_materialized_views" + cache_args = [self.schema] + + manifest_dict = { + "assembly_id": "12345", + "billing_period_start_datetime": DateHelper().today, + "num_total_files": 2, + "provider_uuid": self.aws_provider_uuid, + } + + with ReportManifestDBAccessor() as manifest_accessor: + manifest = manifest_accessor.add(**manifest_dict) + manifest.save() + + refresh_materialized_views(self.schema, Provider.PROVIDER_AWS, 
manifest_id=manifest.id) + mock_delay.assert_not_called() + refresh_materialized_views(self.schema, Provider.PROVIDER_AWS, manifest_id=manifest.id) + mock_delay.assert_called() + self.assertTrue(self.single_task_is_running(task_name, cache_args)) + # Let the cache entry expire + time.sleep(3) + self.assertFalse(self.single_task_is_running(task_name, cache_args)) + + class TestRemoveStaleTenants(MasuTestCase): def setUp(self): """Set up middleware tests."""