Skip to content

Commit

Permalink
Add support for Name.com (#1212)
Browse files Browse the repository at this point in the history
* Add support for Name.com

Name.com API docs: https://www.name.com/api-docs/DNS

* Fix incompatibility with Python 3.7 in mock calls

Co-authored-by: Adrien Ferrand <[email protected]>
  • Loading branch information
giuseongit and adferrand authored May 6, 2022
1 parent 589c1a6 commit e5ea281
Show file tree
Hide file tree
Showing 41 changed files with 9,030 additions and 0 deletions.
1 change: 1 addition & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ lexicon/providers/memset.py @tnwhitwell
lexicon/providers/misaka.py @misakaio @ym
lexicon/providers/mythicbeasts.py @lexitus
lexicon/providers/namecheap.py @pschmitt @rbelnap
lexicon/providers/namecom.py @Jamim
lexicon/providers/namesilo.py @analogj
lexicon/providers/netcup.py @coldfix
lexicon/providers/nfsn.py @tersers
Expand Down
210 changes: 210 additions & 0 deletions lexicon/providers/namecom.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
"""Module provider for Name.com"""
from __future__ import absolute_import

import logging

from requests import HTTPError, Session
from requests.auth import HTTPBasicAuth

from lexicon.providers.base import Provider as BaseProvider

LOGGER = logging.getLogger(__name__)

NAMESERVER_DOMAINS = ['name.com']

DUPLICATE_ERROR = {
'message': 'Invalid Argument',
'details': 'Parameter Value Error - Duplicate Record'
}


def provider_parser(subparser):
"""Configure a subparser for Name.com."""

subparser.add_argument('--auth-username', help='specify a username')
subparser.add_argument('--auth-token', help='specify an API token')


class NamecomLoader(object): # pylint: disable=useless-object-inheritance,too-few-public-methods
"""Loader that handles pagination for the Name.com provider."""

def __init__(self, get, url, data_key, next_page=1):
self.get = get
self.url = url
self.data_key = data_key
self.next_page = next_page

def __iter__(self):
while self.next_page:
response = self.get(self.url, {'page': self.next_page})
for data in response[self.data_key]:
yield data
self.next_page = response.get('next_page')


class NamecomProvider(BaseProvider):
"""Provider implementation for Name.com."""

def __init__(self, config):
super(Provider, self).__init__(config)
self.api_endpoint = 'https://api.name.com/v4'
self.session = Session()

def _authenticate(self):
self.session.auth = HTTPBasicAuth(
username=self._get_provider_option('auth_username'),
password=self._get_provider_option('auth_token')
)

# checking domain existence
domain_name = self.domain
for domain in NamecomLoader(self._get, '/domains', 'domains'):
if domain['domainName'] == domain_name:
self.domain_id = domain_name
return

raise Exception('{} domain does not exist'.format(domain_name))

def _create_record(self, rtype, name, content):
data = {
'type': rtype,
'host': self._relative_name(name),
'answer': content,
'ttl': self._get_lexicon_option('ttl')
}

if rtype in ('MX', 'SRV'):
# despite the documentation says a priority is
# required for MX and SRV, it's actually optional
priority = self._get_lexicon_option('priority')
if priority:
data['priority'] = priority

url = '/domains/{}/records'.format(self.domain)
try:
record_id = self._post(url, data)['id']
except HTTPError as error:
response = error.response
if response.status_code == 400 and \
response.json() == DUPLICATE_ERROR:
LOGGER.warning(
'create_record: duplicate record has been skipped'
)
return True
raise

LOGGER.debug('create_record: record %s has been created', record_id)

return record_id

def _list_records(self, rtype=None, name=None, content=None):
url = '/domains/{}/records'.format(self.domain)
records = []

for raw in NamecomLoader(self._get, url, 'records'):
record = {
'id': raw['id'],
'type': raw['type'],
'name': raw['fqdn'][:-1],
'ttl': raw['ttl'],
'content': raw['answer'],
}
records.append(record)

LOGGER.debug('list_records: retrieved %s records', len(records))

if rtype:
records = [record for record in records if record['type'] == rtype]
if name:
name = self._full_name(name)
records = [record for record in records if record['name'] == name]
if content:
records = [record for record in records
if record['content'] == content]

LOGGER.debug('list_records: filtered %s records', len(records))

return records

def _update_record(self, identifier, rtype=None, name=None, content=None):
if not identifier:
if not (rtype and name):
raise ValueError(
'Record identifier or rtype+name must be specified'
)
records = self._list_records(rtype, name)
if not records:
raise Exception('There is no record to update')

if len(records) > 1:
filtered_records = [record for record in records
if record['content'] == content]
if filtered_records:
records = filtered_records

if len(records) > 1:
raise Exception(
'There are multiple records to update: {}'.format(
', '.join(record['id'] for record in records)
)
)

record_id = records[0]['id']
else:
record_id = identifier

data = {'ttl': self._get_lexicon_option('ttl')}

# even though the documentation says a type and an answer
# are required, they are not required actually
if rtype:
data['type'] = rtype
if name:
data['host'] = self._relative_name(name)
if content:
data['answer'] = content

url = '/domains/{}/records/{}'.format(self.domain, record_id)
record_id = self._put(url, data)['id']
logging.debug('update_record: record %s has been updated', record_id)

return record_id

def _delete_record(self, identifier=None,
rtype=None, name=None, content=None):
if not identifier:
if not (rtype and name):
raise ValueError(
'Record identifier or rtype+name must be specified'
)
records = self._list_records(rtype, name, content)
if not records:
LOGGER.warning('delete_record: there is no record to delete')
return False
record_ids = [record['id'] for record in records]
else:
record_ids = [identifier, ]

for record_id in record_ids:
url = '/domains/{}/records/{}'.format(self.domain, record_id)
self._delete(url)
LOGGER.debug(
'delete_record: record %s has been deleted', record_id
)

return True

def _get_raw_record(self, record_id):
url = '/domains/{}/records/{}'.format(self.domain, record_id)
return self._get(url)

def _request(self, action='GET', url='/', data=None, query_params=None):
response = self.session.request(method=action,
url=self.api_endpoint + url,
json=data,
params=query_params)
response.raise_for_status()
return response.json()


Provider = NamecomProvider
148 changes: 148 additions & 0 deletions lexicon/tests/providers/test_namecom.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
"""Integration tests for Name.com"""
import json
from unittest import TestCase

import pytest
from unittest.mock import ANY, Mock, patch, call
from requests import HTTPError

from lexicon.config import DictConfigSource
from lexicon.providers.namecom import provider_parser
from lexicon.tests.providers.integration_tests import (
IntegrationTestsV2, vcr_integration_test
)


# Hook into testing framework by inheriting unittest.TestCase and reuse
# the tests which *each and every* implementation of the interface must
# pass, by inheritance from integration_tests.IntegrationTests
class NamecomProviderTests(TestCase, IntegrationTestsV2):
"""TestCase for Name.com"""

# I don't think we really need some docstrings here.
# pylint: disable=missing-function-docstring

provider_name = 'namecom'
domain = 'mim.pw'

def _filter_headers(self):
return ['Authorization', 'Cookie']

def _filter_response(self, response):
headers = response['headers']
headers.pop('Set-Cookie', None)
headers.pop('content-length', None)

if response['status']['code'] == 200:
try:
data = json.loads(response['body']['string'].decode())
except ValueError:
pass
else:
if 'records' in data:
min_id = 10 ** 8
data['records'] = [
record for record in data['records']
if record['id'] > min_id
]
response['body']['string'] = json.dumps(data).encode()

return response

###########################
# Provider.authenticate() #
###########################
@vcr_integration_test
def test_provider_authentication_method(self):
provider = self._construct_authenticated_provider()
assert provider.session.auth

############################
# Provider.create_record() #
############################
@vcr_integration_test
def test_provider_when_calling_create_record_for_MX_with_priority(self): # pylint: disable=invalid-name
priority = 42
config = self._test_config()
config.add_config_source(DictConfigSource({'priority': priority}), 0)
provider = self.provider_module.Provider(config)
provider.authenticate()

record_id = provider.create_record('MX', 'mx.test1', self.domain)
assert provider._get_raw_record(record_id)['priority'] == priority # pylint: disable=protected-access

@vcr_integration_test
def test_provider_when_calling_create_record_for_MX_with_no_priority(self): # pylint: disable=invalid-name
provider = self._construct_authenticated_provider()
record_id = provider.create_record('MX', 'mx.test2', self.domain)
assert 'priority' not in provider._get_raw_record(record_id) # pylint: disable=protected-access

@vcr_integration_test
def test_provider_when_calling_create_record_should_fail_on_http_error(self):
provider = self._construct_authenticated_provider()
error = HTTPError(response=Mock())
with patch.object(provider, '_request', side_effect=error):
with pytest.raises(HTTPError):
provider.create_record('TXT', 'httperror', 'HTTPError')

############################
# Provider.update_record() #
############################
@vcr_integration_test
def test_provider_when_calling_update_record_with_no_identifier_or_rtype_and_name_should_fail(self): # pylint: disable=line-too-long
provider = self._construct_authenticated_provider()
with pytest.raises(ValueError):
provider.update_record(None)

@vcr_integration_test
def test_provider_when_calling_update_record_should_fail_if_no_record_to_update(self):
provider = self._construct_authenticated_provider()
with pytest.raises(Exception):
provider.update_record(None, 'TXT', 'missingrecord')

@vcr_integration_test
def test_provider_when_calling_update_record_should_fail_if_multiple_records_to_update(self):
provider = self._construct_authenticated_provider()
provider.create_record('TXT', 'multiple.test', 'foo')
provider.create_record('TXT', 'multiple.test', 'bar')
with pytest.raises(Exception):
provider.update_record(None, 'TXT', 'multiple.test', 'updated')

@vcr_integration_test
def test_provider_when_calling_update_record_filter_by_content_should_pass(self):
provider = self._construct_authenticated_provider()
provider.create_record('TXT', 'multiple.test', 'foo')
provider.create_record('TXT', 'multiple.test', 'bar')
assert provider.update_record(None, 'TXT', 'multiple.test', 'foo')

@vcr_integration_test
def test_provider_when_calling_update_record_by_identifier_with_no_other_args_should_pass(self):
provider = self._construct_authenticated_provider()
record_id = provider.create_record('TXT', 'update.test', 'foo')
assert provider.update_record(record_id)

############################
# Provider.delete_record() #
############################
@vcr_integration_test
def test_provider_when_calling_delete_record_with_no_identifier_or_rtype_and_name_should_fail(self): # pylint: disable=line-too-long
provider = self._construct_authenticated_provider()
with pytest.raises(ValueError):
provider.delete_record()

@vcr_integration_test
@patch('lexicon.providers.namecom.LOGGER.warning')
def test_provider_when_calling_delete_record_should_pass_if_no_record_to_delete(self, warning):
provider = self._construct_authenticated_provider()
provider.delete_record(None, 'TXT', 'missingrecord')
warning.assert_called_once()
assert call('delete_record: there is no record to delete') == warning.call_args


def test_subparser_configuration():
"""Tests the provider_parser method."""

subparser = Mock()
provider_parser(subparser)
subparser.add_argument.assert_any_call('--auth-username', help=ANY)
subparser.add_argument.assert_any_call('--auth-token', help=ANY)
Loading

0 comments on commit e5ea281

Please sign in to comment.