fix: Time shifts with different granularity for ECharts (#24176)

michael-s-molina authored Jun 8, 2023
1 parent e922f09 commit e5b7f7c

Showing 39 changed files with 682 additions and 416 deletions.
159 changes: 133 additions & 26 deletions superset/common/query_context_processor.py
@@ -37,7 +37,7 @@
from superset.common.utils.query_cache_manager import QueryCacheManager
from superset.common.utils.time_range_utils import get_since_until_from_query_object
from superset.connectors.base.models import BaseDatasource
from superset.constants import CacheRegion
from superset.constants import CacheRegion, TimeGrain
from superset.exceptions import (
InvalidPostProcessingError,
QueryObjectValidationError,
@@ -74,6 +74,27 @@
stats_logger: BaseStatsLogger = config["STATS_LOGGER"]
logger = logging.getLogger(__name__)

# Temporary column used for joining aggregated offset results
AGGREGATED_JOIN_COLUMN = "__aggregated_join_column"

# This only includes time grains that may influence
# the temporal column used for joining offset results.
# Given that we don't allow time shifts smaller than a day,
# we don't need to include aggregations for smaller time grains.
AGGREGATED_JOIN_GRAINS = {
TimeGrain.WEEK,
TimeGrain.WEEK_STARTING_SUNDAY,
TimeGrain.WEEK_STARTING_MONDAY,
TimeGrain.WEEK_ENDING_SATURDAY,
TimeGrain.WEEK_ENDING_SUNDAY,
TimeGrain.MONTH,
TimeGrain.QUARTER,
TimeGrain.YEAR,
}

# Right suffix used for joining offset results
R_SUFFIX = "__right_suffix"


class CachedTimeOffset(TypedDict):
df: pd.DataFrame
@@ -89,10 +110,6 @@ class QueryContextProcessor:

_query_context: QueryContext
_qc_datasource: BaseDatasource
"""
The query context contains the query object and additional fields necessary
to retrieve the data payload for a given viz.
"""

def __init__(self, query_context: QueryContext):
self._query_context = query_context
@@ -307,6 +324,35 @@ def _get_timestamp_format(

return df

@staticmethod
def get_time_grain(query_object: QueryObject) -> Any | None:
if (
query_object.columns
and len(query_object.columns) > 0
and isinstance(query_object.columns[0], dict)
):
# If the time grain is in the columns, it will be the first one,
# and it will be of AdhocColumn type
return query_object.columns[0].get("timeGrain")

return query_object.extras.get("time_grain_sqla")

def add_aggregated_join_column(
self,
df: pd.DataFrame,
time_grain: str,
join_column_producer: Any = None,
) -> None:
if join_column_producer:
df[AGGREGATED_JOIN_COLUMN] = df.apply(
lambda row: join_column_producer(row, 0), axis=1
)
else:
df[AGGREGATED_JOIN_COLUMN] = df.apply(
lambda row: self.get_aggregated_join_column(row, 0, time_grain),
axis=1,
)

def processing_time_offsets( # pylint: disable=too-many-locals,too-many-statements
self,
df: pd.DataFrame,
@@ -317,9 +363,8 @@ def processing_time_offsets(  # pylint: disable=too-many-locals,too-many-statements
query_object_clone = copy.copy(query_object)
queries: list[str] = []
cache_keys: list[str | None] = []
rv_dfs: list[pd.DataFrame] = [df]
offset_dfs: list[pd.DataFrame] = []

time_offsets = query_object.time_offsets
outer_from_dttm, outer_to_dttm = get_since_until_from_query_object(query_object)
if not outer_from_dttm or not outer_to_dttm:
raise QueryObjectValidationError(
@@ -328,7 +373,31 @@ def processing_time_offsets(  # pylint: disable=too-many-locals,too-many-statements
"when using a Time Comparison."
)
)
for offset in time_offsets:

columns = df.columns
time_grain = self.get_time_grain(query_object)

if not time_grain:
raise QueryObjectValidationError(
_("Time Grain must be specified when using Time Shift.")
)

join_column_producer = config["TIME_GRAIN_JOIN_COLUMN_PRODUCERS"].get(
time_grain
)
use_aggregated_join_column = (
join_column_producer or time_grain in AGGREGATED_JOIN_GRAINS
)
if use_aggregated_join_column:
self.add_aggregated_join_column(df, time_grain, join_column_producer)
# skips the first column which is the temporal column
# because we'll use the aggregated join columns instead
columns = df.columns[1:]

metric_names = get_metric_names(query_object.metrics)
join_keys = [col for col in columns if col not in metric_names]

for offset in query_object.time_offsets:
try:
# pylint: disable=line-too-long
# Since the xaxis is also a column name for the time filter, xaxis_label will be set as granularity
@@ -364,13 +433,15 @@ def processing_time_offsets(  # pylint: disable=too-many-locals,too-many-statements
]

# `offset` is added to the hash function
cache_key = self.query_cache_key(query_object_clone, time_offset=offset)
cache_key = self.query_cache_key(
query_object_clone, time_offset=offset, time_grain=time_grain
)
cache = QueryCacheManager.get(
cache_key, CacheRegion.DATA, query_context.force
)
# check if the cache was hit
if cache.is_loaded:
rv_dfs.append(cache.df)
offset_dfs.append(cache.df)
queries.append(cache.query)
cache_keys.append(cache_key)
continue
@@ -379,11 +450,8 @@ def processing_time_offsets(  # pylint: disable=too-many-locals,too-many-statements
# rename metrics: SUM(value) => SUM(value) 1 year ago
metrics_mapping = {
metric: TIME_COMPARISON.join([metric, offset])
for metric in get_metric_names(
query_object_clone_dct.get("metrics", [])
)
for metric in metric_names
}
join_keys = [col for col in df.columns if col not in metrics_mapping.keys()]

if isinstance(self._qc_datasource, Query):
result = self._qc_datasource.exc_query(query_object_clone_dct)
@@ -420,21 +488,19 @@ def processing_time_offsets(  # pylint: disable=too-many-locals,too-many-statements
)
)

# modifies temporal column using offset
offset_metrics_df[index] = offset_metrics_df[index] - DateOffset(
**normalize_time_delta(offset)
)

# df left join `offset_metrics_df`
offset_df = dataframe_utils.left_join_df(
left_df=df,
right_df=offset_metrics_df,
join_keys=join_keys,
)
offset_slice = offset_df[metrics_mapping.values()]
if use_aggregated_join_column:
self.add_aggregated_join_column(
offset_metrics_df, time_grain, join_column_producer
)

# set offset_slice to cache and stack.
# cache df and query
value = {
"df": offset_slice,
"df": offset_metrics_df,
"query": result.query,
}
cache.set(
@@ -444,10 +510,51 @@ def processing_time_offsets(  # pylint: disable=too-many-locals,too-many-statements
datasource_uid=query_context.datasource.uid,
region=CacheRegion.DATA,
)
rv_dfs.append(offset_slice)
offset_dfs.append(offset_metrics_df)

if offset_dfs:
# iterate on offset_dfs, left join each with df
for offset_df in offset_dfs:
df = dataframe_utils.left_join_df(
left_df=df,
right_df=offset_df,
join_keys=join_keys,
rsuffix=R_SUFFIX,
)

rv_df = pd.concat(rv_dfs, axis=1, copy=False) if time_offsets else df
return CachedTimeOffset(df=rv_df, queries=queries, cache_keys=cache_keys)
# removes columns used for join
df.drop(
list(df.filter(regex=f"{AGGREGATED_JOIN_COLUMN}|{R_SUFFIX}")),
axis=1,
inplace=True,
)

return CachedTimeOffset(df=df, queries=queries, cache_keys=cache_keys)

@staticmethod
def get_aggregated_join_column(
row: pd.Series, column_index: int, time_grain: str
) -> str:
if time_grain in (
TimeGrain.WEEK_STARTING_SUNDAY,
TimeGrain.WEEK_ENDING_SATURDAY,
):
return row[column_index].strftime("%Y-W%U")

if time_grain in (
TimeGrain.WEEK,
TimeGrain.WEEK_STARTING_MONDAY,
TimeGrain.WEEK_ENDING_SUNDAY,
):
return row[column_index].strftime("%Y-W%W")

if time_grain == TimeGrain.MONTH:
return row[column_index].strftime("%Y-%m")

if time_grain == TimeGrain.QUARTER:
return row[column_index].strftime("%Y-Q") + str(row[column_index].quarter)

return row[column_index].strftime("%Y")

def get_data(self, df: pd.DataFrame) -> str | list[dict[str, Any]]:
if self._query_context.result_format in ChartDataResultFormat.table_like():
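The heart of the fix is visible in get_aggregated_join_column above: with coarse time grains, a timestamp shifted by DateOffset rarely lands exactly on the base series' bucket boundary, so a join on raw temporal values could silently drop offset rows. A minimal pandas sketch of the mismatch, using the same "%Y-W%W" key the method produces for Monday-based weeks (the dates are illustrative):

import pandas as pd
from pandas import DateOffset

# Base series bucketed at week grain (a Monday).
base = pd.Timestamp("2023-06-05")

# Offset series: the matching week one year earlier, shifted forward to compare.
shifted = pd.Timestamp("2022-06-06") + DateOffset(years=1)  # 2023-06-06, a Tuesday

assert shifted != base  # a join on raw timestamps misses this row
assert shifted.strftime("%Y-W%W") == base.strftime("%Y-W%W")  # both "2023-W23"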
6 changes: 5 additions & 1 deletion superset/common/utils/dataframe_utils.py
@@ -30,8 +30,12 @@ def left_join_df(
left_df: pd.DataFrame,
right_df: pd.DataFrame,
join_keys: list[str],
lsuffix: str = "",
rsuffix: str = "",
) -> pd.DataFrame:
df = left_df.set_index(join_keys).join(right_df.set_index(join_keys))
df = left_df.set_index(join_keys).join(
right_df.set_index(join_keys), lsuffix=lsuffix, rsuffix=rsuffix
)
df.reset_index(inplace=True)
return df

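A usage sketch of the extended helper (a standalone copy, with made-up column names): passing R_SUFFIX as rsuffix lets columns that appear on both sides coexist instead of raising pandas' column-overlap error, and processing_time_offsets later strips the suffixed duplicates with its regex-based drop.

import pandas as pd

def left_join_df(left_df, right_df, join_keys, lsuffix="", rsuffix=""):
    df = left_df.set_index(join_keys).join(
        right_df.set_index(join_keys), lsuffix=lsuffix, rsuffix=rsuffix
    )
    df.reset_index(inplace=True)
    return df

left = pd.DataFrame({"ds": ["2023-01", "2023-02"], "sum_value": [10, 20]})
right = pd.DataFrame({"ds": ["2023-01", "2023-02"], "sum_value": [8, 15]})

out = left_join_df(left, right, ["ds"], rsuffix="__right_suffix")
# Overlapping non-key columns receive the suffix instead of colliding.
assert list(out.columns) == ["ds", "sum_value", "sum_value__right_suffix"]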
12 changes: 12 additions & 0 deletions superset/config.py
@@ -41,6 +41,7 @@
from dateutil import tz
from flask import Blueprint
from flask_appbuilder.security.manager import AUTH_DB
from pandas import Series
from pandas._libs.parsers import STR_NA_VALUES # pylint: disable=no-name-in-module
from sqlalchemy.orm.query import Query

@@ -773,6 +774,17 @@ class D3Format(TypedDict, total=False):
# }
TIME_GRAIN_ADDON_EXPRESSIONS: dict[str, dict[str, str]] = {}

# Map of custom time grains and artificial join column producers used
# when generating the join key between results and time shifts.
# See superset/common/query_context_processor.get_aggregated_join_column
#
# Example of a join column producer that aggregates by fiscal year
# def join_producer(row: Series, column_index: int) -> str:
#    return row[column_index].strftime("%F")
#
# TIME_GRAIN_JOIN_COLUMN_PRODUCERS = {"P1F": join_producer}
TIME_GRAIN_JOIN_COLUMN_PRODUCERS: dict[str, Callable[[Series, int], str]] = {}

# ---------------------------------------------------
# List of viz_types not allowed in your environment
# For example: Disable pivot table and treemap:
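A sketch of what a registered producer might look like; the "P1F" grain identifier comes from the comment above, while the July-start fiscal-year rule is purely an assumption for illustration:

from pandas import Series, Timestamp

def fiscal_year_producer(row: Series, column_index: int) -> str:
    # Assumed fiscal year starting July 1: FY2024 spans 2023-07 through 2024-06.
    ts: Timestamp = row[column_index]
    return f"FY{ts.year + 1 if ts.month >= 7 else ts.year}"

TIME_GRAIN_JOIN_COLUMN_PRODUCERS = {"P1F": fiscal_year_producer}

assert fiscal_year_producer(Series([Timestamp("2023-08-01")]), 0) == "FY2024"
assert fiscal_year_producer(Series([Timestamp("2023-06-30")]), 0) == "FY2023"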
24 changes: 24 additions & 0 deletions superset/constants.py
@@ -186,6 +186,30 @@ class RouteMethod: # pylint: disable=too-few-public-methods
)


class TimeGrain(str, Enum):
SECOND = "PT1S"
FIVE_SECONDS = "PT5S"
THIRTY_SECONDS = "PT30S"
MINUTE = "PT1M"
FIVE_MINUTES = "PT5M"
TEN_MINUTES = "PT10M"
FIFTEEN_MINUTES = "PT15M"
THIRTY_MINUTES = "PT30M"
HALF_HOUR = "PT0.5H"
HOUR = "PT1H"
SIX_HOURS = "PT6H"
DAY = "P1D"
WEEK = "P1W"
WEEK_STARTING_SUNDAY = "1969-12-28T00:00:00Z/P1W"
WEEK_STARTING_MONDAY = "1969-12-29T00:00:00Z/P1W"
WEEK_ENDING_SATURDAY = "P1W/1970-01-03T00:00:00Z"
WEEK_ENDING_SUNDAY = "P1W/1970-01-04T00:00:00Z"
MONTH = "P1M"
QUARTER = "P3M"
QUARTER_YEAR = "P0.25Y"
YEAR = "P1Y"


class PandasAxis(int, Enum):
ROW = 0
COLUMN = 1
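Because TimeGrain mixes in str, its members hash and compare equal to their ISO 8601 duration strings, which is what makes the key swaps in the engine-spec dicts below drop-in compatible with existing string lookups. A self-contained illustration of the pattern:

from enum import Enum

class TimeGrain(str, Enum):  # same mixin pattern as the class added above
    DAY = "P1D"
    WEEK = "P1W"

assert TimeGrain.WEEK == "P1W"
assert {"P1D": "day-expr"}[TimeGrain.DAY] == "day-expr"  # str-keyed lookups still work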
17 changes: 9 additions & 8 deletions superset/db_engine_specs/ascend.py
@@ -16,6 +16,7 @@
# under the License.
from sqlalchemy.dialects import registry

from superset.constants import TimeGrain
from superset.db_engine_specs.impala import ImpalaEngineSpec


@@ -29,12 +30,12 @@ class AscendEngineSpec(ImpalaEngineSpec):

_time_grain_expressions = {
None: "{col}",
"PT1S": "DATE_TRUNC('second', {col})",
"PT1M": "DATE_TRUNC('minute', {col})",
"PT1H": "DATE_TRUNC('hour', {col})",
"P1D": "DATE_TRUNC('day', {col})",
"P1W": "DATE_TRUNC('week', {col})",
"P1M": "DATE_TRUNC('month', {col})",
"P3M": "DATE_TRUNC('quarter', {col})",
"P1Y": "DATE_TRUNC('year', {col})",
TimeGrain.SECOND: "DATE_TRUNC('second', {col})",
TimeGrain.MINUTE: "DATE_TRUNC('minute', {col})",
TimeGrain.HOUR: "DATE_TRUNC('hour', {col})",
TimeGrain.DAY: "DATE_TRUNC('day', {col})",
TimeGrain.WEEK: "DATE_TRUNC('week', {col})",
TimeGrain.MONTH: "DATE_TRUNC('month', {col})",
TimeGrain.QUARTER: "DATE_TRUNC('quarter', {col})",
TimeGrain.YEAR: "DATE_TRUNC('year', {col})",
}
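The values in _time_grain_expressions are plain format templates with a {col} placeholder; a minimal sketch of how one expands (the actual rendering lives in Superset's engine-spec machinery and is elided here):

grain_expressions = {"P1W": "DATE_TRUNC('week', {col})"}

sql = grain_expressions["P1W"].format(col="order_date")
assert sql == "DATE_TRUNC('week', order_date)"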
21 changes: 11 additions & 10 deletions superset/db_engine_specs/athena.py
@@ -22,6 +22,7 @@
from flask_babel import gettext as __
from sqlalchemy import types

from superset.constants import TimeGrain
from superset.db_engine_specs.base import BaseEngineSpec
from superset.errors import SupersetErrorType

@@ -38,17 +39,17 @@ class AthenaEngineSpec(BaseEngineSpec):

_time_grain_expressions = {
None: "{col}",
"PT1S": "date_trunc('second', CAST({col} AS TIMESTAMP))",
"PT1M": "date_trunc('minute', CAST({col} AS TIMESTAMP))",
"PT1H": "date_trunc('hour', CAST({col} AS TIMESTAMP))",
"P1D": "date_trunc('day', CAST({col} AS TIMESTAMP))",
"P1W": "date_trunc('week', CAST({col} AS TIMESTAMP))",
"P1M": "date_trunc('month', CAST({col} AS TIMESTAMP))",
"P3M": "date_trunc('quarter', CAST({col} AS TIMESTAMP))",
"P1Y": "date_trunc('year', CAST({col} AS TIMESTAMP))",
"P1W/1970-01-03T00:00:00Z": "date_add('day', 5, date_trunc('week', \
TimeGrain.SECOND: "date_trunc('second', CAST({col} AS TIMESTAMP))",
TimeGrain.MINUTE: "date_trunc('minute', CAST({col} AS TIMESTAMP))",
TimeGrain.HOUR: "date_trunc('hour', CAST({col} AS TIMESTAMP))",
TimeGrain.DAY: "date_trunc('day', CAST({col} AS TIMESTAMP))",
TimeGrain.WEEK: "date_trunc('week', CAST({col} AS TIMESTAMP))",
TimeGrain.MONTH: "date_trunc('month', CAST({col} AS TIMESTAMP))",
TimeGrain.QUARTER: "date_trunc('quarter', CAST({col} AS TIMESTAMP))",
TimeGrain.YEAR: "date_trunc('year', CAST({col} AS TIMESTAMP))",
TimeGrain.WEEK_ENDING_SATURDAY: "date_add('day', 5, date_trunc('week', \
date_add('day', 1, CAST({col} AS TIMESTAMP))))",
"1969-12-28T00:00:00Z/P1W": "date_add('day', -1, date_trunc('week', \
TimeGrain.WEEK_STARTING_SUNDAY: "date_add('day', -1, date_trunc('week', \
date_add('day', 1, CAST({col} AS TIMESTAMP))))",
}

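The two week-anchored Athena expressions encode a small date trick: add one day so Sundays fall into the following Monday-based week, truncate to that week's Monday, then step to the desired anchor (+5 days for the Saturday that ends the week, -1 day for the Sunday that starts it). A quick pandas check of the arithmetic (illustrative only, not the engine's execution path):

import pandas as pd

def week_ending_saturday(ts: pd.Timestamp) -> pd.Timestamp:
    # Mirrors: date_add('day', 5, date_trunc('week', date_add('day', 1, ts)))
    bumped = ts + pd.Timedelta(days=1)
    monday = bumped.normalize() - pd.Timedelta(days=bumped.dayofweek)
    return monday + pd.Timedelta(days=5)

# Every day of the Sun 2023-06-04 .. Sat 2023-06-10 week maps to its Saturday.
days = pd.date_range("2023-06-04", "2023-06-10")
assert all(week_ending_saturday(d) == pd.Timestamp("2023-06-10") for d in days)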