From 9c6b7179e380b0dc74255319c84c1991481ffe45 Mon Sep 17 00:00:00 2001 From: Adrien Ferrand Date: Mon, 7 Aug 2023 22:59:12 +0200 Subject: [PATCH] Version 3.13.0 --- CHANGELOG.md | 2 + lexicon/providers/easydns.py | 4 +- lexicon/providers/wedos.py | 108 +++++++++++++++----------- lexicon/tests/providers/test_wedos.py | 19 +++-- pyproject.toml | 2 +- 5 files changed, 78 insertions(+), 57 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4fba4ae6d..0d5ecc4cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ # Changelog ## master - CURRENT + +## 3.13.0 - 07/08/2023 ### Added * Add `wedos` provider (#1675) diff --git a/lexicon/providers/easydns.py b/lexicon/providers/easydns.py index 472792efa..a526ab60f 100644 --- a/lexicon/providers/easydns.py +++ b/lexicon/providers/easydns.py @@ -33,7 +33,9 @@ def __init__(self, config): def _authenticate(self): payload = self._get(f"/domain/{self.domain}") if "error" in payload: - raise AuthenticationError(f'{payload["error"]["code"]} {payload["error"]["message"]}') + raise AuthenticationError( + f'{payload["error"]["code"]} {payload["error"]["message"]}' + ) if payload["data"]["exists"] == "N": raise AuthenticationError("No domain found") diff --git a/lexicon/providers/wedos.py b/lexicon/providers/wedos.py index 0e628d57a..b1d689ccc 100644 --- a/lexicon/providers/wedos.py +++ b/lexicon/providers/wedos.py @@ -1,18 +1,14 @@ import datetime import hashlib import json -from typing import Optional, Dict, List +from typing import Dict, List, Optional import requests from lexicon.exceptions import AuthenticationError, LexiconError from lexicon.providers.base import Provider as BaseProvider -NAMESERVER_DOMAINS = [ - "wedos.net", - "wedos.eu", - "wedos.cz", - "wedos.com"] +NAMESERVER_DOMAINS = ["wedos.net", "wedos.eu", "wedos.cz", "wedos.com"] def provider_parser(subparser): @@ -55,13 +51,12 @@ def __init__(self, config): @staticmethod def _auth_hash(login, password): - passhash = hashlib.sha1(password.encode('utf8')).hexdigest() - phrase = login + passhash + datetime.datetime.now().strftime('%H') - return hashlib.sha1(phrase.encode('utf8')).hexdigest() + passhash = hashlib.sha1(password.encode("utf8")).hexdigest() + phrase = login + passhash + datetime.datetime.now().strftime("%H") + return hashlib.sha1(phrase.encode("utf8")).hexdigest() def _authenticate(self): - - payload = self._post(data=self._create_payload('dns-domains-list', '')) + payload = self._post(data=self._create_payload("dns-domains-list", "")) domains = payload["response"]["data"]["domain"] for record in domains: if record["name"] == self.domain: @@ -73,28 +68,33 @@ def _authenticate(self): self.domain_id = self.domain def _create_payload(self, command, payload_data): - data = {'request': {'user': self._get_provider_option("auth_username"), - 'auth': self._auth_hash(self._get_provider_option("auth_username"), - self._get_provider_option("auth_pass")), - 'command': command, - 'data': payload_data - }} - return {'request': json.dumps(data)} + data = { + "request": { + "user": self._get_provider_option("auth_username"), + "auth": self._auth_hash( + self._get_provider_option("auth_username"), + self._get_provider_option("auth_pass"), + ), + "command": command, + "data": payload_data, + } + } + return {"request": json.dumps(data)} def _create_record(self, rtype: str, name: str, content: str) -> bool: records = self._list_records(rtype, name, content) if len(records) == 1: return True data = { - 'type': rtype, - 'name': self._full_name(name), - 'rdata': content, - 'domain': self.domain_id + "type": rtype, + "name": self._full_name(name), + "rdata": content, + "domain": self.domain_id, } if self._get_lexicon_option("ttl"): data["ttl"] = self._get_lexicon_option("ttl") - payload = self._post(data=self._create_payload('dns-row-add', data)) + payload = self._post(data=self._create_payload("dns-row-add", data)) code = payload["response"]["code"] if code == 1000: validation = self._commit_changes() @@ -105,9 +105,13 @@ def _create_record(self, rtype: str, name: str, content: str) -> bool: else: raise LexiconError("Cannot create records") - def _list_records(self, rtype: Optional[str] = None, name: Optional[str] = None, content: Optional[str] = None) -> \ - List[Dict]: - data = self._create_payload('dns-rows-list', {'domain': self.domain_id}) + def _list_records( + self, + rtype: Optional[str] = None, + name: Optional[str] = None, + content: Optional[str] = None, + ) -> List[Dict]: + data = self._create_payload("dns-rows-list", {"domain": self.domain_id}) payload = self._post(data=data) records = [] dns_records = payload["response"]["data"]["row"] @@ -123,14 +127,21 @@ def _list_records(self, rtype: Optional[str] = None, name: Optional[str] = None, if rtype is not None: records = list(rec for rec in records if _filter_rtype(rtype, rec)) if name is not None: - records = list(rec for rec in records if _filter_name(self._full_name(name), rec)) + records = list( + rec for rec in records if _filter_name(self._full_name(name), rec) + ) if content is not None: records = list(rec for rec in records if _filter_content(content, rec)) return records - def _update_record(self, identifier: Optional[str] = None, rtype: Optional[str] = None, name: Optional[str] = None, - content: Optional[str] = None) -> bool: + def _update_record( + self, + identifier: Optional[str] = None, + rtype: Optional[str] = None, + name: Optional[str] = None, + content: Optional[str] = None, + ) -> bool: if not identifier: records = self._list_records(rtype, name, content) identifiers = [record["id"] for record in records] @@ -142,17 +153,18 @@ def _update_record(self, identifier: Optional[str] = None, rtype: Optional[str] payloads = [] for record_id in identifiers: data = { - 'type': rtype, - - 'rdata': content, - 'domain': self.domain_id, - 'row_id': record_id + "type": rtype, + "rdata": content, + "domain": self.domain_id, + "row_id": record_id, } if name: - data['name'] = self._full_name(name) + data["name"] = self._full_name(name) if self._get_lexicon_option("ttl"): data["ttl"] = self._get_lexicon_option("ttl") - payloads.append(self._post(data=self._create_payload('dns-row-update', data))) + payloads.append( + self._post(data=self._create_payload("dns-row-update", data)) + ) if all(payload["response"]["code"] == 1000 for payload in payloads): validation = self._commit_changes() @@ -163,8 +175,13 @@ def _update_record(self, identifier: Optional[str] = None, rtype: Optional[str] else: raise LexiconError("Cannot update records") - def _delete_record(self, identifier: Optional[str] = None, rtype: Optional[str] = None, name: Optional[str] = None, - content: Optional[str] = None) -> bool: + def _delete_record( + self, + identifier: Optional[str] = None, + rtype: Optional[str] = None, + name: Optional[str] = None, + content: Optional[str] = None, + ) -> bool: if not identifier: records = self._list_records(rtype, name, content) identifiers = [record["id"] for record in records] @@ -175,11 +192,10 @@ def _delete_record(self, identifier: Optional[str] = None, rtype: Optional[str] return True payloads = [] for record_id in identifiers: - data = { - 'domain': self.domain_id, - 'row_id': record_id - } - payloads.append(self._post(data=self._create_payload('dns-row-delete', data))) + data = {"domain": self.domain_id, "row_id": record_id} + payloads.append( + self._post(data=self._create_payload("dns-row-delete", data)) + ) if all(payload["response"]["code"] == 1000 for payload in payloads): validation = self._commit_changes() @@ -191,11 +207,9 @@ def _delete_record(self, identifier: Optional[str] = None, rtype: Optional[str] raise LexiconError("Cannot delete records") def _commit_changes(self) -> bool: - data = { - 'name': self.domain_id - } + data = {"name": self.domain_id} - payload = self._post(data=self._create_payload('dns-domain-commit', data)) + payload = self._post(data=self._create_payload("dns-domain-commit", data)) code = payload["response"]["code"] if code == 1000: return True diff --git a/lexicon/tests/providers/test_wedos.py b/lexicon/tests/providers/test_wedos.py index b06881e34..a992c6817 100644 --- a/lexicon/tests/providers/test_wedos.py +++ b/lexicon/tests/providers/test_wedos.py @@ -2,9 +2,9 @@ import json import re import urllib.parse +from unittest import TestCase from lexicon.tests.providers.integration_tests import IntegrationTestsV2 -from unittest import TestCase # Hook into testing framework by inheriting unittest.TestCase and reuse @@ -12,20 +12,23 @@ # pass, by inheritance from integration_tests.IntegrationTests class WedosProviderTests(TestCase, IntegrationTestsV2): """Integration tests for wedos provider""" - provider_name = 'wedos' - domain = 'kaniok.com' + + provider_name = "wedos" + domain = "kaniok.com" def _filter_request(self, request): - request_start_string = 'request=' + request_start_string = "request=" try: - body = urllib.parse.unquote_plus(urllib.parse.unquote(request.body.decode())) - body = re.sub(r'request=', '', body) + body = urllib.parse.unquote_plus( + urllib.parse.unquote(request.body.decode()) + ) + body = re.sub(r"request=", "", body) data = json.loads(body) except ValueError: pass else: - data['request']['user'] = 'username' - data['request']['auth'] = 'password' + data["request"]["user"] = "username" + data["request"]["auth"] = "password" body = request_start_string + json.dumps(data) body = urllib.parse.quote(urllib.parse.quote_plus(body.encode())) request.body = body diff --git a/pyproject.toml b/pyproject.toml index c47860706..546c9f0cf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "dns-lexicon" -version = "3.12.0" +version = "3.13.0" description = "Manipulate DNS records on various DNS providers in a standardized/agnostic way" license = "MIT" keywords = [