Skip to content

Commit

Permalink
Default back to multiprocess worker, but with Timeout dynamic for gev…
Browse files Browse the repository at this point in the history
…ent and non-gevent cases
  • Loading branch information
czgu committed Oct 12, 2023
1 parent 3cad408 commit 50ad2ab
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 17 deletions.
2 changes: 1 addition & 1 deletion containers/docker-compose.prod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ services:
worker:
extends:
service: base
command: ./querybook/scripts/runservice prod_worker -P gevent -c 150 -l info -Ofair -n celery@%h
command: ./querybook/scripts/runservice prod_worker -c 150 -l info -Ofair -n celery@%h
scheduler:
extends:
service: base
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ services:
image: *querybook-image
tty: true
stdin_open: true
command: './querybook/scripts/runservice worker -P gevent -c 5'
command: './querybook/scripts/runservice worker -c 5'
volumes: *querybook-volumes
depends_on: *querybook-depends-on
scheduler:
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "querybook",
"version": "3.29.0",
"version": "3.28.3",
"description": "A Big Data Webapp",
"private": true,
"scripts": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from .base_checker import BaseEngineStatusChecker, EngineStatus
from const.query_execution import QueryEngineStatus
from lib.query_executor.base_executor import QueryExecutorBaseClass
from lib.utils.utils import GeventTimeout
from lib.utils.utils import Timeout


class ConnectionChecker(BaseEngineStatusChecker):
Expand All @@ -24,7 +24,7 @@ def check_connection(
) -> EngineStatus:
result: EngineStatus = {"status": QueryEngineStatus.GOOD.value, "messages": []}
try:
with GeventTimeout(20, "Connection took too long"):
with Timeout(20, "Connection took too long"):
cursor = executor._get_client(client_settings).cursor()
utc_now_str = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
result["messages"].append(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from const.query_execution import QueryEngineStatus
from lib.query_executor.base_executor import QueryExecutorBaseClass
from lib.query_executor.base_client import CursorBaseClass
from lib.utils.utils import GeventTimeout, TimeoutError
from lib.utils.utils import Timeout, TimeoutError


class SelectOneChecker(BaseEngineStatusChecker):
Expand All @@ -29,7 +29,7 @@ def check_select_one(
) -> EngineStatus:
result: EngineStatus = {"status": QueryEngineStatus.GOOD.value, "messages": []}
try:
with GeventTimeout(20, "Select 1 took too long"):
with Timeout(20, "Select 1 took too long"):
cursor: CursorBaseClass = executor._get_client(client_settings).cursor()
cursor.run("select 1")
cursor.poll_until_finish()
Expand Down
4 changes: 2 additions & 2 deletions querybook/server/lib/query_executor/clients/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from pyhive import hive
from TCLIService.ttypes import TOperationState
from lib.utils.utils import GeventTimeout
from lib.utils.utils import Timeout
from lib.query_executor.base_client import ClientBaseClass, CursorBaseClass
from lib.query_executor.connection_string.hive import get_hive_connection_conf

Expand All @@ -21,7 +21,7 @@ def __init__(
*args,
**kwargs
):
with GeventTimeout(120, "Timeout connecting to HiveServer"):
with Timeout(120, "Timeout connecting to HiveServer"):
connection_conf = get_hive_connection_conf(connection_string)

port = 10000 if not connection_conf.port else connection_conf.port
Expand Down
34 changes: 29 additions & 5 deletions querybook/server/lib/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,33 @@ class TimeoutError(Exception):
pass


def is_gevent_monkey_patched():
    """Tell whether gevent has monkey-patched the standard library.

    Returns False when gevent cannot be imported at all. Otherwise the
    answer comes from asking gevent whether the ``socket`` module is
    patched; ``socket`` is simply used as a representative module.
    """
    try:
        from gevent import monkey
    except ImportError:
        # No gevent available, so nothing can have been patched.
        return False

    # One module is probed as a stand-in for the whole patch set.
    return monkey.is_module_patched("socket")


@contextmanager
def Timeout(sec: Union[int, float] = 1, custom_error_message: Optional[str] = None):
    """Run the ``with`` body under a timeout suited to the current runtime.

    Delegates to GeventTimeout when gevent monkey patching is detected and
    to SignalTimeout otherwise.

    Args:
        sec: Timeout in seconds, forwarded unchanged to the chosen
            implementation.
        custom_error_message: Optional message forwarded to the chosen
            implementation.
    """
    # Pick the implementation once, then enter it through a single `with`.
    timeout_impl = GeventTimeout if is_gevent_monkey_patched() else SignalTimeout
    with timeout_impl(sec, custom_error_message=custom_error_message):
        yield


@contextmanager
def GeventTimeout(
sec: Union[int, float] = 1, custom_error_message: Optional[str] = None
):
"""This timeout function can be used in gevent celery worker or the web server (which is powered by gevent)"""
"""
This timeout function can be used in gevent celery worker or the web server (which is powered by gevent)
"""

error_message = custom_error_message or f"Timeout Exception: {sec} seconds"
timeout = gevent.Timeout(sec, TimeoutError(error_message))
Expand All @@ -112,17 +134,19 @@ def GeventTimeout(


# Signal-based timeout: it relies on SIGALRM and would break in a gevent worker, so prefer the Timeout() wrapper above (or GeventTimeout) when gevent is in use
class SignalTimeout:
    """Context manager that raises TimeoutError if its body runs too long.

    On entry it installs a SIGALRM handler and arms the real-time interval
    timer (``signal.ITIMER_REAL``), so fractional seconds are accepted.
    On exit it cancels the timer and puts back the SIGALRM handler that was
    installed before entry, so using this class does not permanently
    replace a handler owned by other code.

    Args:
        sec: Number of seconds (int or float) before the timeout fires.
        custom_error_message: Message for the raised TimeoutError; defaults
            to ``"Timeout Exception: {sec} seconds"``.

    Raises:
        TimeoutError: Inside the ``with`` body when the timer expires.
    """

    def __init__(
        self, sec: Union[int, float], custom_error_message: Optional[str] = None
    ):
        self.error_message = custom_error_message or f"Timeout Exception: {sec} seconds"
        self.sec = sec
        # Handler that was active before __enter__, kept so __exit__ can restore it.
        self._previous_handler = None

    def __enter__(self):
        # signal.signal returns the handler it replaces; remember it.
        self._previous_handler = signal.signal(signal.SIGALRM, self.raise_timeout)
        signal.setitimer(signal.ITIMER_REAL, self.sec)

    def __exit__(self, *args):
        # Cancel the timer first so no alarm can fire once the handler is swapped back.
        signal.setitimer(signal.ITIMER_REAL, 0)
        # A None previous handler means it was not installed from Python and
        # cannot be re-installed through signal.signal, so it is left alone.
        if self._previous_handler is not None:
            signal.signal(signal.SIGALRM, self._previous_handler)
            self._previous_handler = None

    def raise_timeout(self, *args):
        """SIGALRM handler: raise TimeoutError with the configured message."""
        raise TimeoutError(self.error_message)
Expand Down
28 changes: 25 additions & 3 deletions querybook/tests/test_lib/test_utils/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,32 @@

import pytest

from lib.utils.utils import GeventTimeout, TimeoutError
from lib.utils.utils import GeventTimeout, SignalTimeout, TimeoutError


def test_gevent_timeout():
    """GeventTimeout raises TimeoutError when the body outlasts the limit."""
    with pytest.raises(TimeoutError):
        # The body sleeps for twice the 0.1s limit so the timeout must fire.
        with GeventTimeout(0.1):
            time.sleep(0.2)


def test_gevent_no_timeout():
    """GeventTimeout stays silent when the body finishes within the limit."""
    limit_sec, body_sec = 0.2, 0.1
    try:
        with GeventTimeout(limit_sec):
            time.sleep(body_sec)
    except TimeoutError:
        # The body is shorter than the limit, so a timeout here is a failure.
        pytest.fail("TimeoutError raised when it shouldn't")


def test_signal_timeout():
    """SignalTimeout raises TimeoutError when the body outlasts the limit."""
    limit_sec, body_sec = 0.1, 0.2
    with pytest.raises(TimeoutError), SignalTimeout(limit_sec):
        time.sleep(body_sec)


def test_signal_no_timeout():
    """SignalTimeout stays silent when the body finishes within the limit."""
    limit_sec, body_sec = 0.2, 0.1
    try:
        with SignalTimeout(limit_sec):
            time.sleep(body_sec)
    except TimeoutError:
        # The body is shorter than the limit, so a timeout here is a failure.
        pytest.fail("TimeoutError raised when it shouldn't")
2 changes: 2 additions & 0 deletions requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ Jinja2==3.1.2 # From Flask

# Celery
celery==5.2.7
kombu==5.3.1 # not a direct dependency (from celery), pinned due to bug: https://github.com/celery/kombu/issues/1785


# Ops
pyyaml==5.4.1
Expand Down

0 comments on commit 50ad2ab

Please sign in to comment.