diff --git a/backend/alembic/env.py b/backend/alembic/env.py index 019ea94b836..4b6693a24c8 100644 --- a/backend/alembic/env.py +++ b/backend/alembic/env.py @@ -132,7 +132,8 @@ async def run_async_migrations() -> None: ) except Exception as e: logger.error(f"Error migrating schema {schema}: {e}") - raise + logger.warning("CONTINUING") + # raise else: try: logger.info(f"Migrating schema: {schema_name}") diff --git a/backend/danswer/background/celery/tasks/indexing/tasks.py b/backend/danswer/background/celery/tasks/indexing/tasks.py index 666defd9586..41301ded31f 100644 --- a/backend/danswer/background/celery/tasks/indexing/tasks.py +++ b/backend/danswer/background/celery/tasks/indexing/tasks.py @@ -440,15 +440,24 @@ def connector_indexing_proxy_task( if not index_attempt.is_finished(): continue + # After job.done() returns True + if job.process: + exit_code = job.process.exitcode + task_logger.info( + f"Job exit code: {exit_code} for attempt={index_attempt_id} " + f"tenant={tenant_id} cc_pair={cc_pair_id} search_settings={search_settings_id}" + ) if job.status == "error": + if job.exception_queue and not job.exception_queue.empty(): + error_message = job.exception_queue.get() + else: + error_message = job.exception() task_logger.error( f"Indexing proxy - spawned task exceptioned: " - f"attempt={index_attempt_id} " - f"tenant={tenant_id} " - f"cc_pair={cc_pair_id} " - f"search_settings={search_settings_id} " - f"error={job.exception()}" + f"attempt={index_attempt_id} tenant={tenant_id} " + f"cc_pair={cc_pair_id} search_settings={search_settings_id} " + f"error={error_message}" ) job.release() diff --git a/backend/danswer/background/indexing/job_client.py b/backend/danswer/background/indexing/job_client.py index 6808a52c5ca..ec5bd12d123 100644 --- a/backend/danswer/background/indexing/job_client.py +++ b/backend/danswer/background/indexing/job_client.py @@ -4,9 +4,11 @@ NOTE: cannot use Celery directly due to https://github.com/celery/celery/issues/7007#issuecomment-1740139367""" +import traceback from collections.abc import Callable from dataclasses import dataclass from multiprocessing import Process +from multiprocessing import Queue from typing import Any from typing import Literal from typing import Optional @@ -56,6 +58,8 @@ class SimpleJob: id: int process: Optional["Process"] = None + process: Optional[Process] = None + exception_info: Optional[str] = None def cancel(self) -> bool: return self.release() @@ -89,18 +93,17 @@ def done(self) -> bool: def exception(self) -> str: """Needed to match the Dask API, but not implemented since we don't currently have a way to get back the exception information from the child process.""" - return ( - f"Job with ID '{self.id}' was killed or encountered an unhandled exception." - ) + if self.exception_info: + return self.exception_info + else: + return f"Job with ID '{self.id}' was killed or encountered an unhandled exception." -class SimpleJobClient: - """Drop in replacement for `dask.distributed.Client`""" - def __init__(self, n_workers: int = 1) -> None: - self.n_workers = n_workers - self.job_id_counter = 0 - self.jobs: dict[int, SimpleJob] = {} +class SimpleJobClient: + def __init__(self): + self.jobs: list[SimpleJob] = [] + self.job_counter = 0 def _cleanup_completed_jobs(self) -> None: current_job_ids = list(self.jobs.keys()) @@ -110,22 +113,24 @@ def _cleanup_completed_jobs(self) -> None: logger.debug(f"Cleaning up job with id: '{job.id}'") del self.jobs[job.id] - def submit(self, func: Callable, *args: Any, pure: bool = True) -> SimpleJob | None: - """NOTE: `pure` arg is needed so this can be a drop in replacement for Dask""" - self._cleanup_completed_jobs() - if len(self.jobs) >= self.n_workers: - logger.debug( - f"No available workers to run job. Currently running '{len(self.jobs)}' jobs, with a limit of '{self.n_workers}'." - ) - return None - - job_id = self.job_id_counter - self.job_id_counter += 1 - - process = Process(target=_run_in_process, args=(func, args), daemon=True) - job = SimpleJob(id=job_id, process=process) - process.start() - - self.jobs[job_id] = job - + def submit(self, func: Callable, *args, **kwargs) -> Optional[SimpleJob]: + self.job_counter += 1 + job_id = self.job_counter + + def wrapper(q, *args, **kwargs): + try: + func(*args, **kwargs) + except Exception: + error_trace = traceback.format_exc() + q.put(error_trace) + # Re-raise the exception to ensure the process exits with a non-zero code + raise + + q = Queue() + p = Process(target=wrapper, args=(q, *args), kwargs=kwargs) + job = SimpleJob(id=job_id, process=p) + p.start() + job.process = p + job.exception_queue = q # Store the queue in the job object + self.jobs.append(job) return job diff --git a/deployment/cloud_kubernetes/workers/indexing_worker.yaml b/deployment/cloud_kubernetes/workers/indexing_worker.yaml index 98158f62ef8..73a0a54dbcb 100644 --- a/deployment/cloud_kubernetes/workers/indexing_worker.yaml +++ b/deployment/cloud_kubernetes/workers/indexing_worker.yaml @@ -26,6 +26,7 @@ spec: "--hostname=indexing@%n", "-Q", "connector_indexing", + "--concurrency=1", ] env: - name: REDIS_PASSWORD