Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: (opt-in): terminate handling of work when the request has already timed out #328

Merged
merged 16 commits into from
May 17, 2024
Merged
2 changes: 1 addition & 1 deletion examples/cloud_run_cloud_events/send_cloud_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cloudevents.http import CloudEvent, to_structured
import requests

from cloudevents.http import CloudEvent, to_structured

# Create a cloudevent using https://github.com/cloudevents/sdk-python
# Note we only need source and type because the cloudevents constructor by
Expand Down
16 changes: 16 additions & 0 deletions playground/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import logging
import time

import functions_framework

logger = logging.getLogger(__name__)


@functions_framework.http
def main(request):
timeout = 2
for _ in range(timeout * 10):
time.sleep(0.1)
logger.info("logging message after timeout elapsed")
return "Hello, world!"

42 changes: 37 additions & 5 deletions src/functions_framework/_http/gunicorn.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2020 Google LLC
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -16,17 +16,43 @@

import gunicorn.app.base

from gunicorn.workers.gthread import ThreadWorker

from ..request_timeout import ThreadingTimeout

# global for use in our custom gthread worker; the gunicorn arbiter spawns these
# and it's not possible to inject (and self.timeout means something different to
# async workers!)
# set/managed in gunicorn application init for test-friendliness
TIMEOUT_SECONDS = None


class GunicornApplication(gunicorn.app.base.BaseApplication):
def __init__(self, app, host, port, debug, **options):
threads = int(os.environ.get("THREADS", (os.cpu_count() or 1) * 4))

global TIMEOUT_SECONDS
TIMEOUT_SECONDS = int(os.environ.get("CLOUD_RUN_TIMEOUT_SECONDS", 0))

self.options = {
"bind": "%s:%s" % (host, port),
"workers": os.environ.get("WORKERS", 1),
"threads": os.environ.get("THREADS", (os.cpu_count() or 1) * 4),
"timeout": os.environ.get("CLOUD_RUN_TIMEOUT_SECONDS", 0),
"loglevel": "error",
"workers": int(os.environ.get("WORKERS", 1)),
"threads": threads,
"loglevel": os.environ.get("GUNICORN_LOG_LEVEL", "error"),
"limit_request_line": 0,
}

if (
TIMEOUT_SECONDS > 0
and threads > 1
and (os.environ.get("THREADED_TIMEOUT_ENABLED", "False").lower() == "true")
): # pragma: no cover
self.options["worker_class"] = (
"functions_framework._http.gunicorn.GThreadWorkerWithTimeoutSupport"
)
else:
self.options["timeout"] = TIMEOUT_SECONDS

self.options.update(options)
self.app = app

Expand All @@ -38,3 +64,9 @@ def load_config(self):

def load(self):
return self.app


class GThreadWorkerWithTimeoutSupport(ThreadWorker): # pragma: no cover
def handle_request(self, req, conn):
with ThreadingTimeout(TIMEOUT_SECONDS):
super(GThreadWorkerWithTimeoutSupport, self).handle_request(req, conn)
6 changes: 5 additions & 1 deletion src/functions_framework/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2020 Google LLC
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,3 +35,7 @@ class MissingTargetException(FunctionsFrameworkException):

class EventConversionException(FunctionsFrameworkException):
pass


class RequestTimeoutException(FunctionsFrameworkException):
pass
42 changes: 42 additions & 0 deletions src/functions_framework/request_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import ctypes
import logging
import threading

from .exceptions import RequestTimeoutException

logger = logging.getLogger(__name__)


class ThreadingTimeout(object): # pragma: no cover
def __init__(self, seconds):
self.seconds = seconds
self.target_tid = threading.current_thread().ident
self.timer = None

def __enter__(self):
self.timer = threading.Timer(self.seconds, self._raise_exc)
self.timer.start()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.timer.cancel()
if exc_type is RequestTimeoutException:
logger.warning(
"Request handling exceeded {0} seconds timeout; terminating request handling...".format(
self.seconds
),
exc_info=(exc_type, exc_val, exc_tb),
)
return False

def _raise_exc(self):
ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(self.target_tid), ctypes.py_object(RequestTimeoutException)
)
if ret == 0:
raise ValueError("Invalid thread ID {}".format(self.target_tid))
elif ret > 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(self.target_tid), None
)
raise SystemError("PyThreadState_SetAsyncExc failed")
3 changes: 2 additions & 1 deletion tests/test_execution_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,4 +378,5 @@ async def test_maintains_execution_id_for_concurrent_requests(monkeypatch, capsy
logs = record.err.strip().split("\n")
logs_as_json = tuple(json.loads(log) for log in logs)

assert logs_as_json == expected_logs
sort_key = lambda d: d["message"]
assert sorted(logs_as_json, key=sort_key) == sorted(expected_logs, key=sort_key)
12 changes: 12 additions & 0 deletions tests/test_functions/timeout/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import logging
import time

logger = logging.getLogger(__name__)


def function(request):
# sleep for 1200 total ms (1.2 sec)
for _ in range(12):
time.sleep(0.1)
logger.info("some extra logging message")
return "success", 200
Loading
Loading