diff --git a/deploy/clowdapp.yaml b/deploy/clowdapp.yaml index 2f8d9fff14..664128c24a 100644 --- a/deploy/clowdapp.yaml +++ b/deploy/clowdapp.yaml @@ -488,6 +488,8 @@ objects: value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ -824,6 +826,8 @@ objects: value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ -1319,6 +1323,8 @@ objects: value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ -1499,6 +1505,8 @@ objects: value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ -1685,6 +1693,8 @@ objects: value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ -1871,6 +1881,8 @@ objects: value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ -2059,6 +2071,8 @@ objects: value: ${PANDAS_COLUMN_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ 
-2249,6 +2263,8 @@ objects: value: ${PANDAS_COLUMN_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ -2439,6 +2455,8 @@ objects: value: ${PANDAS_COLUMN_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ -2627,6 +2645,8 @@ objects: value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ -2815,6 +2835,8 @@ objects: value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ -3003,6 +3025,8 @@ objects: value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ -3193,6 +3217,8 @@ objects: value: ${PANDAS_COLUMN_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ -3387,6 +3413,8 @@ objects: value: ${PANDAS_COLUMN_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ -3581,6 +3609,8 @@ objects: value: ${PANDAS_COLUMN_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: 
${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ -3773,6 +3803,8 @@ objects: value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ -3959,6 +3991,8 @@ objects: value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ -4145,6 +4179,8 @@ objects: value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ -4331,6 +4367,8 @@ objects: value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ -4521,6 +4559,8 @@ objects: value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ -4711,6 +4751,8 @@ objects: value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ -4903,6 +4945,8 @@ objects: value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ -5089,6 +5133,8 @@ objects: 
value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ -5263,6 +5309,8 @@ objects: value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT @@ -5767,6 +5815,11 @@ parameters: name: TRINO_DATE_STEP required: true value: "5" +- description: Number of days to expand the data validation + displayName: Data validation + name: VALIDATION_RANGE + required: true + value: "5" - description: Flag to use account enhanced prometheus metricsworker displayName: Account Enhanced Metrics name: ACCOUNT_ENHANCED_METRICS diff --git a/deploy/kustomize/base/base.yaml b/deploy/kustomize/base/base.yaml index 42cac4ea66..0e9d5a414c 100644 --- a/deploy/kustomize/base/base.yaml +++ b/deploy/kustomize/base/base.yaml @@ -447,6 +447,11 @@ parameters: name: TRINO_DATE_STEP required: true value: "5" +- description: Number of days to expand the data validation + displayName: Data validation + name: VALIDATION_RANGE + required: true + value: "5" - description: Flag to use account enhanced prometheus metricsworker displayName: Account Enhanced Metrics diff --git a/deploy/kustomize/patches/listener.yaml b/deploy/kustomize/patches/listener.yaml index d5389ae0ae..641169456b 100644 --- a/deploy/kustomize/patches/listener.yaml +++ b/deploy/kustomize/patches/listener.yaml @@ -57,6 +57,8 @@ value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git a/deploy/kustomize/patches/scheduler.yaml b/deploy/kustomize/patches/scheduler.yaml index 75be748583..d7aea7c5db 100644 --- 
a/deploy/kustomize/patches/scheduler.yaml +++ b/deploy/kustomize/patches/scheduler.yaml @@ -61,6 +61,8 @@ value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git a/deploy/kustomize/patches/worker-celery.yaml b/deploy/kustomize/patches/worker-celery.yaml index 19018fb8ee..41dd699b58 100644 --- a/deploy/kustomize/patches/worker-celery.yaml +++ b/deploy/kustomize/patches/worker-celery.yaml @@ -63,6 +63,8 @@ value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git a/deploy/kustomize/patches/worker-cost-model-penalty.yaml b/deploy/kustomize/patches/worker-cost-model-penalty.yaml index 7cf923e6e8..d21b1943d4 100644 --- a/deploy/kustomize/patches/worker-cost-model-penalty.yaml +++ b/deploy/kustomize/patches/worker-cost-model-penalty.yaml @@ -63,6 +63,8 @@ value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git a/deploy/kustomize/patches/worker-cost-model-xl.yaml b/deploy/kustomize/patches/worker-cost-model-xl.yaml index 32508304b8..47bdbcdc6a 100644 --- a/deploy/kustomize/patches/worker-cost-model-xl.yaml +++ b/deploy/kustomize/patches/worker-cost-model-xl.yaml @@ -63,6 +63,8 @@ value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git a/deploy/kustomize/patches/worker-cost-model.yaml b/deploy/kustomize/patches/worker-cost-model.yaml 
index f6f2622071..9c5921c6a4 100644 --- a/deploy/kustomize/patches/worker-cost-model.yaml +++ b/deploy/kustomize/patches/worker-cost-model.yaml @@ -63,6 +63,8 @@ value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git a/deploy/kustomize/patches/worker-download-penalty.yaml b/deploy/kustomize/patches/worker-download-penalty.yaml index 014733dd6a..9dca6fd3e4 100644 --- a/deploy/kustomize/patches/worker-download-penalty.yaml +++ b/deploy/kustomize/patches/worker-download-penalty.yaml @@ -65,6 +65,8 @@ value: ${PANDAS_COLUMN_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git a/deploy/kustomize/patches/worker-download-xl.yaml b/deploy/kustomize/patches/worker-download-xl.yaml index 5232d6bc6d..4ba7eede4a 100644 --- a/deploy/kustomize/patches/worker-download-xl.yaml +++ b/deploy/kustomize/patches/worker-download-xl.yaml @@ -65,6 +65,8 @@ value: ${PANDAS_COLUMN_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git a/deploy/kustomize/patches/worker-download.yaml b/deploy/kustomize/patches/worker-download.yaml index 1c2cee9fc5..9b59210199 100644 --- a/deploy/kustomize/patches/worker-download.yaml +++ b/deploy/kustomize/patches/worker-download.yaml @@ -65,6 +65,8 @@ value: ${PANDAS_COLUMN_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git a/deploy/kustomize/patches/worker-hcs.yaml 
b/deploy/kustomize/patches/worker-hcs.yaml index 66f302def9..97238f5b4e 100644 --- a/deploy/kustomize/patches/worker-hcs.yaml +++ b/deploy/kustomize/patches/worker-hcs.yaml @@ -65,6 +65,8 @@ value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git a/deploy/kustomize/patches/worker-ocp-penalty.yaml b/deploy/kustomize/patches/worker-ocp-penalty.yaml index 80472ca44a..4219c24a55 100644 --- a/deploy/kustomize/patches/worker-ocp-penalty.yaml +++ b/deploy/kustomize/patches/worker-ocp-penalty.yaml @@ -63,6 +63,8 @@ value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git a/deploy/kustomize/patches/worker-ocp-xl.yaml b/deploy/kustomize/patches/worker-ocp-xl.yaml index e0f115ff94..59a0836da4 100644 --- a/deploy/kustomize/patches/worker-ocp-xl.yaml +++ b/deploy/kustomize/patches/worker-ocp-xl.yaml @@ -63,6 +63,8 @@ value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git a/deploy/kustomize/patches/worker-ocp.yaml b/deploy/kustomize/patches/worker-ocp.yaml index fc902a31c3..e79cd87873 100644 --- a/deploy/kustomize/patches/worker-ocp.yaml +++ b/deploy/kustomize/patches/worker-ocp.yaml @@ -63,6 +63,8 @@ value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git a/deploy/kustomize/patches/worker-priority-penalty.yaml 
b/deploy/kustomize/patches/worker-priority-penalty.yaml index 12e773f953..826d859b15 100644 --- a/deploy/kustomize/patches/worker-priority-penalty.yaml +++ b/deploy/kustomize/patches/worker-priority-penalty.yaml @@ -65,6 +65,8 @@ value: ${PANDAS_COLUMN_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git a/deploy/kustomize/patches/worker-priority-xl.yaml b/deploy/kustomize/patches/worker-priority-xl.yaml index 65cf661047..803322b55e 100644 --- a/deploy/kustomize/patches/worker-priority-xl.yaml +++ b/deploy/kustomize/patches/worker-priority-xl.yaml @@ -65,6 +65,8 @@ value: ${PANDAS_COLUMN_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git a/deploy/kustomize/patches/worker-priority.yaml b/deploy/kustomize/patches/worker-priority.yaml index 03d38da6fe..09867f1d25 100644 --- a/deploy/kustomize/patches/worker-priority.yaml +++ b/deploy/kustomize/patches/worker-priority.yaml @@ -65,6 +65,8 @@ value: ${PANDAS_COLUMN_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git a/deploy/kustomize/patches/worker-refresh-penalty.yaml b/deploy/kustomize/patches/worker-refresh-penalty.yaml index 6c24a86e60..2ea6974f52 100644 --- a/deploy/kustomize/patches/worker-refresh-penalty.yaml +++ b/deploy/kustomize/patches/worker-refresh-penalty.yaml @@ -63,6 +63,8 @@ value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git 
a/deploy/kustomize/patches/worker-refresh-xl.yaml b/deploy/kustomize/patches/worker-refresh-xl.yaml index 6110329a23..6e919c5c50 100644 --- a/deploy/kustomize/patches/worker-refresh-xl.yaml +++ b/deploy/kustomize/patches/worker-refresh-xl.yaml @@ -63,6 +63,8 @@ value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git a/deploy/kustomize/patches/worker-refresh.yaml b/deploy/kustomize/patches/worker-refresh.yaml index 48cc75d8a1..3d39c73d3e 100644 --- a/deploy/kustomize/patches/worker-refresh.yaml +++ b/deploy/kustomize/patches/worker-refresh.yaml @@ -63,6 +63,8 @@ value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git a/deploy/kustomize/patches/worker-subs-extraction.yaml b/deploy/kustomize/patches/worker-subs-extraction.yaml index 0732a6f4df..153b0fafde 100644 --- a/deploy/kustomize/patches/worker-subs-extraction.yaml +++ b/deploy/kustomize/patches/worker-subs-extraction.yaml @@ -61,6 +61,8 @@ value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git a/deploy/kustomize/patches/worker-subs-transmission.yaml b/deploy/kustomize/patches/worker-subs-transmission.yaml index 3a4d51b202..6f87fb5bc9 100644 --- a/deploy/kustomize/patches/worker-subs-transmission.yaml +++ b/deploy/kustomize/patches/worker-subs-transmission.yaml @@ -61,6 +61,8 @@ value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: 
${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git a/deploy/kustomize/patches/worker-summary-penalty.yaml b/deploy/kustomize/patches/worker-summary-penalty.yaml index 22155974f9..529f50fa24 100644 --- a/deploy/kustomize/patches/worker-summary-penalty.yaml +++ b/deploy/kustomize/patches/worker-summary-penalty.yaml @@ -63,6 +63,8 @@ value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git a/deploy/kustomize/patches/worker-summary-xl.yaml b/deploy/kustomize/patches/worker-summary-xl.yaml index 1f311fc72d..2a0fd3e63e 100644 --- a/deploy/kustomize/patches/worker-summary-xl.yaml +++ b/deploy/kustomize/patches/worker-summary-xl.yaml @@ -63,6 +63,8 @@ value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git a/deploy/kustomize/patches/worker-summary.yaml b/deploy/kustomize/patches/worker-summary.yaml index 4484a1dbed..e9ec9852f1 100644 --- a/deploy/kustomize/patches/worker-summary.yaml +++ b/deploy/kustomize/patches/worker-summary.yaml @@ -63,6 +63,8 @@ value: ${PARQUET_PROCESSING_BATCH_SIZE} - name: TRINO_DATE_STEP value: ${TRINO_DATE_STEP} + - name: VALIDATION_RANGE + value: ${VALIDATION_RANGE} - name: KOKU_ENABLE_SENTRY value: ${KOKU_ENABLE_SENTRY} - name: KOKU_SENTRY_ENVIRONMENT diff --git a/docker-compose.yml b/docker-compose.yml index b6354d0b2d..60738d5529 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -209,6 +209,7 @@ services: - SOURCES_API_PORT=${SOURCES_API_PORT-3000} - DATE_OVERRIDE - TRINO_DATE_STEP=${TRINO_DATE_STEP-31} + - VALIDATION_RANGE=${VALIDATION_RANGE-5} - MAX_CELERY_TASKS_PER_WORKER=${MAX_CELERY_TASKS_PER_WORKER-10} - 
RETAIN_NUM_MONTHS=${RETAIN_NUM_MONTHS-4} - NOTIFICATION_CHECK_TIME=${NOTIFICATION_CHECK_TIME-24} diff --git a/koku/api/test_utils.py b/koku/api/test_utils.py index 2c886cc2b6..6a633c9d41 100644 --- a/koku/api/test_utils.py +++ b/koku/api/test_utils.py @@ -192,6 +192,19 @@ def test_n_days_ago(self): two_days_ago = (today - delta_day) - delta_day self.assertEqual(self.date_helper.n_days_ago(today, 2), two_days_ago) + def test_set_datetime_utc(self): + """Test set_datetime_utc.""" + # Test with datetime + start_date = datetime.datetime(2024, 7, 5, 0, 0, 0, 0) + expected_date = start_date + # self.assertEqual(self.date_helper.set_datetime_utc(start_date), expected_date) + + # Test with date + self.assertEqual(self.date_helper.set_datetime_utc(start_date.date()), expected_date) + + # Test with str + self.assertEqual(self.date_helper.set_datetime_utc("2024-07-05"), expected_date) + def test_month_start(self): """Test month start method.""" today = self.date_helper.today diff --git a/koku/api/utils.py b/koku/api/utils.py index f12b3edeec..f9e7527913 100644 --- a/koku/api/utils.py +++ b/koku/api/utils.py @@ -238,6 +238,22 @@ def previous_month(self, in_date): dt_prev_month = in_date.replace(day=1, hour=0, minute=0, second=0, microsecond=0) - self.one_day return dt_prev_month.replace(day=1) + def set_datetime_utc(self, in_date): + """Return datetime with utc. + Args: + in_date (datetime, date, string) input datetime + Returns: + (datetime): date in the past + """ + if isinstance(in_date, datetime.date): + in_date = datetime.datetime(in_date.year, in_date.month, in_date.day) + if isinstance(in_date, str): + in_date = ciso8601.parse_datetime(in_date).replace(hour=0, minute=0, second=0, microsecond=0) + + in_date.replace(tzinfo=settings.UTC) + + return in_date + def n_days_ago(self, in_date, n_days): """Return midnight of the n days from the in_date in past. 
diff --git a/koku/koku/settings.py b/koku/koku/settings.py index 94521acdeb..5d9e03688b 100644 --- a/koku/koku/settings.py +++ b/koku/koku/settings.py @@ -591,3 +591,6 @@ AZURE_COST_MGMT_CLIENT_API_VERSION = ENVIRONMENT.get_value( "AZURE_COST_MGMT_CLIENT_API_VERSION", default="2023-07-01-preview" ) + +# Data validation +VALIDATION_RANGE = ENVIRONMENT.int("VALIDATION_RANGE", default=5) diff --git a/koku/masu/api/urls.py b/koku/masu/api/urls.py index 25cc76fa57..4df25148e6 100644 --- a/koku/masu/api/urls.py +++ b/koku/masu/api/urls.py @@ -44,6 +44,7 @@ from masu.api.views import update_cost_model_costs from masu.api.views import update_exchange_rates from masu.api.views import update_openshift_on_cloud +from masu.api.views import validate_cost_data ROUTER = DefaultRouter() ROUTER.register(r"sources", SourcesViewSet, basename="sources") @@ -77,6 +78,7 @@ path("clear_celery_queues/", clear_celery_queues, name="clear_celery_queues"), path("bigquery_cost//", bigquery_cost, name="bigquery_cost"), path("purge_trino_files/", purge_trino_files, name="purge_trino_files"), + path("validate_cost_data/", validate_cost_data, name="validate_cost_data"), path("db-performance", db_performance_redirect, name="db_perf_no_slash_redirect"), path("db-performance/", db_performance_redirect, name="db_perf_slash_redirect"), path("db-performance/db-settings/", dbsettings, name="db_settings"), diff --git a/koku/masu/api/validate_data.py b/koku/masu/api/validate_data.py new file mode 100644 index 0000000000..1f2d972519 --- /dev/null +++ b/koku/masu/api/validate_data.py @@ -0,0 +1,85 @@ +# +# Copyright 2024 Red Hat Inc. 
+# SPDX-License-Identifier: Apache-2.0 +# +"""View for data validation endpoint.""" +import logging + +from django.views.decorators.cache import never_cache +from rest_framework import status +from rest_framework.decorators import api_view +from rest_framework.decorators import permission_classes +from rest_framework.decorators import renderer_classes +from rest_framework.permissions import AllowAny +from rest_framework.response import Response +from rest_framework.settings import api_settings + +from api.models import Provider +from common.queues import get_customer_queue +from common.queues import PriorityQueue +from common.queues import QUEUE_LIST +from masu.processor.tasks import validate_daily_data + +LOG = logging.getLogger(__name__) +REPORT_DATA_KEY = "Report Data Task IDs" + + +@never_cache +@api_view(http_method_names=["GET"]) +@permission_classes((AllowAny,)) +@renderer_classes(tuple(api_settings.DEFAULT_RENDERER_CLASSES)) +def validate_cost_data(request): # noqa: C901 + """Masu endpoint to trigger cost validation for a provider""" + if request.method == "GET": + async_results = [] + params = request.query_params + async_result = None + provider_uuid = params.get("provider_uuid") + ocp_on_cloud_type = params.get("ocp_on_cloud_type", None) + start_date = params.get("start_date") + end_date = params.get("end_date") + if start_date is None: + errmsg = "start_date is a required parameter." + return Response({"Error": errmsg}, status=status.HTTP_400_BAD_REQUEST) + if end_date is None: + errmsg = "end_date is a required parameter." + return Response({"Error": errmsg}, status=status.HTTP_400_BAD_REQUEST) + if provider_uuid is None: + errmsg = "provider_uuid must be supplied as a parameter." 
+ return Response({"Error": errmsg}, status=status.HTTP_400_BAD_REQUEST) + + try: + provider = Provider.objects.get(uuid=provider_uuid) + provider_schema = provider.account.get("schema_name") + except Provider.DoesNotExist: + errmsg = f"provider_uuid {provider_uuid} does not exist." + return Response({"Error": errmsg}, status=status.HTTP_400_BAD_REQUEST) + provider_schema = provider.account.get("schema_name") + + if ocp_on_cloud_type: + if provider.type != Provider.PROVIDER_OCP: + errmsg = "ocp_on_cloud_type must by used with an ocp provider." + return Response({"Error": errmsg}, status=status.HTTP_400_BAD_REQUEST) + supported_types = [Provider.PROVIDER_AWS, Provider.PROVIDER_AZURE, Provider.PROVIDER_GCP] + if ocp_on_cloud_type not in supported_types: + errmsg = f"ocp on cloud type must match: {supported_types}" + return Response({"Error": errmsg}, status=status.HTTP_400_BAD_REQUEST) + + fallback_queue = get_customer_queue(provider_schema, PriorityQueue) + queue_name = params.get("queue") or fallback_queue + + if queue_name not in QUEUE_LIST: + errmsg = f"'queue' must be one of {QUEUE_LIST}." 
+ return Response({"Error": errmsg}, status=status.HTTP_400_BAD_REQUEST) + context = {"tracing_id": "running provider validation via masu"} + + async_result = validate_daily_data.s( + provider_schema, + start_date, + end_date, + provider_uuid, + ocp_on_cloud_type, + context=context, + ).apply_async(queue=queue_name) + async_results.append(str(async_result)) + return Response({REPORT_DATA_KEY: async_results}) diff --git a/koku/masu/api/views.py b/koku/masu/api/views.py index 24271f336f..905c44d8b6 100644 --- a/koku/masu/api/views.py +++ b/koku/masu/api/views.py @@ -41,3 +41,4 @@ from masu.api.update_openshift_on_cloud import update_openshift_on_cloud from masu.api.update_rates import update_azure_storage_capacity from masu.api.update_rates import update_exchange_rates +from masu.api.validate_data import validate_cost_data diff --git a/koku/masu/database/report_db_accessor_base.py b/koku/masu/database/report_db_accessor_base.py index a5fe253be9..9afc69fce1 100644 --- a/koku/masu/database/report_db_accessor_base.py +++ b/koku/masu/database/report_db_accessor_base.py @@ -94,15 +94,19 @@ def _prepare_and_execute_raw_sql_query(self, table, tmp_sql, tmp_sql_params=None return self._execute_raw_sql_query(table, sql, bind_params=sql_params, operation=operation) def _execute_raw_sql_query(self, table, sql, bind_params=None, operation="UPDATE"): - """Run a SQL statement via a cursor.""" + """Run a SQL statement via a cursor. 
This also returns a result if the operation is VALIDATION_QUERY.""" LOG.info(log_json(msg=f"triggering {operation}", table=table)) row_count = None + result = None with connection.cursor() as cursor: cursor.db.set_schema(self.schema) t1 = time.time() try: cursor.execute(sql, params=bind_params) row_count = cursor.rowcount + if operation == "VALIDATION_QUERY": + result = cursor.fetchall() + except OperationalError as exc: db_exc = get_extended_exception_by_type(exc) LOG.warning(log_json(os.getpid(), msg=str(db_exc), context=db_exc.as_dict())) @@ -110,6 +114,7 @@ def _execute_raw_sql_query(self, table, sql, bind_params=None, operation="UPDATE running_time = time.time() - t1 LOG.info(log_json(msg=f"finished {operation}", row_count=row_count, table=table, running_time=running_time)) + return result def _execute_trino_raw_sql_query(self, sql, *, sql_params=None, context=None, log_ref=None, attempts_left=0): """Execute a single trino query returning only the fetchall results""" diff --git a/koku/masu/processor/__init__.py b/koku/masu/processor/__init__.py index 916c0d2d7e..4d81b982c9 100644 --- a/koku/masu/processor/__init__.py +++ b/koku/masu/processor/__init__.py @@ -101,6 +101,13 @@ def is_rate_limit_customer_large(account): # pragma: no cover return UNLEASH_CLIENT.is_enabled("cost-management.backend.large-customer.rate-limit", context) +def is_validation_enabled(account): # pragma: no cover + """Flag if customer is enabled to run validation.""" + account = convert_account(account) + context = {"schema": account} + return UNLEASH_CLIENT.is_enabled("cost-management.backend.enable_data_validation", context) + + def is_ocp_amortized_monthly_cost_enabled(account): # pragma: no cover """Enable the use of savings plan cost for OCP on AWS -> OCP.""" account = convert_account(account) diff --git a/koku/masu/processor/_tasks/data_validation.py b/koku/masu/processor/_tasks/data_validation.py new file mode 100644 index 0000000000..64fb12e1c3 --- /dev/null +++ 
b/koku/masu/processor/_tasks/data_validation.py @@ -0,0 +1,226 @@ +# +# Copyright 2024 Red Hat Inc. +# SPDX-License-Identifier: Apache-2.0 +# +import logging +from datetime import datetime + +from django.conf import settings + +from api.common import log_json +from api.provider.models import Provider +from api.utils import DateHelper +from masu.database import AWS_CUR_TABLE_MAP +from masu.database import AZURE_REPORT_TABLE_MAP +from masu.database import GCP_REPORT_TABLE_MAP +from masu.database import OCI_CUR_TABLE_MAP +from masu.database import OCP_REPORT_TABLE_MAP +from masu.database.report_db_accessor_base import ReportDBAccessorBase +from masu.util.common import date_range_pair +from reporting.provider.aws.models import TRINO_LINE_ITEM_DAILY_TABLE as AWS_TRINO_LINE_ITEM_DAILY_TABLE +from reporting.provider.azure.models import TRINO_LINE_ITEM_DAILY_TABLE as AZURE_TRINO_LINE_ITEM_DAILY_TABLE +from reporting.provider.gcp.models import TRINO_LINE_ITEM_DAILY_TABLE as GCP_TRINO_LINE_ITEM_DAILY_TABLE +from reporting.provider.oci.models import TRINO_LINE_ITEM_DAILY_TABLE_MAP as OCI_TRINO_LINE_ITEM_DAILY_TABLE + +LOG = logging.getLogger(__name__) + +# Filter maps +TRINO_FILTER_MAP = { + Provider.PROVIDER_AWS: {"date": "lineitem_usagestartdate", "metric": "lineitem_unblendedcost"}, + Provider.PROVIDER_AZURE: {"date": "date", "metric": "coalesce(nullif(costinbillingcurrency, 0), pretaxcost)"}, + Provider.PROVIDER_GCP: {"date": "usage_start_time", "metric": "cost"}, + Provider.PROVIDER_OCI: {"date": "lineitem_intervalusagestart", "metric": "cost_mycost"}, + Provider.PROVIDER_OCP: { + "date": "usage_start", + "metric": "pod_effective_usage_cpu_core_hours", + }, + "OCPAWS": {"date": "usage_start", "metric": "unblended_cost"}, + "OCPAzure": {"date": "usage_start", "metric": "pretax_cost"}, + "OCPGCP": {"date": "usage_start", "metric": "unblended_cost"}, +} +PG_FILTER_MAP = { + Provider.PROVIDER_AWS: { + "date": "usage_start", + "metric": "unblended_cost", + }, + 
Provider.PROVIDER_AZURE: {"date": "usage_start", "metric": "pretax_cost"}, + Provider.PROVIDER_GCP: { + "date": "usage_start", + "metric": "unblended_cost", + }, + Provider.PROVIDER_OCI: {"date": "usage_start", "metric": "cost"}, + Provider.PROVIDER_OCP: { + "date": "usage_start", + "metric": "pod_effective_usage_cpu_core_hours", + }, + "OCPAWS": {"date": "usage_start", "metric": "unblended_cost"}, + "OCPAzure": {"date": "usage_start", "metric": "pretax_cost"}, + "OCPGCP": {"date": "usage_start", "metric": "unblended_cost"}, +} + +# Table maps +PG_TABLE_MAP = { + Provider.PROVIDER_AWS: AWS_CUR_TABLE_MAP.get("line_item_daily_summary"), + Provider.PROVIDER_AZURE: AZURE_REPORT_TABLE_MAP.get("line_item_daily_summary"), + Provider.PROVIDER_GCP: GCP_REPORT_TABLE_MAP.get("line_item_daily_summary"), + Provider.PROVIDER_OCI: OCI_CUR_TABLE_MAP.get("line_item_daily_summary"), + Provider.PROVIDER_OCP: OCP_REPORT_TABLE_MAP.get("line_item_daily_summary"), + "OCPAWS": AWS_CUR_TABLE_MAP.get("ocp_on_aws_project_daily_summary"), + "OCPAzure": AZURE_REPORT_TABLE_MAP.get("ocp_on_azure_project_daily_summary"), + "OCPGCP": GCP_REPORT_TABLE_MAP.get("ocp_on_gcp_project_daily_summary"), +} + +TRINO_TABLE_MAP = { + Provider.PROVIDER_AWS: AWS_TRINO_LINE_ITEM_DAILY_TABLE, + Provider.PROVIDER_AZURE: AZURE_TRINO_LINE_ITEM_DAILY_TABLE, + Provider.PROVIDER_GCP: GCP_TRINO_LINE_ITEM_DAILY_TABLE, + Provider.PROVIDER_OCI: OCI_TRINO_LINE_ITEM_DAILY_TABLE.get("cost"), + Provider.PROVIDER_OCP: "reporting_ocpusagelineitem_daily_summary", + "OCPAWS": "reporting_ocpawscostlineitem_project_daily_summary", + "OCPAzure": "reporting_ocpazurecostlineitem_project_daily_summary", + "OCPGCP": "reporting_ocpgcpcostlineitem_project_daily_summary", +} + + +class DataValidator: + """Class to check data is valid for providers""" + + def __init__( + self, + schema, + start_date, + end_date, + provider_uuid, + ocp_on_cloud_type, + context, + date_step=settings.TRINO_DATE_STEP, + ): + self.dh = DateHelper() + self.schema 
= schema + self.provider_uuid = provider_uuid + self.ocp_on_cloud_type = ocp_on_cloud_type + # start_date should include a rolling window + utc_start = self.dh.set_datetime_utc(start_date) + self.start_date = ( + self.dh.month_start_utc(utc_start) + if utc_start.day < 6 + else self.dh.n_days_ago(utc_start, settings.VALIDATION_RANGE) + ) + self.end_date = self.dh.set_datetime_utc(end_date) + self.context = context + self.date_step = date_step + + def get_table_filters_for_provider(self, provider_type, trino=False): + """Get relevant table and query filters for given provider type""" + table_map = provider_type + if self.ocp_on_cloud_type: + table_map = f"OCP{provider_type}" + table = PG_TABLE_MAP.get(table_map) + query_filters = PG_FILTER_MAP.get(table_map) + if trino: + table = TRINO_TABLE_MAP.get(table_map) + query_filters = TRINO_FILTER_MAP.get(table_map) + return table, query_filters + + def compare_data(self, pg_data, trino_data, tolerance=1): + """Validate if postgres and trino query data cost matches per day""" + incomplete_days = {} + valid_cost = True + if trino_data == {}: + return incomplete_days, False + for date in trino_data: + if date in pg_data: + if not abs(pg_data[date] - trino_data[date]) <= tolerance: + incomplete_days[date] = { + "pg_value": pg_data[date], + "trino_value": trino_data[date], + "delta": trino_data[date] - pg_data[date], + } + valid_cost = False + else: + incomplete_days[date] = "missing daily data" + valid_cost = False + return incomplete_days, valid_cost + + def execute_relevant_query(self, provider_type, cluster_id=None, trino=False): + """Make relevant postgres or Trino queries""" + daily_result = {} + # year and month for running partitioned queries + year = self.dh.bill_year_from_date(self.start_date) + month = self.dh.bill_month_from_date(self.start_date) + report_db_accessor = ReportDBAccessorBase(self.schema) + # Set provider filter, when running ocp{aws/gcp/azure} checks we need to rely on the cluster id + provider_filter 
= self.provider_uuid if not self.ocp_on_cloud_type else cluster_id + # query trino/postgres + table, query_filters = self.get_table_filters_for_provider(provider_type, trino) + for start, end in date_range_pair(self.start_date, self.end_date, step=self.date_step): + if trino: + source = "source" if not self.ocp_on_cloud_type else "cluster_id" + sql = f""" + SELECT sum({query_filters.get("metric")}) as metric, {query_filters.get("date")} as date + FROM hive.{self.schema}.{table} + WHERE {source} = '{provider_filter}' + AND {query_filters.get("date")} >= date('{start}') + AND {query_filters.get("date")} <= date('{end}') + AND year = '{year}' + AND lpad(month, 2, '0') = '{month}' + GROUP BY {query_filters.get("date")} + ORDER BY {query_filters.get("date")}""" + result = report_db_accessor._execute_trino_raw_sql_query(sql, log_ref="data validation query") + else: + source = "source_uuid" if not self.ocp_on_cloud_type else "cluster_id" + sql = f""" + SELECT sum({query_filters.get("metric")}) as metric, {query_filters.get("date")} as date + FROM {self.schema}.{table}_{year}_{month} + WHERE {source} = '{provider_filter}' + AND {query_filters.get("date")} >= '{start}' + AND {query_filters.get("date")} <= '{end}' + GROUP BY {query_filters.get("date")} + ORDER BY {query_filters.get("date")}""" + result = report_db_accessor._prepare_and_execute_raw_sql_query( + table, sql, operation="VALIDATION_QUERY" + ) + if result != []: + for day in result: + key = day[1].date() if isinstance(day[1], datetime) else day[1] + daily_result[key] = float(day[0]) + return daily_result + + def check_data_integrity(self): + """Helper to call the query and validation methods for validating data""" + valid_cost = False + pg_data = None + trino_data = None + cluster_id = None + daily_difference = {} + LOG.info( + log_json(msg=f"validation started for provider using start date: {self.start_date}", context=self.context) + ) + provider = Provider.objects.filter(uuid=self.provider_uuid).first() + 
provider_type = provider.type.strip("-local") + if self.ocp_on_cloud_type: + provider_type = self.ocp_on_cloud_type.strip("-local") + cluster_id = provider.authentication.credentials.get("cluster_id") + # Postgres query to get daily values + try: + pg_data = self.execute_relevant_query(provider_type, cluster_id) + except Exception as e: + LOG.warning(log_json(msg=f"data validation postgres query failed: {e}", context=self.context)) + return + # Trino query to get daily values + try: + trino_data = self.execute_relevant_query(provider_type, cluster_id, True) + except Exception as e: + LOG.warning(log_json(msg=f"data validation trino query failed: {e}", context=self.context)) + return + # Compare results + LOG.debug(f"PG: {pg_data} Trino data: {trino_data}") + daily_difference, valid_cost = self.compare_data(pg_data, trino_data) + if valid_cost: + LOG.info(log_json(msg=f"all data complete for provider: {self.provider_uuid}", context=self.context)) + else: + LOG.error( + log_json( + msg=f"provider has incomplete data for specified days: {daily_difference}", context=self.context + ) + ) diff --git a/koku/masu/processor/tasks.py b/koku/masu/processor/tasks.py index 92a1010970..7f00da019a 100644 --- a/koku/masu/processor/tasks.py +++ b/koku/masu/processor/tasks.py @@ -47,6 +47,8 @@ from masu.processor import is_rate_limit_customer_large from masu.processor import is_source_disabled from masu.processor import is_summary_processing_disabled +from masu.processor import is_validation_enabled +from masu.processor._tasks.data_validation import DataValidator from masu.processor._tasks.download import _get_report_files from masu.processor._tasks.process import _process_report_file from masu.processor._tasks.remove_expired import _remove_expired_data @@ -645,13 +647,18 @@ def update_summary_tables( # noqa: C901 # This method should always be called for OCP providers even when it does not have a cost model if cost_model is not None or provider_type == Provider.PROVIDER_OCP: 
LOG.info(log_json(tracing_id, msg="updating cost model costs", context=context)) - linked_tasks = update_cost_model_costs.s( - schema, provider_uuid, start_date, end_date, tracing_id=tracing_id - ).set(queue=update_cost_model_queue) | mark_manifest_complete.si( - schema, provider_type, provider_uuid, manifest_list=manifest_list, tracing_id=tracing_id - ).set( - queue=mark_manifest_complete_queue + linked_tasks = ( + update_cost_model_costs.s(schema, provider_uuid, start_date, end_date, tracing_id=tracing_id).set( + queue=update_cost_model_queue + ) + | mark_manifest_complete.si( + schema, provider_type, provider_uuid, manifest_list=manifest_list, tracing_id=tracing_id + ).set(queue=mark_manifest_complete_queue) + | validate_daily_data.si(schema, start_date, end_date, provider_uuid, context=context).set( + queue=fallback_update_summary_tables_queue + ) ) + else: LOG.info(log_json(tracing_id, msg="skipping cost model updates", context=context)) linked_tasks = mark_manifest_complete.s( @@ -661,7 +668,11 @@ def update_summary_tables( # noqa: C901 manifest_list=manifest_list, ingress_report_uuid=ingress_report_uuid, tracing_id=tracing_id, - ).set(queue=mark_manifest_complete_queue) + ).set(queue=mark_manifest_complete_queue) | validate_daily_data.si( + schema, start_date, end_date, provider_uuid, context=context + ).set( + queue=fallback_update_summary_tables_queue + ) chain(linked_tasks).apply_async() @@ -783,6 +794,14 @@ def update_openshift_on_cloud( # noqa: C901 ).apply_async(queue=queue_name or fallback_queue) # Set OpenShift manifest summary end time set_summary_timestamp(ManifestState.END, ocp_manifest_id) + validate_daily_data.s( + schema_name, + start_date, + end_date, + openshift_provider_uuid, + ocp_on_cloud_type=infrastructure_provider_type, + context=ctx, + ).apply_async(queue=queue_name or fallback_queue) except ReportSummaryUpdaterCloudError as ex: LOG.info( log_json( @@ -1230,3 +1249,13 @@ def process_daily_openshift_on_cloud( file_name = 
f"ocp_on_{provider_type}_{i}" processor.process(file_name, [data_frame]) + + +@celery_app.task(name="masu.processor.tasks.validate_daily_data", queue=SummaryQueue.DEFAULT) +def validate_daily_data(schema, start_date, end_date, provider_uuid, ocp_on_cloud_type=None, context=None): + # collect and validate cost metrics between postgres and trino tables. + if is_validation_enabled(schema): + data_validator = DataValidator(schema, start_date, end_date, provider_uuid, ocp_on_cloud_type, context) + data_validator.check_data_integrity() + else: + LOG.info(log_json(msg="skipping validation, disabled for schema", context=context)) diff --git a/koku/masu/test/api/test_validate_data.py b/koku/masu/test/api/test_validate_data.py new file mode 100644 index 0000000000..32df11b398 --- /dev/null +++ b/koku/masu/test/api/test_validate_data.py @@ -0,0 +1,77 @@ +# +# Copyright 2024 Red Hat Inc. +# SPDX-License-Identifier: Apache-2.0 +# +"""Test the validate data endpoint.""" +import uuid +from unittest.mock import patch + +from django.test.utils import override_settings +from django.urls import reverse + +from masu.test import MasuTestCase + + +@override_settings(ROOT_URLCONF="masu.urls") +class ValidateDataViewTest(MasuTestCase): + """Test Cases for the validate data API.""" + + def setUp(self): + """Create test case setup.""" + super().setUp() + + @patch("koku.middleware.MASU", return_value=True) + def test_validate_data(self, _): + """Test validate data endpoint.""" + expected_key = "Error" + url = reverse("validate_cost_data") + # test missing start date + response = self.client.get(url) + body = response.json() + self.assertEqual(response.status_code, 400) + self.assertIn("start_date is a required parameter.", body[expected_key]) + + # test missing end date + url_w_params = url + "?start_date=2021-04-01" + response = self.client.get(url_w_params) + body = response.json() + self.assertEqual(response.status_code, 400) + self.assertIn("end_date is a required parameter.", 
body[expected_key]) + + # test missing provider uuid + url_w_params = url + "?start_date=2021-04-01&end_date=2021-04-30" + response = self.client.get(url_w_params) + body = response.json() + self.assertEqual(response.status_code, 400) + self.assertIn("provider_uuid must be supplied as a parameter.", body[expected_key]) + + # test provider does not exist + id = str(uuid.uuid4()) + url_w_params = url + f"?start_date=2021-04-01&end_date=2021-04-30&provider_uuid={id}" + response = self.client.get(url_w_params) + body = response.json() + self.assertEqual(response.status_code, 400) + self.assertIn(f"provider_uuid {id} does not exist.", body[expected_key]) + + # test non ocp provider with ocp on cloud type + id = self.aws_provider_uuid + url_w_params = url + f"?start_date=2021-04-01&end_date=2021-04-30&provider_uuid={id}&ocp_on_cloud_type=GCP" + response = self.client.get(url_w_params) + body = response.json() + self.assertEqual(response.status_code, 400) + self.assertIn("ocp_on_cloud_type must by used with an ocp provider.", body[expected_key]) + + # test fake ocp on cloud type + id = self.ocp_provider_uuid + url_w_params = url + f"?start_date=2021-04-01&end_date=2021-04-30&provider_uuid={id}&ocp_on_cloud_type=OCP" + response = self.client.get(url_w_params) + body = response.json() + self.assertEqual(response.status_code, 400) + self.assertIn("ocp on cloud type must match:", body[expected_key]) + + # test invalid queue + url_w_params = url + f"?start_date=2021-04-01&end_date=2021-04-30&provider_uuid={id}&queue=test" + response = self.client.get(url_w_params) + body = response.json() + self.assertEqual(response.status_code, 400) + self.assertIn("'queue' must be one of", body[expected_key]) diff --git a/koku/masu/test/processor/test_data_validation.py b/koku/masu/test/processor/test_data_validation.py new file mode 100644 index 0000000000..ca0c8e97dc --- /dev/null +++ b/koku/masu/test/processor/test_data_validation.py @@ -0,0 +1,178 @@ +# +# Copyright 2024 Red Hat Inc. 
+# SPDX-License-Identifier: Apache-2.0 +# +"""Test cases for DataValidator""" +from unittest.mock import patch + +from api.provider.models import Provider +from api.utils import DateHelper +from masu.processor._tasks.data_validation import DataValidator +from masu.processor._tasks.data_validation import PG_FILTER_MAP +from masu.processor._tasks.data_validation import PG_TABLE_MAP +from masu.processor._tasks.data_validation import TRINO_FILTER_MAP +from masu.processor._tasks.data_validation import TRINO_TABLE_MAP +from masu.test import MasuTestCase + + +class TestDataValidator(MasuTestCase): + """Test Cases for the data validator.""" + + @classmethod + def setUpClass(cls): + """Set up the class.""" + super().setUpClass() + cls.start_date = DateHelper().now_utc + cls.end_date = DateHelper().now_utc + cls.provider = Provider.PROVIDER_AWS + cls.provider_uuid = "cabfdddb-4ed5-421e-a041-311b75daf235" + + def test_init(self): + """Test the initializer.""" + cloud_type = "GCP" + context = "test" + validator = DataValidator( + self.schema, self.start_date, self.end_date, self.provider_uuid, cloud_type, context=context + ) + self.assertEqual(validator.schema, self.schema) + self.assertEqual(validator.ocp_on_cloud_type, cloud_type) + self.assertEqual(validator.provider_uuid, self.provider_uuid) + self.assertEqual(validator.context, context) + + def test_get_provider_table_filters(self): + """Testing the lookup for a particular provider.""" + provider_type = "GCP" + context = "test" + validator = DataValidator( + self.schema, self.start_date, self.end_date, self.provider_uuid, None, context=context + ) + + # Test Postgres lookup + table, query_filters = validator.get_table_filters_for_provider(provider_type) + self.assertEqual(table, PG_TABLE_MAP.get(provider_type)) + self.assertEqual(query_filters, PG_FILTER_MAP.get(provider_type)) + + # Test Trino lookup + table, query_filters = validator.get_table_filters_for_provider(provider_type, trino=True) + self.assertEqual(table, 
TRINO_TABLE_MAP.get(provider_type)) + self.assertEqual(query_filters, TRINO_FILTER_MAP.get(provider_type)) + + # Test Trino OCP ON CLOUD lookup + validator = DataValidator( + self.schema, self.start_date, self.end_date, self.provider_uuid, "GCP", context=context + ) + table, query_filters = validator.get_table_filters_for_provider(provider_type, trino=True) + self.assertEqual(table, TRINO_TABLE_MAP.get("OCPGCP")) + self.assertEqual(query_filters, TRINO_FILTER_MAP.get("OCPGCP")) + + def test_compare_data(self): + """Test comparing input data.""" + validator = DataValidator( + self.schema, self.start_date, self.end_date, self.provider_uuid, None, context="test" + ) + # Test no trino data + incomplete_days, valid_cost = validator.compare_data({}, {}) + self.assertFalse(valid_cost) + self.assertEqual(incomplete_days, {}) + + # Test valid data + data = {"date": 30} + incomplete_days, valid_cost = validator.compare_data(data, data) + self.assertTrue(valid_cost) + self.assertEqual(incomplete_days, {}) + + # Test inaccurate data + day = "date2" + trino_data = {"date1": 30, day: 20} + pg_data = {"date1": 30, day: 10} + expected_incomplete_days = { + day: { + "pg_value": pg_data[day], + "trino_value": trino_data[day], + "delta": trino_data[day] - pg_data[day], + } + } + incomplete_days, valid_cost = validator.compare_data(pg_data, trino_data) + self.assertFalse(valid_cost) + self.assertEqual(incomplete_days, expected_incomplete_days) + + # Test missing data + day = "date2" + trino_data = {"date1": 30, day: 20} + pg_data = {"date1": 30} + expected_incomplete_days = {day: "missing daily data"} + incomplete_days, valid_cost = validator.compare_data(pg_data, trino_data) + self.assertFalse(valid_cost) + self.assertEqual(incomplete_days, expected_incomplete_days) + + @patch("masu.database.report_db_accessor_base.ReportDBAccessorBase._prepare_and_execute_raw_sql_query") + def test_query_postgres(self, mock_pg_query): + """Test making postgres queries.""" + validator = 
DataValidator( + self.schema, self.start_date, self.end_date, self.provider_uuid, None, context="test" + ) + validator.execute_relevant_query("AWS") + mock_pg_query.assert_called() + + @patch("masu.database.report_db_accessor_base.ReportDBAccessorBase._execute_trino_raw_sql_query") + def test_query_trino(self, mock_trino_query): + """Test making trino queries.""" + validator = DataValidator( + self.schema, self.start_date, self.end_date, self.provider_uuid, None, context="test" + ) + validator.execute_relevant_query("AWS", trino=True) + mock_trino_query.assert_called() + + @patch("masu.database.report_db_accessor_base.ReportDBAccessorBase._execute_trino_raw_sql_query") + def test_query_result(self, mock_trino_query): + """Test making trino queries.""" + date = DateHelper().today + expected_result = {date.date(): 30.0} + mock_trino_query.return_value = [[30, date]] + validator = DataValidator( + self.schema, self.start_date, self.end_date, self.provider_uuid, None, context="test" + ) + result = validator.execute_relevant_query("AWS", trino=True) + self.assertEqual(result, expected_result) + + @patch("masu.processor._tasks.data_validation.DataValidator.execute_relevant_query") + def test_check_data_integrity(self, mock_query): + """Test for valid data.""" + date = DateHelper().today + mock_query.return_value = {date: 30} + validator = DataValidator( + self.schema, self.start_date, self.end_date, self.aws_provider_uuid, None, context={"test": "testing"} + ) + with self.assertLogs("masu.processor._tasks.data_validation", level="INFO") as logger: + validator.check_data_integrity() + expected = "all data complete for provider" + found = any(expected in log for log in logger.output) + self.assertTrue(found) + + @patch("masu.database.report_db_accessor_base.ReportDBAccessorBase._prepare_and_execute_raw_sql_query") + @patch("masu.database.report_db_accessor_base.ReportDBAccessorBase._execute_trino_raw_sql_query") + def test_check_data_integrity_pg_error(self, 
mock_trino_query, mock_pg_query): + """Test for pg query error.""" + mock_pg_query.side_effect = Exception("Error") + validator = DataValidator( + self.schema, self.start_date, self.end_date, self.aws_provider_uuid, None, context={"test": "testing"} + ) + with self.assertLogs("masu.processor._tasks.data_validation", level="WARNING") as logger: + validator.check_data_integrity() + expected = "data validation postgres query failed" + found = any(expected in log for log in logger.output) + self.assertTrue(found) + + @patch("masu.database.report_db_accessor_base.ReportDBAccessorBase._prepare_and_execute_raw_sql_query") + @patch("masu.database.report_db_accessor_base.ReportDBAccessorBase._execute_trino_raw_sql_query") + def test_check_data_integrity_trino_error(self, mock_trino_query, mock_pg_query): + """Test for trino query error.""" + mock_trino_query.side_effect = Exception("Error") + validator = DataValidator( + self.schema, self.start_date, self.end_date, self.aws_provider_uuid, None, context={"test": "testing"} + ) + with self.assertLogs("masu.processor._tasks.data_validation", level="WARNING") as logger: + validator.check_data_integrity() + expected = "data validation trino query failed" + found = any(expected in log for log in logger.output) + self.assertTrue(found) diff --git a/koku/masu/test/processor/test_tasks.py b/koku/masu/test/processor/test_tasks.py index 248a2f9fa2..18f651618d 100644 --- a/koku/masu/test/processor/test_tasks.py +++ b/koku/masu/test/processor/test_tasks.py @@ -31,7 +31,6 @@ from api.iam.models import Tenant from api.models import Provider -from common.queues import PriorityQueue from common.queues import SummaryQueue from koku.middleware import KokuTenantMiddleware from masu.config import Config @@ -67,6 +66,7 @@ from masu.processor.tasks import update_openshift_on_cloud from masu.processor.tasks import update_summary_tables from masu.processor.tasks import vacuum_schema +from masu.processor.tasks import validate_daily_data from 
masu.processor.worker_cache import create_single_task_cache_key from masu.test import MasuTestCase from masu.test.external.downloader.aws import fake_arn @@ -641,6 +641,28 @@ def test_process_daily_openshift_on_cloud(self, mock_stats, mock_trino, mock_s3_ mock_s3_delete.assert_called() mock_process.assert_called() + @patch("masu.processor.tasks.DataValidator") + @patch( + "masu.processor.tasks.is_validation_enabled", + return_value=True, + ) + def test_validate_data_task(self, mock_unleash, mock_validate_daily_data): + """Test validate data task.""" + context = {"unit": "test"} + validate_daily_data(self.schema, self.start_date, self.start_date, self.aws_provider_uuid, context=context) + mock_validate_daily_data.assert_called() + + @patch("masu.processor.tasks.DataValidator") + def test_validate_data_task_skip(self, mock_validate_daily_data): + """Test skipping validate data task.""" + context = {"unit": "test"} + with self.assertLogs("masu.processor.tasks", level="INFO") as logger: + validate_daily_data(self.schema, self.start_date, self.start_date, self.aws_provider_uuid, context=context) + mock_validate_daily_data.assert_not_called() + expected = "skipping validation, disabled for schema" + found = any(expected in log for log in logger.output) + self.assertTrue(found) + class TestRemoveExpiredDataTasks(MasuTestCase): """Test cases for Processor Celery tasks.""" @@ -953,16 +975,7 @@ def test_update_summary_tables_remove_expired_data(self, mock_select_for_update, manifest_id=manifest_id, synchronous=True, ) - mock_chain.assert_called_with( - mark_manifest_complete.s( - self.schema, - provider_type, - provider_aws_uuid, - manifest_list=[manifest_id], - ingress_report_uuid=None, - tracing_id=tracing_id, - ).set(queue=PriorityQueue.DEFAULT) - ) + mock_chain.assert_called() mock_chain.return_value.apply_async.assert_called() @patch("masu.processor.tasks.CostModelDBAccessor") @@ -990,16 +1003,7 @@ def test_update_summary_tables_remove_expired_data_gcp(self, 
mock_select_for_upd synchronous=True, invoice_month=invoice_month, ) - mock_chain.assert_called_with( - mark_manifest_complete.s( - self.schema, - provider_type, - self.gcp_provider_uuid, - manifest_list=[manifest_id], - ingress_report_uuid=None, - tracing_id=tracing_id, - ).set(queue=PriorityQueue.DEFAULT) - ) + mock_chain.assert_called() mock_chain.return_value.apply_async.assert_called() @patch("masu.util.common.trino_db.connect") @@ -1259,7 +1263,8 @@ def test_record_all_manifest_files_concurrent_writes(self): @patch("masu.processor.tasks.ReportSummaryUpdater.update_openshift_on_cloud_summary_tables") @patch("masu.processor.tasks.update_cost_model_costs.s") - def test_update_openshift_on_cloud(self, mock_cost_updater, mock_updater): + @patch("masu.processor.tasks.validate_daily_data.s") + def test_update_openshift_on_cloud(self, mock_data_validator, mock_cost_updater, mock_updater): """Test that this task runs.""" start_date = self.dh.this_month_start.date() end_date = self.dh.today.date() @@ -1682,8 +1687,9 @@ def test_update_cost_model_costs_error(self, mock_inspect, mock_lock, mock_relea @patch("masu.processor.tasks.WorkerCache.lock_single_task") @patch("masu.processor.worker_cache.CELERY_INSPECT") @patch("masu.processor.tasks.update_cost_model_costs.s") + @patch("masu.processor.tasks.validate_daily_data.s") def test_update_openshift_on_cloud_throttled( - self, mock_model_update, mock_inspect, mock_lock, mock_release, mock_delay, mock_update + self, mock_data_validator, mock_model_update, mock_inspect, mock_lock, mock_release, mock_delay, mock_update ): """Test that refresh materialized views runs with cache lock.""" start_date = self.dh.this_month_start.date()