Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[COST-4620] - Add basic provider data validation #5218

Merged
merged 18 commits into from
Aug 2, 2024
Merged
18 changes: 18 additions & 0 deletions koku/api/migrations/0064_provider_data_valid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.2.11 on 2024-07-16 18:55
from django.db import migrations
from django.db import models


class Migration(migrations.Migration):
    """Add the nullable ``Provider.data_valid`` flag used by data-validation checks."""

    dependencies = [
        ("api", "0063_remove_infra_map_account_region"),
    ]

    operations = [
        migrations.AddField(
            model_name="provider",
            name="data_valid",
            # Tri-state flag: None = validation has not run yet,
            # True/False = outcome of the most recent validation.
            field=models.BooleanField(default=None, null=True),
        ),
    ]
2 changes: 2 additions & 0 deletions koku/api/provider/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ class Meta:
# Owning customer; protected so a customer with providers cannot be deleted.
customer = models.ForeignKey("Customer", null=True, on_delete=models.PROTECT)
# Creating user; nulled out if the user record is removed.
created_by = models.ForeignKey("User", null=True, on_delete=models.SET_NULL)
setup_complete = models.BooleanField(default=False)
# Used for data validation checks.
# None = validation has not run yet; True/False = result of the last check
# (written by masu's DataValidator task).
data_valid = models.BooleanField(null=True, default=None)
lcouzens marked this conversation as resolved.
Show resolved Hide resolved

# Set automatically when the provider row is first created.
created_timestamp = models.DateTimeField(auto_now_add=True, blank=True, null=True)
# Last time this provider was polled; None until the first poll.
polling_timestamp = models.DateTimeField(blank=True, null=True, default=None)
Expand Down
2 changes: 2 additions & 0 deletions koku/masu/api/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from masu.api.views import update_cost_model_costs
from masu.api.views import update_exchange_rates
from masu.api.views import update_openshift_on_cloud
from masu.api.views import validate_cost_data

ROUTER = DefaultRouter()
ROUTER.register(r"sources", SourcesViewSet, basename="sources")
Expand Down Expand Up @@ -75,6 +76,7 @@
path("clear_celery_queues/", clear_celery_queues, name="clear_celery_queues"),
path("bigquery_cost/<uuid:source_uuid>/", bigquery_cost, name="bigquery_cost"),
path("purge_trino_files/", purge_trino_files, name="purge_trino_files"),
path("validate_cost_data/", validate_cost_data, name="validate_cost_data"),
path("db-performance", db_performance_redirect, name="db_perf_no_slash_redirect"),
path("db-performance/", db_performance_redirect, name="db_perf_slash_redirect"),
path("db-performance/db-settings/", dbsettings, name="db_settings"),
Expand Down
85 changes: 85 additions & 0 deletions koku/masu/api/validate_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#
# Copyright 2024 Red Hat Inc.
# SPDX-License-Identifier: Apache-2.0
#
"""View for data validation endpoint."""
import logging

from django.views.decorators.cache import never_cache
from rest_framework import status
from rest_framework.decorators import api_view
from rest_framework.decorators import permission_classes
from rest_framework.decorators import renderer_classes
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework.settings import api_settings

from api.models import Provider
from common.queues import get_customer_queue
from common.queues import PriorityQueue
from common.queues import QUEUE_LIST
from masu.processor.tasks import validate_daily_data

LOG = logging.getLogger(__name__)
REPORT_DATA_KEY = "Report Data Task IDs"


@never_cache
@api_view(http_method_names=["GET"])
@permission_classes((AllowAny,))
@renderer_classes(tuple(api_settings.DEFAULT_RENDERER_CLASSES))
def validate_cost_data(request):
    """Masu endpoint to trigger cost validation for a provider.

    Required query params: ``start_date``, ``end_date``, ``provider_uuid``.
    Optional query params:
        ocp_on_cloud_type: one of the AWS/Azure/GCP provider types; only
            valid when the target provider is an OCP provider.
        queue: celery queue override; must be a member of QUEUE_LIST.

    Returns a 400 response describing the first invalid parameter, otherwise
    queues a ``validate_daily_data`` task and returns its task id.
    """
    # @api_view restricts this view to GET, so no explicit method check is needed.
    params = request.query_params
    provider_uuid = params.get("provider_uuid")
    ocp_on_cloud_type = params.get("ocp_on_cloud_type")
    start_date = params.get("start_date")
    end_date = params.get("end_date")

    if start_date is None:
        errmsg = "start_date is a required parameter."
        return Response({"Error": errmsg}, status=status.HTTP_400_BAD_REQUEST)
    if end_date is None:
        errmsg = "end_date is a required parameter."
        return Response({"Error": errmsg}, status=status.HTTP_400_BAD_REQUEST)
    if provider_uuid is None:
        errmsg = "provider_uuid must be supplied as a parameter."
        return Response({"Error": errmsg}, status=status.HTTP_400_BAD_REQUEST)

    try:
        provider = Provider.objects.get(uuid=provider_uuid)
    except Provider.DoesNotExist:
        errmsg = f"provider_uuid {provider_uuid} does not exist."
        return Response({"Error": errmsg}, status=status.HTTP_400_BAD_REQUEST)
    # Schema determines which customer queue the task falls back to.
    # (The original duplicated this lookup before and after the except block.)
    provider_schema = provider.account.get("schema_name")

    if ocp_on_cloud_type:
        # OCP-on-cloud validation only makes sense for an OCP provider paired
        # with one of the supported infrastructure provider types.
        if provider.type != Provider.PROVIDER_OCP:
            errmsg = "ocp_on_cloud_type must be used with an ocp provider."
            return Response({"Error": errmsg}, status=status.HTTP_400_BAD_REQUEST)
        supported_types = [Provider.PROVIDER_AWS, Provider.PROVIDER_AZURE, Provider.PROVIDER_GCP]
        if ocp_on_cloud_type not in supported_types:
            errmsg = f"ocp on cloud type must match: {supported_types}"
            return Response({"Error": errmsg}, status=status.HTTP_400_BAD_REQUEST)

    fallback_queue = get_customer_queue(provider_schema, PriorityQueue)
    queue_name = params.get("queue") or fallback_queue
    if queue_name not in QUEUE_LIST:
        errmsg = f"'queue' must be one of {QUEUE_LIST}."
        return Response({"Error": errmsg}, status=status.HTTP_400_BAD_REQUEST)

    context = {"tracing_id": "running provider validation via masu"}
    async_result = validate_daily_data.s(
        provider_schema,
        start_date,
        end_date,
        provider_uuid,
        ocp_on_cloud_type,
        context=context,
    ).apply_async(queue=queue_name)
    # Response mirrors the other masu report endpoints: a list of task ids.
    return Response({REPORT_DATA_KEY: [str(async_result)]})

Check warning on line 85 in koku/masu/api/validate_data.py

View check run for this annotation

Codecov / codecov/patch

koku/masu/api/validate_data.py#L84-L85

Added lines #L84 - L85 were not covered by tests
1 change: 1 addition & 0 deletions koku/masu/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@
from masu.api.update_openshift_on_cloud import update_openshift_on_cloud
from masu.api.update_rates import update_azure_storage_capacity
from masu.api.update_rates import update_exchange_rates
from masu.api.validate_data import validate_cost_data
5 changes: 5 additions & 0 deletions koku/masu/database/report_db_accessor_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,24 @@
"""Run a SQL statement via a cursor."""
lcouzens marked this conversation as resolved.
Show resolved Hide resolved
LOG.info(log_json(msg=f"triggering {operation}", table=table))
row_count = None
result = None
with connection.cursor() as cursor:
cursor.db.set_schema(self.schema)
t1 = time.time()
try:
cursor.execute(sql, params=bind_params)
row_count = cursor.rowcount
if operation == "VALIDATION_QUERY":
result = cursor.fetchall()

Check warning on line 108 in koku/masu/database/report_db_accessor_base.py

View check run for this annotation

Codecov / codecov/patch

koku/masu/database/report_db_accessor_base.py#L108

Added line #L108 was not covered by tests

except OperationalError as exc:
db_exc = get_extended_exception_by_type(exc)
LOG.warning(log_json(os.getpid(), msg=str(db_exc), context=db_exc.as_dict()))
raise db_exc from exc

running_time = time.time() - t1
LOG.info(log_json(msg=f"finished {operation}", row_count=row_count, table=table, running_time=running_time))
return result
lcouzens marked this conversation as resolved.
Show resolved Hide resolved

def _execute_trino_raw_sql_query(self, sql, *, sql_params=None, context=None, log_ref=None, attempts_left=0):
"""Execute a single trino query returning only the fetchall results"""
Expand Down
222 changes: 222 additions & 0 deletions koku/masu/processor/_tasks/data_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
#
# Copyright 2024 Red Hat Inc.
# SPDX-License-Identifier: Apache-2.0
#
import logging
from datetime import datetime

from django.conf import settings

from api.common import log_json
from api.provider.models import Provider
from api.utils import DateHelper
from masu.database import AWS_CUR_TABLE_MAP
from masu.database import AZURE_REPORT_TABLE_MAP
from masu.database import GCP_REPORT_TABLE_MAP
from masu.database import OCI_CUR_TABLE_MAP
from masu.database import OCP_REPORT_TABLE_MAP
from masu.database.report_db_accessor_base import ReportDBAccessorBase
from masu.util.common import date_range_pair
from reporting.provider.aws.models import TRINO_LINE_ITEM_DAILY_TABLE as AWS_TRINO_LINE_ITEM_DAILY_TABLE
from reporting.provider.azure.models import TRINO_LINE_ITEM_DAILY_TABLE as AZURE_TRINO_LINE_ITEM_DAILY_TABLE
from reporting.provider.gcp.models import TRINO_LINE_ITEM_DAILY_TABLE as GCP_TRINO_LINE_ITEM_DAILY_TABLE
from reporting.provider.oci.models import TRINO_LINE_ITEM_DAILY_TABLE_MAP as OCI_TRINO_LINE_ITEM_DAILY_TABLE

LOG = logging.getLogger(__name__)

# Filter maps
# Per-provider-type column names used to build the daily cost aggregation:
# "date" is the grouping column and "metric" the summed cost expression.
# The "OCPAWS"/"OCPAzure"/"OCPGCP" keys serve OCP-on-cloud validation.
TRINO_FILTER_MAP = {
    Provider.PROVIDER_AWS: {"date": "lineitem_usagestartdate", "metric": "lineitem_unblendedcost"},
    Provider.PROVIDER_AZURE: {"date": "date", "metric": "coalesce(nullif(costinbillingcurrency, 0), pretaxcost)"},
    Provider.PROVIDER_GCP: {"date": "usage_start_time", "metric": "cost"},
    Provider.PROVIDER_OCI: {"date": "lineitem_intervalusagestart", "metric": "cost_mycost"},
    Provider.PROVIDER_OCP: {
        "date": "usage_start",
        "metric": "pod_effective_usage_cpu_core_hours",
    },
    "OCPAWS": {"date": "usage_start", "metric": "unblended_cost"},
    "OCPAzure": {"date": "usage_start", "metric": "pretax_cost"},
    "OCPGCP": {"date": "usage_start", "metric": "unblended_cost"},
}
# Postgres daily-summary equivalents of TRINO_FILTER_MAP (summary tables use
# normalized column names, so the expressions differ from the raw Trino ones).
PG_FILTER_MAP = {
    Provider.PROVIDER_AWS: {
        "date": "usage_start",
        "metric": "unblended_cost",
    },
    Provider.PROVIDER_AZURE: {"date": "usage_start", "metric": "pretax_cost"},
    Provider.PROVIDER_GCP: {
        "date": "usage_start",
        "metric": "unblended_cost",
    },
    Provider.PROVIDER_OCI: {"date": "usage_start", "metric": "cost"},
    Provider.PROVIDER_OCP: {
        "date": "usage_start",
        "metric": "pod_effective_usage_cpu_core_hours",
    },
    "OCPAWS": {"date": "usage_start", "metric": "unblended_cost"},
    "OCPAzure": {"date": "usage_start", "metric": "pretax_cost"},
    "OCPGCP": {"date": "usage_start", "metric": "unblended_cost"},
}
lcouzens marked this conversation as resolved.
Show resolved Hide resolved

# Table maps
# Postgres daily-summary table per provider type; the OCP* keys map to the
# OCP-on-cloud project daily-summary tables.
PG_TABLE_MAP = {
    Provider.PROVIDER_AWS: AWS_CUR_TABLE_MAP.get("line_item_daily_summary"),
    Provider.PROVIDER_AZURE: AZURE_REPORT_TABLE_MAP.get("line_item_daily_summary"),
    Provider.PROVIDER_GCP: GCP_REPORT_TABLE_MAP.get("line_item_daily_summary"),
    Provider.PROVIDER_OCI: OCI_CUR_TABLE_MAP.get("line_item_daily_summary"),
    Provider.PROVIDER_OCP: OCP_REPORT_TABLE_MAP.get("line_item_daily_summary"),
    "OCPAWS": AWS_CUR_TABLE_MAP.get("ocp_on_aws_project_daily_summary"),
    "OCPAzure": AZURE_REPORT_TABLE_MAP.get("ocp_on_azure_project_daily_summary"),
    "OCPGCP": GCP_REPORT_TABLE_MAP.get("ocp_on_gcp_project_daily_summary"),
}

# Trino (raw/line-item level) table per provider type, queried via hive.
TRINO_TABLE_MAP = {
    Provider.PROVIDER_AWS: AWS_TRINO_LINE_ITEM_DAILY_TABLE,
    Provider.PROVIDER_AZURE: AZURE_TRINO_LINE_ITEM_DAILY_TABLE,
    Provider.PROVIDER_GCP: GCP_TRINO_LINE_ITEM_DAILY_TABLE,
    Provider.PROVIDER_OCI: OCI_TRINO_LINE_ITEM_DAILY_TABLE.get("cost"),
    Provider.PROVIDER_OCP: "reporting_ocpusagelineitem_daily_summary",
    "OCPAWS": "reporting_ocpawscostlineitem_project_daily_summary",
    "OCPAzure": "reporting_ocpazurecostlineitem_project_daily_summary",
    "OCPGCP": "reporting_ocpgcpcostlineitem_project_daily_summary",
}


class DataValidator:
    """Check that raw (Trino) and summarized (Postgres) cost data agree for a provider.

    Runs the same daily-cost aggregation against both stores for the requested
    date range, compares the per-day totals, and records the outcome on the
    provider's ``data_valid`` field.
    """

    def __init__(
        self,
        schema,
        start_date,
        end_date,
        provider_uuid,
        ocp_on_cloud_type,
        context,
        date_step=settings.TRINO_DATE_STEP,
    ):
        self.schema = schema
        self.provider_uuid = provider_uuid
        # When set (AWS/Azure/GCP), validate the OCP-on-cloud summary tables
        # instead of the provider's own tables.
        self.ocp_on_cloud_type = ocp_on_cloud_type
        self.start_date = start_date
        self.end_date = end_date
        self.context = context
        # Maximum number of days queried per statement (bounds each scan).
        self.date_step = date_step

    @staticmethod
    def _base_provider_type(provider_type):
        """Return *provider_type* without a trailing "-local" dev suffix.

        Replaces ``str.strip("-local")``, which treats its argument as a
        character set and can remove legitimate leading/trailing characters
        rather than the literal suffix.
        """
        suffix = "-local"
        if provider_type.endswith(suffix):
            return provider_type[: -len(suffix)]
        return provider_type

    def get_table_filters_for_provider(self, provider_type, trino=False):
        """Get relevant table and query filters for given provider type."""
        table_map_key = f"OCP{provider_type}" if self.ocp_on_cloud_type else provider_type
        if trino:
            return TRINO_TABLE_MAP.get(table_map_key), TRINO_FILTER_MAP.get(table_map_key)
        return PG_TABLE_MAP.get(table_map_key), PG_FILTER_MAP.get(table_map_key)

    def compare_data(self, pg_data, trino_data, tolerance=1):
        """Validate if postgres and trino query data cost matches per day.

        Returns ``(incomplete_days, valid_cost)`` where ``incomplete_days``
        maps each mismatched date to its pg/trino values and delta (or the
        string "missing daily data" when Postgres has no row for that day).
        """
        incomplete_days = {}
        valid_cost = True
        if trino_data == {}:
            # No raw data at all: treat as invalid rather than trivially valid.
            return incomplete_days, False
        for date, trino_value in trino_data.items():
            if date not in pg_data:
                incomplete_days[date] = "missing daily data"
                valid_cost = False
            elif abs(pg_data[date] - trino_value) > tolerance:
                incomplete_days[date] = {
                    "pg_value": pg_data[date],
                    "trino_value": trino_value,
                    "delta": trino_value - pg_data[date],
                }
                valid_cost = False
        return incomplete_days, valid_cost

    def execute_relevant_query(self, provider_type, cluster_id=None, trino=False):
        """Run the daily-cost aggregation and return it as ``{date: float_cost}``."""
        daily_result = {}
        # year and month for running partitioned queries
        dh = DateHelper()
        year = dh.bill_year_from_date(self.start_date)
        month = dh.bill_month_from_date(self.start_date)
        report_db_accessor = ReportDBAccessorBase(self.schema)
        # Set provider filter, when running ocp{aws/gcp/azure} checks we need to rely on the cluster id
        provider_filter = self.provider_uuid if not self.ocp_on_cloud_type else cluster_id
        table, query_filters = self.get_table_filters_for_provider(provider_type, trino)
        for start, end in date_range_pair(self.start_date, self.end_date, step=self.date_step):
            # NOTE(review): values are interpolated directly into the SQL text.
            # They originate from the internal masu API, but parameterized
            # queries would be safer — confirm inputs stay internal-only.
            if trino:
                source = "source" if not self.ocp_on_cloud_type else "cluster_id"
                sql = f"""
                SELECT sum({query_filters.get("metric")}) as metric, {query_filters.get("date")} as date
                    FROM hive.{self.schema}.{table}
                    WHERE {source} = '{provider_filter}'
                        AND {query_filters.get("date")} >= date('{start}')
                        AND {query_filters.get("date")} <= date('{end}')
                        AND year = '{year}'
                        AND lpad(month, 2, '0') = '{month}'
                    GROUP BY {query_filters.get("date")}
                    ORDER BY {query_filters.get("date")}"""
                result = report_db_accessor._execute_trino_raw_sql_query(sql, log_ref="data validation query")
            else:
                source = "source_uuid" if not self.ocp_on_cloud_type else "cluster_id"
                sql = f"""
                SELECT sum({query_filters.get("metric")}) as metric, {query_filters.get("date")} as date
                    FROM {self.schema}.{table}_{year}_{month}
                    WHERE {source} = '{provider_filter}'
                        AND {query_filters.get("date")} >= '{start}'
                        AND {query_filters.get("date")} <= '{end}'
                    GROUP BY {query_filters.get("date")}
                    ORDER BY {query_filters.get("date")}"""
                result = report_db_accessor._prepare_and_execute_raw_sql_query(
                    table, sql, operation="VALIDATION_QUERY"
                )
            # ``or []`` also covers a None result from the accessor.
            for day in result or []:
                # Normalize datetime keys to date so PG and Trino results align.
                key = day[1].date() if isinstance(day[1], datetime) else day[1]
                daily_result[key] = float(day[0])
        return daily_result

    def check_data_integrity(self):
        """Compare Postgres vs Trino daily totals and persist the result on the provider."""
        cluster_id = None
        LOG.info(log_json(msg="validation started for provider", context=self.context))
        provider = Provider.objects.filter(uuid=self.provider_uuid).first()
        if provider is None:
            # Guard: .first() returns None for an unknown uuid; the original
            # would raise AttributeError here.
            LOG.warning(
                log_json(msg=f"provider {self.provider_uuid} does not exist, skipping validation", context=self.context)
            )
            return
        provider_type = self._base_provider_type(provider.type)
        if self.ocp_on_cloud_type:
            provider_type = self._base_provider_type(self.ocp_on_cloud_type)
            # OCP-on-cloud rows are keyed by cluster id rather than source uuid.
            cluster_id = provider.authentication.credentials.get("cluster_id")
        # Postgres query to get daily values
        try:
            pg_data = self.execute_relevant_query(provider_type, cluster_id)
        except Exception as e:
            LOG.warning(log_json(msg=f"data validation postgres query failed: {e}", context=self.context))
            return
        # Trino query to get daily values
        try:
            trino_data = self.execute_relevant_query(provider_type, cluster_id, True)
        except Exception as e:
            LOG.warning(log_json(msg=f"data validation trino query failed: {e}", context=self.context))
            return
        # Compare results
        LOG.debug(f"PG: {pg_data} Trino data: {trino_data}")
        daily_difference, valid_cost = self.compare_data(pg_data, trino_data)
        if valid_cost:
            LOG.info(log_json(msg=f"all data complete for provider: {self.provider_uuid}", context=self.context))
        else:
            LOG.warning(
                log_json(
                    msg=f"provider has incomplete data for specified days: {daily_difference}", context=self.context
                )
            )
        # update provider object with validation state
        provider.data_valid = valid_cost
        provider.save(update_fields=["data_valid"])
Loading
Loading