feat: improve event logging for queries + refactor #27943

Merged · 11 commits · Apr 22, 2024
17 changes: 14 additions & 3 deletions docs/docs/contributing/testing-locally.mdx
@@ -83,9 +83,20 @@ To run a single test file:
npm run test -- path/to/file.js
```

### Integration Testing
### e2e Integration Testing

We use [Cypress](https://www.cypress.io/) for integration tests. Tests can be run by `tox -e cypress`. To open Cypress and explore tests first setup and run test server:
We use [Cypress](https://www.cypress.io/) for end-to-end integration
tests. One easy option to get started quickly is to leverage `tox` to
run the whole suite in an isolated environment.

```bash
tox -e cypress
```

Alternatively, you can go lower level and set things up in your
development environment by following these steps:

First, set up a Python/Flask backend:

```bash
export SUPERSET_CONFIG=tests.integration_tests.superset_test_config
@@ -98,7 +109,7 @@ superset load-examples --load-test-data
superset run --port 8081
```

Run Cypress tests:
In another terminal, prepare the frontend and run Cypress tests:

```bash
cd superset-frontend
1 change: 1 addition & 0 deletions pyproject.toml
@@ -267,6 +267,7 @@ usedevelop = true
allowlist_externals =
npm
pkill
{toxinidir}/superset-frontend/cypress_build.sh

[testenv:cypress]
setenv =
3 changes: 3 additions & 0 deletions superset/config.py
@@ -74,6 +74,9 @@

# Realtime stats logger, a StatsD implementation exists
STATS_LOGGER = DummyStatsLogger()

# By default will log events to the metadata database with `DBEventLogger`
# Note that you can use `StdOutEventLogger` for debugging
EVENT_LOGGER = DBEventLogger()

SUPERSET_LOG_VIEW = True
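As the new comment notes, the event logger is swappable in `superset_config.py`. A minimal sketch, assuming `StdOutEventLogger` lives alongside `DBEventLogger` in `superset.utils.log` (the import path is an assumption):

```python
# superset_config.py, a sketch; the import path is an assumption
from superset.utils.log import StdOutEventLogger

# Print query events to stdout instead of persisting them via DBEventLogger,
# which is handy when debugging the new instrumentation.
EVENT_LOGGER = StdOutEventLogger()
```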
14 changes: 0 additions & 14 deletions superset/connectors/sqla/models.py
@@ -1398,20 +1398,6 @@ def get_fetch_values_predicate(
)
) from ex

def mutate_query_from_config(self, sql: str) -> str:
"""Apply config's SQL_QUERY_MUTATOR

Typically adds comments to the query with context"""
sql_query_mutator = config["SQL_QUERY_MUTATOR"]
mutate_after_split = config["MUTATE_AFTER_SPLIT"]
if sql_query_mutator and not mutate_after_split:
sql = sql_query_mutator(
sql,
security_manager=security_manager,
database=self.database,
)
return sql

def get_template_processor(self, **kwargs: Any) -> BaseTemplateProcessor:
return get_template_processor(table=self, database=self.database, **kwargs)

12 changes: 2 additions & 10 deletions superset/db_engine_specs/base.py
@@ -58,7 +58,7 @@
from sqlalchemy.types import TypeEngine
from sqlparse.tokens import CTE

from superset import security_manager, sql_parse
from superset import sql_parse
from superset.constants import TimeGrain as TimeGrainConstants
from superset.databases.utils import make_url_safe
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
@@ -1669,16 +1669,8 @@ def process_statement(cls, statement: str, database: Database) -> str:
"""
parsed_query = ParsedQuery(statement, engine=cls.engine)
sql = parsed_query.stripped()
sql_query_mutator = current_app.config["SQL_QUERY_MUTATOR"]
mutate_after_split = current_app.config["MUTATE_AFTER_SPLIT"]
if sql_query_mutator and not mutate_after_split:
sql = sql_query_mutator(
sql,
security_manager=security_manager,
database=database,
)

return sql
return database.mutate_sql_based_on_config(sql, is_split=True)

@classmethod
def estimate_query_cost(
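This refactor routes per-statement mutation through the new `Database.mutate_sql_based_on_config` helper (added in `superset/models/core.py` below), passing `is_split=True` since `process_statement` operates on a single, already-split statement. A hedged sketch of how a user-supplied mutator lines up with that flag; the mutator body is purely illustrative:

```python
# superset_config.py, an illustrative mutator using the documented signature
def SQL_QUERY_MUTATOR(sql, security_manager=None, database=None):
    # Typically prepends comments carrying execution context
    return f"-- mutated by config\n{sql}"

# With MUTATE_AFTER_SPLIT = True, the mutator fires only on calls made with
# is_split=True (once per statement, as in process_statement above); with
# False, it fires once on the unsplit SQL (is_split defaults to False).
MUTATE_AFTER_SPLIT = True
```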
98 changes: 56 additions & 42 deletions superset/models/core.py
@@ -66,6 +66,7 @@
from superset.extensions import (
cache_manager,
encrypted_field_factory,
event_logger,
security_manager,
ssh_manager_factory,
)
@@ -564,6 +565,20 @@ def get_default_schema_for_query(self, query: Query) -> str | None:
"""
return self.db_engine_spec.get_default_schema_for_query(self, query)

@staticmethod
def post_process_df(df: pd.DataFrame) -> pd.DataFrame:
def column_needs_conversion(df_series: pd.Series) -> bool:
return (
not df_series.empty
and isinstance(df_series, pd.Series)
and isinstance(df_series[0], (list, dict))
)

for col, coltype in df.dtypes.to_dict().items():
if coltype == numpy.object_ and column_needs_conversion(df[col]):
df[col] = df[col].apply(utils.json_dumps_w_dates)
return df

@property
def quote_identifier(self) -> Callable[[str], str]:
"""Add quotes to potential identifier expressions if needed"""
@@ -572,7 +587,27 @@ def quote_identifier(self) -> Callable[[str], str]:
def get_reserved_words(self) -> set[str]:
return self.get_dialect().preparer.reserved_words

def get_df( # pylint: disable=too-many-locals
def mutate_sql_based_on_config(self, sql_: str, is_split: bool = False) -> str:
"""
Mutates the SQL query based on the app configuration.

Two config params here affect the behavior of the SQL query mutator:
- `SQL_QUERY_MUTATOR`: A user-provided function that mutates the SQL query.
- `MUTATE_AFTER_SPLIT`: If True, the SQL query mutator is only called after the
SQL is broken down into smaller queries. If False, the mutator applies to the
group of queries as a whole. The caller passes `is_split` to indicate whether
the SQL has already been split.
"""
sql_mutator = config["SQL_QUERY_MUTATOR"]
if sql_mutator and (is_split == config["MUTATE_AFTER_SPLIT"]):
return sql_mutator(
sql_,
security_manager=security_manager,
database=self,
)
return sql_

def get_df(
self,
sql: str,
schema: str | None = None,
@@ -581,15 +616,6 @@ def get_df( # pylint: disable=too-many-locals
sqls = self.db_engine_spec.parse_sql(sql)
with self.get_sqla_engine(schema) as engine:
engine_url = engine.url
mutate_after_split = config["MUTATE_AFTER_SPLIT"]
sql_query_mutator = config["SQL_QUERY_MUTATOR"]

def needs_conversion(df_series: pd.Series) -> bool:
return (
not df_series.empty
and isinstance(df_series, pd.Series)
and isinstance(df_series[0], (list, dict))
)

def _log_query(sql: str) -> None:
if log_query:
@@ -603,42 +629,30 @@ def _log_query(sql: str) -> None:

with self.get_raw_connection(schema=schema) as conn:
cursor = conn.cursor()
for sql_ in sqls[:-1]:
if mutate_after_split:
sql_ = sql_query_mutator(
sql_,
security_manager=security_manager,
database=None,
)
df = None
for i, sql_ in enumerate(sqls):
sql_ = self.mutate_sql_based_on_config(sql_, is_split=True)
_log_query(sql_)
self.db_engine_spec.execute(cursor, sql_, self)
cursor.fetchall()

if mutate_after_split:
last_sql = sql_query_mutator(
sqls[-1],
security_manager=security_manager,
database=None,
)
_log_query(last_sql)
self.db_engine_spec.execute(cursor, last_sql, self)
else:
_log_query(sqls[-1])
self.db_engine_spec.execute(cursor, sqls[-1], self)

data = self.db_engine_spec.fetch_data(cursor)
result_set = SupersetResultSet(
data, cursor.description, self.db_engine_spec
)
df = result_set.to_pandas_df()
with event_logger.log_context(
action="execute_sql",
database=self,
object_ref=__name__,
):
self.db_engine_spec.execute(cursor, sql_, self)
if i < len(sqls) - 1:
# If it's not the last, we don't keep the results
cursor.fetchall()
else:
# Last query, fetch and process the results
data = self.db_engine_spec.fetch_data(cursor)
result_set = SupersetResultSet(
data, cursor.description, self.db_engine_spec
)
df = result_set.to_pandas_df()
if mutator:
df = mutator(df)

for col, coltype in df.dtypes.to_dict().items():
if coltype == numpy.object_ and needs_conversion(df[col]):
df[col] = df[col].apply(utils.json_dumps_w_dates)

return df
return self.post_process_df(df)

def compile_sqla_query(self, qry: Select, schema: str | None = None) -> str:
with self.get_sqla_engine(schema) as engine:
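A note on the extracted `post_process_df` helper: it JSON-encodes list- and dict-valued object columns so result frames stay serializable. A small usage sketch; the sample frame is hypothetical:

```python
import pandas as pd

from superset.models.core import Database

# hypothetical frame standing in for an engine result set
df = pd.DataFrame({"id": [1, 2], "tags": [["a"], ["b", "c"]]})
# list-valued "tags" cells are serialized with utils.json_dumps_w_dates
df = Database.post_process_df(df)
assert isinstance(df["tags"][0], str)  # e.g. '["a"]'
```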
16 changes: 2 additions & 14 deletions superset/models/helpers.py
@@ -880,18 +880,6 @@ def make_sqla_column_compatible(
sqla_col.key = label_expected
return sqla_col

def mutate_query_from_config(self, sql: str) -> str:
"""Apply config's SQL_QUERY_MUTATOR

Typically adds comments to the query with context"""
if sql_query_mutator := config["SQL_QUERY_MUTATOR"]:
sql = sql_query_mutator(
sql,
security_manager=security_manager,
database=self.database,
)
return sql

@staticmethod
def _apply_cte(sql: str, cte: Optional[str]) -> str:
"""
@@ -919,7 +907,7 @@ def get_query_str_extended(
logger.warning("Unable to parse SQL to format it, passing it as-is")

if mutate:
sql = self.mutate_query_from_config(sql)
sql = self.database.mutate_sql_based_on_config(sql)
return QueryStringExtended(
applied_template_filters=sqlaq.applied_template_filters,
applied_filter_columns=sqlaq.applied_filter_columns,
@@ -1393,7 +1381,7 @@ def values_for_column(
with self.database.get_sqla_engine() as engine:
sql = qry.compile(engine, compile_kwargs={"literal_binds": True})
sql = self._apply_cte(sql, cte)
sql = self.mutate_query_from_config(sql)
sql = self.database.mutate_sql_based_on_config(sql)

df = pd.read_sql_query(sql=sql, con=engine)
# replace NaN with None to ensure it can be serialized to JSON
44 changes: 22 additions & 22 deletions superset/sql_lab.py
@@ -46,7 +46,7 @@
SupersetErrorException,
SupersetErrorsException,
)
from superset.extensions import celery_app
from superset.extensions import celery_app, event_logger
from superset.models.core import Database
from superset.models.sql_lab import Query
from superset.result_set import SupersetResultSet
@@ -73,7 +73,6 @@
SQLLAB_HARD_TIMEOUT = SQLLAB_TIMEOUT + 60
SQL_MAX_ROW = config["SQL_MAX_ROW"]
SQLLAB_CTAS_NO_LIMIT = config["SQLLAB_CTAS_NO_LIMIT"]
SQL_QUERY_MUTATOR = config["SQL_QUERY_MUTATOR"]
log_query = config["QUERY_LOGGER"]
logger = logging.getLogger(__name__)

@@ -264,11 +263,7 @@ def execute_sql_statement( # pylint: disable=too-many-statements
sql = apply_limit_if_exists(database, increased_limit, query, sql)

# Hook to allow environment-specific mutation (usually comments) to the SQL
sql = SQL_QUERY_MUTATOR(
sql,
security_manager=security_manager,
database=database,
)
sql = database.mutate_sql_based_on_config(sql)
try:
query.executed_sql = sql
if log_query:
Expand All @@ -281,21 +276,26 @@ def execute_sql_statement( # pylint: disable=too-many-statements
log_params,
)
db.session.commit()
with stats_timing("sqllab.query.time_executing_query", stats_logger):
db_engine_spec.execute_with_cursor(cursor, sql, query)

with stats_timing("sqllab.query.time_fetching_results", stats_logger):
logger.debug(
"Query %d: Fetching data for query object: %s",
query.id,
str(query.to_dict()),
)
data = db_engine_spec.fetch_data(cursor, increased_limit)
if query.limit is None or len(data) <= query.limit:
query.limiting_factor = LimitingFactor.NOT_LIMITED
else:
# return 1 row less than increased_query
data = data[:-1]
with event_logger.log_context(
action="execute_sql",
database=database,
object_ref=__name__,
):
with stats_timing("sqllab.query.time_executing_query", stats_logger):
db_engine_spec.execute_with_cursor(cursor, sql, query)

with stats_timing("sqllab.query.time_fetching_results", stats_logger):
logger.debug(
"Query %d: Fetching data for query object: %s",
query.id,
str(query.to_dict()),
)
data = db_engine_spec.fetch_data(cursor, increased_limit)
if query.limit is None or len(data) <= query.limit:
query.limiting_factor = LimitingFactor.NOT_LIMITED
else:
# return 1 row less than increased_query
data = data[:-1]
except SoftTimeLimitExceeded as ex:
query.status = QueryStatus.TIMED_OUT

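Both instrumented call sites (`Database.get_df` above and `execute_sql_statement` here) use the same pattern, so other code paths could hook into the configured `EVENT_LOGGER` the same way. A sketch mirroring the usage in this diff; the surrounding names (`database`, `db_engine_spec`, `cursor`, `sql`, `query`) are assumed to be in scope:

```python
from superset.extensions import event_logger

# Records an "execute_sql" event via the configured EVENT_LOGGER,
# tagged with the target database and the emitting module.
with event_logger.log_context(
    action="execute_sql",
    database=database,  # a superset.models.core.Database
    object_ref=__name__,
):
    db_engine_spec.execute_with_cursor(cursor, sql, query)
```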
9 changes: 2 additions & 7 deletions superset/sql_validators/presto_db.py
@@ -20,7 +20,7 @@
from contextlib import closing
from typing import Any, Optional

from superset import app, security_manager
from superset import app
from superset.models.core import Database
from superset.sql_parse import ParsedQuery
from superset.sql_validators.base import BaseSQLValidator, SQLValidationAnnotation
@@ -54,12 +54,7 @@ def validate_statement(
sql = parsed_query.stripped()

# Hook to allow environment-specific mutation (usually comments) to the SQL
if sql_query_mutator := config["SQL_QUERY_MUTATOR"]:
sql = sql_query_mutator(
sql,
security_manager=security_manager,
database=database,
)
sql = database.mutate_sql_based_on_config(sql)

# Transform the final statement to an explain call before sending it on
# to presto to validate
7 changes: 7 additions & 0 deletions superset/utils/core.py
@@ -1907,3 +1907,10 @@ def remove_extra_adhoc_filters(form_data: dict[str, Any]) -> None:
form_data[key] = [
filter_ for filter_ in value or [] if not filter_.get("isExtra")
]


def to_int(v: Any, value_if_invalid: int = 0) -> int:
try:
return int(v)
except (ValueError, TypeError):
return value_if_invalid
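A quick illustration of the new helper's fallback behavior:

```python
from superset.utils.core import to_int

assert to_int("42") == 42
assert to_int("not a number") == 0  # ValueError falls back to the default
assert to_int(None, value_if_invalid=-1) == -1  # TypeError uses the override
```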