diff --git a/src/ocrd/mets_server.py b/src/ocrd/mets_server.py
index c85368e30..4b4ffa728 100644
--- a/src/ocrd/mets_server.py
+++ b/src/ocrd/mets_server.py
@@ -1,8 +1,10 @@
 """
 # METS server functionality
 """
+import os
 import re
 from os import _exit, chmod
+import signal
 from typing import Dict, Optional, Union, List, Tuple
 from time import sleep
 from pathlib import Path
@@ -428,13 +430,17 @@ def create_process(mets_server_url: str, ws_dir_path: str, log_file: str) -> int

     @staticmethod
     def kill_process(mets_server_pid: int):
-        subprocess_run(args=["kill", "-s", "SIGINT", f"{mets_server_pid}"], shell=False, universal_newlines=True)
-        return
+        os.kill(mets_server_pid, signal.SIGINT)
+        sleep(3)  # grace period for a clean shutdown after SIGINT
+        try:
+            os.kill(mets_server_pid, signal.SIGKILL)  # force-kill if still alive
+        except ProcessLookupError:
+            pass

     def shutdown(self):
         if self.is_uds:
             if Path(self.url).exists():
-                self.log.debug(f'UDS socket {self.url} still exists, removing it')
+                self.log.warning(f"Due to a server shutdown, removing the existing UDS socket file: {self.url}")
                 Path(self.url).unlink()
         # os._exit because uvicorn catches SystemExit raised by sys.exit
         _exit(0)
diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py
index 9c7f15c88..350cf64b9 100644
--- a/src/ocrd_network/cli/client.py
+++ b/src/ocrd_network/cli/client.py
@@ -2,6 +2,7 @@
 from json import dumps
 from typing import List, Optional, Tuple
 from ocrd.decorators.parameter_option import parameter_option, parameter_override_option
+from ocrd_network.constants import JobState
 from ocrd_utils import DEFAULT_METS_BASENAME
 from ocrd_utils.introspect import set_json_key_value_overrides
 from ocrd_utils.str import parse_json_string_or_file
@@ -104,8 +105,10 @@ def check_processing_job_status(address: Optional[str], processing_job_id: str):
 @click.option('--result-queue-name')
 @click.option('--callback-url')
 @click.option('--agent-type', default='worker')
-@click.option('-b', '--block', default=False,
+@click.option('-b', '--block', default=False, is_flag=True,
               help='If set, the client will block till job timeout, fail or success.')
+@click.option('-p', '--print-state', default=False, is_flag=True,
+              help='If set, the client will print the job state on each polling iteration.')
 def send_processing_job_request(
     address: Optional[str],
     processor_name: str,
@@ -120,7 +123,8 @@ def send_processing_job_request(
     # TODO: This is temporarily available to toggle
     # between the ProcessingWorker/ProcessorServer
     agent_type: Optional[str],
-    block: Optional[bool]
+    block: Optional[bool],
+    print_state: Optional[bool]
 ):
     """ Submit a processing job to the processing server.
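As an aside, the two new client flags map directly onto `Client.poll_job_status(print_state=...)`, which is extended later in this diff. A minimal sketch of the equivalent programmatic use, assuming a local processing server and placeholder request parameters:

    from ocrd_network.client import Client

    # hypothetical address and request parameters, for illustration only
    client = Client(server_addr_processing="http://localhost:8000")
    job_id = client.send_processing_job_request(
        processor_name="ocrd-dummy",
        req_params={"path_to_mets": "/data/ws/mets.xml", "input_file_grps": "OCR-D-IMG"})
    # blocks like --block and reports each poll like --print-state
    state = client.poll_job_status(job_id=job_id, print_state=True)
    print(f"Final job state: {state}")
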
@@ -146,7 +150,7 @@ def send_processing_job_request(
     assert processing_job_id
     print(f"Processing job id: {processing_job_id}")
     if block:
-        client.poll_job_status(job_id=processing_job_id)
+        client.poll_job_status(job_id=processing_job_id, print_state=print_state)


 @client_cli.group('workflow')
@@ -176,24 +180,39 @@ def check_workflow_job_status(address: Optional[str], workflow_job_id: str):
               'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default')
 @click.option('-m', '--path-to-mets', required=True)
 @click.option('-w', '--path-to-workflow', required=True)
-@click.option('-b', '--block', default=False,
+@click.option('--page-wise/--no-page-wise', is_flag=True, default=False, help="Whether to generate per-page jobs")
+@click.option('-b', '--block', default=False, is_flag=True,
               help='If set, the client will block till job timeout, fail or success.')
+@click.option('-p', '--print-state', default=False, is_flag=True,
+              help='If set, the client will print the job state on each polling iteration.')
 def send_workflow_job_request(
     address: Optional[str],
     path_to_mets: str,
     path_to_workflow: str,
-    block: Optional[bool]
+    page_wise: bool,
+    block: bool,
+    print_state: bool
 ):
     """ Submit a workflow job to the processing server.
     """
     client = Client(server_addr_processing=address)
-    workflow_job_id = client.send_workflow_job_request(path_to_wf=path_to_workflow, path_to_mets=path_to_mets)
+    workflow_job_id = client.send_workflow_job_request(
+        path_to_wf=path_to_workflow,
+        path_to_mets=path_to_mets,
+        page_wise=page_wise,
+    )
     assert workflow_job_id
     print(f"Workflow job id: {workflow_job_id}")
     if block:
-        client.poll_workflow_status(job_id=workflow_job_id)
-
+        print(f"Polling state of workflow job {workflow_job_id}")
+        state = client.poll_workflow_status(job_id=workflow_job_id, print_state=print_state)
+        if state != JobState.success:
+            print(f"Workflow failed with state {state}")
+            exit(1)
+        else:
+            print("Workflow succeeded")
+            exit(0)


 @client_cli.group('workspace')
 def workspace_cli():
diff --git a/src/ocrd_network/client.py b/src/ocrd_network/client.py
index 8ec8e541e..bb7cf4dbf 100644
--- a/src/ocrd_network/client.py
+++ b/src/ocrd_network/client.py
@@ -46,18 +46,21 @@ def check_job_status(self, job_id: str):
     def check_workflow_status(self, workflow_job_id: str):
         return get_ps_workflow_job_status(self.server_addr_processing, workflow_job_id=workflow_job_id)

-    def poll_job_status(self, job_id: str) -> str:
+    def poll_job_status(self, job_id: str, print_state: bool = False) -> str:
         return poll_job_status_till_timeout_fail_or_success(
-            ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait)
+            ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait,
+            print_state=print_state)

-    def poll_workflow_status(self, job_id: str) -> str:
+    def poll_workflow_status(self, job_id: str, print_state: bool = False) -> str:
         return poll_wf_status_till_timeout_fail_or_success(
-            ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait)
+            ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait,
+            print_state=print_state)

     def send_processing_job_request(self, processor_name: str, req_params: dict) -> str:
         return post_ps_processing_request(
             ps_server_host=self.server_addr_processing, processor=processor_name, job_input=req_params)

-    def send_workflow_job_request(self, path_to_wf: str, path_to_mets: str):
+    def send_workflow_job_request(self, path_to_wf: str, path_to_mets: str, page_wise: bool = False):
         return post_ps_workflow_request(
-            ps_server_host=self.server_addr_processing, path_to_wf=path_to_wf, path_to_mets=path_to_mets)
+            ps_server_host=self.server_addr_processing, path_to_wf=path_to_wf, path_to_mets=path_to_mets,
+            page_wise=page_wise)
diff --git a/src/ocrd_network/client_utils.py b/src/ocrd_network/client_utils.py
index 9b924c16a..4eaf4ea95 100644
--- a/src/ocrd_network/client_utils.py
+++ b/src/ocrd_network/client_utils.py
@@ -1,9 +1,10 @@
+import json
 from requests import get as request_get, post as request_post
 from time import sleep
 from .constants import JobState, NETWORK_PROTOCOLS


-def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int):
+def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int, print_state: bool = False) -> JobState:
     if job_type not in ["workflow", "processor"]:
         raise ValueError(f"Unknown job type '{job_type}', expected 'workflow' or 'processor'")
     job_state = JobState.unset
@@ -13,18 +14,22 @@ def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries
             job_state = get_ps_processing_job_status(ps_server_host, job_id)
         if job_type == "workflow":
             job_state = get_ps_workflow_job_status(ps_server_host, job_id)
+        if print_state:
+            print(f"State of the {job_type} job {job_id}: {job_state}")
         if job_state == JobState.success or job_state == JobState.failed:
             break
         tries -= 1
     return job_state


-def poll_job_status_till_timeout_fail_or_success(ps_server_host: str, job_id: str, tries: int, wait: int) -> JobState:
-    return _poll_endpoint_status(ps_server_host, job_id, "processor", tries, wait)
+def poll_job_status_till_timeout_fail_or_success(
+        ps_server_host: str, job_id: str, tries: int, wait: int, print_state: bool = False) -> JobState:
+    return _poll_endpoint_status(ps_server_host, job_id, "processor", tries, wait, print_state)


-def poll_wf_status_till_timeout_fail_or_success(ps_server_host: str, job_id: str, tries: int, wait: int) -> JobState:
-    return _poll_endpoint_status(ps_server_host, job_id, "workflow", tries, wait)
+def poll_wf_status_till_timeout_fail_or_success(
+        ps_server_host: str, job_id: str, tries: int, wait: int, print_state: bool = False) -> JobState:
+    return _poll_endpoint_status(ps_server_host, job_id, "workflow", tries, wait, print_state)


 def get_ps_deployed_processors(ps_server_host: str):
@@ -47,22 +52,21 @@ def get_ps_processing_job_log(ps_server_host: str, processing_job_id: str):
     return response


-def get_ps_processing_job_status(ps_server_host: str, processing_job_id: str) -> str:
+def get_ps_processing_job_status(ps_server_host: str, processing_job_id: str) -> JobState:
     request_url = f"{ps_server_host}/processor/job/{processing_job_id}"
     response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"})
     assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}"
     job_state = response.json()["state"]
     assert job_state
-    return job_state
-
+    return getattr(JobState, job_state.lower())

-def get_ps_workflow_job_status(ps_server_host: str, workflow_job_id: str) -> str:
+def get_ps_workflow_job_status(ps_server_host: str, workflow_job_id: str) -> JobState:
     request_url = f"{ps_server_host}/workflow/job-simple/{workflow_job_id}"
     response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"})
     assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}"
     job_state = response.json()["state"]
     assert job_state
-    return job_state
+    return getattr(JobState, job_state.lower())


 def post_ps_processing_request(ps_server_host: str, processor: str, job_input: dict) -> str:
@@ -78,9 +82,13 @@ def post_ps_processing_request(ps_server_host: str, processor: str, job_input: d
     return processing_job_id


-# TODO: Can be extended to include other parameters such as page_wise
-def post_ps_workflow_request(ps_server_host: str, path_to_wf: str, path_to_mets: str) -> str:
-    request_url = f"{ps_server_host}/workflow/run?mets_path={path_to_mets}&page_wise=True"
+def post_ps_workflow_request(
+    ps_server_host: str,
+    path_to_wf: str,
+    path_to_mets: str,
+    page_wise: bool = False,
+) -> str:
+    request_url = f"{ps_server_host}/workflow/run?mets_path={path_to_mets}&page_wise={page_wise}"
     response = request_post(
         url=request_url,
         headers={"accept": "application/json; charset=utf-8"},
@@ -88,8 +96,9 @@ def post_ps_workflow_request(ps_server_host: str, path_to_wf: str, path_to_mets:
     )
     # print(response.json())
     # print(response.__dict__)
+    json_resp_raw = response.text
     assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}"
-    wf_job_id = response.json()["job_id"]
+    wf_job_id = json.loads(json_resp_raw)["job_id"]
     assert wf_job_id
     return wf_job_id
diff --git a/src/ocrd_network/processing_server.py b/src/ocrd_network/processing_server.py
index 34c22e5cf..336d04f0d 100644
--- a/src/ocrd_network/processing_server.py
+++ b/src/ocrd_network/processing_server.py
@@ -1,7 +1,7 @@
 from datetime import datetime
 from os import getpid
 from pathlib import Path
-from typing import Dict, List, Union
+from typing import Dict, List, Optional, Union
 from uvicorn import run as uvicorn_run
 from fastapi import APIRouter, FastAPI, File, HTTPException, Request, status, UploadFile
@@ -48,6 +48,7 @@
     get_workflow_content,
     get_from_database_workspace,
     get_from_database_workflow_job,
+    kill_mets_server_zombies,
     parse_workflow_tasks,
     raise_http_exception,
     request_processor_server_tool_json,
@@ -200,6 +201,14 @@ def add_api_routes_others(self):
             tags=[ServerApiTags.WORKSPACE],
             summary="Forward a TCP request to UDS mets server"
         )
+        others_router.add_api_route(
+            path="/kill_mets_server_zombies",
+            endpoint=self.kill_mets_server_zombies,
+            methods=["DELETE"],
+            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
+            status_code=status.HTTP_200_OK,
+            summary="!! Workaround - do not use unless you have a reason !! Kill all METS servers on this machine that were created more than `minutes_ago` minutes ago (default: 90)."
+        )
         self.include_router(others_router)

     def add_api_routes_processing(self):
@@ -817,6 +826,10 @@ async def get_workflow_info(self, workflow_job_id) -> Dict:
         response = self._produce_workflow_status_response(processing_jobs=jobs)
         return response

+    async def kill_mets_server_zombies(self, minutes_ago: Optional[int] = None, dry_run: Optional[bool] = None) -> List[int]:
+        pids_killed = kill_mets_server_zombies(minutes_ago=minutes_ago, dry_run=dry_run)
+        return pids_killed
+
     async def get_workflow_info_simple(self, workflow_job_id) -> Dict[str, JobState]:
         """
         Simplified version of the `get_workflow_info` that returns a single state for the entire workflow.
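The new maintenance route can be exercised with any HTTP client, since FastAPI exposes the handler's optional arguments (`minutes_ago`, `dry_run`) as query parameters. A hedged sketch, assuming the processing server listens on http://localhost:8000:

    from requests import delete as request_delete

    # dry_run=True only reports which PIDs would be killed, without sending signals
    response = request_delete(
        "http://localhost:8000/kill_mets_server_zombies",
        params={"minutes_ago": 120, "dry_run": True})
    assert response.status_code == 200
    print(response.json())  # JSON list of the matched PIDs
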
diff --git a/src/ocrd_network/runtime_data/deployer.py b/src/ocrd_network/runtime_data/deployer.py
index b956904d0..90f7c6d5c 100644
--- a/src/ocrd_network/runtime_data/deployer.py
+++ b/src/ocrd_network/runtime_data/deployer.py
@@ -146,6 +146,11 @@ def start_uds_mets_server(self, ws_dir_path: str) -> Path:
         if is_mets_server_running(mets_server_url=str(mets_server_url)):
             self.log.debug(f"The UDS mets server for {ws_dir_path} is already started: {mets_server_url}")
             return mets_server_url
+        elif Path(mets_server_url).is_socket():
+            self.log.warning(
+                f"The UDS mets server for {ws_dir_path} is not running, but the socket file exists: {mets_server_url}. "
+                "Removing it before starting the server to avoid inconsistent behavior.")
+            Path(mets_server_url).unlink()
         self.log.info(f"Starting UDS mets server: {mets_server_url}")
         pid = OcrdMetsServer.create_process(mets_server_url=mets_server_url, ws_dir_path=ws_dir_path, log_file=log_file)
         self.mets_servers[mets_server_url] = pid
@@ -160,6 +165,9 @@ def stop_uds_mets_server(self, mets_server_url: str, stop_with_pid: bool = False
             raise Exception(message)
         mets_server_pid = self.mets_servers[Path(mets_server_url)]
         OcrdMetsServer.kill_process(mets_server_pid=mets_server_pid)
+        if Path(mets_server_url).exists():
+            self.log.warning(f"Deployer is removing the leftover UDS socket file: {mets_server_url}")
+            Path(mets_server_url).unlink()
         return
     # TODO: Reconsider this again
     # Not having this sleep here causes connection errors
diff --git a/src/ocrd_network/server_utils.py b/src/ocrd_network/server_utils.py
index 9d8628170..6e485f261 100644
--- a/src/ocrd_network/server_utils.py
+++ b/src/ocrd_network/server_utils.py
@@ -1,12 +1,18 @@
+import os
+import re
+import signal
+import sys
+from pathlib import Path
+from json import dumps, loads
+from urllib.parse import urljoin
+from typing import Dict, List, Optional, Union
+from time import time
+
 from fastapi import HTTPException, status, UploadFile
 from fastapi.responses import FileResponse
 from httpx import AsyncClient, Timeout
-from json import dumps, loads
 from logging import Logger
-from pathlib import Path
 from requests import get as requests_get
-from typing import Dict, List, Union
-from urllib.parse import urljoin

 from ocrd.resolver import Resolver
 from ocrd.task_sequence import ProcessorTask
@@ -241,3 +247,33 @@ def validate_first_task_input_file_groups_existence(logger: Logger, mets_path: s
         if group not in available_groups:
             message = f"Input file group '{group}' of the first processor not found: {input_file_grps}"
             raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
+
+
+def kill_mets_server_zombies(minutes_ago: Optional[int], dry_run: Optional[bool]) -> List[int]:
+    if minutes_ago is None:
+        minutes_ago = 90
+    if dry_run is None:
+        dry_run = False
+
+    now = time()
+    cmdline_pat = r'.*ocrd workspace -U.*server start $'  # NULs in cmdline become spaces below, hence the trailing space
+    ret = []
+    for procdir in sorted(Path('/proc').glob('*'), key=os.path.getctime):
+        if not procdir.is_dir():
+            continue
+        cmdline_file = procdir.joinpath('cmdline')
+        if not cmdline_file.is_file():
+            continue
+        ctime_ago = int((now - procdir.stat().st_ctime) / 60)
+        if ctime_ago < minutes_ago:
+            continue
+        cmdline = cmdline_file.read_text().replace('\x00', ' ')
+        if re.match(cmdline_pat, cmdline):
+            pid = int(procdir.name)
+            ret.append(pid)
+            print(f'METS server with PID {pid} was created {ctime_ago} minutes ago, more than {minutes_ago}, so killing it (cmdline="{cmdline}")', file=sys.stderr)
+            if dry_run:
+                print(f'[dry_run is active] would kill {pid}', file=sys.stderr)
+            else:
+                os.kill(pid, signal.SIGTERM)
+    return ret
diff --git a/src/ocrd_utils/config.py b/src/ocrd_utils/config.py
index 418245643..d2cc4efce 100644
--- a/src/ocrd_utils/config.py
+++ b/src/ocrd_utils/config.py
@@ -160,7 +160,7 @@ def _ocrd_download_timeout_parser(val):
 config.add("OCRD_NETWORK_CLIENT_POLLING_SLEEP",
            description="How many seconds to sleep before trying again.",
            parser=int,
-           default=(True, 30))
+           default=(True, 10))

 config.add("OCRD_NETWORK_CLIENT_POLLING_TIMEOUT",
            description="Timeout for a blocking ocrd network client (in seconds).",
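Lowering the default polling sleep from 30 to 10 seconds makes a blocking client poll three times as often within the same timeout budget. A small sketch of the retry arithmetic, assuming the Client derives its number of tries as timeout divided by sleep (the exact derivation is not shown in this diff):

    # hypothetical values, for illustration only
    polling_timeout = 300  # OCRD_NETWORK_CLIENT_POLLING_TIMEOUT
    polling_sleep = 10     # OCRD_NETWORK_CLIENT_POLLING_SLEEP (new default, was 30)
    polling_tries = polling_timeout // polling_sleep  # 30 polls instead of 10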