From 0d5d903510f7b8a87b6ae8fca237e515b62cfcf3 Mon Sep 17 00:00:00 2001 From: dhirschf Date: Thu, 19 Jul 2018 16:30:20 +1000 Subject: [PATCH] Misc cleanup + fixes - Fixes #44 Empty DN templates are now ignored - Fixes #93 search_filter and allowed_groups are no longer mutually exclusive --- ldapauthenticator/ldapauthenticator.py | 280 ++++++++++++++----------- 1 file changed, 156 insertions(+), 124 deletions(-) diff --git a/ldapauthenticator/ldapauthenticator.py b/ldapauthenticator/ldapauthenticator.py index 1e9f380..925c0d9 100644 --- a/ldapauthenticator/ldapauthenticator.py +++ b/ldapauthenticator/ldapauthenticator.py @@ -42,7 +42,7 @@ def _server_port_default(self): ) bind_dn_template = Union( - [List(),Unicode()], + [List(), Unicode()], config=True, help=""" Template from which to construct the full dn @@ -204,53 +204,6 @@ def _server_port_default(self): """ ) - def resolve_username(self, username_supplied_by_user): - if self.lookup_dn: - server = ldap3.Server( - self.server_address, - port=self.server_port, - use_ssl=self.use_ssl - ) - - search_filter = self.lookup_dn_search_filter.format( - login_attr=self.user_attribute, - login=username_supplied_by_user - ) - self.log.debug( - "Looking up user with search_base={search_base}, search_filter='{search_filter}', attributes={attributes}".format( - search_base=self.user_search_base, - search_filter=search_filter, - attributes=self.user_attribute - ) - ) - - conn = ldap3.Connection(server, user=self.escape_userdn_if_needed(self.lookup_dn_search_user), password=self.lookup_dn_search_password) - is_bound = conn.bind() - if not is_bound: - self.log.warn("Can't connect to LDAP") - return None - - conn.search( - search_base=self.user_search_base, - search_scope=ldap3.SUBTREE, - search_filter=search_filter, - attributes=[self.lookup_dn_user_dn_attribute] - ) - - if len(conn.response) == 0 or 'attributes' not in conn.response[0].keys(): - self.log.warn('username:%s No such user entry found when looking up with attribute %s', username_supplied_by_user, - self.user_attribute) - return None - return conn.response[0]['attributes'][self.lookup_dn_user_dn_attribute] - else: - return username_supplied_by_user - - def escape_userdn_if_needed(self, userdn): - if self.escape_userdn: - return escape_filter_chars(userdn) - else: - return userdn - search_filter = Unicode( config=True, help="LDAP3 Search Filter whose results are allowed access" @@ -261,33 +214,84 @@ def escape_userdn_if_needed(self, userdn): help="List of attributes to be searched" ) + def resolve_username(self, username_supplied_by_user): + search_dn = self.lookup_dn_search_user + if self.escape_userdn: + search_dn = escape_filter_chars(search_dn) + conn = self.get_connection( + userdn=search_dn, + password=self.lookup_dn_search_password, + ) + is_bound = conn.bind() + if not is_bound: + msg = "Failed to connect to LDAP server with search user '{search_dn}'" + self.log.warn(msg.format(search_dn=search_dn)) + return None + + search_filter = self.lookup_dn_search_filter.format( + login_attr=self.user_attribute, + login=username_supplied_by_user, + ) + msg = '\n'.join([ + "Looking up user with:", + " search_base = '{search_base}'", + " search_filter = '{search_filter}'", + " attributes = '{attributes}'", + ]) + self.log.debug(msg.format( + search_base=self.user_search_base, + search_filter=search_filter, + attributes=self.user_attribute, + )) + conn.search( + search_base=self.user_search_base, + search_scope=ldap3.SUBTREE, + search_filter=search_filter, + attributes=[self.lookup_dn_user_dn_attribute], + ) + response = conn.response + if len(response) == 0 or 'attributes' not in response[0].keys(): + msg = ( + "No entry found for user '{username}' " + "when looking up attribute '{attribute}'" + ) + self.log.warn(msg.format( + username=username_supplied_by_user, + attribute=self.user_attribute, + )) + return None + return conn.response[0]['attributes'][self.lookup_dn_user_dn_attribute] + + def get_connection(self, userdn, password): + server = ldap3.Server( + self.server_address, + port=self.server_port, + use_ssl=self.use_ssl + ) + auto_bind = ( + self.use_ssl + and ldap3.AUTO_BIND_TLS_BEFORE_BIND + or ldap3.AUTO_BIND_NO_TLS + ) + conn = ldap3.Connection( + server, + user=userdn, + password=password, + auto_bind=auto_bind, + ) + return conn @gen.coroutine def authenticate(self, handler, data): username = data['username'] password = data['password'] - # Get LDAP Connection - def getConnection(userdn, username, password): - server = ldap3.Server( - self.server_address, - port=self.server_port, - use_ssl=self.use_ssl - ) - self.log.debug('Attempting to bind {username} with {userdn}'.format( - username=username, - userdn=userdn - )) - conn = ldap3.Connection( - server, - user=self.escape_userdn_if_needed(userdn), - password=password, - auto_bind=self.use_ssl and ldap3.AUTO_BIND_TLS_BEFORE_BIND or ldap3.AUTO_BIND_NO_TLS, - ) - return conn # Protect against invalid usernames as well as LDAP injection attacks if not re.match(self.valid_username_regex, username): - self.log.warn('username:%s Illegal characters in username, must match regex %s', username, self.valid_username_regex) + self.log.warn( + 'username:%s Illegal characters in username, must match regex %s', + username, self.valid_username_regex + ) return None # No empty passwords! @@ -295,90 +299,118 @@ def getConnection(userdn, username, password): self.log.warn('username:%s Login denied for blank password', username) return None - isBound = False - self.log.debug("TYPE= '%s'",isinstance(self.bind_dn_template, list)) - - resolved_username = self.resolve_username(username) - if resolved_username is None: - return None + if self.lookup_dn: + username = self.resolve_username(username) + if not username: + return None if self.lookup_dn: if str(self.lookup_dn_user_dn_attribute).upper() == 'CN': # Only escape commas if the lookup attribute is CN - resolved_username = re.subn(r"([^\\]),", r"\1\,", resolved_username)[0] + username = re.subn(r"([^\\]),", r"\1\,", username)[0] bind_dn_template = self.bind_dn_template if isinstance(bind_dn_template, str): # bind_dn_template should be of type List[str] bind_dn_template = [bind_dn_template] + is_bound = False for dn in bind_dn_template: - userdn = dn.format(username=resolved_username) - msg = 'Status of user bind {username} with {userdn} : {isBound}' + if not dn: + self.log.warn("Ignoring blank 'bind_dn_template' entry!") + continue + userdn = dn.format(username=username) + if self.escape_userdn: + userdn = escape_filter_chars(userdn) + msg = 'Attempting to bind {username} with {userdn}' + self.log.debug(msg.format(username=username, userdn=userdn)) + msg = "Status of user bind {username} with {userdn} : {is_bound}" try: - conn = getConnection(userdn, username, password) + conn = self.get_connection(userdn, password) except ldap3.core.exceptions.LDAPBindError as exc: - isBound = False + is_bound = False msg += '\n{exc_type}: {exc_msg}'.format( exc_type=exc.__class__.__name__, exc_msg=exc.args[0] if exc.args else '' ) else: - isBound = conn.bind() + is_bound = conn.bind() msg = msg.format( username=username, userdn=userdn, - isBound=isBound + is_bound=is_bound ) self.log.debug(msg) - if isBound: + if is_bound: break - if isBound: - if self.allowed_groups: - self.log.debug('username:%s Using dn %s', username, userdn) - for group in self.allowed_groups: - groupfilter = ( - '(|' - '(member={userdn})' - '(uniqueMember={userdn})' - '(memberUid={uid})' - ')' - ).format(userdn=escape_filter_chars(userdn), uid=escape_filter_chars(username)) - groupattributes = ['member', 'uniqueMember', 'memberUid'] - if conn.search( - group, - search_scope=ldap3.BASE, - search_filter=groupfilter, - attributes=groupattributes - ): - return username - # If we reach here, then none of the groups matched - self.log.warn('username:%s User not in any of the allowed groups', username) + if not is_bound: + msg = "Invalid password for user '{username}'" + self.log.warn(msg.format(username=username)) + return None + + if self.search_filter: + search_filter = self.search_filter.format( + userattr=self.user_attribute, + username=username, + ) + conn.search( + search_base=self.user_search_base, + search_scope=ldap3.SUBTREE, + search_filter=search_filter, + attributes=self.attributes + ) + n_users = len(conn.response) + if n_users == 0: + msg = "User with '{userattr}={username}' not found in directory" + self.log.warn(msg.format( + userattr=self.user_attribute, + username=username) + ) return None - elif self.search_filter: - conn.search( - search_base=self.user_search_base, - search_scope=ldap3.SUBTREE, - search_filter=self.search_filter.format(userattr=self.user_attribute,username=username), - attributes=self.attributes + if n_users > 1: + msg = ( + "Duplicate users found! " + "{n_users} users found with '{userattr}={username}'" ) - if len(conn.response) == 0: - self.log.warn('User with {userattr}={username} not found in directory'.format( - userattr=self.user_attribute, username=username)) - return None - elif len(conn.response) > 1: - self.log.warn('User with {userattr}={username} found more than {len}-fold in directory'.format( - userattr=self.user_attribute, username=username, len=len(conn.response))) - return None - return username - else: - return username - else: - self.log.warn('Invalid password for user {username}'.format( - username=username, - )) - return None + self.log.warn(msg.format( + userattr=self.user_attribute, + username=username, + n_users=n_users) + ) + return None + + if self.allowed_groups: + self.log.debug('username:%s Using dn %s', username, userdn) + found = False + for group in self.allowed_groups: + group_filter = ( + '(|' + '(member={userdn})' + '(uniqueMember={userdn})' + '(memberUid={uid})' + ')' + ) + group_filter = group_filter.format( + userdn=userdn, + uid=username + ) + group_attributes = ['member', 'uniqueMember', 'memberUid'] + found = conn.search( + group, + search_scope=ldap3.BASE, + search_filter=group_filter, + attributes=group_attributes + ) + if found: + break + if not found: + # If we reach here, then none of the groups matched + msg = 'username:{username} User not in any of the allowed groups' + self.log.warn(msg.format(username=username)) + return None + + return username if __name__ == "__main__":