Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cancel db query on stop #15403

Merged
merged 17 commits into from
Jul 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions superset-frontend/src/SqlLab/components/SqlEditor.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ class SqlEditor extends React.PureComponent {
WINDOW_RESIZE_THROTTLE_MS,
);

this.onBeforeUnload = this.onBeforeUnload.bind(this);
this.renderDropdown = this.renderDropdown.bind(this);
}

Expand All @@ -212,6 +213,7 @@ class SqlEditor extends React.PureComponent {
this.setState({ height: this.getSqlEditorHeight() });

window.addEventListener('resize', this.handleWindowResize);
window.addEventListener('beforeunload', this.onBeforeUnload);

// setup hotkeys
const hotkeys = this.getHotkeyConfig();
Expand All @@ -222,6 +224,7 @@ class SqlEditor extends React.PureComponent {

componentWillUnmount() {
window.removeEventListener('resize', this.handleWindowResize);
window.removeEventListener('beforeunload', this.onBeforeUnload);
}

onResizeStart() {
Expand All @@ -242,6 +245,16 @@ class SqlEditor extends React.PureComponent {
}
}

onBeforeUnload(event) {
if (
this.props.database?.extra_json?.cancel_query_on_windows_unload &&
this.props.latestQuery?.state === 'running'
) {
event.preventDefault();
this.stopQuery();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One more question about this: can we make sure that we stop only queries when the DB is in sync mode? if the DB is in async mode the expectation I don't think we should ever cancel the query by navigating away from the browser.

Copy link
Contributor Author

@koszti koszti Jul 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make it at least configurable? I’m happy to go for the extra mile if needed and add a new extra param to the db config if no objections. We really need to cancel all db queries both in sync and async modes.

This is because we’d like to go with async mode to utilise the celery workers but we still expect a lot of accidental cross joins, missing WHERE clauses and unwanted long running queries. In this case people will just close the browser and that’s making other people impossible running new queries. DBs like snowflake start queuing queries very quickly and unwanted long running queries easily could block using the entire db for long hours. Or DBAs have to kill unwanted queries manually.

Furthermore if it works only in sync mode then the stop button is basically doing nothing in async mode with non hive/presto databases.

Let me know your thoughts 🙇

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, if there's a legit use case let's make it configurable. I like the idea of having a per-DB option, this way for Hive we could have it off (since queries can take hours), and for Postgres (say) we could have it on.

@yousoph, any thoughts here?

Furthermore if it works only in sync mode then the stop button is basically doing nothing in async mode with non hive/presto databases.

Oh, sorry if I wasn't clear, I meant stopping a query by navigating away. If we press the stop button we should still cancel async queries, regardless of the configuration flag.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now configurable at Database -> Performance -> Cancel query on window unload event and defaults to False to keep the current behaviour. Commit: e5223ec

}
}

onSqlChanged(sql) {
this.setState({ sql });
this.setQueryEditorSqlWithDebounce(sql);
Expand Down
5 changes: 4 additions & 1 deletion superset-frontend/src/SqlLab/reducers/sqlLab.js
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,10 @@ export default function sqlLabReducer(state = {}, action) {
[actions.SET_DATABASES]() {
const databases = {};
action.databases.forEach(db => {
databases[db.id] = db;
databases[db.id] = {
...db,
extra_json: JSON.parse(db.extra || ''),
};
});
return { ...state, databases };
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,24 @@ const ExtraOptions = ({
/>
</div>
</StyledInputContainer>
<StyledInputContainer css={{ no_margin_bottom }}>
<div className="input-container">
<IndeterminateCheckbox
id="cancel_query_on_windows_unload"
indeterminate={false}
checked={!!db?.extra_json?.cancel_query_on_windows_unload}
onChange={onExtraInputChange}
labelText={t('Cancel query on window unload event')}
/>
<InfoTooltip
tooltip={t(
'Terminate running queries when browser window closed or navigated ' +
'to another page. Available for Presto, Hive, MySQL, Postgres and ' +
'Snowflake databases.',
)}
/>
</div>
</StyledInputContainer>
</Collapse.Panel>
<Collapse.Panel
header={
Expand Down
1 change: 1 addition & 0 deletions superset-frontend/src/views/CRUD/data/database/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ export type DatabaseObject = {
}; // No field, holds schema and table timeout
allows_virtual_table_explore?: boolean; // in SQL Lab
schemas_allowed_for_csv_upload?: [] | string; // in Security
cancel_query_on_windows_unload?: boolean; // in Performance
version?: string;

// todo: ask beto where this should live
Expand Down
1 change: 1 addition & 0 deletions superset/databases/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ class DatabaseRestApi(BaseSupersetModelRestApi):
"database_name",
"explore_database_id",
"expose_in_sqllab",
"extra",
"force_ctas_schema",
"id",
]
Expand Down
25 changes: 25 additions & 0 deletions superset/db_engine_specs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1304,6 +1304,31 @@ def get_column_spec(
)
return None

@classmethod
def get_cancel_query_id(cls, cursor: Any, query: Query) -> Optional[str]:
"""
Select identifiers from the database engine that uniquely identifies the
queries to cancel. The identifier is typically a session id, process id
or similar.

:param cursor: Cursor instance in which the query will be executed
:param query: Query instance
:return: Query identifier
"""
return None

@classmethod
def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> bool:
"""
Cancel query in the underlying database.

:param cursor: New cursor instance to the db of the query
:param query: Query instance
:param cancel_query_id: Value returned by get_cancel_query_payload or set in
other life-cycle methods of the query
:return: True if query cancelled successfully, False otherwise
"""
betodealmeida marked this conversation as resolved.
Show resolved Hide resolved


# schema for adding a database by providing parameters instead of the
# full SQLAlchemy URI
Expand Down
32 changes: 32 additions & 0 deletions superset/db_engine_specs/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

from superset.db_engine_specs.base import BaseEngineSpec, BasicParametersMixin
from superset.errors import SupersetErrorType
from superset.models.sql_lab import Query
from superset.utils import core as utils
from superset.utils.core import ColumnSpec, GenericDataType

Expand Down Expand Up @@ -220,3 +221,34 @@ def get_column_spec( # type: ignore
return super().get_column_spec(
native_type, column_type_mappings=column_type_mappings
)

@classmethod
def get_cancel_query_id(cls, cursor: Any, query: Query) -> Optional[str]:
"""
Get MySQL connection ID that will be used to cancel all other running
queries in the same connection.

:param cursor: Cursor instance in which the query will be executed
:param query: Query instance
:return: MySQL Connection ID
"""
cursor.execute("SELECT CONNECTION_ID()")
row = cursor.fetchone()
return row[0]

@classmethod
def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> bool:
"""
Cancel query in the underlying database.

:param cursor: New cursor instance to the db of the query
:param query: Query instance
:param cancel_query_id: MySQL Connection ID
:return: True if query cancelled successfully, False otherwise
"""
try:
cursor.execute(f"KILL CONNECTION {cancel_query_id}")
except Exception: # pylint: disable=broad-except
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catching broad exceptions (per the Pylint message) is undesirable. This should most likely be sqlalchemy.exc.DBAPIError.

return False

return True
36 changes: 36 additions & 0 deletions superset/db_engine_specs/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from superset.db_engine_specs.base import BaseEngineSpec, BasicParametersMixin
from superset.errors import SupersetErrorType
from superset.exceptions import SupersetException
from superset.models.sql_lab import Query
from superset.utils import core as utils
from superset.utils.core import ColumnSpec, GenericDataType

Expand Down Expand Up @@ -296,3 +297,38 @@ def get_column_spec( # type: ignore
return super().get_column_spec(
native_type, column_type_mappings=column_type_mappings
)

@classmethod
def get_cancel_query_id(cls, cursor: Any, query: Query) -> Optional[str]:
"""
Get Postgres PID that will be used to cancel all other running
queries in the same session.

:param cursor: Cursor instance in which the query will be executed
:param query: Query instance
:return: Postgres PID
"""
cursor.execute("SELECT pg_backend_pid()")
row = cursor.fetchone()
return row[0]

@classmethod
def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> bool:
"""
Cancel query in the underlying database.

:param cursor: New cursor instance to the db of the query
:param query: Query instance
:param cancel_query_id: Postgres PID
:return: True if query cancelled successfully, False otherwise
"""
try:
cursor.execute(
"SELECT pg_terminate_backend(pid) "
"FROM pg_stat_activity "
f"WHERE pid='{cancel_query_id}'"
)
except Exception: # pylint: disable=broad-except
return False

return True
32 changes: 32 additions & 0 deletions superset/db_engine_specs/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from superset.db_engine_specs.postgres import PostgresBaseEngineSpec
from superset.errors import SupersetErrorType
from superset.models.sql_lab import Query
from superset.utils import core as utils

if TYPE_CHECKING:
Expand Down Expand Up @@ -128,3 +129,34 @@ def mutate_db_for_connection_test(database: "Database") -> None:
engine_params["connect_args"] = connect_args
extra["engine_params"] = engine_params
database.extra = json.dumps(extra)

@classmethod
def get_cancel_query_id(cls, cursor: Any, query: Query) -> Optional[str]:
"""
Get Snowflake session ID that will be used to cancel all other running
queries in the same session.

:param cursor: Cursor instance in which the query will be executed
:param query: Query instance
:return: Snowflake Session ID
"""
cursor.execute("SELECT CURRENT_SESSION()")
Copy link
Member

@john-bodley john-bodley Jul 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@koszti does this work? The reason I ask is per the documentation, CURRENT_SESSION

Returns a unique system identifier for the Snowflake session corresponding to the present connection

yet the connection associated with said cursor doesn't not the same connection which instantiated the query (in all likelihood for async queries that connection resides on a Celery host). Thus I would suspect there are actually no queries running within the current session.

The same is true for the other engines.

Copy link
Contributor Author

@koszti koszti Jul 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The query that you're referring to is running when the query initiated and not when the stop button pressed.
It's using the same cursor that is created in the first place in execute_sql_statements here. The returned session id then saved into the query table in the superset backend database.

When the stop button pressed, we get the saved session id from the query table and kill the query by another SQL. In this way queries can be killed from every celery worker.

It's tested on Postgres, MySQL and Snowflake engines with sync and async query mode.

Copy link
Member

@john-bodley john-bodley Jul 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies. I misread the code logic. I originally thought the new cursor which is used to stop the query was being used to obtain the session identifier.

row = cursor.fetchone()
return row[0]

@classmethod
def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> bool:
"""
Cancel query in the underlying database.

:param cursor: New cursor instance to the db of the query
:param query: Query instance
:param cancel_query_id: Snowflake Session ID
:return: True if query cancelled successfully, False otherwise
"""
try:
cursor.execute(f"SELECT SYSTEM$CANCEL_ALL_QUERIES({cancel_query_id})")
except Exception: # pylint: disable=broad-except
return False

return True
4 changes: 4 additions & 0 deletions superset/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,7 @@ def __init__(self, error: ValidationError):
extra={"messages": error.messages},
)
super().__init__(error)


class SupersetCancelQueryException(SupersetException):
pass
51 changes: 48 additions & 3 deletions superset/sql_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def dummy_sql_query_mutator(
SQL_QUERY_MUTATOR = config.get("SQL_QUERY_MUTATOR") or dummy_sql_query_mutator
log_query = config["QUERY_LOGGER"]
logger = logging.getLogger(__name__)
cancel_query_key = "cancel_query"


class SqlLabException(Exception):
Expand All @@ -83,6 +84,10 @@ class SqlLabSecurityException(SqlLabException):
pass


class SqlLabQueryStoppedException(SqlLabException):
pass


def handle_query_error(
ex: Exception,
query: Query,
Expand Down Expand Up @@ -187,7 +192,7 @@ def get_sql_results( # pylint: disable=too-many-arguments
return handle_query_error(ex, query, session)


# pylint: disable=too-many-arguments, too-many-locals
# pylint: disable=too-many-arguments, too-many-locals, too-many-statements
def execute_sql_statement(
sql_statement: str,
query: Query,
Expand Down Expand Up @@ -288,6 +293,12 @@ def execute_sql_statement(
)
)
except Exception as ex:
# query is stopped in another thread/worker
# stopping raises expected exceptions which we should skip
session.refresh(query)
if query.status == QueryStatus.STOPPED:
raise SqlLabQueryStoppedException()

logger.error("Query %d: %s", query.id, type(ex), exc_info=True)
logger.debug("Query %d: %s", query.id, ex)
raise SqlLabException(db_engine_spec.extract_error_message(ex))
Expand Down Expand Up @@ -438,12 +449,17 @@ def execute_sql_statements( # pylint: disable=too-many-arguments, too-many-loca
with closing(engine.raw_connection()) as conn:
# closing the connection closes the cursor as well
cursor = conn.cursor()
cancel_query_id = db_engine_spec.get_cancel_query_id(cursor, query)
if cancel_query_id is not None:
query.set_extra_json_key(cancel_query_key, cancel_query_id)
session.commit()
statement_count = len(statements)
for i, statement in enumerate(statements):
# Check if stopped
query = get_query(query_id, session)
session.refresh(query)
if query.status == QueryStatus.STOPPED:
return None
payload.update({"status": query.status})
return payload

# For CTAS we create the table only on the last statement
apply_ctas = query.select_as_cta and (
Expand All @@ -466,6 +482,9 @@ def execute_sql_statements( # pylint: disable=too-many-arguments, too-many-loca
log_params,
apply_ctas,
)
except SqlLabQueryStoppedException:
payload.update({"status": QueryStatus.STOPPED})
return payload
except Exception as ex: # pylint: disable=broad-except
msg = str(ex)
prefix_message = (
Expand Down Expand Up @@ -562,3 +581,29 @@ def execute_sql_statements( # pylint: disable=too-many-arguments, too-many-loca
return payload

return None


def cancel_query(query: Query, user_name: Optional[str] = None) -> bool:
"""
Cancel a running query.

:param query: Query to cancel
:param user_name: Default username
:return: True if query cancelled successfully, False otherwise
"""
cancel_query_id = query.extra.get(cancel_query_key, None)
if cancel_query_id is None:
return False

database = query.database
engine = database.get_sqla_engine(
schema=query.schema,
nullpool=True,
user_name=user_name,
source=QuerySource.SQL_LAB,
)
db_engine_spec = database.db_engine_spec

with closing(engine.raw_connection()) as conn:
with closing(conn.cursor()) as cursor:
return db_engine_spec.cancel_query(cursor, query, cancel_query_id)
5 changes: 5 additions & 0 deletions superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
CertificateException,
DatabaseNotFound,
SerializationError,
SupersetCancelQueryException,
SupersetErrorException,
SupersetErrorsException,
SupersetException,
Expand Down Expand Up @@ -2335,6 +2336,10 @@ def stop_query(self) -> FlaskResponse:
str(client_id),
)
return self.json_response("OK")

if not sql_lab.cancel_query(query, g.user.username if g.user else None):
raise SupersetCancelQueryException("Could not cancel query")

query.status = QueryStatus.STOPPED
db.session.commit()

Expand Down
1 change: 1 addition & 0 deletions tests/integration_tests/databases/api_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ def test_get_items(self):
"database_name",
"explore_database_id",
"expose_in_sqllab",
"extra",
"force_ctas_schema",
"id",
]
Expand Down
Loading