diff --git a/tests/test_issue31.py b/tests/test_issue31.py new file mode 100644 index 00000000000..e83d5d931a8 --- /dev/null +++ b/tests/test_issue31.py @@ -0,0 +1,52 @@ +#!python3 + +import os +import time +import unittest +import chdb +import zipfile +import urllib.request + +from timeout_decorator import timeout + +csv_url = "https://media.githubusercontent.com/media/datablist/sample-csv-files/main/files/organizations/organizations-2000000.zip" + + +# download csv file, and unzip it +def download_and_extract(url, save_path): + print("Downloading file...") + urllib.request.urlretrieve(url, save_path) + + print("Extracting file...") + with zipfile.ZipFile(save_path, "r") as zip_ref: + zip_ref.extractall(os.path.dirname(save_path)) + + print("Done!") + + +@timeout(20, use_signals=False) +def payload(): + now = time.time() + res = chdb.query( + 'select Name, count(*) cnt from file("organizations-2000000.csv", CSVWithNames) group by Name order by cnt desc', + "CSV", + ) + print(res.get_memview().tobytes().decode("utf-8")) + used_time = time.time() - now + print("used time: ", used_time) + + +class TestAggOnCSVSpeed(unittest.TestCase): + def setUp(self): + download_and_extract(csv_url, "organizations-2000000.zip") + + def tearDown(self): + os.remove("organizations-2000000.csv") + os.remove("organizations-2000000.zip") + + def test_agg(self): + payload() + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/timeout_decorator.py b/tests/timeout_decorator.py new file mode 100644 index 00000000000..c9d1e2bf320 --- /dev/null +++ b/tests/timeout_decorator.py @@ -0,0 +1,176 @@ +""" +Timeout decorator. + + :copyright: (c) 2012-2013 by PN. + :license: MIT, see LICENSE for more details. +""" + +from __future__ import print_function +from __future__ import unicode_literals +from __future__ import division + +import sys +import time +import multiprocessing +import signal +from functools import wraps + +############################################################ +# Timeout +############################################################ + +# http://www.saltycrane.com/blog/2010/04/using-python-timeout-decorator-uploading-s3/ +# Used work of Stephen "Zero" Chappell +# in https://code.google.com/p/verse-quiz/source/browse/trunk/timeout.py + + +class TimeoutError(AssertionError): + + """Thrown when a timeout occurs in the `timeout` context manager.""" + + def __init__(self, value="Timed Out"): + self.value = value + + def __str__(self): + return repr(self.value) + + +def _raise_exception(exception, exception_message): + """ This function checks if a exception message is given. + + If there is no exception message, the default behaviour is maintained. + If there is an exception message, the message is passed to the exception with the 'value' keyword. + """ + if exception_message is None: + raise exception() + else: + raise exception(exception_message) + + +def timeout(seconds=None, use_signals=True, timeout_exception=TimeoutError, exception_message=None): + """Add a timeout parameter to a function and return it. + + :param seconds: optional time limit in seconds or fractions of a second. If None is passed, no timeout is applied. + This adds some flexibility to the usage: you can disable timing out depending on the settings. + :type seconds: float + :param use_signals: flag indicating whether signals should be used for timing function out or the multiprocessing + When using multiprocessing, timeout granularity is limited to 10ths of a second. + :type use_signals: bool + + :raises: TimeoutError if time limit is reached + + It is illegal to pass anything other than a function as the first + parameter. The function is wrapped and returned to the caller. + """ + def decorate(function): + + if use_signals: + def handler(signum, frame): + _raise_exception(timeout_exception, exception_message) + + @wraps(function) + def new_function(*args, **kwargs): + new_seconds = kwargs.pop('timeout', seconds) + if new_seconds: + old = signal.signal(signal.SIGALRM, handler) + signal.setitimer(signal.ITIMER_REAL, new_seconds) + + if not seconds: + return function(*args, **kwargs) + + try: + return function(*args, **kwargs) + finally: + if new_seconds: + signal.setitimer(signal.ITIMER_REAL, 0) + signal.signal(signal.SIGALRM, old) + return new_function + else: + @wraps(function) + def new_function(*args, **kwargs): + timeout_wrapper = _Timeout(function, timeout_exception, exception_message, seconds) + return timeout_wrapper(*args, **kwargs) + return new_function + + return decorate + + +def _target(queue, function, *args, **kwargs): + """Run a function with arguments and return output via a queue. + + This is a helper function for the Process created in _Timeout. It runs + the function with positional arguments and keyword arguments and then + returns the function's output by way of a queue. If an exception gets + raised, it is returned to _Timeout to be raised by the value property. + """ + try: + queue.put((True, function(*args, **kwargs))) + except: + queue.put((False, sys.exc_info()[1])) + + +class _Timeout(object): + + """Wrap a function and add a timeout (limit) attribute to it. + + Instances of this class are automatically generated by the add_timeout + function defined above. Wrapping a function allows asynchronous calls + to be made and termination of execution after a timeout has passed. + """ + + def __init__(self, function, timeout_exception, exception_message, limit): + """Initialize instance in preparation for being called.""" + self.__limit = limit + self.__function = function + self.__timeout_exception = timeout_exception + self.__exception_message = exception_message + self.__name__ = function.__name__ + self.__doc__ = function.__doc__ + self.__timeout = time.time() + self.__process = multiprocessing.Process() + self.__queue = multiprocessing.Queue() + + def __call__(self, *args, **kwargs): + """Execute the embedded function object asynchronously. + + The function given to the constructor is transparently called and + requires that "ready" be intermittently polled. If and when it is + True, the "value" property may then be checked for returned data. + """ + self.__limit = kwargs.pop('timeout', self.__limit) + self.__queue = multiprocessing.Queue(1) + args = (self.__queue, self.__function) + args + self.__process = multiprocessing.Process(target=_target, + args=args, + kwargs=kwargs) + self.__process.daemon = True + self.__process.start() + if self.__limit is not None: + self.__timeout = self.__limit + time.time() + while not self.ready: + time.sleep(0.01) + return self.value + + def cancel(self): + """Terminate any possible execution of the embedded function.""" + if self.__process.is_alive(): + print("Terminating process: %s" % self.__process, file=sys.stderr) + self.__process.kill() + + _raise_exception(self.__timeout_exception, self.__exception_message) + + @property + def ready(self): + """Read-only property indicating status of "value" property.""" + if self.__limit and self.__timeout < time.time(): + self.cancel() + return self.__queue.full() and not self.__queue.empty() + + @property + def value(self): + """Read-only property containing data returned from function.""" + if self.ready is True: + flag, load = self.__queue.get() + if flag: + return load + raise load \ No newline at end of file