Skip to content

Commit

Permalink
Merge branch 'network_client_block_prints'
Browse files Browse the repository at this point in the history
  • Loading branch information
kba committed Oct 10, 2024
2 parents a68782d + 7512bd6 commit 252fb4d
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 37 deletions.
12 changes: 9 additions & 3 deletions src/ocrd/mets_server.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""
# METS server functionality
"""
import os
import re
from os import _exit, chmod
import signal
from typing import Dict, Optional, Union, List, Tuple
from time import sleep
from pathlib import Path
Expand Down Expand Up @@ -428,13 +430,17 @@ def create_process(mets_server_url: str, ws_dir_path: str, log_file: str) -> int

@staticmethod
def kill_process(mets_server_pid: int):
subprocess_run(args=["kill", "-s", "SIGINT", f"{mets_server_pid}"], shell=False, universal_newlines=True)
return
os.kill(mets_server_pid, signal.SIGINT)
sleep(3)
try:
os.kill(mets_server_pid, signal.SIGKILL)
except ProcessLookupError as e:
pass

def shutdown(self):
    """
    Remove the UDS socket file if it is still present, then exit the process.
    """
    remove_socket = self.is_uds and Path(self.url).exists()
    if remove_socket:
        self.log.warning(f"Due to a server shutdown, removing the existing UDS socket file: {self.url}")
        Path(self.url).unlink()
    # uvicorn catches the SystemExit raised by sys.exit, hence os._exit
    _exit(0)
Expand Down
35 changes: 27 additions & 8 deletions src/ocrd_network/cli/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from json import dumps
from typing import List, Optional, Tuple
from ocrd.decorators.parameter_option import parameter_option, parameter_override_option
from ocrd_network.constants import JobState
from ocrd_utils import DEFAULT_METS_BASENAME
from ocrd_utils.introspect import set_json_key_value_overrides
from ocrd_utils.str import parse_json_string_or_file
Expand Down Expand Up @@ -104,8 +105,10 @@ def check_processing_job_status(address: Optional[str], processing_job_id: str):
@click.option('--result-queue-name')
@click.option('--callback-url')
@click.option('--agent-type', default='worker')
@click.option('-b', '--block', default=False,
@click.option('-b', '--block', default=False, is_flag=True,
help='If set, the client will block till job timeout, fail or success.')
@click.option('-p', '--print-state', default=False, is_flag=True,
help='If set, the client will print job states by each iteration.')
def send_processing_job_request(
address: Optional[str],
processor_name: str,
Expand All @@ -120,7 +123,8 @@ def send_processing_job_request(
# TODO: This is temporarily available to toggle
# between the ProcessingWorker/ProcessorServer
agent_type: Optional[str],
block: Optional[bool]
block: Optional[bool],
print_state: Optional[bool]
):
"""
Submit a processing job to the processing server.
Expand All @@ -146,7 +150,7 @@ def send_processing_job_request(
assert processing_job_id
print(f"Processing job id: {processing_job_id}")
if block:
client.poll_job_status(job_id=processing_job_id)
client.poll_job_status(job_id=processing_job_id, print_state=print_state)


@client_cli.group('workflow')
Expand Down Expand Up @@ -176,24 +180,39 @@ def check_workflow_job_status(address: Optional[str], workflow_job_id: str):
'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default')
@click.option('-m', '--path-to-mets', required=True)
@click.option('-w', '--path-to-workflow', required=True)
@click.option('-b', '--block', default=False,
@click.option('--page-wise/--no-page-wise', is_flag=True, default=False, help="Whether to generate per-page jobs")
@click.option('-b', '--block', default=False, is_flag=True,
help='If set, the client will block till job timeout, fail or success.')
@click.option('-p', '--print-state', default=False, is_flag=True,
help='If set, the client will print job states by each iteration.')
def send_workflow_job_request(
address: Optional[str],
path_to_mets: str,
path_to_workflow: str,
block: Optional[bool]
page_wise: bool,
block: bool,
print_state: bool
):
"""
Submit a workflow job to the processing server.
"""
client = Client(server_addr_processing=address)
workflow_job_id = client.send_workflow_job_request(path_to_wf=path_to_workflow, path_to_mets=path_to_mets)
workflow_job_id = client.send_workflow_job_request(
path_to_wf=path_to_workflow,
path_to_mets=path_to_mets,
page_wise=page_wise,
)
assert workflow_job_id
print(f"Workflow job id: {workflow_job_id}")
if block:
client.poll_workflow_status(job_id=workflow_job_id)

print(f"Polling state of workflow job {workflow_job_id}")
state = client.poll_workflow_status(job_id=workflow_job_id, print_state=print_state)
if state != JobState.success:
print(f"Workflow failed with {state}")
exit(1)
else:
print(f"Workflow succeeded")
exit(0)

@client_cli.group('workspace')
def workspace_cli():
Expand Down
15 changes: 9 additions & 6 deletions src/ocrd_network/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,21 @@ def check_job_status(self, job_id: str):
def check_workflow_status(self, workflow_job_id: str):
    """
    Return the state of a workflow job as reported by `get_ps_workflow_job_status`
    for this client's processing server address.
    """
    server_address = self.server_addr_processing
    return get_ps_workflow_job_status(server_address, workflow_job_id=workflow_job_id)

def poll_job_status(self, job_id: str, print_state: bool = False) -> str:
    """
    Poll the state of a processing job.

    Hands over to `poll_job_status_till_timeout_fail_or_success` with this
    client's server address, `polling_tries` and `polling_wait`, and returns
    the state that call reports. `print_state` is passed through unchanged.
    """
    final_state = poll_job_status_till_timeout_fail_or_success(
        ps_server_host=self.server_addr_processing,
        job_id=job_id,
        tries=self.polling_tries,
        wait=self.polling_wait,
        print_state=print_state,
    )
    return final_state

def poll_workflow_status(self, job_id: str, print_state: bool = False) -> str:
    """
    Poll the state of a workflow job.

    Hands over to `poll_wf_status_till_timeout_fail_or_success` with this
    client's server address, `polling_tries` and `polling_wait`, and returns
    the state that call reports. `print_state` is passed through unchanged.
    """
    final_state = poll_wf_status_till_timeout_fail_or_success(
        ps_server_host=self.server_addr_processing,
        job_id=job_id,
        tries=self.polling_tries,
        wait=self.polling_wait,
        print_state=print_state,
    )
    return final_state

def send_processing_job_request(self, processor_name: str, req_params: dict) -> str:
    """
    Post a processing request for `processor_name` with `req_params` as the
    job input and return what `post_ps_processing_request` returns.
    """
    posted = post_ps_processing_request(
        ps_server_host=self.server_addr_processing,
        processor=processor_name,
        job_input=req_params,
    )
    return posted

def send_workflow_job_request(self, path_to_wf: str, path_to_mets: str, page_wise: bool = False):
    """
    Post a workflow request and return what `post_ps_workflow_request` returns.

    `path_to_wf`, `path_to_mets` and `page_wise` are forwarded unchanged,
    together with this client's processing server address.
    """
    posted = post_ps_workflow_request(
        ps_server_host=self.server_addr_processing,
        path_to_wf=path_to_wf,
        path_to_mets=path_to_mets,
        page_wise=page_wise,
    )
    return posted
39 changes: 25 additions & 14 deletions src/ocrd_network/client_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import json
from requests import get as request_get, post as request_post
from time import sleep
from .constants import JobState, NETWORK_PROTOCOLS


def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int):
def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int, print_state: bool = False) -> JobState:
if job_type not in ["workflow", "processor"]:
raise ValueError(f"Unknown job type '{job_type}', expected 'workflow' or 'processor'")
job_state = JobState.unset
Expand All @@ -13,18 +14,22 @@ def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries
job_state = get_ps_processing_job_status(ps_server_host, job_id)
if job_type == "workflow":
job_state = get_ps_workflow_job_status(ps_server_host, job_id)
if print_state:
print(f"State of the {job_type} job {job_id}: {job_state}")
if job_state == JobState.success or job_state == JobState.failed:
break
tries -= 1
return job_state


def poll_job_status_till_timeout_fail_or_success(
    ps_server_host: str, job_id: str, tries: int, wait: int, print_state: bool = False
) -> JobState:
    """
    Poll a processing job through `_poll_endpoint_status` with job type "processor".
    """
    return _poll_endpoint_status(
        ps_server_host=ps_server_host,
        job_id=job_id,
        job_type="processor",
        tries=tries,
        wait=wait,
        print_state=print_state,
    )


def poll_wf_status_till_timeout_fail_or_success(
    ps_server_host: str, job_id: str, tries: int, wait: int, print_state: bool = False
) -> JobState:
    """
    Poll a workflow job through `_poll_endpoint_status` with job type "workflow".
    """
    return _poll_endpoint_status(
        ps_server_host=ps_server_host,
        job_id=job_id,
        job_type="workflow",
        tries=tries,
        wait=wait,
        print_state=print_state,
    )


def get_ps_deployed_processors(ps_server_host: str):
Expand All @@ -47,22 +52,21 @@ def get_ps_processing_job_log(ps_server_host: str, processing_job_id: str):
return response


def get_ps_processing_job_status(ps_server_host: str, processing_job_id: str) -> JobState:
    """
    Fetch the state of a processing job from `{ps_server_host}/processor/job/{id}`.

    Asserts a 200 response and a non-empty "state" field, then returns the
    `JobState` attribute named after the lower-cased state string.
    """
    request_url = f"{ps_server_host}/processor/job/{processing_job_id}"
    json_headers = {"accept": "application/json; charset=utf-8"}
    response = request_get(url=request_url, headers=json_headers)
    assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}"
    payload = response.json()
    job_state = payload["state"]
    assert job_state
    state_name = job_state.lower()
    return getattr(JobState, state_name)

def get_ps_workflow_job_status(ps_server_host: str, workflow_job_id: str) -> JobState:
    """
    Fetch the state of a workflow job from `{ps_server_host}/workflow/job-simple/{id}`.

    Asserts a 200 response and a non-empty "state" field, then returns the
    `JobState` attribute named after the lower-cased state string.
    """
    request_url = f"{ps_server_host}/workflow/job-simple/{workflow_job_id}"
    json_headers = {"accept": "application/json; charset=utf-8"}
    response = request_get(url=request_url, headers=json_headers)
    assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}"
    payload = response.json()
    job_state = payload["state"]
    assert job_state
    state_name = job_state.lower()
    return getattr(JobState, state_name)


def post_ps_processing_request(ps_server_host: str, processor: str, job_input: dict) -> str:
Expand All @@ -78,18 +82,25 @@ def post_ps_processing_request(ps_server_host: str, processor: str, job_input: d
return processing_job_id


def post_ps_workflow_request(
    ps_server_host: str,
    path_to_wf: str,
    path_to_mets: str,
    page_wise: bool = False,
) -> str:
    """
    Upload a workflow file to `{ps_server_host}/workflow/run` and return the job id.

    Args:
        ps_server_host: Base URL of the processing server.
        path_to_wf: Path of the workflow file, sent as the "workflow" upload.
        path_to_mets: Sent as the `mets_path` query parameter.
        page_wise: Sent as the `page_wise` query parameter ("True"/"False").

    Returns:
        The "job_id" field of the JSON response.

    Raises:
        AssertionError: If the response status is not 200 or "job_id" is empty.
    """
    request_url = f"{ps_server_host}/workflow/run?mets_path={path_to_mets}&page_wise={'True' if page_wise else 'False'}"
    # Keep the file handle inside a context manager so it is closed once the
    # request has been sent, also when the request itself raises.
    with open(path_to_wf, "rb") as workflow_file:
        response = request_post(
            url=request_url,
            headers={"accept": "application/json; charset=utf-8"},
            files={"workflow": workflow_file}
        )
    json_resp_raw = response.text
    assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}"
    wf_job_id = json.loads(json_resp_raw)["job_id"]
    assert wf_job_id
    return wf_job_id

Expand Down
15 changes: 14 additions & 1 deletion src/ocrd_network/processing_server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from datetime import datetime
from os import getpid
from pathlib import Path
from typing import Dict, List, Union
from typing import Dict, List, Optional, Union
from uvicorn import run as uvicorn_run

from fastapi import APIRouter, FastAPI, File, HTTPException, Request, status, UploadFile
Expand Down Expand Up @@ -48,6 +48,7 @@
get_workflow_content,
get_from_database_workspace,
get_from_database_workflow_job,
kill_mets_server_zombies,
parse_workflow_tasks,
raise_http_exception,
request_processor_server_tool_json,
Expand Down Expand Up @@ -200,6 +201,14 @@ def add_api_routes_others(self):
tags=[ServerApiTags.WORKSPACE],
summary="Forward a TCP request to UDS mets server"
)
others_router.add_api_route(
path="/kill_mets_server_zombies",
endpoint=self.kill_mets_server_zombies,
methods=["DELETE"],
tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
status_code=status.HTTP_200_OK,
summary="!! Workaround Do Not Use Unless You Have A Reason !! Kill all METS servers on this machine that have been created more than 60 minutes ago."
)
self.include_router(others_router)

def add_api_routes_processing(self):
Expand Down Expand Up @@ -817,6 +826,10 @@ async def get_workflow_info(self, workflow_job_id) -> Dict:
response = self._produce_workflow_status_response(processing_jobs=jobs)
return response

async def kill_mets_server_zombies(self, minutes_ago : Optional[int] = None, dry_run : Optional[bool] = None) -> List[int]:
    """
    Endpoint handler: forward both arguments to the module-level
    `kill_mets_server_zombies` helper and return the PID list it produces.
    """
    return kill_mets_server_zombies(minutes_ago=minutes_ago, dry_run=dry_run)

async def get_workflow_info_simple(self, workflow_job_id) -> Dict[str, JobState]:
"""
Simplified version of the `get_workflow_info` that returns a single state for the entire workflow.
Expand Down
8 changes: 8 additions & 0 deletions src/ocrd_network/runtime_data/deployer.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ def start_uds_mets_server(self, ws_dir_path: str) -> Path:
if is_mets_server_running(mets_server_url=str(mets_server_url)):
self.log.debug(f"The UDS mets server for {ws_dir_path} is already started: {mets_server_url}")
return mets_server_url
elif Path(mets_server_url).is_socket():
self.log.warning(
f"The UDS mets server for {ws_dir_path} is not running but the socket file exists: {mets_server_url}."
"Removing to avoid any weird behavior before starting the server.")
Path(mets_server_url).unlink()
self.log.info(f"Starting UDS mets server: {mets_server_url}")
pid = OcrdMetsServer.create_process(mets_server_url=mets_server_url, ws_dir_path=ws_dir_path, log_file=log_file)
self.mets_servers[mets_server_url] = pid
Expand All @@ -160,6 +165,9 @@ def stop_uds_mets_server(self, mets_server_url: str, stop_with_pid: bool = False
raise Exception(message)
mets_server_pid = self.mets_servers[Path(mets_server_url)]
OcrdMetsServer.kill_process(mets_server_pid=mets_server_pid)
if Path(mets_server_url).exists():
self.log.warning(f"Deployer is removing the existing UDS socket file: {mets_server_url}")
Path(mets_server_url).unlink()
return
# TODO: Reconsider this again
# Not having this sleep here causes connection errors
Expand Down
44 changes: 40 additions & 4 deletions src/ocrd_network/server_utils.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import os
import re
import signal
from pathlib import Path
from json import dumps, loads
from urllib.parse import urljoin
from typing import Dict, List, Optional, Union
from time import time

from fastapi import HTTPException, status, UploadFile
from fastapi.responses import FileResponse
from httpx import AsyncClient, Timeout
from json import dumps, loads
from logging import Logger
from pathlib import Path
from requests import get as requests_get
from typing import Dict, List, Union
from urllib.parse import urljoin
from requests_unixsocket import sys

from ocrd.resolver import Resolver
from ocrd.task_sequence import ProcessorTask
Expand Down Expand Up @@ -241,3 +247,33 @@ def validate_first_task_input_file_groups_existence(logger: Logger, mets_path: s
if group not in available_groups:
message = f"Input file group '{group}' of the first processor not found: {input_file_grps}"
raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)


def kill_mets_server_zombies(minutes_ago: Optional[int] = None, dry_run: Optional[bool] = None) -> List[int]:
    """
    Send SIGTERM to old METS server processes found under ``/proc``.

    A process is selected when its command line (NUL separators replaced by
    spaces) matches ``.*ocrd workspace -U.*server start $`` and the ctime of
    its ``/proc/<pid>`` directory lies at least ``minutes_ago`` minutes back.

    Args:
        minutes_ago: Minimum age in minutes; ``None`` means 90.
        dry_run: If true, only report the matches without signalling them;
            ``None`` means False.

    Returns:
        The PIDs that were signalled (or, in a dry run, would have been),
        ordered by ascending ctime of their ``/proc`` entry.
    """
    # Imported here so the stderr output below does not rely on `sys` being
    # re-exported by a third-party package at module level.
    import sys

    if minutes_ago is None:
        minutes_ago = 90
    if dry_run is None:
        dry_run = False

    now = time()
    cmdline_pat = re.compile(r'.*ocrd workspace -U.*server start $')

    # Processes may exit while /proc is being scanned, so every access to a
    # /proc entry tolerates the entry having disappeared in the meantime.
    candidates = []
    for procdir in Path('/proc').glob('*'):
        if not procdir.name.isdigit():
            # Only numeric entries are PIDs; this also skips /proc/self.
            continue
        try:
            candidates.append((os.path.getctime(procdir), procdir))
        except OSError:
            continue
    candidates.sort(key=lambda entry: entry[0])

    ret = []
    for ctime, procdir in candidates:
        if not procdir.is_dir():
            continue
        cmdline_file = procdir.joinpath('cmdline')
        if not cmdline_file.is_file():
            continue
        ctime_ago = int((now - ctime) / 60)
        if ctime_ago < minutes_ago:
            continue
        try:
            # errors='replace': arguments are arbitrary bytes, not necessarily valid text
            cmdline = cmdline_file.read_text(errors='replace').replace('\x00', ' ')
        except OSError:
            continue
        if not cmdline_pat.match(cmdline):
            continue
        pid = int(procdir.name)
        print(f'METS Server with PID {pid} was created {ctime_ago} minutes ago, more than {minutes_ago}, so killing (cmdline="{cmdline}")', file=sys.stderr)
        if dry_run:
            print(f'[dry_run is active] kill {pid}')
            ret.append(pid)
            continue
        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            # Exited on its own between the scan and the signal.
            continue
        except PermissionError:
            # Keep sweeping the remaining candidates instead of aborting.
            print(f'Not permitted to kill METS Server with PID {pid}', file=sys.stderr)
            continue
        ret.append(pid)
    return ret
2 changes: 1 addition & 1 deletion src/ocrd_utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def _ocrd_download_timeout_parser(val):
config.add("OCRD_NETWORK_CLIENT_POLLING_SLEEP",
description="How many seconds to sleep before trying again.",
parser=int,
default=(True, 30))
default=(True, 10))

config.add("OCRD_NETWORK_CLIENT_POLLING_TIMEOUT",
description="Timeout for a blocking ocrd network client (in seconds).",
Expand Down

0 comments on commit 252fb4d

Please sign in to comment.