diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py
index c6c9b05476d..366f8cc23a4 100644
--- a/backend/onyx/background/celery/tasks/indexing/tasks.py
+++ b/backend/onyx/background/celery/tasks/indexing/tasks.py
@@ -5,7 +5,6 @@
 from datetime import timezone
 from http import HTTPStatus
 from time import sleep
-from typing import cast
 
 import redis
 import sentry_sdk
@@ -63,6 +62,7 @@
 from onyx.redis.redis_connector_index import RedisConnectorIndex
 from onyx.redis.redis_connector_index import RedisConnectorIndexPayload
 from onyx.redis.redis_pool import get_redis_client
+from onyx.redis.redis_pool import redis_lock_dump
 from onyx.utils.logger import setup_logger
 from onyx.utils.variable_functionality import global_version
 from shared_configs.configs import INDEXING_MODEL_SERVER_HOST
@@ -95,6 +95,7 @@ def __init__(
 
         self.last_tag: str = "IndexingCallback.__init__"
         self.last_lock_reacquire: datetime = datetime.now(timezone.utc)
+        self.last_lock_monotonic = time.monotonic()
 
         self.last_parent_check = time.monotonic()
 
@@ -122,9 +123,15 @@ def progress(self, tag: str, amount: int) -> None:
         #     self.last_parent_check = now
 
         try:
-            self.redis_lock.reacquire()
+            current_time = time.monotonic()
+            if current_time - self.last_lock_monotonic >= (
+                CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4
+            ):
+                self.redis_lock.reacquire()
+                self.last_lock_reacquire = datetime.now(timezone.utc)
+                self.last_lock_monotonic = time.monotonic()
+
             self.last_tag = tag
-            self.last_lock_reacquire = datetime.now(timezone.utc)
         except LockError:
             logger.exception(
                 f"IndexingCallback - lock.reacquire exceptioned: "
@@ -135,29 +142,7 @@ def progress(self, tag: str, amount: int) -> None:
                 f"now={datetime.now(timezone.utc)}"
             )
 
-            # diagnostic logging for lock errors
-            name = self.redis_lock.name
-            ttl = self.redis_client.ttl(name)
-            locked = self.redis_lock.locked()
-            owned = self.redis_lock.owned()
-            local_token: str | None = self.redis_lock.local.token  # type: ignore
-
-            remote_token_raw = self.redis_client.get(self.redis_lock.name)
-            if remote_token_raw:
-                remote_token_bytes = cast(bytes, remote_token_raw)
-                remote_token = remote_token_bytes.decode("utf-8")
-            else:
-                remote_token = None
-
-            logger.warning(
-                f"IndexingCallback - lock diagnostics: "
-                f"name={name} "
-                f"locked={locked} "
-                f"owned={owned} "
-                f"local_token={local_token} "
-                f"remote_token={remote_token} "
-                f"ttl={ttl}"
-            )
+            redis_lock_dump(self.redis_lock, self.redis_client)
             raise
 
         self.redis_client.incrby(self.generator_progress_key, amount)
@@ -349,6 +334,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
         # Fail any index attempts in the DB that don't have fences
         # This shouldn't ever happen!
         with get_session_with_tenant(tenant_id) as db_session:
+            lock_beat.reacquire()
             unfenced_attempt_ids = get_unfenced_index_attempt_ids(
                 db_session, redis_client
             )
@@ -372,6 +358,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
 
         # we want to run this less frequently than the overall task
         if not redis_client.exists(OnyxRedisSignals.VALIDATE_INDEXING_FENCES):
+            lock_beat.reacquire()
             # clear any indexing fences that don't have associated celery tasks in progress
             # tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
             # or be currently executing
@@ -399,6 +386,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
                 "check_for_indexing - Lock not owned on completion: "
                 f"tenant={tenant_id}"
             )
+            redis_lock_dump(lock_beat, redis_client)
 
     time_elapsed = time.monotonic() - time_start
     task_logger.debug(f"check_for_indexing finished: elapsed={time_elapsed:.2f}")
diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py
index 7bef0bcca40..57d8793eadc 100644
--- a/backend/onyx/background/celery/tasks/vespa/tasks.py
+++ b/backend/onyx/background/celery/tasks/vespa/tasks.py
@@ -68,6 +68,7 @@
 from onyx.redis.redis_connector_prune import RedisConnectorPrune
 from onyx.redis.redis_document_set import RedisDocumentSet
 from onyx.redis.redis_pool import get_redis_client
+from onyx.redis.redis_pool import redis_lock_dump
 from onyx.redis.redis_usergroup import RedisUserGroup
 from onyx.utils.logger import setup_logger
 from onyx.utils.variable_functionality import fetch_versioned_implementation
@@ -111,6 +112,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> bool | No
         )
 
         # region document set scan
+        lock_beat.reacquire()
         document_set_ids: list[int] = []
         with get_session_with_tenant(tenant_id) as db_session:
             # check if any document sets are not synced
@@ -122,6 +124,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> bool | No
                 document_set_ids.append(document_set.id)
 
         for document_set_id in document_set_ids:
+            lock_beat.reacquire()
             with get_session_with_tenant(tenant_id) as db_session:
                 try_generate_document_set_sync_tasks(
                     self.app, document_set_id, db_session, r, lock_beat, tenant_id
@@ -130,6 +133,8 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> bool | No
         # check if any user groups are not synced
         if global_version.is_ee_version():
+            lock_beat.reacquire()
+
             try:
                 fetch_user_groups = fetch_versioned_implementation(
                     "onyx.db.user_group", "fetch_user_groups"
                 )
@@ -149,6 +154,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> bool | No
                     usergroup_ids.append(usergroup.id)
 
             for usergroup_id in usergroup_ids:
+                lock_beat.reacquire()
                 with get_session_with_tenant(tenant_id) as db_session:
                     try_generate_user_group_sync_tasks(
                         self.app, usergroup_id, db_session, r, lock_beat, tenant_id
@@ -163,6 +169,12 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> bool | No
     finally:
         if lock_beat.owned():
             lock_beat.release()
+        else:
+            task_logger.error(
+                "check_for_vespa_sync_task - Lock not owned on completion: "
+                f"tenant={tenant_id}"
+            )
+            redis_lock_dump(lock_beat, r)
 
     time_elapsed = time.monotonic() - time_start
     task_logger.debug(f"check_for_vespa_sync_task finished: elapsed={time_elapsed:.2f}")
diff --git a/backend/onyx/configs/constants.py b/backend/onyx/configs/constants.py
index 302d7d147be..7c3349b3e5f 100644
--- a/backend/onyx/configs/constants.py
+++ b/backend/onyx/configs/constants.py
@@ -76,7 +76,10 @@
 KV_CUSTOM_ANALYTICS_SCRIPT_KEY = "__custom_analytics_script__"
 KV_DOCUMENTS_SEEDED_KEY = "documents_seeded"
 
-CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT = 60
+# NOTE: we use this timeout / 4 in various places to refresh a lock
+# might be worth separating this timeout into separate timeouts for each situation
+CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT = 120
+
 CELERY_PRIMARY_WORKER_LOCK_TIMEOUT = 120
 
 # needs to be long enough to cover the maximum time it takes to download an object
diff --git a/backend/onyx/redis/redis_document_set.py b/backend/onyx/redis/redis_document_set.py
index 6ace3e325c2..1433cb04aed 100644
--- a/backend/onyx/redis/redis_document_set.py
+++ b/backend/onyx/redis/redis_document_set.py
@@ -13,6 +13,7 @@
 from onyx.configs.constants import OnyxCeleryQueues
 from onyx.configs.constants import OnyxCeleryTask
 from onyx.db.document_set import construct_document_select_by_docset
+from onyx.db.models import Document
 from onyx.redis.redis_object_helper import RedisObjectHelper
 
 
@@ -60,6 +61,7 @@ def generate_tasks(
         async_results = []
         stmt = construct_document_select_by_docset(int(self._id), current_only=False)
         for doc in db_session.scalars(stmt).yield_per(1):
+            doc = cast(Document, doc)
             current_time = time.monotonic()
             if current_time - last_lock_time >= (
                 CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4
diff --git a/backend/onyx/redis/redis_pool.py b/backend/onyx/redis/redis_pool.py
index 9bb0808b00a..e253caaf00e 100644
--- a/backend/onyx/redis/redis_pool.py
+++ b/backend/onyx/redis/redis_pool.py
@@ -4,12 +4,14 @@
 import threading
 from collections.abc import Callable
 from typing import Any
+from typing import cast
 from typing import Optional
 
 import redis
 from fastapi import Request
 from redis import asyncio as aioredis
 from redis.client import Redis
+from redis.lock import Lock as RedisLock
 
 from onyx.configs.app_configs import REDIS_AUTH_KEY_PREFIX
 from onyx.configs.app_configs import REDIS_DB_NUMBER
@@ -262,3 +264,29 @@ async def retrieve_auth_token_data_from_redis(request: Request) -> dict | None:
         raise ValueError(
             f"Unexpected error in retrieve_auth_token_data_from_redis: {str(e)}"
         )
+
+
+def redis_lock_dump(lock: RedisLock, r: Redis) -> None:
+    # diagnostic logging for lock errors
+    name = lock.name
+    ttl = r.ttl(name)
+    locked = lock.locked()
+    owned = lock.owned()
+    local_token: str | None = lock.local.token  # type: ignore
+
+    remote_token_raw = r.get(lock.name)
+    if remote_token_raw:
+        remote_token_bytes = cast(bytes, remote_token_raw)
+        remote_token = remote_token_bytes.decode("utf-8")
+    else:
+        remote_token = None
+
+    logger.warning(
+        f"RedisLock diagnostic logging: "
+        f"name={name} "
+        f"locked={locked} "
+        f"owned={owned} "
+        f"local_token={local_token} "
+        f"remote_token={remote_token} "
+        f"ttl={ttl}"
+    )
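
For context on the reacquire throttling this patch introduces in IndexingCallback.progress (and mirrors in the beat tasks): the lock TTL is only refreshed when at least CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4 seconds have elapsed since the last refresh, so the lock stays held without hitting Redis on every progress tick. Below is a minimal standalone sketch of that pattern; the helper name throttled_reacquire, the example lock name, and the local-Redis wiring are illustrative assumptions, not part of this PR.

```python
import time

from redis import Redis
from redis.lock import Lock as RedisLock

# Assumed constant for a self-contained example; the real value lives in
# backend/onyx/configs/constants.py (raised to 120 seconds in this diff).
CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT = 120


def throttled_reacquire(lock: RedisLock, last_lock_monotonic: float) -> float:
    """Refresh the lock's TTL at most once per TIMEOUT / 4 seconds.

    Returns the monotonic timestamp of the last refresh. Hypothetical helper,
    shown only to illustrate the pattern used in IndexingCallback.progress.
    """
    now = time.monotonic()
    if now - last_lock_monotonic >= CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4:
        lock.reacquire()  # resets the TTL to the lock's original timeout
        return now
    return last_lock_monotonic


if __name__ == "__main__":
    r = Redis(host="localhost", port=6379)  # assumes a local Redis instance
    lock = r.lock("example_beat_lock", timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT)
    if lock.acquire(blocking=False):
        last = time.monotonic()
        try:
            for _ in range(10):
                # ... do a chunk of work ...
                last = throttled_reacquire(lock, last)
        finally:
            if lock.owned():
                lock.release()
```

The trade-off is the same one the NOTE in constants.py hints at: refreshing on every iteration wastes Redis round trips, while refreshing too rarely risks the TTL expiring mid-task, which is why the refresh interval is tied to a fraction of the lock timeout.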