Skip to content

Commit

Permalink
feat: enable metadata sync for virtual tables (apache#10645)
Browse files Browse the repository at this point in the history
* feat: enable metadata sync for virtual tables

* add migration and check for empty schema name

* simplify request

* truncate trailing column attributes for MySQL

* add unit test

* use db_engine_spec func to truncate collation and charset

* Remove redundant migration

* add more tests

* address review comments and apply templating to query

* add todo for refactoring

* remove schema from tests

* check column datatype
  • Loading branch information
villebro authored and auxten committed Nov 20, 2020
1 parent 89bd321 commit d82a4b2
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 76 deletions.
17 changes: 3 additions & 14 deletions superset-frontend/src/datasource/DatasourceEditor.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -392,14 +392,9 @@ class DatasourceEditor extends React.PureComponent {

syncMetadata() {
const { datasource } = this.state;
// Handle carefully when the schema is empty
const endpoint =
`/datasource/external_metadata/${
datasource.type || datasource.datasource_type
}/${datasource.id}/` +
`?db_id=${datasource.database.id}` +
`&schema=${datasource.schema || ''}` +
`&table_name=${datasource.datasource_name || datasource.table_name}`;
const endpoint = `/datasource/external_metadata/${
datasource.type || datasource.datasource_type
}/${datasource.id}/`;
this.setState({ metadataLoading: true });

SupersetClient.get({ endpoint })
Expand Down Expand Up @@ -930,12 +925,6 @@ class DatasourceEditor extends React.PureComponent {
buttonStyle="primary"
onClick={this.syncMetadata}
className="sync-from-source"
disabled={!!datasource.sql}
tooltip={
datasource.sql
? t('This option is not yet available for views')
: null
}
>
{t('Sync columns from source')}
</Button>
Expand Down
103 changes: 66 additions & 37 deletions superset/connectors/sqla/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import json
import logging
from collections import defaultdict, OrderedDict
from contextlib import closing
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Dict, Hashable, List, NamedTuple, Optional, Tuple, Union
Expand Down Expand Up @@ -44,18 +45,24 @@
Table,
Text,
)
from sqlalchemy.exc import CompileError, SQLAlchemyError
from sqlalchemy.exc import CompileError
from sqlalchemy.orm import backref, Query, relationship, RelationshipProperty, Session
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.schema import UniqueConstraint
from sqlalchemy.sql import column, ColumnElement, literal_column, table, text
from sqlalchemy.sql.expression import Label, Select, TextAsFrom
from sqlalchemy.types import TypeEngine

from superset import app, db, is_feature_enabled, security_manager
from superset.connectors.base.models import BaseColumn, BaseDatasource, BaseMetric
from superset.constants import NULL_STRING
from superset.db_engine_specs.base import TimestampExpression
from superset.exceptions import DatabaseNotFound, QueryObjectValidationError
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.exceptions import (
DatabaseNotFound,
QueryObjectValidationError,
SupersetSecurityException,
)
from superset.jinja_context import (
BaseTemplateProcessor,
ExtraCache,
Expand All @@ -64,6 +71,7 @@
from superset.models.annotations import Annotation
from superset.models.core import Database
from superset.models.helpers import AuditMixinNullable, QueryResult
from superset.result_set import SupersetResultSet
from superset.sql_parse import ParsedQuery
from superset.typing import Metric, QueryObjectDict
from superset.utils import core as utils, import_datasource
Expand Down Expand Up @@ -643,12 +651,52 @@ def sql_url(self) -> str:
return self.database.sql_url + "?table_name=" + str(self.table_name)

def external_metadata(self) -> List[Dict[str, str]]:
cols = self.database.get_columns(self.table_name, schema=self.schema)
for col in cols:
try:
col["type"] = str(col["type"])
except CompileError:
col["type"] = "UNKNOWN"
db_engine_spec = self.database.db_engine_spec
if self.sql:
engine = self.database.get_sqla_engine(schema=self.schema)
sql = self.get_template_processor().process_template(self.sql)
parsed_query = ParsedQuery(sql)
if not parsed_query.is_readonly():
raise SupersetSecurityException(
SupersetError(
error_type=SupersetErrorType.DATASOURCE_SECURITY_ACCESS_ERROR,
message=_("Only `SELECT` statements are allowed"),
level=ErrorLevel.ERROR,
)
)
statements = parsed_query.get_statements()
if len(statements) > 1:
raise SupersetSecurityException(
SupersetError(
error_type=SupersetErrorType.DATASOURCE_SECURITY_ACCESS_ERROR,
message=_("Only single queries supported"),
level=ErrorLevel.ERROR,
)
)
# TODO(villebro): refactor to use same code that's used by
# sql_lab.py:execute_sql_statements
with closing(engine.raw_connection()) as conn:
with closing(conn.cursor()) as cursor:
query = self.database.apply_limit_to_sql(statements[0])
db_engine_spec.execute(cursor, query)
result = db_engine_spec.fetch_data(cursor, limit=1)
result_set = SupersetResultSet(
result, cursor.description, db_engine_spec
)
cols = result_set.columns
else:
db_dialect = self.database.get_dialect()
cols = self.database.get_columns(
self.table_name, schema=self.schema or None
)
for col in cols:
try:
if isinstance(col["type"], TypeEngine):
col["type"] = db_engine_spec.column_datatype_to_string(
col["type"], db_dialect
)
except CompileError:
col["type"] = "UNKNOWN"
return cols

@property
Expand Down Expand Up @@ -1310,61 +1358,42 @@ def fetch_metadata(self, commit: bool = True) -> MetadataResult:
:param commit: should the changes be committed or not.
:return: Tuple with lists of added, removed and modified column names.
"""
try:
new_table = self.get_sqla_table_object()
except SQLAlchemyError:
raise QueryObjectValidationError(
_(
"Table %(table)s doesn't seem to exist in the specified database, "
"couldn't fetch column information",
table=self.table_name,
)
)

new_columns = self.external_metadata()
metrics = []
any_date_col = None
db_engine_spec = self.database.db_engine_spec
db_dialect = self.database.get_dialect()
old_columns = db.session.query(TableColumn).filter(TableColumn.table == self)

old_columns_by_name = {col.column_name: col for col in old_columns}
results = MetadataResult(
removed=[
col
for col in old_columns_by_name
if col not in {col.name for col in new_table.columns}
if col not in {col["name"] for col in new_columns}
]
)

# clear old columns before adding modified columns back
self.columns = []
for col in new_table.columns:
try:
datatype = db_engine_spec.column_datatype_to_string(
col.type, db_dialect
)
except Exception as ex: # pylint: disable=broad-except
datatype = "UNKNOWN"
logger.error("Unrecognized data type in %s.%s", new_table, col.name)
logger.exception(ex)
old_column = old_columns_by_name.get(col.name, None)
for col in new_columns:
old_column = old_columns_by_name.get(col["name"], None)
if not old_column:
results.added.append(col.name)
results.added.append(col["name"])
new_column = TableColumn(
column_name=col.name, type=datatype, table=self
column_name=col["name"], type=col["type"], table=self
)
new_column.is_dttm = new_column.is_temporal
db_engine_spec.alter_new_orm_column(new_column)
else:
new_column = old_column
if new_column.type != datatype:
results.modified.append(col.name)
new_column.type = datatype
if new_column.type != col["type"]:
results.modified.append(col["name"])
new_column.type = col["type"]
new_column.groupby = True
new_column.filterable = True
self.columns.append(new_column)
if not any_date_col and new_column.is_temporal:
any_date_col = col.name
any_date_col = col["name"]
metrics.append(
SqlMetric(
metric_name="count",
Expand Down
22 changes: 6 additions & 16 deletions superset/views/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from superset import db
from superset.connectors.connector_registry import ConnectorRegistry
from superset.models.core import Database
from superset.exceptions import SupersetException
from superset.typing import FlaskResponse

from .base import api, BaseSupersetView, handle_api_exception, json_error_response
Expand Down Expand Up @@ -100,21 +100,11 @@ def external_metadata(
self, datasource_type: str, datasource_id: int
) -> FlaskResponse:
"""Gets column info from the source system"""
if datasource_type == "druid":
try:
datasource = ConnectorRegistry.get_datasource(
datasource_type, datasource_id, db.session
)
elif datasource_type == "table":
database = (
db.session.query(Database).filter_by(id=request.args.get("db_id")).one()
)
table_class = ConnectorRegistry.sources["table"]
datasource = table_class(
database=database,
table_name=request.args.get("table_name"),
schema=request.args.get("schema") or None,
)
else:
raise Exception(f"Unsupported datasource_type: {datasource_type}")
external_metadata = datasource.external_metadata()
return self.json_response(external_metadata)
external_metadata = datasource.external_metadata()
return self.json_response(external_metadata)
except SupersetException as ex:
return json_error_response(str(ex), status=400)
71 changes: 63 additions & 8 deletions tests/datasource_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,82 @@
import json
from copy import deepcopy

from superset import db
from superset.connectors.sqla.models import SqlaTable
from superset.utils.core import get_example_database

from .base_tests import SupersetTestCase
from .fixtures.datasource import datasource_post


class TestDatasource(SupersetTestCase):
def test_external_metadata(self):
def test_external_metadata_for_physical_table(self):
self.login(username="admin")
tbl = self.get_table_by_name("birth_names")
schema = tbl.schema or ""
url = (
f"/datasource/external_metadata/table/{tbl.id}/?"
f"db_id={tbl.database.id}&"
f"table_name={tbl.table_name}&"
f"schema={schema}&"
)
url = f"/datasource/external_metadata/table/{tbl.id}/"
resp = self.get_json_resp(url)
col_names = {o.get("name") for o in resp}
self.assertEqual(
col_names, {"sum_boys", "num", "gender", "name", "ds", "state", "sum_girls"}
)

def test_external_metadata_for_virtual_table(self):
    """Metadata sync endpoint should return the columns produced by a
    SQL-defined (virtual) table's query."""
    self.login(username="admin")
    sess = db.session

    # Register a throwaway virtual table backed by a constant SELECT.
    virtual_tbl = SqlaTable(
        table_name="dummy_sql_table",
        database=get_example_database(),
        sql="select 123 as intcol, 'abc' as strcol",
    )
    sess.add(virtual_tbl)
    sess.commit()

    saved_tbl = self.get_table_by_name("dummy_sql_table")
    endpoint = f"/datasource/external_metadata/table/{saved_tbl.id}/"
    payload = self.get_json_resp(endpoint)
    reported_cols = {entry.get("name") for entry in payload}
    assert reported_cols == {"intcol", "strcol"}

    # Clean up so the fixture table does not leak into other tests.
    sess.delete(saved_tbl)
    sess.commit()

def test_external_metadata_for_malicious_virtual_table(self):
    """A virtual table whose SQL is not a read-only SELECT must be
    rejected by the metadata endpoint with an error payload."""
    self.login(username="admin")
    sess = db.session

    # Non-SELECT statement: the endpoint must refuse to execute it.
    bad_tbl = SqlaTable(
        table_name="malicious_sql_table",
        database=get_example_database(),
        sql="delete table birth_names",
    )
    sess.add(bad_tbl)
    sess.commit()

    saved_tbl = self.get_table_by_name("malicious_sql_table")
    endpoint = f"/datasource/external_metadata/table/{saved_tbl.id}/"
    payload = self.get_json_resp(endpoint)
    assert "error" in payload

    # Clean up the fixture row.
    sess.delete(saved_tbl)
    sess.commit()

# NOTE(review): method name has a typo ("mutistatement" -> "multistatement");
# kept as-is because renaming would change the test's discoverable identifier.
def test_external_metadata_for_mutistatement_virtual_table(self):
    """A virtual table defined by more than one SQL statement must be
    rejected by the metadata endpoint with an error payload."""
    self.login(username="admin")
    sess = db.session

    # Two statements separated by a semicolon: only single queries are allowed.
    multi_tbl = SqlaTable(
        table_name="multistatement_sql_table",
        database=get_example_database(),
        sql="select 123 as intcol, 'abc' as strcol;"
        "select 123 as intcol, 'abc' as strcol",
    )
    sess.add(multi_tbl)
    sess.commit()

    saved_tbl = self.get_table_by_name("multistatement_sql_table")
    endpoint = f"/datasource/external_metadata/table/{saved_tbl.id}/"
    payload = self.get_json_resp(endpoint)
    assert "error" in payload

    # Clean up the fixture row.
    sess.delete(saved_tbl)
    sess.commit()

def compare_lists(self, l1, l2, key):
l2_lookup = {o.get(key): o for o in l2}
for obj1 in l1:
Expand Down
2 changes: 1 addition & 1 deletion tests/import_export_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def create_dashboard(self, title, id=0, slcs=[]):
json_metadata=json.dumps(json_metadata),
)

def create_table(self, name, schema="", id=0, cols_names=[], metric_names=[]):
def create_table(self, name, schema=None, id=0, cols_names=[], metric_names=[]):
params = {"remote_id": id, "database_name": "examples"}
table = SqlaTable(
id=id, schema=schema, table_name=name, params=json.dumps(params)
Expand Down

0 comments on commit d82a4b2

Please sign in to comment.