From 50ad2abdb09e97de7c9b2f51fdc8dcd85bc96f5a Mon Sep 17 00:00:00 2001 From: czgu Date: Wed, 11 Oct 2023 17:39:13 -0700 Subject: [PATCH] Default back to multiprocess worker, but with Timeout dynamic for gevent and non-gevent cases --- containers/docker-compose.prod.yml | 2 +- docker-compose.yml | 2 +- package.json | 2 +- .../connection_checker.py | 4 +-- .../select_one_checker.py | 4 +-- .../server/lib/query_executor/clients/hive.py | 4 +-- querybook/server/lib/utils/utils.py | 34 ++++++++++++++++--- .../tests/test_lib/test_utils/test_utils.py | 28 +++++++++++++-- requirements/base.txt | 2 ++ 9 files changed, 65 insertions(+), 17 deletions(-) diff --git a/containers/docker-compose.prod.yml b/containers/docker-compose.prod.yml index 5ced2c9de..ec93107d9 100644 --- a/containers/docker-compose.prod.yml +++ b/containers/docker-compose.prod.yml @@ -23,7 +23,7 @@ services: worker: extends: service: base - command: ./querybook/scripts/runservice prod_worker -P gevent -c 150 -l info -Ofair -n celery@%h + command: ./querybook/scripts/runservice prod_worker -c 150 -l info -Ofair -n celery@%h scheduler: extends: service: base diff --git a/docker-compose.yml b/docker-compose.yml index 98759fd8c..1e36f3d86 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -43,7 +43,7 @@ services: image: *querybook-image tty: true stdin_open: true - command: './querybook/scripts/runservice worker -P gevent -c 5' + command: './querybook/scripts/runservice worker -c 5' volumes: *querybook-volumes depends_on: *querybook-depends-on scheduler: diff --git a/package.json b/package.json index c13a7ae23..3d9e4aeb7 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "querybook", - "version": "3.29.0", + "version": "3.28.3", "description": "A Big Data Webapp", "private": true, "scripts": { diff --git a/querybook/server/lib/engine_status_checker/connection_checker.py b/querybook/server/lib/engine_status_checker/connection_checker.py index 1a18fde43..52a12ff8c 100644 --- a/querybook/server/lib/engine_status_checker/connection_checker.py +++ b/querybook/server/lib/engine_status_checker/connection_checker.py @@ -4,7 +4,7 @@ from .base_checker import BaseEngineStatusChecker, EngineStatus from const.query_execution import QueryEngineStatus from lib.query_executor.base_executor import QueryExecutorBaseClass -from lib.utils.utils import GeventTimeout +from lib.utils.utils import Timeout class ConnectionChecker(BaseEngineStatusChecker): @@ -24,7 +24,7 @@ def check_connection( ) -> EngineStatus: result: EngineStatus = {"status": QueryEngineStatus.GOOD.value, "messages": []} try: - with GeventTimeout(20, "Connection took too long"): + with Timeout(20, "Connection took too long"): cursor = executor._get_client(client_settings).cursor() utc_now_str = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") result["messages"].append( diff --git a/querybook/server/lib/engine_status_checker/select_one_checker.py b/querybook/server/lib/engine_status_checker/select_one_checker.py index 88a096868..c41986937 100644 --- a/querybook/server/lib/engine_status_checker/select_one_checker.py +++ b/querybook/server/lib/engine_status_checker/select_one_checker.py @@ -5,7 +5,7 @@ from const.query_execution import QueryEngineStatus from lib.query_executor.base_executor import QueryExecutorBaseClass from lib.query_executor.base_client import CursorBaseClass -from lib.utils.utils import GeventTimeout, TimeoutError +from lib.utils.utils import Timeout, TimeoutError class SelectOneChecker(BaseEngineStatusChecker): @@ -29,7 +29,7 @@ def check_select_one( ) -> EngineStatus: result: EngineStatus = {"status": QueryEngineStatus.GOOD.value, "messages": []} try: - with GeventTimeout(20, "Select 1 took too long"): + with Timeout(20, "Select 1 took too long"): cursor: CursorBaseClass = executor._get_client(client_settings).cursor() cursor.run("select 1") cursor.poll_until_finish() diff --git a/querybook/server/lib/query_executor/clients/hive.py b/querybook/server/lib/query_executor/clients/hive.py index b390a64ea..6d4fd9714 100644 --- a/querybook/server/lib/query_executor/clients/hive.py +++ b/querybook/server/lib/query_executor/clients/hive.py @@ -3,7 +3,7 @@ from pyhive import hive from TCLIService.ttypes import TOperationState -from lib.utils.utils import GeventTimeout +from lib.utils.utils import Timeout from lib.query_executor.base_client import ClientBaseClass, CursorBaseClass from lib.query_executor.connection_string.hive import get_hive_connection_conf @@ -21,7 +21,7 @@ def __init__( *args, **kwargs ): - with GeventTimeout(120, "Timeout connecting to HiveServer"): + with Timeout(120, "Timeout connecting to HiveServer"): connection_conf = get_hive_connection_conf(connection_string) port = 10000 if not connection_conf.port else connection_conf.port diff --git a/querybook/server/lib/utils/utils.py b/querybook/server/lib/utils/utils.py index bc31ba08c..ec966607d 100644 --- a/querybook/server/lib/utils/utils.py +++ b/querybook/server/lib/utils/utils.py @@ -95,11 +95,33 @@ class TimeoutError(Exception): pass +def is_gevent_monkey_patched(): + try: + from gevent import monkey + except ImportError: + return False + else: + # Choose a random library to test if it's patched + return monkey.is_module_patched("socket") + + +@contextmanager +def Timeout(sec: Union[int, float] = 1, custom_error_message: Optional[str] = None): + if is_gevent_monkey_patched(): + with GeventTimeout(sec, custom_error_message=custom_error_message): + yield + else: + with SignalTimeout(sec, custom_error_message=custom_error_message): + yield + + @contextmanager def GeventTimeout( sec: Union[int, float] = 1, custom_error_message: Optional[str] = None ): - """This timeout function can be used in gevent celery worker or the web server (which is powered by gevent)""" + """ + This timeout function can be used in gevent celery worker or the web server (which is powered by gevent) + """ error_message = custom_error_message or f"Timeout Exception: {sec} seconds" timeout = gevent.Timeout(sec, TimeoutError(error_message)) @@ -112,17 +134,19 @@ def GeventTimeout( # Deprecated: use GeventTimeout if possible, the Timeout would break in gevent worker -class Timeout: - def __init__(self, sec, custom_error_message=None): +class SignalTimeout: + def __init__( + self, sec: Union[int, float], custom_error_message: Optional[str] = None + ): self.error_message = custom_error_message or f"Timeout Exception: {sec} seconds" self.sec = sec def __enter__(self): signal.signal(signal.SIGALRM, self.raise_timeout) - signal.alarm(self.sec) + signal.setitimer(signal.ITIMER_REAL, self.sec) def __exit__(self, *args): - signal.alarm(0) # disable alarm + signal.setitimer(signal.ITIMER_REAL, 0) # disable alarm def raise_timeout(self, *args): raise TimeoutError(self.error_message) diff --git a/querybook/tests/test_lib/test_utils/test_utils.py b/querybook/tests/test_lib/test_utils/test_utils.py index 6b0e5db33..970b49721 100644 --- a/querybook/tests/test_lib/test_utils/test_utils.py +++ b/querybook/tests/test_lib/test_utils/test_utils.py @@ -2,10 +2,32 @@ import pytest -from lib.utils.utils import GeventTimeout, TimeoutError +from lib.utils.utils import GeventTimeout, SignalTimeout, TimeoutError -def test_timeout(): +def test_gevent_timeout(): with pytest.raises(TimeoutError): with GeventTimeout(0.1): - time.sleep(1) + time.sleep(0.2) + + +def test_gevent_no_timeout(): + try: + with GeventTimeout(0.2): + time.sleep(0.1) + except TimeoutError: + pytest.fail("TimeoutError raised when it shouldn't") + + +def test_signal_timeout(): + with pytest.raises(TimeoutError): + with SignalTimeout(0.1): + time.sleep(0.2) + + +def test_signal_no_timeout(): + try: + with SignalTimeout(0.2): + time.sleep(0.1) + except TimeoutError: + pytest.fail("TimeoutError raised when it shouldn't") diff --git a/requirements/base.txt b/requirements/base.txt index 80a81898c..b5975d040 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -22,6 +22,8 @@ Jinja2==3.1.2 # From Flask # Celery celery==5.2.7 +kombu==5.3.1 # not a direct dependency (from celery), pinned by due to bug: https://github.com/celery/kombu/issues/1785 + # Ops pyyaml==5.4.1