diff --git a/src/middlewared/middlewared/plugins/account.py b/src/middlewared/middlewared/plugins/account.py index c24eb19490f58..cdd1b66f267fd 100644 --- a/src/middlewared/middlewared/plugins/account.py +++ b/src/middlewared/middlewared/plugins/account.py @@ -24,6 +24,7 @@ import middlewared.sqlalchemy as sa from middlewared.utils import run, filter_list from middlewared.utils.crypto import sha512_crypt +from middlewared.utils.directoryservices.constants import DSType, DSStatus from middlewared.utils.nss import pwd, grp from middlewared.utils.nss.nss_common import NssModule from middlewared.utils.privilege import credential_has_full_admin, privileges_group_mapping @@ -251,9 +252,6 @@ async def query(self, filters, options): The following `additional_information` options are supported: `SMB` - include Windows SID and NT Name for user. If this option is not specified, then these keys will have `null` value. - `DS` - include users from Directory Service (LDAP or Active Directory) in results - - `"extra": {"search_dscache": true}` is a legacy method of querying for directory services users. """ ds_users = [] options = options or {} @@ -269,13 +267,8 @@ async def query(self, filters, options): datastore_options.pop('select', None) extra = options.get('extra', {}) - dssearch = extra.pop('search_dscache', False) additional_information = extra.get('additional_information', []) - if 'DS' in additional_information: - dssearch = True - additional_information.remove('DS') - username_sid = {} if 'SMB' in additional_information: try: @@ -289,16 +282,23 @@ async def query(self, filters, options): # broken self.logger.error('Failed to retrieve passdb information', exc_info=True) - if dssearch: - ds_state = await self.middleware.call('directoryservices.get_state') - if ds_state['activedirectory'] == 'HEALTHY' or ds_state['ldap'] == 'HEALTHY': - ds_users = await self.middleware.call('directoryservices.cache.query', 'USER', filters, options.copy()) - # For AD users, we will not have 2FA attribute normalized so let's do that - ad_users_2fa_mapping = await self.middleware.call('auth.twofactor.get_ad_users') - for index, user in enumerate(filter( - lambda u: not u['local'] and 'twofactor_auth_configured' not in u, ds_users) - ): - ds_users[index]['twofactor_auth_configured'] = bool(ad_users_2fa_mapping.get(user['sid'])) + if ['local', '=', True] not in filters: + ds = await self.middleware.call('directoryservices.summary') + if ds['type'] is not None and ds['status'] == DSStatus.HEALTHY.value: + ds_users = await self.middleware.call( + 'directoryservices.cache.query', 'USER', filters, options.copy() + ) + + match DSType[ds['type'].lower()]: + case DSType.AD: + ad_users_2fa_mapping = await self.middleware.call('auth.twofactor.get_ad_users') + for index, user in enumerate(filter( + lambda u: not u['local'] and 'twofactor_auth_configured' not in u, ds_users) + ): + ds_users[index]['twofactor_auth_configured'] = bool(ad_users_2fa_mapping.get(user['sid'])) + case _: + # FIXME - map twofactor_auth_configured hint for LDAP users + pass result = await self.middleware.call( 'datastore.query', self._config.datastore, [], datastore_options @@ -1713,9 +1713,6 @@ async def query(self, filters, options): The following `additional_information` options are supported: `SMB` - include Windows SID and NT Name for group. If this option is not specified, then these keys will have `null` value. - `DS` - include groups from Directory Service (LDAP or Active Directory) in results - - `"extra": {"search_dscache": true}` is a legacy method of querying for directory services groups. """ ds_groups = [] options = options or {} @@ -1731,16 +1728,11 @@ async def query(self, filters, options): datastore_options.pop('select', None) extra = options.get('extra', {}) - dssearch = extra.pop('search_dscache', False) additional_information = extra.get('additional_information', []) - if 'DS' in additional_information: - dssearch = True - additional_information.remove('DS') - - if dssearch: - ds_state = await self.middleware.call('directoryservices.get_state') - if ds_state['activedirectory'] == 'HEALTHY' or ds_state['ldap'] == 'HEALTHY': + if ['local', '=', True] not in filters: + ds = await self.middleware.call('directoryservices.summary') + if ds['type'] is not None and ds['status'] == DSStatus.HEALTHY: ds_groups = await self.middleware.call('directoryservices.cache.query', 'GROUP', filters, options) if 'SMB' in additional_information: