diff --git a/flask_appbuilder/security/sqla/apis/__init__.py b/flask_appbuilder/security/sqla/apis/__init__.py new file mode 100644 index 0000000000..220edd07c4 --- /dev/null +++ b/flask_appbuilder/security/sqla/apis/__init__.py @@ -0,0 +1,7 @@ +from flask_appbuilder.security.sqla.apis.permission import PermissionApi # noqa: F401 +from flask_appbuilder.security.sqla.apis.permission_view_menu import ( # noqa: F401 + PermissionViewMenuApi, +) +from flask_appbuilder.security.sqla.apis.role import RoleApi # noqa: F401 +from flask_appbuilder.security.sqla.apis.user import UserApi # noqa: F401 +from flask_appbuilder.security.sqla.apis.view_menu import ViewMenuApi # noqa: F401 diff --git a/flask_appbuilder/security/sqla/apis/permission/__init__.py b/flask_appbuilder/security/sqla/apis/permission/__init__.py new file mode 100644 index 0000000000..fbefe5f12b --- /dev/null +++ b/flask_appbuilder/security/sqla/apis/permission/__init__.py @@ -0,0 +1 @@ +from .api import PermissionApi # noqa: F401 diff --git a/flask_appbuilder/security/sqla/apis/permission/api.py b/flask_appbuilder/security/sqla/apis/permission/api.py new file mode 100644 index 0000000000..19c522ce6b --- /dev/null +++ b/flask_appbuilder/security/sqla/apis/permission/api.py @@ -0,0 +1,19 @@ +from flask_appbuilder import ModelRestApi +from flask_appbuilder.models.sqla.interface import SQLAInterface +from flask_appbuilder.security.sqla.models import Permission + + +class PermissionApi(ModelRestApi): + resource_name = "permissions" + openapi_spec_tag = "Security Permissions" + + class_permission_name = "Permission" + datamodel = SQLAInterface(Permission) + allow_browser_login = True + include_route_methods = {"info", "get", "get_list"} + + list_columns = ["id", "name"] + show_columns = list_columns + add_columns = ["name"] + edit_columns = add_columns + search_columns = list_columns diff --git a/flask_appbuilder/security/sqla/apis/permission_view_menu/__init__.py b/flask_appbuilder/security/sqla/apis/permission_view_menu/__init__.py new file mode 100644 index 0000000000..bccf326bd0 --- /dev/null +++ b/flask_appbuilder/security/sqla/apis/permission_view_menu/__init__.py @@ -0,0 +1 @@ +from .api import PermissionViewMenuApi # noqa: F401 diff --git a/flask_appbuilder/security/sqla/apis/permission_view_menu/api.py b/flask_appbuilder/security/sqla/apis/permission_view_menu/api.py new file mode 100644 index 0000000000..61b743881c --- /dev/null +++ b/flask_appbuilder/security/sqla/apis/permission_view_menu/api.py @@ -0,0 +1,17 @@ +from flask_appbuilder import ModelRestApi +from flask_appbuilder.models.sqla.interface import SQLAInterface +from flask_appbuilder.security.sqla.models import PermissionView + + +class PermissionViewMenuApi(ModelRestApi): + resource_name = "permissionsviewmenus" + openapi_spec_tag = "Security Permissions View Menus" + class_permission_name = "PermissionViewMenu" + datamodel = SQLAInterface(PermissionView) + allow_browser_login = True + + list_columns = ["id", "permission.name", "view_menu.name"] + show_columns = list_columns + add_columns = ["permission_id", "view_menu_id"] + edit_columns = add_columns + search_columns = list_columns diff --git a/flask_appbuilder/security/sqla/apis/role/__init__.py b/flask_appbuilder/security/sqla/apis/role/__init__.py new file mode 100644 index 0000000000..640bca7c27 --- /dev/null +++ b/flask_appbuilder/security/sqla/apis/role/__init__.py @@ -0,0 +1 @@ +from .api import RoleApi # noqa: F401 diff --git a/flask_appbuilder/security/sqla/apis/role/api.py b/flask_appbuilder/security/sqla/apis/role/api.py new file mode 100644 index 0000000000..c5853ddc28 --- /dev/null +++ b/flask_appbuilder/security/sqla/apis/role/api.py @@ -0,0 +1,154 @@ +from flask import current_app, request +from flask_appbuilder import ModelRestApi +from flask_appbuilder.api import expose, safe +from flask_appbuilder.const import API_RESULT_RES_KEY +from flask_appbuilder.models.sqla.interface import SQLAInterface +from flask_appbuilder.security.decorators import permission_name, protect +from flask_appbuilder.security.sqla.apis.role.schema import ( + RolePermissionListSchema, + RolePermissionPostSchema, +) +from flask_appbuilder.security.sqla.models import PermissionView, Role +from marshmallow import ValidationError +from sqlalchemy.exc import IntegrityError + + +class RoleApi(ModelRestApi): + resource_name = "roles" + openapi_spec_tag = "Security Roles" + class_permission_name = "Role" + datamodel = SQLAInterface(Role) + allow_browser_login = True + + list_columns = ["id", "name"] + show_columns = list_columns + add_columns = ["name"] + edit_columns = ["name"] + search_columns = list_columns + + list_role_permission_schema = RolePermissionListSchema() + add_role_permission_schema = RolePermissionPostSchema() + openapi_spec_component_schemas = ( + RolePermissionListSchema, + RolePermissionPostSchema, + ) + + @expose("//permissions", methods=["GET"]) + @protect() + @safe + @permission_name("list_role_permissions") + def list_role_permissions(self, pk): + """list role permissions + --- + get: + parameters: + - in: path + schema: + type: integer + name: pk + responses: + 200: + description: List of permissions + content: + application/json: + schema: + type: object + properties: + result: + $ref: '#/components/schemas/RolePermissionListSchema' + 400: + $ref: '#/components/responses/400' + 401: + $ref: '#/components/responses/401' + 404: + $ref: '#/components/responses/404' + 422: + $ref: '#/components/responses/422' + 500: + $ref: '#/components/responses/500' + """ + role = self.datamodel.get(pk, select_columns=["permissions"]) + if not role: + return self.response_404() + + permissions = [ + { + "id": p.id, + "permission_name": p.permission.name, + "view_menu_name": p.view_menu.name, + } + for p in role.permissions + ] + return self.response(200, **{API_RESULT_RES_KEY: permissions}) + + @expose("//permissions", methods=["POST"]) + @protect() + @safe + @permission_name("add_role_permissions") + def add_role_permissions(self, role_id): + """add role permissions + --- + post: + parameters: + - in: path + schema: + type: integer + name: role_id + requestBody: + description: Add role permissions schema + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/RolePermissionPostSchema' + responses: + 200: + description: Permissions added + content: + application/json: + schema: + type: object + properties: + result: + $ref: '#/components/schemas/RolePermissionPostSchema' + 400: + $ref: '#/components/responses/400' + 401: + $ref: '#/components/responses/401' + 404: + $ref: '#/components/responses/404' + 422: + $ref: '#/components/responses/422' + 500: + $ref: '#/components/responses/500' + """ + try: + item = self.add_role_permission_schema.load(request.json) + role = self.datamodel.get(role_id) + if not role: + return self.response_404() + permissions = [] + for id in item["permission_view_menu_ids"]: + permission = ( + current_app.appbuilder.get_session.query(PermissionView) + .filter_by(id=id) + .one_or_none() + ) + if permission: + permissions.append(permission) + + role.permissions = permissions + self.datamodel.edit(role, raise_exception=True) + return self.response( + 200, + **{ + API_RESULT_RES_KEY: self.add_role_permission_schema.dump( + item, many=False + ) + }, + ) + + except ValidationError as error: + return self.response_400(message=error.messages) + except IntegrityError as e: + return self.response_422(message=str(e.orig)) diff --git a/flask_appbuilder/security/sqla/apis/role/schema.py b/flask_appbuilder/security/sqla/apis/role/schema.py new file mode 100644 index 0000000000..8dbd59218b --- /dev/null +++ b/flask_appbuilder/security/sqla/apis/role/schema.py @@ -0,0 +1,13 @@ +from marshmallow import fields, Schema + + +class RolePermissionPostSchema(Schema): + permission_view_menu_ids = fields.List( + fields.Integer, required=True, description="List of permission view menu id" + ) + + +class RolePermissionListSchema(Schema): + id = fields.Integer() + permission_name = fields.String() + view_menu_name = fields.String() diff --git a/flask_appbuilder/security/sqla/apis/user/__init__.py b/flask_appbuilder/security/sqla/apis/user/__init__.py new file mode 100644 index 0000000000..44378357a6 --- /dev/null +++ b/flask_appbuilder/security/sqla/apis/user/__init__.py @@ -0,0 +1 @@ +from .api import UserApi # noqa: F401 diff --git a/flask_appbuilder/security/sqla/apis/user/api.py b/flask_appbuilder/security/sqla/apis/user/api.py new file mode 100644 index 0000000000..6fd3582bac --- /dev/null +++ b/flask_appbuilder/security/sqla/apis/user/api.py @@ -0,0 +1,209 @@ +from datetime import datetime + +from flask import current_app +from flask import g, request +from flask_appbuilder import ModelRestApi +from flask_appbuilder.api import expose, safe +from flask_appbuilder.const import API_RESULT_RES_KEY +from flask_appbuilder.models.sqla.interface import SQLAInterface +from flask_appbuilder.security.decorators import permission_name, protect +from flask_appbuilder.security.sqla.apis.user.schema import ( + UserPostSchema, + UserPutSchema, +) +from flask_appbuilder.security.sqla.models import Role, User +from marshmallow import ValidationError +from sqlalchemy.exc import IntegrityError +from werkzeug.security import generate_password_hash + + +class UserApi(ModelRestApi): + resource_name = "users" + openapi_spec_tag = "Security Users" + class_permission_name = "User" + datamodel = SQLAInterface(User) + allow_browser_login = True + + list_columns = [ + "id", + "roles.id", + "roles.name", + "first_name", + "last_name", + "username", + "active", + "email", + "last_login", + "login_count", + "fail_login_count", + "created_on", + "changed_on", + "created_by.id", + "changed_by.id", + ] + show_columns = list_columns + add_columns = [ + "roles", + "first_name", + "last_name", + "username", + "active", + "email", + "password", + ] + edit_columns = add_columns + search_columns = list_columns + + add_model_schema = UserPostSchema() + edit_model_schema = UserPutSchema() + + def pre_update(self, item): + item.changed_on = datetime.now() + item.changed_by_fk = g.user.id + if item.password: + item.password = generate_password_hash(item.password) + + def pre_add(self, item): + item.password = generate_password_hash(item.password) + + @expose("/", methods=["POST"]) + @protect() + @safe + @permission_name("post") + def post(self): + """Create new user + --- + post: + requestBody: + description: Model schema + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/{{self.__class__.__name__}}.post' + responses: + 201: + description: Item changed + content: + application/json: + schema: + type: object + properties: + result: + $ref: '#/components/schemas/{{self.__class__.__name__}}.post' + 400: + $ref: '#/components/responses/400' + 401: + $ref: '#/components/responses/401' + 404: + $ref: '#/components/responses/404' + 422: + $ref: '#/components/responses/422' + 500: + $ref: '#/components/responses/500' + """ + try: + item = self.add_model_schema.load(request.json) + model = User() + roles = [] + for key, value in item.items(): + if key != "roles": + setattr(model, key, value) + else: + for role_id in item[key]: + role = ( + current_app.appbuilder.get_session.query(Role) + .filter(Role.id == role_id) + .one_or_none() + ) + if role: + role.user_id = model.id + role.role_id = role_id + roles.append(role) + + if "roles" in item.keys(): + model.roles = roles + + self.pre_add(model) + self.datamodel.add(model, raise_exception=True) + return self.response(201, id=model.id) + except ValidationError as error: + return self.response_400(message=error.messages) + except IntegrityError as e: + return self.response_422(message=str(e.orig)) + + @expose("/", methods=["PUT"]) + @protect() + @safe + @permission_name("put") + def put(self, pk): + """Edit user + --- + put: + parameters: + - in: path + schema: + type: integer + name: pk + requestBody: + description: Model schema + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/{{self.__class__.__name__}}.put' + responses: + 200: + description: Item changed + content: + application/json: + schema: + type: object + properties: + result: + $ref: '#/components/schemas/{{self.__class__.__name__}}.put' + 400: + $ref: '#/components/responses/400' + 401: + $ref: '#/components/responses/401' + 404: + $ref: '#/components/responses/404' + 422: + $ref: '#/components/responses/422' + 500: + $ref: '#/components/responses/500' + """ + try: + item = self.edit_model_schema.load(request.json) + model = self.datamodel.get(pk, self._base_filters) + roles = [] + + for key, value in item.items(): + if key != "roles": + setattr(model, key, value) + else: + for role_id in item[key]: + role = ( + current_app.appbuilder.session.query(Role) + .filter(Role.id == role_id) + .one_or_none() + ) + if role: + role.user_id = model.id + role.role_id = role_id + roles.append(role) + + if "roles" in item.keys(): + model.roles = roles + + self.pre_update(model) + self.datamodel.edit(model, raise_exception=True) + return self.response( + 200, + **{API_RESULT_RES_KEY: self.edit_model_schema.dump(item, many=False)}, + ) + + except ValidationError as e: + return self.response_400(message=e.messages) + except IntegrityError as e: + return self.response_422(message=str(e.orig)) diff --git a/flask_appbuilder/security/sqla/apis/user/schema.py b/flask_appbuilder/security/sqla/apis/user/schema.py new file mode 100644 index 0000000000..df0b7f4c15 --- /dev/null +++ b/flask_appbuilder/security/sqla/apis/user/schema.py @@ -0,0 +1,60 @@ +from flask_appbuilder.security.sqla.models import User +from marshmallow import fields, Schema +from marshmallow.validate import Length + +from .validator import PasswordComplexityValidator + +active_description = ( + "Is user active?" "It's not a good policy to remove a user, just make it inactive" +) +email_description = "The user's email" +first_name_description = "The user's first name" +last_name_description = "The user's last name" +password_description = "The user's password for authentication" +roles_description = "The user's roles" +username_description = "The user's username" + + +class UserPostSchema(Schema): + model_cls = User + active = fields.Boolean( + required=False, default=True, description=active_description + ) + email = fields.String(required=True, description=email_description) + first_name = fields.String(required=True, description=first_name_description) + last_name = fields.String(required=True, description=last_name_description) + password = fields.String( + required=True, + validate=[PasswordComplexityValidator()], + description=password_description, + ) + roles = fields.List( + fields.Integer, + required=True, + validate=[Length(1)], + description=roles_description, + ) + username = fields.String( + required=True, validate=[Length(1, 250)], description=username_description + ) + + +class UserPutSchema(Schema): + active = fields.Boolean(required=False, description=active_description) + email = fields.String(required=False, description=email_description) + first_name = fields.String(required=False, description=first_name_description) + last_name = fields.String(required=False, description=last_name_description) + password = fields.String( + required=False, + validate=[PasswordComplexityValidator()], + description=password_description, + ) + roles = fields.List( + fields.Integer, + required=False, + validate=[Length(1)], + description=roles_description, + ) + username = fields.String( + required=False, validate=[Length(1, 250)], description=username_description + ) diff --git a/flask_appbuilder/security/sqla/apis/user/validator.py b/flask_appbuilder/security/sqla/apis/user/validator.py new file mode 100644 index 0000000000..e7dd62ccb3 --- /dev/null +++ b/flask_appbuilder/security/sqla/apis/user/validator.py @@ -0,0 +1,28 @@ +from flask import current_app +from flask_appbuilder.exceptions import PasswordComplexityValidationError +from flask_appbuilder.validators import default_password_complexity +from marshmallow.exceptions import ValidationError +from marshmallow.validate import Validator + + +class PasswordComplexityValidator(Validator): + """Validator for password. + """ + + def __call__(self, value: str) -> str: + if current_app.config.get("FAB_PASSWORD_COMPLEXITY_ENABLED", False): + password_complexity_validator = current_app.config.get( + "FAB_PASSWORD_COMPLEXITY_VALIDATOR", None + ) + if password_complexity_validator is not None: + try: + password_complexity_validator(value) + except PasswordComplexityValidationError as exc: + raise ValidationError(str(exc)) + else: + try: + default_password_complexity(value) + except PasswordComplexityValidationError as exc: + raise ValidationError(str(exc)) + + return value diff --git a/flask_appbuilder/security/sqla/apis/view_menu/__init__.py b/flask_appbuilder/security/sqla/apis/view_menu/__init__.py new file mode 100644 index 0000000000..6652d77a00 --- /dev/null +++ b/flask_appbuilder/security/sqla/apis/view_menu/__init__.py @@ -0,0 +1 @@ +from .api import ViewMenuApi # noqa: F401 diff --git a/flask_appbuilder/security/sqla/apis/view_menu/api.py b/flask_appbuilder/security/sqla/apis/view_menu/api.py new file mode 100644 index 0000000000..3177d645e7 --- /dev/null +++ b/flask_appbuilder/security/sqla/apis/view_menu/api.py @@ -0,0 +1,18 @@ +from flask_appbuilder import ModelRestApi +from flask_appbuilder.models.sqla.interface import SQLAInterface +from flask_appbuilder.security.sqla.models import ViewMenu + + +class ViewMenuApi(ModelRestApi): + resource_name = "viewmenus" + openapi_spec_tag = "Security View Menus" + + class_permission_name = "ViewMenu" + datamodel = SQLAInterface(ViewMenu) + allow_browser_login = True + + list_columns = ["id", "name"] + show_columns = list_columns + add_columns = ["name"] + edit_columns = add_columns + search_columns = list_columns diff --git a/flask_appbuilder/security/sqla/manager.py b/flask_appbuilder/security/sqla/manager.py index 0db0b2941b..aef28d1650 100755 --- a/flask_appbuilder/security/sqla/manager.py +++ b/flask_appbuilder/security/sqla/manager.py @@ -10,6 +10,7 @@ from sqlalchemy.orm.exc import MultipleResultsFound from werkzeug.security import generate_password_hash +from .apis import PermissionApi, PermissionViewMenuApi, RoleApi, UserApi, ViewMenuApi from .models import ( assoc_permissionview_role, Permission, @@ -45,6 +46,13 @@ class SecurityManager(BaseSecurityManager): permissionview_model = PermissionView registeruser_model = RegisterUser + # APIs + permission_api = PermissionApi + role_api = RoleApi + user_api = UserApi + view_menu_api = ViewMenuApi + permission_view_menu_api = PermissionViewMenuApi + def __init__(self, appbuilder): """ SecurityManager contructor @@ -84,6 +92,13 @@ def get_session(self): return self.appbuilder.get_session def register_views(self): + if self.appbuilder.app.config.get("FAB_ADD_SECURITY_API", False): + self.appbuilder.add_api(self.permission_api) + self.appbuilder.add_api(self.role_api) + self.appbuilder.add_api(self.user_api) + self.appbuilder.add_api(self.view_menu_api) + self.appbuilder.add_api(self.permission_view_menu_api) + super(SecurityManager, self).register_views() def create_db(self): diff --git a/flask_appbuilder/tests/test_security_api.py b/flask_appbuilder/tests/test_security_api.py new file mode 100644 index 0000000000..baf0e98a07 --- /dev/null +++ b/flask_appbuilder/tests/test_security_api.py @@ -0,0 +1,1082 @@ +import json +import logging +import os + +from flask_appbuilder import SQLA +from flask_appbuilder.security.sqla.models import Permission, Role, ViewMenu + +from .base import FABTestCase +from .const import PASSWORD_ADMIN, USERNAME_ADMIN + +log = logging.getLogger(__name__) + + +class UserAPITestCase(FABTestCase): + def setUp(self): + from flask import Flask + from flask_appbuilder import AppBuilder + from flask_appbuilder.security.sqla.models import User, Role + + self.app = Flask(__name__) + self.basedir = os.path.abspath(os.path.dirname(__file__)) + self.app.config.from_object("flask_appbuilder.tests.config_api") + self.app.config["FAB_ADD_SECURITY_API"] = True + self.db = SQLA(self.app) + self.session = self.db.session + self.appbuilder = AppBuilder(self.app, self.session) + self.user_model = User + self.role_model = Role + + # TODO: this heinous hack is to avoid using stale db session leaking from + # RolePermissionAPITestCase + # don't know why all baseviews in Appbuilder are attached to stale session, + # causing error when adding a new user which reads roles from this session and + # datamodel uses stale session to add it. + for b in self.appbuilder.baseviews: + if hasattr(b, "datamodel") and b.datamodel.session is not None: + b.datamodel.session = self.db.session + + def tearDown(self): + self.appbuilder.get_session.close() + engine = self.db.session.get_bind(mapper=None, clause=None) + engine.dispose() + + def test_user_list(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + total_users = self.appbuilder.sm.count_users() + uri = "api/v1/users/" + rv = self.auth_client_get(client, token, uri) + response = json.loads(rv.data) + + self.assertEqual(rv.status_code, 200) + assert "count" in response + self.assertEqual(response["count"], total_users) + self.assertEqual(len(response["result"]), total_users) + + def test_get_single_user(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + username = "test_get_single_user_1" + first_name = "first" + last_name = "last" + email = "test_get_single_user@fab.com" + password = "a" + role_name = "get_single_user_role" + + role = self.appbuilder.sm.add_role(role_name) + user = self.appbuilder.sm.add_user( + username, first_name, last_name, email, role, password + ) + + uri = f"api/v1/users/{user.id}" + rv = self.auth_client_get(client, token, uri) + self.assertEqual(rv.status_code, 200) + response = json.loads(rv.data) + + assert "result" in response + result = response["result"] + self.assertEqual(result["username"], username) + self.assertEqual(result["first_name"], first_name) + self.assertEqual(result["last_name"], last_name) + self.assertEqual(result["email"], email) + self.assertEqual(result["roles"], [{"id": role.id, "name": role_name}]) + + user = ( + self.session.query(self.user_model) + .filter(self.user_model.id == user.id) + .first() + ) + self.session.delete(user) + role = ( + self.session.query(self.role_model) + .filter(self.role_model.id == role.id) + .first() + ) + self.session.delete(role) + + self.session.commit() + + def test_get_single_invalid_user(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + uri = "api/v1/users/99999999" + rv = self.auth_client_get(client, token, uri) + self.assertEqual(rv.status_code, 404) + response = json.loads(rv.data) + + assert "message" in response + self.assertEqual(response["message"], "Not found") + + def test_create_user(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + role_name = "test_create_user_api" + role = self.appbuilder.sm.add_role(role_name) + + uri = "api/v1/users/" + create_user_payload = { + "active": True, + "email": "fab@test_create_user_3.com", + "first_name": "fab", + "last_name": "admin", + "password": "password", + "roles": [role.id], + "username": "fab_usear_api_test_4", + } + rv = self.auth_client_post(client, token, uri, create_user_payload) + add_user_response = json.loads(rv.data) + self.assertEqual(rv.status_code, 201) + + assert "id" in add_user_response + + user = self.appbuilder.sm.get_user_by_id(add_user_response["id"]) + + self.assertEqual(user.active, create_user_payload["active"]) + self.assertEqual(user.email, create_user_payload["email"]) + self.assertEqual(user.first_name, create_user_payload["first_name"]) + self.assertEqual(user.last_name, create_user_payload["last_name"]) + self.assertEqual(user.username, create_user_payload["username"]) + self.assertEqual(len(user.roles), 1) + self.assertEqual(user.roles[0].name, role_name) + + user = ( + self.session.query(self.user_model) + .filter(self.user_model.id == user.id) + .first() + ) + self.session.delete(user) + self.session.commit() + + def test_create_user_without_role(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + uri = "api/v1/users/" + create_user_payload = { + "active": True, + "email": "fab@test_create_user_1.com", + "first_name": "fab", + "last_name": "admin", + "password": "password", + "roles": [], + "username": "fab_usear_api_test_2", + } + rv = self.auth_client_post(client, token, uri, create_user_payload) + add_user_response = json.loads(rv.data) + self.assertEqual(rv.status_code, 400) + + assert "message" in add_user_response + self.assertEqual( + add_user_response["message"], {"roles": ["Shorter than minimum length 1."]} + ) + + def test_create_user_with_invalid_role(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + uri = "api/v1/users/" + create_user_payload = { + "active": True, + "email": "fab@test_create_user_1.com", + "first_name": "fab", + "last_name": "admin", + "password": "password", + "roles": [999999], + "username": "fab_usear_api_test_2", + } + rv = self.auth_client_post(client, token, uri, create_user_payload) + add_user_response = json.loads(rv.data) + self.assertEqual(rv.status_code, 201) + + user = self.appbuilder.sm.get_user_by_id(add_user_response["id"]) + + self.assertEqual(user.active, create_user_payload["active"]) + self.assertEqual(user.email, create_user_payload["email"]) + self.assertEqual(user.first_name, create_user_payload["first_name"]) + self.assertEqual(user.last_name, create_user_payload["last_name"]) + self.assertEqual(user.username, create_user_payload["username"]) + self.assertEqual(len(user.roles), 0) + + user = ( + self.session.query(self.user_model) + .filter(self.user_model.id == user.id) + .first() + ) + self.session.delete(user) + self.session.commit() + + def test_edit_user(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + username = "edit_user_13" + first_name = "first" + last_name = "last" + email = "test_edit_user13@fab.com" + password = "a" + role_name_1 = "edit_user_role_1" + role_name_2 = "edit_user_role_2" + role_name_3 = "edit_user_role_3" + updated_email = "test_edit_user_new7@fab.com" + + role_1 = self.appbuilder.sm.add_role(role_name_1) + role_2 = self.appbuilder.sm.add_role(role_name_2) + role_3 = self.appbuilder.sm.add_role(role_name_3) + user = self.appbuilder.sm.add_user( + username, first_name, last_name, email, [role_1], password + ) + + user_id = user.id + role_1_id = role_1.id + role_2_id = role_2.id + role_3_id = role_3.id + + uri = f"api/v1/users/{user_id}" + rv = self.auth_client_put( + client, + token, + uri, + {"email": updated_email, "roles": [role_2_id, role_3_id]}, + ) + self.assertEqual(rv.status_code, 200) + updated_user = self.appbuilder.sm.get_user_by_id(user_id) + self.assertEqual(len(updated_user.roles), 2) + self.assertEqual(updated_user.roles[0].name, role_name_2) + self.assertEqual(updated_user.roles[1].name, role_name_3) + self.assertEqual(updated_user.email, updated_email) + + roles = ( + self.session.query(self.role_model) + .filter(self.role_model.id.in_([role_1_id, role_2_id, role_3_id])) + .all() + ) + user = ( + self.session.query(self.user_model) + .filter(self.user_model.id == user_id) + .first() + ) + self.session.delete(user) + for r in roles: + self.session.delete(r) + self.session.commit() + + def test_delete_user(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + username = "delete_user_2" + first_name = "first" + last_name = "last" + email = "test_delete_user_2@fab.com" + password = "a" + role_name_1 = "delete_user_role_2" + + role = self.appbuilder.sm.add_role(role_name_1) + user = self.appbuilder.sm.add_user( + username, first_name, last_name, email, [role], password + ) + role_id = role.id + user_id = user.id + + uri = f"api/v1/users/{user_id}" + rv = self.auth_client_delete(client, token, uri) + self.assertEqual(rv.status_code, 200) + + updated_user = self.appbuilder.sm.get_user_by_id(user_id) + assert not updated_user + + role = ( + self.session.query(self.role_model) + .filter(self.role_model.id == role_id) + .first() + ) + self.session.delete(role) + self.session.commit() + + +class RolePermissionAPITestCase(FABTestCase): + def setUp(self): + from flask import Flask + from flask_appbuilder import AppBuilder + + self.app = Flask(__name__) + self.basedir = os.path.abspath(os.path.dirname(__file__)) + self.app.config.from_object("flask_appbuilder.tests.config_api") + self.app.config["FAB_ADD_SECURITY_API"] = True + self.db = SQLA(self.app) + self.session = self.db.session + self.appbuilder = AppBuilder(self.app, self.db.session) + self.permission_model = Permission + self.viewmenu_model = ViewMenu + self.role_model = Role + + for b in self.appbuilder.baseviews: + if hasattr(b, "datamodel") and b.datamodel.session is not None: + b.datamodel.session = self.db.session + + def tearDown(self): + self.appbuilder.get_session.close() + engine = self.db.session.get_bind(mapper=None, clause=None) + engine.dispose() + + def test_list_permission_api(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + count = self.session.query(self.permission_model).count() + + uri = "api/v1/permissions/" + rv = self.auth_client_get(client, token, uri) + self.assertEqual(rv.status_code, 200) + + response = json.loads(rv.data) + + assert "count" and "result" in response + self.assertEqual(response["count"], count) + + def test_get_permission_api(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + permission_name = "test_get_permission_api_1" + permission = self.appbuilder.sm.add_permission(permission_name) + permission_id = permission.id + + uri = f"api/v1/permissions/{permission_id}" + rv = self.auth_client_get(client, token, uri) + self.assertEqual(rv.status_code, 200) + + response = json.loads(rv.data) + + assert "id" and "result" in response + self.assertEqual(response["id"], permission_id) + self.assertEqual(response["result"]["name"], permission_name) + + self.session.delete(permission) + + def test_get_invalid_permission_api(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + uri = "api/v1/permissions/9999999" + rv = self.auth_client_get(client, token, uri) + response = json.loads(rv.data) + + self.assertEqual(rv.status_code, 404) + self.assertEqual(response, {"message": "Not found"}) + + def test_add_permission_api(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + uri = "api/v1/permissions/" + permission_name = "super duper fab permission" + + create_permission_payload = {"name": permission_name} + rv = self.auth_client_post(client, token, uri, create_permission_payload) + self.assertEqual(rv.status_code, 405) + permission = self.appbuilder.sm.find_permission(permission_name) + assert permission is None + + def test_edit_permission_api(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + permission_name = "test_edit_permission_api_2" + new_permission_name = "different_test_edit_permission_api_2" + permission = self.appbuilder.sm.add_permission(permission_name) + permission_id = permission.id + + uri = f"api/v1/permissions/{permission_id}" + rv = self.auth_client_put(client, token, uri, {"name": new_permission_name}) + + self.assertEqual(rv.status_code, 405) + + new_permission = self.appbuilder.sm.find_permission(new_permission_name) + assert new_permission is None + + self.appbuilder.sm.del_permission(permission_name) + + def test_delete_permission_api(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + permission_name = "test_delete_permission_api" + permission = self.appbuilder.sm.add_permission(permission_name) + + uri = f"api/v1/permissions/{permission.id}" + rv = self.auth_client_delete(client, token, uri) + self.assertEqual(rv.status_code, 405) + + new_permission = self.appbuilder.sm.find_permission(permission_name) + assert new_permission is not None + self.appbuilder.sm.del_permission(permission_name) + + def test_list_view_api(self): + """REST Api: Test view apis + """ + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + count = self.session.query(self.viewmenu_model).count() + + uri = "api/v1/viewmenus/" + rv = self.auth_client_get(client, token, uri) + response = json.loads(rv.data) + + self.assertEqual(rv.status_code, 200) + assert "count" and "result" in response + self.assertEqual(response["count"], count) + + def test_get_view_api(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + view_name = "test_get_view_api" + view = self.appbuilder.sm.add_view_menu(view_name) + view_id = view.id + + uri = f"api/v1/viewmenus/{view_id}" + rv = self.auth_client_get(client, token, uri) + response = json.loads(rv.data) + + self.assertEqual(rv.status_code, 200) + assert "id" and "result" in response + self.assertEqual(response["id"], view_id) + self.assertEqual(response["result"]["name"], view_name) + + self.session.delete(view) + + def test_get_invalid_view_api(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + uri = "api/v1/viewmenus/99999999" + rv = self.auth_client_get(client, token, uri) + response = json.loads(rv.data) + + self.assertEqual(rv.status_code, 404) + self.assertEqual(response, {"message": "Not found"}) + + def test_add_view_api(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + view_name = "super duper fab view" + uri = "api/v1/viewmenus/" + create_permission_payload = {"name": view_name} + rv = self.auth_client_post(client, token, uri, create_permission_payload) + add_permission_response = json.loads(rv.data) + + self.assertEqual(rv.status_code, 201) + assert "id" and "result" in add_permission_response + self.assertEqual(create_permission_payload, add_permission_response["result"]) + + self.appbuilder.sm.del_view_menu(view_name) + + def test_add_view_without_name_api(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + uri = "api/v1/viewmenus/" + create_view_payload = {} + rv = self.auth_client_post(client, token, uri, create_view_payload) + add_permission_response = json.loads(rv.data) + self.assertEqual(rv.status_code, 422) + assert "message" in add_permission_response + self.assertEqual( + {"message": {"name": ["Missing data for required field."]}}, + add_permission_response, + ) + + def test_edit_view_api(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + view_name = "test_edit_view_api" + new_view_name = "different_test_edit_view_api" + view_menu = self.appbuilder.sm.add_view_menu(view_name) + view_menu_id = view_menu.id + + uri = f"api/v1/viewmenus/{view_menu_id}" + rv = self.auth_client_put(client, token, uri, {"name": new_view_name}) + put_permission_response = json.loads(rv.data) + self.assertEqual(rv.status_code, 200) + self.assertEqual( + put_permission_response["result"].get("name", ""), new_view_name + ) + + new_view = self.appbuilder.sm.find_view_menu(new_view_name) + assert new_view + self.assertEqual(new_view.name, new_view_name) + + self.appbuilder.sm.del_view_menu(new_view_name) + + def test_delete_view_api(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + view_menu_name = "test_delete_view_api" + view_menu = self.appbuilder.sm.add_view_menu(view_menu_name) + + uri = f"api/v1/viewmenus/{view_menu.id}" + rv = self.auth_client_delete(client, token, uri) + self.assertEqual(rv.status_code, 200) + + new_view_menu = self.appbuilder.sm.find_view_menu(view_menu_name) + assert new_view_menu is None + + def test_list_permission_view_api(self): + """REST Api: Test permission view apis + """ + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + uri = "api/v1/permissionsviewmenus/" + rv = self.auth_client_get(client, token, uri) + self.assertEqual(rv.status_code, 200) + + def test_get_permission_view_api(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + permission_name = "test_get_permission_view_permission" + view_name = "test_get_permission_view_view" + permission_view_menu = self.appbuilder.sm.add_permission_view_menu( + permission_name, view_name + ) + + uri = f"api/v1/permissionsviewmenus/{permission_view_menu.id}" + rv = self.auth_client_get(client, token, uri) + self.assertEqual(rv.status_code, 200) + + self.appbuilder.sm.del_permission_view_menu(permission_name, view_name, True) + + def test_get_invalid_permission_view_api(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + uri = "api/v1/permissionsviewmenus/9999999" + rv = self.auth_client_get(client, token, uri) + self.assertEqual(rv.status_code, 404) + + def test_add_permission_view_api(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + permission_name = "test_add_permission_3" + view_menu_name = "test_add_view_3" + + permission = self.appbuilder.sm.add_permission(permission_name) + view_menu = self.appbuilder.sm.add_view_menu(view_menu_name) + + uri = "api/v1/permissionsviewmenus/" + create_permission_payload = { + "permission_id": permission.id, + "view_menu_id": view_menu.id, + } + rv = self.auth_client_post(client, token, uri, create_permission_payload) + add_permission_response = json.loads(rv.data) + + self.assertEqual(rv.status_code, 201) + assert "id" and "result" in add_permission_response + self.assertEqual(create_permission_payload, add_permission_response["result"]) + + self.appbuilder.sm.del_permission_view_menu( + permission_name, view_menu_name, True + ) + + def test_edit_permission_view_api(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + permission_name = "test_edit_permission_view_permission" + view_name = "test_edit_permission_view" + new_view_name = "test_edit_permission_view_new" + permission_view_menu = self.appbuilder.sm.add_permission_view_menu( + permission_name, view_name + ) + new_view_menu = self.appbuilder.sm.add_view_menu(new_view_name) + + new_view_menu_id = new_view_menu.id + + uri = f"api/v1/permissionsviewmenus/{permission_view_menu.id}" + rv = self.auth_client_put( + client, token, uri, {"view_menu_id": new_view_menu.id} + ) + put_permission_response = json.loads(rv.data) + self.assertEqual(rv.status_code, 200) + self.assertEqual( + put_permission_response["result"].get("view_menu_id", None), + new_view_menu_id, + ) + + self.appbuilder.sm.del_view_menu(view_name) + self.appbuilder.sm.del_permission_view_menu( + permission_name, new_view_name, True + ) + + def test_delete_permission_view_api(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + permission_name = "test_delete_permission_view_permission_3" + view_name = "test_get_permission_view_3" + permission_view_menu = self.appbuilder.sm.add_permission_view_menu( + permission_name, view_name + ) + + uri = f"api/v1/permissionsviewmenus/{permission_view_menu.id}" + rv = self.auth_client_delete(client, token, uri) + self.assertEqual(rv.status_code, 200) + + pvm = self.appbuilder.sm.find_permission_view_menu(permission_name, view_name) + assert pvm is None + + def test_list_role_api(self): + """REST Api: Test role apis + """ + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + uri = "api/v1/roles/" + rv = self.auth_client_get(client, token, uri) + self.assertEqual(rv.status_code, 200) + + def test_get_role_api(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + role_name = "test_get_role_api_3" + role = self.appbuilder.sm.add_role(role_name) + role_id = role.id + + uri = f"api/v1/roles/{role_id}" + rv = self.auth_client_get(client, token, uri) + response = json.loads(rv.data) + + self.assertEqual(rv.status_code, 200) + assert "id" and "result" in response + self.assertEqual(response["result"].get("name", ""), role_name) + + self.session.delete(role) + self.session.commit() + + def test_create_role_api(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + uri = "api/v1/roles/" + role_name = "test_create_role_api" + create_user_payload = {"name": role_name} + rv = self.auth_client_post(client, token, uri, create_user_payload) + add_role_response = json.loads(rv.data) + self.assertEqual(rv.status_code, 201) + assert "id" and "result" in add_role_response + self.assertEqual(create_user_payload, add_role_response["result"]) + + role = self.session.query(self.role_model).filter_by(name=role_name).first() + self.session.delete(role) + self.session.commit() + + def test_edit_role_api(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + num = 3 + role_name = f"test_edit_role_api_{num}" + role_2_name = f"test_edit_role_api_{num+1}" + permission_1_name = f"test_edit_role_permission_{num}" + permission_2_name = f"test_edit_role_permission_{num+1}" + view_menu_name = f"test_edit_role_view_menu_{num}" + + role = self.appbuilder.sm.add_role(role_name) + + role_id = role.id + + uri = f"api/v1/roles/{role_id}" + rv = self.auth_client_put(client, token, uri, {"name": role_2_name}) + + put_role_response = json.loads(rv.data) + self.assertEqual(rv.status_code, 200) + + self.assertEqual(put_role_response["result"].get("name", ""), role_2_name) + + self.appbuilder.sm.del_permission_view_menu( + permission_1_name, view_menu_name, False + ) + self.appbuilder.sm.del_permission_view_menu( + permission_2_name, view_menu_name, False + ) + self.appbuilder.sm.del_permission(permission_1_name) + self.appbuilder.sm.del_permission(permission_2_name) + self.appbuilder.sm.del_view_menu(view_menu_name) + + role = self.appbuilder.sm.find_role(role_2_name) + + self.session.delete(role) + self.session.commit() + + def test_add_view_menu_permissions_to_role(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + num = 1 + role_name = f"test_edit_role_api_{num}" + permission_1_name = f"test_edit_role_permission_{num}" + permission_2_name = f"test_edit_role_permission_{num+1}" + view_menu_name = f"test_edit_role_view_menu_{num}" + + permission_1_view_menu = self.appbuilder.sm.add_permission_view_menu( + permission_1_name, view_menu_name + ) + permission_2_view_menu = self.appbuilder.sm.add_permission_view_menu( + permission_2_name, view_menu_name + ) + role = self.appbuilder.sm.add_role(role_name) + role_id = role.id + permission_1_view_menu_id = permission_1_view_menu.id + permission_2_view_menu_id = permission_2_view_menu.id + + uri = f"api/v1/roles/{role_id}/permissions" + rv = self.auth_client_post( + client, + token, + uri, + { + "permission_view_menu_ids": [ + permission_1_view_menu.id, + permission_2_view_menu.id, + ] + }, + ) + + post_permissions_response = json.loads(rv.data) + + self.assertEqual(rv.status_code, 200) + assert "result" in post_permissions_response + self.assertEqual( + post_permissions_response["result"]["permission_view_menu_ids"], + [permission_1_view_menu_id, permission_2_view_menu_id], + ) + + role = self.appbuilder.sm.find_role(role_name) + + self.assertEqual(len(role.permissions), 2) + self.assertEqual( + [p.id for p in role.permissions], + [permission_1_view_menu_id, permission_2_view_menu_id], + ) + + role = self.appbuilder.sm.find_role(role_name) + self.session.delete(role) + + self.appbuilder.sm.del_permission_view_menu( + permission_1_name, view_menu_name, cascade=True + ) + self.appbuilder.sm.del_permission_view_menu( + permission_2_name, view_menu_name, cascade=True + ) + + def test_add_invalid_view_menu_permissions_to_role(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + num = 1 + role_name = f"test_add_permissions_to_role_api_{num}" + + role = self.appbuilder.sm.add_role(role_name) + role_id = role.id + + uri = f"api/v1/roles/{role_id}/permissions" + rv = self.auth_client_post(client, token, uri, {}) + + self.assertEqual(rv.status_code, 400) + role = self.appbuilder.sm.find_role(role_name) + self.session.delete(role) + + def test_add_view_menu_permissions_to_invalid_role(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + num = 1 + permission_1_name = f"test_edit_role_permission_{num}" + permission_2_name = f"test_edit_role_permission_{num+1}" + view_menu_name = f"test_edit_role_view_menu_{num}" + + permission_1_view_menu = self.appbuilder.sm.add_permission_view_menu( + permission_1_name, view_menu_name + ) + permission_2_view_menu = self.appbuilder.sm.add_permission_view_menu( + permission_2_name, view_menu_name + ) + + uri = f"api/v1/roles/{9999999}/permissions" + rv = self.auth_client_post( + client, + token, + uri, + { + "permission_view_menu_ids": [ + permission_1_view_menu.id, + permission_2_view_menu.id, + ] + }, + ) + self.assertEqual(rv.status_code, 404) + self.appbuilder.sm.del_permission_view_menu( + permission_1_name, view_menu_name, cascade=True + ) + self.appbuilder.sm.del_permission_view_menu( + permission_2_name, view_menu_name, cascade=True + ) + + def test_list_view_menu_permissions_of_role(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + num = 1 + role_name = f"test_list_role_api_{num}" + permission_1_name = f"test_list_role_permission_{num}" + permission_2_name = f"test_list_role_permission_{num+1}" + view_menu_name = f"test_list_role_view_menu_{num}" + + permission_1_view_menu = self.appbuilder.sm.add_permission_view_menu( + permission_1_name, view_menu_name + ) + permission_2_view_menu = self.appbuilder.sm.add_permission_view_menu( + permission_2_name, view_menu_name + ) + role = self.appbuilder.sm.add_role(role_name) + self.appbuilder.sm.add_permission_role(role, permission_1_view_menu) + self.appbuilder.sm.add_permission_role(role, permission_2_view_menu) + + role_id = role.id + permission_1_view_menu_id = permission_1_view_menu.id + permission_2_view_menu_id = permission_2_view_menu.id + + uri = f"api/v1/roles/{role_id}/permissions" + rv = self.auth_client_get(client, token, uri) + + self.assertEqual(rv.status_code, 200) + + list_permissions_response = json.loads(rv.data) + + assert "result" in list_permissions_response + self.assertEqual(len(list_permissions_response["result"]), 2) + self.assertEqual( + list_permissions_response["result"], + [ + { + "id": permission_1_view_menu_id, + "permission_name": permission_1_name, + "view_menu_name": view_menu_name, + }, + { + "id": permission_2_view_menu_id, + "permission_name": permission_2_name, + "view_menu_name": view_menu_name, + }, + ], + ) + + role = self.appbuilder.sm.find_role(role_name) + self.session.delete(role) + + def test_list_view_menu_permissions_of_invalid_role(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + uri = f"api/v1/roles/{999999}/permissions" + rv = self.auth_client_get(client, token, uri) + + self.assertEqual(rv.status_code, 404) + + def test_delete_role_api(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + role_name = "test_delete_role_api" + permission_1_name = "test_delete_role_permission" + view_menu_name = "test_delete_role_view_menu" + + permission_1_view_menu = self.appbuilder.sm.add_permission_view_menu( + permission_1_name, view_menu_name + ) + role = self.appbuilder.sm.add_role(role_name, [permission_1_view_menu]) + + uri = f"api/v1/roles/{role.id}" + rv = self.auth_client_delete(client, token, uri) + self.assertEqual(rv.status_code, 200) + + get_role = self.appbuilder.sm.find_role(role_name) + assert get_role is None + + +class UserRolePermissionDisabledTestCase(FABTestCase): + def setUp(self): + from flask import Flask + from flask_appbuilder import AppBuilder + + self.app = Flask(__name__) + self.basedir = os.path.abspath(os.path.dirname(__file__)) + self.app.config.from_object("flask_appbuilder.tests.config_api") + self.db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, self.db.session) + + def tearDown(self): + self.appbuilder.get_session.close() + engine = self.db.session.get_bind(mapper=None, clause=None) + engine.dispose() + + def test_user_role_permission(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + uri = "api/v1/users/" + rv = self.auth_client_get(client, token, uri) + self.assertEqual(rv.status_code, 404) + + uri = "api/v1/roles/" + rv = self.auth_client_get(client, token, uri) + self.assertEqual(rv.status_code, 404) + + uri = "api/v1/permissions/" + rv = self.auth_client_get(client, token, uri) + self.assertEqual(rv.status_code, 404) + + uri = "api/v1/viewmenus/" + rv = self.auth_client_get(client, token, uri) + self.assertEqual(rv.status_code, 404) + + uri = "api/v1/permissionsviewmenus/" + rv = self.auth_client_get(client, token, uri) + self.assertEqual(rv.status_code, 404) + + +class UserCustomPasswordComplexityValidatorTestCase(FABTestCase): + def setUp(self): + from flask import Flask + from flask_appbuilder import AppBuilder + from flask_appbuilder.exceptions import PasswordComplexityValidationError + from flask_appbuilder.security.sqla.models import User + + def passwordValidator(password): + if len(password) < 5: + raise PasswordComplexityValidationError + + self.app = Flask(__name__) + self.basedir = os.path.abspath(os.path.dirname(__file__)) + self.app.config.from_object("flask_appbuilder.tests.config_api") + self.app.config["FAB_ADD_SECURITY_API"] = True + self.app.config["FAB_PASSWORD_COMPLEXITY_ENABLED"] = True + self.app.config["FAB_PASSWORD_COMPLEXITY_VALIDATOR"] = passwordValidator + self.db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, self.db.session) + self.user_model = User + + # TODO:remove this hack + for b in self.appbuilder.baseviews: + if hasattr(b, "datamodel") and b.datamodel.session is not None: + b.datamodel.session = self.db.session + + def tearDown(self): + self.appbuilder.get_session.close() + engine = self.db.session.get_bind(mapper=None, clause=None) + engine.dispose() + + def test_password_complexity(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + uri = "api/v1/users/" + create_user_payload = { + "active": True, + "email": "fab@usertest1.com", + "first_name": "fab", + "last_name": "admin", + "password": "a", + "roles": [1], + "username": "password complexity test user 10", + } + rv = self.auth_client_post(client, token, uri, create_user_payload) + self.assertEqual(rv.status_code, 400) + + create_user_payload["password"] = "bigger password" + rv = self.auth_client_post(client, token, uri, create_user_payload) + self.assertEqual(rv.status_code, 201) + + session = self.appbuilder.get_session + user = ( + session.query(self.user_model) + .filter(self.user_model.username == "password complexity test user 10") + .one_or_none() + ) + session.delete(user) + session.commit() + + +class UserDefaultPasswordComplexityValidatorTestCase(FABTestCase): + def setUp(self): + from flask import Flask + from flask_appbuilder import AppBuilder + from flask_appbuilder.exceptions import PasswordComplexityValidationError + from flask_appbuilder.security.sqla.models import User + + def passwordValidator(password): + if len(password) < 5: + raise PasswordComplexityValidationError + + self.app = Flask(__name__) + self.basedir = os.path.abspath(os.path.dirname(__file__)) + self.app.config.from_object("flask_appbuilder.tests.config_api") + self.app.config["FAB_ADD_SECURITY_API"] = True + self.app.config["FAB_PASSWORD_COMPLEXITY_ENABLED"] = True + self.db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, self.db.session) + self.user_model = User + + # TODO:remove this hack + for b in self.appbuilder.baseviews: + if hasattr(b, "datamodel") and b.datamodel.session is not None: + b.datamodel.session = self.db.session + + def tearDown(self): + self.appbuilder.get_session.close() + engine = self.db.session.get_bind(mapper=None, clause=None) + engine.dispose() + + def test_password_complexity(self): + client = self.app.test_client() + token = self.login(client, USERNAME_ADMIN, PASSWORD_ADMIN) + + uri = "api/v1/users/" + create_user_payload = { + "active": True, + "email": "fab@defalultpasswordtest.com", + "first_name": "fab", + "last_name": "admin", + "password": "this is very big pasword", + "roles": [1], + "username": "password complexity test user", + } + rv = self.auth_client_post(client, token, uri, create_user_payload) + self.assertEqual(rv.status_code, 400) + + create_user_payload["password"] = "AB@12abcef" + rv = self.auth_client_post(client, token, uri, create_user_payload) + self.assertEqual(rv.status_code, 201) + + session = self.appbuilder.get_session + user = ( + session.query(self.user_model) + .filter(self.user_model.username == "password complexity test user") + .one_or_none() + ) + session.delete(user) + session.commit()