This repository has been archived by the owner on Jan 9, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
executable file
·86 lines (65 loc) · 2.93 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import logging
import os
from oic.utils.client_management import CDB
from mako.lookup import TemplateLookup
from oic.utils.http_util import Response, NotFound, get_post
__author__ = 'rohe0002'
LOGGER = logging.getLogger("")
LOGFILE_NAME = 'oicscr.log'
hdlr = logging.FileHandler(LOGFILE_NAME)
base_formatter = logging.Formatter(
"%(asctime)s %(name)s:%(levelname)s %(message)s")
CPC = ('%(asctime)s %(name)s:%(levelname)s '
'[%(client)s,%(path)s,%(cid)s] %(message)s')
cpc_formatter = logging.Formatter(CPC)
hdlr.setFormatter(base_formatter)
LOGGER.addHandler(hdlr)
LOGGER.setLevel(logging.DEBUG)
LOOKUP = TemplateLookup(directories=['templates', 'htdocs'],
input_encoding='utf-8', output_encoding='utf-8')
def registration(environ, start_response):
resp = Response(template_lookup=LOOKUP, mako_template="registration.html")
return resp(environ, start_response)
def generate_static_client_credentials(parameters):
redirect_uris = parameters['redirect_uris']
client_db_path = os.environ.get("OIDC_CLIENT_DB", "client_db")
LOGGER.info("Updating db: {}".format(client_db_path))
cdb = CDB(client_db_path)
static_client = cdb.create(redirect_uris=redirect_uris, policy_uri="example.com",
logo_uri="example.com")
static_client["response_types"] = ["code", "id_token", "id_token token", "code id_token", "code token", "code id_token token"]
cdb.cdb.close()
LOGGER.info("Generated client credentials: %s", json.dumps(static_client))
return static_client['client_id'], static_client['client_secret']
def application(environ, start_response):
"""
:param environ: The HTTP application environment
:param start_response: The application to run when the handling of the
request is done
:return: The response as a list of lines
"""
path = environ.get('PATH_INFO', '').lstrip('/')
if path == "":
return registration(environ, start_response)
elif path == "generate_client_credentials":
post_data = get_post(environ)
query_params = json.loads(post_data)
client_id, client_secret = generate_static_client_credentials(query_params)
resp = Response(json.dumps({"client_id": client_id, "client_secret": client_secret}),
headers=[('Content-Type', "application/json")])
return resp(environ, start_response)
return NotFound("'/" + path + "' not found")(environ, start_response)
# ----------------------------------------------------------------------------
def bytes_middleware(application):
def response_as_bytes(environ, start_response):
resp = application(environ, start_response)
# encode the data if necessary
data = resp[0]
if isinstance(data, str):
data = data.encode("utf-8")
return [data]
return response_as_bytes
wsgi = bytes_middleware(application)