Skip to content

Commit

Permalink
Merge pull request #95 from dhirschfeld/misc-fixes
Browse files Browse the repository at this point in the history
Misc cleanup + fixes
  • Loading branch information
dhirschfeld authored Aug 6, 2018
2 parents 718eb59 + 0d5d903 commit 145fc55
Showing 1 changed file with 156 additions and 124 deletions.
280 changes: 156 additions & 124 deletions ldapauthenticator/ldapauthenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def _server_port_default(self):
)

bind_dn_template = Union(
[List(),Unicode()],
[List(), Unicode()],
config=True,
help="""
Template from which to construct the full dn
Expand Down Expand Up @@ -204,53 +204,6 @@ def _server_port_default(self):
"""
)

def resolve_username(self, username_supplied_by_user):
if self.lookup_dn:
server = ldap3.Server(
self.server_address,
port=self.server_port,
use_ssl=self.use_ssl
)

search_filter = self.lookup_dn_search_filter.format(
login_attr=self.user_attribute,
login=username_supplied_by_user
)
self.log.debug(
"Looking up user with search_base={search_base}, search_filter='{search_filter}', attributes={attributes}".format(
search_base=self.user_search_base,
search_filter=search_filter,
attributes=self.user_attribute
)
)

conn = ldap3.Connection(server, user=self.escape_userdn_if_needed(self.lookup_dn_search_user), password=self.lookup_dn_search_password)
is_bound = conn.bind()
if not is_bound:
self.log.warn("Can't connect to LDAP")
return None

conn.search(
search_base=self.user_search_base,
search_scope=ldap3.SUBTREE,
search_filter=search_filter,
attributes=[self.lookup_dn_user_dn_attribute]
)

if len(conn.response) == 0 or 'attributes' not in conn.response[0].keys():
self.log.warn('username:%s No such user entry found when looking up with attribute %s', username_supplied_by_user,
self.user_attribute)
return None
return conn.response[0]['attributes'][self.lookup_dn_user_dn_attribute]
else:
return username_supplied_by_user

def escape_userdn_if_needed(self, userdn):
if self.escape_userdn:
return escape_filter_chars(userdn)
else:
return userdn

search_filter = Unicode(
config=True,
help="LDAP3 Search Filter whose results are allowed access"
Expand All @@ -261,124 +214,203 @@ def escape_userdn_if_needed(self, userdn):
help="List of attributes to be searched"
)

def resolve_username(self, username_supplied_by_user):
search_dn = self.lookup_dn_search_user
if self.escape_userdn:
search_dn = escape_filter_chars(search_dn)
conn = self.get_connection(
userdn=search_dn,
password=self.lookup_dn_search_password,
)
is_bound = conn.bind()
if not is_bound:
msg = "Failed to connect to LDAP server with search user '{search_dn}'"
self.log.warn(msg.format(search_dn=search_dn))
return None

search_filter = self.lookup_dn_search_filter.format(
login_attr=self.user_attribute,
login=username_supplied_by_user,
)
msg = '\n'.join([
"Looking up user with:",
" search_base = '{search_base}'",
" search_filter = '{search_filter}'",
" attributes = '{attributes}'",
])
self.log.debug(msg.format(
search_base=self.user_search_base,
search_filter=search_filter,
attributes=self.user_attribute,
))
conn.search(
search_base=self.user_search_base,
search_scope=ldap3.SUBTREE,
search_filter=search_filter,
attributes=[self.lookup_dn_user_dn_attribute],
)
response = conn.response
if len(response) == 0 or 'attributes' not in response[0].keys():
msg = (
"No entry found for user '{username}' "
"when looking up attribute '{attribute}'"
)
self.log.warn(msg.format(
username=username_supplied_by_user,
attribute=self.user_attribute,
))
return None
return conn.response[0]['attributes'][self.lookup_dn_user_dn_attribute]

def get_connection(self, userdn, password):
server = ldap3.Server(
self.server_address,
port=self.server_port,
use_ssl=self.use_ssl
)
auto_bind = (
self.use_ssl
and ldap3.AUTO_BIND_TLS_BEFORE_BIND
or ldap3.AUTO_BIND_NO_TLS
)
conn = ldap3.Connection(
server,
user=userdn,
password=password,
auto_bind=auto_bind,
)
return conn

@gen.coroutine
def authenticate(self, handler, data):
username = data['username']
password = data['password']
# Get LDAP Connection
def getConnection(userdn, username, password):
server = ldap3.Server(
self.server_address,
port=self.server_port,
use_ssl=self.use_ssl
)
self.log.debug('Attempting to bind {username} with {userdn}'.format(
username=username,
userdn=userdn
))
conn = ldap3.Connection(
server,
user=self.escape_userdn_if_needed(userdn),
password=password,
auto_bind=self.use_ssl and ldap3.AUTO_BIND_TLS_BEFORE_BIND or ldap3.AUTO_BIND_NO_TLS,
)
return conn

# Protect against invalid usernames as well as LDAP injection attacks
if not re.match(self.valid_username_regex, username):
self.log.warn('username:%s Illegal characters in username, must match regex %s', username, self.valid_username_regex)
self.log.warn(
'username:%s Illegal characters in username, must match regex %s',
username, self.valid_username_regex
)
return None

# No empty passwords!
if password is None or password.strip() == '':
self.log.warn('username:%s Login denied for blank password', username)
return None

isBound = False
self.log.debug("TYPE= '%s'",isinstance(self.bind_dn_template, list))

resolved_username = self.resolve_username(username)
if resolved_username is None:
return None
if self.lookup_dn:
username = self.resolve_username(username)
if not username:
return None

if self.lookup_dn:
if str(self.lookup_dn_user_dn_attribute).upper() == 'CN':
# Only escape commas if the lookup attribute is CN
resolved_username = re.subn(r"([^\\]),", r"\1\,", resolved_username)[0]
username = re.subn(r"([^\\]),", r"\1\,", username)[0]

bind_dn_template = self.bind_dn_template
if isinstance(bind_dn_template, str):
# bind_dn_template should be of type List[str]
bind_dn_template = [bind_dn_template]

is_bound = False
for dn in bind_dn_template:
userdn = dn.format(username=resolved_username)
msg = 'Status of user bind {username} with {userdn} : {isBound}'
if not dn:
self.log.warn("Ignoring blank 'bind_dn_template' entry!")
continue
userdn = dn.format(username=username)
if self.escape_userdn:
userdn = escape_filter_chars(userdn)
msg = 'Attempting to bind {username} with {userdn}'
self.log.debug(msg.format(username=username, userdn=userdn))
msg = "Status of user bind {username} with {userdn} : {is_bound}"
try:
conn = getConnection(userdn, username, password)
conn = self.get_connection(userdn, password)
except ldap3.core.exceptions.LDAPBindError as exc:
isBound = False
is_bound = False
msg += '\n{exc_type}: {exc_msg}'.format(
exc_type=exc.__class__.__name__,
exc_msg=exc.args[0] if exc.args else ''
)
else:
isBound = conn.bind()
is_bound = conn.bind()
msg = msg.format(
username=username,
userdn=userdn,
isBound=isBound
is_bound=is_bound
)
self.log.debug(msg)
if isBound:
if is_bound:
break

if isBound:
if self.allowed_groups:
self.log.debug('username:%s Using dn %s', username, userdn)
for group in self.allowed_groups:
groupfilter = (
'(|'
'(member={userdn})'
'(uniqueMember={userdn})'
'(memberUid={uid})'
')'
).format(userdn=escape_filter_chars(userdn), uid=escape_filter_chars(username))
groupattributes = ['member', 'uniqueMember', 'memberUid']
if conn.search(
group,
search_scope=ldap3.BASE,
search_filter=groupfilter,
attributes=groupattributes
):
return username
# If we reach here, then none of the groups matched
self.log.warn('username:%s User not in any of the allowed groups', username)
if not is_bound:
msg = "Invalid password for user '{username}'"
self.log.warn(msg.format(username=username))
return None

if self.search_filter:
search_filter = self.search_filter.format(
userattr=self.user_attribute,
username=username,
)
conn.search(
search_base=self.user_search_base,
search_scope=ldap3.SUBTREE,
search_filter=search_filter,
attributes=self.attributes
)
n_users = len(conn.response)
if n_users == 0:
msg = "User with '{userattr}={username}' not found in directory"
self.log.warn(msg.format(
userattr=self.user_attribute,
username=username)
)
return None
elif self.search_filter:
conn.search(
search_base=self.user_search_base,
search_scope=ldap3.SUBTREE,
search_filter=self.search_filter.format(userattr=self.user_attribute,username=username),
attributes=self.attributes
if n_users > 1:
msg = (
"Duplicate users found! "
"{n_users} users found with '{userattr}={username}'"
)
if len(conn.response) == 0:
self.log.warn('User with {userattr}={username} not found in directory'.format(
userattr=self.user_attribute, username=username))
return None
elif len(conn.response) > 1:
self.log.warn('User with {userattr}={username} found more than {len}-fold in directory'.format(
userattr=self.user_attribute, username=username, len=len(conn.response)))
return None
return username
else:
return username
else:
self.log.warn('Invalid password for user {username}'.format(
username=username,
))
return None
self.log.warn(msg.format(
userattr=self.user_attribute,
username=username,
n_users=n_users)
)
return None

if self.allowed_groups:
self.log.debug('username:%s Using dn %s', username, userdn)
found = False
for group in self.allowed_groups:
group_filter = (
'(|'
'(member={userdn})'
'(uniqueMember={userdn})'
'(memberUid={uid})'
')'
)
group_filter = group_filter.format(
userdn=userdn,
uid=username
)
group_attributes = ['member', 'uniqueMember', 'memberUid']
found = conn.search(
group,
search_scope=ldap3.BASE,
search_filter=group_filter,
attributes=group_attributes
)
if found:
break
if not found:
# If we reach here, then none of the groups matched
msg = 'username:{username} User not in any of the allowed groups'
self.log.warn(msg.format(username=username))
return None

return username


if __name__ == "__main__":
Expand Down

0 comments on commit 145fc55

Please sign in to comment.