From 5568329885647532d45153dd9ba12e3b59da600c Mon Sep 17 00:00:00 2001 From: auxten Date: Sun, 23 Jul 2023 21:12:47 +0800 Subject: [PATCH] Fix 0.11 issues2 (#67) * Tests need psutil * Check thread count only in standalone test run * Ignore --- .github/workflows/build_wheels.yml | 6 +- .gitignore | 8 +- chdb/session/__init__.py | 1 + chdb/session/state.py | 38 ++++++++++ src/Common/ThreadPool.cpp | 2 +- src/Disks/IO/ThreadPoolReader.cpp | 2 +- src/Disks/IO/ThreadPoolRemoteFSReader.cpp | 2 +- src/Interpreters/Context.cpp | 6 +- tests/test_final_join.py | 26 +++++++ tests/test_gc.py | 41 +++++++++++ tests/test_stateful.py | 89 +++++++++++++++++++++++ 11 files changed, 206 insertions(+), 15 deletions(-) create mode 100644 chdb/session/__init__.py create mode 100644 chdb/session/state.py create mode 100644 tests/test_final_join.py create mode 100644 tests/test_gc.py create mode 100644 tests/test_stateful.py diff --git a/.github/workflows/build_wheels.yml b/.github/workflows/build_wheels.yml index 75f3f13eb48..f4a1e51a776 100644 --- a/.github/workflows/build_wheels.yml +++ b/.github/workflows/build_wheels.yml @@ -132,7 +132,7 @@ jobs: - name: Run tests run: | python3 -m pip install dist/*.whl - python3 -m pip install pandas pyarrow + python3 -m pip install pandas pyarrow psutil python3 -c "import chdb; res = chdb.query('select 1112222222,555', 'CSV'); print(res)" make test continue-on-error: false @@ -249,7 +249,7 @@ jobs: - name: Run tests run: | python3 -m pip install dist/*.whl - python3 -m pip install pandas pyarrow + python3 -m pip install pandas pyarrow psutil python3 -c "import chdb; res = chdb.query('select 1112222222,555', 'CSV'); print(res)" make test continue-on-error: false @@ -344,7 +344,7 @@ jobs: CIBW_BEFORE_BUILD: "pip install -U pip tox pybind11 && bash -x gen_manifest.sh && bash chdb/build.sh" CIBW_BUILD_VERBOSITY: 3 CIBW_BUILD: "cp38-macosx_x86_64 cp39-macosx_x86_64 cp310-macosx_x86_64" - CIBW_TEST_REQUIRES: "pyarrow pandas" + CIBW_TEST_REQUIRES: "pyarrow pandas psutil" CIBW_TEST_COMMAND: "cd {project} && make test" - name: Keep killall ccache and wait for ccache to finish if: always() diff --git a/.gitignore b/.gitignore index 3061de7c92d..4c47f1144a7 100644 --- a/.gitignore +++ b/.gitignore @@ -12,17 +12,13 @@ *.logrt /python_pkg/ +/tmps state_tmp_l3jfk /chdb-0.*/ *.strings /arrow1100 test_main -/buildlib -/builddbg -/buildx86 -/build -/build_* -/build-* +/build* /bench /tests/venv /obj-x86_64-linux-gnu/ diff --git a/chdb/session/__init__.py b/chdb/session/__init__.py new file mode 100644 index 00000000000..97ee55ff812 --- /dev/null +++ b/chdb/session/__init__.py @@ -0,0 +1 @@ +from .state import * \ No newline at end of file diff --git a/chdb/session/state.py b/chdb/session/state.py new file mode 100644 index 00000000000..c5d8d9ac924 --- /dev/null +++ b/chdb/session/state.py @@ -0,0 +1,38 @@ +import tempfile +import shutil + +from chdb import query_stateful + + +class Session(): + """ + Session will keep the state of query. All DDL and DML state will be kept in a dir. + Dir path could be passed in as an argument. If not, a temporary dir will be created. + + If path is not specified, the temporary dir will be deleted when the Session object is deleted. + Otherwise path will be kept. + + Note: The default database is "_local" and the default engine is "Memory" which means all data + will be stored in memory. If you want to store data in disk, you should create another database. 
+ """ + + def __init__(self, path=None): + if path is None: + self._cleanup = True + self._path = tempfile.mkdtemp() + else: + self._cleanup = False + self._path = path + + def __del__(self): + if self._cleanup: + self.cleanup() + + def cleanup(self): + shutil.rmtree(self._path) + + def query(self, sql, fmt="CSV"): + """ + Execute a query. + """ + return query_stateful(sql, fmt, path=self._path) diff --git a/src/Common/ThreadPool.cpp b/src/Common/ThreadPool.cpp index b05be3640f3..f1d15b7fe4e 100644 --- a/src/Common/ThreadPool.cpp +++ b/src/Common/ThreadPool.cpp @@ -76,7 +76,7 @@ ThreadPoolImpl::ThreadPoolImpl( Metric metric_active_threads_, Metric metric_scheduled_jobs_, size_t max_threads_) - : ThreadPoolImpl(metric_threads_, metric_active_threads_, metric_scheduled_jobs_, max_threads_, max_threads_, max_threads_) + : ThreadPoolImpl(metric_threads_, metric_active_threads_, metric_scheduled_jobs_, max_threads_, 0, max_threads_) { } diff --git a/src/Disks/IO/ThreadPoolReader.cpp b/src/Disks/IO/ThreadPoolReader.cpp index da31388a8ea..947ccb9ed4d 100644 --- a/src/Disks/IO/ThreadPoolReader.cpp +++ b/src/Disks/IO/ThreadPoolReader.cpp @@ -92,7 +92,7 @@ static bool hasBugInPreadV2() #endif ThreadPoolReader::ThreadPoolReader(size_t pool_size, size_t queue_size_) - : pool(std::make_unique(CurrentMetrics::ThreadPoolFSReaderThreads, CurrentMetrics::ThreadPoolFSReaderThreadsActive, CurrentMetrics::ThreadPoolFSReaderThreadsScheduled, pool_size, pool_size, queue_size_)) + : pool(std::make_unique(CurrentMetrics::ThreadPoolFSReaderThreads, CurrentMetrics::ThreadPoolFSReaderThreadsActive, CurrentMetrics::ThreadPoolFSReaderThreadsScheduled, pool_size, 0, queue_size_)) { } diff --git a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp index 2df087e941f..1dea7aad655 100644 --- a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp +++ b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp @@ -67,7 +67,7 @@ ThreadPoolRemoteFSReader::ThreadPoolRemoteFSReader(size_t pool_size, size_t queu : pool(std::make_unique(CurrentMetrics::ThreadPoolRemoteFSReaderThreads, CurrentMetrics::ThreadPoolRemoteFSReaderThreadsActive, CurrentMetrics::ThreadPoolRemoteFSReaderThreadsScheduled, - pool_size, pool_size, queue_size_)) + pool_size, 0, queue_size_)) { } diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index dc2709d6a12..c1488ab4c20 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -2913,7 +2913,7 @@ ThreadPool & Context::getLoadMarksThreadpool() const auto pool_size = config.getUInt(".load_marks_threadpool_pool_size", 50); auto queue_size = config.getUInt(".load_marks_threadpool_queue_size", 1000000); shared->load_marks_threadpool = std::make_unique( - CurrentMetrics::MarksLoaderThreads, CurrentMetrics::MarksLoaderThreadsActive, CurrentMetrics::MarksLoaderThreadsScheduled, pool_size, pool_size, queue_size); + CurrentMetrics::MarksLoaderThreads, CurrentMetrics::MarksLoaderThreadsActive, CurrentMetrics::MarksLoaderThreadsScheduled, pool_size, 0, queue_size); }); return *shared->load_marks_threadpool; @@ -3108,7 +3108,7 @@ ThreadPool & Context::getPrefetchThreadpool() const auto pool_size = config.getUInt(".prefetch_threadpool_pool_size", 100); auto queue_size = config.getUInt(".prefetch_threadpool_queue_size", 1000000); shared->prefetch_threadpool = std::make_unique( - CurrentMetrics::IOPrefetchThreads, CurrentMetrics::IOPrefetchThreadsActive, CurrentMetrics::IOPrefetchThreadsScheduled, pool_size, pool_size, queue_size); + CurrentMetrics::IOPrefetchThreads, 
CurrentMetrics::IOPrefetchThreadsActive, CurrentMetrics::IOPrefetchThreadsScheduled, pool_size, 0, queue_size); }); return *shared->prefetch_threadpool; @@ -5248,7 +5248,7 @@ ThreadPool & Context::getThreadPoolWriter() const auto queue_size = config.getUInt(".threadpool_writer_queue_size", 1000000); shared->threadpool_writer = std::make_unique( - CurrentMetrics::IOWriterThreads, CurrentMetrics::IOWriterThreadsActive, CurrentMetrics::IOWriterThreadsScheduled, pool_size, pool_size, queue_size); + CurrentMetrics::IOWriterThreads, CurrentMetrics::IOWriterThreadsActive, CurrentMetrics::IOWriterThreadsScheduled, pool_size, 0, queue_size); }); return *shared->threadpool_writer; diff --git a/tests/test_final_join.py b/tests/test_final_join.py new file mode 100644 index 00000000000..550f86ad289 --- /dev/null +++ b/tests/test_final_join.py @@ -0,0 +1,26 @@ +#!python3 + +import unittest +import psutil +from chdb import session + + +current_process = psutil.Process() +check_thread_count = False + + +class TestStateful(unittest.TestCase): + def test_zfree_thread_count(self): + sess2 = session.Session() + ret = sess2.query("SELECT sleep(2)", "Debug") + # self.assertEqual(str(ret), "") + thread_count = current_process.num_threads() + print("Number of threads using psutil library: ", thread_count) + if check_thread_count: + self.assertEqual(thread_count, 1) + + +if __name__ == "__main__": + check_thread_count = True + unittest.main() + diff --git a/tests/test_gc.py b/tests/test_gc.py new file mode 100644 index 00000000000..fd8c362b0e2 --- /dev/null +++ b/tests/test_gc.py @@ -0,0 +1,41 @@ +#!python3 + +import unittest +import gc +import chdb + +class TestGC(unittest.TestCase): + def test_gc(self): + print("query started") + gc.set_debug(gc.DEBUG_STATS) + + ret = chdb.query("SELECT 123,'adbcd'", 'CSV') + # print("ret:", ret) + # print("ret type:", type(ret)) + self.assertEqual(str(ret), '123,"adbcd"\n') + gc.collect() + + mv = ret.get_memview() + self.assertIsNotNone(mv) + gc.collect() + + self.assertEqual(len(mv), 12) + + out = mv.tobytes() + self.assertEqual(out, b'123,"adbcd"\n') + + ret2 = chdb.query("SELECT 123,'adbcdefg'", 'CSV').get_memview().tobytes() + self.assertEqual(ret2, b'123,"adbcdefg"\n') + + mv2 = chdb.query("SELECT 123,'adbcdefg'", 'CSV').get_memview() + gc.collect() + + self.assertEqual(mv2.tobytes(), b'123,"adbcdefg"\n') + + mv3 = mv2.view() + gc.collect() + self.assertEqual(mv3.tobytes(), b'123,"adbcdefg"\n') + self.assertEqual(len(mv3), 15) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_stateful.py b/tests/test_stateful.py new file mode 100644 index 00000000000..d212968adb7 --- /dev/null +++ b/tests/test_stateful.py @@ -0,0 +1,89 @@ +#!python3 + +import time +import shutil +import psutil +import unittest +from chdb import session + + +test_state_dir = ".state_tmp_auxten_" +current_process = psutil.Process() +check_thread_count = False + +class TestStateful(unittest.TestCase): + def setUp(self) -> None: + shutil.rmtree(test_state_dir, ignore_errors=True) + return super().setUp() + + def tearDown(self) -> None: + shutil.rmtree(test_state_dir, ignore_errors=True) + return super().tearDown() + + def test_path(self): + sess = session.Session(test_state_dir) + sess.query("CREATE FUNCTION chdb_xxx AS () -> '0.12.0'", "CSV") + ret = sess.query("SELECT chdb_xxx()", "CSV") + self.assertEqual(str(ret), '"0.12.0"\n') + + sess.query("CREATE DATABASE IF NOT EXISTS db_xxx ENGINE = Atomic", "CSV") + ret = sess.query("SHOW DATABASES", "CSV") + self.assertIn("db_xxx", str(ret)) 
+ + sess.query( + "CREATE TABLE IF NOT EXISTS db_xxx.log_table_xxx (x UInt8) ENGINE = Log;" + ) + sess.query("INSERT INTO db_xxx.log_table_xxx VALUES (1), (2), (3), (4);") + + sess.query( + "CREATE VIEW db_xxx.view_xxx AS SELECT * FROM db_xxx.log_table_xxx LIMIT 2;" + ) + ret = sess.query("SELECT * FROM db_xxx.view_xxx", "CSV") + self.assertEqual(str(ret), "1\n2\n") + + del sess # name sess dir will not be deleted + + sess = session.Session(test_state_dir) + ret = sess.query("SELECT chdb_xxx()", "CSV") + self.assertEqual(str(ret), '"0.12.0"\n') + + ret = sess.query("SHOW DATABASES", "CSV") + self.assertIn("db_xxx", str(ret)) + + ret = sess.query("SELECT * FROM db_xxx.log_table_xxx", "CSV") + self.assertEqual(str(ret), "1\n2\n3\n4\n") + + # reuse session + sess2 = session.Session(test_state_dir) + + ret = sess2.query("SELECT chdb_xxx()", "CSV") + self.assertEqual(str(ret), '"0.12.0"\n') + + # remove session dir + sess2.cleanup() + ret = sess2.query("SELECT chdb_xxx()", "CSV") + self.assertEqual(str(ret), "") + + def test_tmp(self): + sess = session.Session() + sess.query("CREATE FUNCTION chdb_xxx AS () -> '0.12.0'", "CSV") + ret = sess.query("SELECT chdb_xxx()", "CSV") + self.assertEqual(str(ret), '"0.12.0"\n') + del sess + + # another session + sess2 = session.Session() + ret = sess2.query("SELECT chdb_xxx()", "CSV") + self.assertEqual(str(ret), "") + + def test_zfree_thread_count(self): + time.sleep(3) + thread_count = current_process.num_threads() + print("Number of threads using psutil library: ", thread_count) + if check_thread_count: + self.assertEqual(thread_count, 1) + +if __name__ == "__main__": + shutil.rmtree(test_state_dir, ignore_errors=True) + check_thread_count = True + unittest.main()
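
Note on the thread-pool hunks above: ThreadPool.cpp, ThreadPoolReader.cpp, ThreadPoolRemoteFSReader.cpp and the Context.cpp pools all make the same change — the fifth constructor argument (max_free_threads in ClickHouse's ThreadPoolImpl) drops from pool_size to 0, so idle worker threads are released instead of being kept around. The new test_zfree_thread_count tests depend on this: after a query finishes they expect the embedded process to settle back to a single thread, and (per the commit message) that assertion is only enabled when the test file is run standalone, since a full test run keeps other threads alive. A quick way to observe the effect by hand, using the same psutil call as the tests — this is a sketch for illustration, not part of the patch:

    import time
    import psutil
    import chdb

    chdb.query("SELECT sleep(2)", "CSV")   # spins up pool threads while the query runs
    time.sleep(3)                          # give idle pool threads time to exit
    print(psutil.Process().num_threads())  # with this patch, expected to settle back near 1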
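
For readers of this patch, here is a minimal usage sketch of the chdb.session.Session API it introduces, distilled from chdb/session/state.py and tests/test_stateful.py above. It assumes a chdb build that contains this patch; the directory and object names (chdb_demo_state, db_demo, t) are illustrative placeholders only:

    from chdb import session

    # Persistent session: DDL/DML state is kept under the given directory.
    sess = session.Session("chdb_demo_state")
    sess.query("CREATE DATABASE IF NOT EXISTS db_demo ENGINE = Atomic", "CSV")
    sess.query("CREATE TABLE IF NOT EXISTS db_demo.t (x UInt8) ENGINE = Log")
    sess.query("INSERT INTO db_demo.t VALUES (1), (2), (3)")
    print(sess.query("SELECT * FROM db_demo.t", "CSV"))  # 1\n2\n3\n
    del sess  # a named state directory is NOT deleted when the object goes away

    # Re-opening the same directory sees the previously created objects.
    sess = session.Session("chdb_demo_state")
    print(sess.query("SHOW DATABASES", "CSV"))
    sess.cleanup()  # explicitly removes the state directory

    # Temporary session: backed by tempfile.mkdtemp() and removed on deletion.
    tmp_sess = session.Session()
    print(tmp_sess.query("SELECT 123, 'abc'", "CSV"))
    del tmp_sess

As the Session docstring notes, the default "_local" database uses the Memory engine, so anything that should survive the session must live in a separately created database with an on-disk engine, as in the Atomic database and Log table used here and in the tests.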