Skip to content

Commit

Permalink
Merge pull request #3602 from onyx-dot-app/bugfix/lock_not_owned
Browse files Browse the repository at this point in the history
various lock diagnostics and timing adjustments
  • Loading branch information
rkuo-danswer authored Jan 5, 2025
2 parents 1db778b + 6fcd712 commit 27699c8
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 27 deletions.
40 changes: 14 additions & 26 deletions backend/onyx/background/celery/tasks/indexing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from datetime import timezone
from http import HTTPStatus
from time import sleep
from typing import cast

import redis
import sentry_sdk
Expand Down Expand Up @@ -63,6 +62,7 @@
from onyx.redis.redis_connector_index import RedisConnectorIndex
from onyx.redis.redis_connector_index import RedisConnectorIndexPayload
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import redis_lock_dump
from onyx.utils.logger import setup_logger
from onyx.utils.variable_functionality import global_version
from shared_configs.configs import INDEXING_MODEL_SERVER_HOST
Expand Down Expand Up @@ -95,6 +95,7 @@ def __init__(

self.last_tag: str = "IndexingCallback.__init__"
self.last_lock_reacquire: datetime = datetime.now(timezone.utc)
self.last_lock_monotonic = time.monotonic()

self.last_parent_check = time.monotonic()

Expand Down Expand Up @@ -122,9 +123,15 @@ def progress(self, tag: str, amount: int) -> None:
# self.last_parent_check = now

try:
self.redis_lock.reacquire()
current_time = time.monotonic()
if current_time - self.last_lock_monotonic >= (
CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4
):
self.redis_lock.reacquire()
self.last_lock_reacquire = datetime.now(timezone.utc)
self.last_lock_monotonic = time.monotonic()

self.last_tag = tag
self.last_lock_reacquire = datetime.now(timezone.utc)
except LockError:
logger.exception(
f"IndexingCallback - lock.reacquire exceptioned: "
Expand All @@ -135,29 +142,7 @@ def progress(self, tag: str, amount: int) -> None:
f"now={datetime.now(timezone.utc)}"
)

# diagnostic logging for lock errors
name = self.redis_lock.name
ttl = self.redis_client.ttl(name)
locked = self.redis_lock.locked()
owned = self.redis_lock.owned()
local_token: str | None = self.redis_lock.local.token # type: ignore

remote_token_raw = self.redis_client.get(self.redis_lock.name)
if remote_token_raw:
remote_token_bytes = cast(bytes, remote_token_raw)
remote_token = remote_token_bytes.decode("utf-8")
else:
remote_token = None

logger.warning(
f"IndexingCallback - lock diagnostics: "
f"name={name} "
f"locked={locked} "
f"owned={owned} "
f"local_token={local_token} "
f"remote_token={remote_token} "
f"ttl={ttl}"
)
redis_lock_dump(self.redis_lock, self.redis_client)
raise

self.redis_client.incrby(self.generator_progress_key, amount)
Expand Down Expand Up @@ -349,6 +334,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
# Fail any index attempts in the DB that don't have fences
# This shouldn't ever happen!
with get_session_with_tenant(tenant_id) as db_session:
lock_beat.reacquire()
unfenced_attempt_ids = get_unfenced_index_attempt_ids(
db_session, redis_client
)
Expand All @@ -372,6 +358,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:

# we want to run this less frequently than the overall task
if not redis_client.exists(OnyxRedisSignals.VALIDATE_INDEXING_FENCES):
lock_beat.reacquire()
# clear any indexing fences that don't have associated celery tasks in progress
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
# or be currently executing
Expand Down Expand Up @@ -399,6 +386,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
"check_for_indexing - Lock not owned on completion: "
f"tenant={tenant_id}"
)
redis_lock_dump(lock_beat, redis_client)

time_elapsed = time.monotonic() - time_start
task_logger.debug(f"check_for_indexing finished: elapsed={time_elapsed:.2f}")
Expand Down
12 changes: 12 additions & 0 deletions backend/onyx/background/celery/tasks/vespa/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
from onyx.redis.redis_connector_prune import RedisConnectorPrune
from onyx.redis.redis_document_set import RedisDocumentSet
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import redis_lock_dump
from onyx.redis.redis_usergroup import RedisUserGroup
from onyx.utils.logger import setup_logger
from onyx.utils.variable_functionality import fetch_versioned_implementation
Expand Down Expand Up @@ -111,6 +112,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> bool | No
)

# region document set scan
lock_beat.reacquire()
document_set_ids: list[int] = []
with get_session_with_tenant(tenant_id) as db_session:
# check if any document sets are not synced
Expand All @@ -122,6 +124,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> bool | No
document_set_ids.append(document_set.id)

for document_set_id in document_set_ids:
lock_beat.reacquire()
with get_session_with_tenant(tenant_id) as db_session:
try_generate_document_set_sync_tasks(
self.app, document_set_id, db_session, r, lock_beat, tenant_id
Expand All @@ -130,6 +133,8 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> bool | No

# check if any user groups are not synced
if global_version.is_ee_version():
lock_beat.reacquire()

try:
fetch_user_groups = fetch_versioned_implementation(
"onyx.db.user_group", "fetch_user_groups"
Expand All @@ -149,6 +154,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> bool | No
usergroup_ids.append(usergroup.id)

for usergroup_id in usergroup_ids:
lock_beat.reacquire()
with get_session_with_tenant(tenant_id) as db_session:
try_generate_user_group_sync_tasks(
self.app, usergroup_id, db_session, r, lock_beat, tenant_id
Expand All @@ -163,6 +169,12 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> bool | No
finally:
if lock_beat.owned():
lock_beat.release()
else:
task_logger.error(
"check_for_vespa_sync_task - Lock not owned on completion: "
f"tenant={tenant_id}"
)
redis_lock_dump(lock_beat, r)

time_elapsed = time.monotonic() - time_start
task_logger.debug(f"check_for_vespa_sync_task finished: elapsed={time_elapsed:.2f}")
Expand Down
5 changes: 4 additions & 1 deletion backend/onyx/configs/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@
KV_CUSTOM_ANALYTICS_SCRIPT_KEY = "__custom_analytics_script__"
KV_DOCUMENTS_SEEDED_KEY = "documents_seeded"

CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT = 60
# NOTE: we use this timeout / 4 in various places to refresh a lock
# might be worth separating this timeout into separate timeouts for each situation
CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT = 120

CELERY_PRIMARY_WORKER_LOCK_TIMEOUT = 120

# needs to be long enough to cover the maximum time it takes to download an object
Expand Down
2 changes: 2 additions & 0 deletions backend/onyx/redis/redis_document_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.db.document_set import construct_document_select_by_docset
from onyx.db.models import Document
from onyx.redis.redis_object_helper import RedisObjectHelper


Expand Down Expand Up @@ -60,6 +61,7 @@ def generate_tasks(
async_results = []
stmt = construct_document_select_by_docset(int(self._id), current_only=False)
for doc in db_session.scalars(stmt).yield_per(1):
doc = cast(Document, doc)
current_time = time.monotonic()
if current_time - last_lock_time >= (
CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4
Expand Down
28 changes: 28 additions & 0 deletions backend/onyx/redis/redis_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import threading
from collections.abc import Callable
from typing import Any
from typing import cast
from typing import Optional

import redis
from fastapi import Request
from redis import asyncio as aioredis
from redis.client import Redis
from redis.lock import Lock as RedisLock

from onyx.configs.app_configs import REDIS_AUTH_KEY_PREFIX
from onyx.configs.app_configs import REDIS_DB_NUMBER
Expand Down Expand Up @@ -262,3 +264,29 @@ async def retrieve_auth_token_data_from_redis(request: Request) -> dict | None:
raise ValueError(
f"Unexpected error in retrieve_auth_token_data_from_redis: {str(e)}"
)


def redis_lock_dump(lock: RedisLock, r: Redis) -> None:
    """Log diagnostic state for a Redis lock (used when reacquire/release fails).

    Emits a single warning line with the lock's name, its locked/owned status,
    the token held locally by this process, the token currently stored in
    Redis, and the key's remaining TTL. Purely observational: never raises on
    the happy path and performs no writes.

    Args:
        lock: the redis-py Lock instance to inspect.
        r: a Redis client connected to the same server holding the lock key.
    """
    # diagnostic logging for lock errors
    name = lock.name
    ttl = r.ttl(name)
    locked = lock.locked()
    owned = lock.owned()
    # lock.local is thread-local storage; token is None if we never acquired.
    local_token: str | None = lock.local.token  # type: ignore

    remote_token_raw = r.get(name)
    if not remote_token_raw:
        remote_token = None
    elif isinstance(remote_token_raw, bytes):
        remote_token = remote_token_raw.decode("utf-8")
    else:
        # client configured with decode_responses=True already returns str;
        # handle it so this diagnostics helper can never raise here
        remote_token = cast(str, remote_token_raw)

    logger.warning(
        f"RedisLock diagnostic logging: "
        f"name={name} "
        f"locked={locked} "
        f"owned={owned} "
        f"local_token={local_token} "
        f"remote_token={remote_token} "
        f"ttl={ttl}"
    )

0 comments on commit 27699c8

Please sign in to comment.