From 0a15be101758333bafce41f11189852d210f4194 Mon Sep 17 00:00:00 2001 From: Felix Fontein Date: Fri, 3 May 2024 22:25:39 +0200 Subject: [PATCH] Refactor time code, add tests, fix bug when parsing absolute timestamps that omit seconds (#745) * Add time module utils. * Add time helpers to ACME backend. * Add changelog fragment. * ACME timestamp parser: do not choke on nanoseconds. --- changelogs/fragments/745-absolute-time.yml | 2 + .../module_utils/acme/backend_cryptography.py | 42 ++- plugins/module_utils/acme/backends.py | 64 ++++ plugins/module_utils/acme/utils.py | 2 +- .../module_backends/certificate_entrust.py | 7 +- .../module_backends/certificate_info.py | 5 +- .../module_backends/certificate_ownca.py | 5 +- .../module_backends/certificate_selfsigned.py | 5 +- plugins/module_utils/crypto/support.py | 92 +---- plugins/module_utils/openssh/certificate.py | 18 +- plugins/module_utils/time.py | 171 ++++++++++ plugins/modules/acme_challenge_cert_helper.py | 8 +- plugins/modules/get_certificate.py | 8 +- plugins/modules/x509_certificate_info.py | 8 +- plugins/modules/x509_crl.py | 5 +- .../plugins/module_utils/acme/backend_data.py | 51 +++ .../acme/test_backend_cryptography.py | 29 ++ .../acme/test_backend_openssl_cli.py | 29 ++ tests/unit/plugins/module_utils/test_time.py | 323 ++++++++++++++++++ 19 files changed, 755 insertions(+), 119 deletions(-) create mode 100644 changelogs/fragments/745-absolute-time.yml create mode 100644 plugins/module_utils/time.py create mode 100644 tests/unit/plugins/module_utils/test_time.py diff --git a/changelogs/fragments/745-absolute-time.yml b/changelogs/fragments/745-absolute-time.yml new file mode 100644 index 000000000..75431c05e --- /dev/null +++ b/changelogs/fragments/745-absolute-time.yml @@ -0,0 +1,2 @@ +bugfixes: + - "x509_crl, x509_certificate, x509_certificate_info - when parsing absolute timestamps which omitted the second count, the first digit of the minutes was used as a one-digit minutes count, and the second digit of the minutes as a one-digit second count (https://github.com/ansible-collections/community.crypto/pull/745)." diff --git a/plugins/module_utils/acme/backend_cryptography.py b/plugins/module_utils/acme/backend_cryptography.py index e6e76b80d..9f7770aea 100644 --- a/plugins/module_utils/acme/backend_cryptography.py +++ b/plugins/module_utils/acme/backend_cryptography.py @@ -11,6 +11,7 @@ import base64 import binascii +import datetime import os import traceback @@ -21,6 +22,7 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.backends import ( CertificateInformation, CryptoBackend, + _parse_acme_timestamp, ) from ansible_collections.community.crypto.plugins.module_utils.acme.certificates import ( @@ -41,12 +43,6 @@ convert_int_to_hex, ) -from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( - get_now_datetime, - ensure_utc_timezone, - parse_name_field, -) - from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( CRYPTOGRAPHY_TIMEZONE, cryptography_name_to_oid, @@ -59,6 +55,18 @@ extract_first_pem, ) +from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( + parse_name_field, +) + +from ansible_collections.community.crypto.plugins.module_utils.time import ( + ensure_utc_timezone, + from_epoch_seconds, + get_epoch_seconds, + get_now_datetime, + UTC, +) + CRYPTOGRAPHY_MINIMAL_VERSION = '1.5' CRYPTOGRAPHY_ERROR = None @@ -173,6 +181,26 @@ class CryptographyBackend(CryptoBackend): def __init__(self, module): super(CryptographyBackend, self).__init__(module) + def get_now(self): + return get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE) + + def parse_acme_timestamp(self, timestamp_str): + return _parse_acme_timestamp(timestamp_str, with_timezone=CRYPTOGRAPHY_TIMEZONE) + + def interpolate_timestamp(self, timestamp_start, timestamp_end, percentage): + start = get_epoch_seconds(timestamp_start) + end = get_epoch_seconds(timestamp_end) + return from_epoch_seconds(start + percentage * (end - start), with_timezone=CRYPTOGRAPHY_TIMEZONE) + + def get_utc_datetime(self, *args, **kwargs): + kwargs_ext = dict(kwargs) + if CRYPTOGRAPHY_TIMEZONE and ('tzinfo' not in kwargs_ext and len(args) < 8): + kwargs_ext['tzinfo'] = UTC + result = datetime.datetime(*args, **kwargs_ext) + if CRYPTOGRAPHY_TIMEZONE and ('tzinfo' in kwargs or len(args) >= 8): + result = ensure_utc_timezone(result) + return result + def parse_key(self, key_file=None, key_content=None, passphrase=None): ''' Parses an RSA or Elliptic Curve key file in PEM format and returns key_data. @@ -379,7 +407,7 @@ def get_cert_days(self, cert_filename=None, cert_content=None, now=None): raise BackendException('Cannot parse certificate {0}: {1}'.format(cert_filename, e)) if now is None: - now = get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE) + now = self.get_now() elif CRYPTOGRAPHY_TIMEZONE: now = ensure_utc_timezone(now) return (get_not_valid_after(cert) - now).days diff --git a/plugins/module_utils/acme/backends.py b/plugins/module_utils/acme/backends.py index 78ff0f181..421c595ad 100644 --- a/plugins/module_utils/acme/backends.py +++ b/plugins/module_utils/acme/backends.py @@ -11,6 +11,8 @@ from collections import namedtuple import abc +import datetime +import re from ansible.module_utils import six @@ -18,6 +20,14 @@ BackendException, ) +from ansible_collections.community.crypto.plugins.module_utils.time import ( + ensure_utc_timezone, + from_epoch_seconds, + get_epoch_seconds, + get_now_datetime, + remove_timezone, +) + CertificateInformation = namedtuple( 'CertificateInformation', @@ -31,11 +41,65 @@ ) +_FRACTIONAL_MATCHER = re.compile(r'^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(|\.\d+)(Z|[+-]\d{2}:?\d{2}.*)$') + + +def _reduce_fractional_digits(timestamp_str): + """ + Given a RFC 3339 timestamp that includes too many digits for the fractional seconds part, reduces these to at most 6. + """ + # RFC 3339 (https://www.rfc-editor.org/info/rfc3339) + m = _FRACTIONAL_MATCHER.match(timestamp_str) + if not m: + raise BackendException('Cannot parse ISO 8601 timestamp {0!r}'.format(timestamp_str)) + timestamp, fractional, timezone = m.groups() + if len(fractional) > 7: + # Python does not support anything smaller than microseconds + # (Golang supports nanoseconds, Boulder often emits more fractional digits, which Python chokes on) + fractional = fractional[:7] + return '%s%s%s' % (timestamp, fractional, timezone) + + +def _parse_acme_timestamp(timestamp_str, with_timezone): + """ + Parses a RFC 3339 timestamp. + """ + # RFC 3339 (https://www.rfc-editor.org/info/rfc3339) + timestamp_str = _reduce_fractional_digits(timestamp_str) + for format in ('%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%S%z', '%Y-%m-%dT%H:%M:%S.%f%z'): + # Note that %z won't work with Python 2... https://stackoverflow.com/a/27829491 + try: + result = datetime.datetime.strptime(timestamp_str, format) + except ValueError: + pass + else: + return ensure_utc_timezone(result) if with_timezone else remove_timezone(result) + raise BackendException('Cannot parse ISO 8601 timestamp {0!r}'.format(timestamp_str)) + + @six.add_metaclass(abc.ABCMeta) class CryptoBackend(object): def __init__(self, module): self.module = module + def get_now(self): + return get_now_datetime(with_timezone=False) + + def parse_acme_timestamp(self, timestamp_str): + # RFC 3339 (https://www.rfc-editor.org/info/rfc3339) + return _parse_acme_timestamp(timestamp_str, with_timezone=False) + + def interpolate_timestamp(self, timestamp_start, timestamp_end, percentage): + start = get_epoch_seconds(timestamp_start) + end = get_epoch_seconds(timestamp_end) + return from_epoch_seconds(start + percentage * (end - start), with_timezone=False) + + def get_utc_datetime(self, *args, **kwargs): + result = datetime.datetime(*args, **kwargs) + if 'tzinfo' in kwargs or len(args) >= 8: + result = remove_timezone(result) + return result + @abc.abstractmethod def parse_key(self, key_file=None, key_content=None, passphrase=None): ''' diff --git a/plugins/module_utils/acme/utils.py b/plugins/module_utils/acme/utils.py index 9179da191..e0e9ef7f1 100644 --- a/plugins/module_utils/acme/utils.py +++ b/plugins/module_utils/acme/utils.py @@ -22,7 +22,7 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.math import convert_int_to_bytes -from ansible_collections.community.crypto.plugins.module_utils.crypto.support import get_now_datetime +from ansible_collections.community.crypto.plugins.module_utils.time import get_now_datetime def nopad_b64(data): diff --git a/plugins/module_utils/crypto/module_backends/certificate_entrust.py b/plugins/module_utils/crypto/module_backends/certificate_entrust.py index 7dc4641e1..38c1046fa 100644 --- a/plugins/module_utils/crypto/module_backends/certificate_entrust.py +++ b/plugins/module_utils/crypto/module_backends/certificate_entrust.py @@ -18,8 +18,6 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( load_certificate, - get_now_datetime, - get_relative_time_option, ) from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( @@ -34,6 +32,11 @@ CertificateProvider, ) +from ansible_collections.community.crypto.plugins.module_utils.time import ( + get_now_datetime, + get_relative_time_option, +) + try: from cryptography.x509.oid import NameOID except ImportError: diff --git a/plugins/module_utils/crypto/module_backends/certificate_info.py b/plugins/module_utils/crypto/module_backends/certificate_info.py index 5db6c3586..b612f8b18 100644 --- a/plugins/module_utils/crypto/module_backends/certificate_info.py +++ b/plugins/module_utils/crypto/module_backends/certificate_info.py @@ -23,7 +23,6 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( load_certificate, get_fingerprint_of_bytes, - get_now_datetime, ) from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( @@ -40,6 +39,10 @@ get_publickey_info, ) +from ansible_collections.community.crypto.plugins.module_utils.time import ( + get_now_datetime, +) + MINIMAL_CRYPTOGRAPHY_VERSION = '1.6' CRYPTOGRAPHY_IMP_ERR = None diff --git a/plugins/module_utils/crypto/module_backends/certificate_ownca.py b/plugins/module_utils/crypto/module_backends/certificate_ownca.py index 4d312e6b7..cd4b37340 100644 --- a/plugins/module_utils/crypto/module_backends/certificate_ownca.py +++ b/plugins/module_utils/crypto/module_backends/certificate_ownca.py @@ -22,7 +22,6 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( load_privatekey, load_certificate, - get_relative_time_option, select_message_digest, ) @@ -44,6 +43,10 @@ CertificateProvider, ) +from ansible_collections.community.crypto.plugins.module_utils.time import ( + get_relative_time_option, +) + try: import cryptography from cryptography import x509 diff --git a/plugins/module_utils/crypto/module_backends/certificate_selfsigned.py b/plugins/module_utils/crypto/module_backends/certificate_selfsigned.py index edd8d8d77..d8914853d 100644 --- a/plugins/module_utils/crypto/module_backends/certificate_selfsigned.py +++ b/plugins/module_utils/crypto/module_backends/certificate_selfsigned.py @@ -14,7 +14,6 @@ from random import randrange from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( - get_relative_time_option, select_message_digest, ) @@ -34,6 +33,10 @@ CertificateProvider, ) +from ansible_collections.community.crypto.plugins.module_utils.time import ( + get_relative_time_option, +) + try: import cryptography from cryptography import x509 diff --git a/plugins/module_utils/crypto/support.py b/plugins/module_utils/crypto/support.py index 8b59a3b70..862f5b8fc 100644 --- a/plugins/module_utils/crypto/support.py +++ b/plugins/module_utils/crypto/support.py @@ -9,19 +9,25 @@ import abc -import datetime import errno import hashlib import os -import re from ansible.module_utils import six -from ansible.module_utils.common.text.converters import to_native, to_bytes +from ansible.module_utils.common.text.converters import to_bytes from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import ( identify_pem_format, ) +from ansible_collections.community.crypto.plugins.module_utils.time import ( # noqa: F401, pylint: disable=unused-import + # These imports are for backwards compatibility + get_now_datetime, + ensure_utc_timezone, + convert_relative_to_datetime, + get_relative_time_option, +) + try: from OpenSSL import crypto HAS_PYOPENSSL = True @@ -279,86 +285,6 @@ def parse_ordered_name_field(input_list, name_field_name): return result -def get_now_datetime(with_timezone): - if with_timezone: - return datetime.datetime.now(tz=datetime.timezone.utc) - return datetime.datetime.utcnow() - - -def ensure_utc_timezone(timestamp): - if timestamp.tzinfo is not None: - return timestamp - return timestamp.astimezone(datetime.timezone.utc) - - -def convert_relative_to_datetime(relative_time_string, with_timezone=False): - """Get a datetime.datetime or None from a string in the time format described in sshd_config(5)""" - - parsed_result = re.match( - r"^(?P[+-])((?P\d+)[wW])?((?P\d+)[dD])?((?P\d+)[hH])?((?P\d+)[mM])?((?P\d+)[sS]?)?$", - relative_time_string) - - if parsed_result is None or len(relative_time_string) == 1: - # not matched or only a single "+" or "-" - return None - - offset = datetime.timedelta(0) - if parsed_result.group("weeks") is not None: - offset += datetime.timedelta(weeks=int(parsed_result.group("weeks"))) - if parsed_result.group("days") is not None: - offset += datetime.timedelta(days=int(parsed_result.group("days"))) - if parsed_result.group("hours") is not None: - offset += datetime.timedelta(hours=int(parsed_result.group("hours"))) - if parsed_result.group("minutes") is not None: - offset += datetime.timedelta( - minutes=int(parsed_result.group("minutes"))) - if parsed_result.group("seconds") is not None: - offset += datetime.timedelta( - seconds=int(parsed_result.group("seconds"))) - - now = get_now_datetime(with_timezone=with_timezone) - if parsed_result.group("prefix") == "+": - return now + offset - else: - return now - offset - - -def get_relative_time_option(input_string, input_name, backend='cryptography', with_timezone=False): - """Return an absolute timespec if a relative timespec or an ASN1 formatted - string is provided. - - The return value will be a datetime object for the cryptography backend, - and a ASN1 formatted string for the pyopenssl backend.""" - result = to_native(input_string) - if result is None: - raise OpenSSLObjectError( - 'The timespec "%s" for %s is not valid' % - input_string, input_name) - # Relative time - if result.startswith("+") or result.startswith("-"): - result_datetime = convert_relative_to_datetime(result, with_timezone=with_timezone) - if backend == 'pyopenssl': - return result_datetime.strftime("%Y%m%d%H%M%SZ") - elif backend == 'cryptography': - return result_datetime - # Absolute time - if backend == 'cryptography': - for date_fmt in ['%Y%m%d%H%M%SZ', '%Y%m%d%H%MZ', '%Y%m%d%H%M%S%z', '%Y%m%d%H%M%z']: - try: - res = datetime.datetime.strptime(result, date_fmt) - except ValueError: - pass - else: - if with_timezone: - res = res.astimezone(datetime.timezone.utc) - return res - - raise OpenSSLObjectError( - 'The time spec "%s" for %s is invalid' % - (input_string, input_name) - ) - - def select_message_digest(digest_string): digest = None if digest_string == 'sha256': diff --git a/plugins/module_utils/openssh/certificate.py b/plugins/module_utils/openssh/certificate.py index f59766651..8efb2ad9c 100644 --- a/plugins/module_utils/openssh/certificate.py +++ b/plugins/module_utils/openssh/certificate.py @@ -31,11 +31,15 @@ from ansible.module_utils import six from ansible.module_utils.common.text.converters import to_text -from ansible_collections.community.crypto.plugins.module_utils.crypto.support import convert_relative_to_datetime from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import ( OpensshParser, _OpensshWriter, ) +from ansible_collections.community.crypto.plugins.module_utils.time import ( + add_or_remove_timezone as _add_or_remove_timezone, + convert_relative_to_datetime, + UTC as _UTC, +) # See https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.certkeys?annotate=HEAD _USER_TYPE = 1 @@ -66,14 +70,8 @@ _USE_TIMEZONE = sys.version_info >= (3, 6) -def _ensure_utc_timezone_if_use_timezone(value): - if not _USE_TIMEZONE or value.tzinfo is not None: - return value - return value.astimezone(_datetime.timezone.utc) - - -_ALWAYS = _ensure_utc_timezone_if_use_timezone(datetime(1970, 1, 1)) -_FOREVER = datetime(9999, 12, 31, 23, 59, 59, 999999, _datetime.timezone.utc) if _USE_TIMEZONE else datetime.max +_ALWAYS = _add_or_remove_timezone(datetime(1970, 1, 1), with_timezone=_USE_TIMEZONE) +_FOREVER = datetime(9999, 12, 31, 23, 59, 59, 999999, _UTC) if _USE_TIMEZONE else datetime.max _CRITICAL_OPTIONS = ( 'force-command', @@ -198,7 +196,7 @@ def _time_string_to_datetime(time_string): else: for time_format in ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"): try: - result = _ensure_utc_timezone_if_use_timezone(datetime.strptime(time_string, time_format)) + result = _add_or_remove_timezone(datetime.strptime(time_string, time_format), with_timezone=_USE_TIMEZONE) except ValueError: pass if result is None: diff --git a/plugins/module_utils/time.py b/plugins/module_utils/time.py new file mode 100644 index 000000000..4adc4620e --- /dev/null +++ b/plugins/module_utils/time.py @@ -0,0 +1,171 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2024, Felix Fontein +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +import datetime +import re +import sys + +from ansible.module_utils.common.text.converters import to_native + +from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( + OpenSSLObjectError, +) + + +try: + UTC = datetime.timezone.utc +except AttributeError: + _DURATION_ZERO = datetime.timedelta(0) + + class _UTCClass(datetime.tzinfo): + def utcoffset(self, dt): + return _DURATION_ZERO + + def dst(self, dt): + return _DURATION_ZERO + + def tzname(self, dt): + return 'UTC' + + def fromutc(self, dt): + return dt + + def __repr__(self): + return 'UTC' + + UTC = _UTCClass() + + +def get_now_datetime(with_timezone): + if with_timezone: + return datetime.datetime.now(tz=UTC) + return datetime.datetime.utcnow() + + +def ensure_utc_timezone(timestamp): + if timestamp.tzinfo is UTC: + return timestamp + if timestamp.tzinfo is None: + # We assume that naive datetime objects use timezone UTC! + return timestamp.replace(tzinfo=UTC) + return timestamp.astimezone(UTC) + + +def remove_timezone(timestamp): + # Convert to native datetime object + if timestamp.tzinfo is None: + return timestamp + if timestamp.tzinfo is not UTC: + timestamp = timestamp.astimezone(UTC) + return timestamp.replace(tzinfo=None) + + +def add_or_remove_timezone(timestamp, with_timezone): + return ensure_utc_timezone(timestamp) if with_timezone else remove_timezone(timestamp) + + +if sys.version_info < (3, 3): + def get_epoch_seconds(timestamp): + epoch = datetime.datetime(1970, 1, 1, tzinfo=UTC if timestamp.tzinfo is not None else None) + delta = timestamp - epoch + try: + return delta.total_seconds() + except AttributeError: + # Python 2.6 and earlier: total_seconds() does not yet exist, so we use the formula from + # https://docs.python.org/2/library/datetime.html#datetime.timedelta.total_seconds + return (delta.microseconds + (delta.seconds + delta.days * 24 * 3600) * 10**6) / 10**6 +else: + def get_epoch_seconds(timestamp): + return timestamp.timestamp() + + +def from_epoch_seconds(timestamp, with_timezone): + if with_timezone: + return datetime.datetime.fromtimestamp(timestamp, UTC) + return datetime.datetime.utcfromtimestamp(timestamp) + + +def convert_relative_to_datetime(relative_time_string, with_timezone=False, now=None): + """Get a datetime.datetime or None from a string in the time format described in sshd_config(5)""" + + parsed_result = re.match( + r"^(?P[+-])((?P\d+)[wW])?((?P\d+)[dD])?((?P\d+)[hH])?((?P\d+)[mM])?((?P\d+)[sS]?)?$", + relative_time_string) + + if parsed_result is None or len(relative_time_string) == 1: + # not matched or only a single "+" or "-" + return None + + offset = datetime.timedelta(0) + if parsed_result.group("weeks") is not None: + offset += datetime.timedelta(weeks=int(parsed_result.group("weeks"))) + if parsed_result.group("days") is not None: + offset += datetime.timedelta(days=int(parsed_result.group("days"))) + if parsed_result.group("hours") is not None: + offset += datetime.timedelta(hours=int(parsed_result.group("hours"))) + if parsed_result.group("minutes") is not None: + offset += datetime.timedelta( + minutes=int(parsed_result.group("minutes"))) + if parsed_result.group("seconds") is not None: + offset += datetime.timedelta( + seconds=int(parsed_result.group("seconds"))) + + if now is None: + now = get_now_datetime(with_timezone=with_timezone) + else: + now = add_or_remove_timezone(now, with_timezone=with_timezone) + + if parsed_result.group("prefix") == "+": + return now + offset + else: + return now - offset + + +def get_relative_time_option(input_string, input_name, backend='cryptography', with_timezone=False, now=None): + """Return an absolute timespec if a relative timespec or an ASN1 formatted + string is provided. + + The return value will be a datetime object for the cryptography backend, + and a ASN1 formatted string for the pyopenssl backend.""" + result = to_native(input_string) + if result is None: + raise OpenSSLObjectError( + 'The timespec "%s" for %s is not valid' % + input_string, input_name) + # Relative time + if result.startswith("+") or result.startswith("-"): + result_datetime = convert_relative_to_datetime(result, with_timezone=with_timezone, now=now) + if backend == 'pyopenssl': + return result_datetime.strftime("%Y%m%d%H%M%SZ") + elif backend == 'cryptography': + return result_datetime + # Absolute time + if backend == 'pyopenssl': + return input_string + elif backend == 'cryptography': + for date_fmt, length in [ + ('%Y%m%d%H%M%SZ', 15), # this also parses '202401020304Z', but as datetime(2024, 1, 2, 3, 0, 4) + ('%Y%m%d%H%MZ', 13), + ('%Y%m%d%H%M%S%z', 14 + 5), # this also parses '202401020304+0000', but as datetime(2024, 1, 2, 3, 0, 4, tzinfo=...) + ('%Y%m%d%H%M%z', 12 + 5), + ]: + if len(result) != length: + continue + try: + res = datetime.datetime.strptime(result, date_fmt) + except ValueError: + pass + else: + return add_or_remove_timezone(res, with_timezone=with_timezone) + + raise OpenSSLObjectError( + 'The time spec "%s" for %s is invalid' % + (input_string, input_name) + ) diff --git a/plugins/modules/acme_challenge_cert_helper.py b/plugins/modules/acme_challenge_cert_helper.py index 48b65f998..edd2c3331 100644 --- a/plugins/modules/acme_challenge_cert_helper.py +++ b/plugins/modules/acme_challenge_cert_helper.py @@ -165,16 +165,16 @@ read_file, ) -from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( - get_now_datetime, -) - from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( CRYPTOGRAPHY_TIMEZONE, set_not_valid_after, set_not_valid_before, ) +from ansible_collections.community.crypto.plugins.module_utils.time import ( + get_now_datetime, +) + CRYPTOGRAPHY_IMP_ERR = None try: import cryptography diff --git a/plugins/modules/get_certificate.py b/plugins/modules/get_certificate.py index 6ae9439d3..d4b38afbd 100644 --- a/plugins/modules/get_certificate.py +++ b/plugins/modules/get_certificate.py @@ -220,10 +220,6 @@ from ansible_collections.community.crypto.plugins.module_utils.version import LooseVersion -from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( - get_now_datetime, -) - from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( CRYPTOGRAPHY_TIMEZONE, cryptography_oid_to_name, @@ -232,6 +228,10 @@ get_not_valid_before, ) +from ansible_collections.community.crypto.plugins.module_utils.time import ( + get_now_datetime, +) + MINIMAL_CRYPTOGRAPHY_VERSION = '1.6' CREATE_DEFAULT_CONTEXT_IMP_ERR = None diff --git a/plugins/modules/x509_certificate_info.py b/plugins/modules/x509_certificate_info.py index 8379937f7..9e8c20e29 100644 --- a/plugins/modules/x509_certificate_info.py +++ b/plugins/modules/x509_certificate_info.py @@ -406,10 +406,6 @@ OpenSSLObjectError, ) -from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( - get_relative_time_option, -) - from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( CRYPTOGRAPHY_TIMEZONE, ) @@ -418,6 +414,10 @@ select_backend, ) +from ansible_collections.community.crypto.plugins.module_utils.time import ( + get_relative_time_option, +) + def main(): module = AnsibleModule( diff --git a/plugins/modules/x509_crl.py b/plugins/modules/x509_crl.py index 527975b88..f8eb8d85e 100644 --- a/plugins/modules/x509_crl.py +++ b/plugins/modules/x509_crl.py @@ -470,7 +470,6 @@ load_certificate, parse_name_field, parse_ordered_name_field, - get_relative_time_option, select_message_digest, ) @@ -506,6 +505,10 @@ get_crl_info, ) +from ansible_collections.community.crypto.plugins.module_utils.time import ( + get_relative_time_option, +) + MINIMAL_CRYPTOGRAPHY_VERSION = '1.2' CRYPTOGRAPHY_IMP_ERR = None diff --git a/tests/unit/plugins/module_utils/acme/backend_data.py b/tests/unit/plugins/module_utils/acme/backend_data.py index 31e0ef006..d508f006f 100644 --- a/tests/unit/plugins/module_utils/acme/backend_data.py +++ b/tests/unit/plugins/module_utils/acme/backend_data.py @@ -9,6 +9,7 @@ import base64 import datetime import os +import sys from ansible_collections.community.crypto.plugins.module_utils.acme.backends import ( CertificateInformation, @@ -107,6 +108,56 @@ def load_fixture(name): ] +TEST_PARSE_ACME_TIMESTAMP = [ + ( + '2024-01-01T00:11:22Z', + dict(year=2024, month=1, day=1, hour=0, minute=11, second=22), + ), + ( + '2024-01-01T00:11:22.123Z', + dict(year=2024, month=1, day=1, hour=0, minute=11, second=22, microsecond=123000), + ), + ( + '2024-04-17T06:54:13.333333334Z', + dict(year=2024, month=4, day=17, hour=6, minute=54, second=13, microsecond=333333), + ), +] + +if sys.version_info >= (3, 5): + TEST_PARSE_ACME_TIMESTAMP.extend([ + ( + '2024-01-01T00:11:22+0100', + dict(year=2023, month=12, day=31, hour=23, minute=11, second=22), + ), + ( + '2024-01-01T00:11:22.123+0100', + dict(year=2023, month=12, day=31, hour=23, minute=11, second=22, microsecond=123000), + ), + ]) + + +TEST_INTERPOLATE_TIMESTAMP = [ + ( + dict(year=2024, month=1, day=1, hour=0, minute=0, second=0), + dict(year=2024, month=1, day=1, hour=1, minute=0, second=0), + 0.0, + dict(year=2024, month=1, day=1, hour=0, minute=0, second=0), + ), + ( + dict(year=2024, month=1, day=1, hour=0, minute=0, second=0), + dict(year=2024, month=1, day=1, hour=1, minute=0, second=0), + 0.5, + dict(year=2024, month=1, day=1, hour=0, minute=30, second=0), + ), + ( + dict(year=2024, month=1, day=1, hour=0, minute=0, second=0), + dict(year=2024, month=1, day=1, hour=1, minute=0, second=0), + 1.0, + dict(year=2024, month=1, day=1, hour=1, minute=0, second=0), + ), +] + + class FakeBackend(CryptoBackend): def parse_key(self, key_file=None, key_content=None, passphrase=None): raise BackendException('Not implemented in fake backend') diff --git a/tests/unit/plugins/module_utils/acme/test_backend_cryptography.py b/tests/unit/plugins/module_utils/acme/test_backend_cryptography.py index c3b713ee6..9186e2430 100644 --- a/tests/unit/plugins/module_utils/acme/test_backend_cryptography.py +++ b/tests/unit/plugins/module_utils/acme/test_backend_cryptography.py @@ -30,6 +30,8 @@ TEST_CERT, TEST_CERT_DAYS, TEST_CERT_INFO, + TEST_PARSE_ACME_TIMESTAMP, + TEST_INTERPOLATE_TIMESTAMP, ) @@ -92,3 +94,30 @@ def test_get_cert_information(cert_content, expected_cert_info, openssl_output, assert cert_info == expected_cert_info cert_info = backend.get_cert_information(cert_content=cert_content) assert cert_info == expected_cert_info + + +def test_now(): + module = MagicMock() + backend = CryptographyBackend(module) + now = backend.get_now() + assert CRYPTOGRAPHY_TIMEZONE == (now.tzinfo is not None) + + +@pytest.mark.parametrize("input, expected", TEST_PARSE_ACME_TIMESTAMP) +def test_parse_acme_timestamp(input, expected): + module = MagicMock() + backend = CryptographyBackend(module) + ts_expected = backend.get_utc_datetime(**expected) + timestamp = backend.parse_acme_timestamp(input) + assert ts_expected == timestamp + + +@pytest.mark.parametrize("start, end, percentage, expected", TEST_INTERPOLATE_TIMESTAMP) +def test_interpolate_timestamp(start, end, percentage, expected): + module = MagicMock() + backend = CryptographyBackend(module) + ts_start = backend.get_utc_datetime(**start) + ts_end = backend.get_utc_datetime(**end) + ts_expected = backend.get_utc_datetime(**expected) + timestamp = backend.interpolate_timestamp(ts_start, ts_end, percentage) + assert ts_expected == timestamp diff --git a/tests/unit/plugins/module_utils/acme/test_backend_openssl_cli.py b/tests/unit/plugins/module_utils/acme/test_backend_openssl_cli.py index c0a108611..5138a6202 100644 --- a/tests/unit/plugins/module_utils/acme/test_backend_openssl_cli.py +++ b/tests/unit/plugins/module_utils/acme/test_backend_openssl_cli.py @@ -22,6 +22,8 @@ TEST_CERT_OPENSSL_OUTPUT, TEST_CERT_DAYS, TEST_CERT_INFO, + TEST_PARSE_ACME_TIMESTAMP, + TEST_INTERPOLATE_TIMESTAMP, ) @@ -91,3 +93,30 @@ def test_get_cert_information(cert_content, expected_cert_info, openssl_output, assert cert_info == expected_cert_info cert_info = backend.get_cert_information(cert_content=cert_content) assert cert_info == expected_cert_info + + +def test_now(): + module = MagicMock() + backend = OpenSSLCLIBackend(module, openssl_binary='openssl') + now = backend.get_now() + assert now.tzinfo is None + + +@pytest.mark.parametrize("input, expected", TEST_PARSE_ACME_TIMESTAMP) +def test_parse_acme_timestamp(input, expected): + module = MagicMock() + backend = OpenSSLCLIBackend(module, openssl_binary='openssl') + ts_expected = backend.get_utc_datetime(**expected) + timestamp = backend.parse_acme_timestamp(input) + assert ts_expected == timestamp + + +@pytest.mark.parametrize("start, end, percentage, expected", TEST_INTERPOLATE_TIMESTAMP) +def test_interpolate_timestamp(start, end, percentage, expected): + module = MagicMock() + backend = OpenSSLCLIBackend(module, openssl_binary='openssl') + ts_start = backend.get_utc_datetime(**start) + ts_end = backend.get_utc_datetime(**end) + ts_expected = backend.get_utc_datetime(**expected) + timestamp = backend.interpolate_timestamp(ts_start, ts_end, percentage) + assert ts_expected == timestamp diff --git a/tests/unit/plugins/module_utils/test_time.py b/tests/unit/plugins/module_utils/test_time.py new file mode 100644 index 000000000..35a83f4e4 --- /dev/null +++ b/tests/unit/plugins/module_utils/test_time.py @@ -0,0 +1,323 @@ +# Copyright (c) Ansible Project +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +import datetime +import sys + +import pytest + + +from ansible_collections.community.crypto.plugins.module_utils.time import ( + add_or_remove_timezone, + get_now_datetime, + convert_relative_to_datetime, + ensure_utc_timezone, + from_epoch_seconds, + get_epoch_seconds, + get_relative_time_option, + remove_timezone, + UTC, +) + + +TEST_REMOVE_TIMEZONE = [ + ( + datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=UTC), + datetime.datetime(2024, 1, 1, 0, 1, 2), + ), + ( + datetime.datetime(2024, 1, 1, 0, 1, 2), + datetime.datetime(2024, 1, 1, 0, 1, 2), + ), +] + +TEST_UTC_TIMEZONE = [ + ( + datetime.datetime(2024, 1, 1, 0, 1, 2), + datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=UTC), + ), + ( + datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=UTC), + datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=UTC), + ), +] + +TEST_EPOCH_SECONDS = [ + (0, dict(year=1970, day=1, month=1, hour=0, minute=0, second=0, microsecond=0)), + (1E-6, dict(year=1970, day=1, month=1, hour=0, minute=0, second=0, microsecond=1)), + (1E-3, dict(year=1970, day=1, month=1, hour=0, minute=0, second=0, microsecond=1000)), + (3691.2, dict(year=1970, day=1, month=1, hour=1, minute=1, second=31, microsecond=200000)), +] + +TEST_EPOCH_TO_SECONDS = [ + (datetime.datetime(1970, 1, 1, 0, 1, 2, 0), 62), + (datetime.datetime(1970, 1, 1, 0, 1, 2, 0, tzinfo=UTC), 62), +] + +TEST_CONVERT_RELATIVE_TO_DATETIME = [ + ( + '+0', + False, + datetime.datetime(2024, 1, 1, 0, 0, 0), + datetime.datetime(2024, 1, 1, 0, 0, 0), + ), + ( + '+1s', + False, + datetime.datetime(2024, 1, 1, 0, 0, 0, tzinfo=UTC), + datetime.datetime(2024, 1, 1, 0, 0, 1), + ), + ( + '-10w20d30h40m50s', + False, + datetime.datetime(2024, 1, 1, 0, 0, 0, tzinfo=UTC), + datetime.datetime(2023, 10, 1, 17, 19, 10), + ), + ( + '+0', + True, + datetime.datetime(2024, 1, 1, 0, 0, 0), + datetime.datetime(2024, 1, 1, 0, 0, 0, tzinfo=UTC), + ), + ( + '+1s', + True, + datetime.datetime(2024, 1, 1, 0, 0, 0, tzinfo=UTC), + datetime.datetime(2024, 1, 1, 0, 0, 1, tzinfo=UTC), + ), + ( + '-10w20d30h40m50s', + True, + datetime.datetime(2024, 1, 1, 0, 0, 0), + datetime.datetime(2023, 10, 1, 17, 19, 10, tzinfo=UTC), + ), +] + +TEST_GET_RELATIVE_TIME_OPTION = [ + ( + '+1d2h3m4s', + 'foo', + 'cryptography', + False, + datetime.datetime(2024, 1, 1, 0, 0, 0), + datetime.datetime(2024, 1, 2, 2, 3, 4), + ), + ( + '-1w10d24h', + 'foo', + 'cryptography', + False, + datetime.datetime(2024, 1, 1, 0, 0, 0), + datetime.datetime(2023, 12, 14, 0, 0, 0), + ), + ( + '20240102040506Z', + 'foo', + 'cryptography', + False, + datetime.datetime(2024, 1, 1, 0, 0, 0), + datetime.datetime(2024, 1, 2, 4, 5, 6), + ), + ( + '202401020405Z', + 'foo', + 'cryptography', + False, + datetime.datetime(2024, 1, 1, 0, 0, 0), + datetime.datetime(2024, 1, 2, 4, 5, 0), + ), + ( + '+1d2h3m4s', + 'foo', + 'cryptography', + True, + datetime.datetime(2024, 1, 1, 0, 0, 0), + datetime.datetime(2024, 1, 2, 2, 3, 4, tzinfo=UTC), + ), + ( + '-1w10d24h', + 'foo', + 'cryptography', + True, + datetime.datetime(2024, 1, 1, 0, 0, 0), + datetime.datetime(2023, 12, 14, 0, 0, 0, tzinfo=UTC), + ), + ( + '20240102040506Z', + 'foo', + 'cryptography', + True, + datetime.datetime(2024, 1, 1, 0, 0, 0), + datetime.datetime(2024, 1, 2, 4, 5, 6, tzinfo=UTC), + ), + ( + '202401020405Z', + 'foo', + 'cryptography', + True, + datetime.datetime(2024, 1, 1, 0, 0, 0), + datetime.datetime(2024, 1, 2, 4, 5, 0, tzinfo=UTC), + ), + ( + '+1d2h3m4s', + 'foo', + 'pyopenssl', + False, + datetime.datetime(2024, 1, 1, 0, 0, 0), + '20240102020304Z', + ), + ( + '-1w10d24h', + 'foo', + 'pyopenssl', + False, + datetime.datetime(2024, 1, 1, 0, 0, 0), + '20231214000000Z', + ), + ( + '20240102040506Z', + 'foo', + 'pyopenssl', + False, + datetime.datetime(2024, 1, 1, 0, 0, 0), + '20240102040506Z', + ), + ( + '202401020405Z', + 'foo', + 'pyopenssl', + False, + datetime.datetime(2024, 1, 1, 0, 0, 0), + '202401020405Z', + ), +] + + +if sys.version_info >= (3, 5): + ONE_HOUR_PLUS = datetime.timezone(datetime.timedelta(hours=1)) + + TEST_REMOVE_TIMEZONE.extend([ + ( + datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=ONE_HOUR_PLUS), + datetime.datetime(2023, 12, 31, 23, 1, 2), + ), + ]) + TEST_UTC_TIMEZONE.extend([ + ( + datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=ONE_HOUR_PLUS), + datetime.datetime(2023, 12, 31, 23, 1, 2, tzinfo=UTC), + ), + ]) + TEST_EPOCH_TO_SECONDS.extend([ + (datetime.datetime(1970, 1, 1, 0, 1, 2, 0, tzinfo=ONE_HOUR_PLUS), 62 - 3600), + ]) + TEST_GET_RELATIVE_TIME_OPTION.extend([ + ( + '20240102040506+0100', + 'foo', + 'cryptography', + False, + datetime.datetime(2024, 1, 1, 0, 0, 0), + datetime.datetime(2024, 1, 2, 3, 5, 6), + ), + ( + '202401020405+0100', + 'foo', + 'cryptography', + False, + datetime.datetime(2024, 1, 1, 0, 0, 0), + datetime.datetime(2024, 1, 2, 3, 5, 0), + ), + ( + '20240102040506+0100', + 'foo', + 'cryptography', + True, + datetime.datetime(2024, 1, 1, 0, 0, 0), + datetime.datetime(2024, 1, 2, 3, 5, 6, tzinfo=UTC), + ), + ( + '202401020405+0100', + 'foo', + 'cryptography', + True, + datetime.datetime(2024, 1, 1, 0, 0, 0), + datetime.datetime(2024, 1, 2, 3, 5, 0, tzinfo=UTC), + ), + ( + '20240102040506+0100', + 'foo', + 'pyopenssl', + False, + datetime.datetime(2024, 1, 1, 0, 0, 0), + '20240102040506+0100', + ), + ( + '202401020405+0100', + 'foo', + 'pyopenssl', + False, + datetime.datetime(2024, 1, 1, 0, 0, 0), + '202401020405+0100', + ), + ]) + + +@pytest.mark.parametrize("input, expected", TEST_REMOVE_TIMEZONE) +def test_remove_timezone(input, expected): + output_1 = remove_timezone(input) + assert expected == output_1 + output_2 = add_or_remove_timezone(input, with_timezone=False) + assert expected == output_2 + + +@pytest.mark.parametrize("input, expected", TEST_UTC_TIMEZONE) +def test_utc_timezone(input, expected): + output_1 = ensure_utc_timezone(input) + assert expected == output_1 + output_2 = add_or_remove_timezone(input, with_timezone=True) + assert expected == output_2 + + +def test_get_now_datetime(): + output_1 = get_now_datetime(with_timezone=False) + assert output_1.tzinfo is None + output_2 = get_now_datetime(with_timezone=True) + assert output_2.tzinfo is not None + assert output_2.tzinfo == UTC + + +@pytest.mark.parametrize("seconds, timestamp", TEST_EPOCH_SECONDS) +def test_epoch_seconds(seconds, timestamp): + ts_wo_tz = datetime.datetime(**timestamp) + assert seconds == get_epoch_seconds(ts_wo_tz) + timestamp_w_tz = dict(timestamp) + timestamp_w_tz['tzinfo'] = UTC + ts_w_tz = datetime.datetime(**timestamp_w_tz) + assert seconds == get_epoch_seconds(ts_w_tz) + output_1 = from_epoch_seconds(seconds, with_timezone=False) + assert ts_wo_tz == output_1 + output_2 = from_epoch_seconds(seconds, with_timezone=True) + assert ts_w_tz == output_2 + + +@pytest.mark.parametrize("timestamp, expected_seconds", TEST_EPOCH_TO_SECONDS) +def test_epoch_to_seconds(timestamp, expected_seconds): + assert expected_seconds == get_epoch_seconds(timestamp) + + +@pytest.mark.parametrize("relative_time_string, with_timezone, now, expected", TEST_CONVERT_RELATIVE_TO_DATETIME) +def test_convert_relative_to_datetime(relative_time_string, with_timezone, now, expected): + output = convert_relative_to_datetime(relative_time_string, with_timezone=with_timezone, now=now) + assert expected == output + + +@pytest.mark.parametrize("input_string, input_name, backend, with_timezone, now, expected", TEST_GET_RELATIVE_TIME_OPTION) +def test_get_relative_time_option(input_string, input_name, backend, with_timezone, now, expected): + output = get_relative_time_option(input_string, input_name, backend=backend, with_timezone=with_timezone, now=now) + assert expected == output