COST-1201: Enable Trino on a per source basis #2745

Merged: 7 commits, Mar 23, 2021
6 changes: 6 additions & 0 deletions docker-compose.yml
@@ -46,6 +46,7 @@ services:
- DEMO_ACCOUNTS
- ACCOUNT_ENHANCED_METRICS=${ACCOUNT_ENHANCED_METRICS-False}
- RUN_GUNICORN=${RUN_GUNICORN}
+ - ENABLE_TRINO_SOURCES
privileged: true
ports:
- 8000:8000
@@ -99,6 +100,8 @@ services:
- SOURCES_API_PORT=${SOURCES_API_PORT-3000}
- MASU_DATE_OVERRIDE
- MASU_DEBUG
+ - ENABLE_TRINO_SOURCES
+
privileged: true
ports:
- 5000:8000
@@ -274,6 +277,7 @@ services:
- PRESTO_PORT=${PRESTO_PORT-8080}
- DATE_OVERRIDE
- MASU_DEBUG
+ - ENABLE_TRINO_SOURCES

privileged: true
volumes:
@@ -336,6 +340,8 @@ services:
- S3_SECRET
- PRESTO_HOST=${PRESTO_HOST-presto}
- PRESTO_PORT=${PRESTO_PORT-8080}
+ - ENABLE_TRINO_SOURCES
+
privileged: true
ports:
- "9988:9999"
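Note that `ENABLE_TRINO_SOURCES` is listed without a value in each service, so docker-compose passes it through from the host shell or a `.env` file. A hypothetical `.env` entry (these UUIDs are placeholders, not values from this PR) could look like:

    ENABLE_TRINO_SOURCES=3dd8a473-6be0-4b90-a67a-72736b80c17a,7a38b0a6-fb08-4596-9a54-c7cbdbd9a1f9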
3 changes: 2 additions & 1 deletion koku/api/provider/provider_manager.py
@@ -30,6 +30,7 @@
from api.provider.models import Sources
from api.utils import DateHelper
from cost_models.models import CostModelMap
+ from masu.processor import enable_trino_processing
from masu.processor.tasks import refresh_materialized_views
from reporting.provider.aws.models import AWSCostEntryBill
from reporting.provider.azure.models import AzureCostEntryBill
@@ -258,7 +259,7 @@ def provider_post_delete_callback(*args, **kwargs):
customer.date_updated = DateHelper().now_utc
customer.save()

- if settings.ENABLE_S3_ARCHIVING or settings.ENABLE_PARQUET_PROCESSING:
+ if settings.ENABLE_S3_ARCHIVING or enable_trino_processing(provider.uuid):
# Local import of task function to avoid potential import cycle.
from masu.celery.tasks import delete_archived_data

1 change: 1 addition & 0 deletions koku/koku/settings.py
@@ -459,6 +459,7 @@
S3_SECRET = ENVIRONMENT.get_value("S3_SECRET", default=None)
ENABLE_S3_ARCHIVING = ENVIRONMENT.bool("ENABLE_S3_ARCHIVING", default=False)
ENABLE_PARQUET_PROCESSING = ENVIRONMENT.bool("ENABLE_PARQUET_PROCESSING", default=False)
+ ENABLE_TRINO_SOURCES = ENVIRONMENT.list("ENABLE_TRINO_SOURCES", default=[])

# Presto Settings
PRESTO_HOST = ENVIRONMENT.get_value("PRESTO_HOST", default=None)
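`ENVIRONMENT` is the django-environ `Env` instance used throughout settings.py, and `Env.list` splits a comma-separated variable into a list of strings, so the new flag naturally holds source UUID strings. A minimal standalone sketch of the expected parsing, assuming django-environ's documented behavior:

    import os

    import environ

    # Simulate the variable docker-compose would pass through.
    os.environ["ENABLE_TRINO_SOURCES"] = "uuid-a,uuid-b"

    env = environ.Env()
    sources = env.list("ENABLE_TRINO_SOURCES", default=[])
    print(sources)  # ['uuid-a', 'uuid-b']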
3 changes: 2 additions & 1 deletion koku/masu/celery/tasks.py
@@ -37,6 +37,7 @@
from masu.database.report_manifest_db_accessor import ReportManifestDBAccessor
from masu.external.accounts.hierarchy.aws.aws_org_unit_crawler import AWSOrgUnitCrawler
from masu.external.date_accessor import DateAccessor
+ from masu.processor import enable_trino_processing
from masu.processor.orchestrator import Orchestrator
from masu.processor.tasks import autovacuum_tune_schema
from masu.processor.tasks import vacuum_schema
@@ -130,7 +131,7 @@ def delete_archived_data(schema_name, provider_type, provider_uuid):
messages.append(message)
raise TypeError("delete_archived_data() %s", ", ".join(messages))

- if not (settings.ENABLE_S3_ARCHIVING or settings.ENABLE_PARQUET_PROCESSING):
+ if not (settings.ENABLE_S3_ARCHIVING or enable_trino_processing(provider_uuid)):
LOG.info("Skipping delete_archived_data. Upload feature is disabled.")
return
else:
3 changes: 2 additions & 1 deletion koku/masu/external/downloader/gcp/gcp_report_downloader.py
@@ -37,6 +37,7 @@
from masu.external.date_accessor import DateAccessor
from masu.external.downloader.downloader_interface import DownloaderInterface
from masu.external.downloader.report_downloader_base import ReportDownloaderBase
+ from masu.processor import enable_trino_processing
from masu.util.aws.common import copy_local_report_file_to_s3_bucket
from masu.util.common import date_range_pair
from masu.util.common import get_path_prefix
@@ -91,7 +92,7 @@ def create_daily_archives(request_id, account, provider_uuid, filename, filepath
context (Dict): Logging context dictionary
"""
daily_file_names = []
- if settings.ENABLE_S3_ARCHIVING or settings.ENABLE_PARQUET_PROCESSING:
+ if settings.ENABLE_S3_ARCHIVING or enable_trino_processing(provider_uuid):
daily_files = divide_csv_daily(filepath)
for daily_file in daily_files:
# Push to S3
3 changes: 2 additions & 1 deletion koku/masu/external/downloader/ocp/ocp_report_downloader.py
@@ -30,6 +30,7 @@
from masu.external import UNCOMPRESSED
from masu.external.downloader.downloader_interface import DownloaderInterface
from masu.external.downloader.report_downloader_base import ReportDownloaderBase
+ from masu.processor import enable_trino_processing
from masu.util.aws.common import copy_local_report_file_to_s3_bucket
from masu.util.common import get_path_prefix
from masu.util.ocp import common as utils
@@ -86,7 +87,7 @@ def create_daily_archives(request_id, account, provider_uuid, filename, filepath
context (Dict): Logging context dictionary
"""
daily_file_names = []
- if settings.ENABLE_S3_ARCHIVING or settings.ENABLE_PARQUET_PROCESSING:
+ if settings.ENABLE_S3_ARCHIVING or enable_trino_processing(provider_uuid):
daily_files = divide_csv_daily(filepath, filename)
for daily_file in daily_files:
# Push to S3
9 changes: 9 additions & 0 deletions koku/masu/processor/__init__.py
@@ -17,9 +17,18 @@
"""Masu Processor."""
import logging

+ from django.conf import settings
+
from masu.external import GZIP_COMPRESSED
from masu.external import UNCOMPRESSED

LOG = logging.getLogger(__name__)

ALLOWED_COMPRESSIONS = (UNCOMPRESSED, GZIP_COMPRESSED)
+
+
+ def enable_trino_processing(source_uuid):
+     """Helper to determine if source is enabled for Trino."""
+     if settings.ENABLE_PARQUET_PROCESSING or source_uuid in settings.ENABLE_TRINO_SOURCES:
+         return True
+     return False
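The helper keeps the existing `ENABLE_PARQUET_PROCESSING` switch as a global override and otherwise checks per-source membership. A quick illustration of the precedence (hypothetical settings values; note that some callers pass `provider.uuid`, which may be a `uuid.UUID` rather than a `str`, so the membership test assumes the value compares equal to the configured string):

    # Hypothetical configuration, for illustration only:
    #   settings.ENABLE_PARQUET_PROCESSING = False
    #   settings.ENABLE_TRINO_SOURCES = ["3dd8a473-6be0-4b90-a67a-72736b80c17a"]
    enable_trino_processing("3dd8a473-6be0-4b90-a67a-72736b80c17a")  # True: in the allow-list
    enable_trino_processing("00000000-0000-0000-0000-000000000000")  # False: not listed
    # With ENABLE_PARQUET_PROCESSING = True, both calls would return True.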
5 changes: 3 additions & 2 deletions koku/masu/processor/parquet/parquet_report_processor.py
@@ -31,6 +31,7 @@
from api.provider.models import Provider
from masu.config import Config
from masu.database.provider_db_accessor import ProviderDBAccessor
+ from masu.processor import enable_trino_processing
from masu.processor.aws.aws_report_parquet_processor import AWSReportParquetProcessor
from masu.processor.azure.azure_report_parquet_processor import AzureReportParquetProcessor
from masu.processor.gcp.gcp_report_parquet_processor import GCPReportParquetProcessor
@@ -95,7 +96,7 @@ def convert_to_parquet(  # noqa: C901
if not context:
context = {"account": account, "provider_uuid": provider_uuid}

- if not settings.ENABLE_PARQUET_PROCESSING:
+ if not enable_trino_processing(provider_uuid):
msg = "Skipping convert_to_parquet. Parquet processing is disabled."
LOG.info(log_json(request_id, msg, context))
return
@@ -200,7 +201,7 @@ def get_file_keys_from_s3_with_manifest_id(self, request_id, s3_path, manifest_id
"""
Get all files in a given prefix that match the given manifest_id.
"""
- if not settings.ENABLE_PARQUET_PROCESSING:
+ if not enable_trino_processing(context.get("provider_uuid")):
return []

keys = []
4 changes: 2 additions & 2 deletions koku/masu/processor/report_processor.py
@@ -17,12 +17,12 @@
"""Report processor external interface."""
import logging

- from django.conf import settings
from django.db import InterfaceError as DjangoInterfaceError
from django.db import OperationalError
from psycopg2 import InterfaceError

from api.models import Provider
+ from masu.processor import enable_trino_processing
from masu.processor.aws.aws_report_processor import AWSReportProcessor
from masu.processor.azure.azure_report_processor import AzureReportProcessor
from masu.processor.gcp.gcp_report_processor import GCPReportProcessor
@@ -75,7 +75,7 @@ def _set_processor(self):
(Object) : Provider-specific report processor

"""
- if settings.ENABLE_PARQUET_PROCESSING:
+ if enable_trino_processing(self.provider_uuid):
return ParquetReportProcessor(
schema_name=self.schema_name,
report_path=self.report_path,
23 changes: 16 additions & 7 deletions koku/masu/processor/report_summary_updater.py
@@ -18,13 +18,12 @@
import datetime
import logging

- from django.conf import settings
-
from api.models import Provider
from koku.cache import invalidate_view_cache_for_tenant_and_source_type
from masu.database.provider_db_accessor import ProviderDBAccessor
from masu.database.report_manifest_db_accessor import ReportManifestDBAccessor
from masu.external.date_accessor import DateAccessor
+ from masu.processor import enable_trino_processing
from masu.processor.aws.aws_report_parquet_summary_updater import AWSReportParquetSummaryUpdater
from masu.processor.aws.aws_report_summary_updater import AWSReportSummaryUpdater
from masu.processor.azure.azure_report_parquet_summary_updater import AzureReportParquetSummaryUpdater
@@ -94,25 +93,35 @@ def _set_updater(self):
"""
if self._provider.type in (Provider.PROVIDER_AWS, Provider.PROVIDER_AWS_LOCAL):
report_summary_updater = (
- AWSReportParquetSummaryUpdater if settings.ENABLE_PARQUET_PROCESSING else AWSReportSummaryUpdater
+ AWSReportParquetSummaryUpdater
+ if enable_trino_processing(self._provider_uuid)
+ else AWSReportSummaryUpdater
)
elif self._provider.type in (Provider.PROVIDER_AZURE, Provider.PROVIDER_AZURE_LOCAL):
report_summary_updater = (
- AzureReportParquetSummaryUpdater if settings.ENABLE_PARQUET_PROCESSING else AzureReportSummaryUpdater
+ AzureReportParquetSummaryUpdater
+ if enable_trino_processing(self._provider_uuid)
+ else AzureReportSummaryUpdater
)
elif self._provider.type in (Provider.PROVIDER_OCP,):
report_summary_updater = (
- OCPReportParquetSummaryUpdater if settings.ENABLE_PARQUET_PROCESSING else OCPReportSummaryUpdater
+ OCPReportParquetSummaryUpdater
+ if enable_trino_processing(self._provider_uuid)
+ else OCPReportSummaryUpdater
)
elif self._provider.type in (Provider.PROVIDER_GCP, Provider.PROVIDER_GCP_LOCAL):
report_summary_updater = (
- GCPReportParquetSummaryUpdater if settings.ENABLE_PARQUET_PROCESSING else GCPReportSummaryUpdater
+ GCPReportParquetSummaryUpdater
+ if enable_trino_processing(self._provider_uuid)
+ else GCPReportSummaryUpdater
)
else:
return (None, None)

ocp_cloud_updater = (
- OCPCloudParquetReportSummaryUpdater if settings.ENABLE_PARQUET_PROCESSING else OCPCloudReportSummaryUpdater
+ OCPCloudParquetReportSummaryUpdater
+ if enable_trino_processing(self._provider_uuid)
+ else OCPCloudReportSummaryUpdater
)

LOG.info(f"Set report_summary_updater = {report_summary_updater.__name__}")
4 changes: 2 additions & 2 deletions koku/masu/processor/tasks.py
@@ -26,7 +26,6 @@
from celery import chain
from celery.utils.log import get_task_logger
from dateutil import parser
- from django.conf import settings
from django.db import connection
from django.db.utils import IntegrityError
from tenant_schemas.utils import schema_context
@@ -46,6 +45,7 @@
from masu.external.accounts_accessor import AccountsAccessor
from masu.external.accounts_accessor import AccountsAccessorError
from masu.external.date_accessor import DateAccessor
+ from masu.processor import enable_trino_processing
from masu.processor._tasks.download import _get_report_files
from masu.processor._tasks.process import _process_report_file
from masu.processor._tasks.remove_expired import _remove_expired_data
@@ -342,7 +342,7 @@ def update_summary_tables(
).apply_async(queue=queue_name or REFRESH_MATERIALIZED_VIEWS_QUEUE)
return

- if settings.ENABLE_PARQUET_PROCESSING and provider in (
+ if enable_trino_processing(provider_uuid) and provider in (
Provider.PROVIDER_AWS,
Provider.PROVIDER_AWS_LOCAL,
Provider.PROVIDER_AZURE,
@@ -80,7 +80,7 @@ def test_convert_to_parquet(self):
"missing required argument: provider_uuid",
]
with self.assertLogs("masu.processor.parquet.parquet_report_processor", level="INFO") as logger:
with patch("masu.processor.parquet.parquet_report_processor.settings", ENABLE_S3_ARCHIVING=True):
with patch("masu.processor.parquet.parquet_report_processor.enable_trino_processing", return_value=True):
self.report_processor.convert_to_parquet(None, None, None, None, "start_date", "manifest_id", [])
for expected in expected_logs:
self.assertIn(expected, " ".join(logger.output))
@@ -93,30 +93,30 @@ def test_convert_to_parquet(self):
self.assertIn(expected, " ".join(logger.output))

expected = "Parquet processing is enabled, but no start_date was given for processing."
with patch("masu.processor.parquet.parquet_report_processor.settings", ENABLE_S3_ARCHIVING=True):
with patch("masu.processor.parquet.parquet_report_processor.enable_trino_processing", return_value=True):
with self.assertLogs("masu.processor.parquet.parquet_report_processor", level="INFO") as logger:
self.report_processor.convert_to_parquet(
"request_id", "account", "provider_uuid", "provider_type", None, "manifest_id", "csv_file"
)
self.assertIn(expected, " ".join(logger.output))

expected = "Parquet processing is enabled, but the start_date was not a valid date string ISO 8601 format."
with patch("masu.processor.parquet.parquet_report_processor.settings", ENABLE_S3_ARCHIVING=True):
with patch("masu.processor.parquet.parquet_report_processor.enable_trino_processing", return_value=True):
with self.assertLogs("masu.processor.parquet.parquet_report_processor", level="INFO") as logger:
self.report_processor.convert_to_parquet(
"request_id", "account", "provider_uuid", "provider_type", "bad_date", "manifest_id", "csv_file"
)
self.assertIn(expected, " ".join(logger.output))

expected = "Could not establish report type"
with patch("masu.processor.parquet.parquet_report_processor.settings", ENABLE_S3_ARCHIVING=True):
with patch("masu.processor.parquet.parquet_report_processor.enable_trino_processing", return_value=True):
with self.assertLogs("masu.processor.parquet.parquet_report_processor", level="INFO") as logger:
self.report_processor.convert_to_parquet(
"request_id", "account", "provider_uuid", "OCP", "2020-01-01T12:00:00", "manifest_id", "csv_file"
)
self.assertIn(expected, " ".join(logger.output))

with patch("masu.processor.parquet.parquet_report_processor.settings", ENABLE_S3_ARCHIVING=True):
with patch("masu.processor.parquet.parquet_report_processor.enable_trino_processing", return_value=True):
with patch("masu.processor.parquet.parquet_report_processor.get_path_prefix"):
with patch(
(
@@ -143,7 +143,7 @@ def test_convert_to_parquet(self):
)

expected = "Failed to convert the following files to parquet"
with patch("masu.processor.parquet.parquet_report_processor.settings", ENABLE_S3_ARCHIVING=True):
with patch("masu.processor.parquet.parquet_report_processor.enable_trino_processing", return_value=True):
with patch("masu.processor.parquet.parquet_report_processor.get_path_prefix"):
with patch(
(
@@ -170,7 +170,7 @@ def test_convert_to_parquet(self):
)
self.assertIn(expected, " ".join(logger.output))

with patch("masu.processor.parquet.parquet_report_processor.settings", ENABLE_S3_ARCHIVING=True):
with patch("masu.processor.parquet.parquet_report_processor.enable_trino_processing", return_value=True):
with patch("masu.processor.parquet.parquet_report_processor.get_path_prefix"):
with patch(
(
@@ -192,7 +192,7 @@ def test_convert_to_parquet(self):
"csv_file",
)

with patch("masu.processor.parquet.parquet_report_processor.settings", ENABLE_S3_ARCHIVING=True):
with patch("masu.processor.parquet.parquet_report_processor.enable_trino_processing", return_value=True):
with patch("masu.processor.parquet.parquet_report_processor.get_path_prefix"):
with patch(
(
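The test changes swap patching the settings module for patching the helper itself, and the patch target string matters: the stub must replace the name in the module that imported it, not in masu.processor where it is defined. A minimal sketch of the pattern (not a test from this PR):

    from unittest.mock import patch

    # Patch the name where it is looked up at call time.
    with patch(
        "masu.processor.parquet.parquet_report_processor.enable_trino_processing",
        return_value=True,
    ):
        pass  # code under test now behaves as if the source were Trino-enabled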
5 changes: 3 additions & 2 deletions koku/masu/util/aws/common.py
@@ -32,6 +32,7 @@
from api.models import Provider
from masu.database.aws_report_db_accessor import AWSReportDBAccessor
from masu.database.provider_db_accessor import ProviderDBAccessor
+ from masu.processor import enable_trino_processing
from masu.util import common as utils
from reporting.provider.aws.models import PRESTO_REQUIRED_COLUMNS

@@ -290,7 +291,7 @@ def copy_data_to_s3_bucket(request_id, path, filename, data, manifest_id=None, c
"""
Copies data to s3 bucket file
"""
- if not (settings.ENABLE_S3_ARCHIVING or settings.ENABLE_PARQUET_PROCESSING):
+ if not (settings.ENABLE_S3_ARCHIVING or enable_trino_processing(context.get("provider_uuid"))):
return None

upload = None
@@ -315,7 +316,7 @@ def copy_local_report_file_to_s3_bucket(
"""
Copies local report file to s3 bucket
"""
- if s3_path and (settings.ENABLE_S3_ARCHIVING or settings.ENABLE_PARQUET_PROCESSING):
+ if s3_path and (settings.ENABLE_S3_ARCHIVING or enable_trino_processing(context.get("provider_uuid"))):
LOG.info(f"copy_local_report_file_to_s3_bucket: {s3_path} {full_file_path}")
with open(full_file_path, "rb") as fin:
data = BytesIO(fin.read())
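Both S3 helpers now pull the source UUID out of the logging `context` dict. One implication worth noting (illustrative, based only on the calls shown above): when `context` lacks a `provider_uuid` key, `context.get("provider_uuid")` yields `None`, the allow-list lookup fails, and only `ENABLE_S3_ARCHIVING` can still enable the copy.

    # Hypothetical contexts, for illustration (global parquet flag assumed off):
    enable_trino_processing({"provider_uuid": "uuid-a"}.get("provider_uuid"))  # checks "uuid-a"
    enable_trino_processing({}.get("provider_uuid"))  # None is never in the list, so False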