
Commit

add additional logs
pablonyx committed Nov 8, 2024
1 parent 950b1c3 commit 5e25488
Showing 4 changed files with 49 additions and 33 deletions.
3 changes: 2 additions & 1 deletion backend/alembic/env.py
@@ -132,7 +132,8 @@ async def run_async_migrations() -> None:
                 )
             except Exception as e:
                 logger.error(f"Error migrating schema {schema}: {e}")
-                raise
+                logger.warning("CONTINUING")
+                # raise
         else:
             try:
                 logger.info(f"Migrating schema: {schema_name}")
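Note on this hunk: the commit swaps the per-schema `raise` for a warning, so one failing tenant schema no longer aborts the whole migration run. A rough, self-contained sketch of a continue-on-error loop of that shape is below; the names migrate_all_schemas, do_run_migrations, and the tenant schema strings are illustrative stand-ins, not the repository's exact code.

    import logging

    logger = logging.getLogger(__name__)

    def migrate_all_schemas(schemas, do_run_migrations):
        """Run migrations for each tenant schema; collect failures instead of raising."""
        failed_schemas = []
        for schema in schemas:
            try:
                logger.info(f"Migrating schema: {schema}")
                do_run_migrations(schema)
            except Exception as e:
                # Mirrors the behavior introduced in this commit: log the error,
                # emit a warning, and keep migrating the remaining schemas.
                logger.error(f"Error migrating schema {schema}: {e}")
                logger.warning("CONTINUING")
                failed_schemas.append(schema)
        return failed_schemas

    # Example usage with a stubbed migration function (hypothetical):
    if __name__ == "__main__":
        logging.basicConfig(level=logging.INFO)

        def fake_migration(schema):
            if schema == "tenant_2":
                raise RuntimeError("simulated failure")

        print(migrate_all_schemas(["tenant_1", "tenant_2", "tenant_3"], fake_migration))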
19 changes: 14 additions & 5 deletions backend/danswer/background/celery/tasks/indexing/tasks.py
@@ -440,15 +440,24 @@ def connector_indexing_proxy_task(

         if not index_attempt.is_finished():
             continue

+        # After job.done() returns True
+        if job.process:
+            exit_code = job.process.exitcode
+            task_logger.info(
+                f"Job exit code: {exit_code} for attempt={index_attempt_id} "
+                f"tenant={tenant_id} cc_pair={cc_pair_id} search_settings={search_settings_id}"
+            )

         if job.status == "error":
+            if job.exception_queue and not job.exception_queue.empty():
+                error_message = job.exception_queue.get()
+            else:
+                error_message = job.exception()
             task_logger.error(
                 f"Indexing proxy - spawned task exceptioned: "
-                f"attempt={index_attempt_id} "
-                f"tenant={tenant_id} "
-                f"cc_pair={cc_pair_id} "
-                f"search_settings={search_settings_id} "
-                f"error={job.exception()}"
+                f"attempt={index_attempt_id} tenant={tenant_id} "
+                f"cc_pair={cc_pair_id} search_settings={search_settings_id} "
+                f"error={error_message}"
             )

         job.release()
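Background on the new exit-code log: multiprocessing.Process.exitcode is None while the child is still alive, 0 on a clean exit, positive when the child exits with a nonzero status (including an uncaught exception), and negative N when the child was killed by signal N. A small sketch of interpreting it; the helper name interpret_exit_code is made up for illustration.

    import multiprocessing
    import signal
    import time

    def interpret_exit_code(process: multiprocessing.Process) -> str:
        code = process.exitcode
        if code is None:
            return "process is still running"
        if code == 0:
            return "process exited cleanly"
        if code < 0:
            # Negative exit codes mean the child was terminated by a signal.
            return f"process was killed by signal {signal.Signals(-code).name}"
        return f"process exited with error code {code}"

    if __name__ == "__main__":
        p = multiprocessing.Process(target=time.sleep, args=(0.1,), daemon=True)
        p.start()
        p.join()
        print(interpret_exit_code(p))  # expected: "process exited cleanly"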
59 changes: 32 additions & 27 deletions backend/danswer/background/indexing/job_client.py
@@ -4,9 +4,11 @@
 NOTE: cannot use Celery directly due to
 https://github.com/celery/celery/issues/7007#issuecomment-1740139367"""
+import traceback
 from collections.abc import Callable
 from dataclasses import dataclass
 from multiprocessing import Process
+from multiprocessing import Queue
 from typing import Any
 from typing import Literal
 from typing import Optional

@@ -56,6 +58,8 @@ class SimpleJob:

     id: int
-    process: Optional["Process"] = None
+    process: Optional[Process] = None
+    exception_info: Optional[str] = None

     def cancel(self) -> bool:
         return self.release()
@@ -89,18 +93,17 @@ def done(self) -> bool:
     def exception(self) -> str:
         """Needed to match the Dask API, but not implemented since we don't currently
         have a way to get back the exception information from the child process."""
-        return (
-            f"Job with ID '{self.id}' was killed or encountered an unhandled exception."
-        )
+        if self.exception_info:
+            return self.exception_info
+        else:
+            return f"Job with ID '{self.id}' was killed or encountered an unhandled exception."


-class SimpleJobClient:
-    """Drop in replacement for `dask.distributed.Client`"""
-
-    def __init__(self, n_workers: int = 1) -> None:
-        self.n_workers = n_workers
-        self.job_id_counter = 0
-        self.jobs: dict[int, SimpleJob] = {}
+class SimpleJobClient:
+    def __init__(self):
+        self.jobs: list[SimpleJob] = []
+        self.job_counter = 0

     def _cleanup_completed_jobs(self) -> None:
         current_job_ids = list(self.jobs.keys())
@@ -110,22 +113,24 @@ def _cleanup_completed_jobs(self) -> None:
                 logger.debug(f"Cleaning up job with id: '{job.id}'")
                 del self.jobs[job.id]

-    def submit(self, func: Callable, *args: Any, pure: bool = True) -> SimpleJob | None:
-        """NOTE: `pure` arg is needed so this can be a drop in replacement for Dask"""
-        self._cleanup_completed_jobs()
-        if len(self.jobs) >= self.n_workers:
-            logger.debug(
-                f"No available workers to run job. Currently running '{len(self.jobs)}' jobs, with a limit of '{self.n_workers}'."
-            )
-            return None
-
-        job_id = self.job_id_counter
-        self.job_id_counter += 1
-
-        process = Process(target=_run_in_process, args=(func, args), daemon=True)
-        job = SimpleJob(id=job_id, process=process)
-        process.start()
-
-        self.jobs[job_id] = job
-
+    def submit(self, func: Callable, *args, **kwargs) -> Optional[SimpleJob]:
+        self.job_counter += 1
+        job_id = self.job_counter
+
+        def wrapper(q, *args, **kwargs):
+            try:
+                func(*args, **kwargs)
+            except Exception:
+                error_trace = traceback.format_exc()
+                q.put(error_trace)
+                # Re-raise the exception to ensure the process exits with a non-zero code
+                raise
+
+        q = Queue()
+        p = Process(target=wrapper, args=(q, *args), kwargs=kwargs)
+        job = SimpleJob(id=job_id, process=p)
+        p.start()
+        job.process = p
+        job.exception_queue = q  # Store the queue in the job object
+        self.jobs.append(job)
+        return job
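The pattern in this hunk (wrap the target callable, push the formatted traceback through a multiprocessing.Queue, then re-raise so the child still exits nonzero) is what lets the parent report a real error message instead of a generic one. A minimal standalone sketch of the same idea follows; run_in_child, _wrapped, and _boom are made-up names for illustration, not the project's API.

    import traceback
    from multiprocessing import Process, Queue

    def _wrapped(q: Queue, func, *args, **kwargs):
        try:
            func(*args, **kwargs)
        except Exception:
            # Send the full traceback text back to the parent, then re-raise so
            # the child process still exits with a non-zero code.
            q.put(traceback.format_exc())
            raise

    def run_in_child(func, *args, **kwargs):
        q: Queue = Queue()
        p = Process(target=_wrapped, args=(q, func, *args), kwargs=kwargs, daemon=True)
        p.start()
        p.join()
        error = q.get() if not q.empty() else None
        return p.exitcode, error

    def _boom():
        raise ValueError("something went wrong in the child")

    if __name__ == "__main__":
        exit_code, error = run_in_child(_boom)
        print(exit_code)  # non-zero, since the child re-raised
        print(error)      # full traceback text from the child, or None on success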
1 change: 1 addition & 0 deletions deployment/cloud_kubernetes/workers/indexing_worker.yaml
@@ -26,6 +26,7 @@ spec:
"--hostname=indexing@%n",
"-Q",
"connector_indexing",
"--concurrency=1",
]
env:
- name: REDIS_PASSWORD
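Note on the flag: Celery's --concurrency option caps how many pool processes the worker starts, so "--concurrency=1" pins this indexing worker to a single Celery process. The likely intent, inferred from the rest of this commit rather than stated in it, is to let SimpleJobClient own process spawning (one child per indexing attempt) instead of Celery's pool.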
