diff --git a/src/Processors/Sources/PythonSource.cpp b/src/Processors/Sources/PythonSource.cpp
index 0628d24ed2c..d44138ec712 100644
--- a/src/Processors/Sources/PythonSource.cpp
+++ b/src/Processors/Sources/PythonSource.cpp
@@ -48,6 +48,7 @@ extern const int PY_EXCEPTION_OCCURED;
 
 PythonSource::PythonSource(
     py::object & data_source_,
+    bool isInheritsFromPyReader_,
     const Block & sample_block_,
     PyColumnVecPtr column_cache,
     size_t data_source_row_count,
@@ -56,6 +57,7 @@ PythonSource::PythonSource(
     size_t num_streams)
     : ISource(sample_block_.cloneEmpty())
     , data_source(data_source_)
+    , isInheritsFromPyReader(isInheritsFromPyReader_)
     , sample_block(sample_block_)
     , column_cache(column_cache)
     , data_source_row_count(data_source_row_count)
@@ -544,7 +546,7 @@ Chunk PythonSource::generate()
 
     try
     {
-        if (isInheritsFromPyReader(data_source))
+        if (isInheritsFromPyReader)
         {
             PyObjectVecPtr data;
             py::gil_scoped_acquire acquire;
diff --git a/src/Processors/Sources/PythonSource.h b/src/Processors/Sources/PythonSource.h
index c210020db9e..855cbc2d5af 100644
--- a/src/Processors/Sources/PythonSource.h
+++ b/src/Processors/Sources/PythonSource.h
@@ -26,6 +26,7 @@ class PythonSource : public ISource
 public:
     PythonSource(
         py::object & data_source_,
+        bool isInheritsFromPyReader_,
         const Block & sample_block_,
         PyColumnVecPtr column_cache,
         size_t data_source_row_count,
@@ -42,6 +43,7 @@ class PythonSource : public ISource
 
 private:
     py::object & data_source; // Do not own the reference
+    bool isInheritsFromPyReader; // If the data_source is a PyReader object
     Block sample_block;
     PyColumnVecPtr column_cache;
 
diff --git a/src/Storages/StoragePython.cpp b/src/Storages/StoragePython.cpp
index 1bbf5328b9f..72df84f171b 100644
--- a/src/Storages/StoragePython.cpp
+++ b/src/Storages/StoragePython.cpp
@@ -70,7 +70,8 @@ Pipe StoragePython::read(
 
     if (isInheritsFromPyReader(data_source))
     {
-        return Pipe(std::make_shared<PythonSource>(data_source, sample_block, column_cache, data_source_row_count, max_block_size, 0, 1));
+        return Pipe(
+            std::make_shared<PythonSource>(data_source, true, sample_block, column_cache, data_source_row_count, max_block_size, 0, 1));
     }
 
     prepareColumnCache(column_names, sample_block.getColumns(), sample_block);
@@ -79,7 +80,7 @@
     // num_streams = 32; // for chdb testing
     for (size_t stream = 0; stream < num_streams; ++stream)
         pipes.emplace_back(std::make_shared<PythonSource>(
-            data_source, sample_block, column_cache, data_source_row_count, max_block_size, stream, num_streams));
+            data_source, false, sample_block, column_cache, data_source_row_count, max_block_size, stream, num_streams));
 
     return Pipe::unitePipes(std::move(pipes));
 }
diff --git a/tests/queries.sql b/tests/queries.sql
new file mode 100644
index 00000000000..677894877a1
--- /dev/null
+++ b/tests/queries.sql
@@ -0,0 +1,43 @@
+SELECT COUNT(*) FROM Python(hits);
+SELECT COUNT(*) FROM Python(hits) WHERE AdvEngineID <> 0;
+SELECT SUM(AdvEngineID), COUNT(*), AVG(ResolutionWidth) FROM Python(hits);
+SELECT AVG(UserID) FROM Python(hits);
+SELECT COUNT(DISTINCT UserID) FROM Python(hits);
+SELECT COUNT(DISTINCT SearchPhrase) FROM Python(hits);
+SELECT MIN(EventDate), MAX(EventDate) FROM Python(hits);
+SELECT AdvEngineID, COUNT(*) FROM Python(hits) WHERE AdvEngineID <> 0 GROUP BY AdvEngineID ORDER BY COUNT(*) DESC;
+SELECT RegionID, COUNT(DISTINCT UserID) AS u FROM Python(hits) GROUP BY RegionID ORDER BY u DESC LIMIT 10;
+SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, AVG(ResolutionWidth), COUNT(DISTINCT UserID) FROM Python(hits) GROUP BY RegionID ORDER BY c DESC LIMIT 10;
+SELECT MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM Python(hits) WHERE MobilePhoneModel <> '' GROUP BY MobilePhoneModel ORDER BY u DESC LIMIT 10;
+SELECT MobilePhone, MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM Python(hits) WHERE MobilePhoneModel <> '' GROUP BY MobilePhone, MobilePhoneModel ORDER BY u DESC LIMIT 10;
+SELECT SearchPhrase, COUNT(*) AS c FROM Python(hits) WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;
+SELECT SearchPhrase, COUNT(DISTINCT UserID) AS u FROM Python(hits) WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10;
+SELECT SearchEngineID, SearchPhrase, COUNT(*) AS c FROM Python(hits) WHERE SearchPhrase <> '' GROUP BY SearchEngineID, SearchPhrase ORDER BY c DESC LIMIT 10;
+SELECT UserID, COUNT(*) FROM Python(hits) GROUP BY UserID ORDER BY COUNT(*) DESC LIMIT 10;
+SELECT UserID, SearchPhrase, COUNT(*) FROM Python(hits) GROUP BY UserID, SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10;
+SELECT UserID, SearchPhrase, COUNT(*) FROM Python(hits) GROUP BY UserID, SearchPhrase LIMIT 10;
+SELECT UserID, extract(minute FROM EventTime) AS m, SearchPhrase, COUNT(*) FROM Python(hits) GROUP BY UserID, m, SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10;
+SELECT UserID FROM Python(hits) WHERE UserID = 435090932899640449;
+SELECT COUNT(*) FROM Python(hits) WHERE URL LIKE '%google%';
+SELECT SearchPhrase, MIN(URL), COUNT(*) AS c FROM Python(hits) WHERE URL LIKE '%google%' AND SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;
+SELECT SearchPhrase, MIN(URL), MIN(Title), COUNT(*) AS c, COUNT(DISTINCT UserID) FROM Python(hits) WHERE Title LIKE '%Google%' AND URL NOT LIKE '%.google.%' AND SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;
+SELECT * FROM Python(hits) WHERE URL LIKE '%google%' ORDER BY EventTime LIMIT 10;
+SELECT SearchPhrase FROM Python(hits) WHERE SearchPhrase <> '' ORDER BY EventTime LIMIT 10;
+SELECT SearchPhrase FROM Python(hits) WHERE SearchPhrase <> '' ORDER BY SearchPhrase LIMIT 10;
+SELECT SearchPhrase FROM Python(hits) WHERE SearchPhrase <> '' ORDER BY EventTime, SearchPhrase LIMIT 10;
+SELECT CounterID, AVG(length(URL)) AS l, COUNT(*) AS c FROM Python(hits) WHERE URL <> '' GROUP BY CounterID HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
+SELECT REGEXP_REPLACE(Referer, '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(length(Referer)) AS l, COUNT(*) AS c, MIN(Referer) FROM Python(hits) WHERE Referer <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
+SELECT SUM(ResolutionWidth), SUM(ResolutionWidth + 1), SUM(ResolutionWidth + 2), SUM(ResolutionWidth + 3), SUM(ResolutionWidth + 4), SUM(ResolutionWidth + 5), SUM(ResolutionWidth + 6), SUM(ResolutionWidth + 7), SUM(ResolutionWidth + 8), SUM(ResolutionWidth + 9), SUM(ResolutionWidth + 10), SUM(ResolutionWidth + 11), SUM(ResolutionWidth + 12), SUM(ResolutionWidth + 13), SUM(ResolutionWidth + 14), SUM(ResolutionWidth + 15), SUM(ResolutionWidth + 16), SUM(ResolutionWidth + 17), SUM(ResolutionWidth + 18), SUM(ResolutionWidth + 19), SUM(ResolutionWidth + 20), SUM(ResolutionWidth + 21), SUM(ResolutionWidth + 22), SUM(ResolutionWidth + 23), SUM(ResolutionWidth + 24), SUM(ResolutionWidth + 25), SUM(ResolutionWidth + 26), SUM(ResolutionWidth + 27), SUM(ResolutionWidth + 28), SUM(ResolutionWidth + 29), SUM(ResolutionWidth + 30), SUM(ResolutionWidth + 31), SUM(ResolutionWidth + 32), SUM(ResolutionWidth + 33), SUM(ResolutionWidth + 34), SUM(ResolutionWidth + 35), SUM(ResolutionWidth + 36), SUM(ResolutionWidth + 37), SUM(ResolutionWidth + 38), SUM(ResolutionWidth + 39), SUM(ResolutionWidth + 40), SUM(ResolutionWidth + 41), SUM(ResolutionWidth + 42), SUM(ResolutionWidth + 43), SUM(ResolutionWidth + 44), SUM(ResolutionWidth + 45), SUM(ResolutionWidth + 46), SUM(ResolutionWidth + 47), SUM(ResolutionWidth + 48), SUM(ResolutionWidth + 49), SUM(ResolutionWidth + 50), SUM(ResolutionWidth + 51), SUM(ResolutionWidth + 52), SUM(ResolutionWidth + 53), SUM(ResolutionWidth + 54), SUM(ResolutionWidth + 55), SUM(ResolutionWidth + 56), SUM(ResolutionWidth + 57), SUM(ResolutionWidth + 58), SUM(ResolutionWidth + 59), SUM(ResolutionWidth + 60), SUM(ResolutionWidth + 61), SUM(ResolutionWidth + 62), SUM(ResolutionWidth + 63), SUM(ResolutionWidth + 64), SUM(ResolutionWidth + 65), SUM(ResolutionWidth + 66), SUM(ResolutionWidth + 67), SUM(ResolutionWidth + 68), SUM(ResolutionWidth + 69), SUM(ResolutionWidth + 70), SUM(ResolutionWidth + 71), SUM(ResolutionWidth + 72), SUM(ResolutionWidth + 73), SUM(ResolutionWidth + 74), SUM(ResolutionWidth + 75), SUM(ResolutionWidth + 76), SUM(ResolutionWidth + 77), SUM(ResolutionWidth + 78), SUM(ResolutionWidth + 79), SUM(ResolutionWidth + 80), SUM(ResolutionWidth + 81), SUM(ResolutionWidth + 82), SUM(ResolutionWidth + 83), SUM(ResolutionWidth + 84), SUM(ResolutionWidth + 85), SUM(ResolutionWidth + 86), SUM(ResolutionWidth + 87), SUM(ResolutionWidth + 88), SUM(ResolutionWidth + 89) FROM Python(hits);
+SELECT SearchEngineID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM Python(hits) WHERE SearchPhrase <> '' GROUP BY SearchEngineID, ClientIP ORDER BY c DESC LIMIT 10;
+SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM Python(hits) WHERE SearchPhrase <> '' GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;
+SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM Python(hits) GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;
+SELECT URL, COUNT(*) AS c FROM Python(hits) GROUP BY URL ORDER BY c DESC LIMIT 10;
+SELECT 1, URL, COUNT(*) AS c FROM Python(hits) GROUP BY 1, URL ORDER BY c DESC LIMIT 10;
+SELECT ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3, COUNT(*) AS c FROM Python(hits) GROUP BY ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3 ORDER BY c DESC LIMIT 10;
+SELECT URL, COUNT(*) AS PageViews FROM Python(hits) WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND URL <> '' GROUP BY URL ORDER BY PageViews DESC LIMIT 10;
+SELECT Title, COUNT(*) AS PageViews FROM Python(hits) WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND Title <> '' GROUP BY Title ORDER BY PageViews DESC LIMIT 10;
+SELECT URL, COUNT(*) AS PageViews FROM Python(hits) WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND IsLink <> 0 AND IsDownload = 0 GROUP BY URL ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
+SELECT TraficSourceID, SearchEngineID, AdvEngineID, CASE WHEN (SearchEngineID = 0 AND AdvEngineID = 0) THEN Referer ELSE '' END AS Src, URL AS Dst, COUNT(*) AS PageViews FROM Python(hits) WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 GROUP BY TraficSourceID, SearchEngineID, AdvEngineID, Src, Dst ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
+SELECT URLHash, EventDate, COUNT(*) AS PageViews FROM Python(hits) WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND TraficSourceID IN (-1, 6) AND RefererHash = 3594120000172545465 GROUP BY URLHash, EventDate ORDER BY PageViews DESC LIMIT 10 OFFSET 100;
+SELECT WindowClientWidth, WindowClientHeight, COUNT(*) AS PageViews FROM Python(hits) WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND DontCountHits = 0 AND URLHash = 2868770270353813622 GROUP BY WindowClientWidth, WindowClientHeight ORDER BY PageViews DESC LIMIT 10 OFFSET 10000;
+SELECT DATE_TRUNC('minute', EventTime) AS M, COUNT(*) AS PageViews FROM Python(hits) WHERE CounterID = 62 AND EventDate >= '2013-07-14' AND EventDate <= '2013-07-15' AND IsRefresh = 0 AND DontCountHits = 0 GROUP BY DATE_TRUNC('minute', EventTime) ORDER BY DATE_TRUNC('minute', EventTime) LIMIT 10 OFFSET 1000
\ No newline at end of file
diff --git a/tests/test_state2_dataframe.py b/tests/test_state2_dataframe.py
new file mode 100644
index 00000000000..b61210cef99
--- /dev/null
+++ b/tests/test_state2_dataframe.py
@@ -0,0 +1,96 @@
+#!/usr/bin/env python3
+
+import unittest
+import timeit
+import datetime
+import json
+import tempfile
+import pandas as pd
+import chdb
+import os
+from urllib.request import urlretrieve
+
+
+class TestChDBDataFrame(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        # Download parquet file if it doesn't exist
+        parquet_file = "hits_0.parquet"
+        if not os.path.exists(parquet_file):
+            print(f"Downloading {parquet_file}...")
+            url = "https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_0.parquet"
+            urlretrieve(url, parquet_file)
+            print("Download complete!")
+
+        # Load data and prepare DataFrame
+        cls.hits = pd.read_parquet(parquet_file)
+        cls.dataframe_size = cls.hits.memory_usage().sum()
+
+        # Fix types
+        cls.hits["EventTime"] = pd.to_datetime(cls.hits["EventTime"], unit="s")
+        cls.hits["EventDate"] = pd.to_datetime(cls.hits["EventDate"], unit="D")
+
+        # Convert object columns to string
+        for col in cls.hits.columns:
+            if cls.hits[col].dtype == "O":
+                cls.hits[col] = cls.hits[col].astype(str)
+
+        # Load queries
+        with open("queries.sql") as f:
+            cls.queries = f.readlines()
+
+    def setUp(self):
+        self.tmp_dir = tempfile.TemporaryDirectory()
+        self.conn = chdb.connect(f"{self.tmp_dir.name}")
+
+    def tearDown(self):
+        self.conn.close()
+        self.tmp_dir.cleanup()
+
+    def test_dataframe_size(self):
+        self.assertGreater(self.dataframe_size, 0, "DataFrame size should be positive")
+
+    def test_query_execution(self):
+        queries_times = []
+        for i, query in enumerate(self.queries, 1):
+            times = []
+            for _ in range(3):
+                start = timeit.default_timer()
+                result = self.conn.query(query, "CSV")
+                end = timeit.default_timer()
+                times.append(end - start)
+
+                # Verify query results are not empty
+                self.assertIsNotNone(result, f"Query {i} returned None")
+
+            queries_times.append(times)
+            # Verify execution times are reasonable
+            self.assertTrue(
+                all(t > 0 for t in times), f"Query {i} has invalid execution times"
+            )
+
+        result_json = {
+            "system": "chDB 2.2(DataFrame)",
+            "date": datetime.date.today().strftime("%Y-%m-%d"),
+            "machine": "",
+            "cluster_size": 1,
+            "comment": "",
+            "tags": [
+                "C++",
+                "column-oriented",
+                "embedded",
+                "stateless",
+                "serverless",
+                "dataframe",
+                "ClickHouse derivative",
+            ],
+            "load_time": 0,
+            "data_size": int(self.dataframe_size),
+            "result": queries_times,  # per-query timings collected in the loop above
+        }
+
+        print(json.dumps(result_json, indent=2))
+
+
+if __name__ == "__main__":
+    unittest.main()
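For context, here is a minimal sketch of the two code paths the new `isInheritsFromPyReader` flag separates, based on the `Python()` table function and `chdb.PyReader` base class as documented in the chdb README; the names `df`, `SimpleReader`, and `reader` are illustrative and not part of this patch:

```python
import chdb
import pandas as pd

# Plain DataFrame: StoragePython::read() now passes false, and the scan
# fans out across num_streams PythonSource instances.
df = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "x", "y"]})
print(chdb.query("SELECT b, SUM(a) FROM Python(df) GROUP BY b", "CSV"))


# PyReader subclass: StoragePython::read() now passes true, and a single
# PythonSource drains blocks through read() under the GIL.
class SimpleReader(chdb.PyReader):
    def __init__(self, data):
        self.data = data
        self.cursor = 0
        super().__init__(data)

    def read(self, col_names, count):
        if self.cursor >= len(self.data["a"]):
            return []  # no more blocks
        block = [self.data[col][self.cursor:self.cursor + count] for col in col_names]
        self.cursor += len(block[0])
        return block


reader = SimpleReader({"a": [1, 2, 3], "b": ["x", "x", "y"]})
print(chdb.query("SELECT b, SUM(a) FROM Python(reader) GROUP BY b", "CSV"))
```

With this change, the subclass check runs once per query in StoragePython::read rather than on every PythonSource::generate() call, which is where the per-chunk `isInheritsFromPyReader(data_source)` call used to happen.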