Commit
change approach to reuse same results_key
timifasubaa committed Jun 1, 2018
1 parent 0d3dc52 commit 8d54892
Showing 9 changed files with 39 additions and 129 deletions.
12 changes: 5 additions & 7 deletions superset/assets/src/SqlLab/actions.js
@@ -78,9 +78,11 @@ export function startQuery(query) {
export function querySuccess(query, results) {
return { type: QUERY_SUCCESS, query, results };
}

export function prefetchSuccess(query, results) {
return { type: PREFETCH_SUCCESS, query, results };
}

export function queryFailed(query, msg) {
return { type: QUERY_FAILED, query, msg };
}
@@ -103,18 +105,14 @@ export function requestQueryResults(query) {

export function fetchQueryResults(query) {
return function (dispatch) {
dispatch(requestQueryResults(query));
let sqlJsonUrl = `/superset/results/${query.resultsKey}`;

if (query.state === 'prefetched') {
sqlJsonUrl += '_prefetch';
}
dispatch(requestQueryResults(query));
const sqlJsonUrl = `/superset/results/${query.resultsKey}/`;
$.ajax({
type: 'GET',
dataType: 'json',
url: sqlJsonUrl,
success(results) {
if (results.status === "prefetched") {
if (results.status === 'prefetched') {
dispatch(prefetchSuccess(query, results));
} else {
dispatch(querySuccess(query, results));
2 changes: 1 addition & 1 deletion superset/assets/src/SqlLab/components/QueryAutoRefresh.jsx
@@ -40,7 +40,7 @@ class QueryAutoRefresh extends React.PureComponent {
stopwatch() {
// only poll /superset/queries/ if there are started or running queries
if (this.shouldCheckForQueries()) {
const url = `/superset/queries/${this.props.queriesLastUpdate - QUERY_UPDATE_BUFFER_MS}`;
const url = `/superset/queries/${this.props.queriesLastUpdate - QUERY_UPDATE_BUFFER_MS}`;
$.getJSON(url, (data) => {
if (Object.keys(data).length > 0) {
this.props.actions.refreshQueries(data);
8 changes: 0 additions & 8 deletions superset/assets/src/SqlLab/components/ResultSet.jsx
@@ -55,7 +55,6 @@ export default class ResultSet extends React.PureComponent {
this.clearQueryResults(nextProps.query),
);
}

if (nextProps.query.resultsKey
&& nextProps.query.resultsKey !== this.props.query.resultsKey) {
this.fetchResults(nextProps.query);
@@ -64,18 +63,12 @@
getControls() {
if (this.props.search || this.props.visualize || this.props.csv) {
let csvButton;
let next;
if (this.props.csv && this.props.query.state === 'success') {
csvButton = (
<Button bsSize="small" href={'/superset/csv/' + this.props.query.id}>
<i className="fa fa-file-text-o" /> {t('.CSV')}
</Button>
);
next = (
<Button bsSize="small" onClick={ () => {this.fetchResults(this.props.query);} } >
<i className="fa fa-file-text-o" /> {t('Next page of results')}
</Button>
);
}
let visualizeButton;
if (this.props.visualize) {
@@ -106,7 +99,6 @@
<ButtonGroup>
{visualizeButton}
{csvButton}
{next}
</ButtonGroup>
</div>
<div className="pull-right">
12 changes: 0 additions & 12 deletions superset/assets/yarn.lock
@@ -5287,25 +5287,13 @@ lodash._basecreate@^3.0.0:
version "3.0.3"
resolved "https://registry.yarnpkg.com/lodash._basecreate/-/lodash._basecreate-3.0.3.tgz#1bc661614daa7fc311b7d03bf16806a0213cf821"

lodash._baseisequal@^3.0.0:
version "3.0.7"
resolved "https://registry.yarnpkg.com/lodash._baseisequal/-/lodash._baseisequal-3.0.7.tgz#d8025f76339d29342767dcc887ce5cb95a5b51f1"
dependencies:
lodash.isarray "^3.0.0"
lodash.istypedarray "^3.0.0"
lodash.keys "^3.0.0"

lodash._baseuniq@~4.6.0:
version "4.6.0"
resolved "https://registry.yarnpkg.com/lodash._baseuniq/-/lodash._baseuniq-4.6.0.tgz#0ebb44e456814af7905c6212fa2c9b2d51b841e8"
dependencies:
lodash._createset "~4.0.0"
lodash._root "~3.0.0"

lodash._bindcallback@^3.0.0:
version "3.0.1"
resolved "https://registry.yarnpkg.com/lodash._bindcallback/-/lodash._bindcallback-3.0.1.tgz#e531c27644cf8b57a99e17ed95b35c748789392e"

lodash._createset@~4.0.0:
version "4.0.3"
resolved "https://registry.yarnpkg.com/lodash._createset/-/lodash._createset-4.0.3.tgz#0f4659fbb09d75194fa9e2b88a6644d363c9fe26"
7 changes: 5 additions & 2 deletions superset/config.py
@@ -250,7 +250,7 @@
MAPBOX_API_KEY = os.environ.get('MAPBOX_API_KEY', '')

# Maximum number of rows returned in the SQL editor
SQL_MAX_ROW = 1000000
SQL_MAX_ROW = 10000
DISPLAY_SQL_MAX_ROW = 1000

# Maximum number of tables/views displayed in the dropdown window in SQL Lab.
@@ -295,7 +295,10 @@ class CeleryConfig(object):
SQLLAB_TIMEOUT = 30

# When set to true, results from asynchronous sql lab are prefetched
PREFETCH_PRESTO = True
PREFETCH_ASYNC = True

# How many rows to prefetch from asynchronous queries
PREFETCH_ROWS = 100

# SQLLAB_DEFAULT_DBID
SQLLAB_DEFAULT_DBID = None
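The two settings introduced above (PREFETCH_ASYNC and PREFETCH_ROWS) can be overridden like any other entry in this module, for example from a superset_config.py; a minimal sketch with hypothetical values:

# superset_config.py (illustrative values only)

# Turn off prefetching of asynchronous SQL Lab results entirely...
PREFETCH_ASYNC = False

# ...or keep it enabled and prefetch a larger first page of rows.
# PREFETCH_ASYNC = True
# PREFETCH_ROWS = 500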
24 changes: 13 additions & 11 deletions superset/db_engine_specs.py
@@ -74,8 +74,8 @@ class BaseEngineSpec(object):
inner_joins = True

@classmethod
def fetch_data(cls, cursor, limit):
if cls.limit_method == LimitMethod.FETCH_MANY:
def fetch_data(cls, cursor, limit, prefetch=False):
if cls.limit_method == LimitMethod.FETCH_MANY or prefetch:
return cursor.fetchmany(limit)
return cursor.fetchall()

@@ -728,9 +728,10 @@ def extra_table_metadata(cls, database, table_name, schema_name):
}

@classmethod
def prefetch_results(cls, cursor, query, cache_timeout, session, limit=1000):
def prefetch_results(cls, cursor, query, cache_timeout, session, limit):
data = cursor.fetchmany(limit)
cdf = utils.convert_results_to_df(cursor.description, data)
column_names = cls.get_normalized_column_names(cursor.description)
cdf = utils.convert_results_to_df(column_names, data)
payload = dict(query_id=query.id)
payload.update({
'status': utils.QueryStatus.PREFETCHED,
@@ -741,7 +742,7 @@ def prefetch_results(cls, cursor, query, cache_timeout, session, limit=1000):

json_payload = json.dumps(payload, default=utils.json_iso_dttm_ser)
key = '{}'.format(uuid.uuid4())
prefetch_key = key + '_prefetch'
prefetch_key = key
results_backend.set(
prefetch_key, utils.zlib_compress(json_payload), cache_timeout)
query.status = utils.QueryStatus.PREFETCHED
@@ -760,20 +761,19 @@ def handle_cursor(cls, cursor, query, session, cache_timeout=0):
while polled:
# Update the object and wait for the kill signal.
stats = polled.get('stats', {})
processed_rows = stats['processedRows']

query = session.query(type(query)).filter_by(id=query.id).one()
if query.status in [QueryStatus.STOPPED, QueryStatus.TIMED_OUT]:
cursor.cancel()
break

if (
config.get('PREFETCH_PRESTO') and
processed_rows > 1000 and
not query.has_loaded_early
config.get('PREFETCH_ASYNC') and
(not query.has_loaded_early)
):
query.has_loaded_early = True
PrestoEngineSpec.prefetch_results(cursor, query, cache_timeout, session)
limit = config.get('PREFETCH_ROWS')
PrestoEngineSpec.prefetch_results(
cursor, query, cache_timeout, session, limit)

if stats:
completed_splits = float(stats.get('completedSplits'))
@@ -1111,6 +1111,8 @@ def handle_cursor(cls, cursor, query, session):
if query.status == QueryStatus.STOPPED:
cursor.cancel()
break
if hive.ttypes.TOperationState.RUNNING_STATE == polled.operationState:
BaseEngineSpec.fetch_data(cursor, 100, prefetch=True)

log = cursor.fetch_logs() or ''
if log:
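For readers skimming the Presto hunks above, here is a condensed, self-contained sketch of the prefetch path. It is illustrative rather than the exact Superset code: results_backend.set, utils.convert_results_to_df, utils.zlib_compress and QueryStatus.PREFETCHED appear in the diff, while the wrapper name and the final results_key assignment are assumptions inferred from the commit message ("reuse same results_key").

# Illustrative sketch of how the first rows of an asynchronous Presto query
# might be prefetched and cached; glue code around the calls shown above is assumed.
import json
import uuid


def prefetch_first_rows(cursor, query, session, limit, cache_timeout,
                        results_backend, utils):
    # Pull only the first `limit` rows while the query keeps running.
    data = cursor.fetchmany(limit)
    column_names = [col[0] for col in cursor.description]
    cdf = utils.convert_results_to_df(column_names, data)

    # Store the partial result so the UI can render something early.
    payload = {
        'query_id': query.id,
        'status': utils.QueryStatus.PREFETCHED,
        'data': cdf.data if cdf.data else [],
    }
    json_payload = json.dumps(payload, default=utils.json_iso_dttm_ser)
    key = str(uuid.uuid4())
    results_backend.set(key, utils.zlib_compress(json_payload), cache_timeout)

    # Assumption: the same key is reused later for the full result set.
    query.results_key = key
    query.status = utils.QueryStatus.PREFETCHED
    session.merge(query)
    session.commit()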
75 changes: 9 additions & 66 deletions superset/sql_lab.py
@@ -9,22 +9,12 @@
import uuid

from celery.exceptions import SoftTimeLimitExceeded
<<<<<<< HEAD
from contextlib2 import contextmanager
import numpy as np
import pandas as pd
=======
>>>>>>> prefetch asyncronous query results from presto
import sqlalchemy
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool

<<<<<<< HEAD
from superset import app, dataframe, db, results_backend, security_manager, utils
=======
from superset import app, db, results_backend, security_manager, utils
from superset.db_engine_specs import LimitMethod
>>>>>>> prefetch asyncronous query results from presto
from superset.models.sql_lab import Query
from superset.sql_parse import SupersetQuery
from superset.utils import get_celery_app, QueryStatus
@@ -39,27 +29,6 @@ class SqlLabException(Exception):
pass


def dedup(l, suffix='__'):
"""De-duplicates a list of string by suffixing a counter
Always returns the same number of entries as provided, and always returns
unique values.
>>> print(','.join(dedup(['foo', 'bar', 'bar', 'bar'])))
foo,bar,bar__1,bar__2
"""
new_l = []
seen = {}
for s in l:
if s in seen:
seen[s] += 1
s += suffix + str(seen[s])
else:
seen[s] = 0
new_l.append(s)
return new_l


def get_query(query_id, session, retry_count=5):
"""attemps to get the query and retry if it cannot"""
query = None
@@ -104,40 +73,19 @@ def session_scope(nullpool):
session.close()


def convert_results_to_df(column_names, data):
"""Convert raw query results to a DataFrame."""
column_names = dedup(column_names)

# check whether the result set has any nested dict columns
if data:
first_row = data[0]
has_dict_col = any([isinstance(c, dict) for c in first_row])
df_data = list(data) if has_dict_col else np.array(data, dtype=object)
else:
df_data = []

cdf = dataframe.SupersetDataFrame(
pd.DataFrame(df_data, columns=column_names))

return cdf


@celery_app.task(bind=True, soft_time_limit=SQLLAB_TIMEOUT)
def get_sql_results(
ctask, query_id, rendered_query, return_results=True, store_results=False,
user_name=None):
"""Executes the sql query returns the results."""
<<<<<<< HEAD
with session_scope(not ctask.request.called_directly) as session:

try:
return execute_sql(
ctask, query_id, rendered_query, return_results, store_results, user_name,
session=session)
except Exception as e:
logging.exception(e)
stats_logger.incr('error_sqllab_unhandled')
sesh = get_session(not ctask.request.called_directly)
query = get_query(query_id, session)
query.error_message = str(e)
query.status = QueryStatus.FAILED
@@ -176,9 +124,6 @@ def handle_error(msg):

if store_results and not results_backend:
return handle_error("Results backend isn't configured.")
cache_timeout = database.cache_timeout
if cache_timeout is None:
cache_timeout = config.get('CACHE_DEFAULT_TIMEOUT', 0)

# Limit enforced only for retrieving the data, not for the CTA queries.
superset_query = SupersetQuery(rendered_query)
@@ -233,7 +178,7 @@ def handle_error(msg):
logging.info('Handling cursor')
db_engine_spec.handle_cursor(cursor, query, session)
logging.info('Fetching data: {}'.format(query.to_dict()))
data = db_engine_spec.fetch_data(cursor, query)
data = db_engine_spec.fetch_data(cursor, query.limit)
except SoftTimeLimitExceeded as e:
logging.exception(e)
if conn is not None:
@@ -255,12 +200,6 @@ def handle_error(msg):
conn.close()

if query.status == utils.QueryStatus.STOPPED:
<<<<<<< HEAD
return handle_error('The query has been stopped')

cdf = convert_results_to_df(column_names, data)

=======
return json.dumps(
{
'query_id': query.id,
Expand All @@ -269,8 +208,13 @@ def handle_error(msg):
},
default=utils.json_iso_dttm_ser)

cdf = utils.convert_results_to_df(cursor_description, data)
>>>>>>> prefetch asyncronous query results from presto
if query.has_loaded_early:
preloaded_data = results_backend.get(query.results_key)
preloaded_data = json.loads(
utils.zlib_decompress_to_string(preloaded_data))['data']
preloaded_data = [list(row.values()) for row in preloaded_data]
data = preloaded_data + data
cdf = utils.convert_results_to_df(column_names, data)
query.rows = cdf.size
query.progress = 100
query.status = QueryStatus.SUCCESS
@@ -285,12 +229,11 @@
query.end_time = utils.now_as_float()
session.merge(query)
session.flush()
new_key = query.results_key if cdf.data else utils.prefetch_key(query.results_key)

payload.update({
'status': query.status,
'data': cdf.data if cdf.data else [],
'results_key': new_key
'results_key': query.results_key,
'columns': cdf.columns if cdf.columns else [],
'query': query.to_dict(),
})
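The merge with the prefetched rows above is easier to follow outside of diff form. A minimal standalone sketch, assuming (as in the diff) that the prefetched payload is stored as compressed JSON whose 'data' entry is a list of row dicts; the function name and argument list here are hypothetical.

import json


def combine_with_prefetched(results_backend, results_key, remaining_rows,
                            zlib_decompress_to_string):
    # Prepend the rows that were prefetched earlier to the rest of the result set.
    blob = results_backend.get(results_key)
    prefetched = json.loads(zlib_decompress_to_string(blob))['data']
    # Prefetched rows are dicts keyed by column name; the diff flattens them back
    # to positional lists before concatenating with the freshly fetched rows.
    prefetched_rows = [list(row.values()) for row in prefetched]
    return prefetched_rows + list(remaining_rows)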
12 changes: 3 additions & 9 deletions superset/utils.py
@@ -177,7 +177,7 @@ def convert_results_to_df(cursor_description, data):
import pandas as pd
"""Convert raw query results to a DataFrame."""
column_names = (
[col[0] for col in cursor_description] if cursor_description else [])
[col for col in cursor_description] if cursor_description else [])
column_names = dedup(column_names)

# check whether the result set has any nested dict columns
@@ -188,9 +188,9 @@
else:
df_data = []

pdf = pd.DataFrame(df_data, columns=column_names)
cdf = dataframe.SupersetDataFrame(
pd.DataFrame(df_data, columns=column_names))

pdf)
return cdf


@@ -927,9 +927,3 @@ def split_adhoc_filters_into_base_filters(fd):
fd['having_filters'] = simple_having_filters
fd['filters'] = simple_where_filters
del fd['adhoc_filters']



def prefetch_key(key):
# given a key, this returns the location of the prefetched key
return key + "_prefetch"
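convert_results_to_df in this file relies on dedup to keep column labels unique; the behaviour is documented in the helper removed from sql_lab.py above, and a standalone sketch of the same logic looks like this:

def dedup(labels, suffix='__'):
    """Suffix repeated labels with a counter so every entry stays unique.

    >>> dedup(['foo', 'bar', 'bar', 'bar'])
    ['foo', 'bar', 'bar__1', 'bar__2']
    """
    seen = {}
    out = []
    for label in labels:
        if label in seen:
            seen[label] += 1
            label += suffix + str(seen[label])
        else:
            seen[label] = 0
        out.append(label)
    return out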