Skip to content

Commit

Permalink
improved logging
Browse files Browse the repository at this point in the history
  • Loading branch information
pablonyx committed Nov 8, 2024
1 parent 5e25488 commit f91bac1
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 22 deletions.
3 changes: 1 addition & 2 deletions backend/danswer/background/celery/tasks/indexing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,6 @@ def connector_indexing_proxy_task(
search_settings_id,
tenant_id,
global_version.is_ee_version(),
pure=False,
)

if not job:
Expand Down Expand Up @@ -440,7 +439,7 @@ def connector_indexing_proxy_task(

if not index_attempt.is_finished():
continue
# After job.done() returns True

if job.process:
exit_code = job.process.exitcode
task_logger.info(
Expand Down
44 changes: 24 additions & 20 deletions backend/danswer/background/indexing/job_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import traceback
from collections.abc import Callable
from dataclasses import dataclass
from functools import partial
from multiprocessing import Process
from multiprocessing import Queue
from typing import Any
Expand Down Expand Up @@ -58,8 +59,8 @@ class SimpleJob:

# Unique identifier assigned by SimpleJobClient.submit().
id: int
# NOTE(review): the next two lines are a diff-view interleave — the old
# string-annotation form and the new direct annotation of the SAME field.
# Only one should survive; reconstruct from VCS before editing.
process: Optional["Process"] = None
process: Optional[Process] = None
# Formatted traceback text, if one was recorded for this job.
exception_info: Optional[str] = None
# Queue the child process pushes its traceback onto (see _wrapper / submit).
exception_queue: Optional[Queue] = None

def cancel(self) -> bool:
    """Cancel this job.

    Implemented as a straight delegate to ``release()`` (defined elsewhere
    in this class) — cancelling and releasing are the same operation here.
    """
    return self.release()
Expand Down Expand Up @@ -100,10 +101,21 @@ def exception(self) -> str:
return f"Job with ID '{self.id}' was killed or encountered an unhandled exception."


def _wrapper(q: Queue, func: Callable, *args: Any, **kwargs: Any) -> None:
    """Run *func* inside a child process, forwarding failures via *q*.

    If *func* raises, the formatted traceback is pushed onto *q* so the
    parent process can surface it, and the exception is re-raised so the
    child process terminates with a non-zero exit code.
    """
    try:
        func(*args, **kwargs)
    except Exception:
        q.put(traceback.format_exc())
        raise


class SimpleJobClient:
def __init__(self):
self.jobs: list[SimpleJob] = []
self.job_counter = 0
def __init__(self, n_workers: int = 1) -> None:
    """Lightweight in-process job tracker.

    n_workers: stored on the instance; nothing in the visible code enforces
    a concurrency limit — NOTE(review): confirm intended use with callers.
    """
    self.n_workers = n_workers
    # Monotonically increasing id source consumed by submit().
    self.job_id_counter = 0
    # Registry of live jobs, keyed by job id.
    self.jobs: dict[int, SimpleJob] = {}

def _cleanup_completed_jobs(self) -> None:
    # Snapshot the keys first so entries can be deleted while iterating.
    current_job_ids = list(self.jobs.keys())
Expand All @@ -113,24 +125,16 @@ def _cleanup_completed_jobs(self) -> None:
    # NOTE(review): the lines between the snapshot above and the two lines
    # below are folded out of this diff view — presumably a loop that looks
    # up each job and checks completion. Reconstruct from VCS before editing.
            logger.debug(f"Cleaning up job with id: '{job.id}'")
            del self.jobs[job.id]

# NOTE(review): this region is a unified-diff interleave — the old and the
# new submit() implementations both appear below with no +/- markers.
# Reconstruct the intended (post-change) version from VCS before editing.

# --- old implementation (removed by this commit) ---
def submit(self, func: Callable, *args, **kwargs) -> Optional[SimpleJob]:
    self.job_counter += 1
    job_id = self.job_counter

    # Old inline wrapper; replaced by module-level _wrapper in the new version.
    def wrapper(q, *args, **kwargs):
        try:
            func(*args, **kwargs)
        except Exception:
            error_trace = traceback.format_exc()
            q.put(error_trace)
            # Re-raise the exception to ensure the process exits with a non-zero code
            raise

    q = Queue()
    p = Process(target=wrapper, args=(q, *args), kwargs=kwargs)

# --- new implementation (added by this commit) ---
def submit(self, func: Callable, *args: Any, **kwargs: Any) -> Optional[SimpleJob]:
    self.job_id_counter += 1
    job_id = self.job_id_counter

    # Queue carries the child's traceback back to the parent (see _wrapper).
    q: Queue = Queue()
    wrapped_func = partial(_wrapper, q, func)
    p = Process(target=wrapped_func, args=args, kwargs=kwargs)
    job = SimpleJob(id=job_id, process=p)
    p.start()
    job.process = p
    job.exception_queue = q  # Store the queue in the job object
    # NOTE(review): the next two lines are old vs new registry updates for
    # the same job — list append (old) vs dict insert (new); keep only one.
    self.jobs.append(job)
    self.jobs[job_id] = job
    return job

0 comments on commit f91bac1

Please sign in to comment.