feat: enable metadata sync for virtual tables #10645
Changes from all commits
`superset/connectors/sqla/models.py`:

```diff
@@ -17,6 +17,7 @@
 import json
 import logging
 from collections import defaultdict, OrderedDict
+from contextlib import closing
 from dataclasses import dataclass, field
 from datetime import datetime, timedelta
 from typing import Any, Dict, Hashable, List, NamedTuple, Optional, Tuple, Union
```
```diff
@@ -44,18 +45,24 @@
     Table,
     Text,
 )
-from sqlalchemy.exc import CompileError, SQLAlchemyError
+from sqlalchemy.exc import CompileError
 from sqlalchemy.orm import backref, Query, relationship, RelationshipProperty, Session
 from sqlalchemy.orm.exc import NoResultFound
 from sqlalchemy.schema import UniqueConstraint
 from sqlalchemy.sql import column, ColumnElement, literal_column, table, text
 from sqlalchemy.sql.expression import Label, Select, TextAsFrom
+from sqlalchemy.types import TypeEngine

 from superset import app, db, is_feature_enabled, security_manager
 from superset.connectors.base.models import BaseColumn, BaseDatasource, BaseMetric
 from superset.constants import NULL_STRING
 from superset.db_engine_specs.base import TimestampExpression
-from superset.exceptions import DatabaseNotFound, QueryObjectValidationError
+from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
+from superset.exceptions import (
+    DatabaseNotFound,
+    QueryObjectValidationError,
+    SupersetSecurityException,
+)
 from superset.jinja_context import (
     BaseTemplateProcessor,
     ExtraCache,
```
```diff
@@ -64,6 +71,7 @@
 from superset.models.annotations import Annotation
 from superset.models.core import Database
 from superset.models.helpers import AuditMixinNullable, QueryResult
+from superset.result_set import SupersetResultSet
 from superset.sql_parse import ParsedQuery
 from superset.typing import Metric, QueryObjectDict
 from superset.utils import core as utils, import_datasource
```
```diff
@@ -629,12 +637,52 @@ def sql_url(self) -> str:
         return self.database.sql_url + "?table_name=" + str(self.table_name)

     def external_metadata(self) -> List[Dict[str, str]]:
-        cols = self.database.get_columns(self.table_name, schema=self.schema)
-        for col in cols:
-            try:
-                col["type"] = str(col["type"])
-            except CompileError:
-                col["type"] = "UNKNOWN"
+        db_engine_spec = self.database.db_engine_spec
+        if self.sql:
+            engine = self.database.get_sqla_engine(schema=self.schema)
+            sql = self.get_template_processor().process_template(self.sql)
+            parsed_query = ParsedQuery(sql)
+            if not parsed_query.is_readonly():
+                raise SupersetSecurityException(
+                    SupersetError(
+                        error_type=SupersetErrorType.DATASOURCE_SECURITY_ACCESS_ERROR,
+                        message=_("Only `SELECT` statements are allowed"),
+                        level=ErrorLevel.ERROR,
+                    )
+                )
+            statements = parsed_query.get_statements()
+            if len(statements) > 1:
+                raise SupersetSecurityException(
+                    SupersetError(
+                        error_type=SupersetErrorType.DATASOURCE_SECURITY_ACCESS_ERROR,
+                        message=_("Only single queries supported"),
+                        level=ErrorLevel.ERROR,
+                    )
+                )
+            # TODO(villebro): refactor to use same code that's used by
+            #  sql_lab.py:execute_sql_statements
+            with closing(engine.raw_connection()) as conn:
+                with closing(conn.cursor()) as cursor:
```
> I'd like to have a single code path that does this. Is there a way we can refactor/share code with the SQL Lab modules here?

> I'm also wondering if this should run on an async worker when possible, but that makes things more complex here.

> In this particular case the work is very much synchronous, but I agree that the single code path is desirable (this solution was a compromise for quick delivery, as I feel …

> @mistercrunch I looked into joining these code paths and making it possible to make it async, but came to the conclusion that the refactoring is best done once we start working on the async query framework. I added a TODO with my name next to it stating that the metadata fetching should be merged with the SQL Lab code, and will be happy to do that once we have the necessary structures in place.
```diff
+                    query = self.database.apply_limit_to_sql(statements[0])
+                    db_engine_spec.execute(cursor, query)
+                    result = db_engine_spec.fetch_data(cursor, limit=1)
+                    result_set = SupersetResultSet(
+                        result, cursor.description, db_engine_spec
+                    )
+                    cols = result_set.columns
+        else:
+            db_dialect = self.database.get_dialect()
+            cols = self.database.get_columns(
+                self.table_name, schema=self.schema or None
```
> I had to add checking for empty strings here (`schema=self.schema or None`) …
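A minimal illustration of the guard this comment refers to; `schema` below is a plain local variable standing in for `self.schema`:

```python
# An empty schema string is falsy, so `schema or None` hands None to the
# SQLAlchemy inspector instead of "".
schema = ""
assert (schema or None) is None

schema = "public"
assert (schema or None) == "public"
```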
```diff
+            )
+            for col in cols:
+                try:
+                    if isinstance(col["type"], TypeEngine):
+                        col["type"] = db_engine_spec.column_datatype_to_string(
+                            col["type"], db_dialect
+                        )
+                except CompileError:
+                    col["type"] = "UNKNOWN"
         return cols

     @property
```
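For reference, a small sketch of how the two guards above behave on sample inputs. It uses `ParsedQuery` from `superset.sql_parse` exactly as imported in this diff; precise parsing results depend on the Superset/sqlparse version in use:

```python
from superset.sql_parse import ParsedQuery

# A virtual table backed by a single SELECT passes both checks.
single = ParsedQuery("SELECT 123 AS intcol, 'abc' AS strcol")
assert single.is_readonly()
assert len(single.get_statements()) == 1

# A mutating statement fails the read-only check, so no SQL is ever
# sent to the database.
assert not ParsedQuery("DROP TABLE birth_names").is_readonly()

# Two statements fail the single-statement check, even if both are SELECTs.
multi = ParsedQuery("SELECT 1; SELECT 2")
assert len(multi.get_statements()) == 2
```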
```diff
@@ -1295,61 +1343,42 @@ def fetch_metadata(self, commit: bool = True) -> MetadataResult:
         :param commit: should the changes be committed or not.
         :return: Tuple with lists of added, removed and modified column names.
         """
-        try:
-            new_table = self.get_sqla_table_object()
-        except SQLAlchemyError:
-            raise QueryObjectValidationError(
-                _(
-                    "Table %(table)s doesn't seem to exist in the specified database, "
-                    "couldn't fetch column information",
-                    table=self.table_name,
-                )
-            )
-
+        new_columns = self.external_metadata()
         metrics = []
         any_date_col = None
         db_engine_spec = self.database.db_engine_spec
-        db_dialect = self.database.get_dialect()
         old_columns = db.session.query(TableColumn).filter(TableColumn.table == self)

         old_columns_by_name = {col.column_name: col for col in old_columns}
         results = MetadataResult(
             removed=[
                 col
                 for col in old_columns_by_name
-                if col not in {col.name for col in new_table.columns}
+                if col not in {col["name"] for col in new_columns}
             ]
         )

         # clear old columns before adding modified columns back
         self.columns = []
-        for col in new_table.columns:
-            try:
-                datatype = db_engine_spec.column_datatype_to_string(
-                    col.type, db_dialect
-                )
-            except Exception as ex:  # pylint: disable=broad-except
-                datatype = "UNKNOWN"
-                logger.error("Unrecognized data type in %s.%s", new_table, col.name)
-                logger.exception(ex)
-
```
-1327
to
-1334
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This compilation step was moved to |
```diff
-            old_column = old_columns_by_name.get(col.name, None)
+        for col in new_columns:
+            old_column = old_columns_by_name.get(col["name"], None)
             if not old_column:
-                results.added.append(col.name)
+                results.added.append(col["name"])
                 new_column = TableColumn(
-                    column_name=col.name, type=datatype, table=self
+                    column_name=col["name"], type=col["type"], table=self
                 )
                 new_column.is_dttm = new_column.is_temporal
                 db_engine_spec.alter_new_orm_column(new_column)
             else:
                 new_column = old_column
-                if new_column.type != datatype:
-                    results.modified.append(col.name)
-                    new_column.type = datatype
+                if new_column.type != col["type"]:
+                    results.modified.append(col["name"])
+                    new_column.type = col["type"]
             new_column.groupby = True
             new_column.filterable = True
             self.columns.append(new_column)
             if not any_date_col and new_column.is_temporal:
-                any_date_col = col.name
+                any_date_col = col["name"]
         metrics.append(
             SqlMetric(
                 metric_name="count",
```
`superset/views/datasource.py`:
```diff
@@ -24,7 +24,7 @@
 from superset import db
 from superset.connectors.connector_registry import ConnectorRegistry
-from superset.models.core import Database
+from superset.exceptions import SupersetException
 from superset.typing import FlaskResponse

 from .base import api, BaseSupersetView, handle_api_exception, json_error_response
```
```diff
@@ -100,21 +100,11 @@ def external_metadata(
         self, datasource_type: str, datasource_id: int
     ) -> FlaskResponse:
         """Gets column info from the source system"""
-        if datasource_type == "druid":
+        try:
             datasource = ConnectorRegistry.get_datasource(
                 datasource_type, datasource_id, db.session
             )
-        elif datasource_type == "table":
-            database = (
-                db.session.query(Database).filter_by(id=request.args.get("db_id")).one()
-            )
-            table_class = ConnectorRegistry.sources["table"]
-            datasource = table_class(
-                database=database,
-                table_name=request.args.get("table_name"),
-                schema=request.args.get("schema") or None,
-            )
-        else:
-            raise Exception(f"Unsupported datasource_type: {datasource_type}")
```
Comment on lines -107 to -118:

> I don't know why the table was created like this instead of just using the available functionality that populates all fields, like …

> Mmmmh, oh it could be that we use this for tables that don't have associated datasets yet, for example in the SQL Lab code base where we show the schema of a table on the left panel.

> I just checked and it looks like we call …

> NOTE: looked a bit deeper and it turns out that …
```diff
-        external_metadata = datasource.external_metadata()
-        return self.json_response(external_metadata)
+            external_metadata = datasource.external_metadata()
+            return self.json_response(external_metadata)
+        except SupersetException as ex:
+            return json_error_response(str(ex), status=400)
```
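A hedged usage sketch of the simplified endpoint. The host, port, dataset id, and authentication step are illustrative; only the URL shape and the 400-on-`SupersetException` behavior come from the diff and the tests below:

```python
import requests

session = requests.Session()
# ... authenticate the session against Superset here (illustrative) ...

# One URL shape for both physical and virtual tables; the old db_id,
# table_name and schema query parameters are no longer needed.
resp = session.get("http://localhost:8088/datasource/external_metadata/table/42/")

if resp.status_code == 400:
    # e.g. a virtual table whose SQL is not a single SELECT statement
    print(resp.json()["error"])
else:
    for col in resp.json():
        print(col["name"], col.get("type"))
```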
`tests/datasource_tests.py`:
```diff
@@ -18,27 +18,82 @@
 import json
 from copy import deepcopy

+from superset import db
+from superset.connectors.sqla.models import SqlaTable
+from superset.utils.core import get_example_database
+
 from .base_tests import SupersetTestCase
 from .fixtures.datasource import datasource_post


 class TestDatasource(SupersetTestCase):
-    def test_external_metadata(self):
+    def test_external_metadata_for_physical_table(self):
         self.login(username="admin")
         tbl = self.get_table_by_name("birth_names")
-        schema = tbl.schema or ""
-        url = (
-            f"/datasource/external_metadata/table/{tbl.id}/?"
-            f"db_id={tbl.database.id}&"
-            f"table_name={tbl.table_name}&"
-            f"schema={schema}&"
-        )
+        url = f"/datasource/external_metadata/table/{tbl.id}/"
         resp = self.get_json_resp(url)
         col_names = {o.get("name") for o in resp}
         self.assertEqual(
             col_names, {"sum_boys", "num", "gender", "name", "ds", "state", "sum_girls"}
         )
+
```
```diff
+    def test_external_metadata_for_virtual_table(self):
+        self.login(username="admin")
+        session = db.session
+        table = SqlaTable(
+            table_name="dummy_sql_table",
+            database=get_example_database(),
+            sql="select 123 as intcol, 'abc' as strcol",
```
> @bkyryliuk this query was raising an exception, as either the type for `intcol` or `strcol` …
```diff
+        )
+        session.add(table)
+        session.commit()
+
+        table = self.get_table_by_name("dummy_sql_table")
+        url = f"/datasource/external_metadata/table/{table.id}/"
+        resp = self.get_json_resp(url)
+        assert {o.get("name") for o in resp} == {"intcol", "strcol"}
+        session.delete(table)
+        session.commit()
+
+    def test_external_metadata_for_malicious_virtual_table(self):
+        self.login(username="admin")
+        session = db.session
+        table = SqlaTable(
+            table_name="malicious_sql_table",
+            database=get_example_database(),
+            sql="delete table birth_names",
+        )
+        session.add(table)
+        session.commit()
+
+        table = self.get_table_by_name("malicious_sql_table")
+        url = f"/datasource/external_metadata/table/{table.id}/"
+        resp = self.get_json_resp(url)
+        assert "error" in resp
+
+        session.delete(table)
+        session.commit()
+
+    def test_external_metadata_for_mutistatement_virtual_table(self):
+        self.login(username="admin")
+        session = db.session
+        table = SqlaTable(
+            table_name="multistatement_sql_table",
+            database=get_example_database(),
+            sql="select 123 as intcol, 'abc' as strcol;"
+            "select 123 as intcol, 'abc' as strcol",
+        )
+        session.add(table)
+        session.commit()
+
+        table = self.get_table_by_name("multistatement_sql_table")
+        url = f"/datasource/external_metadata/table/{table.id}/"
+        resp = self.get_json_resp(url)
+        assert "error" in resp
+
+        session.delete(table)
+        session.commit()
+
     def compare_lists(self, l1, l2, key):
         l2_lookup = {o.get(key): o for o in l2}
         for obj1 in l1:
```
> With the simplification of the code in the endpoint, the slightly hackish query params (`db_id`, `schema` and `table_name`) are now redundant.