Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Canonicize user identity URLs #487

Merged
merged 1 commit into from
Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 39 additions & 5 deletions publ/tokens.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
""" IndieAuth token endpoint """

import functools
import json
import logging
import time
import typing
import urllib.parse

import arrow
import flask
Expand All @@ -12,7 +14,7 @@
import werkzeug.exceptions as http_error
from pony import orm

from . import model
from . import model, utils
from .config import config

LOGGER = logging.getLogger(__name__)
Expand All @@ -26,7 +28,7 @@ def signer(context: str):

def get_token(id_url: str, lifetime: int, scope: str = None, context: str = '') -> str:
""" Gets a signed token for the given identity"""
token = {'me': id_url}
token = {'me': utils.canonicize_url(id_url)}
if scope:
token['scope'] = scope

Expand Down Expand Up @@ -134,13 +136,45 @@ def redeem_grant(grant_type: str, auth_token: str):
return json.dumps(response), {'Content-Type': 'application/json'}


@functools.lru_cache()
def get_ticket_endpoint(me_url: str):
    """ Get the IndieAuth Ticket Auth endpoint and the canonical identity URL

    The identity page is fetched once; a rel="canonical" link is looked for
    first in the response's link headers and then in the page's <link>
    elements. If the canonical URL differs from the requested one (after
    canonicization), endpoint discovery is redone against the canonical URL;
    otherwise the already-fetched page is handed to Authl's discovery.

    Results are memoized per me_url string by functools.lru_cache.

    :param str me_url: The identity URL to look up

    :returns: a tuple of (endpoint, me_url), where me_url is the identity URL
        as reported back by Authl's endpoint discovery
    :raises RuntimeError: if the identity URL could not be fetched
    """
    LOGGER.debug("get_ticket_endpoint %s", me_url)
    import authl.handlers.indieauth
    from bs4 import BeautifulSoup

    req = authl.utils.request_url(me_url)
    # NOTE(review): request_url appears to return None when every fetch
    # attempt fails -- confirm against Authl. Without this guard the failure
    # would surface as an AttributeError on req.text rather than the
    # RuntimeError that ticket_request handles. The comparison is against None
    # on purpose: a plain truthiness test could also reject error responses.
    if req is None:
        raise RuntimeError(f"Could not fetch identity URL {me_url}")

    content = BeautifulSoup(req.text, 'html.parser')

    # Link headers take precedence over <link> elements in the document;
    # with neither present the requested URL is treated as canonical
    canonical_url = me_url
    if req.links and 'canonical' in req.links:
        canonical_url = req.links['canonical']['url']
    else:
        link = content.find('link', rel='canonical')
        if link:
            # The href may be relative, so resolve it against the request URL
            canonical_url = urllib.parse.urljoin(me_url, link.get('href'))

    if utils.canonicize_url(canonical_url) != utils.canonicize_url(me_url):
        # We have a rel="canonical" which mismatches the provided identity URL
        LOGGER.debug("%s -> canonical=%s", me_url, canonical_url)
        endpoint, me_url = authl.handlers.indieauth.find_endpoint(
            canonical_url, rel='ticket_endpoint')
    else:
        # Use our fetch to seed Authl's endpoint fetch and get that instead
        endpoints, me_url = authl.handlers.indieauth.find_endpoints(
            me_url, req.links, content)
        endpoint = endpoints.get('ticket_endpoint')

    LOGGER.debug("%s %s", me_url, endpoint)
    return endpoint, me_url


def ticket_request(me_url: str, scope: str):
""" Initiate a ticket request """
import authl.handlers.indieauth

try:
endpoint, _ = authl.handlers.indieauth.find_endpoint(me_url,
rel='ticket_endpoint')
endpoint, me_url = get_ticket_endpoint(utils.canonicize_url(me_url))
except RuntimeError:
endpoint = None
if not endpoint:
Expand Down
2 changes: 1 addition & 1 deletion publ/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def register(verified: authl.disposition):
""" Registers a user from the on_verified Authl hook """
if isinstance(verified, authl.disposition.Verified):
LOGGER.info("Got login from user %s with profile %s", verified.identity, verified.profile)
identity = verified.identity
identity = utils.canonicize_url(verified.identity)
now = arrow.utcnow().datetime
values = {
'last_login': now,
Expand Down
11 changes: 11 additions & 0 deletions publ/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,3 +629,14 @@ def parse_arglist(args: str, pos_limit: int = None) -> typing.Tuple[list, ArgDic

LOGGER.debug("pos_args=%s kw_args=%s", pos_args, kwargs)
return pos_args, kwargs


def canonicize_url(url: str) -> str:
    """ Canonicize a URL so that equivalent URLs compare equal as strings

    The network location (netloc) is case-folded and an empty path is
    replaced with '/'. All other components are kept as parsed by
    urllib.parse.urlparse, so differing schemes or differing non-empty paths
    (including a trailing slash) still produce different results.

    :param str url: The URL to canonicize

    :returns: the canonicized URL string
    :raises TypeError: if url is None
    """
    # Explicit check instead of an assert, so the validation is still
    # performed when Python runs with -O (which strips assert statements)
    if url is None:
        raise TypeError("url must not be None")

    parsed = urllib.parse.urlparse(url)
    return urllib.parse.urlunparse(parsed._replace(
        netloc=parsed.netloc.casefold(),
        path=parsed.path or '/'))
75 changes: 72 additions & 3 deletions tests/test_tokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,20 @@ def ticket_endpoint(request, _):

token_user = user.User(verified['me'])
assert token_user.profile['name'] == 'boop'
foo_tickets.reset()

# Provisional request flow
with app.test_request_context('/bogus'):
request_url = flask.url_for('tokens')
with app.test_client() as client:
req = client.post(request_url, data={'action': 'ticket',
'subject': 'https://foo.example/'})
'subject': 'https://foo.example'})
LOGGER.info("Got ticket redemption response %d: %s",
req.status_code, req.data)
assert req.status_code == 202
assert req.data == b'Ticket sent'

assert not foo_tickets.called # should be cached from previous test
# should be cached from previous test
assert foo_tickets.call_count == 1
assert stash['response']['token_type'].lower() == 'bearer'
assert stash['response']['me'] == 'https://foo.example/'
token = tokens.parse_token(stash['response']['access_token'])
Expand Down Expand Up @@ -206,3 +206,72 @@ def ticket_endpoint(request, _):
'Authorization': f'Bearer {stash["response"]["refresh_token"]}'
})
assert req.status_code == 401


def test_ticketauth_canonical(requests_mock):
    """
    Ensure that rel="canonical" is being correctly respected on TicketAuth grants,
    and that identity URLs are being properly canonicalized
    """
    app = PublMock()
    app.add_url_rule('/_tokens', 'tokens', tokens.indieauth_endpoint,
                     methods=['GET', 'POST'])

    # Shared state: written by the mocked ticket endpoint, read by test_url
    stash = {}

    def ticket_endpoint(request, _):
        # Handler registered below for POSTs to https://foo.example/tickets.
        # It checks the form fields of the incoming ticket, then redeems the
        # ticket against the app's token endpoint and stashes the response.
        import urllib.parse
        args = urllib.parse.parse_qs(request.text)
        assert 'subject' in args
        assert 'ticket' in args
        assert 'resource' in args
        stash['ticket'] = args['ticket']

        with app.test_client() as client:
            req = client.post(token_endpoint, data={
                'grant_type': 'ticket',
                'ticket': args['ticket']
            })
            token = json.loads(req.data)
            assert 'access_token' in token
            assert token['token_type'].lower() == 'bearer'
            stash['response'] = token

    # Resolved here so that the ticket_endpoint closure can use it later
    with app.test_request_context('/'):
        token_endpoint = flask.url_for('tokens')

    # Serve the same identity page over both schemes; its rel="canonical"
    # points at the https URL, written with different letter case
    for scheme in ('http', 'https'):
        requests_mock.get(f'{scheme}://canonical.ticketauth', text='''
<link rel="ticket_endpoint" href="https://foo.example/tickets">
<link rel="canonical" href="https://canonical.ticketAuth">
<p class="h-card"><span class="p-name">pachelbel</span></p>
''')
    requests_mock.post('https://foo.example/tickets', text=ticket_endpoint)

    def test_url(identity, match):
        # Request a ticket for `identity` and check that the identity URL
        # reported by the resulting token is `match`
        with app.test_request_context('/bogus'):
            request_url = flask.url_for('tokens')
        with app.test_client() as client:
            req = client.post(request_url, data={'action': 'ticket',
                                                 'subject': identity})
            LOGGER.info("Got ticket redemption response %d: %s",
                        req.status_code, req.data)
            assert req.status_code == 202
            assert req.data == b'Ticket sent'

            # The token response stashed by ticket_endpoint
            assert stash['response']['token_type'].lower() == 'bearer'
            assert stash['response']['me'] == match
            token = tokens.parse_token(stash['response']['access_token'])
            assert token['me'] == match

            # Verifying the access token at the token endpoint must report
            # the same identity
            req = client.get(token_endpoint, headers={
                'Authorization': f'Bearer {stash["response"]["access_token"]}'
            })
            assert req.status_code == 200
            assert req.headers['Content-Type'] == 'application/json'
            verified = json.loads(req.data)
            assert verified['me'] == match

    # Scheme and letter-case variants are all expected to resolve to the
    # same canonical https identity
    for url in ('http://canonical.ticketauth', 'https://canonical.ticketauth',
                'http://Canonical.TicketAuth'):
        test_url(url, 'https://canonical.ticketauth/')
12 changes: 12 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,3 +383,15 @@ def login(redir=''):
assert utils.auth_link("login")("/foo") == "https://example.com/_login/foo"
assert utils.auth_link("login")("/bar",
absolute=True) == "https://example.com/_login/bar"


def test_canonicize_url():
    """ Check which URL pairs do and do not canonicize to the same string """
    # Pairs differing only by an empty path or by host letter case
    equivalent_pairs = (
        ('https://foo.bar', 'https://foo.bar/'),
        ('https://Foo.Bar', 'https://foo.BAR'),
    )
    # Pairs differing by scheme, by path, or by a trailing slash on the path
    distinct_pairs = (
        ('http://foo.bar', 'https://foo.bar/'),
        ('https://foo.bar/a', 'https://foo.bar/b'),
        ('https://foo.bar/a', 'https://foo.bar/a/'),
    )

    for first, second in equivalent_pairs:
        assert utils.canonicize_url(first) == utils.canonicize_url(second)

    for first, second in distinct_pairs:
        assert utils.canonicize_url(first) != utils.canonicize_url(second)