COST-1165: Add Priority worker/queue. (#2741)
* COST-1165: Add Priority worker/queue.

* Address comments.

* Update docker-compose-multiworker.yml
myersCody authored Mar 22, 2021
1 parent 3e253c6 commit 0c68f06
Showing 5 changed files with 67 additions and 24 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -232,7 +232,7 @@ services:
     hostname: koku-worker-1
     image: koku_base
     working_dir: /koku/koku
-    entrypoint: ['watchmedo', 'auto-restart', '--directory=./', '--pattern=*.py', '--recursive', '--', 'celery', '-A', 'koku', 'worker', '-l', 'info', '-Q', 'celery,download,ocp,remove_expired,reporting,process,upload,customer_data_sync,delete_archived_data,query_upload']
+    entrypoint: ['watchmedo', 'auto-restart', '--directory=./', '--pattern=*.py', '--recursive', '--', 'celery', '-A', 'koku', 'worker', '-l', 'info', '-Q', 'celery,download,ocp,remove_expired,reporting,process,upload,customer_data_sync,delete_archived_data,query_upload,priority']
     environment:
       - AWS_ACCESS_KEY_ID
       - AWS_SECRET_ACCESS_KEY
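
Note on the change above: a Celery worker consumes only the queues named in its -Q option, so appending priority to the list is what lets the dev worker pick up tasks routed to the new queue. A minimal sketch of the programmatic equivalent, assuming a hypothetical app name and broker URL (not koku's actual settings):

from celery import Celery

app = Celery("sketch", broker="redis://localhost:6379/0")  # assumed broker URL

if __name__ == "__main__":
    # Programmatic equivalent of:
    #   celery -A koku worker -l info -Q celery,download,...,priority
    # The worker subscribes only to the comma-separated queues given in -Q.
    app.worker_main(argv=["worker", "-l", "info", "-Q", "celery,download,priority"])
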
9 changes: 4 additions & 5 deletions koku/cost_models/cost_model_manager.py
@@ -26,10 +26,9 @@
 from cost_models.models import CostModel
 from cost_models.models import CostModelMap
 from masu.processor.tasks import OCP_QUEUE
+from masu.processor.tasks import PRIORITY_QUEUE
 from masu.processor.tasks import refresh_materialized_views
-from masu.processor.tasks import REFRESH_MATERIALIZED_VIEWS_QUEUE
 from masu.processor.tasks import update_cost_model_costs
-from masu.processor.tasks import UPDATE_COST_MODEL_COSTS_QUEUE


 LOG = logging.getLogger(__name__)
@@ -98,14 +97,14 @@ def update_provider_uuids(self, provider_uuids):
             except Provider.DoesNotExist:
                 LOG.info(f"Provider {provider_uuid} does not exist. Skipping cost-model update.")
             else:
-                queue = OCP_QUEUE if provider.type == Provider.PROVIDER_OCP else None
+                queue_choice = OCP_QUEUE if provider.type == Provider.PROVIDER_OCP else PRIORITY_QUEUE
                 schema_name = provider.customer.schema_name
                 chain(
                     update_cost_model_costs.s(schema_name, provider.uuid, start_date, end_date).set(
-                        queue=queue or UPDATE_COST_MODEL_COSTS_QUEUE
+                        queue=queue_choice
                     ),
                     refresh_materialized_views.si(schema_name, provider.type, provider_uuid=provider.uuid).set(
-                        queue=queue or REFRESH_MATERIALIZED_VIEWS_QUEUE
+                        queue=queue_choice
                     ),
                 ).apply_async()

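
The hunk above builds signatures with .s()/.si(), stamps each with .set(queue=...), and chains them, so OCP providers stay on the OCP queue while everything else now goes to priority. A sketch of that routing pattern in isolation; the task bodies, schema, and queue values are illustrative stand-ins, not koku's real tasks:

from celery import Celery, chain

app = Celery("sketch", broker="redis://localhost:6379/0")  # assumed broker

@app.task
def update_costs(schema, provider_uuid):
    """Stand-in for update_cost_model_costs."""

@app.task
def refresh_views(schema, provider_uuid):
    """Stand-in for refresh_materialized_views."""

is_ocp = True  # assumption; mirrors provider.type == Provider.PROVIDER_OCP
queue_choice = "ocp" if is_ocp else "priority"

# .s() makes a signature; .si() makes it immutable so it ignores the
# previous task's return value; .set(queue=...) routes that one task.
chain(
    update_costs.s("acct10001", "uuid-1").set(queue=queue_choice),
    refresh_views.si("acct10001", "uuid-1").set(queue=queue_choice),
).apply_async()
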
1 change: 1 addition & 0 deletions koku/masu/processor/tasks.py
@@ -71,6 +71,7 @@
 SUMMARIZE_REPORTS_QUEUE = "process"
 UPDATE_COST_MODEL_COSTS_QUEUE = "reporting"
 UPDATE_SUMMARY_TABLES_QUEUE = "reporting"
+PRIORITY_QUEUE = "priority"


 def record_all_manifest_files(manifest_id, report_files):
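
A short sketch of how a named queue constant like this is used at call time; the task and broker below are assumptions for illustration only:

from celery import Celery

app = Celery("sketch", broker="redis://localhost:6379/0")  # assumed broker

PRIORITY_QUEUE = "priority"  # mirrors the constant added above

@app.task
def touch(record_id):
    """Placeholder task for the sketch."""
    return record_id

# An explicit queue on apply_async routes just this invocation; without
# it, the task goes to its default queue (Celery's "celery" queue unless
# the task or app configuration says otherwise).
touch.apply_async(args=(1,), queue=PRIORITY_QUEUE)
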
3 changes: 2 additions & 1 deletion koku/sources/tasks.py
@@ -18,13 +18,14 @@
 import logging

 from koku import celery_app
+from masu.processor.tasks import PRIORITY_QUEUE
 from sources.sources_provider_coordinator import SourcesProviderCoordinator
 from sources.storage import load_providers_to_delete

 LOG = logging.getLogger(__name__)


-@celery_app.task(name="sources.tasks.delete_source", queue="remove_expired")
+@celery_app.task(name="sources.tasks.delete_source", queue=PRIORITY_QUEUE)
 def delete_source(source_id, auth_header, koku_uuid):
     """Delete Provider and Source."""
     LOG.info(f"Deleting Provider {koku_uuid} for Source ID: {source_id}")
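
This hunk moves delete_source from the remove_expired queue to the priority queue by changing the queue= default in the task decorator. A sketch of that mechanism with stand-in names and an assumed broker:

from celery import Celery

app = Celery("sketch", broker="redis://localhost:6379/0")  # assumed broker
PRIORITY_QUEUE = "priority"

@app.task(name="sketch.delete_source", queue=PRIORITY_QUEUE)
def delete_source(source_id):
    """queue= in the decorator sets this task's default queue."""
    return source_id

delete_source.delay(42)  # routed to "priority" via the decorator default
delete_source.apply_async(args=(42,), queue="remove_expired")  # per-call queue overrides the default
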
76 changes: 59 additions & 17 deletions testing/compose_files/docker-compose-multiworker.yml
@@ -200,22 +200,7 @@ services:

     privileged: true
     volumes:
-      - '.:/koku'
-      - './testing:/testing'
-      - './testing/local_providers/azure_local:/tmp/local_container'
-      - './testing/local_providers/aws_local:/tmp/local_bucket'
-      - './testing/local_providers/aws_local_0:/tmp/local_bucket_0'
-      - './testing/local_providers/aws_local_1:/tmp/local_bucket_1'
-      - './testing/local_providers/aws_local_2:/tmp/local_bucket_2'
-      - './testing/local_providers/aws_local_3:/tmp/local_bucket_3'
-      - './testing/local_providers/aws_local_4:/tmp/local_bucket_4'
-      - './testing/local_providers/aws_local_5:/tmp/local_bucket_5'
-      - './testing/local_providers/insights_local:/var/tmp/masu/insights_local'
-      - './testing/local_providers/gcp_local/:/tmp/gcp_local_bucket'
-      - './testing/local_providers/gcp_local_0/:/tmp/gcp_local_bucket_0'
-      - './testing/local_providers/gcp_local_1/:/tmp/gcp_local_bucket_1'
-      - './testing/local_providers/gcp_local_2/:/tmp/gcp_local_bucket_2'
-      - './testing/local_providers/gcp_local_3/:/tmp/gcp_local_bucket_3'
+      - './../..:/koku/'
     links:
       - redis
     depends_on:
@@ -283,7 +268,6 @@
     depends_on:
       - redis
       - koku-base
-      - redis

   koku-worker-2:
     container_name: koku_worker_2
@@ -904,6 +888,64 @@
     depends_on:
       - koku-base

+  priority-worker:
+    container_name: priority-worker
+    hostname: priority-worker
+    image: koku_base
+    working_dir: /koku/koku
+    entrypoint: ['watchmedo', 'auto-restart', '--directory=./', '--pattern=*.py', '--recursive', '--', 'celery', '-A', 'koku', 'worker', '-l', 'info', '-Q', 'priority']
+    environment:
+      - AWS_ACCESS_KEY_ID
+      - AWS_SECRET_ACCESS_KEY
+      - DATABASE_SERVICE_NAME=POSTGRES_SQL
+      - DATABASE_ENGINE=postgresql
+      - DATABASE_NAME=${DATABASE_NAME-postgres}
+      - POSTGRES_SQL_SERVICE_HOST=db
+      - POSTGRES_SQL_SERVICE_PORT=5432
+      - DATABASE_USER=${DATABASE_USER-postgres}
+      - DATABASE_PASSWORD=${DATABASE_PASSWORD-postgres}
+      - RABBITMQ_HOST=${RABBITMQ_HOST-koku-rabbit}
+      - RABBITMQ_PORT=5672
+      - USE_RABBIT=${USE_RABBIT}
+      - DEVELOPMENT=${DEVELOPMENT-True}
+      - LOG_LEVEL=INFO
+      - DJANGO_SETTINGS_MODULE=koku.settings
+      - MASU_SECRET_KEY=abc
+      - prometheus_multiproc_dir=/tmp
+      - PROMETHEUS_PUSHGATEWAY=${PROMETHEUS_PUSHGATEWAY-pushgateway:9091}
+      - ENABLE_S3_ARCHIVING=${ENABLE_S3_ARCHIVING-False}
+      - ENABLE_PARQUET_PROCESSING=${ENABLE_PARQUET_PROCESSING-False}
+      - S3_BUCKET_NAME=${S3_BUCKET_NAME-koku-bucket}
+      - S3_BUCKET_PATH=${S3_BUCKET_PATH-data_archive}
+      - S3_ENDPOINT
+      - S3_ACCESS_KEY
+      - S3_SECRET
+      - PVC_DIR=${PVC_DIR-/testing/pvc_dir}
+      - GOOGLE_APPLICATION_CREDENTIALS=${GOOGLE_APPLICATION_CREDENTIALS}
+      - KOKU_CELERY_ENABLE_SENTRY
+      - KOKU_CELERY_SENTRY_DSN
+      - KOKU_SENTRY_ENVIRONMENT
+      - DEMO_ACCOUNTS
+      - INITIAL_INGEST_OVERRIDE=${INITIAL_INGEST_OVERRIDE-False}
+      - INITIAL_INGEST_NUM_MONTHS=${INITIAL_INGEST_NUM_MONTHS-2}
+      - AUTO_DATA_INGEST=${AUTO_DATA_INGEST-True}
+      - REPORT_PROCESSING_BATCH_SIZE=${REPORT_PROCESSING_BATCH_SIZE-100000}
+      - REPORT_PROCESSING_TIMEOUT_HOURS=${REPORT_PROCESSING_TIMEOUT_HOURS-2}
+      - PRESTO_HOST=${PRESTO_HOST-presto}
+      - PRESTO_PORT=${PRESTO_PORT-8080}
+      - DATE_OVERRIDE
+      - MASU_DEBUG
+
+    volumes:
+      - './../..:/koku/'
+
+    privileged: true
+    links:
+      - redis
+    depends_on:
+      - koku-base
+      - redis
+
 networks:
   default:
     external:
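
To confirm the new priority-worker actually consumes the queue once the stack is up, the workers can be asked which queues they subscribe to. A sketch of the Python equivalent of `celery -A koku inspect active_queues`; the broker URL is an assumption:

from celery import Celery

app = Celery("koku", broker="amqp://guest@localhost//")  # assumed broker URL

# active_queues() asks each running worker which queues it consumes;
# it returns {worker_name: [{"name": ..., ...}, ...]} or None.
replies = app.control.inspect().active_queues() or {}
for worker, queues in replies.items():
    names = [q["name"] for q in queues]
    if "priority" in names:
        print(f"{worker} consumes {names}")
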
