Skip to content

Commit

Permalink
Merge pull request #659 from gschwind/new-handle-process
Browse files Browse the repository at this point in the history
Better handle of crashed processes
  • Loading branch information
cehbrecht authored Sep 5, 2022
2 parents 5f84f5b + 57b6301 commit 85ca819
Showing 11 changed files with 169 additions and 71 deletions.
2 changes: 1 addition & 1 deletion docs/api.rst
Original file line number Diff line number Diff line change
@@ -95,7 +95,7 @@ Request and response objects
A MultiDict object containing input values sent by the client.


.. autoclass:: pywps.response.WPSResponse
.. autoclass:: pywps.response.basic.WPSResponse
:members:

.. attribute:: status
6 changes: 3 additions & 3 deletions docs/process.rst
Original file line number Diff line number Diff line change
@@ -53,7 +53,7 @@ The instance of a *Process* needs following attributes to be configured:
:outputs:
list of process outputs
:handler:
method which recieves :class:`pywps.app.WPSRequest` and :class:`pywps.response.WPSResponse` as inputs.
method which recieves :class:`pywps.app.WPSRequest` and :class:`pywps.response.basic.WPSResponse` as inputs.

Example vector buffer process
=============================
@@ -118,12 +118,12 @@ Next we create a new list variables for inputs and outputs.

Next we define the *handler* method. In it, *geospatial analysis
may happen*. The method gets a :class:`pywps.app.WPSRequest` and a
:class:`pywps.response.WPSResponse` object as parameters. In our case, we
:class:`pywps.response.basic.WPSResponse` object as parameters. In our case, we
calculate the buffer around each vector feature using
`GDAL/OGR library <https://gdal.org>`_. We will not got much into the details,
what you should note is how to get input data from the
:class:`pywps.app.WPSRequest` object and how to set data as outputs in the
:class:`pywps.response.WPSResponse` object.
:class:`pywps.response.basic.WPSResponse` object.

.. literalinclude:: demobuffer.py
:language: python
9 changes: 8 additions & 1 deletion pywps/app/Process.py
Original file line number Diff line number Diff line change
@@ -194,6 +194,11 @@ def _execute_process(self, async_, wps_request, wps_response):

running, stored = dblog.get_process_counts()

if maxparallel != -1 and running >= maxparallel:
# Try to check for crashed process
dblog.cleanup_crashed_process()
running, stored = dblog.get_process_counts()

# async
if async_:

@@ -238,7 +243,9 @@ def _run_async(self, wps_request, wps_response):
# This function may not raise exception and must return a valid wps_response
# Failure must be reported as wps_response.status = WPS_STATUS.FAILED
def _run_process(self, wps_request, wps_response):
LOGGER.debug("Started processing request: {}".format(self.uuid))
LOGGER.debug("Started processing request: {} with pid: {}".format(self.uuid, os.getpid()))
# Update the actual pid of current process to check if failed latter
dblog.update_pid(self.uuid, os.getpid())
try:
self._set_grass(wps_request)
# if required set HOME to the current working directory.
52 changes: 52 additions & 0 deletions pywps/dblog.py
Original file line number Diff line number Diff line change
@@ -8,6 +8,8 @@
"""

import logging
import sys

from pywps import configuration
from pywps.exceptions import NoApplicableCode
import sqlite3
@@ -23,6 +25,8 @@
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool, StaticPool

from pywps.response.status import WPS_STATUS

LOGGER = logging.getLogger('PYWPS')
_SESSION_MAKER = None

@@ -128,6 +132,54 @@ def store_status(uuid, wps_status, message=None, status_percentage=None):
session.close()


def update_pid(uuid, pid):
"""Update actual pid for the uuid processing
"""
session = get_session()

requests = session.query(ProcessInstance).filter_by(uuid=str(uuid))
if requests.count():
request = requests.one()
request.pid = pid
session.commit()
session.close()


def cleanup_crashed_process():
# TODO: implement other platform
if sys.platform != "linux":
return

session = get_session()
stored_query = session.query(RequestInstance.uuid)
running_cur = (
session.query(ProcessInstance)
.filter(ProcessInstance.status.in_([WPS_STATUS.STARTED, WPS_STATUS.PAUSED]))
.filter(~ProcessInstance.uuid.in_(stored_query))
)

failed = []
running = [(p.uuid, p.pid) for p in running_cur]
for uuid, pid in running:
# No process with this pid, the process has crashed
if not os.path.exists(os.path.join("/proc", str(pid))):
failed.append(uuid)
continue

# If we can't read the environ, that mean the process belong another user
# which mean that this is not our process, thus our process has crashed
# this not work because root is the user for the apache
# if not os.access(os.path.join("/proc", str(pid), "environ"), os.R_OK):
# failed.append(uuid)
# continue
pass

for uuid in failed:
store_status(uuid, WPS_STATUS.FAILED, "Process crashed", 100)

session.close()


def _get_identifier(request):
"""Get operation identifier
"""
6 changes: 5 additions & 1 deletion pywps/processing/__init__.py
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@
##################################################################

import pywps.configuration as config
from pywps.processing.basic import MultiProcessing
from pywps.processing.basic import MultiProcessing, DetachProcessing
from pywps.processing.scheduler import Scheduler
# api only
from pywps.processing.basic import Processing # noqa: F401
@@ -14,6 +14,7 @@
LOGGER = logging.getLogger("PYWPS")

MULTIPROCESSING = 'multiprocessing'
DETACHPROCESSING = 'detachprocessing'
SCHEDULER = 'scheduler'
DEFAULT = MULTIPROCESSING

@@ -29,6 +30,9 @@ def Process(process, wps_request, wps_response):
LOGGER.info("Processing mode: {}".format(mode))
if mode == SCHEDULER:
process = Scheduler(process, wps_request, wps_response)
elif mode == DETACHPROCESSING:
process = DetachProcessing(process, wps_request, wps_response)
else:
process = MultiProcessing(process, wps_request, wps_response)

return process
30 changes: 30 additions & 0 deletions pywps/processing/basic.py
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@
# Copyright 2018 Open Source Geospatial Foundation and others #
# licensed under MIT, Please consult LICENSE.txt for details #
##################################################################
import os

from pywps.processing.job import Job

@@ -34,3 +35,32 @@ def start(self):
args=(self.job.wps_request, self.job.wps_response)
)
process.start()


class DetachProcessing(Processing):
"""
:class:`DetachProcessing` run job as detached process. The process will be run as child of pid 1
"""

def start(self):
pid = os.fork()
if pid != 0:
# Wait that the children get detached.
os.waitpid(pid, 0)
return

# Detach ourself.

# Ensure that we are the session leader to avoid to be zombified.
os.setsid()
if os.fork():
# Stop running now
os._exit(0)

# We are the detached child, run the actual process
try:
getattr(self.job.process, self.job.method)(self.job.wps_request, self.job.wps_response)
except Exception:
pass
# Ensure to stop ourself here what ever append.
os._exit(0)
64 changes: 2 additions & 62 deletions pywps/response/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
from abc import abstractmethod
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from pywps import WPSRequest

from pywps.dblog import store_status
from pywps.response.status import WPS_STATUS
from pywps.translations import get_translation
from jinja2 import Environment, PackageLoader
import os

from jinja2 import Environment


class RelEnvironment(Environment):
"""Override join_path() to enable relative template paths."""
@@ -28,57 +22,3 @@ def get_response(operation):
return DescribeResponse
elif operation == "execute":
return ExecuteResponse


class WPSResponse(object):

def __init__(self, wps_request: 'WPSRequest', uuid=None, version="1.0.0"):

self.wps_request = wps_request
self.uuid = uuid
self.message = ''
self.status = WPS_STATUS.ACCEPTED
self.status_percentage = 0
self.doc = None
self.content_type = None
self.version = version
self.template_env = RelEnvironment(
loader=PackageLoader('pywps', 'templates'),
trim_blocks=True, lstrip_blocks=True,
autoescape=True,
)
self.template_env.globals.update(get_translation=get_translation)

def _update_status(self, status, message, status_percentage):
"""
Update status report of currently running process instance
:param str message: Message you need to share with the client
:param int status_percentage: Percent done (number betwen <0-100>)
:param pywps.response.status.WPS_STATUS status: process status - user should usually
ommit this parameter
"""
self.message = message
self.status = status
self.status_percentage = status_percentage
store_status(self.uuid, self.status, self.message, self.status_percentage)

@abstractmethod
def _construct_doc(self):
...

def get_response_doc(self):
try:
self.doc, self.content_type = self._construct_doc()
except Exception as e:
if hasattr(e, "description"):
msg = e.description
else:
msg = e
self._update_status(WPS_STATUS.FAILED, msg, 100)
raise e

else:
self._update_status(WPS_STATUS.SUCCEEDED, "Response generated", 100)

return self.doc, self.content_type
65 changes: 65 additions & 0 deletions pywps/response/basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from abc import abstractmethod
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from pywps import WPSRequest

from pywps.dblog import store_status
from . import RelEnvironment
from .status import WPS_STATUS
from pywps.translations import get_translation
from jinja2 import Environment, PackageLoader
import os


class WPSResponse(object):

def __init__(self, wps_request: 'WPSRequest', uuid=None, version="1.0.0"):

self.wps_request = wps_request
self.uuid = uuid
self.message = ''
self.status = WPS_STATUS.ACCEPTED
self.status_percentage = 0
self.doc = None
self.content_type = None
self.version = version
self.template_env = RelEnvironment(
loader=PackageLoader('pywps', 'templates'),
trim_blocks=True, lstrip_blocks=True,
autoescape=True,
)
self.template_env.globals.update(get_translation=get_translation)

def _update_status(self, status, message, status_percentage):
"""
Update status report of currently running process instance
:param str message: Message you need to share with the client
:param int status_percentage: Percent done (number betwen <0-100>)
:param pywps.response.status.WPS_STATUS status: process status - user should usually
ommit this parameter
"""
self.message = message
self.status = status
self.status_percentage = status_percentage
store_status(self.uuid, self.status, self.message, self.status_percentage)

@abstractmethod
def _construct_doc(self):
...

def get_response_doc(self):
try:
self.doc, self.content_type = self._construct_doc()
except Exception as e:
if hasattr(e, "description"):
msg = e.description
else:
msg = e
self._update_status(WPS_STATUS.FAILED, msg, 100)
raise e

else:
self._update_status(WPS_STATUS.SUCCEEDED, "Response generated", 100)

return self.doc, self.content_type
2 changes: 1 addition & 1 deletion pywps/response/capabilities.py
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@
from werkzeug.wrappers import Request
import pywps.configuration as config
from pywps.app.basic import make_response, get_response_type, get_json_indent
from pywps.response import WPSResponse
from .basic import WPSResponse
from pywps import __version__
from pywps.exceptions import NoApplicableCode
import os
2 changes: 1 addition & 1 deletion pywps/response/describe.py
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@
from pywps.exceptions import NoApplicableCode
from pywps.exceptions import MissingParameterValue
from pywps.exceptions import InvalidParameterValue
from pywps.response import WPSResponse
from .basic import WPSResponse
from pywps import __version__
import os

2 changes: 1 addition & 1 deletion pywps/response/execute.py
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@

from pywps.inout.array_encode import ArrayEncoder
from pywps.response.status import WPS_STATUS
from pywps.response import WPSResponse
from .basic import WPSResponse
from pywps.inout.formats import FORMATS
from pywps.inout.outputs import ComplexOutput

0 comments on commit 85ca819

Please sign in to comment.