From a335ff10c6b8f27ab4446ed178ae67c9c9db5a40 Mon Sep 17 00:00:00 2001 From: Thelin90 Date: Sun, 19 Jun 2022 19:17:03 +0100 Subject: [PATCH 1/3] fix(20428)-Address-Presto/Trino-Poll-Issue-Refacto r Update linter --- superset/db_engine_specs/presto.py | 4 -- superset/db_engine_specs/trino.py | 81 +++++++++++++++++++++++++++++- 2 files changed, 80 insertions(+), 5 deletions(-) diff --git a/superset/db_engine_specs/presto.py b/superset/db_engine_specs/presto.py index d0621e288ef41..cd6fa032b39ed 100644 --- a/superset/db_engine_specs/presto.py +++ b/superset/db_engine_specs/presto.py @@ -949,11 +949,7 @@ def get_create_view( sql = f"SHOW CREATE VIEW {schema}.{table}" try: cls.execute(cursor, sql) - polled = cursor.poll() - while polled: - time.sleep(0.2) - polled = cursor.poll() except DatabaseError: # not a VIEW return None rows = cls.fetch_data(cursor, 1) diff --git a/superset/db_engine_specs/trino.py b/superset/db_engine_specs/trino.py index 46e3ed55dec0c..e80b130f96641 100644 --- a/superset/db_engine_specs/trino.py +++ b/superset/db_engine_specs/trino.py @@ -15,15 +15,20 @@ # specific language governing permissions and limitations # under the License. import logging -from typing import Any, Dict, Optional, TYPE_CHECKING +import time +from typing import Any, Dict, List, Optional, TYPE_CHECKING import simplejson as json from flask import current_app +from sqlalchemy.engine.reflection import Inspector from sqlalchemy.engine.url import URL +from sqlalchemy.orm import Session +from superset.common.db_query_status import QueryStatus from superset.databases.utils import make_url_safe from superset.db_engine_specs.base import BaseEngineSpec from superset.db_engine_specs.presto import PrestoEngineSpec +from superset.models.sql_lab import Query from superset.utils import core as utils if TYPE_CHECKING: @@ -77,6 +82,80 @@ def modify_url_for_impersonation( def get_allow_cost_estimate(cls, extra: Dict[str, Any]) -> bool: return True + @classmethod + def get_table_names( + cls, + database: "Database", + inspector: Inspector, + schema: Optional[str], + ) -> List[str]: + return BaseEngineSpec.get_table_names( + database=database, + inspector=inspector, + schema=schema, + ) + + @classmethod + def get_view_names( + cls, + database: "Database", + inspector: Inspector, + schema: Optional[str], + ) -> List[str]: + return BaseEngineSpec.get_view_names( + database=database, + inspector=inspector, + schema=schema, + ) + + @classmethod + def handle_cursor(cls, cursor: Any, query: Query, session: Session) -> None: + """Updates progress information""" + query_id = query.id + sleep_interval = query.database.connect_args.get( + "sleep_interval", current_app.config["PRESTO_POLL_INTERVAL"] + ) + + logger.info("Query %i: Validating the cursor for progress", query_id) + + while True: + stats = cursor.stats + query = session.query(type(query)).filter_by(id=query_id).one() + + if query.status in [QueryStatus.STOPPED, QueryStatus.TIMED_OUT]: + cursor.cancel() + logger.info("Query %i: cancelled", query_id) + break + + if stats: + state = stats.get("state") + + if state == "FINISHED": + logger.info("Query %i: Finished", query_id) + break + + completed_splits = float(stats.get("completedSplits")) + total_splits = float(stats.get("totalSplits")) + + if total_splits and completed_splits: + progress = 100 * (completed_splits / total_splits) + + logger.info( + "Query %s progress: %s / %s", + query_id, + completed_splits, + total_splits, + ) + + if progress > query.progress: + query.progress = progress + + session.commit() + + time.sleep(sleep_interval) + + logger.info("Query %i: Validating the cursor for progress", query_id) + @staticmethod def get_extra_params(database: "Database") -> Dict[str, Any]: """ From 068f40960d3c14d6fc2aece5686f889aa7b209ba Mon Sep 17 00:00:00 2001 From: Thelin90 Date: Sun, 19 Jun 2022 21:08:08 +0100 Subject: [PATCH 2/3] Update to only use BaseEngineSpec handle_cursor --- superset/db_engine_specs/trino.py | 45 +------------------------------ 1 file changed, 1 insertion(+), 44 deletions(-) diff --git a/superset/db_engine_specs/trino.py b/superset/db_engine_specs/trino.py index e80b130f96641..d27eefdeedd52 100644 --- a/superset/db_engine_specs/trino.py +++ b/superset/db_engine_specs/trino.py @@ -111,50 +111,7 @@ def get_view_names( @classmethod def handle_cursor(cls, cursor: Any, query: Query, session: Session) -> None: """Updates progress information""" - query_id = query.id - sleep_interval = query.database.connect_args.get( - "sleep_interval", current_app.config["PRESTO_POLL_INTERVAL"] - ) - - logger.info("Query %i: Validating the cursor for progress", query_id) - - while True: - stats = cursor.stats - query = session.query(type(query)).filter_by(id=query_id).one() - - if query.status in [QueryStatus.STOPPED, QueryStatus.TIMED_OUT]: - cursor.cancel() - logger.info("Query %i: cancelled", query_id) - break - - if stats: - state = stats.get("state") - - if state == "FINISHED": - logger.info("Query %i: Finished", query_id) - break - - completed_splits = float(stats.get("completedSplits")) - total_splits = float(stats.get("totalSplits")) - - if total_splits and completed_splits: - progress = 100 * (completed_splits / total_splits) - - logger.info( - "Query %s progress: %s / %s", - query_id, - completed_splits, - total_splits, - ) - - if progress > query.progress: - query.progress = progress - - session.commit() - - time.sleep(sleep_interval) - - logger.info("Query %i: Validating the cursor for progress", query_id) + BaseEngineSpec.handle_cursor(cursor=cursor, query=query, session=session) @staticmethod def get_extra_params(database: "Database") -> Dict[str, Any]: From 210d611e90759e610d8899367b95722f941be2da Mon Sep 17 00:00:00 2001 From: John Bodley <4567245+john-bodley@users.noreply.github.com> Date: Sun, 19 Jun 2022 15:27:41 -0700 Subject: [PATCH 3/3] Fix CI --- superset/db_engine_specs/trino.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/superset/db_engine_specs/trino.py b/superset/db_engine_specs/trino.py index d27eefdeedd52..acddb97100266 100644 --- a/superset/db_engine_specs/trino.py +++ b/superset/db_engine_specs/trino.py @@ -15,7 +15,6 @@ # specific language governing permissions and limitations # under the License. import logging -import time from typing import Any, Dict, List, Optional, TYPE_CHECKING import simplejson as json @@ -24,7 +23,6 @@ from sqlalchemy.engine.url import URL from sqlalchemy.orm import Session -from superset.common.db_query_status import QueryStatus from superset.databases.utils import make_url_safe from superset.db_engine_specs.base import BaseEngineSpec from superset.db_engine_specs.presto import PrestoEngineSpec