Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cancel db query on stop #15403

Merged
merged 17 commits into from
Jul 13, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions superset-frontend/src/SqlLab/components/SqlEditor.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ class SqlEditor extends React.PureComponent {
WINDOW_RESIZE_THROTTLE_MS,
);

this.onBeforeUnload = this.onBeforeUnload.bind(this);
this.renderDropdown = this.renderDropdown.bind(this);
}

Expand All @@ -212,6 +213,7 @@ class SqlEditor extends React.PureComponent {
this.setState({ height: this.getSqlEditorHeight() });

window.addEventListener('resize', this.handleWindowResize);
window.addEventListener('beforeunload', this.onBeforeUnload.bind(this));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same thing.. don't think you need the bind.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks fixed by efb86e5

can you please approve the CI workflow so github actions will run again? 🙇‍♂️


// setup hotkeys
const hotkeys = this.getHotkeyConfig();
Expand All @@ -222,6 +224,7 @@ class SqlEditor extends React.PureComponent {

componentWillUnmount() {
window.removeEventListener('resize', this.handleWindowResize);
window.removeEventListener('beforeunload', this.onBeforeUnload.bind(this));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't think you need the bind here, since this has already been bound when the method was declared.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, fixed by efb86e5

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, true. And also I think it actually won't work with .bind since that returns a new function.

}

onResizeStart() {
Expand All @@ -242,6 +245,11 @@ class SqlEditor extends React.PureComponent {
}
}

onBeforeUnload(event) {
event.preventDefault();
this.stopQuery();
}

onSqlChanged(sql) {
this.setState({ sql });
this.setQueryEditorSqlWithDebounce(sql);
Expand Down
22 changes: 22 additions & 0 deletions superset/db_engine_specs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1304,6 +1304,28 @@ def get_column_spec(
)
return None

@classmethod
def get_cancel_query_payload(cls, cursor: Any, query: Query) -> Any:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we improve the types here (e.g. better than Any)?

Copy link
Contributor Author

@koszti koszti Jul 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the cursor is db driver specific, defined in the driver and it's hard to set anything better than Any that remains compatible with every driver. The cursor is currently using Any in the entire codebase. But please correct me if I'm wrong or if you have better idea that doesn't require major refactoring of the codebase.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like now the return type here is typed, so this lgtm

"""
Returns None if query can not be cancelled.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you provide a more descriptive description of what this function does, especially for the non-base case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed by ef8436b

:param cursor: Cursor instance in which the query will be executed
:param query: Query instance
:return: Type of the payload can vary depends on databases
but must be jsonable. None if query can't be cancelled.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None if query can't be cancelled

could possibly be implied by better type hints.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed by ef8436b

"""
return None

@classmethod
def cancel_query(cls, cursor: Any, query: Query, payload: Any) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same type question here

"""
Cancels query in the underlying database.
The method is called only when payload is not None.
:param cursor: New cursor instance to the db of the query
:param query: Query instance
:param payload: Value returned by get_cancel_query_payload or set in
other life-cycle methods of the query
"""
betodealmeida marked this conversation as resolved.
Show resolved Hide resolved


# schema for adding a database by providing parameters instead of the
# full SQLAlchemy URI
Expand Down
11 changes: 11 additions & 0 deletions superset/db_engine_specs/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

from superset.db_engine_specs.base import BaseEngineSpec, BasicParametersMixin
from superset.errors import SupersetErrorType
from superset.models.sql_lab import Query
from superset.utils import core as utils
from superset.utils.core import ColumnSpec, GenericDataType

Expand Down Expand Up @@ -220,3 +221,13 @@ def get_column_spec( # type: ignore
return super().get_column_spec(
native_type, column_type_mappings=column_type_mappings
)

@classmethod
def get_cancel_query_payload(cls, cursor: Any, query: Query) -> Any:
cursor.execute("SELECT CONNECTION_ID()")
row = cursor.fetchone()
return row[0]

@classmethod
def cancel_query(cls, cursor: Any, query: Query, payload: Any) -> None:
Copy link
Member

@etr2460 etr2460 Jul 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same typing questions here

cursor.execute("KILL CONNECTION %d" % payload)
15 changes: 15 additions & 0 deletions superset/db_engine_specs/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from superset.db_engine_specs.base import BaseEngineSpec, BasicParametersMixin
from superset.errors import SupersetErrorType
from superset.exceptions import SupersetException
from superset.models.sql_lab import Query
from superset.utils import core as utils
from superset.utils.core import ColumnSpec, GenericDataType

Expand Down Expand Up @@ -296,3 +297,17 @@ def get_column_spec( # type: ignore
return super().get_column_spec(
native_type, column_type_mappings=column_type_mappings
)

@classmethod
def get_cancel_query_payload(cls, cursor: Any, query: Query) -> Any:
cursor.execute("SELECT current_user")
row = cursor.fetchone()
return row[0]

@classmethod
def cancel_query(cls, cursor: Any, query: Query, payload: Any) -> None:
cursor.execute(
"SELECT pg_terminate_backend(pid) "
"FROM pg_stat_activity "
"WHERE pid <> pg_backend_pid() and usename='%s'" % payload
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"WHERE pid <> pg_backend_pid() and usename='%s'" % payload
"WHERE pid <> pg_backend_pid() and username='%s'" % payload

Won't this kill all user queries in progress?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, same question here. ideally we can only cancel this query

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, fixed by 0f49471

)
11 changes: 11 additions & 0 deletions superset/db_engine_specs/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from superset.db_engine_specs.postgres import PostgresBaseEngineSpec
from superset.errors import SupersetErrorType
from superset.models.sql_lab import Query
from superset.utils import core as utils

if TYPE_CHECKING:
Expand Down Expand Up @@ -128,3 +129,13 @@ def mutate_db_for_connection_test(database: "Database") -> None:
engine_params["connect_args"] = connect_args
extra["engine_params"] = engine_params
database.extra = json.dumps(extra)

@classmethod
def get_cancel_query_payload(cls, cursor: Any, query: Query) -> Any:
cursor.execute("SELECT CURRENT_SESSION()")
Copy link
Member

@john-bodley john-bodley Jul 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@koszti does this work? The reason I ask is per the documentation, CURRENT_SESSION

Returns a unique system identifier for the Snowflake session corresponding to the present connection

yet the connection associated with said cursor doesn't not the same connection which instantiated the query (in all likelihood for async queries that connection resides on a Celery host). Thus I would suspect there are actually no queries running within the current session.

The same is true for the other engines.

Copy link
Contributor Author

@koszti koszti Jul 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The query that you're referring to is running when the query initiated and not when the stop button pressed.
It's using the same cursor that is created in the first place in execute_sql_statements here. The returned session id then saved into the query table in the superset backend database.

When the stop button pressed, we get the saved session id from the query table and kill the query by another SQL. In this way queries can be killed from every celery worker.

It's tested on Postgres, MySQL and Snowflake engines with sync and async query mode.

Copy link
Member

@john-bodley john-bodley Jul 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies. I misread the code logic. I originally thought the new cursor which is used to stop the query was being used to obtain the session identifier.

row = cursor.fetchone()
return row[0]

@classmethod
def cancel_query(cls, cursor: Any, query: Query, payload: Any) -> None:
cursor.execute("SELECT SYSTEM$CANCEL_ALL_QUERIES(%s)" % payload)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use a Python 3 format string instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed by bfc8d60

46 changes: 43 additions & 3 deletions superset/sql_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def dummy_sql_query_mutator(
SQL_QUERY_MUTATOR = config.get("SQL_QUERY_MUTATOR") or dummy_sql_query_mutator
log_query = config["QUERY_LOGGER"]
logger = logging.getLogger(__name__)
cancel_payload_key = "cancel_payload"


class SqlLabException(Exception):
Expand All @@ -83,6 +84,10 @@ class SqlLabSecurityException(SqlLabException):
pass


class SqlLabQueryStoppedException(SqlLabException):
pass


def handle_query_error(
ex: Exception,
query: Query,
Expand Down Expand Up @@ -187,7 +192,7 @@ def get_sql_results( # pylint: disable=too-many-arguments
return handle_query_error(ex, query, session)


# pylint: disable=too-many-arguments, too-many-locals
# pylint: disable=too-many-arguments, too-many-locals, too-many-statements
def execute_sql_statement(
sql_statement: str,
query: Query,
Expand Down Expand Up @@ -288,6 +293,12 @@ def execute_sql_statement(
)
)
except Exception as ex:
# query is stopped in another thread/worker
# stopping rases expected exceptions which we should skip
koszti marked this conversation as resolved.
Show resolved Hide resolved
session.refresh(query)
if query.status == QueryStatus.STOPPED:
raise SqlLabQueryStoppedException()

logger.error("Query %d: %s", query.id, type(ex), exc_info=True)
logger.debug("Query %d: %s", query.id, ex)
raise SqlLabException(db_engine_spec.extract_error_message(ex))
Expand Down Expand Up @@ -438,12 +449,18 @@ def execute_sql_statements( # pylint: disable=too-many-arguments, too-many-loca
with closing(engine.raw_connection()) as conn:
# closing the connection closes the cursor as well
cursor = conn.cursor()
cancel_query_payload = db_engine_spec.get_cancel_query_payload(cursor, query)
logger.info(cancel_query_payload)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe remove the logger?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed by ec39477

if cancel_query_payload is not None:
query.set_extra_json_key(cancel_payload_key, cancel_query_payload)
session.commit()
statement_count = len(statements)
for i, statement in enumerate(statements):
# Check if stopped
query = get_query(query_id, session)
session.refresh(query)
if query.status == QueryStatus.STOPPED:
return None
payload.update({"status": query.status})
return payload

# For CTAS we create the table only on the last statement
apply_ctas = query.select_as_cta and (
Expand All @@ -466,6 +483,9 @@ def execute_sql_statements( # pylint: disable=too-many-arguments, too-many-loca
log_params,
apply_ctas,
)
except SqlLabQueryStoppedException:
payload.update({"status": QueryStatus.STOPPED})
return payload
except Exception as ex: # pylint: disable=broad-except
msg = str(ex)
prefix_message = (
Expand Down Expand Up @@ -562,3 +582,23 @@ def execute_sql_statements( # pylint: disable=too-many-arguments, too-many-loca
return payload

return None


def cancel_query(query: Query, user_name: Optional[str] = None) -> None:
"""Cancal a running query."""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spelling nit: cancel

also please update the docblock to match all the others with arguments

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

improved by 4706b43

cancel_payload = query.extra.get(cancel_payload_key, None)
if cancel_payload is None:
return

database = query.database
engine = database.get_sqla_engine(
schema=query.schema,
nullpool=True,
user_name=user_name,
source=QuerySource.SQL_LAB,
)
db_engine_spec = database.db_engine_spec

with closing(engine.raw_connection()) as conn:
with closing(conn.cursor()) as cursor:
db_engine_spec.cancel_query(cursor, query, cancel_payload)
1 change: 1 addition & 0 deletions superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2337,6 +2337,7 @@ def stop_query(self) -> FlaskResponse:
return self.json_response("OK")
query.status = QueryStatus.STOPPED
db.session.commit()
sql_lab.cancel_query(query, g.user.username if g.user else None)

return self.json_response("OK")

Expand Down
14 changes: 14 additions & 0 deletions tests/integration_tests/db_engine_specs/mysql_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from superset.db_engine_specs.mysql import MySQLEngineSpec
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.models.sql_lab import Query
from superset.utils.core import GenericDataType
from tests.integration_tests.db_engine_specs.base_tests import (
assert_generic_types,
Expand Down Expand Up @@ -238,3 +239,16 @@ def test_extract_errors(self):
},
)
]

@unittest.mock.patch("sqlalchemy.engine.Engine.connect")
def test_get_cancel_query_payload(self, engine_mock):
query = Query()
cursor_mock = engine_mock.return_value.__enter__.return_value
cursor_mock.fetchone.return_value = [123]
assert MySQLEngineSpec.get_cancel_query_payload(cursor_mock, query) == 123

@unittest.mock.patch("sqlalchemy.engine.Engine.connect")
def test_cancel_query(self, engine_mock):
query = Query()
cursor_mock = engine_mock.return_value.__enter__.return_value
assert MySQLEngineSpec.cancel_query(cursor_mock, query, 123) is None
17 changes: 17 additions & 0 deletions tests/integration_tests/db_engine_specs/postgres_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from superset.db_engine_specs import get_engine_specs
from superset.db_engine_specs.postgres import PostgresEngineSpec
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.models.sql_lab import Query
from superset.utils.core import GenericDataType
from tests.integration_tests.db_engine_specs.base_tests import (
assert_generic_types,
Expand Down Expand Up @@ -443,6 +444,22 @@ def test_extract_errors(self):
)
]

@mock.patch("sqlalchemy.engine.Engine.connect")
def test_get_cancel_query_payload(self, engine_mock):
query = Query()
cursor_mock = engine_mock.return_value.__enter__.return_value
cursor_mock.fetchone.return_value = ["testuser"]
assert (
PostgresEngineSpec.get_cancel_query_payload(cursor_mock, query)
== "testuser"
)

@mock.patch("sqlalchemy.engine.Engine.connect")
def test_cancel_query(self, engine_mock):
query = Query()
cursor_mock = engine_mock.return_value.__enter__.return_value
assert PostgresEngineSpec.cancel_query(cursor_mock, query, "testuser") is None


def test_base_parameters_mixin():
parameters = {
Expand Down
15 changes: 15 additions & 0 deletions tests/integration_tests/db_engine_specs/snowflake_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
# specific language governing permissions and limitations
# under the License.
import json
from unittest import mock

from sqlalchemy import column

from superset.db_engine_specs.snowflake import SnowflakeEngineSpec
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.models.core import Database
from superset.models.sql_lab import Query
from tests.integration_tests.db_engine_specs.base_tests import TestDbEngineSpec


Expand Down Expand Up @@ -99,3 +101,16 @@ def test_extract_errors(self):
},
)
]

@mock.patch("sqlalchemy.engine.Engine.connect")
def test_get_cancel_query_payload(self, engine_mock):
query = Query()
cursor_mock = engine_mock.return_value.__enter__.return_value
cursor_mock.fetchone.return_value = [123]
assert SnowflakeEngineSpec.get_cancel_query_payload(cursor_mock, query) == 123

@mock.patch("sqlalchemy.engine.Engine.connect")
def test_cancel_query(self, engine_mock):
query = Query()
cursor_mock = engine_mock.return_value.__enter__.return_value
assert SnowflakeEngineSpec.cancel_query(cursor_mock, query, None) is None