diff --git a/README.md b/README.md index 54f876981b980..33c3344c34508 100644 --- a/README.md +++ b/README.md @@ -17,13 +17,10 @@ Buzz Phrases Database Support ---------------- -Panoramix was originally designed on to of Druid.io, but quickly broadened -to support other databases through the use of SqlAlchemy, a Python +Panoramix was originally designed on to of Druid.io, but quickly broadened +its scope to support other databases through the use of SqlAlchemy, a Python ORM that is compatible with [many external databases](http://docs.sqlalchemy.org/en/rel_1_0/core/engines.html). -At the moment the SQL has been hard coded to use ``LIMIT``-type -dialect and needs to be extended to support other syntax -(``TOP``, ``ROWNUM``, ...) What's Druid? ------------- diff --git a/app/models.py b/app/models.py index 7a9b915f80ed1..6dea1f17a9667 100644 --- a/app/models.py +++ b/app/models.py @@ -8,16 +8,20 @@ from dateutil.parser import parse from pydruid import client from pydruid.utils.filters import Dimension, Filter +from pandas import read_sql_query +from sqlalchemy.sql import table, literal_column +from sqlalchemy import select, and_, text, String from copy import deepcopy, copy from collections import namedtuple from datetime import datetime import logging import json +import sqlparse import requests import textwrap -from app import db, get_session +from app import db, get_session, utils QueryResult = namedtuple('namedtuple', ['df', 'query', 'duration']) @@ -87,7 +91,7 @@ def metrics_combo(self): for m in self.metrics], key=lambda x: x[1]) - def query( + def query_bkp( self, groupby, metrics, granularity, from_dttm, to_dttm, @@ -95,6 +99,11 @@ def query( filter=None, is_timeseries=True, timeseries_limit=15, row_limit=None): + """ + Unused, legacy way of querying by building a SQL string without + using the sqlalchemy expression API (new approach which supports + all dialects) + """ from pandas import read_sql_query qry_start_dttm = datetime.now() metrics_exprs = [ @@ -172,6 +181,82 @@ def query( return QueryResult( df=df, duration=datetime.now() - qry_start_dttm, query=sql) + def query( + self, groupby, metrics, + granularity, + from_dttm, to_dttm, + limit_spec=None, + filter=None, + is_timeseries=True, + timeseries_limit=15, row_limit=None): + + qry_start_dttm = datetime.now() + timestamp = literal_column( + self.main_datetime_column.column_name).label('timestamp') + metrics_exprs = [ + literal_column(m.expression).label(m.metric_name) + for m in self.metrics if m.metric_name in metrics] + + if metrics: + main_metric_expr = literal_column( + [m.expression for m in self.metrics if m.metric_name == metrics[0]][0]) + else: + main_metric_expr = literal_column("COUNT(*)") + + select_exprs = [] + groupby_exprs = [] + + if groupby: + select_exprs = [literal_column(s) for s in groupby] + groupby_exprs = [literal_column(s) for s in groupby] + inner_groupby_exprs = [literal_column(s).label('__' + s) for s in groupby] + select_exprs += metrics_exprs + if granularity != "all": + select_exprs += [timestamp] + groupby_exprs += [timestamp] + + qry = select(select_exprs) + from_clause = table(self.table_name) + qry = qry.group_by(*groupby_exprs) + + where_clause_and = [ + timestamp >= from_dttm.isoformat(), + timestamp < to_dttm.isoformat(), + ] + for col, op, eq in filter: + if op in ('in', 'not in'): + values = eq.split(",") + cond = literal_column(col).in_(values) + if op == 'not in': + cond = ~cond + where_clause_and.append(cond) + qry = qry.where(and_(*where_clause_and)) + qry = qry.limit(row_limit) + + if timeseries_limit and groupby: + subq = select(inner_groupby_exprs) + subq = subq.select_from(table(self.table_name)) + subq = subq.where(and_(*where_clause_and)) + subq = subq.group_by(*inner_groupby_exprs) + subq = subq.limit(timeseries_limit) + on_clause = [] + for gb in groupby: + on_clause.append(literal_column(s)==literal_column("__" + s)) + + from_clause = from_clause.join(subq.alias(), and_(*on_clause)) + + qry = qry.select_from(from_clause) + + engine = self.database.get_sqla_engine() + sql = str(qry.compile(engine, compile_kwargs={"literal_binds": True})) + df = read_sql_query( + sql=sql, + con=engine + ) + sql = sqlparse.format(sql, reindent=True) + return QueryResult( + df=df, duration=datetime.now() - qry_start_dttm, query=sql) + def fetch_metadata(self): table = self.database.get_table(self.table_name) @@ -421,7 +506,9 @@ def query( }], } client.groupby(**pre_qry) + query_str += "// Two phase query\n// Phase 1\n" query_str += json.dumps(client.query_dict, indent=2) + "\n" + query_str += "//\nPhase 2 (built based on phase one's results)\n" df = client.export_pandas() if not df is None and not df.empty: dims = qry['dimensions'] diff --git a/requirements.txt b/requirements.txt index d6cd8d4b866a1..6819d9960f921 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,3 +8,4 @@ pydruid pyhive python-dateutil requests +sqlparse