wait for db before allowing worker to proceed (reduces error spam on … #3079

Merged: 4 commits, Nov 8, 2024
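
This PR extends the existing Redis readiness probe so that every worker also blocks on Postgres and Vespa before it starts consuming tasks, which avoids a burst of connection errors when a worker comes up before its dependencies. All three probes follow the same bounded poll loop: try the dependency, sleep briefly on failure, and raise WorkerShutdown once a hardcoded timeout elapses. A minimal, generalized sketch of that loop (the helper name and the check callback are illustrative, not part of the PR):

import time
from typing import Callable

from celery.exceptions import WorkerShutdown


def wait_for_dependency(name: str, check: Callable[[], bool]) -> None:
    """Poll `check` until it succeeds or a hardcoded timeout expires."""
    WAIT_INTERVAL = 5  # seconds between attempts
    WAIT_LIMIT = 60  # total seconds before giving up

    time_start = time.monotonic()
    while True:
        try:
            if check():
                return  # dependency answered; let the worker continue
        except Exception:
            pass  # treat any error as "not ready yet"

        time_elapsed = time.monotonic() - time_start
        if time_elapsed > WAIT_LIMIT:
            # Same behavior as the probes in this PR: kill the worker.
            raise WorkerShutdown(
                f"{name}: readiness probe did not succeed within {WAIT_LIMIT} seconds."
            )

        time.sleep(WAIT_INTERVAL)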
118 changes: 109 additions & 9 deletions backend/danswer/background/celery/apps/app_base.py
@@ -3,6 +3,7 @@
import time
from typing import Any

import requests
import sentry_sdk
from celery import Task
from celery.app import trace
@@ -11,11 +12,15 @@
from celery.utils.log import get_task_logger
from celery.worker import strategy # type: ignore
from sentry_sdk.integrations.celery import CeleryIntegration
from sqlalchemy import text
from sqlalchemy.orm import Session

from danswer.background.celery.apps.task_formatters import CeleryTaskColoredFormatter
from danswer.background.celery.apps.task_formatters import CeleryTaskPlainFormatter
from danswer.background.celery.celery_utils import celery_is_worker_primary
from danswer.configs.constants import DanswerRedisLocks
from danswer.db.engine import get_sqlalchemy_engine
from danswer.document_index.vespa_constants import VESPA_CONFIG_SERVER_URL
from danswer.redis.redis_connector import RedisConnector
from danswer.redis.redis_connector_credential_pair import RedisConnectorCredentialPair
from danswer.redis.redis_connector_delete import RedisConnectorDelete
@@ -139,35 +144,130 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None


def wait_for_redis(sender: Any, **kwargs: Any) -> None:
"""Waits for redis to become ready subject to a hardcoded timeout.
Will raise WorkerShutdown to kill the celery worker if the timeout is reached."""

r = get_redis_client(tenant_id=None)

WAIT_INTERVAL = 5
WAIT_LIMIT = 60

ready = False
time_start = time.monotonic()
logger.info("Redis: Readiness check starting.")
logger.info("Redis: Readiness probe starting.")
while True:
try:
if r.ping():
ready = True
break
except Exception:
pass

time_elapsed = time.monotonic() - time_start
if time_elapsed > WAIT_LIMIT:
break

logger.info(
f"Redis: Ping failed. elapsed={time_elapsed:.1f} timeout={WAIT_LIMIT:.1f}"
f"Redis: Readiness probe ongoing. elapsed={time_elapsed:.1f} timeout={WAIT_LIMIT:.1f}"
)

time.sleep(WAIT_INTERVAL)

if not ready:
msg = (
f"Redis: Readiness probe did not succeed within the timeout "
f"({WAIT_LIMIT} seconds). Exiting..."
)
logger.error(msg)
raise WorkerShutdown(msg)

logger.info("Redis: Readiness probe succeeded. Continuing...")
return
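
In wait_for_redis the per-attempt check is simply a PING through danswer's get_redis_client(). A standalone equivalent using the redis package directly, with a localhost instance assumed for illustration:

import redis

# Assumed local instance for illustration; the worker gets its client
# from danswer's get_redis_client(tenant_id=None) instead.
r = redis.Redis(host="localhost", port=6379, db=0)


def redis_is_ready() -> bool:
    try:
        # PING returns True once the server accepts connections.
        return bool(r.ping())
    except Exception:
        return False


print(redis_is_ready())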


def wait_for_db(sender: Any, **kwargs: Any) -> None:
"""Waits for the db to become ready subject to a hardcoded timeout.
Will raise WorkerShutdown to kill the celery worker if the timeout is reached."""

WAIT_INTERVAL = 5
WAIT_LIMIT = 60

ready = False
time_start = time.monotonic()
logger.info("Database: Readiness probe starting.")
while True:
try:
with Session(get_sqlalchemy_engine()) as db_session:
result = db_session.execute(text("SELECT NOW()")).scalar()
if result:
ready = True
break
except Exception:
pass

time_elapsed = time.monotonic() - time_start
if time_elapsed > WAIT_LIMIT:
break

logger.info(
f"Database: Readiness probe ongoing. elapsed={time_elapsed:.1f} timeout={WAIT_LIMIT:.1f}"
)

time.sleep(WAIT_INTERVAL)

if not ready:
msg = (
f"Database: Readiness probe did not succeed within the timeout "
f"({WAIT_LIMIT} seconds). Exiting..."
)
logger.error(msg)
raise WorkerShutdown(msg)

logger.info("Database: Readiness probe succeeded. Continuing...")
return
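
For the database, any successful round trip is enough, so the probe issues SELECT NOW() and checks that a value comes back. A standalone equivalent with SQLAlchemy, using an assumed Postgres URL in place of danswer's get_sqlalchemy_engine():

from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

# Placeholder connection URL for illustration only.
engine = create_engine("postgresql://postgres:password@localhost:5432/postgres")


def db_is_ready() -> bool:
    try:
        with Session(engine) as db_session:
            # Any row returned proves the database is accepting queries.
            return db_session.execute(text("SELECT NOW()")).scalar() is not None
    except Exception:
        return False


print(db_is_ready())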


def wait_for_vespa(sender: Any, **kwargs: Any) -> None:
"""Waits for Vespa to become ready subject to a hardcoded timeout.
Will raise WorkerShutdown to kill the celery worker if the timeout is reached."""

WAIT_INTERVAL = 5
WAIT_LIMIT = 60

ready = False
time_start = time.monotonic()
logger.info("Vespa: Readiness probe starting.")
while True:
try:
response = requests.get(f"{VESPA_CONFIG_SERVER_URL}/state/v1/health")
response.raise_for_status()

response_dict = response.json()
if response_dict["status"]["code"] == "up":
ready = True
break
except Exception:
pass

time_elapsed = time.monotonic() - time_start
if time_elapsed > WAIT_LIMIT:
break

logger.info(
f"Vespa: Readiness probe ongoing. elapsed={time_elapsed:.1f} timeout={WAIT_LIMIT:.1f}"
)

time.sleep(WAIT_INTERVAL)

logger.info("Redis: Readiness check succeeded. Continuing...")
if not ready:
msg = (
f"Vespa: Readiness probe did not succeed within the timeout "
f"({WAIT_LIMIT} seconds). Exiting..."
)
logger.error(msg)
raise WorkerShutdown(msg)

logger.info("Vespa: Readiness probe succeeded. Continuing...")
return
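
Vespa is treated as ready once the config server's /state/v1/health endpoint returns a 2xx response whose JSON body reports {"status": {"code": "up"}}. A standalone sketch of the same check; the URL assumes the default config server port rather than danswer's VESPA_CONFIG_SERVER_URL constant:

import requests

# Assumed default config server address for illustration.
VESPA_CONFIG_SERVER_URL = "http://localhost:19071"


def vespa_is_ready() -> bool:
    try:
        response = requests.get(f"{VESPA_CONFIG_SERVER_URL}/state/v1/health", timeout=5)
        response.raise_for_status()
        return response.json()["status"]["code"] == "up"
    except Exception:
        return False


print(vespa_is_ready())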


2 changes: 2 additions & 0 deletions backend/danswer/background/celery/apps/heavy.py
@@ -61,6 +61,8 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
SqlEngine.init_engine(pool_size=4, max_overflow=12)

app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
app_base.wait_for_vespa(sender, **kwargs)
app_base.on_secondary_worker_init(sender, **kwargs)


2 changes: 2 additions & 0 deletions backend/danswer/background/celery/apps/indexing.py
@@ -61,6 +61,8 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
SqlEngine.init_engine(pool_size=8, max_overflow=0)

app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
app_base.wait_for_vespa(sender, **kwargs)
app_base.on_secondary_worker_init(sender, **kwargs)


2 changes: 2 additions & 0 deletions backend/danswer/background/celery/apps/light.py
@@ -61,6 +61,8 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8)

app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
app_base.wait_for_vespa(sender, **kwargs)
app_base.on_secondary_worker_init(sender, **kwargs)


2 changes: 2 additions & 0 deletions backend/danswer/background/celery/apps/primary.py
@@ -76,6 +76,8 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
SqlEngine.init_engine(pool_size=8, max_overflow=0)

app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
app_base.wait_for_vespa(sender, **kwargs)

logger.info("Running as the primary celery worker.")

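
The on_worker_init hooks shown above run because each worker module connects them to Celery's worker_init signal, so all three probes complete before the worker begins consuming tasks. A minimal sketch of that wiring (the Celery app setup here is illustrative, not copied from the repo):

from typing import Any

from celery import Celery
from celery.signals import worker_init

from danswer.background.celery.apps import app_base

celery_app = Celery(__name__)


@worker_init.connect
def on_worker_init(sender: Any, **kwargs: Any) -> None:
    # Block startup until Redis, Postgres, and Vespa all answer,
    # mirroring the calls added in heavy.py, indexing.py, light.py, and primary.py.
    app_base.wait_for_redis(sender, **kwargs)
    app_base.wait_for_db(sender, **kwargs)
    app_base.wait_for_vespa(sender, **kwargs)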