Skip to content

Commit

Permalink
Retrieve DN using a search query
Browse files Browse the repository at this point in the history
When lookup_dn is set to True, execute LDAP query to retrieve DN.
Do not use bind_dn_template in that case.
  • Loading branch information
Benjamin Bertrand committed Apr 20, 2017
1 parent ebbdfdb commit 0c0583e
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 73 deletions.
8 changes: 1 addition & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Don't forget the preceeding `c.` for setting configuration parameters! JupyterHu
uses [traitlets](https://traitlets.readthedocs.io) for configuration, and the `c` represents the [config object](https://traitlets.readthedocs.io/en/stable/config.html).

The `{username}` is expanded into the username the user provides.
This parameter is not used when `LDAPAuthenticator.lookup_dn` is set to True.

### Optional configuration ###

Expand Down Expand Up @@ -147,12 +148,6 @@ Technical account for user lookup, if `lookup_dn` is set to True.
If both lookup_dn_search_user and lookup_dn_search_password are None, then anonymous LDAP query will be done.


#### `LDAPAuthenticator.lookup_dn_user_dn_attribute` ####

Attribute containing user's name needed for building DN string, if `lookup_dn` is set to True.
See `user_search_base` for info on how this attribute is used.
For most LDAP servers, this is username. For Active Directory, it is cn.

#### `LDAPAuthenticator.escape_userdn` ####

If set to True, escape special chars in userdn when authenticating in LDAP.
Expand Down Expand Up @@ -180,7 +175,6 @@ c.LDAPAuthenticator.lookup_dn_search_user = 'ldap_search_user_technical_account'
c.LDAPAuthenticator.lookup_dn_search_password = 'secret'
c.LDAPAuthenticator.user_search_base = 'ou=people,dc=wikimedia,dc=org'
c.LDAPAuthenticator.user_attribute = 'sAMAccountName'
c.LDAPAuthenticator.lookup_dn_user_dn_attribute = 'cn'
c.LDAPAuthenticator.escape_userdn = False
```

Expand Down
116 changes: 50 additions & 66 deletions ldapauthenticator/ldapauthenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def _server_port_default(self):
is set to the current username to form the userdn.
For example, if all users objects existed under the base ou=people,dc=wikimedia,dc=org, and
the username users use is set with the attribute `uid`, you can use the following config:
the username users use is set with the attribute `sAMAccountName`, you can use the following config:
```
c.LDAPAuthenticator.lookup_dn = True
Expand All @@ -132,7 +132,6 @@ def _server_port_default(self):
c.LDAPAuthenticator.lookup_dn_search_password = 'secret'
c.LDAPAuthenticator.user_search_base = 'ou=people,dc=wikimedia,dc=org'
c.LDAPAuthenticator.user_attribute = 'sAMAccountName'
c.LDAPAuthenticator.lookup_dn_user_dn_attribute = 'cn'
```
"""
)
Expand Down Expand Up @@ -180,19 +179,6 @@ def _server_port_default(self):
"""
)

lookup_dn_user_dn_attribute = Unicode(
config=True,
default_value=None,
allow_none=True,
help="""
Attribute containing user's name needed for building DN string, if `lookup_dn` is set to True.
See `user_search_base` for info on how this attribute is used.
For most LDAP servers, this is username. For Active Directory, it is cn.
"""
)

escape_userdn = Bool(
False,
config=True,
Expand All @@ -204,46 +190,45 @@ def _server_port_default(self):
"""
)

def resolve_username(self, username_supplied_by_user):
if self.lookup_dn:
server = ldap3.Server(
self.server_address,
port=self.server_port,
use_ssl=self.use_ssl
def retrieve_userdn(self, username_supplied_by_user):
server = ldap3.Server(
self.server_address,
port=self.server_port,
use_ssl=self.use_ssl
)

search_filter = self.lookup_dn_search_filter.format(
login_attr=self.user_attribute,
login=username_supplied_by_user
)
self.log.debug(
"Looking up user with search_base={search_base}, search_filter='{search_filter}'".format(
search_base=self.user_search_base,
search_filter=search_filter
)
)

search_filter = self.lookup_dn_search_filter.format(
login_attr=self.user_attribute,
login=username_supplied_by_user
)
self.log.debug(
"Looking up user with search_base={search_base}, search_filter='{search_filter}', attributes={attributes}".format(
search_base=self.user_search_base,
search_filter=search_filter,
attributes=self.user_attribute
)
)
conn = ldap3.Connection(server, user=self.escape_userdn_if_needed(self.lookup_dn_search_user), password=self.lookup_dn_search_password)
is_bound = conn.bind()
if not is_bound:
self.log.warn("Can't connect to LDAP")
return None

conn = ldap3.Connection(server, user=self.escape_userdn_if_needed(self.lookup_dn_search_user), password=self.lookup_dn_search_password)
is_bound = conn.bind()
if not is_bound:
self.log.warn("Can't connect to LDAP")
return None
conn.search(
search_base=self.user_search_base,
search_scope=ldap3.SUBTREE,
search_filter=search_filter
)

conn.search(
search_base=self.user_search_base,
search_scope=ldap3.SUBTREE,
search_filter=search_filter,
attributes=[self.lookup_dn_user_dn_attribute]
if len(conn.response) == 0 or 'dn' not in conn.response[0]:
self.log.warn(
"No user entry found when looking up with search_base={search_base}, search_filter='{search_filter}'".format(
search_base=self.user_search_base,
search_filter=search_filter
)
)

if len(conn.response) == 0 or 'attributes' not in conn.response[0].keys():
self.log.warn('username:%s No such user entry found when looking up with attribute %s', username_supplied_by_user,
self.user_attribute)
return None
return conn.response[0]['attributes'][self.lookup_dn_user_dn_attribute]
else:
return username_supplied_by_user
return None
return conn.response[0]['dn']

def escape_userdn_if_needed(self, userdn):
if self.escape_userdn:
Expand All @@ -255,6 +240,7 @@ def escape_userdn_if_needed(self, userdn):
def authenticate(self, handler, data):
username = data['username']
password = data['password']

# Get LDAP Connection
def getConnection(userdn, username, password):
server = ldap3.Server(
Expand All @@ -263,12 +249,12 @@ def getConnection(userdn, username, password):
use_ssl=self.use_ssl
)
self.log.debug('Attempting to bind {username} with {userdn}'.format(
username=username,
userdn=userdn
username=username,
userdn=userdn
))
conn = ldap3.Connection(server, user=self.escape_userdn_if_needed(userdn), password=password)
return conn

# Protect against invalid usernames as well as LDAP injection attacks
if not re.match(self.valid_username_regex, username):
self.log.warn('username:%s Illegal characters in username, must match regex %s', username, self.valid_username_regex)
Expand All @@ -280,29 +266,27 @@ def getConnection(userdn, username, password):
return None

isBound = False
self.log.debug("TYPE= '%s'",isinstance(self.bind_dn_template, list))

resolved_username = self.resolve_username(username)
if resolved_username is None:
return None

# In case, there are multiple binding templates
if isinstance(self.bind_dn_template, list):
if self.lookup_dn:
userdn = self.retrieve_userdn(username)
if userdn is None:
return None
conn = getConnection(userdn, username, password)
isBound = conn.bind()
else:
if not isinstance(self.bind_dn_template, list):
self.bind_dn_template = [self.bind_dn_template]
for dn in self.bind_dn_template:
userdn = dn.format(username=resolved_username)
userdn = dn.format(username=username)
conn = getConnection(userdn, username, password)
isBound = conn.bind()
self.log.debug('Status of user bind {username} with {userdn} : {isBound}'.format(
username=username,
userdn=userdn,
isBound=isBound
))
))
if isBound:
break
else:
userdn = self.bind_dn_template.format(username=resolved_username)
conn = getConnection(userdn, username, password)
isBound = conn.bind()

if isBound:
if self.allowed_groups:
Expand Down

0 comments on commit 0c0583e

Please sign in to comment.