Skip to content

Commit

Permalink
Merge pull request #2511 from stveit/jwt-amend-library-with-pip-fix
Browse files Browse the repository at this point in the history
Support JWT token authentication
  • Loading branch information
lunkwill42 authored Jan 20, 2023
2 parents b916d10 + 12d0f95 commit 3dde833
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 7 deletions.
8 changes: 8 additions & 0 deletions python/nav/django/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from nav.config import NAV_CONFIG, getconfig, find_config_dir
from nav.db import get_connection_parameters
import nav.buildconf
from nav.jwtconf import JWTConf

ALLOWED_HOSTS = ['*']

Expand Down Expand Up @@ -262,3 +263,10 @@
from local_settings import *
except ImportError:
pass

_issuers_setting = JWTConf().get_issuers_setting()

OIDC_AUTH = {
'JWT_ISSUERS': _issuers_setting,
'JWT_AUTH_HEADER_PREFIX': 'Bearer',
}
32 changes: 32 additions & 0 deletions python/nav/etc/jwt.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# This file contains configuration for JWT issuers that this NAV instance
# should accept tokens from.

# Each issuer is defined in a section. The name of the section should be
# the issuers name, the same value as the 'iss' claim will be in tokens
# generated by this issuer. This value is often a URL, but does not
# have to be.

# Each section must contain an _aud_ option. NAV will only accept
# tokens from this issuer where the 'aud' claim matches the value of _aud_.

# The _keytype_ option defines what kind of key the _key_ option contains.
# The value can be either JWKS or PEM.

# The _key_ option contains the key matching the _keytype_ option.
# If _keytype_ is JWKS, then _key_ must contain a URL to an endpoint
# exposing a JWKS object. If _keytype_ is PEM, then _key_ must contain
# a path to a file that contains a public key in PEM format.

# Example 1 (Issuer with _keytype_ JWKS)

# [https://issuer-with-url-name.no]
# aud=nav-instance-url
# keytype=JWKS
# key=https://some-url/jwks

# Example 2 (Issuer with _keytype_ PEM)

# [issuer-with-simpler-name]
# aud=nav-instance-url
# keytype=PEM
# key=/some/path/public_key.pem
56 changes: 56 additions & 0 deletions python/nav/jwtconf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import logging
from functools import partial
import configparser

from nav.config import ConfigurationError, NAVConfigParser

_logger = logging.getLogger('nav.jwtconf')


class JWTConf(NAVConfigParser):
"""jwt.conf config parser"""

DEFAULT_CONFIG_FILES = ('jwt.conf',)

def get_issuers_setting(self):
issuers_settings = dict()
for section in self.sections():
try:
get = partial(self.get, section)
key = self._validate_key(get('key'))
aud = self._validate_audience(get('aud'))
key_type = self._validate_type(get('keytype'))
if key_type == 'PEM':
key = self._read_file(key)
claims_options = {
'aud': {'values': [aud], 'essential': True},
}
issuers_settings[section] = {
'key': key,
'type': key_type,
'claims_options': claims_options,
}
except (configparser.Error, ConfigurationError) as error:
_logger.error('Error collecting stats for %s: %s', section, error)
return issuers_settings

def _read_file(self, file):
with open(file, "r") as f:
return f.read()

def _validate_key(self, key):
if not key:
raise ConfigurationError("Invalid 'key': 'key' must not be empty")
return key

def _validate_type(self, key_type):
if key_type not in ['JWKS', 'PEM']:
raise ConfigurationError(
"Invalid 'keytype': 'keytype' must be either 'JWKS' or 'PEM'"
)
return key_type

def _validate_audience(self, audience):
if not audience:
raise ConfigurationError("Invalid 'aud': 'aud' must not be empty")
return audience
25 changes: 20 additions & 5 deletions python/nav/web/api/v1/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

import logging
from datetime import datetime
from urllib.parse import urlparse
from rest_framework.permissions import BasePermission, SAFE_METHODS
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.authentication import TokenAuthentication, BaseAuthentication
from urllib.parse import urlparse
from oidc_auth.authentication import JWTToken

from nav.models.api import APIToken

Expand Down Expand Up @@ -74,7 +75,7 @@ class TokenPermission(BasePermission):

def has_permission(self, request, _view):
token = request.auth # type: APIToken
if not token:
if not token or not isinstance(token, APIToken):
return False

endpoints_ok = self._check_endpoints(request)
Expand Down Expand Up @@ -143,13 +144,27 @@ def _ensure_trailing_slash(path):
return path if path.endswith('/') else path + '/'


class JWTPermission(BasePermission):
"""Checks if the token has correct permissions"""

url_prefix = '/api'
version = 1

def has_permission(self, request, _view):
token = request.auth # type: JWTToken
if not token or not isinstance(token, JWTToken):
return False
return True


class APIPermission(BasePermission):
"""Checks for correct permissions when accessing the API"""

def has_permission(self, request, view):
"""Checks if request is permissable
:type request: rest_framework.request.Request
"""
return LoggedInPermission().has_permission(
request, view
) or TokenPermission().has_permission(request, view)
return any(
permission().has_permission(request, view)
for permission in (LoggedInPermission, TokenPermission, JWTPermission)
)
13 changes: 11 additions & 2 deletions python/nav/web/api/v1/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from rest_framework.views import APIView
from rest_framework.viewsets import ViewSet
from rest_framework.generics import ListAPIView, get_object_or_404
from oidc_auth.authentication import JSONWebTokenAuthentication

from nav.models import manage, event, cabling, rack, profiles
from nav.models.fields import INFINITY, UNRESOLVED
Expand All @@ -50,7 +51,11 @@
from nav.web.api.v1 import serializers, alert_serializers
from nav.web.status2 import STATELESS_THRESHOLD
from nav.macaddress import MacPrefix
from .auth import APIPermission, APIAuthentication, NavBaseAuthentication
from .auth import (
APIPermission,
APIAuthentication,
NavBaseAuthentication,
)
from .helpers import prefix_collector
from .filter_backends import (
AlertHistoryFilterBackend,
Expand Down Expand Up @@ -200,7 +205,11 @@ def remove_invalid_fields(self, queryset, ordering, view, request):
class NAVAPIMixin(APIView):
"""Mixin for providing permissions and renderers"""

authentication_classes = (NavBaseAuthentication, APIAuthentication)
authentication_classes = (
NavBaseAuthentication,
APIAuthentication,
JSONWebTokenAuthentication,
)
permission_classes = (APIPermission,)
renderer_classes = (JSONRenderer, BrowsableAPIRenderer)
filter_backends = (filters.SearchFilter, DjangoFilterBackend, RelatedOrderingFilter)
Expand Down
2 changes: 2 additions & 0 deletions requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@ pynetsnmp-2>=0.1.8,<0.2.0
libsass==0.15.1

napalm==3.4.1

git+https://github.com/Uninett/[email protected]#egg=drf-oidc-auth
132 changes: 132 additions & 0 deletions tests/unittests/jwtconf_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
from mock import patch
from unittest import TestCase
from nav.jwtconf import JWTConf
from nav.config import ConfigurationError


class TestJWTConf(TestCase):
def setUp(self):
pass

def test_valid_jwks_config_should_pass(self):
config = u"""
[jwks-issuer]
keytype=JWKS
aud=nav
key=www.example.com
"""
expected_settings = {
'jwks-issuer': {
'key': 'www.example.com',
'type': 'JWKS',
'claims_options': {
'aud': {'values': ['nav'], 'essential': True},
},
}
}
with patch.object(JWTConf, 'DEFAULT_CONFIG', config):
jwtconf = JWTConf()
settings = jwtconf.get_issuers_setting()
self.assertEqual(settings, expected_settings)

def test_valid_pem_config_should_pass(self):
config = u"""
[pem-issuer]
keytype=PEM
aud=nav
key=key_path
"""
pem_key = "PEM KEY"
expected_settings = {
'pem-issuer': {
'key': pem_key,
'type': 'PEM',
'claims_options': {
'aud': {'values': ['nav'], 'essential': True},
},
}
}

def read_file_patch(self, file):
return pem_key

with patch.object(JWTConf, 'DEFAULT_CONFIG', config):
with patch.object(JWTConf, '_read_file', read_file_patch):
jwtconf = JWTConf()
settings = jwtconf.get_issuers_setting()
self.assertEqual(settings, expected_settings)

def test_invalid_ketype_should_fail(self):
config = u"""
[pem-issuer]
keytype=Fake
aud=nav
key=key
"""
with patch.object(JWTConf, 'DEFAULT_CONFIG', config):
jwtconf = JWTConf()
settings = jwtconf.get_issuers_setting()
self.assertEqual(settings, dict())

def test_empty_key_should_fail(self):
config = u"""
[pem-issuer]
keytype=JWKS
aud=nav
key=
"""
with patch.object(JWTConf, 'DEFAULT_CONFIG', config):
jwtconf = JWTConf()
settings = jwtconf.get_issuers_setting()
self.assertEqual(settings, dict())

def test_empty_aud_should_fail(self):
config = u"""
[pem-issuer]
keytype=JWKS
aud=
key=key
"""
with patch.object(JWTConf, 'DEFAULT_CONFIG', config):
jwtconf = JWTConf()
settings = jwtconf.get_issuers_setting()
self.assertEqual(settings, dict())

def test_validate_key_should_raise_error_if_key_is_empty(self):
jwtconf = JWTConf()
with self.assertRaises(ConfigurationError):
jwtconf._validate_key("")

def test_validate_key_should_allow_non_empty_string(self):
key = "key"
jwtconf = JWTConf()
validated_key = jwtconf._validate_key(key)
self.assertEqual(validated_key, key)

def test_validate_audience_should_raise_error_if_audience_is_empty(self):
jwtconf = JWTConf()
with self.assertRaises(ConfigurationError):
jwtconf._validate_audience("")

def test_validate_audience_should_allow_non_empty_string(self):
aud = "key"
jwtconf = JWTConf()
validated_aud = jwtconf._validate_key(aud)
self.assertEqual(validated_aud, aud)

def test_validate_type_should_raise_error_if_type_is_invalid(self):
jwtconf = JWTConf()
with self.assertRaises(ConfigurationError):
jwtconf._validate_type("invalid")

def test_JWKS_should_be_a_valid_type(self):
type = "JWKS"
jwtconf = JWTConf()
validated_type = jwtconf._validate_type(type)
self.assertEqual(validated_type, type)

def test_PEM_should_be_a_valid_type(self):
type = "PEM"
jwtconf = JWTConf()
validated_type = jwtconf._validate_type(type)
self.assertEqual(validated_type, type)

0 comments on commit 3dde833

Please sign in to comment.