Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cancel db query on stop #15403

Merged
merged 17 commits into from
Jul 13, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions superset-frontend/src/SqlLab/components/SqlEditor.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ class SqlEditor extends React.PureComponent {
WINDOW_RESIZE_THROTTLE_MS,
);

this.onBeforeUnload = this.onBeforeUnload.bind(this);
this.renderDropdown = this.renderDropdown.bind(this);
}

Expand All @@ -212,6 +213,7 @@ class SqlEditor extends React.PureComponent {
this.setState({ height: this.getSqlEditorHeight() });

window.addEventListener('resize', this.handleWindowResize);
window.addEventListener('beforeunload', this.onBeforeUnload);

// setup hotkeys
const hotkeys = this.getHotkeyConfig();
Expand All @@ -222,6 +224,7 @@ class SqlEditor extends React.PureComponent {

componentWillUnmount() {
window.removeEventListener('resize', this.handleWindowResize);
window.removeEventListener('beforeunload', this.onBeforeUnload);
}

onResizeStart() {
Expand All @@ -242,6 +245,13 @@ class SqlEditor extends React.PureComponent {
}
}

onBeforeUnload(event) {
if (this.props.latestQuery?.state === 'running') {
event.preventDefault();
this.stopQuery();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One more question about this: can we make sure that we stop only queries when the DB is in sync mode? if the DB is in async mode the expectation I don't think we should ever cancel the query by navigating away from the browser.

Copy link
Contributor Author

@koszti koszti Jul 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make it at least configurable? I’m happy to go for the extra mile if needed and add a new extra param to the db config if no objections. We really need to cancel all db queries both in sync and async modes.

This is because we’d like to go with async mode to utilise the celery workers but we still expect a lot of accidental cross joins, missing WHERE clauses and unwanted long running queries. In this case people will just close the browser and that’s making other people impossible running new queries. DBs like snowflake start queuing queries very quickly and unwanted long running queries easily could block using the entire db for long hours. Or DBAs have to kill unwanted queries manually.

Furthermore if it works only in sync mode then the stop button is basically doing nothing in async mode with non hive/presto databases.

Let me know your thoughts 🙇

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, if there's a legit use case let's make it configurable. I like the idea of having a per-DB option, this way for Hive we could have it off (since queries can take hours), and for Postgres (say) we could have it on.

@yousoph, any thoughts here?

Furthermore if it works only in sync mode then the stop button is basically doing nothing in async mode with non hive/presto databases.

Oh, sorry if I wasn't clear, I meant stopping a query by navigating away. If we press the stop button we should still cancel async queries, regardless of the configuration flag.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now configurable at Database -> Performance -> Cancel query on window unload event and defaults to False to keep the current behaviour. Commit: e5223ec

}
}

onSqlChanged(sql) {
this.setState({ sql });
this.setQueryEditorSqlWithDebounce(sql);
Expand Down
24 changes: 24 additions & 0 deletions superset/db_engine_specs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1304,6 +1304,30 @@ def get_column_spec(
)
return None

@classmethod
def get_cancel_query_id(cls, cursor: Any, query: Query) -> Optional[str]:
"""
Select identifiers from the database engine that uniquely identifies the
queries to cancel. The identifier is typically a session id, process id
or similar.

:param cursor: Cursor instance in which the query will be executed
:param query: Query instance
:return: Query identifier
"""
return None

@classmethod
def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to return a boolean indicating if the query was canceled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed by 517c792

"""
Cancel query in the underlying database.

:param cursor: New cursor instance to the db of the query
:param query: Query instance
:param cancel_query_id: Value returned by get_cancel_query_payload or set in
other life-cycle methods of the query
"""
betodealmeida marked this conversation as resolved.
Show resolved Hide resolved


# schema for adding a database by providing parameters instead of the
# full SQLAlchemy URI
Expand Down
26 changes: 26 additions & 0 deletions superset/db_engine_specs/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

from superset.db_engine_specs.base import BaseEngineSpec, BasicParametersMixin
from superset.errors import SupersetErrorType
from superset.models.sql_lab import Query
from superset.utils import core as utils
from superset.utils.core import ColumnSpec, GenericDataType

Expand Down Expand Up @@ -220,3 +221,28 @@ def get_column_spec( # type: ignore
return super().get_column_spec(
native_type, column_type_mappings=column_type_mappings
)

@classmethod
def get_cancel_query_id(cls, cursor: Any, query: Query) -> Optional[str]:
"""
Get MySQL connection ID that will be used to cancel all other running
queries in the same connection.

:param cursor: Cursor instance in which the query will be executed
:param query: Query instance
:return: MySQL Connection ID
"""
cursor.execute("SELECT CONNECTION_ID()")
row = cursor.fetchone()
return row[0]

@classmethod
def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> None:
"""
Cancel query in the underlying database.

:param cursor: New cursor instance to the db of the query
:param query: Query instance
:param cancel_query_id: MySQL Connection ID
"""
cursor.execute(f"KILL CONNECTION {cancel_query_id}")
30 changes: 30 additions & 0 deletions superset/db_engine_specs/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from superset.db_engine_specs.base import BaseEngineSpec, BasicParametersMixin
from superset.errors import SupersetErrorType
from superset.exceptions import SupersetException
from superset.models.sql_lab import Query
from superset.utils import core as utils
from superset.utils.core import ColumnSpec, GenericDataType

Expand Down Expand Up @@ -296,3 +297,32 @@ def get_column_spec( # type: ignore
return super().get_column_spec(
native_type, column_type_mappings=column_type_mappings
)

@classmethod
def get_cancel_query_id(cls, cursor: Any, query: Query) -> Optional[str]:
"""
Get Postgres PID that will be used to cancel all other running
queries in the same session.

:param cursor: Cursor instance in which the query will be executed
:param query: Query instance
:return: Postgres PID
"""
cursor.execute("SELECT pg_backend_pid()")
row = cursor.fetchone()
return row[0]

@classmethod
def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> None:
"""
Cancel query in the underlying database.

:param cursor: New cursor instance to the db of the query
:param query: Query instance
:param cancel_query_id: Postgres PID
"""
cursor.execute(
"SELECT pg_terminate_backend(pid) "
"FROM pg_stat_activity "
f"WHERE pid='{cancel_query_id}'"
)
26 changes: 26 additions & 0 deletions superset/db_engine_specs/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from superset.db_engine_specs.postgres import PostgresBaseEngineSpec
from superset.errors import SupersetErrorType
from superset.models.sql_lab import Query
from superset.utils import core as utils

if TYPE_CHECKING:
Expand Down Expand Up @@ -128,3 +129,28 @@ def mutate_db_for_connection_test(database: "Database") -> None:
engine_params["connect_args"] = connect_args
extra["engine_params"] = engine_params
database.extra = json.dumps(extra)

@classmethod
def get_cancel_query_id(cls, cursor: Any, query: Query) -> Optional[str]:
"""
Get Snowflake session ID that will be used to cancel all other running
queries in the same session.

:param cursor: Cursor instance in which the query will be executed
:param query: Query instance
:return: Snowflake Session ID
"""
cursor.execute("SELECT CURRENT_SESSION()")
Copy link
Member

@john-bodley john-bodley Jul 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@koszti does this work? The reason I ask is per the documentation, CURRENT_SESSION

Returns a unique system identifier for the Snowflake session corresponding to the present connection

yet the connection associated with said cursor doesn't not the same connection which instantiated the query (in all likelihood for async queries that connection resides on a Celery host). Thus I would suspect there are actually no queries running within the current session.

The same is true for the other engines.

Copy link
Contributor Author

@koszti koszti Jul 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The query that you're referring to is running when the query initiated and not when the stop button pressed.
It's using the same cursor that is created in the first place in execute_sql_statements here. The returned session id then saved into the query table in the superset backend database.

When the stop button pressed, we get the saved session id from the query table and kill the query by another SQL. In this way queries can be killed from every celery worker.

It's tested on Postgres, MySQL and Snowflake engines with sync and async query mode.

Copy link
Member

@john-bodley john-bodley Jul 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies. I misread the code logic. I originally thought the new cursor which is used to stop the query was being used to obtain the session identifier.

row = cursor.fetchone()
return row[0]

@classmethod
def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> None:
"""
Cancel query in the underlying database.

:param cursor: New cursor instance to the db of the query
:param query: Query instance
:param cancel_query_id: Snowflake Session ID
"""
cursor.execute(f"SELECT SYSTEM$CANCEL_ALL_QUERIES({cancel_query_id})")
51 changes: 48 additions & 3 deletions superset/sql_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def dummy_sql_query_mutator(
SQL_QUERY_MUTATOR = config.get("SQL_QUERY_MUTATOR") or dummy_sql_query_mutator
log_query = config["QUERY_LOGGER"]
logger = logging.getLogger(__name__)
cancel_query_key = "cancel_query"


class SqlLabException(Exception):
Expand All @@ -83,6 +84,10 @@ class SqlLabSecurityException(SqlLabException):
pass


class SqlLabQueryStoppedException(SqlLabException):
pass


def handle_query_error(
ex: Exception,
query: Query,
Expand Down Expand Up @@ -187,7 +192,7 @@ def get_sql_results( # pylint: disable=too-many-arguments
return handle_query_error(ex, query, session)


# pylint: disable=too-many-arguments, too-many-locals
# pylint: disable=too-many-arguments, too-many-locals, too-many-statements
def execute_sql_statement(
sql_statement: str,
query: Query,
Expand Down Expand Up @@ -288,6 +293,12 @@ def execute_sql_statement(
)
)
except Exception as ex:
# query is stopped in another thread/worker
# stopping rases expected exceptions which we should skip
koszti marked this conversation as resolved.
Show resolved Hide resolved
session.refresh(query)
if query.status == QueryStatus.STOPPED:
raise SqlLabQueryStoppedException()

logger.error("Query %d: %s", query.id, type(ex), exc_info=True)
logger.debug("Query %d: %s", query.id, ex)
raise SqlLabException(db_engine_spec.extract_error_message(ex))
Expand Down Expand Up @@ -438,12 +449,17 @@ def execute_sql_statements( # pylint: disable=too-many-arguments, too-many-loca
with closing(engine.raw_connection()) as conn:
# closing the connection closes the cursor as well
cursor = conn.cursor()
cancel_query_id = db_engine_spec.get_cancel_query_id(cursor, query)
if cancel_query_id is not None:
query.set_extra_json_key(cancel_query_key, cancel_query_id)
session.commit()
statement_count = len(statements)
for i, statement in enumerate(statements):
# Check if stopped
query = get_query(query_id, session)
session.refresh(query)
if query.status == QueryStatus.STOPPED:
return None
payload.update({"status": query.status})
return payload

# For CTAS we create the table only on the last statement
apply_ctas = query.select_as_cta and (
Expand All @@ -466,6 +482,9 @@ def execute_sql_statements( # pylint: disable=too-many-arguments, too-many-loca
log_params,
apply_ctas,
)
except SqlLabQueryStoppedException:
payload.update({"status": QueryStatus.STOPPED})
return payload
except Exception as ex: # pylint: disable=broad-except
msg = str(ex)
prefix_message = (
Expand Down Expand Up @@ -562,3 +581,29 @@ def execute_sql_statements( # pylint: disable=too-many-arguments, too-many-loca
return payload

return None


def cancel_query(query: Query, user_name: Optional[str] = None) -> None:
"""
Cancel a running query.

:param query: Query to cancel
:param user_name: Default username
:return: None
"""
cancel_query_id = query.extra.get(cancel_query_key, None)
if cancel_query_id is None:
return

database = query.database
engine = database.get_sqla_engine(
schema=query.schema,
nullpool=True,
user_name=user_name,
source=QuerySource.SQL_LAB,
)
db_engine_spec = database.db_engine_spec

with closing(engine.raw_connection()) as conn:
with closing(conn.cursor()) as cursor:
db_engine_spec.cancel_query(cursor, query, cancel_query_id)
1 change: 1 addition & 0 deletions superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2337,6 +2337,7 @@ def stop_query(self) -> FlaskResponse:
return self.json_response("OK")
query.status = QueryStatus.STOPPED
db.session.commit()
sql_lab.cancel_query(query, g.user.username if g.user else None)

return self.json_response("OK")

Expand Down
14 changes: 14 additions & 0 deletions tests/integration_tests/db_engine_specs/mysql_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from superset.db_engine_specs.mysql import MySQLEngineSpec
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.models.sql_lab import Query
from superset.utils.core import GenericDataType
from tests.integration_tests.db_engine_specs.base_tests import (
assert_generic_types,
Expand Down Expand Up @@ -238,3 +239,16 @@ def test_extract_errors(self):
},
)
]

@unittest.mock.patch("sqlalchemy.engine.Engine.connect")
def test_get_cancel_query_id(self, engine_mock):
query = Query()
cursor_mock = engine_mock.return_value.__enter__.return_value
cursor_mock.fetchone.return_value = [123]
assert MySQLEngineSpec.get_cancel_query_id(cursor_mock, query) == 123

@unittest.mock.patch("sqlalchemy.engine.Engine.connect")
def test_cancel_query(self, engine_mock):
query = Query()
cursor_mock = engine_mock.return_value.__enter__.return_value
assert MySQLEngineSpec.cancel_query(cursor_mock, query, 123) is None
14 changes: 14 additions & 0 deletions tests/integration_tests/db_engine_specs/postgres_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from superset.db_engine_specs import get_engine_specs
from superset.db_engine_specs.postgres import PostgresEngineSpec
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.models.sql_lab import Query
from superset.utils.core import GenericDataType
from tests.integration_tests.db_engine_specs.base_tests import (
assert_generic_types,
Expand Down Expand Up @@ -443,6 +444,19 @@ def test_extract_errors(self):
)
]

@mock.patch("sqlalchemy.engine.Engine.connect")
def test_get_cancel_query_id(self, engine_mock):
query = Query()
cursor_mock = engine_mock.return_value.__enter__.return_value
cursor_mock.fetchone.return_value = [123]
assert PostgresEngineSpec.get_cancel_query_id(cursor_mock, query) == 123

@mock.patch("sqlalchemy.engine.Engine.connect")
def test_cancel_query(self, engine_mock):
query = Query()
cursor_mock = engine_mock.return_value.__enter__.return_value
assert PostgresEngineSpec.cancel_query(cursor_mock, query, 123) is None


def test_base_parameters_mixin():
parameters = {
Expand Down
15 changes: 15 additions & 0 deletions tests/integration_tests/db_engine_specs/snowflake_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
# specific language governing permissions and limitations
# under the License.
import json
from unittest import mock

from sqlalchemy import column

from superset.db_engine_specs.snowflake import SnowflakeEngineSpec
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.models.core import Database
from superset.models.sql_lab import Query
from tests.integration_tests.db_engine_specs.base_tests import TestDbEngineSpec


Expand Down Expand Up @@ -99,3 +101,16 @@ def test_extract_errors(self):
},
)
]

@mock.patch("sqlalchemy.engine.Engine.connect")
def test_get_cancel_query_id(self, engine_mock):
query = Query()
cursor_mock = engine_mock.return_value.__enter__.return_value
cursor_mock.fetchone.return_value = [123]
assert SnowflakeEngineSpec.get_cancel_query_id(cursor_mock, query) == 123

@mock.patch("sqlalchemy.engine.Engine.connect")
def test_cancel_query(self, engine_mock):
query = Query()
cursor_mock = engine_mock.return_value.__enter__.return_value
assert SnowflakeEngineSpec.cancel_query(cursor_mock, query, 123) is None