Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixup domain prefix #58

Merged
merged 5 commits into from
Sep 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions wgkex/broker/app_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@
import mock
import app
import sys

_VALID_CFG = "domains:\n- a\n- b\nmqtt:\n broker_port: 1883\n broker_url: mqtt://broker\n keepalive: 5\n password: pass\n tls: true\n username: user\n"
_INVALID_CFG = "asdasdasdasd"
from wgkex.config.config_test import _VALID_CFG
from wgkex.config.config_test import _INVALID_CFG


class TestApp(unittest.TestCase):

# TODO(ruairi): Add test for Flask.
# def setUp(self) -> None:
# mock_open = mock.mock_open(read_data=_VALID_CFG)
# with mock.patch("builtins.open", mock_open):
# app_cfg = app.app.test_client()
# app.main()
# self.app_cfg = app_cfg
# mock_open = mock.mock_open(read_data=_VALID_CFG)
# with mock.patch("builtins.open", mock_open):
# app_cfg = app.app.test_client()
# app.main()
# self.app_cfg = app_cfg

def test_app_load_success(self):
"""Tests _fetch_app_config success."""
Expand Down
19 changes: 15 additions & 4 deletions wgkex/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from functools import lru_cache
from typing import Dict, Union, Any, List, Optional
import dataclasses
import logging


class Error(Exception):
Expand All @@ -17,6 +18,11 @@ class ConfigFileNotFoundError(Error):

WG_CONFIG_OS_ENV = "WGKEX_CONFIG_FILE"
WG_CONFIG_DEFAULT_LOCATION = "/etc/wgkex.yaml"
logging.basicConfig(
format="%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s",
datefmt="%Y-%m-%d:%H:%M:%S",
level=logging.DEBUG,
)


@dataclasses.dataclass
Expand All @@ -27,6 +33,7 @@ class MQTT:
broker_url: The broker URL for MQTT to connect to.
username: The username to use for MQTT.
password: The password to use for MQTT.
domain_prefix: The prefix to pre-pend to a given domain.
tls: If TLS is used or not.
broker_port: The port for MQTT to connect on.
keepalive: The keepalive in seconds to use.
Expand All @@ -35,6 +42,7 @@ class MQTT:
broker_url: str
username: str
password: str
domain_prefix: str
tls: bool = False
broker_port: int = 1883
keepalive: int = 5
Expand All @@ -46,8 +54,11 @@ def from_dict(cls, mqtt_cfg: Dict[str, str]) -> "MQTT":
username=mqtt_cfg["username"],
password=mqtt_cfg["password"],
tls=mqtt_cfg["tls"] if mqtt_cfg["tls"] else False,
broker_port=mqtt_cfg["broker_port"] if mqtt_cfg["broker_port"] else None,
keepalive=mqtt_cfg["keepalive"] if mqtt_cfg["keepalive"] else None,
broker_port=int(mqtt_cfg["broker_port"])
if mqtt_cfg["broker_port"]
else None,
keepalive=int(mqtt_cfg["keepalive"]) if mqtt_cfg["keepalive"] else None,
domain_prefix=mqtt_cfg["domain_prefix"],
)


Expand Down Expand Up @@ -97,13 +108,13 @@ def load_config() -> Dict[str, str]:
try:
config = yaml.safe_load(cfg_contents)
except yaml.YAMLError as e:
print(f"Failed to load YAML file: {e}")
logging.error("Failed to load YAML file: %s", e)
sys.exit(1)
try:
_ = Config.from_dict(config)
return config
except (KeyError, TypeError) as e:
print(f"Failed to lint file: {e}", file=sys.stderr)
logging.error("Failed to lint file: %s", e)
sys.exit(2)


Expand Down
4 changes: 2 additions & 2 deletions wgkex/config/config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import config
import yaml

_VALID_CFG = "domains:\n- a\n- b\nmqtt:\n broker_port: 1883\n broker_url: mqtt://broker\n keepalive: 5\n password: pass\n tls: true\n username: user\n"
_INVALID_LINT = "derpmains:\n- a\n- b\nmqtt:\n broker_port: 1883\n broker_url: mqtt://broker\n keepalive: 5\n password: pass\n tls: true\n username: user\n"
_VALID_CFG = "domains:\n- a\n- b\nmqtt:\n broker_port: 1883\n broker_url: mqtt://broker\n keepalive: 5\n password: pass\n tls: true\n username: user\n domain_prefix: ffmuc_\n"
_INVALID_LINT = "derpmains:\n- a\n- b\nmqtt:\n broker_port: 1883\n broker_url: mqtt://broker\n keepalive: 5\n password: pass\n tls: true\n username: user\n domain_prefix: ffmuc_\n"
_INVALID_CFG = "asdasdasdasd"


Expand Down
52 changes: 40 additions & 12 deletions wgkex/worker/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,17 @@
from wgkex.worker.netlink import wg_flush_stale_peers
import threading
import time
import logging
import datetime
from typing import List, Text

logging.basicConfig(
format="%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s",
datefmt="%Y-%m-%d:%H:%M:%S",
level=logging.DEBUG,
)

_CLEANUP_TIME = datetime.timedelta(seconds=300)


class Error(Exception):
Expand All @@ -15,11 +26,35 @@ class DomainsNotInConfig(Error):
"""If no domains exist in configuration file."""


def clean_up_worker(domain: str) -> None:
def flush_workers(domain: Text) -> None:
"""Calls peer flush every _CLEANUP_TIME interval."""
while True:
time.sleep(300)
print(f"Running cleanup task for {domain}")
wg_flush_stale_peers(domain)
time.sleep(_CLEANUP_TIME)
logging.info(f"Running cleanup task for {domain}")
logging.info("Cleaned up domains: %s", wg_flush_stale_peers(domain))


def clean_up_worker(domains: List[Text]) -> None:
"""Wraps flush_workers in a thread for all given domains.

Arguments:
domains: list of domains.
"""
logging.debug("Cleaning up the following domains: %s", domains)
prefix = config.load_config().get("domain_prefix")
for domain in domains:
logging.info("Scheduling cleanup task for %s, ", domain)
try:
cleaned_domain = domain.split(prefix)[1]
except IndexError:
logging.error(
"Cannot strip domain with prefix %s from passed value %s. Skipping cleanup operation",
prefix,
domain,
)
continue
thread = threading.Thread(target=flush_workers, args=(cleaned_domain,))
thread.start()


def main():
Expand All @@ -31,14 +66,7 @@ def main():
domains = config.load_config().get("domains")
if not domains:
raise DomainsNotInConfig("Could not locate domains in configuration.")
clean_up_threads = []
for domain in domains:
print(f"Scheduling cleanup task for {domain}")
thread = threading.Thread(
target=clean_up_worker, args=(domain.split("ffmuc_")[1],)
)
thread.start()
clean_up_threads.append(thread)
clean_up_worker(domains)
mqtt.connect(domains)


Expand Down
15 changes: 11 additions & 4 deletions wgkex/worker/app_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,28 @@


class AppTest(unittest.TestCase):
def setUp(self) -> None:
app._CLEANUP_TIME = 0

@mock.patch.object(app.config, "load_config")
@mock.patch.object(app.mqtt, "connect", autospec=True)
def test_main_success(self, connect_mock, config_mock):
"""Ensure we can execute main."""
connect_mock.return_value = None
config_mock.return_value = dict(domains=["domain.one"])
app.main()
connect_mock.assert_called_with(["domain.one"])
test_prefix = "TEST_PREFIX_"
config_mock.return_value = dict(
domains=[f"{test_prefix}domain.one"], domain_prefix=test_prefix
)
with mock.patch("app.flush_workers", return_value=None):
app.main()
connect_mock.assert_called_with(["TEST_PREFIX_domain.one"])

@mock.patch.object(app.config, "load_config")
@mock.patch.object(app.mqtt, "connect", autospec=True)
def test_main_fails_no_domain(self, connect_mock, config_mock):
"""Ensure we fail when domains are not configured."""
connect_mock.return_value = None
config_mock.return_value = dict(domains=None)
connect_mock.return_value = None
with self.assertRaises(app.DomainsNotInConfig):
app.main()

Expand Down
6 changes: 6 additions & 0 deletions wgkex/worker/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
from typing import Optional, Dict, List, Any, Union
import logging

logging.basicConfig(
format="%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s",
datefmt="%Y-%m-%d:%H:%M:%S",
level=logging.DEBUG,
)


def fetch_from_config(var: str) -> Optional[Union[Dict[str, str], str]]:
"""Fetches values from configuration file.
Expand Down
1 change: 0 additions & 1 deletion wgkex/worker/netlink.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from datetime import timedelta
from textwrap import wrap
from typing import Dict, List

import pyroute2

from wgkex.common.utils import mac2eui64
Expand Down