From decfb0fc2609a856746f9252c28dc77c6502bc7e Mon Sep 17 00:00:00 2001 From: Haiqi Xu <14502009+haiqi96@users.noreply.github.com> Date: Mon, 17 Jun 2024 20:40:10 -0400 Subject: [PATCH 01/11] Add job types --- .../scripts/native/search.py | 6 +- .../initialize-orchestration-db.py | 1 + .../job_orchestration/scheduler/constants.py | 10 ++ .../scheduler/query/query_scheduler.py | 92 ++++++++++--------- .../scheduler/scheduler_data.py | 13 ++- 5 files changed, 75 insertions(+), 47 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/search.py b/components/clp-package-utils/clp_package_utils/scripts/native/search.py index aa261d904..e8fc8da3c 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/search.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/search.py @@ -15,7 +15,7 @@ import pymongo from clp_py_utils.clp_config import Database, QUERY_JOBS_TABLE_NAME, ResultsCache from clp_py_utils.sql_adapter import SQL_Adapter -from job_orchestration.scheduler.constants import QueryJobStatus +from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType from job_orchestration.scheduler.job_config import AggregationConfig, SearchConfig from clp_package_utils.general import ( @@ -111,8 +111,8 @@ def create_and_monitor_job_in_db( ) as db_cursor: # Create job db_cursor.execute( - f"INSERT INTO `{QUERY_JOBS_TABLE_NAME}` (`job_config`) VALUES (%s)", - (msgpack.packb(search_config.dict()),), + f"INSERT INTO `{QUERY_JOBS_TABLE_NAME}` (`job_config`, `type`) VALUES (%s, %s)", + (msgpack.packb(search_config.dict()), QueryJobType.SEARCH), ) db_conn.commit() job_id = db_cursor.lastrowid diff --git a/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py b/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py index 32a285c42..4899fb85d 100644 --- a/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py +++ b/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py @@ -97,6 +97,7 @@ def main(argv): f""" CREATE TABLE IF NOT EXISTS `{QUERY_JOBS_TABLE_NAME}` ( `id` INT NOT NULL AUTO_INCREMENT, + `type`INT NOT NULL, `status` INT NOT NULL DEFAULT '{QueryJobStatus.PENDING}', `creation_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), `num_tasks` INT NOT NULL DEFAULT '0', diff --git a/components/job-orchestration/job_orchestration/scheduler/constants.py b/components/job-orchestration/job_orchestration/scheduler/constants.py index 62f06f0cf..3d813e30f 100644 --- a/components/job-orchestration/job_orchestration/scheduler/constants.py +++ b/components/job-orchestration/job_orchestration/scheduler/constants.py @@ -67,3 +67,13 @@ def __str__(self) -> str: def to_str(self) -> str: return str(self.name) + +class QueryJobType(IntEnum): + SEARCH = 0 + EXTRACT_IR = auto() + + def __str__(self) -> str: + return str(self.value) + + def to_str(self) -> str: + return str(self.name) \ No newline at end of file diff --git a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py index d8a045f31..51b93fe1a 100644 --- a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py @@ -40,7 +40,7 @@ from clp_py_utils.decorators import exception_default_value from clp_py_utils.sql_adapter import SQL_Adapter from job_orchestration.executor.query.fs_search_task import search 
-from job_orchestration.scheduler.constants import QueryJobStatus, QueryTaskStatus +from job_orchestration.scheduler.constants import QueryJobStatus, QueryTaskStatus, QueryJobType from job_orchestration.scheduler.job_config import SearchConfig from job_orchestration.scheduler.query.reducer_handler import ( handle_reducer_connection, @@ -102,7 +102,8 @@ def fetch_new_search_jobs(db_conn) -> list: db_cursor.execute( f""" SELECT {QUERY_JOBS_TABLE_NAME}.id as job_id, - {QUERY_JOBS_TABLE_NAME}.job_config + {QUERY_JOBS_TABLE_NAME}.job_config, + {QUERY_JOBS_TABLE_NAME}.type FROM {QUERY_JOBS_TABLE_NAME} WHERE {QUERY_JOBS_TABLE_NAME}.status={QueryJobStatus.PENDING} """ @@ -370,55 +371,62 @@ def handle_pending_search_jobs( reducer_acquisition_tasks = [] - pending_jobs = [ - job for job in active_jobs.values() if InternalJobState.WAITING_FOR_DISPATCH == job.state + pending_search_jobs = [ + job for job in active_jobs.values() if InternalJobState.WAITING_FOR_DISPATCH == job.state and QueryJobType.SEARCH == job.type ] with contextlib.closing(db_conn_pool.connect()) as db_conn: for job in fetch_new_search_jobs(db_conn): job_id = str(job["job_id"]) + job_type = job["type"] + job_config = job["job_config"] + + if QueryJobType.SEARCH == job_type: + # Avoid double-dispatch when a job is WAITING_FOR_REDUCER + if job_id in active_jobs: + continue + + search_config = SearchConfig.parse_obj(msgpack.unpackb(job_config)) + archives_for_search = get_archives_for_search(db_conn, search_config) + if len(archives_for_search) == 0: + if set_job_or_task_status( + db_conn, + QUERY_JOBS_TABLE_NAME, + job_id, + QueryJobStatus.SUCCEEDED, + QueryJobStatus.PENDING, + start_time=datetime.datetime.now(), + num_tasks=0, + duration=0, + ): + logger.info(f"No matching archives, skipping job {job_id}.") + continue + + new_search_job = SearchJob( + id=job_id, + search_config=search_config, + state=InternalJobState.WAITING_FOR_DISPATCH, + num_archives_to_search=len(archives_for_search), + num_archives_searched=0, + remaining_archives_for_search=archives_for_search, + ) - # Avoid double-dispatch when a job is WAITING_FOR_REDUCER - if job_id in active_jobs: - continue - - search_config = SearchConfig.parse_obj(msgpack.unpackb(job["job_config"])) - archives_for_search = get_archives_for_search(db_conn, search_config) - if len(archives_for_search) == 0: - if set_job_or_task_status( - db_conn, - QUERY_JOBS_TABLE_NAME, - job_id, - QueryJobStatus.SUCCEEDED, - QueryJobStatus.PENDING, - start_time=datetime.datetime.now(), - num_tasks=0, - duration=0, - ): - logger.info(f"No matching archives, skipping job {job['job_id']}.") - continue - - new_search_job = SearchJob( - id=job_id, - search_config=search_config, - state=InternalJobState.WAITING_FOR_DISPATCH, - num_archives_to_search=len(archives_for_search), - num_archives_searched=0, - remaining_archives_for_search=archives_for_search, - ) + if search_config.aggregation_config is not None: + new_search_job.search_config.aggregation_config.job_id = int(job_id) + new_search_job.state = InternalJobState.WAITING_FOR_REDUCER + new_search_job.reducer_acquisition_task = asyncio.create_task( + acquire_reducer_for_job(new_search_job) + ) + reducer_acquisition_tasks.append(new_search_job.reducer_acquisition_task) + else: + pending_search_jobs.append(new_search_job) + active_jobs[job_id] = new_search_job - if search_config.aggregation_config is not None: - new_search_job.search_config.aggregation_config.job_id = job["job_id"] - new_search_job.state = InternalJobState.WAITING_FOR_REDUCER - 
new_search_job.reducer_acquisition_task = asyncio.create_task( - acquire_reducer_for_job(new_search_job) - ) - reducer_acquisition_tasks.append(new_search_job.reducer_acquisition_task) else: - pending_jobs.append(new_search_job) - active_jobs[job_id] = new_search_job + logger.error(f"Unexpected job type: {job_type}, skipping job {job_id}") + continue - for job in pending_jobs: + for job in pending_search_jobs: job_id = job.id if ( diff --git a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py index a3aa5f436..5939ba0f2 100644 --- a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py +++ b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py @@ -3,10 +3,10 @@ from enum import auto, Enum from typing import Any, Dict, List, Optional -from job_orchestration.scheduler.constants import CompressionTaskStatus, QueryTaskStatus +from job_orchestration.scheduler.constants import CompressionTaskStatus, QueryTaskStatus, QueryJobType from job_orchestration.scheduler.job_config import SearchConfig from job_orchestration.scheduler.query.reducer_handler import ReducerHandlerMessageQueues -from pydantic import BaseModel, validator +from pydantic import BaseModel, validator, Field class CompressionJob(BaseModel): @@ -37,10 +37,17 @@ class InternalJobState(Enum): class QueryJob(BaseModel): id: str + type: QueryJobType state: InternalJobState start_time: Optional[datetime.datetime] current_sub_job_async_task_result: Optional[Any] + @validator("type") + def valid_type(cls, field): + supported_job = [QueryJobType.SEARCH, QueryJobType.EXTRACT_IR] + if field not in supported_job: + raise ValueError(f'must be one of the following {"|".join(supported_job)}') + return field class SearchJob(QueryJob): search_config: SearchConfig @@ -50,6 +57,8 @@ class SearchJob(QueryJob): reducer_acquisition_task: Optional[asyncio.Task] reducer_handler_msg_queues: Optional[ReducerHandlerMessageQueues] + type: QueryJobType = Field(default=QueryJobType.SEARCH, const=True) + class Config: # To allow asyncio.Task and asyncio.Queue arbitrary_types_allowed = True From e52877ce75141d16312f1def48b3bd886541c414 Mon Sep 17 00:00:00 2001 From: Haiqi Xu <14502009+haiqi96@users.noreply.github.com> Date: Wed, 19 Jun 2024 11:50:02 -0400 Subject: [PATCH 02/11] Add base class for SearchQuery --- .../executor/query/fs_search_task.py | 4 +- .../job_orchestration/scheduler/job_config.py | 15 +- .../scheduler/query/query_scheduler.py | 320 ++++++++++-------- .../scheduler/scheduler_data.py | 39 ++- .../webui/imports/api/search/constants.js | 14 + .../api/search/server/QueryJobsDbManager.js | 8 +- 6 files changed, 239 insertions(+), 161 deletions(-) diff --git a/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py b/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py index f51eae407..ac3d1312f 100644 --- a/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py +++ b/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py @@ -168,7 +168,7 @@ def search( task_id=task_id, status=QueryTaskStatus.FAILED, duration=0, - error_log_path=clo_log_path, + error_log_path=str(clo_log_path), ).dict() update_search_task_metadata( @@ -231,6 +231,6 @@ def sigterm_handler(_signo, _stack_frame): ) if QueryTaskStatus.FAILED == search_status: - search_task_result.error_log_path = clo_log_path + search_task_result.error_log_path = 
str(clo_log_path) return search_task_result.dict() diff --git a/components/job-orchestration/job_orchestration/scheduler/job_config.py b/components/job-orchestration/job_orchestration/scheduler/job_config.py index 93d4ede4e..6968b6dad 100644 --- a/components/job-orchestration/job_orchestration/scheduler/job_config.py +++ b/components/job-orchestration/job_orchestration/scheduler/job_config.py @@ -3,7 +3,7 @@ import typing from pydantic import BaseModel, validator - +from abc import ABC class PathsToCompress(BaseModel): file_paths: typing.List[str] @@ -39,7 +39,18 @@ class AggregationConfig(BaseModel): count_by_time_bucket_size: typing.Optional[int] = None # Milliseconds -class SearchConfig(BaseModel): +class QueryConfig(BaseModel, ABC): + ... + + +class ExtractConfig(QueryConfig): + orig_file_id: str + msg_ix: int + file_split_id: typing.Optional[str] = None + target_size: typing.Optional[int] = None + + +class SearchConfig(QueryConfig): query_string: str max_num_results: int tags: typing.Optional[typing.List[str]] = None diff --git a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py index 51b93fe1a..434a03f7f 100644 --- a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py @@ -24,7 +24,7 @@ import pathlib import sys from pathlib import Path -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Any import celery import msgpack @@ -41,21 +41,21 @@ from clp_py_utils.sql_adapter import SQL_Adapter from job_orchestration.executor.query.fs_search_task import search from job_orchestration.scheduler.constants import QueryJobStatus, QueryTaskStatus, QueryJobType -from job_orchestration.scheduler.job_config import SearchConfig +from job_orchestration.scheduler.job_config import SearchConfig, ExtractConfig from job_orchestration.scheduler.query.reducer_handler import ( handle_reducer_connection, ReducerHandlerMessage, ReducerHandlerMessageQueues, ReducerHandlerMessageType, ) -from job_orchestration.scheduler.scheduler_data import InternalJobState, QueryTaskResult, SearchJob +from job_orchestration.scheduler.scheduler_data import InternalJobState, QueryTaskResult, SearchJob, QueryJob, ExtractJob from pydantic import ValidationError # Setup logging logger = get_logger("search-job-handler") # Dictionary of active jobs indexed by job id -active_jobs: Dict[str, SearchJob] = {} +active_jobs: Dict[str, QueryJob] = {} reducer_connection_queue: Optional[asyncio.Queue] = None @@ -91,7 +91,7 @@ async def release_reducer_for_job(job: SearchJob): @exception_default_value(default=[]) -def fetch_new_search_jobs(db_conn) -> list: +def fetch_new_query_jobs(db_conn) -> list: """ Fetches search jobs with status=PENDING from the database. :param db_conn: @@ -112,7 +112,7 @@ def fetch_new_search_jobs(db_conn) -> list: @exception_default_value(default=[]) -def fetch_cancelling_search_jobs(db_conn) -> list: +def fetch_cancelling_query_jobs(db_conn) -> list: """ Fetches search jobs with status=CANCELLING from the database. 
:param db_conn: @@ -175,55 +175,67 @@ def set_job_or_task_status( return rval -async def handle_cancelling_search_jobs(db_conn_pool) -> None: +async def handle_cancelling_query_jobs(db_conn_pool) -> None: global active_jobs with contextlib.closing(db_conn_pool.connect()) as db_conn: - cancelling_jobs = fetch_cancelling_search_jobs(db_conn) - + cancelling_jobs = fetch_cancelling_query_jobs(db_conn) for cancelling_job in cancelling_jobs: job_id = str(cancelling_job["job_id"]) - if job_id in active_jobs: - job = active_jobs.pop(job_id) - cancel_job_except_reducer(job) - # Perform any async tasks last so that it's easier to reason about synchronization - # issues between concurrent tasks - await release_reducer_for_job(job) - else: - continue + job_type = job.type() + if QueryJobType.SEARCH == job_type: + if job_id in active_jobs: + job = active_jobs.pop(job_id) + cancel_job_except_reducer(job) + # Perform any async tasks last so that it's easier to reason about synchronization + # issues between concurrent tasks + await release_reducer_for_job(job) + else: + continue - set_job_or_task_status( - db_conn, - QUERY_TASKS_TABLE_NAME, - job_id, - QueryTaskStatus.CANCELLED, - QueryTaskStatus.PENDING, - duration=0, - ) + set_job_or_task_status( + db_conn, + QUERY_TASKS_TABLE_NAME, + job_id, + QueryTaskStatus.CANCELLED, + QueryTaskStatus.PENDING, + duration=0, + ) - set_job_or_task_status( - db_conn, - QUERY_TASKS_TABLE_NAME, - job_id, - QueryTaskStatus.CANCELLED, - QueryTaskStatus.RUNNING, - duration="TIMESTAMPDIFF(MICROSECOND, start_time, NOW())/1000000.0", - ) + set_job_or_task_status( + db_conn, + QUERY_TASKS_TABLE_NAME, + job_id, + QueryTaskStatus.CANCELLED, + QueryTaskStatus.RUNNING, + duration="TIMESTAMPDIFF(MICROSECOND, start_time, NOW())/1000000.0", + ) - if set_job_or_task_status( - db_conn, - QUERY_JOBS_TABLE_NAME, - job_id, - QueryJobStatus.CANCELLED, - QueryJobStatus.CANCELLING, - duration=(datetime.datetime.now() - job.start_time).total_seconds(), - ): - logger.info(f"Cancelled job {job_id}.") + if set_job_or_task_status( + db_conn, + QUERY_JOBS_TABLE_NAME, + job_id, + QueryJobStatus.CANCELLED, + QueryJobStatus.CANCELLING, + duration=(datetime.datetime.now() - job.start_time).total_seconds(), + ): + logger.info(f"Cancelled job {job_id}.") + else: + logger.error(f"Failed to cancel job {job_id}.") else: - logger.error(f"Failed to cancel job {job_id}.") + logger.error(f"Unexpected job type: {job_type} for cancellation, marking job {job_id} as failed.") + if not set_job_or_task_status( + db_conn, + QUERY_JOBS_TABLE_NAME, + job_id, + QueryJobStatus.FAILED, + duration=(datetime.datetime.now() - job.start_time).total_seconds(), + ): + logger.error(f"Failed to mark job {job_id} as failed.") + -def insert_search_tasks_into_db(db_conn, job_id, archive_ids: List[str]) -> List[int]: +def insert_query_tasks_into_db(db_conn, job_id, archive_ids: List[str]) -> List[int]: task_ids = [] with contextlib.closing(db_conn.cursor()) as cursor: for archive_id in archive_ids: @@ -274,18 +286,17 @@ def get_archives_for_search( def get_task_group_for_job( archive_ids: List[str], task_ids: List[int], - job_id: str, - search_config: SearchConfig, + job: QueryJob, clp_metadata_db_conn_params: Dict[str, any], results_cache_uri: str, ): - search_config_obj = search_config.dict() + job_config_obj = job.job_config().dict() return celery.group( search.s( - job_id=job_id, + job_id=job.id, archive_id=archive_ids[i], task_id=task_ids[i], - search_config_obj=search_config_obj, + search_config_obj=job_config_obj, 
clp_metadata_db_conn_params=clp_metadata_db_conn_params, results_cache_uri=results_cache_uri, ) @@ -293,22 +304,20 @@ def get_task_group_for_job( ) -def dispatch_search_job( +def dispatch_query_job( db_conn, - job: SearchJob, - archives_for_search: List[Dict[str, any]], + job: QueryJob, + archive_ids: List[str], clp_metadata_db_conn_params: Dict[str, any], results_cache_uri: str, ) -> None: global active_jobs - archive_ids = [archive["archive_id"] for archive in archives_for_search] - task_ids = insert_search_tasks_into_db(db_conn, job.id, archive_ids) + task_ids = insert_query_tasks_into_db(db_conn, job.id, archive_ids) task_group = get_task_group_for_job( archive_ids, task_ids, - job.id, - job.search_config, + job, clp_metadata_db_conn_params, results_cache_uri, ) @@ -376,7 +385,7 @@ def handle_pending_search_jobs( ] with contextlib.closing(db_conn_pool.connect()) as db_conn: - for job in fetch_new_search_jobs(db_conn): + for job in fetch_new_query_jobs(db_conn): job_id = str(job["job_id"]) job_type = job["type"] job_config = job["job_config"] @@ -421,14 +430,12 @@ def handle_pending_search_jobs( else: pending_search_jobs.append(new_search_job) active_jobs[job_id] = new_search_job - else: logger.error(f"Unexpected job type: {job_type}, skipping job {job_id}") continue for job in pending_search_jobs: job_id = job.id - if ( job.search_config.network_address is None and len(job.remaining_archives_for_search) > num_archives_to_search_per_sub_job @@ -443,11 +450,15 @@ def handle_pending_search_jobs( archives_for_search = job.remaining_archives_for_search job.remaining_archives_for_search = [] - dispatch_search_job( - db_conn, job, archives_for_search, clp_metadata_db_conn_params, results_cache_uri + archive_ids_for_search = [ + archive["archive_id"] for archive in archives_for_search + ] + + dispatch_query_job( + db_conn, job, archive_ids_for_search, clp_metadata_db_conn_params, results_cache_uri ) logger.info( - f"Dispatched job {job_id} with {len(archives_for_search)} archives to search." + f"Dispatched job {job_id} with {len(archive_ids_for_search)} archives to search." ) start_time = datetime.datetime.now() job.start_time = start_time @@ -495,6 +506,95 @@ def found_max_num_latest_results( return max_timestamp_in_remaining_archives <= min_timestamp_in_top_results +async def handle_returned_search_job( + db_conn, + job: SearchJob, + task_results: Optional[Any], + results_cache_uri: str +) -> None: + global active_jobs + + job_id = job.id + is_reducer_job = job.reducer_handler_msg_queues is not None + new_job_status = QueryJobStatus.RUNNING + for task_result_obj in task_results: + task_result = QueryTaskResult.parse_obj(task_result_obj) + task_id = task_result.task_id + task_status = task_result.status + if not task_status == QueryTaskStatus.SUCCEEDED: + new_job_status = QueryJobStatus.FAILED + logger.error( + f"Search task job-{job_id}-task-{task_id} failed. " + f"Check {task_result.error_log_path} for details." + ) + else: + job.num_archives_searched += 1 + logger.info( + f"Search task job-{job_id}-task-{task_id} succeeded in " + f"{task_result.duration} second(s)." 
+ ) + + if new_job_status != QueryJobStatus.FAILED: + max_num_results = job.search_config.max_num_results + # Check if we've searched all archives + if len(job.remaining_archives_for_search) == 0: + new_job_status = QueryJobStatus.SUCCEEDED + # Check if we've reached max results + elif False == is_reducer_job and max_num_results > 0: + if found_max_num_latest_results( + results_cache_uri, + job_id, + max_num_results, + job.remaining_archives_for_search[0]["end_timestamp"], + ): + new_job_status = QueryJobStatus.SUCCEEDED + if new_job_status == QueryJobStatus.RUNNING: + job.current_sub_job_async_task_result = None + job.state = InternalJobState.WAITING_FOR_DISPATCH + logger.info(f"Job {job_id} waiting for more archives to search.") + set_job_or_task_status( + db_conn, + QUERY_JOBS_TABLE_NAME, + job_id, + QueryJobStatus.RUNNING, + QueryJobStatus.RUNNING, + num_tasks_completed=job.num_archives_searched, + ) + return + + reducer_failed = False + if is_reducer_job: + # Notify reducer that it should have received all results + msg = ReducerHandlerMessage(ReducerHandlerMessageType.SUCCESS) + await job.reducer_handler_msg_queues.put_to_handler(msg) + + msg = await job.reducer_handler_msg_queues.get_from_handler() + if ReducerHandlerMessageType.FAILURE == msg.msg_type: + reducer_failed = True + new_job_status = QueryJobStatus.FAILED + elif ReducerHandlerMessageType.SUCCESS != msg.msg_type: + error_msg = f"Unexpected msg_type: {msg.msg_type.name}" + raise NotImplementedError(error_msg) + + # We set the status regardless of the job's previous status to handle the case where the + # job is cancelled (status = CANCELLING) while we're in this method. + if set_job_or_task_status( + db_conn, + QUERY_JOBS_TABLE_NAME, + job_id, + new_job_status, + num_tasks_completed=job.num_archives_searched, + duration=(datetime.datetime.now() - job.start_time).total_seconds(), + ): + if new_job_status == QueryJobStatus.SUCCEEDED: + logger.info(f"Completed job {job_id}.") + elif reducer_failed: + logger.error(f"Completed job {job_id} with failing reducer.") + else: + logger.info(f"Completed job {job_id} with failing tasks.") + del active_jobs[job_id] + + async def check_job_status_and_update_db(db_conn_pool, results_cache_uri): global active_jobs @@ -503,16 +603,15 @@ async def check_job_status_and_update_db(db_conn_pool, results_cache_uri): id for id, job in active_jobs.items() if InternalJobState.RUNNING == job.state ]: job = active_jobs[job_id] - is_reducer_job = job.reducer_handler_msg_queues is not None - try: returned_results = try_getting_task_result(job.current_sub_job_async_task_result) except Exception as e: logger.error(f"Job `{job_id}` failed: {e}.") # Clean up - if is_reducer_job: - msg = ReducerHandlerMessage(ReducerHandlerMessageType.FAILURE) - await job.reducer_handler_msg_queues.put_to_handler(msg) + if QueryJobType.SEARCH == job.type(): + if job.reducer_handler_msg_queues is not None: + msg = ReducerHandlerMessage(ReducerHandlerMessageType.FAILURE) + await job.reducer_handler_msg_queues.put_to_handler(msg) del active_jobs[job_id] set_job_or_task_status( @@ -527,89 +626,24 @@ async def check_job_status_and_update_db(db_conn_pool, results_cache_uri): if returned_results is None: continue - - new_job_status = QueryJobStatus.RUNNING - for task_result_obj in returned_results: - task_result = QueryTaskResult.parse_obj(task_result_obj) - task_id = task_result.task_id - task_status = task_result.status - if not task_status == QueryTaskStatus.SUCCEEDED: - new_job_status = QueryJobStatus.FAILED - logger.error( - 
f"Search task job-{job_id}-task-{task_id} failed. " - f"Check {task_result.error_log_path} for details." - ) - else: - job.num_archives_searched += 1 - logger.info( - f"Search task job-{job_id}-task-{task_id} succeeded in " - f"{task_result.duration} second(s)." - ) - - if new_job_status != QueryJobStatus.FAILED: - max_num_results = job.search_config.max_num_results - # Check if we've searched all archives - if len(job.remaining_archives_for_search) == 0: - new_job_status = QueryJobStatus.SUCCEEDED - # Check if we've reached max results - elif False == is_reducer_job and max_num_results > 0: - if found_max_num_latest_results( - results_cache_uri, - job_id, - max_num_results, - job.remaining_archives_for_search[0]["end_timestamp"], - ): - new_job_status = QueryJobStatus.SUCCEEDED - if new_job_status == QueryJobStatus.RUNNING: - job.current_sub_job_async_task_result = None - job.state = InternalJobState.WAITING_FOR_DISPATCH - logger.info(f"Job {job_id} waiting for more archives to search.") - set_job_or_task_status( + job_type = job.type() + if QueryJobType.SEARCH == job.type(): + search_job: SearchJob = job + await handle_returned_search_job( db_conn, - QUERY_JOBS_TABLE_NAME, - job_id, - QueryJobStatus.RUNNING, - QueryJobStatus.RUNNING, - num_tasks_completed=job.num_archives_searched, + search_job, + returned_results, + results_cache_uri ) - continue + else: + logger.error(f"Unexpected job type: {job_type}, skipping job {job_id}") + - reducer_failed = False - if is_reducer_job: - # Notify reducer that it should have received all results - msg = ReducerHandlerMessage(ReducerHandlerMessageType.SUCCESS) - await job.reducer_handler_msg_queues.put_to_handler(msg) - - msg = await job.reducer_handler_msg_queues.get_from_handler() - if ReducerHandlerMessageType.FAILURE == msg.msg_type: - reducer_failed = True - new_job_status = QueryJobStatus.FAILED - elif ReducerHandlerMessageType.SUCCESS != msg.msg_type: - error_msg = f"Unexpected msg_type: {msg.msg_type.name}" - raise NotImplementedError(error_msg) - - # We set the status regardless of the job's previous status to handle the case where the - # job is cancelled (status = CANCELLING) while we're in this method. 
- if set_job_or_task_status( - db_conn, - QUERY_JOBS_TABLE_NAME, - job_id, - new_job_status, - num_tasks_completed=job.num_archives_searched, - duration=(datetime.datetime.now() - job.start_time).total_seconds(), - ): - if new_job_status == QueryJobStatus.SUCCEEDED: - logger.info(f"Completed job {job_id}.") - elif reducer_failed: - logger.error(f"Completed job {job_id} with failing reducer.") - else: - logger.info(f"Completed job {job_id} with failing tasks.") - del active_jobs[job_id] async def handle_job_updates(db_conn_pool, results_cache_uri: str, jobs_poll_delay: float): while True: - await handle_cancelling_search_jobs(db_conn_pool) + await handle_cancelling_query_jobs(db_conn_pool) await check_job_status_and_update_db(db_conn_pool, results_cache_uri) await asyncio.sleep(jobs_poll_delay) diff --git a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py index 5939ba0f2..04ff0fc50 100644 --- a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py +++ b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py @@ -4,10 +4,10 @@ from typing import Any, Dict, List, Optional from job_orchestration.scheduler.constants import CompressionTaskStatus, QueryTaskStatus, QueryJobType -from job_orchestration.scheduler.job_config import SearchConfig +from job_orchestration.scheduler.job_config import SearchConfig, QueryConfig, ExtractConfig from job_orchestration.scheduler.query.reducer_handler import ReducerHandlerMessageQueues from pydantic import BaseModel, validator, Field - +from abc import ABC, abstractmethod class CompressionJob(BaseModel): id: int @@ -30,24 +30,37 @@ def valid_status(cls, field): class InternalJobState(Enum): + PENDING = auto() WAITING_FOR_REDUCER = auto() WAITING_FOR_DISPATCH = auto() RUNNING = auto() -class QueryJob(BaseModel): +class QueryJob(BaseModel, ABC): id: str - type: QueryJobType state: InternalJobState start_time: Optional[datetime.datetime] current_sub_job_async_task_result: Optional[Any] - @validator("type") - def valid_type(cls, field): - supported_job = [QueryJobType.SEARCH, QueryJobType.EXTRACT_IR] - if field not in supported_job: - raise ValueError(f'must be one of the following {"|".join(supported_job)}') - return field + @abstractmethod + def type(self) -> QueryJobType: + ... + + @abstractmethod + def job_config(self) -> QueryConfig: + ... 
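
Note: the hunk above introduces an abstract type() on the job base. A minimal,
standalone sketch of that pattern (a pydantic model that is also an ABC, with
each concrete job reporting its own type); all `Demo*` names are illustrative
and the sketch assumes the pydantic v1 API used throughout this series:

    from abc import ABC, abstractmethod
    from enum import IntEnum

    from pydantic import BaseModel


    class DemoJobType(IntEnum):
        SEARCH = 0
        EXTRACT_IR = 1


    class DemoQueryJob(BaseModel, ABC):
        id: str

        # Each concrete job class must report its own type
        @abstractmethod
        def type(self) -> DemoJobType: ...


    class DemoSearchJob(DemoQueryJob):
        query_string: str

        def type(self) -> DemoJobType:
            return DemoJobType.SEARCH


    job: DemoQueryJob = DemoSearchJob(id="1", query_string="*")
    assert DemoJobType.SEARCH == job.type()

Instantiating DemoQueryJob directly raises a TypeError because of the
unimplemented abstract method, giving the same "one fixed type per job class"
guarantee as the `const=True` field it replaces below.
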
+
+
+class ExtractJob(QueryJob):
+    extract_config: ExtractConfig
+    archive_id: str
+
+    def type(self) -> QueryJobType:
+        return QueryJobType.EXTRACT_IR
+
+    def job_config(self) -> QueryConfig:
+        return self.extract_config
+

 class SearchJob(QueryJob):
     search_config: SearchConfig
@@ -50,6 +57,8 @@ class SearchJob(QueryJob):
     reducer_acquisition_task: Optional[asyncio.Task]
     reducer_handler_msg_queues: Optional[ReducerHandlerMessageQueues]

-    type: QueryJobType = Field(default=QueryJobType.SEARCH, const=True)
+    def type(self) -> QueryJobType:
+        return QueryJobType.SEARCH
+
+    def job_config(self) -> QueryConfig:
+        return self.search_config

     class Config:  # To allow asyncio.Task and asyncio.Queue
         arbitrary_types_allowed = True
diff --git a/components/webui/imports/api/search/constants.js b/components/webui/imports/api/search/constants.js
index ec4c13ad6..2da303915 100644
--- a/components/webui/imports/api/search/constants.js
+++ b/components/webui/imports/api/search/constants.js
@@ -56,6 +56,20 @@ const isOperationInProgress = (s) => (
     (true === isSearchSignalReq(s)) ||
     (true === isSearchSignalQuerying(s))
 );
+/* eslint-disable sort-keys */
+let enumQueryType;
+/**
+ * Enum of job types, matching the `QueryJobType` class in
+ * `job_orchestration.scheduler.constants`.
+ *
+ * @enum {number}
+ */
+const QUERY_JOB_TYPE = Object.freeze({
+    SEARCH: (enumQueryType = 0),
+    EXTRACT_IR: ++enumQueryType,
+
+});
+/* eslint-enable sort-keys */

 /* eslint-disable sort-keys */
 let enumQueryJobStatus;
diff --git a/components/webui/imports/api/search/server/QueryJobsDbManager.js b/components/webui/imports/api/search/server/QueryJobsDbManager.js
index 4d3bed94a..266aac523 100644
--- a/components/webui/imports/api/search/server/QueryJobsDbManager.js
+++ b/components/webui/imports/api/search/server/QueryJobsDbManager.js
@@ -5,6 +5,7 @@ import {sleep} from "/imports/utils/misc";
 import {
     QUERY_JOB_STATUS,
     QUERY_JOB_STATUS_WAITING_STATES,
+    QUERY_JOB_TYPE,
 } from "../constants";


@@ -21,6 +22,7 @@ const JOB_COMPLETION_STATUS_POLL_INTERVAL_MILLIS = 0.5;
 const QUERY_JOBS_TABLE_COLUMN_NAMES = Object.freeze({
     ID: "id",
     STATUS: "status",
+    TYPE: "type",
     JOB_CONFIG: "job_config",
 });

@@ -52,9 +54,9 @@ class QueryJobsDbManager {
     async submitSearchJob (searchConfig) {
         const [queryInsertResults] = await this.#sqlDbConnPool.query(
             `INSERT INTO ${this.#queryJobsTableName}
-                (${QUERY_JOBS_TABLE_COLUMN_NAMES.JOB_CONFIG})
-                VALUES (?)`,
-            [Buffer.from(msgpack.encode(searchConfig))],
+                (${QUERY_JOBS_TABLE_COLUMN_NAMES.JOB_CONFIG}, ${QUERY_JOBS_TABLE_COLUMN_NAMES.TYPE})
+                VALUES (?, ?)`,
+            [Buffer.from(msgpack.encode(searchConfig)), QUERY_JOB_TYPE.SEARCH],
         );

         return queryInsertResults.insertId;

From d027279bfae0a4524d99597a832b150782ef9238 Mon Sep 17 00:00:00 2001
From: Haiqi Xu <14502009+haiqi96@users.noreply.github.com>
Date: Wed, 19 Jun 2024 12:58:32 -0400
Subject: [PATCH 03/11] Fix

---
 .../scheduler/query/query_scheduler.py        | 94 +++++++++----------
 .../webui/imports/api/search/constants.js     | 29 +++---
 2 files changed, 57 insertions(+), 66 deletions(-)

diff --git a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
index 434a03f7f..9107a5aa4 100644
--- a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
+++ b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
@@ -93,9 +93,9 @@ async def release_reducer_for_job(job: SearchJob):
@exception_default_value(default=[]) def fetch_new_query_jobs(db_conn) -> list: """ - Fetches search jobs with status=PENDING from the database. + Fetches query jobs with status=PENDING from the database. :param db_conn: - :return: The pending search jobs on success. An empty list if an exception occurs while + :return: The pending query jobs on success. An empty list if an exception occurs while interacting with the database. """ with contextlib.closing(db_conn.cursor(dictionary=True)) as db_cursor: @@ -112,7 +112,7 @@ def fetch_new_query_jobs(db_conn) -> list: @exception_default_value(default=[]) -def fetch_cancelling_query_jobs(db_conn) -> list: +def fetch_cancelling_search_jobs(db_conn) -> list: """ Fetches search jobs with status=CANCELLING from the database. :param db_conn: @@ -125,6 +125,7 @@ def fetch_cancelling_query_jobs(db_conn) -> list: SELECT {QUERY_JOBS_TABLE_NAME}.id as job_id FROM {QUERY_JOBS_TABLE_NAME} WHERE {QUERY_JOBS_TABLE_NAME}.status={QueryJobStatus.CANCELLING} + AND {QUERY_JOBS_TABLE_NAME}.type={QueryJobType.SEARCH} """ ) return db_cursor.fetchall() @@ -175,63 +176,52 @@ def set_job_or_task_status( return rval -async def handle_cancelling_query_jobs(db_conn_pool) -> None: +async def handle_cancelling_search_jobs(db_conn_pool) -> None: global active_jobs with contextlib.closing(db_conn_pool.connect()) as db_conn: - cancelling_jobs = fetch_cancelling_query_jobs(db_conn) + cancelling_jobs = fetch_cancelling_search_jobs(db_conn) + for cancelling_job in cancelling_jobs: job_id = str(cancelling_job["job_id"]) - job_type = job.type() - if QueryJobType.SEARCH == job_type: - if job_id in active_jobs: - job = active_jobs.pop(job_id) - cancel_job_except_reducer(job) - # Perform any async tasks last so that it's easier to reason about synchronization - # issues between concurrent tasks - await release_reducer_for_job(job) - else: - continue + if job_id in active_jobs: + job = active_jobs.pop(job_id) + cancel_job_except_reducer(job) + # Perform any async tasks last so that it's easier to reason about synchronization + # issues between concurrent tasks + await release_reducer_for_job(job) + else: + continue - set_job_or_task_status( - db_conn, - QUERY_TASKS_TABLE_NAME, - job_id, - QueryTaskStatus.CANCELLED, - QueryTaskStatus.PENDING, - duration=0, - ) + set_job_or_task_status( + db_conn, + QUERY_TASKS_TABLE_NAME, + job_id, + QueryTaskStatus.CANCELLED, + QueryTaskStatus.PENDING, + duration=0, + ) - set_job_or_task_status( - db_conn, - QUERY_TASKS_TABLE_NAME, - job_id, - QueryTaskStatus.CANCELLED, - QueryTaskStatus.RUNNING, - duration="TIMESTAMPDIFF(MICROSECOND, start_time, NOW())/1000000.0", - ) + set_job_or_task_status( + db_conn, + QUERY_TASKS_TABLE_NAME, + job_id, + QueryTaskStatus.CANCELLED, + QueryTaskStatus.RUNNING, + duration="TIMESTAMPDIFF(MICROSECOND, start_time, NOW())/1000000.0", + ) - if set_job_or_task_status( - db_conn, - QUERY_JOBS_TABLE_NAME, - job_id, - QueryJobStatus.CANCELLED, - QueryJobStatus.CANCELLING, - duration=(datetime.datetime.now() - job.start_time).total_seconds(), - ): - logger.info(f"Cancelled job {job_id}.") - else: - logger.error(f"Failed to cancel job {job_id}.") + if set_job_or_task_status( + db_conn, + QUERY_JOBS_TABLE_NAME, + job_id, + QueryJobStatus.CANCELLED, + QueryJobStatus.CANCELLING, + duration=(datetime.datetime.now() - job.start_time).total_seconds(), + ): + logger.info(f"Cancelled job {job_id}.") else: - logger.error(f"Unexpected job type: {job_type} for cancellation, marking job {job_id} as failed.") - if not set_job_or_task_status( - 
db_conn,
-                    QUERY_JOBS_TABLE_NAME,
-                    job_id,
-                    QueryJobStatus.FAILED,
-                    duration=(datetime.datetime.now() - job.start_time).total_seconds(),
-                ):
-                    logger.error(f"Failed to mark job {job_id} as failed.")


@@ -643,7 +633,7 @@ async def check_job_status_and_update_db(db_conn_pool, results_cache_uri):

 async def handle_job_updates(db_conn_pool, results_cache_uri: str, jobs_poll_delay: float):
     while True:
-        await handle_cancelling_query_jobs(db_conn_pool)
+        await handle_cancelling_search_jobs(db_conn_pool)
         await check_job_status_and_update_db(db_conn_pool, results_cache_uri)
         await asyncio.sleep(jobs_poll_delay)

diff --git a/components/webui/imports/api/search/constants.js b/components/webui/imports/api/search/constants.js
index 2da303915..908367b76 100644
--- a/components/webui/imports/api/search/constants.js
+++ b/components/webui/imports/api/search/constants.js
@@ -56,20 +56,6 @@ const isOperationInProgress = (s) => (
     (true === isSearchSignalReq(s)) ||
     (true === isSearchSignalQuerying(s))
 );
-/* eslint-disable sort-keys */
-let enumQueryType;
-/**
- * Enum of job types, matching the `QueryJobType` class in
- * `job_orchestration.scheduler.constants`.
- *
- * @enum {number}
- */
-const QUERY_JOB_TYPE = Object.freeze({
-    SEARCH: (enumQueryType = 0),
-    EXTRACT_IR: ++enumQueryType,
-
-});
-/* eslint-enable sort-keys */

 /* eslint-disable sort-keys */
 let enumQueryJobStatus;
@@ -95,6 +81,20 @@ const QUERY_JOB_STATUS_WAITING_STATES = [
     QUERY_JOB_STATUS.CANCELLING,
 ];

+/* eslint-disable sort-keys */
+let enumQueryType;
+/**
+ * Enum of job types, matching the `QueryJobType` class in
+ * `job_orchestration.scheduler.constants`.
+ *
+ * @enum {number}
+ */
+const QUERY_JOB_TYPE = Object.freeze({
+    SEARCH: (enumQueryType = 0),
+    EXTRACT_IR: ++enumQueryType,
+});
+/* eslint-enable sort-keys */
+
 /**
  * Enum of Mongo Collection sort orders.
* @@ -128,6 +128,7 @@ export { MONGO_SORT_ORDER, QUERY_JOB_STATUS, QUERY_JOB_STATUS_WAITING_STATES, + QUERY_JOB_TYPE, SEARCH_MAX_NUM_RESULTS, SEARCH_RESULTS_FIELDS, SEARCH_SIGNAL, From d027279bfae0a4524d99597a832b150782ef9238 Mon Sep 17 00:00:00 2001 From: Haiqi Xu <14502009+haiqi96@users.noreply.github.com> Date: Wed, 19 Jun 2024 14:53:13 -0400 Subject: [PATCH 04/11] Linter --- .../job_orchestration/scheduler/constants.py | 3 +- .../job_orchestration/scheduler/job_config.py | 6 +- .../scheduler/query/query_scheduler.py | 55 +++++++++---------- .../scheduler/scheduler_data.py | 19 ++++--- .../api/search/server/QueryJobsDbManager.js | 3 +- 5 files changed, 44 insertions(+), 42 deletions(-) diff --git a/components/job-orchestration/job_orchestration/scheduler/constants.py b/components/job-orchestration/job_orchestration/scheduler/constants.py index 3d813e30f..22e5e34f4 100644 --- a/components/job-orchestration/job_orchestration/scheduler/constants.py +++ b/components/job-orchestration/job_orchestration/scheduler/constants.py @@ -68,6 +68,7 @@ def __str__(self) -> str: def to_str(self) -> str: return str(self.name) + class QueryJobType(IntEnum): SEARCH = 0 EXTRACT_IR = auto() @@ -76,4 +77,4 @@ def __str__(self) -> str: return str(self.value) def to_str(self) -> str: - return str(self.name) \ No newline at end of file + return str(self.name) diff --git a/components/job-orchestration/job_orchestration/scheduler/job_config.py b/components/job-orchestration/job_orchestration/scheduler/job_config.py index 6968b6dad..fda83af6b 100644 --- a/components/job-orchestration/job_orchestration/scheduler/job_config.py +++ b/components/job-orchestration/job_orchestration/scheduler/job_config.py @@ -1,9 +1,10 @@ from __future__ import annotations import typing +from abc import ABC from pydantic import BaseModel, validator -from abc import ABC + class PathsToCompress(BaseModel): file_paths: typing.List[str] @@ -39,8 +40,7 @@ class AggregationConfig(BaseModel): count_by_time_bucket_size: typing.Optional[int] = None # Milliseconds -class QueryConfig(BaseModel, ABC): - ... +class QueryConfig(BaseModel, ABC): ... 
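
Note: for reference, a standalone sketch of the round trip this shared config
base supports — a concrete config is packed into the `job_config` column on
submission and re-parsed by job type in the scheduler. The `Demo*` names are
illustrative; the sketch assumes the msgpack and pydantic v1 APIs already used
in this series:

    import msgpack
    from pydantic import BaseModel


    class DemoQueryConfig(BaseModel): ...


    class DemoSearchConfig(DemoQueryConfig):
        query_string: str
        max_num_results: int = 1000


    # Pack as on submission, then restore as on dispatch
    packed = msgpack.packb(DemoSearchConfig(query_string="ERROR").dict())
    restored = DemoSearchConfig.parse_obj(msgpack.unpackb(packed))
    assert "ERROR" == restored.query_string
    assert 1000 == restored.max_num_results
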
class ExtractConfig(QueryConfig): diff --git a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py index 9107a5aa4..bbcc14fc3 100644 --- a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py @@ -24,7 +24,7 @@ import pathlib import sys from pathlib import Path -from typing import Dict, List, Optional, Any +from typing import Any, Dict, List, Optional import celery import msgpack @@ -40,15 +40,21 @@ from clp_py_utils.decorators import exception_default_value from clp_py_utils.sql_adapter import SQL_Adapter from job_orchestration.executor.query.fs_search_task import search -from job_orchestration.scheduler.constants import QueryJobStatus, QueryTaskStatus, QueryJobType -from job_orchestration.scheduler.job_config import SearchConfig, ExtractConfig +from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType, QueryTaskStatus +from job_orchestration.scheduler.job_config import ExtractConfig, SearchConfig from job_orchestration.scheduler.query.reducer_handler import ( handle_reducer_connection, ReducerHandlerMessage, ReducerHandlerMessageQueues, ReducerHandlerMessageType, ) -from job_orchestration.scheduler.scheduler_data import InternalJobState, QueryTaskResult, SearchJob, QueryJob, ExtractJob +from job_orchestration.scheduler.scheduler_data import ( + ExtractJob, + InternalJobState, + QueryJob, + QueryTaskResult, + SearchJob, +) from pydantic import ValidationError # Setup logging @@ -224,7 +230,6 @@ async def handle_cancelling_search_jobs(db_conn_pool) -> None: logger.error(f"Failed to cancel job {job_id}.") - def insert_query_tasks_into_db(db_conn, job_id, archive_ids: List[str]) -> List[int]: task_ids = [] with contextlib.closing(db_conn.cursor()) as cursor: @@ -371,7 +376,9 @@ def handle_pending_search_jobs( reducer_acquisition_tasks = [] pending_search_jobs = [ - job for job in active_jobs.values() if InternalJobState.WAITING_FOR_DISPATCH == job.state and QueryJobType.SEARCH == job.type + job + for job in active_jobs.values() + if InternalJobState.WAITING_FOR_DISPATCH == job.state and job.type() == QueryJobType.SEARCH ] with contextlib.closing(db_conn_pool.connect()) as db_conn: @@ -440,9 +447,7 @@ def handle_pending_search_jobs( archives_for_search = job.remaining_archives_for_search job.remaining_archives_for_search = [] - archive_ids_for_search = [ - archive["archive_id"] for archive in archives_for_search - ] + archive_ids_for_search = [archive["archive_id"] for archive in archives_for_search] dispatch_query_job( db_conn, job, archive_ids_for_search, clp_metadata_db_conn_params, results_cache_uri @@ -497,10 +502,7 @@ def found_max_num_latest_results( async def handle_returned_search_job( - db_conn, - job: SearchJob, - task_results: Optional[Any], - results_cache_uri: str + db_conn, job: SearchJob, task_results: Optional[Any], results_cache_uri: str ) -> None: global active_jobs @@ -532,10 +534,10 @@ async def handle_returned_search_job( # Check if we've reached max results elif False == is_reducer_job and max_num_results > 0: if found_max_num_latest_results( - results_cache_uri, - job_id, - max_num_results, - job.remaining_archives_for_search[0]["end_timestamp"], + results_cache_uri, + job_id, + max_num_results, + job.remaining_archives_for_search[0]["end_timestamp"], ): new_job_status = QueryJobStatus.SUCCEEDED if new_job_status == 
QueryJobStatus.RUNNING: @@ -569,12 +571,12 @@ async def handle_returned_search_job( # We set the status regardless of the job's previous status to handle the case where the # job is cancelled (status = CANCELLING) while we're in this method. if set_job_or_task_status( - db_conn, - QUERY_JOBS_TABLE_NAME, - job_id, - new_job_status, - num_tasks_completed=job.num_archives_searched, - duration=(datetime.datetime.now() - job.start_time).total_seconds(), + db_conn, + QUERY_JOBS_TABLE_NAME, + job_id, + new_job_status, + num_tasks_completed=job.num_archives_searched, + duration=(datetime.datetime.now() - job.start_time).total_seconds(), ): if new_job_status == QueryJobStatus.SUCCEEDED: logger.info(f"Completed job {job_id}.") @@ -620,17 +622,12 @@ async def check_job_status_and_update_db(db_conn_pool, results_cache_uri): if QueryJobType.SEARCH == job.type(): search_job: SearchJob = job await handle_returned_search_job( - db_conn, - search_job, - returned_results, - results_cache_uri + db_conn, search_job, returned_results, results_cache_uri ) else: logger.error(f"Unexpected job type: {job_type}, skipping job {job_id}") - - async def handle_job_updates(db_conn_pool, results_cache_uri: str, jobs_poll_delay: float): while True: await handle_cancelling_search_jobs(db_conn_pool) diff --git a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py index 04ff0fc50..145f99c56 100644 --- a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py +++ b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py @@ -1,13 +1,18 @@ import asyncio import datetime +from abc import ABC, abstractmethod from enum import auto, Enum from typing import Any, Dict, List, Optional -from job_orchestration.scheduler.constants import CompressionTaskStatus, QueryTaskStatus, QueryJobType -from job_orchestration.scheduler.job_config import SearchConfig, QueryConfig, ExtractConfig +from job_orchestration.scheduler.constants import ( + CompressionTaskStatus, + QueryJobType, + QueryTaskStatus, +) +from job_orchestration.scheduler.job_config import ExtractConfig, QueryConfig, SearchConfig from job_orchestration.scheduler.query.reducer_handler import ReducerHandlerMessageQueues -from pydantic import BaseModel, validator, Field -from abc import ABC, abstractmethod +from pydantic import BaseModel, Field, validator + class CompressionJob(BaseModel): id: int @@ -43,12 +48,10 @@ class QueryJob(BaseModel, ABC): current_sub_job_async_task_result: Optional[Any] @abstractmethod - def type(self) -> QueryJobType: - ... + def type(self) -> QueryJobType: ... @abstractmethod - def job_config(self) -> QueryConfig: - ... + def job_config(self) -> QueryConfig: ... 
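
Note: the `type` column checks in this series rely on `QueryJobType` being an
`IntEnum` with a value-based `__str__`: raw integers read back from the
database compare equal to the enum, and the enum interpolates as its value
inside the f-string SQL filters. A standalone sketch (the `Demo*` names and
the row are illustrative):

    from enum import IntEnum


    class DemoJobType(IntEnum):
        SEARCH = 0

        def __str__(self) -> str:
            return str(self.value)


    row = {"type": 0}  # e.g. as fetched by a dictionary cursor
    assert DemoJobType.SEARCH == row["type"]  # IntEnum == int holds
    assert "0" == str(DemoJobType.SEARCH)  # safe inside f-string SQL
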
class ExtractJob(QueryJob): diff --git a/components/webui/imports/api/search/server/QueryJobsDbManager.js b/components/webui/imports/api/search/server/QueryJobsDbManager.js index 266aac523..68b3f5a9b 100644 --- a/components/webui/imports/api/search/server/QueryJobsDbManager.js +++ b/components/webui/imports/api/search/server/QueryJobsDbManager.js @@ -56,7 +56,8 @@ class QueryJobsDbManager { `INSERT INTO ${this.#queryJobsTableName} (${QUERY_JOBS_TABLE_COLUMN_NAMES.JOB_CONFIG}, ${QUERY_JOBS_TABLE_COLUMN_NAMES.TYPE}) VALUES (?, ?)`, - [Buffer.from(msgpack.encode(searchConfig)), QUERY_JOB_TYPE.SEARCH], + [Buffer.from(msgpack.encode(searchConfig)), + QUERY_JOB_TYPE.SEARCH], ); return queryInsertResults.insertId; From 92e69fd7f4fe55bde677ce5ca45426925b6ac722 Mon Sep 17 00:00:00 2001 From: Haiqi Xu <14502009+haiqi96@users.noreply.github.com> Date: Wed, 19 Jun 2024 15:03:39 -0400 Subject: [PATCH 05/11] fixes --- .../clp-py-utils/clp_py_utils/initialize-orchestration-db.py | 2 +- .../job_orchestration/executor/query/fs_search_task.py | 4 ++-- .../job_orchestration/scheduler/query/query_scheduler.py | 2 +- .../job_orchestration/scheduler/scheduler_data.py | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py b/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py index 4899fb85d..1ed727367 100644 --- a/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py +++ b/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py @@ -97,7 +97,7 @@ def main(argv): f""" CREATE TABLE IF NOT EXISTS `{QUERY_JOBS_TABLE_NAME}` ( `id` INT NOT NULL AUTO_INCREMENT, - `type`INT NOT NULL, + `type` INT NOT NULL, `status` INT NOT NULL DEFAULT '{QueryJobStatus.PENDING}', `creation_time` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), `num_tasks` INT NOT NULL DEFAULT '0', diff --git a/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py b/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py index ac3d1312f..a17c75a5b 100644 --- a/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py +++ b/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py @@ -113,7 +113,7 @@ def search( self: Task, job_id: str, task_id: int, - search_config_obj: dict, + job_config_obj: dict, archive_id: str, clp_metadata_db_conn_params: dict, results_cache_uri: str, @@ -133,7 +133,7 @@ def search( logger.info(f"Started task for job {job_id}") - search_config = SearchConfig.parse_obj(search_config_obj) + search_config = SearchConfig.parse_obj(job_config_obj) sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_conn_params)) start_time = datetime.datetime.now() diff --git a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py index bbcc14fc3..67a55a9e0 100644 --- a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py @@ -291,7 +291,7 @@ def get_task_group_for_job( job_id=job.id, archive_id=archive_ids[i], task_id=task_ids[i], - search_config_obj=job_config_obj, + job_config_obj=job_config_obj, clp_metadata_db_conn_params=clp_metadata_db_conn_params, results_cache_uri=results_cache_uri, ) diff --git a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py 
b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py index 145f99c56..6f885b62a 100644 --- a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py +++ b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py @@ -11,7 +11,7 @@ ) from job_orchestration.scheduler.job_config import ExtractConfig, QueryConfig, SearchConfig from job_orchestration.scheduler.query.reducer_handler import ReducerHandlerMessageQueues -from pydantic import BaseModel, Field, validator +from pydantic import BaseModel, validator class CompressionJob(BaseModel): From f0ee6862b37df900577e30f12fce90b1f4c7f32c Mon Sep 17 00:00:00 2001 From: Haiqi Xu <14502009+haiqi96@users.noreply.github.com> Date: Wed, 19 Jun 2024 15:22:11 -0400 Subject: [PATCH 06/11] fixes --- .../job_orchestration/scheduler/query/query_scheduler.py | 4 ++-- .../job_orchestration/scheduler/scheduler_data.py | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py index 67a55a9e0..94247cf01 100644 --- a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py @@ -365,7 +365,7 @@ async def acquire_reducer_for_job(job: SearchJob): logger.info(f"Got reducer for job {job.id} at {reducer_host}:{reducer_port}") -def handle_pending_search_jobs( +def handle_pending_query_jobs( db_conn_pool, clp_metadata_db_conn_params: Dict[str, any], results_cache_uri: str, @@ -648,7 +648,7 @@ async def handle_jobs( tasks = [handle_updating_task] while True: - reducer_acquisition_tasks = handle_pending_search_jobs( + reducer_acquisition_tasks = handle_pending_query_jobs( db_conn_pool, clp_metadata_db_conn_params, results_cache_uri, diff --git a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py index 6f885b62a..e05abb7e8 100644 --- a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py +++ b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py @@ -35,7 +35,6 @@ def valid_status(cls, field): class InternalJobState(Enum): - PENDING = auto() WAITING_FOR_REDUCER = auto() WAITING_FOR_DISPATCH = auto() RUNNING = auto() From 28218ae79de11d55779f0e27c150187a9192ba68 Mon Sep 17 00:00:00 2001 From: Haiqi Xu <14502009+haiqi96@users.noreply.github.com> Date: Wed, 19 Jun 2024 17:06:19 -0400 Subject: [PATCH 07/11] Remove Extract IR related change --- .../job_orchestration/scheduler/constants.py | 1 - .../job_orchestration/scheduler/job_config.py | 7 ------- .../scheduler/query/query_scheduler.py | 3 +-- .../job_orchestration/scheduler/scheduler_data.py | 13 +------------ components/webui/imports/api/search/constants.js | 1 - 5 files changed, 2 insertions(+), 23 deletions(-) diff --git a/components/job-orchestration/job_orchestration/scheduler/constants.py b/components/job-orchestration/job_orchestration/scheduler/constants.py index 22e5e34f4..fc3d5db0e 100644 --- a/components/job-orchestration/job_orchestration/scheduler/constants.py +++ b/components/job-orchestration/job_orchestration/scheduler/constants.py @@ -71,7 +71,6 @@ def to_str(self) -> str: class QueryJobType(IntEnum): SEARCH = 0 - EXTRACT_IR = auto() def __str__(self) -> str: return str(self.value) diff --git 
a/components/job-orchestration/job_orchestration/scheduler/job_config.py b/components/job-orchestration/job_orchestration/scheduler/job_config.py index fda83af6b..c43d797bc 100644 --- a/components/job-orchestration/job_orchestration/scheduler/job_config.py +++ b/components/job-orchestration/job_orchestration/scheduler/job_config.py @@ -43,13 +43,6 @@ class AggregationConfig(BaseModel): class QueryConfig(BaseModel, ABC): ... -class ExtractConfig(QueryConfig): - orig_file_id: str - msg_ix: int - file_split_id: typing.Optional[str] = None - target_size: typing.Optional[int] = None - - class SearchConfig(QueryConfig): query_string: str max_num_results: int diff --git a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py index 94247cf01..f3fdde31b 100644 --- a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py @@ -41,7 +41,7 @@ from clp_py_utils.sql_adapter import SQL_Adapter from job_orchestration.executor.query.fs_search_task import search from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType, QueryTaskStatus -from job_orchestration.scheduler.job_config import ExtractConfig, SearchConfig +from job_orchestration.scheduler.job_config import SearchConfig from job_orchestration.scheduler.query.reducer_handler import ( handle_reducer_connection, ReducerHandlerMessage, @@ -49,7 +49,6 @@ ReducerHandlerMessageType, ) from job_orchestration.scheduler.scheduler_data import ( - ExtractJob, InternalJobState, QueryJob, QueryTaskResult, diff --git a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py index e05abb7e8..433e6fb9c 100644 --- a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py +++ b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py @@ -9,7 +9,7 @@ QueryJobType, QueryTaskStatus, ) -from job_orchestration.scheduler.job_config import ExtractConfig, QueryConfig, SearchConfig +from job_orchestration.scheduler.job_config import QueryConfig, SearchConfig from job_orchestration.scheduler.query.reducer_handler import ReducerHandlerMessageQueues from pydantic import BaseModel, validator @@ -53,17 +53,6 @@ def type(self) -> QueryJobType: ... def job_config(self) -> QueryConfig: ... 
-class ExtractJob(QueryJob): - extract_config: ExtractConfig - archive_id: str - - def type(self) -> QueryJobType: - return QueryJobType.EXTRACT_IR - - def job_config(self) -> QueryConfig: - return self.extract_config - - class SearchJob(QueryJob): search_config: SearchConfig num_archives_to_search: int diff --git a/components/webui/imports/api/search/constants.js b/components/webui/imports/api/search/constants.js index 908367b76..fa24a40f0 100644 --- a/components/webui/imports/api/search/constants.js +++ b/components/webui/imports/api/search/constants.js @@ -91,7 +91,6 @@ let enumQueryType; */ const QUERY_JOB_TYPE = Object.freeze({ SEARCH: (enumQueryType = 0), - EXTRACT_IR: ++enumQueryType, }); /* eslint-enable sort-keys */ From 192094bf4fa83f7d6d90273c71bc273d69619501 Mon Sep 17 00:00:00 2001 From: Haiqi Xu <14502009+haiqi96@users.noreply.github.com> Date: Wed, 19 Jun 2024 17:08:14 -0400 Subject: [PATCH 08/11] small fix --- .../job_orchestration/scheduler/query/query_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py index f3fdde31b..56da5e759 100644 --- a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py @@ -618,7 +618,7 @@ async def check_job_status_and_update_db(db_conn_pool, results_cache_uri): if returned_results is None: continue job_type = job.type() - if QueryJobType.SEARCH == job.type(): + if QueryJobType.SEARCH == job_type: search_job: SearchJob = job await handle_returned_search_job( db_conn, search_job, returned_results, results_cache_uri From 4ff9288c623c80d40bf326a224e3eac4e2cce8fd Mon Sep 17 00:00:00 2001 From: Haiqi Xu <14502009+haiqi96@users.noreply.github.com> Date: Mon, 24 Jun 2024 16:23:24 -0400 Subject: [PATCH 09/11] Address code review concerns --- .../scripts/native/search.py | 6 +++--- .../executor/query/fs_search_task.py | 6 +++--- .../job_orchestration/scheduler/constants.py | 2 +- .../job_orchestration/scheduler/job_config.py | 5 ++--- .../scheduler/query/query_scheduler.py | 21 ++++++++++--------- .../scheduler/scheduler_data.py | 14 ++++++------- .../webui/imports/api/search/constants.js | 2 +- .../api/search/server/QueryJobsDbManager.js | 5 +++-- 8 files changed, 31 insertions(+), 30 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/search.py b/components/clp-package-utils/clp_package_utils/scripts/native/search.py index e8fc8da3c..9041b0006 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/search.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/search.py @@ -16,7 +16,7 @@ from clp_py_utils.clp_config import Database, QUERY_JOBS_TABLE_NAME, ResultsCache from clp_py_utils.sql_adapter import SQL_Adapter from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType -from job_orchestration.scheduler.job_config import AggregationConfig, SearchConfig +from job_orchestration.scheduler.job_config import AggregationConfig, SearchJobConfig from clp_package_utils.general import ( CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, @@ -83,7 +83,7 @@ def create_and_monitor_job_in_db( do_count_aggregation: bool | None, count_by_time_bucket_size: int | None, ): - search_config = SearchConfig( + search_config = SearchJobConfig( query_string=wildcard_query, 
         begin_timestamp=begin_timestamp,
         end_timestamp=end_timestamp,
@@ -112,7 +112,7 @@ def create_and_monitor_job_in_db(
         # Create job
         db_cursor.execute(
             f"INSERT INTO `{QUERY_JOBS_TABLE_NAME}` (`job_config`, `type`) VALUES (%s, %s)",
-            (msgpack.packb(search_config.dict()), QueryJobType.SEARCH),
+            (msgpack.packb(search_config.dict()), QueryJobType.SEARCH_OR_AGGREGATION),
         )
         db_conn.commit()
         job_id = db_cursor.lastrowid
diff --git a/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py b/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py
index a17c75a5b..92522a2d0 100644
--- a/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py
+++ b/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py
@@ -13,7 +13,7 @@
 from clp_py_utils.clp_logging import set_logging_level
 from clp_py_utils.sql_adapter import SQL_Adapter
 from job_orchestration.executor.query.celery import app
-from job_orchestration.scheduler.job_config import SearchConfig
+from job_orchestration.scheduler.job_config import SearchJobConfig
 from job_orchestration.scheduler.scheduler_data import QueryTaskResult, QueryTaskStatus
 
 # Setup logging
@@ -41,7 +41,7 @@ def make_command(
     clp_home: Path,
     archives_dir: Path,
     archive_id: str,
-    search_config: SearchConfig,
+    search_config: SearchJobConfig,
     results_cache_uri: str,
     results_collection: str,
 ):
@@ -133,7 +133,7 @@ def search(
 
     logger.info(f"Started task for job {job_id}")
 
-    search_config = SearchConfig.parse_obj(job_config_obj)
+    search_config = SearchJobConfig.parse_obj(job_config_obj)
     sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_conn_params))
 
     start_time = datetime.datetime.now()
diff --git a/components/job-orchestration/job_orchestration/scheduler/constants.py b/components/job-orchestration/job_orchestration/scheduler/constants.py
index fc3d5db0e..b640524d9 100644
--- a/components/job-orchestration/job_orchestration/scheduler/constants.py
+++ b/components/job-orchestration/job_orchestration/scheduler/constants.py
@@ -70,7 +70,7 @@ def to_str(self) -> str:
 
 
 class QueryJobType(IntEnum):
-    SEARCH = 0
+    SEARCH_OR_AGGREGATION = 0
 
     def __str__(self) -> str:
         return str(self.value)
diff --git a/components/job-orchestration/job_orchestration/scheduler/job_config.py b/components/job-orchestration/job_orchestration/scheduler/job_config.py
index c43d797bc..528dce21a 100644
--- a/components/job-orchestration/job_orchestration/scheduler/job_config.py
+++ b/components/job-orchestration/job_orchestration/scheduler/job_config.py
@@ -1,7 +1,6 @@
 from __future__ import annotations
 
 import typing
-from abc import ABC
 
 from pydantic import BaseModel, validator
 
@@ -40,10 +39,10 @@ class AggregationConfig(BaseModel):
     count_by_time_bucket_size: typing.Optional[int] = None  # Milliseconds
 
 
-class QueryConfig(BaseModel, ABC): ...
+class QueryJobConfig(BaseModel): ...
 
 
-class SearchConfig(QueryConfig):
+class SearchJobConfig(QueryJobConfig):
     query_string: str
     max_num_results: int
     tags: typing.Optional[typing.List[str]] = None
diff --git a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
index 56da5e759..c2ca50944 100644
--- a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
+++ b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
@@ -41,7 +41,7 @@
 from clp_py_utils.sql_adapter import SQL_Adapter
 from job_orchestration.executor.query.fs_search_task import search
 from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType, QueryTaskStatus
-from job_orchestration.scheduler.job_config import SearchConfig
+from job_orchestration.scheduler.job_config import SearchJobConfig
 from job_orchestration.scheduler.query.reducer_handler import (
     handle_reducer_connection,
     ReducerHandlerMessage,
@@ -130,7 +130,7 @@ def fetch_cancelling_search_jobs(db_conn) -> list:
         SELECT {QUERY_JOBS_TABLE_NAME}.id as job_id
         FROM {QUERY_JOBS_TABLE_NAME}
         WHERE {QUERY_JOBS_TABLE_NAME}.status={QueryJobStatus.CANCELLING}
-        AND {QUERY_JOBS_TABLE_NAME}.type={QueryJobType.SEARCH}
+        AND {QUERY_JOBS_TABLE_NAME}.type={QueryJobType.SEARCH_OR_AGGREGATION}
         """
     )
     return db_cursor.fetchall()
@@ -248,7 +248,7 @@ def insert_query_tasks_into_db(db_conn, job_id, archive_ids: List[str]) -> List[
 @exception_default_value(default=[])
 def get_archives_for_search(
     db_conn,
-    search_config: SearchConfig,
+    search_config: SearchJobConfig,
 ):
     query = f"""SELECT id as archive_id, end_timestamp
             FROM {CLP_METADATA_TABLE_PREFIX}archives
@@ -284,7 +284,7 @@ def get_task_group_for_job(
     clp_metadata_db_conn_params: Dict[str, any],
     results_cache_uri: str,
 ):
-    job_config_obj = job.job_config().dict()
+    job_config_obj = job.get_config().dict()
     return celery.group(
         search.s(
             job_id=job.id,
@@ -377,7 +377,8 @@ def handle_pending_query_jobs(
     pending_search_jobs = [
         job
         for job in active_jobs.values()
-        if InternalJobState.WAITING_FOR_DISPATCH == job.state and job.type() == QueryJobType.SEARCH
+        if InternalJobState.WAITING_FOR_DISPATCH == job.state
+        and job.get_type() == QueryJobType.SEARCH_OR_AGGREGATION
     ]
 
     with contextlib.closing(db_conn_pool.connect()) as db_conn:
@@ -386,12 +387,12 @@
             job_type = job["type"]
             job_config = job["job_config"]
 
-            if QueryJobType.SEARCH == job_type:
+            if QueryJobType.SEARCH_OR_AGGREGATION == job_type:
                 # Avoid double-dispatch when a job is WAITING_FOR_REDUCER
                 if job_id in active_jobs:
                     continue
 
-                search_config = SearchConfig.parse_obj(msgpack.unpackb(job_config))
+                search_config = SearchJobConfig.parse_obj(msgpack.unpackb(job_config))
                 archives_for_search = get_archives_for_search(db_conn, search_config)
                 if len(archives_for_search) == 0:
                     if set_job_or_task_status(
@@ -599,7 +600,7 @@
         except Exception as e:
             logger.error(f"Job `{job_id}` failed: {e}.")
             # Clean up
-            if QueryJobType.SEARCH == job.type():
+            if QueryJobType.SEARCH_OR_AGGREGATION == job.get_type():
                 if job.reducer_handler_msg_queues is not None:
                     msg = ReducerHandlerMessage(ReducerHandlerMessageType.FAILURE)
                     await job.reducer_handler_msg_queues.put_to_handler(msg)
@@ -617,8 +618,8 @@
         if returned_results is None:
             continue
 
-        job_type = job.type()
-        if QueryJobType.SEARCH == job_type:
+        job_type = job.get_type()
+        if QueryJobType.SEARCH_OR_AGGREGATION == job_type:
             search_job: SearchJob = job
             await handle_returned_search_job(
                 db_conn, search_job, returned_results, results_cache_uri
diff --git a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py
index 433e6fb9c..d337e0806 100644
--- a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py
+++ b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py
@@ -9,7 +9,7 @@
     QueryJobType,
     QueryTaskStatus,
 )
-from job_orchestration.scheduler.job_config import QueryConfig, SearchConfig
+from job_orchestration.scheduler.job_config import QueryJobConfig, SearchJobConfig
 from job_orchestration.scheduler.query.reducer_handler import ReducerHandlerMessageQueues
 from pydantic import BaseModel, validator
 
@@ -47,24 +47,24 @@ class QueryJob(BaseModel, ABC):
     current_sub_job_async_task_result: Optional[Any]
 
     @abstractmethod
-    def type(self) -> QueryJobType: ...
+    def get_type(self) -> QueryJobType: ...
 
     @abstractmethod
-    def job_config(self) -> QueryConfig: ...
+    def get_config(self) -> QueryJobConfig: ...
 
 
 class SearchJob(QueryJob):
-    search_config: SearchConfig
+    search_config: SearchJobConfig
     num_archives_to_search: int
     num_archives_searched: int
     remaining_archives_for_search: List[Dict[str, Any]]
     reducer_acquisition_task: Optional[asyncio.Task]
     reducer_handler_msg_queues: Optional[ReducerHandlerMessageQueues]
 
-    def type(self) -> QueryJobType:
-        return QueryJobType.SEARCH
+    def get_type(self) -> QueryJobType:
+        return QueryJobType.SEARCH_OR_AGGREGATION
 
-    def job_config(self) -> QueryConfig:
+    def get_config(self) -> QueryJobConfig:
         return self.search_config
 
     class Config:  # To allow asyncio.Task and asyncio.Queue
diff --git a/components/webui/imports/api/search/constants.js b/components/webui/imports/api/search/constants.js
index fa24a40f0..fbc0c3188 100644
--- a/components/webui/imports/api/search/constants.js
+++ b/components/webui/imports/api/search/constants.js
@@ -90,7 +90,7 @@ let enumQueryType;
  * @enum {number}
  */
 const QUERY_JOB_TYPE = Object.freeze({
-    SEARCH: (enumQueryType = 0),
+    SEARCH_OR_AGGREGATION: (enumQueryType = 0),
 });
 /* eslint-enable sort-keys */
 
diff --git a/components/webui/imports/api/search/server/QueryJobsDbManager.js b/components/webui/imports/api/search/server/QueryJobsDbManager.js
index 68b3f5a9b..835aae796 100644
--- a/components/webui/imports/api/search/server/QueryJobsDbManager.js
+++ b/components/webui/imports/api/search/server/QueryJobsDbManager.js
@@ -54,10 +54,11 @@ class QueryJobsDbManager {
     async submitSearchJob (searchConfig) {
         const [queryInsertResults] = await this.#sqlDbConnPool.query(
             `INSERT INTO ${this.#queryJobsTableName}
-                (${QUERY_JOBS_TABLE_COLUMN_NAMES.JOB_CONFIG}, ${QUERY_JOBS_TABLE_COLUMN_NAMES.TYPE})
+                (${QUERY_JOBS_TABLE_COLUMN_NAMES.JOB_CONFIG},
+                ${QUERY_JOBS_TABLE_COLUMN_NAMES.TYPE})
             VALUES (?, ?)`,
             [Buffer.from(msgpack.encode(searchConfig)),
-                QUERY_JOB_TYPE.SEARCH],
+                QUERY_JOB_TYPE.SEARCH_OR_AGGREGATION],
         );
 
         return queryInsertResults.insertId;

From 261a4dc5652a22e56c24ee608fee44e9f61a0281 Mon Sep 17 00:00:00 2001
From: haiqi96 <14502009+haiqi96@users.noreply.github.com>
Date: Mon, 24 Jun 2024 16:26:28 -0400
Subject: [PATCH 10/11] Update
 components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py

Co-authored-by: kirkrodrigues <2454684+kirkrodrigues@users.noreply.github.com>
---
 .../job_orchestration/scheduler/query/query_scheduler.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
index c2ca50944..882b12353 100644
--- a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
+++ b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
@@ -428,6 +428,9 @@ def handle_pending_query_jobs(
                     pending_search_jobs.append(new_search_job)
                 active_jobs[job_id] = new_search_job
             else:
+                # NOTE: We're skipping the job for this iteration, but its status will remain
+                # unchanged. So this log will print again in the next iteration unless the user
+                # cancels the job.
                 logger.error(f"Unexpected job type: {job_type}, skipping job {job_id}")
                 continue
 

From 10c08c3d6795cb9c234400bc6d9b8a03ebff609d Mon Sep 17 00:00:00 2001
From: Haiqi Xu <14502009+haiqi96@users.noreply.github.com>
Date: Mon, 24 Jun 2024 16:40:21 -0400
Subject: [PATCH 11/11] Address code review concerns

---
 .../job_orchestration/scheduler/query/query_scheduler.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
index c2ca50944..016de8abd 100644
--- a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
+++ b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
@@ -501,7 +501,7 @@ def found_max_num_latest_results(
     return max_timestamp_in_remaining_archives <= min_timestamp_in_top_results
 
 
-async def handle_returned_search_job(
+async def handle_finished_search_job(
     db_conn, job: SearchJob, task_results: Optional[Any], results_cache_uri: str
 ) -> None:
     global active_jobs
@@ -621,7 +621,7 @@ async def check_job_status_and_update_db(db_conn_pool, results_cache_uri):
         job_type = job.get_type()
         if QueryJobType.SEARCH_OR_AGGREGATION == job_type:
             search_job: SearchJob = job
-            await handle_returned_search_job(
+            await handle_finished_search_job(
                 db_conn, search_job, returned_results, results_cache_uri
             )
         else:
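
Taken together, the series leaves every row in the query jobs table carrying an explicit `type` value next to its msgpack-encoded `job_config`, with both entry points (the native `search.py` script and the WebUI's `QueryJobsDbManager.js`) and the scheduler agreeing on `QueryJobType.SEARCH_OR_AGGREGATION`. The following is a minimal client-side sketch, not part of the patches, showing how a job would be submitted after the series is applied. It assumes a reachable MySQL instance holding the `query_jobs` table created by `initialize-orchestration-db.py`; the credentials, the query string, and the use of PyMySQL (rather than the package's own `SQL_Adapter`) are illustrative placeholders.

    # Illustrative sketch only; not part of this patch series.
    # Assumes: a reachable MySQL instance, the query jobs table created by
    # initialize-orchestration-db.py, placeholder credentials, PyMySQL installed.
    import msgpack
    import pymysql

    # Assumed default table name; the package reads this from
    # clp_py_utils.clp_config.QUERY_JOBS_TABLE_NAME.
    QUERY_JOBS_TABLE_NAME = "query_jobs"
    SEARCH_OR_AGGREGATION = 0  # Mirrors QueryJobType.SEARCH_OR_AGGREGATION (an IntEnum).

    # A msgpack-able dict mirroring SearchJobConfig's required fields.
    search_job_config = {"query_string": "*error*", "max_num_results": 1000}

    conn = pymysql.connect(host="127.0.0.1", user="clp", password="changeme", database="clp")
    try:
        with conn.cursor() as cursor:
            # Same statement shape as the patched search.py and QueryJobsDbManager.js:
            # the job's config is msgpack-encoded and its type is stored alongside it.
            cursor.execute(
                f"INSERT INTO `{QUERY_JOBS_TABLE_NAME}` (`job_config`, `type`) VALUES (%s, %s)",
                (msgpack.packb(search_job_config), SEARCH_OR_AGGREGATION),
            )
            conn.commit()
            print(f"Submitted query job {cursor.lastrowid}")
    finally:
        conn.close()

One design note: because `QueryJobType` is an `IntEnum`, the patched Python client can pass the enum member itself as the second `%s` parameter; it serializes as its integer value, which is exactly what the `type` column stores and what the scheduler compares against when dispatching.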