feat: improve event logging for queries + refactor (apache#27943)
Co-authored-by: Beto Dealmeida <[email protected]>
2 people authored and jzhao62 committed May 16, 2024
1 parent 5968254 commit 4baaab9
Showing 14 changed files with 170 additions and 133 deletions.
17 changes: 14 additions & 3 deletions docs/docs/contributing/testing-locally.mdx
@@ -83,9 +83,20 @@ To run a single test file:
npm run test -- path/to/file.js
```

### Integration Testing
### e2e Integration Testing

We use [Cypress](https://www.cypress.io/) for integration tests. Tests can be run with `tox -e cypress`. To open Cypress and explore tests, first set up and run the test server:
We use [Cypress](https://www.cypress.io/) for end-to-end integration
tests. One easy option to get started quickly is to leverage `tox` to
run the whole suite in an isolated environment.

```bash
tox -e cypress
```

Alternatively, you can go lower level and set things up in your
development environment by following these steps:

First set up a python/flask backend:

```bash
export SUPERSET_CONFIG=tests.integration_tests.superset_test_config
@@ -98,7 +109,7 @@ superset load-examples --load-test-data
superset run --port 8081
```

Run Cypress tests:
In another terminal, prepare the frontend and run Cypress tests:

```bash
cd superset-frontend
1 change: 1 addition & 0 deletions pyproject.toml
@@ -267,6 +267,7 @@ usedevelop = true
allowlist_externals =
npm
pkill
{toxinidir}/superset-frontend/cypress_build.sh
[testenv:cypress]
setenv =
3 changes: 3 additions & 0 deletions superset/config.py
@@ -74,6 +74,9 @@

# Realtime stats logger, a StatsD implementation exists
STATS_LOGGER = DummyStatsLogger()

# By default, events are logged to the metadata database with `DBEventLogger`
# Note that you can use `StdOutEventLogger` for debugging
EVENT_LOGGER = DBEventLogger()

SUPERSET_LOG_VIEW = True
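As the new comment notes, `StdOutEventLogger` can stand in for `DBEventLogger` when debugging. A minimal sketch of overriding it in `superset_config.py`, assuming `StdOutEventLogger` is importable from `superset.utils.log` alongside `DBEventLogger`:

```python
# superset_config.py (sketch): assumes StdOutEventLogger lives in
# superset.utils.log next to DBEventLogger.
from superset.utils.log import StdOutEventLogger

# Print event records to stdout instead of persisting them to the
# metadata database; handy for local debugging.
EVENT_LOGGER = StdOutEventLogger()
```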
14 changes: 0 additions & 14 deletions superset/connectors/sqla/models.py
@@ -1398,20 +1398,6 @@ def get_fetch_values_predicate(
)
) from ex

def mutate_query_from_config(self, sql: str) -> str:
"""Apply config's SQL_QUERY_MUTATOR
Typically adds comments to the query with context"""
sql_query_mutator = config["SQL_QUERY_MUTATOR"]
mutate_after_split = config["MUTATE_AFTER_SPLIT"]
if sql_query_mutator and not mutate_after_split:
sql = sql_query_mutator(
sql,
security_manager=security_manager,
database=self.database,
)
return sql

def get_template_processor(self, **kwargs: Any) -> BaseTemplateProcessor:
return get_template_processor(table=self, database=self.database, **kwargs)

12 changes: 2 additions & 10 deletions superset/db_engine_specs/base.py
@@ -59,7 +59,7 @@
from sqlalchemy.types import TypeEngine
from sqlparse.tokens import CTE

from superset import security_manager, sql_parse
from superset import sql_parse
from superset.constants import TimeGrain as TimeGrainConstants
from superset.databases.utils import make_url_safe
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
@@ -1682,16 +1682,8 @@ def process_statement(cls, statement: str, database: Database) -> str:
"""
parsed_query = ParsedQuery(statement, engine=cls.engine)
sql = parsed_query.stripped()
sql_query_mutator = current_app.config["SQL_QUERY_MUTATOR"]
mutate_after_split = current_app.config["MUTATE_AFTER_SPLIT"]
if sql_query_mutator and not mutate_after_split:
sql = sql_query_mutator(
sql,
security_manager=security_manager,
database=database,
)

return sql
return database.mutate_sql_based_on_config(sql, is_split=True)

@classmethod
def estimate_query_cost(
98 changes: 56 additions & 42 deletions superset/models/core.py
@@ -66,6 +66,7 @@
from superset.extensions import (
cache_manager,
encrypted_field_factory,
event_logger,
security_manager,
ssh_manager_factory,
)
@@ -564,6 +565,20 @@ def get_default_schema_for_query(self, query: Query) -> str | None:
"""
return self.db_engine_spec.get_default_schema_for_query(self, query)

@staticmethod
def post_process_df(df: pd.DataFrame) -> pd.DataFrame:
def column_needs_conversion(df_series: pd.Series) -> bool:
return (
not df_series.empty
and isinstance(df_series, pd.Series)
and isinstance(df_series[0], (list, dict))
)

for col, coltype in df.dtypes.to_dict().items():
if coltype == numpy.object_ and column_needs_conversion(df[col]):
df[col] = df[col].apply(utils.json_dumps_w_dates)
return df

@property
def quote_identifier(self) -> Callable[[str], str]:
"""Add quotes to potential identifier expressions if needed"""
@@ -572,7 +587,27 @@ def quote_identifier(self) -> Callable[[str], str]:
def get_reserved_words(self) -> set[str]:
return self.get_dialect().preparer.reserved_words

def get_df( # pylint: disable=too-many-locals
def mutate_sql_based_on_config(self, sql_: str, is_split: bool = False) -> str:
"""
Mutates the SQL query based on the app configuration.
Two config params here affect the behavior of the SQL query mutator:
- `SQL_QUERY_MUTATOR`: A user-provided function that mutates the SQL query.
- `MUTATE_AFTER_SPLIT`: If True, the SQL query mutator is only called after the
SQL is broken down into smaller queries. If False, the SQL query mutator applies
to the group of queries as a whole. Here the caller passes the context
as to whether the SQL has already been split.
"""
sql_mutator = config["SQL_QUERY_MUTATOR"]
if sql_mutator and (is_split == config["MUTATE_AFTER_SPLIT"]):
return sql_mutator(
sql_,
security_manager=security_manager,
database=self,
)
return sql_
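Since the connector models, SQL Lab, and the Presto validator now all funnel through this helper, a single mutator in `superset_config.py` covers every call site. A hypothetical mutator matching the keyword arguments passed above:

```python
# superset_config.py (sketch): the keyword arguments mirror the call
# above (security_manager=..., database=self); the mutator itself is
# hypothetical.
def SQL_QUERY_MUTATOR(sql, security_manager=None, database=None, **kwargs):
    # Prepend a comment naming the target database, e.g. so the
    # warehouse's own query log can attribute traffic to Superset.
    name = database.database_name if database is not None else "unknown"
    return f"-- superset db: {name}\n{sql}"

# Mutate each statement after splitting, matching the is_split=True
# call sites in get_df() and process_statement().
MUTATE_AFTER_SPLIT = True
```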

def get_df(
self,
sql: str,
schema: str | None = None,
@@ -581,15 +616,6 @@ def get_df( # pylint: disable=too-many-locals
sqls = self.db_engine_spec.parse_sql(sql)
with self.get_sqla_engine(schema) as engine:
engine_url = engine.url
mutate_after_split = config["MUTATE_AFTER_SPLIT"]
sql_query_mutator = config["SQL_QUERY_MUTATOR"]

def needs_conversion(df_series: pd.Series) -> bool:
return (
not df_series.empty
and isinstance(df_series, pd.Series)
and isinstance(df_series[0], (list, dict))
)

def _log_query(sql: str) -> None:
if log_query:
@@ -603,42 +629,30 @@ def _log_query(sql: str) -> None:

with self.get_raw_connection(schema=schema) as conn:
cursor = conn.cursor()
for sql_ in sqls[:-1]:
if mutate_after_split:
sql_ = sql_query_mutator(
sql_,
security_manager=security_manager,
database=None,
)
df = None
for i, sql_ in enumerate(sqls):
sql_ = self.mutate_sql_based_on_config(sql_, is_split=True)
_log_query(sql_)
self.db_engine_spec.execute(cursor, sql_, self)
cursor.fetchall()

if mutate_after_split:
last_sql = sql_query_mutator(
sqls[-1],
security_manager=security_manager,
database=None,
)
_log_query(last_sql)
self.db_engine_spec.execute(cursor, last_sql, self)
else:
_log_query(sqls[-1])
self.db_engine_spec.execute(cursor, sqls[-1], self)

data = self.db_engine_spec.fetch_data(cursor)
result_set = SupersetResultSet(
data, cursor.description, self.db_engine_spec
)
df = result_set.to_pandas_df()
with event_logger.log_context(
action="execute_sql",
database=self,
object_ref=__name__,
):
self.db_engine_spec.execute(cursor, sql_, self)
if i < len(sqls) - 1:
# If it's not the last, we don't keep the results
cursor.fetchall()
else:
# Last query, fetch and process the results
data = self.db_engine_spec.fetch_data(cursor)
result_set = SupersetResultSet(
data, cursor.description, self.db_engine_spec
)
df = result_set.to_pandas_df()
if mutator:
df = mutator(df)

for col, coltype in df.dtypes.to_dict().items():
if coltype == numpy.object_ and needs_conversion(df[col]):
df[col] = df[col].apply(utils.json_dumps_w_dates)

return df
return self.post_process_df(df)

def compile_sqla_query(self, qry: Select, schema: str | None = None) -> str:
with self.get_sqla_engine(schema) as engine:
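Taken together, the refactored `get_df` executes every statement, fetches only the last result set, and post-processes it through `post_process_df`. A hypothetical multi-statement call:

```python
# Hypothetical usage; "database" is a superset.models.core.Database
# instance. Both statements execute, but only the final SELECT's rows
# are fetched into the returned DataFrame.
df = database.get_df("SET time zone 'UTC'; SELECT 1 AS x")
```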
16 changes: 2 additions & 14 deletions superset/models/helpers.py
@@ -880,18 +880,6 @@ def make_sqla_column_compatible(
sqla_col.key = label_expected
return sqla_col

def mutate_query_from_config(self, sql: str) -> str:
"""Apply config's SQL_QUERY_MUTATOR
Typically adds comments to the query with context"""
if sql_query_mutator := config["SQL_QUERY_MUTATOR"]:
sql = sql_query_mutator(
sql,
security_manager=security_manager,
database=self.database,
)
return sql

@staticmethod
def _apply_cte(sql: str, cte: Optional[str]) -> str:
"""
@@ -919,7 +907,7 @@ def get_query_str_extended(
logger.warning("Unable to parse SQL to format it, passing it as-is")

if mutate:
sql = self.mutate_query_from_config(sql)
sql = self.database.mutate_sql_based_on_config(sql)
return QueryStringExtended(
applied_template_filters=sqlaq.applied_template_filters,
applied_filter_columns=sqlaq.applied_filter_columns,
@@ -1393,7 +1381,7 @@ def values_for_column(
with self.database.get_sqla_engine() as engine:
sql = qry.compile(engine, compile_kwargs={"literal_binds": True})
sql = self._apply_cte(sql, cte)
sql = self.mutate_query_from_config(sql)
sql = self.database.mutate_sql_based_on_config(sql)

df = pd.read_sql_query(sql=sql, con=engine)
# replace NaN with None to ensure it can be serialized to JSON
44 changes: 22 additions & 22 deletions superset/sql_lab.py
@@ -46,7 +46,7 @@
SupersetErrorException,
SupersetErrorsException,
)
from superset.extensions import celery_app
from superset.extensions import celery_app, event_logger
from superset.models.core import Database
from superset.models.sql_lab import Query
from superset.result_set import SupersetResultSet
@@ -73,7 +73,6 @@
SQLLAB_HARD_TIMEOUT = SQLLAB_TIMEOUT + 60
SQL_MAX_ROW = config["SQL_MAX_ROW"]
SQLLAB_CTAS_NO_LIMIT = config["SQLLAB_CTAS_NO_LIMIT"]
SQL_QUERY_MUTATOR = config["SQL_QUERY_MUTATOR"]
log_query = config["QUERY_LOGGER"]
logger = logging.getLogger(__name__)

@@ -264,11 +263,7 @@ def execute_sql_statement( # pylint: disable=too-many-statements
sql = apply_limit_if_exists(database, increased_limit, query, sql)

# Hook to allow environment-specific mutation (usually comments) to the SQL
sql = SQL_QUERY_MUTATOR(
sql,
security_manager=security_manager,
database=database,
)
sql = database.mutate_sql_based_on_config(sql)
try:
query.executed_sql = sql
if log_query:
@@ -281,21 +276,26 @@
log_params,
)
db.session.commit()
with stats_timing("sqllab.query.time_executing_query", stats_logger):
db_engine_spec.execute_with_cursor(cursor, sql, query)

with stats_timing("sqllab.query.time_fetching_results", stats_logger):
logger.debug(
"Query %d: Fetching data for query object: %s",
query.id,
str(query.to_dict()),
)
data = db_engine_spec.fetch_data(cursor, increased_limit)
if query.limit is None or len(data) <= query.limit:
query.limiting_factor = LimitingFactor.NOT_LIMITED
else:
# return 1 row less than increased_query
data = data[:-1]
with event_logger.log_context(
action="execute_sql",
database=database,
object_ref=__name__,
):
with stats_timing("sqllab.query.time_executing_query", stats_logger):
db_engine_spec.execute_with_cursor(cursor, sql, query)

with stats_timing("sqllab.query.time_fetching_results", stats_logger):
logger.debug(
"Query %d: Fetching data for query object: %s",
query.id,
str(query.to_dict()),
)
data = db_engine_spec.fetch_data(cursor, increased_limit)
if query.limit is None or len(data) <= query.limit:
query.limiting_factor = LimitingFactor.NOT_LIMITED
else:
# return 1 row less than increased_query
data = data[:-1]
except SoftTimeLimitExceeded as ex:
query.status = QueryStatus.TIMED_OUT

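With the execute and fetch phases wrapped in `event_logger.log_context(action="execute_sql", ...)`, whichever logger `EVENT_LOGGER` points at receives a timed record per query. A sketch of a custom logger, assuming the `AbstractEventLogger` interface in `superset.utils.log` and the `log()` signature shown in the upstream docs:

```python
# superset_config.py (sketch): AbstractEventLogger and this log()
# signature are assumptions based on the upstream docs.
import json

from superset.utils.log import AbstractEventLogger


class JSONStdOutEventLogger(AbstractEventLogger):
    def log(self, user_id, action, *args, **kwargs):
        # A single call may carry several records for the same action.
        for record in kwargs.get("records", []):
            print(
                json.dumps(
                    dict(
                        user_id=user_id,
                        action=action,
                        duration_ms=kwargs.get("duration_ms"),
                        record=record,
                    )
                )
            )


EVENT_LOGGER = JSONStdOutEventLogger()
```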
9 changes: 2 additions & 7 deletions superset/sql_validators/presto_db.py
@@ -20,7 +20,7 @@
from contextlib import closing
from typing import Any, Optional

from superset import app, security_manager
from superset import app
from superset.models.core import Database
from superset.sql_parse import ParsedQuery
from superset.sql_validators.base import BaseSQLValidator, SQLValidationAnnotation
@@ -54,12 +54,7 @@ def validate_statement(
sql = parsed_query.stripped()

# Hook to allow environment-specific mutation (usually comments) to the SQL
if sql_query_mutator := config["SQL_QUERY_MUTATOR"]:
sql = sql_query_mutator(
sql,
security_manager=security_manager,
database=database,
)
sql = database.mutate_sql_based_on_config(sql)

# Transform the final statement to an explain call before sending it on
# to presto to validate
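For context, this validator only runs for engines mapped in `SQL_VALIDATORS_BY_ENGINE`. A minimal sketch of enabling it, assuming the config key and validator name from the upstream defaults:

```python
# superset_config.py (sketch): route SQL Lab validation for Presto
# through PrestoDBSQLValidator, which now applies the shared mutator
# via database.mutate_sql_based_on_config().
SQL_VALIDATORS_BY_ENGINE = {
    "presto": "PrestoDBSQLValidator",
}
```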
7 changes: 7 additions & 0 deletions superset/utils/core.py
@@ -1907,3 +1907,10 @@ def remove_extra_adhoc_filters(form_data: dict[str, Any]) -> None:
form_data[key] = [
filter_ for filter_ in value or [] if not filter_.get("isExtra")
]


def to_int(v: Any, value_if_invalid: int = 0) -> int:
    """Cast ``v`` to an int, returning ``value_if_invalid`` when the cast fails."""
    try:
        return int(v)
    except (ValueError, TypeError):
        return value_if_invalid
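A quick usage sketch of the new helper:

```python
from superset.utils.core import to_int

assert to_int("42") == 42
assert to_int("not a number") == 0              # default fallback
assert to_int(None, value_if_invalid=-1) == -1  # TypeError also caught
```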