feat: temporarily disconnect metadata db during long analytics queries
mistercrunch committed Dec 9, 2024
1 parent 638f82b commit 43d37d2
Showing 4 changed files with 91 additions and 39 deletions.
7 changes: 7 additions & 0 deletions superset/config.py
@@ -204,8 +204,12 @@ def _try_json_readsha(filepath: str, length: int) -> str | None:
# `SQLALCHEMY_ENGINE_OPTIONS = {"isolation_level": "READ COMMITTED"}`
# Also note that we recommend READ COMMITTED for regular operation.
# Find out more here https://flask-sqlalchemy.palletsprojects.com/en/3.1.x/config/
# For example, if you don't want to use a connection pool, uncomment the next two lines:
# from sqlalchemy.pool import NullPool
# SQLALCHEMY_ENGINE_OPTIONS = {"poolclass": NullPool}
SQLALCHEMY_ENGINE_OPTIONS = {}


# In order to hook up a custom password store for all SQLALCHEMY connections
# implement a function that takes a single argument of type 'sqla.engine.url',
# returns a password and set SQLALCHEMY_CUSTOM_PASSWORD_STORE.
@@ -563,6 +567,9 @@ class D3TimeFormat(TypedDict, total=False):
# If on, you'll want to add "https://avatars.slack-edge.com" to the list of allowed
# domains in your TALISMAN_CONFIG
"SLACK_ENABLE_AVATARS": False,
# This feature flag only takes effect in combination with NullPool; it temporarily
# disconnects the metadata db connection while analytics queries execute, avoiding
# connection bottlenecks on the metadata database.
"DISABLE_METADATA_DB_DURING_ANALYTICS": False,
}

# ------------------------------
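As an aside for anyone trying this out: both of the settings above have to be present for the new behavior to kick in. A minimal superset_config.py sketch, illustrative only and assuming an otherwise standard Superset deployment:

    from sqlalchemy.pool import NullPool

    # Disable connection pooling for the metadata database
    SQLALCHEMY_ENGINE_OPTIONS = {"poolclass": NullPool}

    # Enable the new flag (FEATURE_FLAGS set here is merged over the
    # defaults defined in superset/config.py)
    FEATURE_FLAGS = {
        "DISABLE_METADATA_DB_DURING_ANALYTICS": True,
    }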
82 changes: 61 additions & 21 deletions superset/models/core.py
@@ -95,6 +95,45 @@
DB_CONNECTION_MUTATOR = config["DB_CONNECTION_MUTATOR"]


@contextmanager
def temporarily_disconnect_db(): # type: ignore
"""
Temporary disconnects the metadata database session.
This is meant to be used during long, blocking operations, so that we can release
the database connection for the duration of, for example, a potentially long running
query against an analytics database.
The goal here is to lower the number of concurrent connections to the metadata database,
given that Superset has no control over the duration of the analytics query.
NOTE: only has an effect if feature flag DISABLE_METADATA_DB_DURING_ANALYTICS
and using NullPool
"""
pool_type = db.engine.pool.__class__.__name__
# Currently only tested/available when used with NullPool
do_it = (
is_feature_enabled("DISABLE_METADATA_DB_DURING_ANALYTICS")
and pool_type == "NullPool"
)
conn = db.session.connection()
try:
if do_it:
logger.info("Disconnecting metadata database temporarily")
# Closing the session
db.session.close()
# Closing the connection
conn.close()
yield None
finally:
if do_it:
logger.info("Reconnecting to metadata database")
conn = db.session.connection()
# Creating a new scoped session
# NOTE: Interface changes in flask-sqlalchemy ~3.0
db.session = db.create_scoped_session()


class KeyValue(Model): # pylint: disable=too-few-public-methods
"""Used for any type of key-value store"""

@@ -691,27 +730,28 @@ def _log_query(sql: str) -> None:
with self.get_raw_connection(catalog=catalog, schema=schema) as conn:
cursor = conn.cursor()
df = None
for i, sql_ in enumerate(sqls):
sql_ = self.mutate_sql_based_on_config(sql_, is_split=True)
_log_query(sql_)
with event_logger.log_context(
action="execute_sql",
database=self,
object_ref=__name__,
):
self.db_engine_spec.execute(cursor, sql_, self)
if i < len(sqls) - 1:
# If it's not the last, we don't keep the results
cursor.fetchall()
else:
# Last query, fetch and process the results
data = self.db_engine_spec.fetch_data(cursor)
result_set = SupersetResultSet(
data, cursor.description, self.db_engine_spec
)
df = result_set.to_pandas_df()
if mutator:
df = mutator(df)
with temporarily_disconnect_db():
for i, sql_ in enumerate(sqls):
sql_ = self.mutate_sql_based_on_config(sql_, is_split=True)
_log_query(sql_)
with event_logger.log_context(
action="execute_sql",
database=self,
object_ref=__name__,
):
self.db_engine_spec.execute(cursor, sql_, self)
if i < len(sqls) - 1:
# If it's not the last, we don't keep the results
cursor.fetchall()
else:
# Last query, fetch and process the results
data = self.db_engine_spec.fetch_data(cursor)
result_set = SupersetResultSet(
data, cursor.description, self.db_engine_spec
)
df = result_set.to_pandas_df()
if mutator:
df = mutator(df)

return self.post_process_df(df)

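To make the intended usage concrete, here is a hypothetical sketch of wrapping a long, blocking call with the new context manager (run_long_analytics_query is a made-up stand-in, not a Superset function):

    from superset.models.core import temporarily_disconnect_db

    with temporarily_disconnect_db():
        # The metadata session is closed inside this block, so nothing here
        # should touch db.session -- doing so would implicitly reconnect and
        # defeat the purpose of releasing the connection.
        rows = run_long_analytics_query()  # hypothetical long-running call

    # After the block exits, db.session is a fresh scoped session again.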
35 changes: 18 additions & 17 deletions superset/sql_lab.py
@@ -50,7 +50,7 @@
SupersetParseError,
)
from superset.extensions import celery_app, event_logger
from superset.models.core import Database
from superset.models.core import Database, temporarily_disconnect_db
from superset.models.sql_lab import Query
from superset.result_set import SupersetResultSet
from superset.sql.parse import SQLStatement, Table
@@ -197,7 +197,7 @@ def get_sql_results( # pylint: disable=too-many-arguments
return handle_query_error(ex, query)


def execute_sql_statement( # pylint: disable=too-many-statements, too-many-locals
def execute_sql_statement( # pylint: disable=too-many-statements, too-many-locals, too-many-branches
sql_statement: str,
query: Query,
cursor: Any,
@@ -303,21 +303,22 @@ def execute_sql_statement( # pylint: disable=too-many-statements, too-many-locals
database=database,
object_ref=__name__,
):
with stats_timing("sqllab.query.time_executing_query", stats_logger):
db_engine_spec.execute_with_cursor(cursor, sql, query)

with stats_timing("sqllab.query.time_fetching_results", stats_logger):
logger.debug(
"Query %d: Fetching data for query object: %s",
query.id,
str(query.to_dict()),
)
data = db_engine_spec.fetch_data(cursor, increased_limit)
if query.limit is None or len(data) <= query.limit:
query.limiting_factor = LimitingFactor.NOT_LIMITED
else:
# return 1 row less than the increased limit
data = data[:-1]
with temporarily_disconnect_db():
with stats_timing("sqllab.query.time_executing_query", stats_logger):
db_engine_spec.execute_with_cursor(cursor, sql, query)

with stats_timing("sqllab.query.time_fetching_results", stats_logger):
logger.debug(
"Query %d: Fetching data for query object: %s",
query.id,
str(query.to_dict()),
)
data = db_engine_spec.fetch_data(cursor, increased_limit)
if query.limit is None or len(data) <= query.limit:
query.limiting_factor = LimitingFactor.NOT_LIMITED
else:
# return 1 row less than the increased limit
data = data[:-1]
except SoftTimeLimitExceeded as ex:
query.status = QueryStatus.TIMED_OUT

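The NullPool restriction noted in the docstring comes down to standard SQLAlchemy pool semantics: close() only releases the server-side connection when pooling is disabled. A minimal sketch, with a placeholder URL:

    from sqlalchemy import create_engine
    from sqlalchemy.pool import NullPool

    URL = "postgresql://user:password@localhost/superset"  # placeholder

    conn = create_engine(URL, poolclass=NullPool).connect()
    conn.close()  # NullPool: the underlying DBAPI connection is truly closed

    conn = create_engine(URL).connect()  # default QueuePool
    conn.close()  # QueuePool: checked back into the pool, stays open server-side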
6 changes: 5 additions & 1 deletion superset/sqllab/sql_json_executer.py
@@ -31,6 +31,7 @@
SupersetGenericDBErrorException,
SupersetTimeoutException,
)
from superset.models.core import temporarily_disconnect_db
from superset.sqllab.command_status import SqlJsonExecutionStatus
from superset.utils import core as utils
from superset.utils.core import get_username
@@ -127,7 +128,10 @@ def _get_sql_results_with_timeout(
seconds=self._timeout_duration_in_seconds,
error_message=self._get_timeout_error_msg(),
):
return self._get_sql_results(execution_context, rendered_query, log_params)
with temporarily_disconnect_db():
return self._get_sql_results(
execution_context, rendered_query, log_params
)

def _get_sql_results(
self,
