From 5503a06eb7ccafb3ed68f8a65270b862d2e8d259 Mon Sep 17 00:00:00 2001 From: pablodanswer Date: Sun, 10 Nov 2024 14:48:46 -0800 Subject: [PATCH] log! --- .../danswer/background/celery/apps/beat.py | 1 + .../background/celery/apps/scheduler.py | 96 ------------------- .../celery-worker-primary-scaledobject.yaml | 4 +- .../indexing-model-server-scaledobject.yaml | 2 +- deployment/cloud_kubernetes/workers/beat.yaml | 2 +- .../workers/heavy_worker.yaml | 2 +- .../workers/indexing_worker.yaml | 2 +- .../workers/light_worker.yaml | 2 +- .../cloud_kubernetes/workers/primary.yaml | 2 +- 9 files changed, 9 insertions(+), 104 deletions(-) delete mode 100644 backend/danswer/background/celery/apps/scheduler.py diff --git a/backend/danswer/background/celery/apps/beat.py b/backend/danswer/background/celery/apps/beat.py index 8842343ffae..12b9ad97daa 100644 --- a/backend/danswer/background/celery/apps/beat.py +++ b/backend/danswer/background/celery/apps/beat.py @@ -55,6 +55,7 @@ def _update_tenant_tasks(self) -> None: logger.info("Fetching all tenant IDs") tenant_ids = get_all_tenant_ids() logger.info(f"Found {len(tenant_ids)} tenants") + logger.info(f"Tenant IDs: {', '.join(str(id) for id in tenant_ids)}") logger.info("Fetching tasks to schedule") tasks_to_schedule = fetch_versioned_implementation( diff --git a/backend/danswer/background/celery/apps/scheduler.py b/backend/danswer/background/celery/apps/scheduler.py deleted file mode 100644 index 3ddf1dc169c..00000000000 --- a/backend/danswer/background/celery/apps/scheduler.py +++ /dev/null @@ -1,96 +0,0 @@ -from datetime import timedelta -from typing import Any - -from celery.beat import PersistentScheduler # type: ignore -from celery.utils.log import get_task_logger - -from danswer.db.engine import get_all_tenant_ids -from danswer.utils.variable_functionality import fetch_versioned_implementation - -logger = get_task_logger(__name__) - - -class DynamicTenantScheduler(PersistentScheduler): - def __init__(self, *args: Any, **kwargs: Any) -> None: - super().__init__(*args, **kwargs) - self._reload_interval = timedelta(minutes=1) - self._last_reload = self.app.now() - self._reload_interval - - def setup_schedule(self) -> None: - super().setup_schedule() - - def tick(self) -> float: - retval = super().tick() - now = self.app.now() - if ( - self._last_reload is None - or (now - self._last_reload) > self._reload_interval - ): - logger.info("Reloading schedule to check for new tenants...") - self._update_tenant_tasks() - self._last_reload = now - return retval - - def _update_tenant_tasks(self) -> None: - logger.info("Checking for tenant task updates...") - try: - tenant_ids = get_all_tenant_ids() - tasks_to_schedule = fetch_versioned_implementation( - "danswer.background.celery.tasks.beat_schedule", "get_tasks_to_schedule" - ) - - new_beat_schedule: dict[str, dict[str, Any]] = {} - - current_schedule = getattr(self, "_store", {"entries": {}}).get( - "entries", {} - ) - - existing_tenants = set() - for task_name in current_schedule.keys(): - if "-" in task_name: - existing_tenants.add(task_name.split("-")[-1]) - - for tenant_id in tenant_ids: - if tenant_id not in existing_tenants: - logger.info(f"Found new tenant: {tenant_id}") - - for task in tasks_to_schedule(): - task_name = f"{task['name']}-{tenant_id}" - new_task = { - "task": task["task"], - "schedule": task["schedule"], - "kwargs": {"tenant_id": tenant_id}, - } - if options := task.get("options"): - new_task["options"] = options - new_beat_schedule[task_name] = new_task - - if self._should_update_schedule(current_schedule, new_beat_schedule): - logger.info( - "Updating schedule", - extra={ - "new_tasks": len(new_beat_schedule), - "current_tasks": len(current_schedule), - }, - ) - if not hasattr(self, "_store"): - self._store: dict[str, dict] = {"entries": {}} - self.update_from_dict(new_beat_schedule) - logger.info(f"New schedule: {new_beat_schedule}") - - logger.info("Tenant tasks updated successfully") - else: - logger.debug("No schedule updates needed") - - except (AttributeError, KeyError): - logger.exception("Failed to process task configuration") - except Exception: - logger.exception("Unexpected error updating tenant tasks") - - def _should_update_schedule( - self, current_schedule: dict, new_schedule: dict - ) -> bool: - """Compare schedules to determine if an update is needed.""" - current_tasks = set(current_schedule.keys()) - new_tasks = set(new_schedule.keys()) - return current_tasks != new_tasks diff --git a/deployment/cloud_kubernetes/keda/celery-worker-primary-scaledobject.yaml b/deployment/cloud_kubernetes/keda/celery-worker-primary-scaledobject.yaml index 60f97e3b56c..2c535b2c822 100644 --- a/deployment/cloud_kubernetes/keda/celery-worker-primary-scaledobject.yaml +++ b/deployment/cloud_kubernetes/keda/celery-worker-primary-scaledobject.yaml @@ -10,8 +10,8 @@ spec: name: celery-worker-primary pollingInterval: 15 # Check every 15 seconds cooldownPeriod: 30 # Wait 30 seconds before scaling down - minReplicaCount: 0 - maxReplicaCount: 0 + minReplicaCount: 1 + maxReplicaCount: 10 triggers: - type: redis metadata: diff --git a/deployment/cloud_kubernetes/keda/indexing-model-server-scaledobject.yaml b/deployment/cloud_kubernetes/keda/indexing-model-server-scaledobject.yaml index 072124ab5d9..efadb716df1 100644 --- a/deployment/cloud_kubernetes/keda/indexing-model-server-scaledobject.yaml +++ b/deployment/cloud_kubernetes/keda/indexing-model-server-scaledobject.yaml @@ -11,7 +11,7 @@ spec: pollingInterval: 15 # Check every 15 seconds cooldownPeriod: 30 # Wait 30 seconds before scaling down minReplicaCount: 1 - maxReplicaCount: 5 + maxReplicaCount: 12 triggers: - type: cpu metadata: diff --git a/deployment/cloud_kubernetes/workers/beat.yaml b/deployment/cloud_kubernetes/workers/beat.yaml index 323558fc712..c9bc443dc91 100644 --- a/deployment/cloud_kubernetes/workers/beat.yaml +++ b/deployment/cloud_kubernetes/workers/beat.yaml @@ -14,7 +14,7 @@ spec: spec: containers: - name: celery-beat - image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.7 + image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.8 imagePullPolicy: Always command: [ diff --git a/deployment/cloud_kubernetes/workers/heavy_worker.yaml b/deployment/cloud_kubernetes/workers/heavy_worker.yaml index f6d9dd1d541..76f8686764b 100644 --- a/deployment/cloud_kubernetes/workers/heavy_worker.yaml +++ b/deployment/cloud_kubernetes/workers/heavy_worker.yaml @@ -14,7 +14,7 @@ spec: spec: containers: - name: celery-worker-heavy - image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.7 + image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.8 imagePullPolicy: Always command: [ diff --git a/deployment/cloud_kubernetes/workers/indexing_worker.yaml b/deployment/cloud_kubernetes/workers/indexing_worker.yaml index 491ad34cbfd..3c89b98ba44 100644 --- a/deployment/cloud_kubernetes/workers/indexing_worker.yaml +++ b/deployment/cloud_kubernetes/workers/indexing_worker.yaml @@ -14,7 +14,7 @@ spec: spec: containers: - name: celery-worker-indexing - image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.7 + image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.8 imagePullPolicy: Always command: [ diff --git a/deployment/cloud_kubernetes/workers/light_worker.yaml b/deployment/cloud_kubernetes/workers/light_worker.yaml index 4ef7c909b6d..62a249686cb 100644 --- a/deployment/cloud_kubernetes/workers/light_worker.yaml +++ b/deployment/cloud_kubernetes/workers/light_worker.yaml @@ -14,7 +14,7 @@ spec: spec: containers: - name: celery-worker-light - image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.7 + image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.8 imagePullPolicy: Always command: [ diff --git a/deployment/cloud_kubernetes/workers/primary.yaml b/deployment/cloud_kubernetes/workers/primary.yaml index bc4378d193a..a8a4e9130c3 100644 --- a/deployment/cloud_kubernetes/workers/primary.yaml +++ b/deployment/cloud_kubernetes/workers/primary.yaml @@ -14,7 +14,7 @@ spec: spec: containers: - name: celery-worker-primary - image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.7 + image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.8 imagePullPolicy: Always command: [