From 6857403af9344a72a5e8b325efb91248a0ac9db7 Mon Sep 17 00:00:00 2001 From: Nils Wisiol Date: Mon, 18 May 2020 16:41:07 +0200 Subject: [PATCH] wip --- api/desecapi/models.py | 38 ++++++++---------- api/desecapi/tests/test_rrsets.py | 67 ++++++++++++++++++------------- 2 files changed, 55 insertions(+), 50 deletions(-) diff --git a/api/desecapi/models.py b/api/desecapi/models.py index 6e027f319..d3fd30acc 100644 --- a/api/desecapi/models.py +++ b/api/desecapi/models.py @@ -372,16 +372,7 @@ class Meta: # backend types are types that are the types supported by the backend(s) RR_SET_TYPES_BACKEND = pdns.SUPPORTED_RRSET_TYPES # validation types are types supported by the validation backend, currently: dnspython -RR_SET_TYPES_VALIDATION = { - # from dns import rdatatype - # sorted(rdatatype._by_text.keys()) - 'A', 'A6', 'AAAA', 'AFSDB', 'ANY', 'APL', 'AVC', 'AXFR', 'CAA', 'CDNSKEY', 'CDS', 'CERT', 'CNAME', 'CSYNC', - 'DHCID', 'DLV', 'DNAME', 'DNSKEY', 'DS', 'EUI48', 'EUI64', 'GPOS', 'HINFO', 'HIP', 'IPSECKEY', 'ISDN', 'IXFR', - 'KEY', 'KX', 'LOC', 'MAILA', 'MAILB', 'MB', 'MD', 'MF', 'MG', 'MINFO', 'MR', 'MX', 'NAPTR', 'NONE', 'NS', - 'NSAP', 'NSAP-PTR', 'NSEC', 'NSEC3', 'NSEC3PARAM', 'NULL', 'NXT', 'OPENPGPKEY', 'OPT', 'PTR', 'PX', 'RP', - 'RRSIG', 'RT', 'SIG', 'SOA', 'SPF', 'SRV', 'SSHFP', 'TA', 'TKEY', 'TLSA', 'TSIG', 'TXT', 'UNSPEC', 'URI', 'WKS', - 'X25' -} +RR_SET_TYPES_VALIDATION = set(rdatatype._by_text.keys()) # manageable types are directly managed by the API client RR_SET_TYPES_MANAGEABLE = \ (RR_SET_TYPES_BACKEND & RR_SET_TYPES_VALIDATION) - RR_SET_TYPES_UNSUPPORTED - RR_SET_TYPES_AUTOMATIC @@ -455,36 +446,39 @@ def clean_records(self, records_presentation_format): :param records_presentation_format: iterable of records in presentation format """ - rdtype = rdatatype.from_text(self.type) # TODO may raise if type is unknown + rdtype = rdatatype.from_text(self.type) + errors = [] - def _error(record, msg): - raise ValidationError(f'Record content of {self.name}/{self.type} invalid: `{record}`: {msg}') + def _error_msg(record, detail): + return f'Record content of {self.name}/{self.type} invalid: \'{record}\': {detail}' - records_canonical_presentation_format = [] + records_canonical_presentation_format = set() for r in records_presentation_format: try: - records_canonical_presentation_format.append(RR.canonical_presentation_format(r, rdtype)) + records_canonical_presentation_format.add(RR.canonical_presentation_format(r, rdtype)) except binascii.Error: # e.g., odd-length string - _error(r, 'Cannot parse hexadecimal or base64 record contents') + errors.append(_error_msg(r, 'Cannot parse hexadecimal or base64 record contents')) except dns.exception.SyntaxError as e: # e.g., A/127.0.0.999 if e.args[0] == 'string too long': - _error(r, f'Splitting the data in chunks is required (RFC 4408).') - _error(r, 'Record syntax malformed') + errors.append(_error_msg(r, f'Splitting the data in chunks is required (RFC 4408).')) + errors.append(_error_msg(r, 'Record syntax malformed')) except dns.name.NeedAbsoluteNameOrOrigin: - _error(r, 'Hostname must be fully qualified (i.e., end in a dot: "example.com.")') + errors.append(_error_msg(r, 'Hostname must be fully qualified (i.e., end in a dot: "example.com.")')) except ValueError: # e.g., string ("asdf") cannot be parsed into int on base 10 - _error(r, 'Cannot parse record contents') + errors.append(_error_msg(r, 'Cannot parse record contents')) except Exception as e: # TODO see what exceptions raise here for faulty input raise e - records_canonical_presentation_format = set(records_canonical_presentation_format) if len(records_canonical_presentation_format) < len(records_presentation_format): # Duplicate record content - raise ValidationError(f'Duplicate records in RR set {self.type}/{self.name}.') + errors.append(f'Duplicate records in RR set {self.type}/{self.name}.') + + if any(errors): + raise ValidationError(errors) return records_canonical_presentation_format diff --git a/api/desecapi/tests/test_rrsets.py b/api/desecapi/tests/test_rrsets.py index 01967903c..20f3c80a6 100644 --- a/api/desecapi/tests/test_rrsets.py +++ b/api/desecapi/tests/test_rrsets.py @@ -197,50 +197,43 @@ def test_create_other_rr_sets(self): response = self.client.post_rr_set(self.other_domain.name, **data) self.assertStatus(response, status.HTTP_404_NOT_FOUND) + @staticmethod + def _create_test_txt_record(record): + return {'records': [f'{record}'], 'ttl': 3600, 'type': 'TXT', 'subname': f'name{len(record)}'} + def test_create_my_rr_sets_chunk_too_long(self): - def _create_data(record): - return {'records': [f'{record}'], 'ttl': 3600, 'type': 'TXT', 'subname': f'name{len(record)}'} with self.assertPdnsRequests(self.requests_desec_rr_sets_update(self.my_empty_domain.name)): response = self.client.post_rr_set( self.my_empty_domain.name, - # record of length 500 bytes in chunks of max 255 each (RFC 4408) - # "AAA...AAA" "AAA...AAA" - **_create_data('"' + 'A' * 255 + '"') + **self._create_test_txt_record(f'"{"A" * 255}"') ) self.assertStatus(response, status.HTTP_201_CREATED) response = self.client.post_rr_set( self.my_empty_domain.name, - # record of length 501 bytes in chunks of max 255 each (RFC 4408) - # "AAA...AAA" "AAA...AAA" - **_create_data('"' + 'A' * 256 + '"') + **self._create_test_txt_record(f'"{"A" * 256}"') ) self.assertStatus(response, status.HTTP_400_BAD_REQUEST) self.assertIn('Splitting the data in chunks is required (RFC 4408).', str(response.data)) def test_create_my_rr_sets_too_long_content(self): - def _create_data(record): - return {'records': [f'{record}'], 'ttl': 3600, 'type': 'TXT', 'subname': f'name{len(record)}'} + response = self.client.post_rr_set( + self.my_empty_domain.name, + # record of length 501 bytes in chunks of max 255 each (RFC 4408) + **self._create_test_txt_record(f'"{"A" * 255}" "{"A" * 241}"') + ) + self.assertStatus(response, status.HTTP_400_BAD_REQUEST) + self.assertIn('Ensure this field has no more than 500 characters.', str(response.data)) with self.assertPdnsRequests(self.requests_desec_rr_sets_update(self.my_empty_domain.name)): response = self.client.post_rr_set( self.my_empty_domain.name, # record of length 500 bytes in chunks of max 255 each (RFC 4408) - # "AAA...AAA" "AAA...AAA" - **_create_data('"' + 'A' * 255 + '" "' + 'A' * 240 + '"') + ** self._create_test_txt_record(f'"{"A" * 255}" "{"A" * 240}"') ) self.assertStatus(response, status.HTTP_201_CREATED) - response = self.client.post_rr_set( - self.my_empty_domain.name, - # record of length 501 bytes in chunks of max 255 each (RFC 4408) - # "AAA...AAA" "AAA...AAA" - **_create_data('"' + 'A' * 255 + '" "' + 'A' * 241 + '"') - ) - self.assertStatus(response, status.HTTP_400_BAD_REQUEST) - self.assertIn('Ensure this field has no more than 500 characters.', str(response.data)) - def test_create_my_rr_sets_too_large_rrset(self): network = IPv4Network('127.0.0.0/20') # size: 4096 IP addresses data = {'records': [str(ip) for ip in network], 'ttl': 3600, 'type': 'A', 'subname': 'name'} @@ -293,7 +286,10 @@ def test_create_my_rr_sets_canonical_content(self): ('CNAME', ('', '')), ('DHCID', ('', '')), ('DLV', ('', '')), - ('DS', ('', '')), + ('DS', ('6454 8 2 5CBA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA1 0DF1F520', + '6454 8 2 5CBA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA10DF1F520'.lower())), + ('DS', ('6454 8 2 5C BA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA1 0DF1F520', + '6454 8 2 5CBA665A006F6487625C6218522F09BD3673C25FA10F25CB18459AA10DF1F520'.lower())), ('EUI48', ('', '')), ('EUI64', ('', '')), ('HINFO', ('', '')), @@ -305,7 +301,8 @@ def test_create_my_rr_sets_canonical_content(self): ('MAILB', ('', '')), ('MINFO', ('', '')), ('MR', ('', '')), - ('MX', ('', '')), + ('MX', ('10 010.1.1.1.', '10 010.1.1.1.')), + ('MX', ('010 010.1.1.2.', '10 010.1.1.2.')), ('NAPTR', ('', '')), ('NS', ('', '')), ('NSEC', ('', '')), @@ -313,13 +310,19 @@ def test_create_my_rr_sets_canonical_content(self): ('OPENPGPKEY', ('', '')), ('PTR', ('', '')), ('RP', ('', '')), - ('SPF', ('', '')), + ('SPF', ('foobar', '"foobar"')), + ('SPF', ('foo bar', '"foo" "bar"')), + ('SPF', ('"foo" "bar"', '"foo" "bar"')), + ('SPF', ('"foobar"', '"foobar"')), ('SRV', ('', '')), ('SSHFP', ('', '')), ('TKEY', ('', '')), ('TLSA', ('3 0001 1 000AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA', '3 1 1 000aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')), ('TSIG', ('', '')), - ('TXT', ('"foobar"', 'foobar')), + ('TXT', ('foobar', '"foobar"')), + ('TXT', ('foo bar', '"foo" "bar"')), + ('TXT', ('"foo" "bar"', '"foo" "bar"')), + ('TXT', ('"foobar"', '"foobar"')), ('URI', ('', '')), ('WKS', ('', '')), ] @@ -329,7 +332,10 @@ def test_create_my_rr_sets_canonical_content(self): data = {'records': [record], 'ttl': 3660, 'type': t, 'subname': ''} with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_empty_domain.name)): response = self.client.post_rr_set(self.my_empty_domain.name, **data) - self.assertContains(response, canonical_record, status_code=status.HTTP_201_CREATED) + self.assertStatus(response, status.HTTP_201_CREATED) + self.assertEqual(response.data['records'][0], canonical_record, + f'For RR set type {t}, expected \'{canonical_record}\' to be the canonical form of ' + f'\'{record}\', but saw \'{response.data["records"][0]}\'.') with self.assertPdnsRequests(self.requests_desec_rr_sets_update(name=self.my_empty_domain.name)): response = self.client.delete_rr_set(self.my_empty_domain.name, subname='', type_=t) self.assertStatus(response, status.HTTP_204_NO_CONTENT) @@ -354,7 +360,7 @@ def test_create_my_rr_sets_known_type_benign(self): 'IPSECKEY': [], 'KEY': [], 'KX': [], 'LOC': ['23 12 59.000 N 42 22 48.500 W 65.00m 20.00m 10.00m 10.00m'], 'MAILA': [], 'MAILB': [], 'MINFO': [], 'MR': [], - 'MX': ['10 example.com.'], + 'MX': ['10 example.com.', '20 1.1.1.1.'], 'NAPTR': [], 'NS': ['ns1.example.com.'], 'OPENPGPKEY': [], @@ -426,7 +432,12 @@ def test_create_my_rr_sets_known_type_invalid(self): def test_create_my_rr_sets_unknown_type(self): for _type in ['AA', 'ASDF'] + list(RR_SET_TYPES_AUTOMATIC | RR_SET_TYPES_UNSUPPORTED): response = self.client.post_rr_set(self.my_domain.name, records=['1234'], ttl=3660, type=_type) - self.assertStatus(response, status.HTTP_400_BAD_REQUEST) + self.assertContains( + response, + text='managed by the API' if _type in RR_SET_TYPES_AUTOMATIC else 'type is currently unsupported', + status_code=status.HTTP_400_BAD_REQUEST + ) + def test_create_my_rr_sets_insufficient_ttl(self): ttl = settings.MINIMUM_TTL_DEFAULT - 1