Skip to content

Commit

Permalink
fix(datasets): add support for removing owners (#16461)
Browse files Browse the repository at this point in the history
* fix(datasets): add support for removing owners

* default to current user
  • Loading branch information
villebro authored Aug 31, 2021
1 parent 1f1e2dd commit c5a5cf7
Showing 1 changed file with 21 additions and 7 deletions.
28 changes: 21 additions & 7 deletions superset/views/datasource/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from collections import Counter
from typing import Any

from flask import request
from flask import g, request
from flask_appbuilder import expose
from flask_appbuilder.api import rison
from flask_appbuilder.security.decorators import has_access_api
Expand All @@ -28,13 +28,15 @@
from sqlalchemy.orm.exc import NoResultFound

from superset import app, db, event_logger
from superset.commands.utils import populate_owners
from superset.connectors.connector_registry import ConnectorRegistry
from superset.connectors.sqla.utils import get_physical_table_metadata
from superset.datasets.commands.exceptions import (
DatasetForbiddenError,
DatasetNotFoundError,
)
from superset.exceptions import SupersetException, SupersetSecurityException
from superset.extensions import security_manager
from superset.models.core import Database
from superset.typing import FlaskResponse
from superset.views.base import (
Expand Down Expand Up @@ -79,16 +81,28 @@ def save(self) -> FlaskResponse:
if "owners" in datasource_dict and orm_datasource.owner_class is not None:
# Check ownership
if app.config["OLD_API_CHECK_DATASET_OWNERSHIP"]:
# mimic the behavior of the new dataset command that
# checks ownership and ensures that non-admins aren't locked out
# of the object
try:
check_ownership(orm_datasource)
except SupersetSecurityException as ex:
raise DatasetForbiddenError() from ex

datasource_dict["owners"] = (
db.session.query(orm_datasource.owner_class)
.filter(orm_datasource.owner_class.id.in_(datasource_dict["owners"]))
.all()
)
user = security_manager.get_user_by_id(g.user.id)
datasource_dict["owners"] = populate_owners(
user, datasource_dict["owners"], default_to_user=False
)
else:
# legacy behavior
datasource_dict["owners"] = (
db.session.query(orm_datasource.owner_class)
.filter(
orm_datasource.owner_class.id.in_(
datasource_dict["owners"] or []
)
)
.all()
)

duplicates = [
name
Expand Down

0 comments on commit c5a5cf7

Please sign in to comment.