Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: (opt-in): terminate handling of work when the request has already timed out #328

Merged
merged 16 commits into from
May 17, 2024
Merged
2 changes: 1 addition & 1 deletion examples/cloud_run_cloud_events/send_cloud_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cloudevents.http import CloudEvent, to_structured
import requests

from cloudevents.http import CloudEvent, to_structured

# Create a cloudevent using https://github.com/cloudevents/sdk-python
# Note we only need source and type because the cloudevents constructor by
Expand Down
16 changes: 16 additions & 0 deletions playground/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import logging
import time

import functions_framework

logger = logging.getLogger(__name__)


@functions_framework.http
def main(request):
timeout = 2
for _ in range(timeout * 10):
time.sleep(0.1)
logger.info("logging message after timeout elapsed")
return "Hello, world!"

46 changes: 41 additions & 5 deletions src/functions_framework/_http/gunicorn.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2020 Google LLC
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -16,17 +16,43 @@

import gunicorn.app.base

from gunicorn.workers.gthread import ThreadWorker

from ..request_timeout import ThreadingTimeout

# global for use in our custom gthread worker; the gunicorn arbiter spawns these
# and it's not possible to inject (and self.timeout means something different to
# async workers!)
# set/managed in gunicorn application init for test-friendliness
TIMEOUT_SECONDS = None


class GunicornApplication(gunicorn.app.base.BaseApplication):
def __init__(self, app, host, port, debug, **options):
threads = int(os.environ.get("THREADS", (os.cpu_count() or 1) * 4))

global TIMEOUT_SECONDS
TIMEOUT_SECONDS = int(os.environ.get("CLOUD_RUN_TIMEOUT_SECONDS", 0))

self.options = {
"bind": "%s:%s" % (host, port),
"workers": os.environ.get("WORKERS", 1),
"threads": os.environ.get("THREADS", (os.cpu_count() or 1) * 4),
"timeout": os.environ.get("CLOUD_RUN_TIMEOUT_SECONDS", 0),
"loglevel": "error",
"workers": int(os.environ.get("WORKERS", 1)),
"threads": threads,
"loglevel": os.environ.get("GUNICORN_LOG_LEVEL", "error"),
"limit_request_line": 0,
}

if (
TIMEOUT_SECONDS > 0
and threads > 1
and _is_truthy(os.environ.get("THREADED_TIMEOUT_ENABLED", "False"))
): # pragma: no cover
self.options["worker_class"] = (
"functions_framework._http.gunicorn.GThreadWorkerWithTimeoutSupport"
)
else:
self.options["timeout"] = TIMEOUT_SECONDS

self.options.update(options)
self.app = app

Expand All @@ -38,3 +64,13 @@ def load_config(self):

def load(self):
return self.app


class GThreadWorkerWithTimeoutSupport(ThreadWorker): # pragma: no cover
def handle_request(self, req, conn):
with ThreadingTimeout(TIMEOUT_SECONDS):
super(GThreadWorkerWithTimeoutSupport, self).handle_request(req, conn)


def _is_truthy(s): # pragma: no cover
return str(s).lower() in ("yes", "y", "1", "yeah", "true", "t")
6 changes: 5 additions & 1 deletion src/functions_framework/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2020 Google LLC
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,3 +35,7 @@ class MissingTargetException(FunctionsFrameworkException):

class EventConversionException(FunctionsFrameworkException):
pass


class RequestTimeoutException(FunctionsFrameworkException):
pass
42 changes: 42 additions & 0 deletions src/functions_framework/request_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import ctypes
import logging
import threading

from .exceptions import RequestTimeoutException

logger = logging.getLogger(__name__)


class ThreadingTimeout(object): # pragma: no cover
def __init__(self, seconds):
self.seconds = seconds
self.target_tid = threading.current_thread().ident
self.timer = None

def __enter__(self):
self.timer = threading.Timer(self.seconds, self._raise_exc)
self.timer.start()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.timer.cancel()
if exc_type is RequestTimeoutException:
logger.warning(
"Request handling exceeded {0} seconds timeout; terminating request handling...".format(
self.seconds
),
exc_info=(exc_type, exc_val, exc_tb),
)
return False

def _raise_exc(self):
ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(self.target_tid), ctypes.py_object(RequestTimeoutException)
)
if ret == 0:
raise ValueError("Invalid thread ID {}".format(self.target_tid))
elif ret > 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(self.target_tid), None
)
raise SystemError("PyThreadState_SetAsyncExc failed")
3 changes: 2 additions & 1 deletion tests/test_execution_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,4 +378,5 @@ async def test_maintains_execution_id_for_concurrent_requests(monkeypatch, capsy
logs = record.err.strip().split("\n")
logs_as_json = tuple(json.loads(log) for log in logs)

assert logs_as_json == expected_logs
sort_key = lambda d: d["message"]
assert sorted(logs_as_json, key=sort_key) == sorted(expected_logs, key=sort_key)
12 changes: 12 additions & 0 deletions tests/test_functions/timeout/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import logging
import time

logger = logging.getLogger(__name__)


def function(request):
# sleep for 1200 total ms (1.8 sec)
jrmfg marked this conversation as resolved.
Show resolved Hide resolved
for _ in range(12):
time.sleep(0.1)
logger.info("some extra logging message")
return "success", 200
198 changes: 198 additions & 0 deletions tests/test_timeouts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pathlib
import socket
import time

from multiprocessing import Process

import pytest
import requests

ff_gunicorn = pytest.importorskip("functions_framework._http.gunicorn")


from functions_framework import create_app

TEST_FUNCTIONS_DIR = pathlib.Path(__file__).resolve().parent / "test_functions"
TEST_HOST = "0.0.0.0"
TEST_PORT = "8080"


@pytest.fixture(autouse=True)
def run_around_tests():
# the test samples test also listens on 8080, so let's be good stewards of
# the port and make sure it's free
_wait_for_no_listen(TEST_HOST, TEST_PORT)
yield
_wait_for_no_listen(TEST_HOST, TEST_PORT)


@pytest.mark.skipif("platform.system() == 'Windows'")
@pytest.mark.skipif("platform.system() == 'Darwin'")
@pytest.mark.slow_integration_test
def test_no_timeout_allows_request_processing_to_finish():
source = TEST_FUNCTIONS_DIR / "timeout" / "main.py"
target = "function"

app = create_app(target, source)

options = {}

gunicorn_app = ff_gunicorn.GunicornApplication(
app, TEST_HOST, TEST_PORT, False, **options
)

gunicorn_p = Process(target=gunicorn_app.run)
gunicorn_p.start()

_wait_for_listen(TEST_HOST, TEST_PORT)

result = requests.get("http://{}:{}/".format(TEST_HOST, TEST_PORT))

gunicorn_p.terminate()
gunicorn_p.join()

assert result.status_code == 200


@pytest.mark.skipif("platform.system() == 'Windows'")
@pytest.mark.skipif("platform.system() == 'Darwin'")
@pytest.mark.slow_integration_test
def test_timeout_but_not_threaded_timeout_enabled_does_not_kill(monkeypatch):
monkeypatch.setenv("CLOUD_RUN_TIMEOUT_SECONDS", "1")
monkeypatch.setenv("THREADED_TIMEOUT_ENABLED", "false")
jrmfg marked this conversation as resolved.
Show resolved Hide resolved
source = TEST_FUNCTIONS_DIR / "timeout" / "main.py"
target = "function"

app = create_app(target, source)

options = {}

gunicorn_app = ff_gunicorn.GunicornApplication(
app, TEST_HOST, TEST_PORT, False, **options
)

gunicorn_p = Process(target=gunicorn_app.run)
gunicorn_p.start()

_wait_for_listen(TEST_HOST, TEST_PORT)

result = requests.get("http://{}:{}/".format(TEST_HOST, TEST_PORT))

gunicorn_p.terminate()
gunicorn_p.join()

assert result.status_code == 200


@pytest.mark.skipif("platform.system() == 'Windows'")
@pytest.mark.skipif("platform.system() == 'Darwin'")
@pytest.mark.slow_integration_test
def test_timeout_and_threaded_timeout_enabled_kills(monkeypatch):
monkeypatch.setenv("CLOUD_RUN_TIMEOUT_SECONDS", "1")
monkeypatch.setenv("THREADED_TIMEOUT_ENABLED", "true")
source = TEST_FUNCTIONS_DIR / "timeout" / "main.py"
target = "function"

app = create_app(target, source)

options = {}

gunicorn_app = ff_gunicorn.GunicornApplication(
app, TEST_HOST, TEST_PORT, False, **options
)

gunicorn_p = Process(target=gunicorn_app.run)
gunicorn_p.start()

_wait_for_listen(TEST_HOST, TEST_PORT)

result = requests.get("http://{}:{}/".format(TEST_HOST, TEST_PORT))

gunicorn_p.terminate()
gunicorn_p.join()

# Any exception raised in execution is a 500 error. Cloud Functions 1st gen and
# 2nd gen/Cloud Run infrastructure doing the timeout will return a 408 (gen 1)
# or 504 (gen 2/CR) at the infrastructure layer when request timeouts happen,
# and this code will only be available to the user in logs.
assert result.status_code == 500


@pytest.mark.skipif("platform.system() == 'Windows'")
@pytest.mark.skipif("platform.system() == 'Darwin'")
@pytest.mark.slow_integration_test
def test_timeout_and_threaded_timeout_enabled_but_timeout_not_exceeded_doesnt_kill(
monkeypatch,
):
monkeypatch.setenv("CLOUD_RUN_TIMEOUT_SECONDS", "2")
monkeypatch.setenv("THREADED_TIMEOUT_ENABLED", "true")
source = TEST_FUNCTIONS_DIR / "timeout" / "main.py"
target = "function"

app = create_app(target, source)

options = {}

gunicorn_app = ff_gunicorn.GunicornApplication(
app, TEST_HOST, TEST_PORT, False, **options
)

gunicorn_p = Process(target=gunicorn_app.run)
gunicorn_p.start()

_wait_for_listen(TEST_HOST, TEST_PORT)

result = requests.get("http://{}:{}/".format(TEST_HOST, TEST_PORT))

gunicorn_p.terminate()
gunicorn_p.join()

assert result.status_code == 200


@pytest.mark.skip
def _wait_for_listen(host, port, timeout=10):
# Used in tests to make sure that the gunicorn app has booted and is
# listening before sending a test request
start_time = time.perf_counter()
while True:
try:
with socket.create_connection((host, port), timeout=timeout):
break
except OSError as ex:
time.sleep(0.01)
if time.perf_counter() - start_time >= timeout:
raise TimeoutError(
"Waited too long for port {} on host {} to start accepting "
"connections.".format(port, host)
) from ex


@pytest.mark.skip
def _wait_for_no_listen(host, port, timeout=10):
# Used in tests to make sure that the
jrmfg marked this conversation as resolved.
Show resolved Hide resolved
start_time = time.perf_counter()
while True:
try:
with socket.create_connection((host, port), timeout=timeout):
time.sleep(0.01)
if time.perf_counter() - start_time >= timeout:
raise TimeoutError(
"Waited too long for port {} on host {} to stop accepting "
"connections.".format(port, host)
)
except OSError as ex:
break
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ envlist = py{35,36,37,38,39,310}-{ubuntu-latest,macos-latest,windows-latest},lin
usedevelop = true
deps =
docker
pytest-asyncio
pytest-cov
pytest-integration
pretend
Expand Down
Loading