Skip to content

Commit

Permalink
Version 3.13.0
Browse files Browse the repository at this point in the history
  • Loading branch information
adferrand committed Aug 7, 2023
1 parent 2feee22 commit 9c6b717
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 57 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Changelog

## master - CURRENT

## 3.13.0 - 07/08/2023
### Added
* Add `wedos` provider (#1675)

Expand Down
4 changes: 3 additions & 1 deletion lexicon/providers/easydns.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ def __init__(self, config):
def _authenticate(self):
payload = self._get(f"/domain/{self.domain}")
if "error" in payload:
raise AuthenticationError(f'{payload["error"]["code"]} {payload["error"]["message"]}')
raise AuthenticationError(
f'{payload["error"]["code"]} {payload["error"]["message"]}'
)

if payload["data"]["exists"] == "N":
raise AuthenticationError("No domain found")
Expand Down
108 changes: 61 additions & 47 deletions lexicon/providers/wedos.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
import datetime
import hashlib
import json
from typing import Optional, Dict, List
from typing import Dict, List, Optional

import requests

from lexicon.exceptions import AuthenticationError, LexiconError
from lexicon.providers.base import Provider as BaseProvider

NAMESERVER_DOMAINS = [
"wedos.net",
"wedos.eu",
"wedos.cz",
"wedos.com"]
NAMESERVER_DOMAINS = ["wedos.net", "wedos.eu", "wedos.cz", "wedos.com"]


def provider_parser(subparser):
Expand Down Expand Up @@ -55,13 +51,12 @@ def __init__(self, config):

@staticmethod
def _auth_hash(login, password):
passhash = hashlib.sha1(password.encode('utf8')).hexdigest()
phrase = login + passhash + datetime.datetime.now().strftime('%H')
return hashlib.sha1(phrase.encode('utf8')).hexdigest()
passhash = hashlib.sha1(password.encode("utf8")).hexdigest()
phrase = login + passhash + datetime.datetime.now().strftime("%H")
return hashlib.sha1(phrase.encode("utf8")).hexdigest()

def _authenticate(self):

payload = self._post(data=self._create_payload('dns-domains-list', ''))
payload = self._post(data=self._create_payload("dns-domains-list", ""))
domains = payload["response"]["data"]["domain"]
for record in domains:
if record["name"] == self.domain:
Expand All @@ -73,28 +68,33 @@ def _authenticate(self):
self.domain_id = self.domain

def _create_payload(self, command, payload_data):
data = {'request': {'user': self._get_provider_option("auth_username"),
'auth': self._auth_hash(self._get_provider_option("auth_username"),
self._get_provider_option("auth_pass")),
'command': command,
'data': payload_data
}}
return {'request': json.dumps(data)}
data = {
"request": {
"user": self._get_provider_option("auth_username"),
"auth": self._auth_hash(
self._get_provider_option("auth_username"),
self._get_provider_option("auth_pass"),
),
"command": command,
"data": payload_data,
}
}
return {"request": json.dumps(data)}

def _create_record(self, rtype: str, name: str, content: str) -> bool:
records = self._list_records(rtype, name, content)
if len(records) == 1:
return True
data = {
'type': rtype,
'name': self._full_name(name),
'rdata': content,
'domain': self.domain_id
"type": rtype,
"name": self._full_name(name),
"rdata": content,
"domain": self.domain_id,
}
if self._get_lexicon_option("ttl"):
data["ttl"] = self._get_lexicon_option("ttl")

payload = self._post(data=self._create_payload('dns-row-add', data))
payload = self._post(data=self._create_payload("dns-row-add", data))
code = payload["response"]["code"]
if code == 1000:
validation = self._commit_changes()
Expand All @@ -105,9 +105,13 @@ def _create_record(self, rtype: str, name: str, content: str) -> bool:
else:
raise LexiconError("Cannot create records")

def _list_records(self, rtype: Optional[str] = None, name: Optional[str] = None, content: Optional[str] = None) -> \
List[Dict]:
data = self._create_payload('dns-rows-list', {'domain': self.domain_id})
def _list_records(
self,
rtype: Optional[str] = None,
name: Optional[str] = None,
content: Optional[str] = None,
) -> List[Dict]:
data = self._create_payload("dns-rows-list", {"domain": self.domain_id})
payload = self._post(data=data)
records = []
dns_records = payload["response"]["data"]["row"]
Expand All @@ -123,14 +127,21 @@ def _list_records(self, rtype: Optional[str] = None, name: Optional[str] = None,
if rtype is not None:
records = list(rec for rec in records if _filter_rtype(rtype, rec))
if name is not None:
records = list(rec for rec in records if _filter_name(self._full_name(name), rec))
records = list(
rec for rec in records if _filter_name(self._full_name(name), rec)
)
if content is not None:
records = list(rec for rec in records if _filter_content(content, rec))

return records

def _update_record(self, identifier: Optional[str] = None, rtype: Optional[str] = None, name: Optional[str] = None,
content: Optional[str] = None) -> bool:
def _update_record(
self,
identifier: Optional[str] = None,
rtype: Optional[str] = None,
name: Optional[str] = None,
content: Optional[str] = None,
) -> bool:
if not identifier:
records = self._list_records(rtype, name, content)
identifiers = [record["id"] for record in records]
Expand All @@ -142,17 +153,18 @@ def _update_record(self, identifier: Optional[str] = None, rtype: Optional[str]
payloads = []
for record_id in identifiers:
data = {
'type': rtype,

'rdata': content,
'domain': self.domain_id,
'row_id': record_id
"type": rtype,
"rdata": content,
"domain": self.domain_id,
"row_id": record_id,
}
if name:
data['name'] = self._full_name(name)
data["name"] = self._full_name(name)
if self._get_lexicon_option("ttl"):
data["ttl"] = self._get_lexicon_option("ttl")
payloads.append(self._post(data=self._create_payload('dns-row-update', data)))
payloads.append(
self._post(data=self._create_payload("dns-row-update", data))
)

if all(payload["response"]["code"] == 1000 for payload in payloads):
validation = self._commit_changes()
Expand All @@ -163,8 +175,13 @@ def _update_record(self, identifier: Optional[str] = None, rtype: Optional[str]
else:
raise LexiconError("Cannot update records")

def _delete_record(self, identifier: Optional[str] = None, rtype: Optional[str] = None, name: Optional[str] = None,
content: Optional[str] = None) -> bool:
def _delete_record(
self,
identifier: Optional[str] = None,
rtype: Optional[str] = None,
name: Optional[str] = None,
content: Optional[str] = None,
) -> bool:
if not identifier:
records = self._list_records(rtype, name, content)
identifiers = [record["id"] for record in records]
Expand All @@ -175,11 +192,10 @@ def _delete_record(self, identifier: Optional[str] = None, rtype: Optional[str]
return True
payloads = []
for record_id in identifiers:
data = {
'domain': self.domain_id,
'row_id': record_id
}
payloads.append(self._post(data=self._create_payload('dns-row-delete', data)))
data = {"domain": self.domain_id, "row_id": record_id}
payloads.append(
self._post(data=self._create_payload("dns-row-delete", data))
)

if all(payload["response"]["code"] == 1000 for payload in payloads):
validation = self._commit_changes()
Expand All @@ -191,11 +207,9 @@ def _delete_record(self, identifier: Optional[str] = None, rtype: Optional[str]
raise LexiconError("Cannot delete records")

def _commit_changes(self) -> bool:
data = {
'name': self.domain_id
}
data = {"name": self.domain_id}

payload = self._post(data=self._create_payload('dns-domain-commit', data))
payload = self._post(data=self._create_payload("dns-domain-commit", data))
code = payload["response"]["code"]
if code == 1000:
return True
Expand Down
19 changes: 11 additions & 8 deletions lexicon/tests/providers/test_wedos.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,33 @@
import json
import re
import urllib.parse
from unittest import TestCase

from lexicon.tests.providers.integration_tests import IntegrationTestsV2
from unittest import TestCase


# Hook into testing framework by inheriting unittest.TestCase and reuse
# the tests which *each and every* implementation of the interface must
# pass, by inheritance from integration_tests.IntegrationTests
class WedosProviderTests(TestCase, IntegrationTestsV2):
"""Integration tests for wedos provider"""
provider_name = 'wedos'
domain = 'kaniok.com'

provider_name = "wedos"
domain = "kaniok.com"

def _filter_request(self, request):
request_start_string = 'request='
request_start_string = "request="
try:
body = urllib.parse.unquote_plus(urllib.parse.unquote(request.body.decode()))
body = re.sub(r'request=', '', body)
body = urllib.parse.unquote_plus(
urllib.parse.unquote(request.body.decode())
)
body = re.sub(r"request=", "", body)
data = json.loads(body)
except ValueError:
pass
else:
data['request']['user'] = 'username'
data['request']['auth'] = 'password'
data["request"]["user"] = "username"
data["request"]["auth"] = "password"
body = request_start_string + json.dumps(data)
body = urllib.parse.quote(urllib.parse.quote_plus(body.encode()))
request.body = body
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "dns-lexicon"
version = "3.12.0"
version = "3.13.0"
description = "Manipulate DNS records on various DNS providers in a standardized/agnostic way"
license = "MIT"
keywords = [
Expand Down

0 comments on commit 9c6b717

Please sign in to comment.