Skip to content

Commit

Permalink
Merge pull request #294 from consideRatio/pr/conn-stuff
Browse files Browse the repository at this point in the history
Fix parsing of search response
  • Loading branch information
consideRatio authored Nov 5, 2024
2 parents 0af43b8 + 769f4fb commit f338ca3
Showing 1 changed file with 55 additions and 39 deletions.
94 changes: 55 additions & 39 deletions ldapauthenticator/ldapauthenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,38 +455,49 @@ def resolve_username(self, username_supplied_by_user):
search_filter=search_filter,
attributes=[self.lookup_dn_user_dn_attribute],
)
response = conn.response
if len(response) == 0:
self.log.warning(
f"Failed to lookup a DN for username '{username_supplied_by_user}'"
)
return (None, None)
if len(response) > 1:
self.log.warning(
f"Failed to lookup a unique DN for username '{username_supplied_by_user}'"
)

# identify unique search response entry
n_entries = len(conn.entries)
if n_entries == 0:
self.log.warning(f"No response looking up '{username_supplied_by_user}'")
return (None, None)
if "attributes" not in response[0].keys():
self.log.warning(
f"Failed to lookup attribute '{self.lookup_dn_user_dn_attribute}' for username '{username_supplied_by_user}'"
if n_entries > 1:
self.log.error(
f"Looking up '{username_supplied_by_user}' gave multiple entries, "
f"expected 0 or 1 search response entries but received {n_entries}. "
"Is lookup_dn_search_filter and user_attribute configured to get a "
"unique match?"
)
return (None, None)
entry = conn.entries[0]

userdn = response[0]["dn"]
username = response[0]["attributes"][self.lookup_dn_user_dn_attribute]
if isinstance(username, list):
if len(username) == 0:
return (None, None)
elif len(username) == 1:
username = username[0]
# identify unique attribute value within the entry
attribute_values = entry.entry_attributes_as_dict.get(
self.lookup_dn_user_dn_attribute
)
if not attribute_values:
if attribute_values is None:
self.log.error(
f"No attribute '{self.lookup_dn_user_dn_attribute}' found. "
"Is lookup_dn_user_dn_attribute configured correctly?"
)
else:
self.log.error(
f"A lookup of the username '{username_supplied_by_user}' returned a list "
f"of entries for the attribute '{self.lookup_dn_user_dn_attribute}': "
f"({', '.join(username)})"
f"No attribute values for '{self.lookup_dn_user_dn_attribute}'. "
"Is lookup_dn_user_dn_attribute configured correctly?"
)
return None, None
return (None, None)
if len(attribute_values) > 1:
self.log.error(
f"Attribute '{self.lookup_dn_user_dn_attribute}' had multiple values, "
f"expected one attribute value but it had {len(attribute_values)} "
f"({';'.join(attribute_values)}). "
"Is lookup_dn_user_dn_attribute configured correctly?"
)
return None, None

userdn = entry.entry_dn
username = attribute_values[0]
return (username, userdn)

def get_connection(self, userdn, password):
Expand Down Expand Up @@ -539,17 +550,24 @@ def get_connection(self, userdn, password):
return conn

def get_user_attributes(self, conn, userdn):
attrs = {}
if self.auth_state_attributes:
found = conn.search(
conn.search(
search_base=userdn,
search_scope=ldap3.SUBTREE,
search_filter="(objectClass=*)",
attributes=self.auth_state_attributes,
)
if found:
attrs = conn.entries[0].entry_attributes_as_dict
return attrs

# identify unique search response entry
n_entries = len(conn.entries)
if n_entries == 1:
return conn.entries[0].entry_attributes_as_dict
self.log.error(
f"Expected 1 but got {n_entries} search response entries for DN '{userdn}' "
"when looking up attributes configured via auth_state_attributes. The user's "
"auth state will not include any attributes."
)
return {}

async def authenticate(self, handler, data):
"""
Expand Down Expand Up @@ -584,6 +602,9 @@ async def authenticate(self, handler, data):
if self.lookup_dn:
resolved_username, resolved_dn = self.resolve_username(login_username)
if not resolved_dn:
self.log.warning(
"username:%s Login denied for failed lookup", login_username
)
return None
if not bind_dn_template:
bind_dn_template = [resolved_dn]
Expand Down Expand Up @@ -632,18 +653,13 @@ async def authenticate(self, handler, data):
),
attributes=self.attributes,
)
n_users = len(conn.response)
if n_users == 0:
self.log.warning(
"Configured search_filter found no user associated with "
f"userattr='{self.user_attribute}' and username='{resolved_username}'"
)
return None
if n_users > 1:
n_entries = len(conn.entries)
if n_entries != 1:
self.log.warning(
"Configured search_filter found multiple users associated with "
f"Login of '{login_username}' denied. Configured search_filter "
f"found {n_entries} users associated with "
f"userattr='{self.user_attribute}' and username='{resolved_username}', "
"a unique match is required."
"and a unique match is required."
)
return None

Expand Down

0 comments on commit f338ca3

Please sign in to comment.