diff --git a/mypy.ini b/mypy.ini
index a48107ac2..278e3a956 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -1,6 +1,19 @@
[mypy]
-# TODO: enable more flags like in https://github.com/ocf/slackbridge/blob/master/mypy.ini
show_traceback = True
ignore_missing_imports = True
+check_untyped_defs = True
+
plugins =
mypy_django_plugin.main
+
+disallow_untyped_defs = True
+disallow_untyped_calls = True
+disallow_any_generics = True
+
+warn_no_return = True
+warn_redundant_casts = True
+warn_unused_configs = True
+warn_unused_ignores = True
+
+[mypy.plugins.django-stubs]
+django_settings_module = ocfweb.settings
diff --git a/ocfweb/about/lab.py b/ocfweb/about/lab.py
index 7385dda15..a48ccfef1 100644
--- a/ocfweb/about/lab.py
+++ b/ocfweb/about/lab.py
@@ -1,7 +1,9 @@
+from django.http import HttpRequest
+from django.http import HttpResponse
from django.shortcuts import render
-def lab_open_source(request):
+def lab_open_source(request: HttpRequest) -> HttpResponse:
return render(
request,
'about/lab-open-source.html',
@@ -11,7 +13,7 @@ def lab_open_source(request):
)
-def lab_vote(request):
+def lab_vote(request: HttpRequest) -> HttpResponse:
return render(
request,
'about/lab-vote.html',
diff --git a/ocfweb/about/staff.py b/ocfweb/about/staff.py
index 1ebc634f5..d9c7a3e06 100644
--- a/ocfweb/about/staff.py
+++ b/ocfweb/about/staff.py
@@ -1,7 +1,9 @@
+from django.http import HttpRequest
+from django.http import HttpResponse
from django.shortcuts import render
-def about_staff(request):
+def about_staff(request: HttpRequest) -> HttpResponse:
return render(
request,
'about/staff.html',
diff --git a/ocfweb/account/chpass.py b/ocfweb/account/chpass.py
index 09633ab46..86f9e4981 100644
--- a/ocfweb/account/chpass.py
+++ b/ocfweb/account/chpass.py
@@ -1,4 +1,10 @@
+from typing import Any
+from typing import Iterator
+from typing import List
+
from django import forms
+from django.http import HttpRequest
+from django.http import HttpResponse
from django.shortcuts import render
from django.template.loader import render_to_string
from django.utils.safestring import mark_safe
@@ -15,15 +21,14 @@
from ocfweb.component.celery import change_password as change_password_task
from ocfweb.component.forms import Form
-
CALLINK_ERROR_MSG = (
"Couldn't connect to CalLink API. Resetting group "
'account passwords online is unavailable.'
)
-def get_accounts_signatory_for(calnet_uid):
- def flatten(lst):
+def get_accounts_signatory_for(calnet_uid: str) -> List[Any]:
+ def flatten(lst: Iterator[Any]) -> List[Any]:
return [item for sublist in lst for item in sublist]
group_accounts = flatten(
@@ -40,7 +45,7 @@ def flatten(lst):
return group_accounts
-def get_accounts_for(calnet_uid):
+def get_accounts_for(calnet_uid: str) -> List[Any]:
accounts = users_by_calnet_uid(calnet_uid)
if calnet_uid in TESTER_CALNET_UIDS:
@@ -51,7 +56,7 @@ def get_accounts_for(calnet_uid):
@calnet_required
-def change_password(request):
+def change_password(request: HttpRequest) -> HttpResponse:
calnet_uid = request.session['calnet_uid']
error = None
accounts = get_accounts_for(calnet_uid)
@@ -117,19 +122,20 @@ def change_password(request):
class ChpassForm(Form):
-
- def __init__(self, ocf_accounts, calnet_uid, *args, **kwargs):
+ # fix self.fields.keyOrder type error in mypy
+ field_order = [
+ 'ocf_account',
+ 'new_password',
+ 'confirm_password',
+ ]
+
+ def __init__(self, ocf_accounts: List[str], calnet_uid: str, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.calnet_uid = calnet_uid
self.fields['ocf_account'] = forms.ChoiceField(
choices=[(x, x) for x in ocf_accounts],
label='OCF account',
)
- self.fields.keyOrder = [
- 'ocf_account',
- 'new_password',
- 'confirm_password',
- ]
new_password = forms.CharField(
widget=forms.PasswordInput,
@@ -141,7 +147,7 @@ def __init__(self, ocf_accounts, calnet_uid, *args, **kwargs):
label='Confirm password',
)
- def clean_ocf_account(self):
+ def clean_ocf_account(self) -> str:
data = self.cleaned_data['ocf_account']
if not user_exists(data):
raise forms.ValidationError('OCF user account does not exist.')
@@ -161,7 +167,7 @@ def clean_ocf_account(self):
return data
- def clean_confirm_password(self):
+ def clean_confirm_password(self) -> str:
new_password = self.cleaned_data.get('new_password')
confirm_password = self.cleaned_data.get('confirm_password')
diff --git a/ocfweb/account/commands.py b/ocfweb/account/commands.py
index 056e2e18e..c71aa69bf 100644
--- a/ocfweb/account/commands.py
+++ b/ocfweb/account/commands.py
@@ -1,5 +1,7 @@
from django import forms
from django.forms import widgets
+from django.http import HttpRequest
+from django.http import HttpResponse
from django.shortcuts import render
from paramiko import AuthenticationException
from paramiko import SSHClient
@@ -8,7 +10,7 @@
from ocfweb.component.forms import Form
-def commands(request):
+def commands(request: HttpRequest) -> HttpResponse:
command_to_run = ''
output = ''
error = ''
diff --git a/ocfweb/account/recommender.py b/ocfweb/account/recommender.py
index 170e7e7cd..d57a08d77 100644
--- a/ocfweb/account/recommender.py
+++ b/ocfweb/account/recommender.py
@@ -1,15 +1,17 @@
from random import randint
+from typing import Any
+from typing import List
from ocflib.account.creation import validate_username
from ocflib.account.creation import ValidationError
from ocflib.account.creation import ValidationWarning
-def recommend(real_name, n):
- name_fields = [name.lower() for name in real_name.split()]
+def recommend(real_name: str, n: int) -> List[Any]:
+ name_fields: List[str] = [name.lower() for name in real_name.split()]
# Can reimplement name_field_abbrevs to only remove vowels or consonants
- name_field_abbrevs = [[] for i in range(len(name_fields))]
+ name_field_abbrevs: List[List[str]] = [[] for i in range(len(name_fields))]
for i in range(len(name_fields)):
name_field = name_fields[i]
for j in range(1, len(name_field) + 1):
@@ -23,7 +25,7 @@ def recommend(real_name, n):
new_unvalidated_recs.append(rec + name_field_abbrev)
unvalidated_recs = new_unvalidated_recs
- validated_recs = []
+ validated_recs: List[Any] = []
while len(validated_recs) < n and len(unvalidated_recs) > 0:
rec = unvalidated_recs.pop(randint(0, len(unvalidated_recs) - 1))
try:
diff --git a/ocfweb/account/register.py b/ocfweb/account/register.py
index 978d1d848..187728a9f 100644
--- a/ocfweb/account/register.py
+++ b/ocfweb/account/register.py
@@ -1,10 +1,13 @@
+from typing import Union
+
import ocflib.account.search as search
import ocflib.account.validators as validators
import ocflib.misc.validators
import ocflib.ucb.directory as directory
from Crypto.PublicKey import RSA
from django import forms
-from django.core.exceptions import NON_FIELD_ERRORS
+from django.http import HttpRequest
+from django.http import HttpResponse
from django.http import HttpResponseBadRequest
from django.http import HttpResponseRedirect
from django.http import JsonResponse
@@ -29,7 +32,7 @@
@calnet_required
-def request_account(request):
+def request_account(request: HttpRequest) -> Union[HttpResponseRedirect, HttpResponse]:
calnet_uid = request.session['calnet_uid']
status = 'new_request'
@@ -87,10 +90,10 @@ def request_account(request):
if isinstance(task.result, NewAccountResponse):
if task.result.status == NewAccountResponse.REJECTED:
status = 'has_errors'
- form._errors[NON_FIELD_ERRORS] = form.error_class(task.result.errors)
+ form.add_error('NON_FIELD_ERRORS', task.result.errors)
elif task.result.status == NewAccountResponse.FLAGGED:
status = 'has_warnings'
- form._errors[NON_FIELD_ERRORS] = form.error_class(task.result.errors)
+ form.add_error('NON_FIELD_ERRORS', task.result.errors)
elif task.result.status == NewAccountResponse.PENDING:
return HttpResponseRedirect(reverse('account_pending'))
else:
@@ -114,7 +117,7 @@ def request_account(request):
)
-def recommend(request):
+def recommend(request: HttpRequest) -> Union[JsonResponse, HttpResponseBadRequest]:
real_name = request.GET.get('real_name', None)
if real_name is None:
return HttpResponseBadRequest('No real_name in recommend request')
@@ -127,7 +130,7 @@ def recommend(request):
)
-def validate(request):
+def validate(request: HttpRequest) -> Union[HttpResponseBadRequest, JsonResponse]:
real_name = request.GET.get('real_name', None)
if real_name is None:
return HttpResponseBadRequest('No real_name in validate request')
@@ -149,7 +152,7 @@ def validate(request):
})
-def wait_for_account(request):
+def wait_for_account(request: HttpRequest) -> Union[HttpResponse, HttpResponseRedirect]:
if 'approve_task_id' not in request.session:
return render(
request,
@@ -180,11 +183,11 @@ def wait_for_account(request):
return render(request, 'account/register/wait/error-probably-not-created.html', {})
-def account_pending(request):
+def account_pending(request: HttpRequest) -> HttpResponse:
return render(request, 'account/register/pending.html', {'title': 'Account request pending'})
-def account_created(request):
+def account_created(request: HttpRequest) -> HttpResponse:
return render(request, 'account/register/success.html', {'title': 'Account request successful'})
@@ -232,7 +235,7 @@ class ApproveForm(Form):
},
)
- def clean_verify_password(self):
+ def clean_verify_password(self) -> str:
password = self.cleaned_data.get('password')
verify_password = self.cleaned_data.get('verify_password')
@@ -241,7 +244,7 @@ def clean_verify_password(self):
raise forms.ValidationError("Your passwords don't match.")
return verify_password
- def clean_verify_contact_email(self):
+ def clean_verify_contact_email(self) -> str:
email = self.cleaned_data.get('contact_email')
verify_contact_email = self.cleaned_data.get('verify_contact_email')
@@ -250,7 +253,8 @@ def clean_verify_contact_email(self):
raise forms.ValidationError("Your emails don't match.")
return verify_contact_email
- def clean(self):
+ # clean incompatible with supertype BaseForm which is defined in django.
+ def clean(self) -> None: # type: ignore
cleaned_data = super().clean()
# validate password (requires username to check similarity)
diff --git a/ocfweb/account/templatetags/vhost_mail.py b/ocfweb/account/templatetags/vhost_mail.py
index 4254f1ac1..c0c5d5544 100644
--- a/ocfweb/account/templatetags/vhost_mail.py
+++ b/ocfweb/account/templatetags/vhost_mail.py
@@ -1,8 +1,10 @@
+from typing import List
+
from django import template
register = template.Library()
@register.filter
-def address_to_parts(address):
+def address_to_parts(address: str) -> List[str]:
return address.split('@')
diff --git a/ocfweb/account/vhost.py b/ocfweb/account/vhost.py
index 6b6d1c130..052e2e7b5 100644
--- a/ocfweb/account/vhost.py
+++ b/ocfweb/account/vhost.py
@@ -2,9 +2,12 @@
import re
import socket
from textwrap import dedent
+from typing import Any
from django import forms
from django.conf import settings
+from django.http import HttpRequest
+from django.http import HttpResponse
from django.shortcuts import redirect
from django.shortcuts import render
from django.urls import reverse
@@ -23,18 +26,18 @@
from ocfweb.component.session import logged_in_user
-def available_domain(domain):
+def available_domain(domain: str) -> bool:
if not re.match(r'^[a-zA-Z0-9]+\.berkeley\.edu$', domain):
return False
return not host_exists(domain)
-def valid_domain_external(domain):
+def valid_domain_external(domain: str) -> bool:
return bool(re.match(r'([a-zA-Z0-9]+\.)+[a-zA-Z0-9]{2,}', domain))
@login_required
-def request_vhost(request):
+def request_vhost(request: HttpRequest) -> HttpResponse:
user = logged_in_user(request)
attrs = user_attrs(user)
is_group = 'callinkOid' in attrs
@@ -137,7 +140,9 @@ def request_vhost(request):
else:
return redirect(reverse('request_vhost_success'))
else:
- form = VirtualHostForm(is_group, initial={'requested_subdomain': user + '.berkeley.edu'})
+ # Unsupported left operand type for + ("None") because form might not have been instantiated at this point...
+ # but this doesn't matter because of if-else clause
+ form = VirtualHostForm(is_group, initial={'requested_subdomain': user + '.berkeley.edu'}) # type: ignore
group_url = f'https://www.ocf.berkeley.edu/~{user}/'
@@ -156,7 +161,7 @@ def request_vhost(request):
)
-def request_vhost_success(request):
+def request_vhost_success(request: HttpRequest) -> HttpResponse:
return render(
request,
'account/vhost/success.html',
@@ -231,7 +236,7 @@ class VirtualHostForm(Form):
max_length=1024,
)
- def __init__(self, is_group=True, *args, **kwargs):
+ def __init__(self, is_group: bool = True, *args: Any, **kwargs: Any) -> None:
super(Form, self).__init__(*args, **kwargs)
# It's pretty derpy that we have to set the labels here, but we can't
@@ -266,7 +271,7 @@ def __init__(self, is_group=True, *args, **kwargs):
max_length=64,
)
- def clean_requested_subdomain(self):
+ def clean_requested_subdomain(self) -> str:
requested_subdomain = self.cleaned_data['requested_subdomain'].lower().strip()
if self.cleaned_data['requested_own_domain']:
@@ -291,7 +296,7 @@ def clean_requested_subdomain(self):
return requested_subdomain
- def clean_your_email(self):
+ def clean_your_email(self) -> str:
your_email = self.cleaned_data['your_email']
if not valid_email(your_email):
raise forms.ValidationError(
diff --git a/ocfweb/account/vhost_mail.py b/ocfweb/account/vhost_mail.py
index 2ca1b9f9f..52fe35fcd 100644
--- a/ocfweb/account/vhost_mail.py
+++ b/ocfweb/account/vhost_mail.py
@@ -3,10 +3,19 @@
import re
from contextlib import contextmanager
from textwrap import dedent
+from typing import Any
+from typing import Collection
+from typing import Dict
+from typing import Generator
+from typing import NoReturn
+from typing import Optional
+from typing import Tuple
from django.conf import settings
from django.contrib import messages
+from django.http import HttpRequest
from django.http import HttpResponse
+from django.http import HttpResponseRedirect
from django.shortcuts import redirect
from django.shortcuts import render
from django.urls import reverse
@@ -22,7 +31,6 @@
from ocfweb.component.errors import ResponseException
from ocfweb.component.session import logged_in_user
-
EXAMPLE_CSV = dedent("""\
president,john.doe@berkeley.edu
officers,john.doe@berkeley.edu jane.doe@berkeley.edu
@@ -42,7 +50,7 @@ class InvalidEmailError(ValueError):
@login_required
@group_account_required
-def vhost_mail(request):
+def vhost_mail(request: HttpRequest) -> HttpResponse:
user = logged_in_user(request)
vhosts = []
@@ -69,12 +77,15 @@ def vhost_mail(request):
@login_required
@group_account_required
@require_POST
-def vhost_mail_update(request):
+def vhost_mail_update(request: HttpRequest) -> HttpResponseRedirect:
user = logged_in_user(request)
# All requests are required to have these
action = _get_action(request)
- addr_name, addr_domain, addr_vhost = _get_addr(request, user, 'addr', required=True)
+ # _get_addr may return None, but never with this particular call
+ addr_info = _get_addr(request, user, 'addr', required=True)
+ assert addr_info is not None
+ addr_name, addr_domain, addr_vhost = addr_info
addr = (addr_name or '') + '@' + addr_domain
# These fields are optional; some might be None
@@ -137,7 +148,7 @@ def vhost_mail_update(request):
@login_required
@group_account_required
-def vhost_mail_csv_export(request, domain):
+def vhost_mail_csv_export(request: HttpRequest, domain: str) -> HttpResponse:
user = logged_in_user(request)
vhost = _get_vhost(user, domain)
if not vhost:
@@ -161,7 +172,7 @@ def vhost_mail_csv_export(request, domain):
@login_required
@group_account_required
@require_POST
-def vhost_mail_csv_import(request, domain):
+def vhost_mail_csv_import(request: HttpRequest, domain: str) -> HttpResponseRedirect:
user = logged_in_user(request)
vhost = _get_vhost(user, domain)
if not vhost:
@@ -204,7 +215,7 @@ def vhost_mail_csv_import(request, domain):
return _redirect_back()
-def _write_csv(addresses):
+def _write_csv(addresses: Generator[Any, None, None]) -> Any:
"""Turn a collection of vhost forwarding addresses into a CSV
string for user download."""
buf = io.StringIO()
@@ -218,14 +229,14 @@ def _write_csv(addresses):
return buf.getvalue()
-def _parse_csv(request, domain):
+def _parse_csv(request: HttpRequest, domain: str) -> Dict[str, Any]:
"""Parse, validate, and return addresses from the file uploaded
with the CSV upload button/form."""
- csv_file = request.FILES.get('csv_file')
+ csv_file: Any = request.FILES.get('csv_file')
if not csv_file:
_error(request, 'Missing CSV file!')
- addresses = {}
+ addresses: Dict[str, Collection[Any]] = {}
try:
with io.TextIOWrapper(csv_file, encoding='utf-8') as f:
reader = csv.reader(f)
@@ -247,12 +258,12 @@ def _parse_csv(request, domain):
except ValueError as e:
_error(request, 'Error parsing CSV: row {}: {}'.format(i + 1, e))
except UnicodeDecodeError as e:
- _error(f'Uploaded file is not valid UTF-8 encoded: "{e}"')
+ _error(request, f'Uploaded file is not valid UTF-8 encoded: "{e}"')
return addresses
-def _parse_csv_forward_addrs(string):
+def _parse_csv_forward_addrs(string: str) -> Collection[Any]:
"""Parse and validate emails from a commas-and-whitespace separated
list string."""
# Allow any combination of whitespace and , as separators
@@ -269,24 +280,26 @@ def _parse_csv_forward_addrs(string):
return frozenset(to_addrs)
-def _error(request, msg):
+def _error(request: HttpRequest, msg: str) -> NoReturn:
messages.add_message(request, messages.ERROR, msg)
raise ResponseException(_redirect_back())
-def _redirect_back():
+def _redirect_back() -> Any:
return redirect(reverse('vhost_mail'))
-def _get_action(request):
+def _get_action(request: HttpRequest) -> Any:
action = request.POST.get('action')
if action not in {'add', 'update', 'delete'}:
_error(request, f'Invalid action: "{action}"')
else:
return action
+ return None
+
-def _parse_addr(addr, allow_wildcard=False):
+def _parse_addr(addr: str, allow_wildcard: bool = False) -> Optional[Tuple[str, str]]:
"""Safely parse an email, returning first component and domain."""
m = re.match(
(
@@ -302,8 +315,10 @@ def _parse_addr(addr, allow_wildcard=False):
if '.' in domain:
return name, domain
+ return None
-def _get_addr(request, user, field, required=True):
+
+def _get_addr(request: HttpRequest, user: Any, field: str, required: bool = True) -> Optional[Tuple[Any, Any, Any]]:
original = request.POST.get(field)
if original is not None:
addr = original.strip()
@@ -322,8 +337,10 @@ def _get_addr(request, user, field, required=True):
elif required:
_error(request, 'You must provide an address!')
+ return None
+
-def _get_forward_to(request):
+def _get_forward_to(request: HttpRequest) -> Optional[Collection[Any]]:
forward_to = request.POST.get('forward_to')
if forward_to is None:
@@ -346,7 +363,7 @@ def _get_forward_to(request):
return frozenset(parsed_addrs)
-def _get_password(request, addr_name):
+def _get_password(request: HttpRequest, addr_name: Optional[str]) -> Any:
# If addr_name is None, then this is a wildcard address, and those can't
# have passwords.
if addr_name is None:
@@ -365,21 +382,21 @@ def _get_password(request, addr_name):
return crypt_password(password)
-def _get_vhost(user, domain):
+def _get_vhost(user: Any, domain: str) -> Any:
vhosts = vhosts_for_user(user)
for vhost in vhosts:
if vhost.domain == domain:
return vhost
-def _find_addr(c, vhost, addr):
+def _find_addr(c: Any, vhost: Any, addr: str) -> Any:
for addr_obj in vhost.get_forwarding_addresses(c):
if addr_obj.address == addr:
return addr_obj
@contextmanager
-def _txn(**kwargs):
+def _txn(**kwargs: Any) -> Generator[Any, None, None]:
with get_connection(
user=settings.OCFMAIL_USER,
password=settings.OCFMAIL_PASSWORD,
diff --git a/ocfweb/announcements/announcements.py b/ocfweb/announcements/announcements.py
index fd747fef3..df18f7c0e 100644
--- a/ocfweb/announcements/announcements.py
+++ b/ocfweb/announcements/announcements.py
@@ -1,10 +1,14 @@
from collections import namedtuple
from datetime import date
-from datetime import datetime
+from datetime import datetime as original_datetime
from datetime import time
+from typing import Any
+from typing import Callable
from typing import Tuple
from cached_property import cached_property
+from django.http import HttpRequest
+from django.http import HttpResponse
from django.shortcuts import render
from django.templatetags.static import static
from django.urls import reverse
@@ -18,24 +22,24 @@
class Announcement(namedtuple('Announcement', ('title', 'date', 'path', 'render'))):
@cached_property
- def link(self):
+ def link(self) -> str:
return reverse(self.route_name)
@cached_property
- def route_name(self):
+ def route_name(self) -> str:
return f'{self.path}-announcement'
@cached_property
- def datetime(self):
+ def datetime(self) -> original_datetime:
"""This is pretty silly, but Django humanize needs a datetime."""
return timezone.make_aware(
- datetime.combine(self.date, time()),
+ original_datetime.combine(self.date, time()),
timezone.get_default_timezone(),
)
-def announcement(title, date, path):
- def wrapper(fn):
+def announcement(title: str, date: date, path: str) -> Callable[[Any], Any]:
+ def wrapper(fn: Callable[..., Any]) -> Callable[..., Any]:
global announcements
announcements += (
Announcement(
@@ -49,7 +53,7 @@ def wrapper(fn):
return wrapper
-def index(request):
+def index(request: HttpRequest) -> HttpResponse:
return render(
request,
'announcements/index.html',
@@ -71,7 +75,7 @@ def index(request):
date(2016, 5, 12),
'ocf-eff-alliance',
)
-def eff_alliance(title, request):
+def eff_alliance(title: str, request: HttpRequest) -> HttpResponse:
return render(
request,
'announcements/2016-05-12-ocf-eff-alliance.html',
@@ -86,7 +90,7 @@ def eff_alliance(title, request):
date(2016, 4, 1),
'renaming-ocf',
)
-def renaming_announcement(title, request):
+def renaming_announcement(title: str, request: HttpRequest) -> HttpResponse:
return render(
request,
'announcements/2016-04-01-renaming.html',
@@ -107,7 +111,7 @@ def renaming_announcement(title, request):
date(2016, 2, 9),
'printing',
)
-def printing_announcement(title, request):
+def printing_announcement(title: str, request: HttpRequest) -> HttpResponse:
return render(
request,
'announcements/2016-02-09-printing.html',
@@ -122,7 +126,7 @@ def printing_announcement(title, request):
date(2017, 3, 1),
'hpc-survey',
)
-def hpc_survey(title, request):
+def hpc_survey(title: str, request: HttpRequest) -> HttpResponse:
return render(
request,
'announcements/2017-03-01-hpc-survey.html',
@@ -137,7 +141,7 @@ def hpc_survey(title, request):
date(2017, 3, 20),
'hiring-2017',
)
-def hiring_2017(title, request):
+def hiring_2017(title: str, request: HttpRequest) -> HttpResponse:
return render(
request,
'announcements/2017-03-20-hiring.html',
@@ -152,7 +156,7 @@ def hiring_2017(title, request):
date(2018, 10, 30),
'hiring-2018',
)
-def hiring_2018(title, request):
+def hiring_2018(title: str, request: HttpRequest) -> HttpResponse:
return render(
request,
'announcements/2018-10-30-hiring.html',
diff --git a/ocfweb/api/announce.py b/ocfweb/api/announce.py
index 6e9cf36f0..bcf93ecae 100644
--- a/ocfweb/api/announce.py
+++ b/ocfweb/api/announce.py
@@ -1,9 +1,10 @@
+from django.http import HttpRequest
from django.http import JsonResponse
from ocfweb.component.blog import get_blog_posts as real_get_blog_posts
-def get_blog_posts(request):
+def get_blog_posts(request: HttpRequest) -> JsonResponse:
return JsonResponse(
[item._asdict() for item in real_get_blog_posts()],
safe=False,
diff --git a/ocfweb/api/hours.py b/ocfweb/api/hours.py
index e49a226f6..2876ca901 100644
--- a/ocfweb/api/hours.py
+++ b/ocfweb/api/hours.py
@@ -1,6 +1,8 @@
from datetime import time
from json import JSONEncoder
+from typing import Any
+from django.http import HttpRequest
from django.http import JsonResponse
from ocflib.lab.hours import Hour
from ocflib.lab.hours import HoursListing
@@ -10,7 +12,7 @@
class JSONHoursEncoder(JSONEncoder):
- def default(self, obj):
+ def default(self, obj: Any) -> Any:
if isinstance(obj, HoursListing):
return obj.__dict__
elif isinstance(obj, Hour):
@@ -22,11 +24,11 @@ def default(self, obj):
@periodic(60)
-def get_hours_listing():
+def get_hours_listing() -> HoursListing:
return read_hours_listing()
-def get_hours_today(request):
+def get_hours_today(request: HttpRequest) -> JsonResponse:
return JsonResponse(
get_hours_listing().hours_on_date(),
encoder=JSONHoursEncoder,
diff --git a/ocfweb/api/lab.py b/ocfweb/api/lab.py
index dbb3978aa..f2d9c2faf 100644
--- a/ocfweb/api/lab.py
+++ b/ocfweb/api/lab.py
@@ -1,3 +1,8 @@
+from typing import Any
+from typing import List
+from typing import Set
+
+from django.http import HttpRequest
from django.http import JsonResponse
from ocflib.infra.hosts import hostname_from_domain
from ocflib.lab.stats import get_connection
@@ -8,12 +13,12 @@
@cache()
-def _list_public_desktops():
+def _list_public_desktops() -> List[Any]:
return list_desktops(public_only=True)
@periodic(5)
-def _get_desktops_in_use():
+def _get_desktops_in_use() -> Set[Any]:
"""List which desktops are currently in use."""
# https://github.com/ocf/ocflib/blob/90f9268a89ac9d53c089ab819c1aa95bdc38823d/ocflib/lab/ocfstats.sql#L70
@@ -27,7 +32,7 @@ def _get_desktops_in_use():
return {hostname_from_domain(session['host']) for session in c}
-def desktop_usage(request):
+def desktop_usage(request: HttpRequest) -> JsonResponse:
public_desktops = _list_public_desktops()
desktops_in_use = _get_desktops_in_use()
diff --git a/ocfweb/api/session_tracking.py b/ocfweb/api/session_tracking.py
index 8169dc5cf..05bd46852 100644
--- a/ocfweb/api/session_tracking.py
+++ b/ocfweb/api/session_tracking.py
@@ -2,8 +2,11 @@
from enum import Enum
from functools import partial
from ipaddress import ip_address
+from typing import Any
+from typing import Dict
from django.conf import settings
+from django.http import HttpRequest
from django.http import HttpResponse
from django.http import HttpResponseBadRequest
from django.views.decorators.csrf import csrf_exempt
@@ -28,7 +31,7 @@
@require_POST
@csrf_exempt
-def log_session(request):
+def log_session(request: HttpRequest) -> HttpResponse:
"""Primary API endpoint for session tracking.
Desktops have a cronjob that calls this endpoint: https://git.io/vpIKX
@@ -63,7 +66,7 @@ def log_session(request):
return HttpResponseBadRequest(e)
-def _new_session(host, user):
+def _new_session(host: str, user: str) -> None:
"""Register new session in when a user logs into a desktop."""
_close_sessions(host)
@@ -75,7 +78,7 @@ def _new_session(host, user):
)
-def _session_exists(host, user):
+def _session_exists(host: str, user: str) -> bool:
"""Returns whether an open session already exists for a given host and user."""
with get_connection() as c:
@@ -87,7 +90,7 @@ def _session_exists(host, user):
return c.fetchone()['count'] > 0
-def _refresh_session(host, user):
+def _refresh_session(host: str, user: str) -> None:
"""Keep a session around if the user is still logged in."""
with get_connection() as c:
@@ -97,7 +100,7 @@ def _refresh_session(host, user):
)
-def _close_sessions(host):
+def _close_sessions(host: str) -> None:
"""Close all sessions for a particular host."""
with get_connection() as c:
@@ -108,7 +111,7 @@ def _close_sessions(host):
@cache(600)
-def _get_desktops():
+def _get_desktops() -> Dict[Any, Any]:
"""Return IPv4 and 6 address to fqdn mapping for OCF desktops from LDAP."""
desktops = {}
diff --git a/ocfweb/api/shorturls.py b/ocfweb/api/shorturls.py
index 70ad232d6..58ba1af7e 100644
--- a/ocfweb/api/shorturls.py
+++ b/ocfweb/api/shorturls.py
@@ -1,10 +1,14 @@
+from typing import Any
+from typing import Union
+
+from django.http import HttpRequest
from django.http import HttpResponseNotFound
from django.http import HttpResponseRedirect
from ocflib.misc.shorturls import get_connection
from ocflib.misc.shorturls import get_shorturl
-def bounce_shorturl(request, slug):
+def bounce_shorturl(request: HttpRequest, slug: Any) -> Union[HttpResponseRedirect, HttpResponseNotFound]:
if slug:
with get_connection() as ctx:
target = get_shorturl(ctx, slug)
diff --git a/ocfweb/api/staff_hours.py b/ocfweb/api/staff_hours.py
index f8186d0b4..d6af0b1ba 100644
--- a/ocfweb/api/staff_hours.py
+++ b/ocfweb/api/staff_hours.py
@@ -1,9 +1,10 @@
+from django.http import HttpRequest
from django.http import JsonResponse
from ocfweb.main.staff_hours import get_staff_hours as real_get_staff_hours
-def get_staff_hours(request):
+def get_staff_hours(request: HttpRequest) -> JsonResponse:
return JsonResponse(
[item._asdict() for item in real_get_staff_hours()],
safe=False,
diff --git a/ocfweb/auth.py b/ocfweb/auth.py
index 22cf62b2b..bc24e1a16 100644
--- a/ocfweb/auth.py
+++ b/ocfweb/auth.py
@@ -1,7 +1,11 @@
# TODO: move this file into ocfweb.component.session?
+from typing import Any
+from typing import Callable
+from typing import Optional
from urllib.parse import urlencode
from django.contrib.auth import REDIRECT_FIELD_NAME
+from django.http import HttpRequest
from django.http import HttpResponseRedirect
from django.shortcuts import render
from django.urls import reverse
@@ -11,8 +15,8 @@
from ocfweb.component.session import logged_in_user
-def login_required(function):
- def _decorator(request, *args, **kwargs):
+def login_required(function: Callable[..., Any]) -> Callable[..., Any]:
+ def _decorator(request: HttpRequest, *args: Any, **kwargs: Any) -> Any:
if is_logged_in(request):
return function(request, *args, **kwargs)
@@ -22,10 +26,10 @@ def _decorator(request, *args, **kwargs):
return _decorator
-def group_account_required(function):
- def _decorator(request, *args, **kwargs):
+def group_account_required(function: Callable[..., Any]) -> Callable[..., Any]:
+ def _decorator(request: HttpRequest, *args: Any, **kwargs: Any) -> Any:
try:
- user = logged_in_user(request)
+ user: Optional[str] = logged_in_user(request)
except KeyError:
user = None
@@ -41,13 +45,13 @@ def _decorator(request, *args, **kwargs):
return _decorator
-def calnet_required(fn):
+def calnet_required(fn: Callable[..., Any]) -> Callable[..., Any]:
"""Decorator for views that require CalNet auth
Checks if "calnet_uid" is in the request.session dictionary. If the value
is not a valid uid, the user is rediected to CalNet login view.
"""
- def wrapper(request, *args, **kwargs):
+ def wrapper(request: HttpRequest, *args: Any, **kwargs: Any) -> Any:
calnet_uid = request.session.get('calnet_uid')
if calnet_uid:
return fn(request, *args, **kwargs)
diff --git a/ocfweb/bin/run_periodic_functions.py b/ocfweb/bin/run_periodic_functions.py
index 423be749f..18638c531 100755
--- a/ocfweb/bin/run_periodic_functions.py
+++ b/ocfweb/bin/run_periodic_functions.py
@@ -12,6 +12,8 @@
from argparse import ArgumentParser
from textwrap import dedent
from traceback import format_exc
+from typing import Any
+from typing import Optional
from django.conf import settings
from ocflib.misc.mail import send_problem_report
@@ -21,17 +23,16 @@
from ocfweb.caching import periodic_functions
-
_logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# seconds to pause worker after encountering an error
DELAY_ON_ERROR_MIN = 30
DELAY_ON_ERROR_MAX = 1800 # 30 minutes
-delay_on_error = DELAY_ON_ERROR_MIN
+delay_on_error: float = DELAY_ON_ERROR_MIN
-def run_periodic_functions():
+def run_periodic_functions() -> None:
global delay_on_error
# First, import urls so that views are imported, decorators are run, and
@@ -99,7 +100,7 @@ def run_periodic_functions():
delay_on_error = max(DELAY_ON_ERROR_MIN, delay_on_error / 2)
-def main(argv=None):
+def main(argv: Optional[Any] = None) -> int:
os.environ['DJANGO_SETTINGS_MODULE'] = 'ocfweb.settings'
parser = ArgumentParser(description='Run ocfweb periodic functions')
@@ -121,6 +122,8 @@ def main(argv=None):
run_periodic_functions()
time.sleep(1)
+ return 0
+
if __name__ == '__main__':
sys.exit(main())
diff --git a/ocfweb/caching.py b/ocfweb/caching.py
index 1aba885e0..21310b92d 100644
--- a/ocfweb/caching.py
+++ b/ocfweb/caching.py
@@ -4,6 +4,13 @@
from collections import namedtuple
from datetime import datetime
from itertools import chain
+from typing import Any
+from typing import Callable
+from typing import Dict
+from typing import Hashable
+from typing import Iterable
+from typing import Optional
+from typing import Tuple
from cached_property import cached_property
from django.conf import settings
@@ -11,11 +18,10 @@
from ocfweb.environment import ocfweb_version
-
_logger = logging.getLogger(__name__)
-def cache_lookup(key):
+def cache_lookup(key: Hashable) -> Any:
"""Look up a key in the cache, raising KeyError if it's a miss."""
# The "get" method returns `None` both for cached values of `None`,
# and keys which aren't in the cache.
@@ -23,7 +29,7 @@ def cache_lookup(key):
# The recommended workaround is using a sentinel as a default
# return value for when a key is missing. This allows us to still
# cache functions which return None.
- cache_miss_sentinel = {}
+ cache_miss_sentinel: Dict[Any, Any] = {}
retval = django_cache.get(key, cache_miss_sentinel)
is_hit = retval is not cache_miss_sentinel
@@ -35,7 +41,9 @@ def cache_lookup(key):
return retval
-def cache_lookup_with_fallback(key, fallback, ttl=None, force_miss=False):
+def cache_lookup_with_fallback(
+ key: Hashable, fallback: Callable[[], Any], ttl: Optional[int] = None, force_miss: bool = False,
+) -> Any:
"""Look up a key in the cache, falling back to a function if it's a miss.
We first check if the key is in the cache, and if so, return it. If not, we
@@ -69,7 +77,7 @@ def cache_lookup_with_fallback(key, fallback, ttl=None, force_miss=False):
return result
-def cache(ttl=None):
+def cache(ttl: Optional[int] = None) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""Caching function decorator, with an optional ttl.
The optional ttl (in seconds) specifies how long cache entries should live.
@@ -94,8 +102,8 @@ def my_deterministic_function(a, b, c):
def my_changing_function(a, b, c):
....
"""
- def outer(fn):
- def inner(*args, **kwargs):
+ def outer(fn: Callable[..., Any]) -> Callable[..., Any]:
+ def inner(*args: Any, **kwargs: Any) -> Any:
return cache_lookup_with_fallback(
_make_function_call_key(fn, args, kwargs),
lambda: fn(*args, **kwargs),
@@ -105,7 +113,7 @@ def inner(*args, **kwargs):
return outer
-def _make_key(key):
+def _make_key(key: Iterable[Any]) -> Tuple[Any, ...]:
"""Return a key suitable for caching.
The returned key prepends a version tag so that we don't share the cache
@@ -122,7 +130,7 @@ def _make_key(key):
)
-def _make_function_call_key(fn, args, kwargs):
+def _make_function_call_key(fn: Callable[..., Any], args: Iterable[Any], kwargs: Dict[Any, Any]) -> Tuple[Any, ...]:
"""Return a key for a function call.
The key will eventually be converted to a string and used as a cache key.
@@ -152,21 +160,25 @@ class PeriodicFunction(
),
):
- def __hash__(self):
+ def __hash__(self) -> int:
return hash(self.function_call_key)
- def __eq__(self, other):
+ def __eq__(self, other: object) -> bool:
+ # Mypy note: It is recommended for __eq__ to work with arbitrary objects.
+ if not isinstance(other, PeriodicFunction):
+ return NotImplemented
+
return self.function_call_key == other.function_call_key
- def __str__(self):
+ def __str__(self) -> str:
return f'PeriodicFunction({self.function_call_key})'
@cached_property
- def function_call_key(self):
+ def function_call_key(self) -> Tuple[Any, ...]:
"""Return the function's cache key."""
return _make_function_call_key(self.function, (), {})
- def function_with_timestamp(self):
+ def function_with_timestamp(self) -> Tuple[datetime, Any]:
"""Return a tuple (timestamp, result).
This is the value we actually store in the cache; the benefit is that
@@ -176,7 +188,7 @@ def function_with_timestamp(self):
"""
return (datetime.now(), self.function())
- def last_update(self):
+ def last_update(self) -> Any:
"""Return the timestamp of the last update of this function.
If the function has never been updated, returns None."""
@@ -186,7 +198,7 @@ def last_update(self):
except KeyError:
return None
- def seconds_since_last_update(self):
+ def seconds_since_last_update(self) -> float:
"""Return the number of seconds since the last update.
If we've never updated, we return the number of seconds since
@@ -195,7 +207,7 @@ def seconds_since_last_update(self):
last_update = self.last_update() or datetime.fromtimestamp(0)
return (datetime.now() - last_update).total_seconds()
- def result(self, **kwargs):
+ def result(self, **kwargs: Any) -> Any:
"""Return the result of this periodic function.
In most cases, we can read it from the cache and so it is nearly
@@ -212,7 +224,7 @@ def result(self, **kwargs):
)
return result
- def update(self):
+ def update(self) -> Any:
"""Run this periodic function and cache the result."""
cache_lookup_with_fallback(
self.function_call_key,
@@ -222,7 +234,7 @@ def update(self):
)
-def periodic(period, ttl=None):
+def periodic(period: float, ttl: Optional[float] = None) -> Callable[[Callable[..., Any]], Any]:
"""Caching function decorator for functions which desire TTL-based caching.
Using this decorator on a function registers it as a "periodic function",
@@ -259,7 +271,7 @@ def get_blog_posts():
elif ttl is None:
ttl = period * 2
- def outer(fn):
+ def outer(fn: Callable[..., Any]) -> Any:
pf = PeriodicFunction(
function=fn,
period=period,
diff --git a/ocfweb/component/blog.py b/ocfweb/component/blog.py
index c69e11d92..a618de856 100644
--- a/ocfweb/component/blog.py
+++ b/ocfweb/component/blog.py
@@ -1,4 +1,7 @@
from collections import namedtuple
+from typing import Any
+from typing import Dict
+from typing import List
from xml.etree import ElementTree as etree
import dateutil.parser
@@ -8,7 +11,6 @@
from ocfweb.caching import periodic
-
_namespaces = {'atom': 'http://www.w3.org/2005/Atom'}
@@ -28,32 +30,34 @@ class Post(
):
@cached_property
- def datetime(self):
+ def datetime(self) -> bool:
return self.published
@classmethod
- def from_element(cls, element):
- def grab_attr(attr):
- el = element
+ def from_element(cls: Any, element: Any) -> Any:
+ def grab_attr(attr: str) -> str:
+ el: Any = element
for part in attr.split('_'):
el = el.find('atom:' + part, namespaces=_namespaces)
return el.text
- attrs = {
+ attrs: Dict[str, Any] = {
attr: grab_attr(attr)
for attr in cls._fields
}
attrs['updated'] = dateutil.parser.parse(attrs['updated'])
attrs['published'] = dateutil.parser.parse(attrs['published'])
- attrs['link'] = element.find(
+ # Fix builtin function being typed as returning an int on error, which has no get
+ el_find: Any = element.find(
'.//atom:link[@type="text/html"]',
namespaces=_namespaces,
- ).get('href')
+ )
+ attrs['link'] = el_find.get('href')
return cls(**attrs)
@periodic(60)
-def get_blog_posts():
+def get_blog_posts() -> List[Any]:
"""Parse the beautiful OCF status blog atom feed into a list of Posts.
Unfortunately Blogger is hella flakey so we use it inside a loop and fail
diff --git a/ocfweb/component/errors.py b/ocfweb/component/errors.py
index 67beea6d1..a26cc4018 100644
--- a/ocfweb/component/errors.py
+++ b/ocfweb/component/errors.py
@@ -1,4 +1,7 @@
+from requests import models
+
+
class ResponseException(Exception):
- def __init__(self, response):
+ def __init__(self, response: models.Response) -> None:
self.response = response
diff --git a/ocfweb/component/forms.py b/ocfweb/component/forms.py
index 1a12835ba..fc601a9f9 100644
--- a/ocfweb/component/forms.py
+++ b/ocfweb/component/forms.py
@@ -1,3 +1,6 @@
+from typing import Any
+from typing import Callable
+
from django import forms
from django.core.exceptions import ValidationError
@@ -9,7 +12,7 @@ class Form(forms.Form):
required_css_class = 'required'
-def wrap_validator(validator):
+def wrap_validator(validator: Callable[..., Any]) -> Callable[..., None]:
"""Wraps a validator which raises some kind of Exception, and instead
returns a Django ValidationError with the same message.
@@ -21,7 +24,7 @@ def wrap_validator(validator):
>>> validator('ocf')
ValidationError: Username is reserved
"""
- def wrapped_validator(*args, **kwargs):
+ def wrapped_validator(*args: Any, **kwargs: Any) -> None:
try:
validator(*args, **kwargs)
except Exception as ex:
diff --git a/ocfweb/component/graph.py b/ocfweb/component/graph.py
index 3d93ed270..00ba9c06f 100644
--- a/ocfweb/component/graph.py
+++ b/ocfweb/component/graph.py
@@ -3,25 +3,33 @@
from datetime import date
from datetime import datetime
from datetime import timedelta
+from typing import Any
+from typing import Callable
+from typing import Optional
+from typing import Tuple
+from django.http import HttpRequest
from django.http import HttpResponse
from django.shortcuts import redirect
from django.urls import reverse
from matplotlib.backends.backend_agg import FigureCanvasAgg
-
+from matplotlib.figure import Figure
MIN_DAYS = 1
MAX_DAYS = 365 * 5
DEFAULT_DAYS = 14
-def current_start_end():
+def current_start_end() -> Tuple[date, date]:
"""Return current default start and end date."""
end = date.today()
return end - timedelta(days=DEFAULT_DAYS), end
-def canonical_graph(hot_path=None, default_start_end=current_start_end):
+def canonical_graph(
+ hot_path: Optional[Callable[..., Any]] = None,
+ default_start_end: Callable[..., Tuple[date, date]] = current_start_end,
+) -> Callable[..., Any]:
"""Decorator to make graphs with a start_day and end_day.
It does three primary things:
@@ -42,9 +50,9 @@ def canonical_graph(hot_path=None, default_start_end=current_start_end):
:param default_start_end: optional, function to get current start/end date
(default: current_start_end)
"""
- def decorator(fn):
- def wrapper(request):
- def _day_from_params(param, default):
+ def decorator(fn: Callable[[Any, date, date], Any]) -> Callable[[Any], Any]:
+ def wrapper(request: HttpRequest) -> Any:
+ def _day_from_params(param: str, default: date) -> date:
try:
return datetime.strptime(request.GET.get(param, ''), '%Y-%m-%d').date()
except ValueError:
@@ -85,7 +93,7 @@ def _day_from_params(param, default):
return decorator
-def plot_to_image_bytes(fig, format='svg', **kwargs):
+def plot_to_image_bytes(fig: Figure, format: str = 'svg', **kwargs: Any) -> bytes:
"""Return bytes representing the plot image."""
buf = io.BytesIO()
FigureCanvasAgg(fig).print_figure(buf, format=format, **kwargs)
diff --git a/ocfweb/component/lab_status.py b/ocfweb/component/lab_status.py
index cc91cd691..790f70a0d 100644
--- a/ocfweb/component/lab_status.py
+++ b/ocfweb/component/lab_status.py
@@ -14,7 +14,7 @@
@periodic(60, ttl=86400)
-def get_lab_status():
+def get_lab_status() -> LabStatus:
"""Get the front page banner message from the default location."""
with open('/etc/ocf/lab_status.yaml') as f:
tree = yaml.safe_load(f)
diff --git a/ocfweb/component/markdown.py b/ocfweb/component/markdown.py
index 487c432c8..02685e332 100644
--- a/ocfweb/component/markdown.py
+++ b/ocfweb/component/markdown.py
@@ -1,4 +1,10 @@
import re
+from typing import Any
+from typing import List
+from typing import Match
+from typing import Set
+from typing import Tuple
+from typing import TYPE_CHECKING
import mistune
from django.urls import reverse
@@ -14,24 +20,39 @@
# tags of a format like: [[!meta title="Backups"]]
META_REGEX = re.compile(r'\[\[!meta ([a-z]+)="([^"]*)"\]\]')
+# Make mypy play nicely with mixins https://github.com/python/mypy/issues/5837
+# TODO: issue has been resolved in mypy, patch when next version of mypy gets released
+# More info: https://github.com/python/mypy/pull/7860
-class HtmlCommentsLexerMixin:
+
+class MixinBase:
+ def __init__(self, rules: Any, default_rules: Any) -> None:
+ self.rules = rules
+ self.default_rules = default_rules
+
+
+_Base: Any = object
+if TYPE_CHECKING:
+ _Base = MixinBase
+
+
+class HtmlCommentsLexerMixin(_Base):
"""Strip HTML comments as entire blocks or inside lines."""
- def enable_html_comments(self):
+ def enable_html_comments(self) -> None:
self.rules.html_comment = re.compile(
r'^',
)
self.default_rules.insert(0, 'html_comment')
- def output_html_comment(self, m):
+ def output_html_comment(self, m: Match[Any]) -> str:
return ''
- def parse_html_comment(self, m):
+ def parse_html_comment(self, m: Match[Any]) -> None:
pass
-class BackslashLineBreakLexerMixin:
+class BackslashLineBreakLexerMixin(_Base):
"""Convert lines that end in a backslash into a simple line break.
This follows GitHub-flavored Markdown on backslashes at the end of lines
@@ -50,22 +71,22 @@ class BackslashLineBreakLexerMixin:
with a line break
"""
- def enable_backslash_line_breaks(self):
+ def enable_backslash_line_breaks(self) -> None:
self.rules.backslash_line_break = re.compile(
'^\\\\\n',
)
self.default_rules.insert(0, 'backslash_line_break')
- def output_backslash_line_break(self, m):
+ def output_backslash_line_break(self, m: Match[Any]) -> str:
return '
'
-class CodeRendererMixin:
+class CodeRendererMixin(_Base):
"""Render highlighted code."""
# TODO: don't use inline styles; see http://pygments.org/docs/formatters/
html_formatter = HtmlFormatter(noclasses=True)
- def block_code(self, code, lang):
+ def block_code(self, code: str, lang: str) -> str:
try:
if lang:
lexer = get_lexer_by_name(lang, stripall=True)
@@ -77,7 +98,7 @@ def block_code(self, code, lang):
return highlight(code, lexer, CodeRendererMixin.html_formatter)
-class DjangoLinkInlineLexerMixin:
+class DjangoLinkInlineLexerMixin(_Base):
"""Turn special Markdown link syntax into Django links.
In Django templates, we can use `url` tags, such as:
@@ -95,7 +116,7 @@ class DjangoLinkInlineLexerMixin:
split_words = re.compile(r'((?:\S|\\ )+)')
- def enable_django_links(self):
+ def enable_django_links(self) -> None:
self.rules.django_link = re.compile(
r'^\[\[(?!\!)'
r'([\s\S]+?)'
@@ -106,10 +127,10 @@ def enable_django_links(self):
)
self.default_rules.insert(0, 'django_link')
- def output_django_link(self, m):
+ def output_django_link(self, m: Match[Any]) -> str:
text, target, fragment = m.group(1), m.group(2), m.group(3)
- def href(link, fragment):
+ def href(link: str, fragment: str) -> str:
if fragment:
return link + '#' + fragment
return link
@@ -123,7 +144,7 @@ def href(link, fragment):
)
-class HeaderRendererMixin:
+class HeaderRendererMixin(_Base):
"""Mixin to render headers with auto-generated IDs (or provided IDs).
If headers are written as usual, they'll be given automatically-generated
@@ -142,14 +163,14 @@ class HeaderRendererMixin:
rendering a document and read afterwards.
"""
- def reset_toc(self):
- self.toc = []
- self.toc_ids = set()
+ def reset_toc(self) -> None:
+ self.toc: List[Any] = []
+ self.toc_ids: Set[Any] = set()
- def get_toc(self):
+ def get_toc(self) -> List[Any]:
return self.toc
- def header(self, text, level, raw=None):
+ def header(self, text: str, level: int, raw: None = None) -> str:
custom_id_match = re.match(r'^(.*?)\s+{([a-z0-9\-_]+)}\s*$', text)
if custom_id_match:
text = custom_id_match.group(1)
@@ -220,12 +241,12 @@ class OcfMarkdownBlockLexer(
)
-def markdown(text):
+def markdown(text: str) -> mistune.Markdown:
_renderer.reset_toc()
return _markdown(text)
-def text_and_meta(f):
+def text_and_meta(f: Any) -> Tuple[str, Any]:
"""Return tuple (text, meta dict) for the given file.
Meta tags are stripped from the Markdown source, but the Markdown is
@@ -234,7 +255,7 @@ def text_and_meta(f):
text = f.read()
meta = {}
- def repl(match):
+ def repl(match: Match[Any]) -> str:
meta[match.group(1)] = match.group(2)
return ''
@@ -243,7 +264,7 @@ def repl(match):
@cache()
-def markdown_and_toc(text):
+def markdown_and_toc(text: str) -> Tuple[Any, Any]:
"""Return tuple (html, toc) for the given text."""
html = markdown(text)
return html, _renderer.get_toc()
diff --git a/ocfweb/component/session.py b/ocfweb/component/session.py
index eda3271a7..fd3b571fe 100644
--- a/ocfweb/component/session.py
+++ b/ocfweb/component/session.py
@@ -1,23 +1,27 @@
+from typing import Any
+from typing import Optional
+
+from django.http import HttpRequest
from ocflib.account.validators import user_exists
-def is_logged_in(request):
+def is_logged_in(request: HttpRequest) -> bool:
"""Return whether a user is logged in."""
return bool(logged_in_user(request))
-def logged_in_user(request):
+def logged_in_user(request: HttpRequest) -> Optional[Any]:
"""Return logged in user, or raise KeyError."""
return request.session.get('ocf_user')
-def login(request, user):
+def login(request: HttpRequest, user: str) -> None:
"""Log in a user. Doesn't do any kind of password validation (obviously)."""
assert user_exists(user)
request.session['ocf_user'] = user
-def logout(request):
+def logout(request: HttpRequest) -> bool:
"""Log out the user. Return True if a user was logged out, False otherwise."""
try:
del request.session['ocf_user']
diff --git a/ocfweb/context_processors.py b/ocfweb/context_processors.py
index bceb2753c..f01a18882 100644
--- a/ocfweb/context_processors.py
+++ b/ocfweb/context_processors.py
@@ -1,6 +1,10 @@
import re
from ipaddress import ip_address
+from typing import Any
+from typing import Dict
+from typing import Generator
+from django.http import HttpRequest
from django.urls import reverse
from ipware import get_client_ip
from ocflib.account.search import user_is_group
@@ -12,7 +16,7 @@
from ocfweb.environment import ocfweb_version
-def get_base_css_classes(request):
+def get_base_css_classes(request: HttpRequest) -> Generator[str, None, None]:
if request.resolver_match and request.resolver_match.url_name:
page_class = 'page-' + request.resolver_match.url_name
yield page_class
@@ -22,7 +26,7 @@ def get_base_css_classes(request):
yield page_class
-def ocf_template_processor(request):
+def ocf_template_processor(request: HttpRequest) -> Dict[str, Any]:
hours_listing = get_hours_listing()
real_ip, _ = get_client_ip(request)
user = logged_in_user(request)
diff --git a/ocfweb/docs/doc.py b/ocfweb/docs/doc.py
index 11e957679..0e78522f4 100644
--- a/ocfweb/docs/doc.py
+++ b/ocfweb/docs/doc.py
@@ -6,7 +6,7 @@
class Document(namedtuple('Document', ['name', 'title', 'render'])):
@cached_property
- def category(self):
+ def category(self) -> str:
"""Return full category path of the document.
For example, "/" or "/staff/backend/".
@@ -14,7 +14,7 @@ def category(self):
return self.name.rsplit('/', 1)[0] + '/'
@cached_property
- def category_for_sidebar(self):
+ def category_for_sidebar(self) -> str:
"""Return the category to show similar pages for in the sidebar.
If this page isn't at the root category, we just return this page's
@@ -29,7 +29,7 @@ def category_for_sidebar(self):
return self.category
@cached_property
- def edit_url(self):
+ def edit_url(self) -> str:
"""Return a GitHub edit URL for this page."""
return (
'https://github.com/ocf/ocfweb/edit/master/ocfweb/docs/docs' +
@@ -38,7 +38,7 @@ def edit_url(self):
)
@cached_property
- def history_url(self):
+ def history_url(self) -> str:
"""Return a GitHub history URL for this page."""
return (
'https://github.com/ocf/ocfweb/commits/master/ocfweb/docs/docs' +
diff --git a/ocfweb/docs/markdown_based.py b/ocfweb/docs/markdown_based.py
index 383217b95..976bd053b 100644
--- a/ocfweb/docs/markdown_based.py
+++ b/ocfweb/docs/markdown_based.py
@@ -15,19 +15,25 @@
import os
from functools import partial
from pathlib import Path
+from typing import Any
+from typing import Dict
+from typing import Generator
from django.conf import settings
+from django.http import HttpRequest
+from django.http import HttpResponse
from django.shortcuts import render
from ocfweb.component.markdown import markdown_and_toc
from ocfweb.component.markdown import text_and_meta
from ocfweb.docs.doc import Document
-
DOCS_DIR = Path(__file__).parent.joinpath('docs')
-def render_markdown_doc(path, meta, text, doc, request):
+def render_markdown_doc(
+ path: Path, meta: Dict[str, Any], text: str, doc: Document, request: HttpRequest,
+) -> HttpResponse:
# Reload markdown docs if in development
if settings.DEBUG:
@@ -48,7 +54,7 @@ def render_markdown_doc(path, meta, text, doc, request):
)
-def get_markdown_docs():
+def get_markdown_docs() -> Generator[Document, None, None]:
for path in DOCS_DIR.glob('**/*.md'):
name, _ = os.path.splitext(str(path.relative_to(DOCS_DIR)))
diff --git a/ocfweb/docs/templatetags/docs.py b/ocfweb/docs/templatetags/docs.py
index bcf56ee85..5d3821125 100644
--- a/ocfweb/docs/templatetags/docs.py
+++ b/ocfweb/docs/templatetags/docs.py
@@ -1,6 +1,11 @@
import re
from collections import namedtuple
from operator import attrgetter
+from typing import Any
+from typing import Collection
+from typing import Dict
+from typing import List
+from typing import Optional
from django import template
from django.utils.html import strip_tags
@@ -11,7 +16,7 @@
class Node(namedtuple('Node', ['path', 'title', 'children'])):
@property
- def url_path(self):
+ def url_path(self) -> str:
return self.path.lstrip('/').rstrip('/')
@@ -19,14 +24,19 @@ def url_path(self):
@register.inclusion_tag('docs/partials/doc-tree.html')
-def doc_tree(root='/', suppress_root=True, cur_path=None, exclude='$^'):
+def doc_tree(
+ root: str = '/',
+ suppress_root: bool = True,
+ cur_path: Optional[str] = None,
+ exclude: Any = '$^',
+) -> Dict[str, Any]:
# root is expected to be like '/' or '/services/' or '/services/web/'
assert root.startswith('/')
assert root.endswith('/')
exclude = re.compile(exclude)
- def _make_tree(root):
+ def _make_tree(root: str) -> Node:
path = root[:-1]
doc = DOCS.get(path)
return Node(
@@ -54,10 +64,10 @@ def _make_tree(root):
@register.inclusion_tag('docs/partials/doc-toc.html')
-def doc_toc(toc, collapsible=False):
+def doc_toc(toc: Collection[Any], collapsible: bool = False) -> Dict[str, Any]:
if len(toc) > 3: # heuristic to avoid dumb tables of contents
- levels = list(sorted({entry[0] for entry in toc}))
- cur = levels[0]
+ levels: List[Any] = list(sorted({entry[0] for entry in toc}))
+ cur: int = levels[0]
html = '