Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add datasource.changed_on to cache_key #8901

Merged
merged 3 commits into from
Jan 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ This file documents any backwards-incompatible changes in Superset and
assists people when migrating to a new version.

## Next
* [8901](https://github.com/apache/incubator-superset/pull/8901): The datasource's update
timestamp has been added to the query object's cache key to ensure updates to
datasources are always reflected in associated query results. As a consequence all
previously cached results will be invalidated when updating to the next version.

* [8732](https://github.com/apache/incubator-superset/pull/8732): Swagger user interface is now enabled by default.
* [8732](https://github.com/apache/incubator-superset/pull/8732): Swagger user interface is now enabled by default.
A new permission `show on SwaggerView` is created by `superset init` and given to the `Admin` Role. To disable the UI,
set `FAB_API_SWAGGER_UI = False` on config.

Expand Down Expand Up @@ -90,17 +94,17 @@ which adds missing non-nullable fields to the `datasources` table. Depending on
the integrity of the data, manual intervention may be required.

* [5452](https://github.com/apache/incubator-superset/pull/5452): a change
which adds missing non-nullable fields and uniqueness constraints (which may be
case insensitive depending on your database configuration) to the `columns`and
`table_columns` tables. Depending on the integrity of the data, manual
which adds missing non-nullable fields and uniqueness constraints (which may be
case insensitive depending on your database configuration) to the `columns`and
`table_columns` tables. Depending on the integrity of the data, manual
intervention may be required.
* `fabmanager` command line is deprecated since Flask-AppBuilder 2.0.0, use
the new `flask fab <command>` integrated with *Flask cli*.
* `SUPERSET_UPDATE_PERMS` environment variable was replaced by
`FAB_UPDATE_PERMS` config boolean key. To disable automatic
creation of permissions set `FAB_UPDATE_PERMS = False` on config.
* [5453](https://github.com/apache/incubator-superset/pull/5453): a change
which adds missing non-nullable fields and uniqueness constraints (which may be
which adds missing non-nullable fields and uniqueness constraints (which may be
case insensitive depending on your database configuration) to the metrics
and sql_metrics tables. Depending on the integrity of the data, manual
intervention may be required.
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ combine_as_imports = true
include_trailing_comma = true
line_length = 88
known_first_party = superset
known_third_party =alembic,backoff,bleach,celery,click,colorama,contextlib2,croniter,dateutil,flask,flask_appbuilder,flask_babel,flask_caching,flask_compress,flask_login,flask_migrate,flask_sqlalchemy,flask_talisman,flask_testing,flask_wtf,geohash,geopy,humanize,isodate,jinja2,markdown,markupsafe,marshmallow,msgpack,numpy,pandas,parsedatetime,pathlib2,polyline,prison,psycopg2,pyarrow,pyhive,pytz,retry,selenium,setuptools,simplejson,sphinx_rtd_theme,sqlalchemy,sqlalchemy_utils,sqlparse,werkzeug,wtforms,wtforms_json,yaml
known_third_party =alembic,backoff,bleach,celery,click,colorama,contextlib2,croniter,dateutil,flask,flask_appbuilder,flask_babel,flask_caching,flask_compress,flask_login,flask_migrate,flask_sqlalchemy,flask_talisman,flask_testing,flask_wtf,geohash,geopy,humanize,isodate,jinja2,markdown,markupsafe,marshmallow,msgpack,numpy,pandas,parsedatetime,pathlib2,polyline,prison,pyarrow,pyhive,pytz,retry,selenium,setuptools,simplejson,sphinx_rtd_theme,sqlalchemy,sqlalchemy_utils,sqlparse,werkzeug,wtforms,wtforms_json,yaml
multi_line_output = 3
order_by_type = false

Expand Down
13 changes: 9 additions & 4 deletions superset/common/query_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,20 +157,25 @@ def cache_timeout(self) -> int:
return self.datasource.database.cache_timeout
return config["CACHE_DEFAULT_TIMEOUT"]

def get_df_payload( # pylint: disable=too-many-locals,too-many-statements
self, query_obj: QueryObject, **kwargs
) -> Dict[str, Any]:
"""Handles caching around the df payload retrieval"""
def cache_key(self, query_obj: QueryObject, **kwargs) -> Optional[str]:
extra_cache_keys = self.datasource.get_extra_cache_keys(query_obj.to_dict())
cache_key = (
query_obj.cache_key(
datasource=self.datasource.uid,
extra_cache_keys=extra_cache_keys,
changed_on=self.datasource.changed_on,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@villebro apologies for the late comment. In your PR description you mention that the cache key should be a function on whether the column or metric definitions associated with a datasource are changed.

On line #166 you merely use the datasource change_on and thus I was wondering whether a changes to the columns and/or metrics cascade, i.e., trigger an update to the datasource changed_on?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is my understanding that changed_on is updated every time any change is applied, i.e. does not check if only relevant metrics/expressions have changed. While this can cause unnecessary cache misses, I felt the added complexity of checking only for relevant changes was not warranted unless the ultimately proposed simpler solution was found to be too generic (I tried to convey this in the unit test which only changed the description). If this does cause unacceptable amounts of cache misses I think we need to revisit this logic; until then I personally think this is a good compromise. However, I'm happy to open up the discussion again if there are opinions to the contrary.

**kwargs
)
if query_obj
else None
)
return cache_key
Comment on lines +160 to +172
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cache key calculation logic was broken into a method of its own to enable easier unit testing.


def get_df_payload( # pylint: disable=too-many-locals,too-many-statements
self, query_obj: QueryObject, **kwargs
) -> Dict[str, Any]:
"""Handles caching around the df payload retrieval"""
cache_key = self.cache_key(query_obj, **kwargs)
logging.info("Cache key: %s", cache_key)
is_loaded = False
stacktrace = None
Expand Down
7 changes: 1 addition & 6 deletions superset/common/query_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,7 @@ def __init__(
self.time_shift = utils.parse_human_timedelta(time_shift)
self.groupby = groupby or []

# Temporal solution for backward compatability issue
# due the new format of non-ad-hoc metric.
self.metrics = [
metric if "expressionType" in metric else metric["label"] # type: ignore
for metric in metrics
]
self.metrics = [utils.get_metric_name(metric) for metric in metrics]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

During testing I noticed that the existing logic was incomplete; utils.get_metric_name on the other hand is used elsewhere and handles all metric types correctly (legacy and ad-hoc).

self.row_limit = row_limit
self.filter = filters or []
self.timeseries_limit = timeseries_limit
Expand Down
1 change: 1 addition & 0 deletions superset/viz.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ def cache_key(self, query_obj, **extra):
cache_dict["time_range"] = self.form_data.get("time_range")
cache_dict["datasource"] = self.datasource.uid
cache_dict["extra_cache_keys"] = self.datasource.get_extra_cache_keys(query_obj)
cache_dict["changed_on"] = self.datasource.changed_on
json_data = self.json_dumps(cache_dict, sort_keys=True)
return hashlib.md5(json_data.encode("utf-8")).hexdigest()

Expand Down
76 changes: 52 additions & 24 deletions tests/core_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@
import random
import re
import string
from typing import Any, Dict
import unittest
from unittest import mock

import pandas as pd
import psycopg2
import sqlalchemy as sqla

from tests.test_app import app
from superset import dataframe, db, jinja_context, security_manager, sql_lab
from superset.common.query_context import QueryContext
from superset.connectors.connector_registry import ConnectorRegistry
from superset.connectors.sqla.models import SqlaTable
from superset.db_engine_specs.base import BaseEngineSpec
from superset.db_engine_specs.mssql import MssqlEngineSpec
Expand Down Expand Up @@ -98,7 +100,23 @@ def test_slice_endpoint(self):
resp = self.client.get("/superset/slice/-1/")
assert resp.status_code == 404

def test_cache_key(self):
def _get_query_context_dict(self) -> Dict[str, Any]:
self.login(username="admin")
slc = self.get_slice("Girl Name Cloud", db.session)
return {
"datasource": {"id": slc.datasource_id, "type": slc.datasource_type},
"queries": [
{
"granularity": "ds",
"groupby": ["name"],
"metrics": ["sum__num"],
"filters": [],
"row_limit": 100,
}
],
}

def test_viz_cache_key(self):
self.login(username="admin")
slc = self.get_slice("Girls", db.session)

Expand All @@ -110,30 +128,40 @@ def test_cache_key(self):
qobj["groupby"] = []
self.assertNotEqual(cache_key, viz.cache_key(qobj))

def test_cache_key_changes_when_datasource_is_updated(self):
qc_dict = self._get_query_context_dict()

# construct baseline cache_key
query_context = QueryContext(**qc_dict)
query_object = query_context.queries[0]
cache_key_original = query_context.cache_key(query_object)

# make temporary change and revert it to refresh the changed_on property
datasource = ConnectorRegistry.get_datasource(
datasource_type=qc_dict["datasource"]["type"],
datasource_id=qc_dict["datasource"]["id"],
session=db.session,
)
description_original = datasource.description
datasource.description = "temporary description"
db.session.commit()
datasource.description = description_original
db.session.commit()

# create new QueryContext with unchanged attributes and extract new cache_key
query_context = QueryContext(**qc_dict)
query_object = query_context.queries[0]
cache_key_new = query_context.cache_key(query_object)

# the new cache_key should be different due to updated datasource
self.assertNotEqual(cache_key_original, cache_key_new)

def test_api_v1_query_endpoint(self):
self.login(username="admin")
slc = self.get_slice("Girl Name Cloud", db.session)
form_data = slc.form_data
data = json.dumps(
{
"datasource": {"id": slc.datasource_id, "type": slc.datasource_type},
"queries": [
{
"granularity": "ds",
"groupby": ["name"],
"metrics": ["sum__num"],
"filters": [],
"time_range": "{} : {}".format(
form_data.get("since"), form_data.get("until")
),
"limit": 100,
}
],
}
)
# TODO: update once get_data is implemented for QueryObject
with self.assertRaises(Exception):
self.get_resp("/api/v1/query/", {"query_context": data})
qc_dict = self._get_query_context_dict()
data = json.dumps(qc_dict)
resp = json.loads(self.get_resp("/api/v1/query/", {"query_context": data}))
self.assertEqual(resp[0]["rowcount"], 100)
Comment on lines -134 to +164
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old unit test seemed to be incomplete, so fixed a few bugs in the body (limit -> row_limit and removed time_range) to make it work properly.


def test_old_slice_json_endpoint(self):
self.login(username="admin")
Expand Down