Skip to content

Commit

Permalink
Add AdminBlockRuleViews WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
mtomilov committed Jan 1, 2025
1 parent 82b4186 commit 606f1cb
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 18 deletions.
2 changes: 1 addition & 1 deletion checkmate/checker/url/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from checkmate.checker.url.allow_rules import AllowRules
from checkmate.checker.url.custom_rules import CustomRules
from checkmate.checker.url.custom_rules import CustomRules, BlocklistParser
from checkmate.checker.url.url_haus import URLHaus
6 changes: 3 additions & 3 deletions checkmate/checker/url/custom_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ def load_simple_rules(self, raw_rules): # pragma: no cover
CustomRule.bulk_upsert(
self._session,
values=[
self._value_from_domain(domain, reason)
self.value_from_domain(domain, reason)
for domain, reason in raw_rules.items()
],
)

@staticmethod
def _value_from_domain(domain, reason): # pragma: no cover
def value_from_domain(domain, reason): # pragma: no cover
if "*" in domain:
# It's a wild card!
domain = domain.lstrip("*")
if "*" in domain:
raise ValueError("Cannot convert non prefix wildcard")
raise ValueError(f"Cannot convert non prefix wildcard: {domain!r}")

# Using a raw domain as a URL is close enough, as it's subject
# to normalisation anyway
Expand Down
1 change: 1 addition & 0 deletions checkmate/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def add_routes(config):

config.add_route("admin.index", "/admin/")
config.add_route("admin.allow_url", "/admin/allow_url/")
config.add_route("admin.block_list", "/admin/block_list/")

config.add_route("add_to_allow_list", "/api/rule", request_method="POST")

Expand Down
4 changes: 4 additions & 0 deletions checkmate/services/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from checkmate.services.rule import RuleService
from checkmate.services.custom_rule import CustomRuleService
from checkmate.services.secure_link import SecureLinkService
from checkmate.services.signature import SignatureService
from checkmate.services.url_checker import URLCheckerService
Expand All @@ -18,3 +19,6 @@ def includeme(config): # pragma: no cover
config.register_service_factory(
"checkmate.services.rule.factory", iface=RuleService
)
config.register_service_factory(
"checkmate.services.custom_rule.factory", iface=CustomRuleService
)
59 changes: 59 additions & 0 deletions checkmate/services/custom_rule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from checkmate.checker import url
from checkmate.models import Reason, CustomRule


class CustomRuleService:
def __init__(self, db):
self._db = db

def set_block_list(self, text: str) -> list[str]:
rules, errors = self._parse_text(text)
if not errors:
self._set_custom_rules(rules)
return errors

def get_block_list(self) -> str:
lines = [self._render_line(rule) for rule in self._db.query(CustomRule).all()]
lines.sort()
return "\n".join(lines)

@staticmethod
def _render_line(rule: CustomRule) -> str:
return "{} {}".format(rule.rule, ",".join(tag.value for tag in rule.reasons))

def _set_custom_rules(self, rules: list[CustomRule]) -> None:
self._db.query(CustomRule).delete()
CustomRule.bulk_upsert(self._db, values=rules)

def _parse_text(self, text: str) -> tuple[list[CustomRule], list[str]]:
rules, errors = [], []
for line in text.split("\n"):
line = line.strip()
if not line or line.startswith("#"):
continue

try:
domain, reason = self._parse_line(line)
except ValueError as e:
errors.append(str(e))
continue

try:
rules.append(url.CustomRules.value_from_domain(domain, reason))
except ValueError as e:
errors.append(str(e))

return rules, errors

@staticmethod
def _parse_line(line: str) -> tuple[str, Reason] | None:
match = url.BlocklistParser.LINE_PATTERN.match(line)
if not match:
raise ValueError(f"Cannot parse blocklist line: {line!r}")

raw_rule, reason = match.group(1), match.group(2)
return raw_rule, Reason.parse(reason)


def factory(_context, request):
return CustomRuleService(request.db)
2 changes: 2 additions & 0 deletions checkmate/templates/admin/base.html.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
<div class="navbar-start" style="flex-grow: 1; justify-content: center;">
<a class="navbar-item"
href="{{ request.route_url("admin.allow_url") }}">Allow URL</a>
<a class="navbar-item"
href="{{ request.route_url("admin.block_list") }}">Block List</a>
<div class="navbar-end">
<div class="navbar-item">
<div class="buttons">
Expand Down
63 changes: 63 additions & 0 deletions checkmate/templates/admin/block_list.html.jinja2
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
{% extends "checkmate:templates/admin/base.html.jinja2" %}
{% block content %}

<section>
<div class="container">
{% if errors %}
<article class="message is-danger">
<div class="message-body">
{% for error in errors %}
<p>{{ error }}</p>
{% endfor %}
</div>
</article>
{% endif %}

{% if message %}
<article class="message">
<div class="message-body">
<p>{{ message }}</p>
</div>
</article>
{% endif %}
</div>
</section>

<section>
<div class="container">
<fieldset class="box mt-6">
<legend class="label has-text-centered">Update Block List</legend>
<form action="{{ request.route_url("admin.block_list") }}" method="POST">
<input type="hidden" name="csrf_token" value="{{ get_csrf_token() }}">

<div class="field is-horizontal">
<div class="field-label is-normal">
<label class="label">Block List</label>
</div>
<div class="field-body">
<div class="field">
<div class="control is-expanded">
<textarea class="textarea" name="block-list" rows="20">{{ block_list }}</textarea>
</div>
</div>
</div>
</div>

<div class="field is-horizontal">
<div class="field-label is-normal">
<label class="label"></label>
</div>
<div class="field-body">
<div class="field">
<div class="control is-expanded">
<input type="submit" class="button is-info" value="Update">
</div>
</div>
</div>
</div>
</form>
</fieldset>
</div>
</section>

{% endblock %}
57 changes: 43 additions & 14 deletions checkmate/views/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from checkmate.exceptions import ResourceConflict
from checkmate.security import Permissions
from checkmate.services import RuleService
from checkmate.services import CustomRuleService


@view_config(route_name="admin.index")
Expand All @@ -21,35 +22,63 @@ def notfound(_request):
return HTTPNotFound()


@forbidden_view_config(path_info="/admin/*")
def logged_out(request):
return HTTPFound(location=request.route_url("pyramid_googleauth.login"))


@view_defaults(
route_name="admin.allow_url",
renderer="checkmate:templates/admin/allow_url.html.jinja2",
permission=Permissions.ADMIN,
)
class AdminAllowRuleViews:
def __init__(self, request):
self.request = request
self._request = request
self._rule_service: RuleService = request.find_service(RuleService)

@view_config(
request_method="GET",
permission=Permissions.ADMIN,
)
@view_config(request_method="GET")
def get(self):
return {}

@view_config(
request_method="POST",
permission=Permissions.ADMIN,
)
@view_config(request_method="POST")
def post(self):
url = self.request.params.get("url")
url = self._request.params.get("url")
if not url:
return {"messages": [{"detail": "URL is required"}]}
try:
allow_rule = self.request.find_service(RuleService).add_to_allow_list(url)
allow_rule = self._rule_service.add_to_allow_list(url)
except ResourceConflict as e:
return {"messages": e.messages}
return {"allow_rule": allow_rule}

@forbidden_view_config()
def logged_out(self):
return HTTPFound(location=self.request.route_url("pyramid_googleauth.login"))

@view_defaults(
route_name="admin.block_list",
renderer="checkmate:templates/admin/block_list.html.jinja2",
permission=Permissions.ADMIN,
)
class AdminBlockListViews:
def __init__(self, request):
self._request = request
self._custom_rule_service: CustomRuleService = request.find_service(
CustomRuleService
)

@view_config(request_method="GET")
def get(self):
text = self._custom_rule_service.get_block_list()
return {"block_list": text}

@view_config(request_method="POST")
def post(self):
block_list: str = self._request.POST["block-list"]
if not block_list:
return {"errors": ["Block List is required"]}

errors = self._custom_rule_service.set_block_list(block_list)
if errors:
return {"errors": errors, "block_list": block_list}

block_list = self._custom_rule_service.get_block_list()
return {"message": "Block List set successfully", "block_list": block_list}

0 comments on commit 606f1cb

Please sign in to comment.