[SQL Lab] Async query results serialization with MessagePack and PyArrow #8069

Merged
merged 9 commits into from
Aug 27, 2019
6 changes: 6 additions & 0 deletions UPDATING.md
@@ -23,6 +23,12 @@ assists people when migrating to a new version.

## Next Version

* [8069](https://github.com/apache/incubator-superset/pull/8069): introduces
[MessagePack](https://github.com/msgpack/msgpack-python) and
[PyArrow](https://arrow.apache.org/docs/python/) for async query results
backend serialization. To disable, set `RESULTS_BACKEND_USE_MSGPACK = False`
in your configuration.

* [7848](https://github.com/apache/incubator-superset/pull/7848): If you are
running redis with celery, celery bump to 4.3.0 requires redis-py upgrade to
3.2.0 or later.
6 changes: 6 additions & 0 deletions docs/installation.rst
@@ -846,6 +846,12 @@ look something like:
RESULTS_BACKEND = RedisCache(
host='localhost', port=6379, key_prefix='superset_results')

For performance gains, `MessagePack <https://github.com/msgpack/msgpack-python>`_
and `PyArrow <https://arrow.apache.org/docs/python/>`_ are now used for results
serialization. This can be disabled by setting ``RESULTS_BACKEND_USE_MSGPACK = False``
in your configuration, should any issues arise. Please clear your existing results
cache store when upgrading an existing environment.
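
A minimal sketch of the opt-out described above, assuming configuration is overridden in a local `superset_config.py` module (the file name is an assumption here; only the `RESULTS_BACKEND_USE_MSGPACK` key comes from this PR):

```python
# superset_config.py (hypothetical local override module)

# Fall back to JSON serialization for async query results
# if the MessagePack/PyArrow path causes problems.
RESULTS_BACKEND_USE_MSGPACK = False
```

Entries cached under the other serialization format are presumably not readable after switching, which would explain why the docs ask for the results cache to be cleared.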

**Important notes**

* It is important that all the worker nodes and web servers in
6 changes: 4 additions & 2 deletions requirements.txt
@@ -50,13 +50,15 @@ markupsafe==1.1.1 # via jinja2, mako
marshmallow-enum==1.4.1 # via flask-appbuilder
marshmallow-sqlalchemy==0.17.0 # via flask-appbuilder
marshmallow==2.19.5 # via flask-appbuilder, marshmallow-enum, marshmallow-sqlalchemy
numpy==1.17.0 # via pandas
msgpack==0.6.1
numpy==1.17.0 # via pandas, pyarrow
pandas==0.24.2
parsedatetime==2.4
pathlib2==2.3.4
polyline==1.4.0
prison==0.1.2 # via flask-appbuilder
py==1.8.0 # via retry
pyarrow==0.14.1
pycparser==2.19 # via cffi
pyjwt==1.7.1 # via flask-appbuilder, flask-jwt-extended
pyrsistent==0.15.4 # via jsonschema
@@ -70,7 +72,7 @@ pyyaml==5.1.2
retry==0.9.2
selenium==3.141.0
simplejson==3.16.0
six==1.12.0 # via bleach, cryptography, flask-jwt-extended, flask-talisman, isodate, jsonschema, pathlib2, polyline, prison, pyrsistent, python-dateutil, sqlalchemy-utils, wtforms-json
six==1.12.0 # via bleach, cryptography, flask-jwt-extended, flask-talisman, isodate, jsonschema, pathlib2, polyline, prison, pyarrow, pyrsistent, python-dateutil, sqlalchemy-utils, wtforms-json
sqlalchemy-utils==0.34.1
sqlalchemy==1.3.6
sqlparse==0.3.0
2 changes: 2 additions & 0 deletions setup.py
@@ -85,13 +85,15 @@ def get_git_sha():
"idna",
"isodate",
"markdown>=3.0",
"msgpack>=0.6.1, <0.7.0",
"pandas>=0.24.2, <0.25.0",
"parsedatetime",
"pathlib2",
"polyline",
"python-dateutil",
"python-dotenv",
"python-geohash",
"pyarrow>=0.14.1, <0.15.0",
"pyyaml>=5.1",
"retry>=0.9.2",
"selenium>=3.141.0",
1 change: 1 addition & 0 deletions superset/__init__.py
@@ -214,6 +214,7 @@ def index(self):
security_manager = appbuilder.sm

results_backend = app.config.get("RESULTS_BACKEND")
results_backend_use_msgpack = app.config.get("RESULTS_BACKEND_USE_MSGPACK")
Member

Generally, I think we like to add the config var to the config.py file so that it's easier to see all the available configurations. That said, I'm wondering whether this should be a config var at all if it brings such good performance improvements. If you're worried about it being globally enabled without testing, maybe make this a feature flag that defaults to off to begin with, and then remove the flag and enable it everywhere once we're sure everything is good.

Member

Agreed, all configs should be set as default in superset/config.py and commented/documented there.

Member Author

Feature flag feels a bit heavy for this case, since it's a global setting and not needed on the frontend. My main concern about pushing this change without a config flag is the lack of testing with data sources other than Postgres, but I do think we should push that testing forward by perhaps defaulting RESULTS_BACKEND_USE_MSGPACK = True, and providing an escape hatch to disable should problems crop up.

Member

Hmm, maybe I misunderstand the semantics of making something a feature flag. I understood it as gating a new feature that we would want to roll out to 100% of users in the future: during testing, people can enable or disable the feature, with the expectation that it will eventually be enabled by default. Feature flags then need frontend support to serve that goal, but a given flag isn't required to involve both frontend and backend changes. Maybe @mistercrunch can clarify which understanding is correct.

Member Author

My understanding of the feature flag framework is limited, so open to feedback here. Since the change is more middleware than feature, and adding the flag to the JS payload isn't necessary, it feels more like config, but curious what others think.

Member

Config keys are static and scoped to the whole environment.

Feature flags can be dynamic, and currently they all flow to the frontend. Being dynamic, they can be used for progressive rollouts or A/B testing.

The flag in this PR seems more like the former to me.
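
The distinction drawn in this thread can be sketched as follows. This is an illustration only, not Superset's feature flag framework: `CONFIG`, `resolve_flags`, `NEW_FEATURE`, and the even/odd rollout rule are hypothetical names and logic chosen for the example.

```python
from typing import Dict

# Static, environment-wide config: read once at startup and
# identical for every request served by this deployment.
CONFIG: Dict[str, object] = {"RESULTS_BACKEND_USE_MSGPACK": True}
results_backend_use_msgpack = CONFIG.get("RESULTS_BACKEND_USE_MSGPACK")

# Dynamic feature flags: resolved per call, so the answer can differ
# between users (progressive rollout, A/B testing).
DEFAULT_FEATURE_FLAGS: Dict[str, bool] = {"NEW_FEATURE": False}


def resolve_flags(user_id: int) -> Dict[str, bool]:
    """Return the flags for one user (hypothetical rollout rule)."""
    flags = dict(DEFAULT_FEATURE_FLAGS)
    # Hypothetical rule: enable the feature for even user ids only.
    flags["NEW_FEATURE"] = user_id % 2 == 0
    return flags
```

Under this reading, a serialization switch that every worker and web server must agree on fits the static config shape better than a per-user flag.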


# Merge user defined feature flags with default feature flags
_feature_flags = app.config.get("DEFAULT_FEATURE_FLAGS") or {}
6 changes: 6 additions & 0 deletions superset/config.py
@@ -435,6 +435,12 @@ class CeleryConfig(object):
# in SQL Lab by using the "Run Async" button/feature
RESULTS_BACKEND = None

# Use PyArrow and MessagePack for async query results serialization,
# rather than JSON. This feature requires additional testing from the
# community before it is fully adopted, so this config option is provided
# in order to disable should breaking issues be discovered.
RESULTS_BACKEND_USE_MSGPACK = True

# The S3 bucket where you want to store your external hive tables created
# from CSV files. For example, 'companyname-superset'
CSV_TO_HIVE_UPLOAD_S3_BUCKET = None
12 changes: 10 additions & 2 deletions superset/dataframe.py
@@ -100,19 +100,27 @@ def __init__(self, data, cursor_description, db_engine_spec):
except Exception as e:
logging.exception(e)

@property
def raw_df(self):
return self.df

@property
def size(self):
return len(self.df.index)

@property
def data(self):
return self.format_data(self.df)

@classmethod
def format_data(cls, df):
# work around for https://github.com/pandas-dev/pandas/issues/18372
data = [
dict(
(k, maybe_box_datetimelike(v))
for k, v in zip(self.df.columns, np.atleast_1d(row))
for k, v in zip(df.columns, np.atleast_1d(row))
)
for row in self.df.values
for row in df.values
]
for d in data:
for k, v in list(d.items()):
78 changes: 68 additions & 10 deletions superset/sql_lab.py
@@ -18,18 +18,30 @@
from contextlib import closing
from datetime import datetime
import logging
from sys import getsizeof
from time import sleep
from typing import Optional, Tuple, Union
import uuid

from celery.exceptions import SoftTimeLimitExceeded
from contextlib2 import contextmanager
from flask_babel import lazy_gettext as _
import msgpack
import pyarrow as pa
import simplejson as json
import sqlalchemy
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool

from superset import app, dataframe, db, results_backend, security_manager
from superset import (
app,
db,
results_backend,
results_backend_use_msgpack,
security_manager,
)
from superset.dataframe import SupersetDataFrame
from superset.db_engine_specs import BaseEngineSpec
from superset.models.sql_lab import Query
from superset.sql_parse import ParsedQuery
from superset.tasks.celery_app import app as celery_app
@@ -221,7 +233,46 @@ def execute_sql_statement(sql_statement, query, user_name, session, cursor):

logging.debug("Fetching cursor description")
cursor_description = cursor.description
return dataframe.SupersetDataFrame(data, cursor_description, db_engine_spec)
return SupersetDataFrame(data, cursor_description, db_engine_spec)


def _serialize_payload(
    payload: dict, use_msgpack: Optional[bool] = False
) -> Union[bytes, str]:
    logging.debug(f"Serializing to msgpack: {use_msgpack}")
    if use_msgpack:
        return msgpack.dumps(payload, default=json_iso_dttm_ser, use_bin_type=True)
    else:
        return json.dumps(payload, default=json_iso_dttm_ser, ignore_nan=True)


def _serialize_and_expand_data(
    cdf: SupersetDataFrame,
    db_engine_spec: BaseEngineSpec,
    use_msgpack: Optional[bool] = False,
) -> Tuple[Union[bytes, str], list, list, list]:
    selected_columns: list = cdf.columns or []
    expanded_columns: list

    if use_msgpack:
        with stats_timing(
            "sqllab.query.results_backend_pa_serialization", stats_logger
        ):
            data = (
                pa.default_serialization_context()
                .serialize(cdf.raw_df)
                .to_buffer()
                .to_pybytes()
            )
        # expand when loading data from results backend
        all_columns, expanded_columns = (selected_columns, [])
    else:
        data = cdf.data or []
        all_columns, data, expanded_columns = db_engine_spec.expand_data(
            selected_columns, data
        )

    return (data, selected_columns, all_columns, expanded_columns)
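
Why the outer payload needs a binary-capable format can be sketched with the standard library alone: once `data` holds the serialized DataFrame as `bytes`, plain JSON can no longer encode it. In this sketch `pickle` is swapped in for MessagePack purely so the example runs without third-party packages. `serialize_payload` and the sample payload are hypothetical, and this is not the PR's implementation.

```python
import json
import pickle
from typing import Union


def serialize_payload(payload: dict, use_binary: bool = False) -> Union[bytes, str]:
    """Stand-in for the two branches above (pickle replaces msgpack here)."""
    if use_binary:
        return pickle.dumps(payload)  # binary format: bytes values are fine
    return json.dumps(payload)  # text format: bytes values are rejected


# Hypothetical payload whose "data" field is an opaque byte string,
# as it is after the DataFrame has been serialized to bytes.
payload = {"status": "success", "data": b"\x00\x01raw-dataframe-bytes"}

binary = serialize_payload(payload, use_binary=True)

try:
    serialize_payload(payload)  # JSON branch
    json_ok = True
except TypeError:
    # json.dumps cannot encode bytes without a custom default handler
    json_ok = False
```

The same constraint runs the other way on the read path: the binary branch has to hand back `bytes`, not a decoded string.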


def execute_sql_statements(
@@ -300,10 +351,8 @@ def execute_sql_statements(
)
query.end_time = now_as_float()

selected_columns = cdf.columns or []
data = cdf.data or []
all_columns, data, expanded_columns = db_engine_spec.expand_data(
selected_columns, data
data, selected_columns, all_columns, expanded_columns = _serialize_and_expand_data(
cdf, db_engine_spec, store_results and results_backend_use_msgpack
)

payload.update(
@@ -322,13 +371,22 @@ def execute_sql_statements(
key = str(uuid.uuid4())
logging.info(f"Storing results in results backend, key: {key}")
with stats_timing("sqllab.query.results_backend_write", stats_logger):
json_payload = json.dumps(
payload, default=json_iso_dttm_ser, ignore_nan=True
)
with stats_timing(
"sqllab.query.results_backend_write_serialization", stats_logger
):
serialized_payload = _serialize_payload(
payload, results_backend_use_msgpack
)
cache_timeout = database.cache_timeout
if cache_timeout is None:
cache_timeout = config.get("CACHE_DEFAULT_TIMEOUT", 0)
results_backend.set(key, zlib_compress(json_payload), cache_timeout)

compressed = zlib_compress(serialized_payload)
logging.debug(
f"*** serialized payload size: {getsizeof(serialized_payload)}"
)
logging.debug(f"*** compressed payload size: {getsizeof(compressed)}")
results_backend.set(key, compressed, cache_timeout)
query.results_key = key

query.status = QueryStatus.SUCCESS
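
A note on the size logging in this hunk: in CPython, `sys.getsizeof` reports the in-memory size of the Python object, including interpreter overhead, so the logged values should come out slightly larger than the number of bytes actually written to the results backend. A small sketch comparing the two measurements on a made-up payload:

```python
from sys import getsizeof
import zlib

# Made-up, highly repetitive payload standing in for serialized results.
payload = b'{"a": 1}' * 1000
compressed = zlib.compress(payload)

# Number of bytes that would be written to the backend.
payload_len = len(payload)
compressed_len = len(compressed)

# In-memory size of the bytes objects (CPython adds a fixed header).
payload_sizeof = getsizeof(payload)
compressed_sizeof = getsizeof(compressed)

print(f"payload: len={payload_len} getsizeof={payload_sizeof}")
print(f"compressed: len={compressed_len} getsizeof={compressed_sizeof}")
```

If exact wire sizes matter for the debug output, `len()` would be the closer measure. For a rough before/after comparison, either works.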
8 changes: 4 additions & 4 deletions superset/utils/core.py
@@ -33,7 +33,7 @@
import sys
from time import struct_time
import traceback
from typing import List, NamedTuple, Optional, Tuple
from typing import List, NamedTuple, Optional, Tuple, Union
from urllib.parse import unquote_plus
import uuid
import zlib
@@ -803,12 +803,12 @@ def zlib_compress(data):
return zlib.compress(data)


def zlib_decompress_to_string(blob):
def zlib_decompress(blob: bytes, decode: Optional[bool] = True) -> Union[bytes, str]:
"""
Decompress things to a string in a py2/3 safe fashion
>>> json_str = '{"test": 1}'
>>> blob = zlib_compress(json_str)
>>> got_str = zlib_decompress_to_string(blob)
>>> got_str = zlib_decompress(blob)
>>> got_str == json_str
True
"""
@@ -817,7 +817,7 @@ def zlib_decompress_to_string(blob):
decompressed = zlib.decompress(blob)
else:
decompressed = zlib.decompress(bytes(blob, "utf-8"))
return decompressed.decode("utf-8")
return decompressed.decode("utf-8") if decode else decompressed
return zlib.decompress(blob)
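
The effect of the new `decode` parameter can be sketched in isolation. This is a simplified stand-in that assumes `blob` is always `bytes` and drops the py2/3 handling of the real function. The binary sample bytes are arbitrary and only chosen to be invalid UTF-8.

```python
import zlib
from typing import Union


def zlib_decompress(blob: bytes, decode: bool = True) -> Union[bytes, str]:
    """Simplified sketch: decompress, then optionally decode as UTF-8."""
    decompressed = zlib.decompress(blob)
    return decompressed.decode("utf-8") if decode else decompressed


# JSON path: the payload is text, so decoding to str is what the caller wants.
text_blob = zlib.compress('{"test": 1}'.encode("utf-8"))
json_str = zlib_decompress(text_blob)

# Binary path: arbitrary bytes that are not valid UTF-8 must stay as bytes.
binary_bytes = b"\x82\xa4data\xc4\x02\xff\xfe"
binary_blob = zlib.compress(binary_bytes)
raw = zlib_decompress(binary_blob, decode=False)

try:
    zlib_decompress(binary_blob)  # decode=True on binary data
    decode_failed = False
except UnicodeDecodeError:
    decode_failed = True
```

This is why the callers in `superset/views/core.py` pass `decode=not results_backend_use_msgpack`: the binary payload has to reach the deserializer undecoded.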


54 changes: 47 additions & 7 deletions superset/views/core.py
@@ -19,7 +19,7 @@
from datetime import datetime, timedelta
import logging
import re
from typing import Dict, List # noqa: F401
from typing import Dict, List, Optional, Union # noqa: F401
from urllib import parse

from flask import (
@@ -40,7 +40,9 @@
from flask_appbuilder.security.sqla import models as ab_models
from flask_babel import gettext as __
from flask_babel import lazy_gettext as _
import msgpack
import pandas as pd
import pyarrow as pa
import simplejson as json
from sqlalchemy import and_, or_, select
from werkzeug.routing import BaseConverter
@@ -50,11 +52,13 @@
appbuilder,
cache,
conf,
dataframe,
db,
event_logger,
get_feature_flags,
is_feature_enabled,
results_backend,
results_backend_use_msgpack,
security_manager,
sql_lab,
viz,
@@ -76,7 +80,7 @@
from superset.utils import core as utils
from superset.utils import dashboard_import_export
from superset.utils.dates import now_as_float
from superset.utils.decorators import etag_cache
from superset.utils.decorators import etag_cache, stats_timing
from .base import (
api,
BaseSupersetView,
@@ -186,6 +190,38 @@ def check_slice_perms(self, slice_id):
security_manager.assert_datasource_permission(viz_obj.datasource)


def _deserialize_results_payload(
    payload: Union[bytes, str], query, use_msgpack: Optional[bool] = False
) -> dict:
    logging.debug(f"Deserializing from msgpack: {use_msgpack}")
    if use_msgpack:
        with stats_timing(
            "sqllab.query.results_backend_msgpack_deserialize", stats_logger
        ):
            ds_payload = msgpack.loads(payload, raw=False)

        with stats_timing("sqllab.query.results_backend_pa_deserialize", stats_logger):
            df = pa.deserialize(ds_payload["data"])

        # TODO: optimize this, perhaps via df.to_dict, then traversing
        ds_payload["data"] = dataframe.SupersetDataFrame.format_data(df) or []

        db_engine_spec = query.database.db_engine_spec
        all_columns, data, expanded_columns = db_engine_spec.expand_data(
            ds_payload["selected_columns"], ds_payload["data"]
        )
        ds_payload.update(
            {"data": data, "columns": all_columns, "expanded_columns": expanded_columns}
        )

        return ds_payload
    else:
        with stats_timing(
            "sqllab.query.results_backend_json_deserialize", stats_logger
        ):
            return json.loads(payload)  # noqa


class SliceFilter(SupersetFilter):
def apply(self, query, func): # noqa
if security_manager.all_datasource_access():
@@ -2410,12 +2446,12 @@ def results(self, key):
status=403,
)

payload = utils.zlib_decompress_to_string(blob)
payload_json = json.loads(payload)
payload = utils.zlib_decompress(blob, decode=not results_backend_use_msgpack)
obj = _deserialize_results_payload(payload, query, results_backend_use_msgpack)

return json_success(
json.dumps(
apply_display_max_row_limit(payload_json),
apply_display_max_row_limit(obj),
default=utils.json_iso_dttm_ser,
ignore_nan=True,
)
@@ -2663,8 +2699,12 @@ def csv(self, client_id):
blob = results_backend.get(query.results_key)
if blob:
logging.info("Decompressing")
json_payload = utils.zlib_decompress_to_string(blob)
obj = json.loads(json_payload)
payload = utils.zlib_decompress(
blob, decode=not results_backend_use_msgpack
)
obj = _deserialize_results_payload(
payload, query, results_backend_use_msgpack
)
columns = [c["name"] for c in obj["columns"]]
df = pd.DataFrame.from_records(obj["data"], columns=columns)
logging.info("Using pandas to convert to CSV")