Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Misc cleanup + fixes #95

Merged
merged 1 commit into from
Aug 6, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 156 additions & 124 deletions ldapauthenticator/ldapauthenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def _server_port_default(self):
)

bind_dn_template = Union(
[List(),Unicode()],
[List(), Unicode()],
config=True,
help="""
Template from which to construct the full dn
Expand Down Expand Up @@ -204,53 +204,6 @@ def _server_port_default(self):
"""
)

def resolve_username(self, username_supplied_by_user):
if self.lookup_dn:
server = ldap3.Server(
self.server_address,
port=self.server_port,
use_ssl=self.use_ssl
)

search_filter = self.lookup_dn_search_filter.format(
login_attr=self.user_attribute,
login=username_supplied_by_user
)
self.log.debug(
"Looking up user with search_base={search_base}, search_filter='{search_filter}', attributes={attributes}".format(
search_base=self.user_search_base,
search_filter=search_filter,
attributes=self.user_attribute
)
)

conn = ldap3.Connection(server, user=self.escape_userdn_if_needed(self.lookup_dn_search_user), password=self.lookup_dn_search_password)
is_bound = conn.bind()
if not is_bound:
self.log.warn("Can't connect to LDAP")
return None

conn.search(
search_base=self.user_search_base,
search_scope=ldap3.SUBTREE,
search_filter=search_filter,
attributes=[self.lookup_dn_user_dn_attribute]
)

if len(conn.response) == 0 or 'attributes' not in conn.response[0].keys():
self.log.warn('username:%s No such user entry found when looking up with attribute %s', username_supplied_by_user,
self.user_attribute)
return None
return conn.response[0]['attributes'][self.lookup_dn_user_dn_attribute]
else:
return username_supplied_by_user

def escape_userdn_if_needed(self, userdn):
if self.escape_userdn:
return escape_filter_chars(userdn)
else:
return userdn

search_filter = Unicode(
config=True,
help="LDAP3 Search Filter whose results are allowed access"
Expand All @@ -261,124 +214,203 @@ def escape_userdn_if_needed(self, userdn):
help="List of attributes to be searched"
)

def resolve_username(self, username_supplied_by_user):
search_dn = self.lookup_dn_search_user
if self.escape_userdn:
search_dn = escape_filter_chars(search_dn)
conn = self.get_connection(
userdn=search_dn,
password=self.lookup_dn_search_password,
)
is_bound = conn.bind()
if not is_bound:
msg = "Failed to connect to LDAP server with search user '{search_dn}'"
self.log.warn(msg.format(search_dn=search_dn))
return None

search_filter = self.lookup_dn_search_filter.format(
login_attr=self.user_attribute,
login=username_supplied_by_user,
)
msg = '\n'.join([
"Looking up user with:",
" search_base = '{search_base}'",
" search_filter = '{search_filter}'",
" attributes = '{attributes}'",
])
self.log.debug(msg.format(
search_base=self.user_search_base,
search_filter=search_filter,
attributes=self.user_attribute,
))
conn.search(
search_base=self.user_search_base,
search_scope=ldap3.SUBTREE,
search_filter=search_filter,
attributes=[self.lookup_dn_user_dn_attribute],
)
response = conn.response
if len(response) == 0 or 'attributes' not in response[0].keys():
msg = (
"No entry found for user '{username}' "
"when looking up attribute '{attribute}'"
)
self.log.warn(msg.format(
username=username_supplied_by_user,
attribute=self.user_attribute,
))
return None
return conn.response[0]['attributes'][self.lookup_dn_user_dn_attribute]

def get_connection(self, userdn, password):
server = ldap3.Server(
self.server_address,
port=self.server_port,
use_ssl=self.use_ssl
)
auto_bind = (
self.use_ssl
and ldap3.AUTO_BIND_TLS_BEFORE_BIND
or ldap3.AUTO_BIND_NO_TLS
)
conn = ldap3.Connection(
server,
user=userdn,
password=password,
auto_bind=auto_bind,
)
return conn

@gen.coroutine
def authenticate(self, handler, data):
username = data['username']
password = data['password']
# Get LDAP Connection
def getConnection(userdn, username, password):
server = ldap3.Server(
self.server_address,
port=self.server_port,
use_ssl=self.use_ssl
)
self.log.debug('Attempting to bind {username} with {userdn}'.format(
username=username,
userdn=userdn
))
conn = ldap3.Connection(
server,
user=self.escape_userdn_if_needed(userdn),
password=password,
auto_bind=self.use_ssl and ldap3.AUTO_BIND_TLS_BEFORE_BIND or ldap3.AUTO_BIND_NO_TLS,
)
return conn

# Protect against invalid usernames as well as LDAP injection attacks
if not re.match(self.valid_username_regex, username):
self.log.warn('username:%s Illegal characters in username, must match regex %s', username, self.valid_username_regex)
self.log.warn(
'username:%s Illegal characters in username, must match regex %s',
username, self.valid_username_regex
)
return None

# No empty passwords!
if password is None or password.strip() == '':
self.log.warn('username:%s Login denied for blank password', username)
return None

isBound = False
self.log.debug("TYPE= '%s'",isinstance(self.bind_dn_template, list))

resolved_username = self.resolve_username(username)
if resolved_username is None:
return None
if self.lookup_dn:
username = self.resolve_username(username)
if not username:
return None

if self.lookup_dn:
if str(self.lookup_dn_user_dn_attribute).upper() == 'CN':
# Only escape commas if the lookup attribute is CN
resolved_username = re.subn(r"([^\\]),", r"\1\,", resolved_username)[0]
username = re.subn(r"([^\\]),", r"\1\,", username)[0]

bind_dn_template = self.bind_dn_template
if isinstance(bind_dn_template, str):
# bind_dn_template should be of type List[str]
bind_dn_template = [bind_dn_template]

is_bound = False
for dn in bind_dn_template:
userdn = dn.format(username=resolved_username)
msg = 'Status of user bind {username} with {userdn} : {isBound}'
if not dn:
self.log.warn("Ignoring blank 'bind_dn_template' entry!")
continue
userdn = dn.format(username=username)
if self.escape_userdn:
userdn = escape_filter_chars(userdn)
msg = 'Attempting to bind {username} with {userdn}'
self.log.debug(msg.format(username=username, userdn=userdn))
msg = "Status of user bind {username} with {userdn} : {is_bound}"
try:
conn = getConnection(userdn, username, password)
conn = self.get_connection(userdn, password)
except ldap3.core.exceptions.LDAPBindError as exc:
isBound = False
is_bound = False
msg += '\n{exc_type}: {exc_msg}'.format(
exc_type=exc.__class__.__name__,
exc_msg=exc.args[0] if exc.args else ''
)
else:
isBound = conn.bind()
is_bound = conn.bind()
msg = msg.format(
username=username,
userdn=userdn,
isBound=isBound
is_bound=is_bound
)
self.log.debug(msg)
if isBound:
if is_bound:
break

if isBound:
if self.allowed_groups:
self.log.debug('username:%s Using dn %s', username, userdn)
for group in self.allowed_groups:
groupfilter = (
'(|'
'(member={userdn})'
'(uniqueMember={userdn})'
'(memberUid={uid})'
')'
).format(userdn=escape_filter_chars(userdn), uid=escape_filter_chars(username))
groupattributes = ['member', 'uniqueMember', 'memberUid']
if conn.search(
group,
search_scope=ldap3.BASE,
search_filter=groupfilter,
attributes=groupattributes
):
return username
# If we reach here, then none of the groups matched
self.log.warn('username:%s User not in any of the allowed groups', username)
if not is_bound:
msg = "Invalid password for user '{username}'"
self.log.warn(msg.format(username=username))
return None

if self.search_filter:
search_filter = self.search_filter.format(
userattr=self.user_attribute,
username=username,
)
conn.search(
search_base=self.user_search_base,
search_scope=ldap3.SUBTREE,
search_filter=search_filter,
attributes=self.attributes
)
n_users = len(conn.response)
if n_users == 0:
msg = "User with '{userattr}={username}' not found in directory"
self.log.warn(msg.format(
userattr=self.user_attribute,
username=username)
)
return None
elif self.search_filter:
conn.search(
search_base=self.user_search_base,
search_scope=ldap3.SUBTREE,
search_filter=self.search_filter.format(userattr=self.user_attribute,username=username),
attributes=self.attributes
if n_users > 1:
msg = (
"Duplicate users found! "
"{n_users} users found with '{userattr}={username}'"
)
if len(conn.response) == 0:
self.log.warn('User with {userattr}={username} not found in directory'.format(
userattr=self.user_attribute, username=username))
return None
elif len(conn.response) > 1:
self.log.warn('User with {userattr}={username} found more than {len}-fold in directory'.format(
userattr=self.user_attribute, username=username, len=len(conn.response)))
return None
return username
else:
return username
else:
self.log.warn('Invalid password for user {username}'.format(
username=username,
))
return None
self.log.warn(msg.format(
userattr=self.user_attribute,
username=username,
n_users=n_users)
)
return None

if self.allowed_groups:
self.log.debug('username:%s Using dn %s', username, userdn)
found = False
for group in self.allowed_groups:
group_filter = (
'(|'
'(member={userdn})'
'(uniqueMember={userdn})'
'(memberUid={uid})'
')'
)
group_filter = group_filter.format(
userdn=userdn,
uid=username
)
group_attributes = ['member', 'uniqueMember', 'memberUid']
found = conn.search(
group,
search_scope=ldap3.BASE,
search_filter=group_filter,
attributes=group_attributes
)
if found:
break
if not found:
# If we reach here, then none of the groups matched
msg = 'username:{username} User not in any of the allowed groups'
self.log.warn(msg.format(username=username))
return None

return username


if __name__ == "__main__":
Expand Down