Skip to content

Commit

Permalink
Refactor time code, add tests, fix bug when parsing absolute timestam…
Browse files Browse the repository at this point in the history
…ps that omit seconds (ansible-collections#745)

* Add time module utils.

* Add time helpers to ACME backend.

* Add changelog fragment.

* ACME timestamp parser: do not choke on nanoseconds.
  • Loading branch information
felixfontein authored May 3, 2024
1 parent 9501a28 commit 0a15be1
Show file tree
Hide file tree
Showing 19 changed files with 755 additions and 119 deletions.
2 changes: 2 additions & 0 deletions changelogs/fragments/745-absolute-time.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
bugfixes:
- "x509_crl, x509_certificate, x509_certificate_info - when parsing absolute timestamps which omitted the second count, the first digit of the minutes was used as a one-digit minutes count, and the second digit of the minutes as a one-digit second count (https://github.com/ansible-collections/community.crypto/pull/745)."
42 changes: 35 additions & 7 deletions plugins/module_utils/acme/backend_cryptography.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import base64
import binascii
import datetime
import os
import traceback

Expand All @@ -21,6 +22,7 @@
from ansible_collections.community.crypto.plugins.module_utils.acme.backends import (
CertificateInformation,
CryptoBackend,
_parse_acme_timestamp,
)

from ansible_collections.community.crypto.plugins.module_utils.acme.certificates import (
Expand All @@ -41,12 +43,6 @@
convert_int_to_hex,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
get_now_datetime,
ensure_utc_timezone,
parse_name_field,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
CRYPTOGRAPHY_TIMEZONE,
cryptography_name_to_oid,
Expand All @@ -59,6 +55,18 @@
extract_first_pem,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
parse_name_field,
)

from ansible_collections.community.crypto.plugins.module_utils.time import (
ensure_utc_timezone,
from_epoch_seconds,
get_epoch_seconds,
get_now_datetime,
UTC,
)

CRYPTOGRAPHY_MINIMAL_VERSION = '1.5'

CRYPTOGRAPHY_ERROR = None
Expand Down Expand Up @@ -173,6 +181,26 @@ class CryptographyBackend(CryptoBackend):
def __init__(self, module):
super(CryptographyBackend, self).__init__(module)

def get_now(self):
return get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE)

def parse_acme_timestamp(self, timestamp_str):
return _parse_acme_timestamp(timestamp_str, with_timezone=CRYPTOGRAPHY_TIMEZONE)

def interpolate_timestamp(self, timestamp_start, timestamp_end, percentage):
start = get_epoch_seconds(timestamp_start)
end = get_epoch_seconds(timestamp_end)
return from_epoch_seconds(start + percentage * (end - start), with_timezone=CRYPTOGRAPHY_TIMEZONE)

def get_utc_datetime(self, *args, **kwargs):
kwargs_ext = dict(kwargs)
if CRYPTOGRAPHY_TIMEZONE and ('tzinfo' not in kwargs_ext and len(args) < 8):
kwargs_ext['tzinfo'] = UTC
result = datetime.datetime(*args, **kwargs_ext)
if CRYPTOGRAPHY_TIMEZONE and ('tzinfo' in kwargs or len(args) >= 8):
result = ensure_utc_timezone(result)
return result

def parse_key(self, key_file=None, key_content=None, passphrase=None):
'''
Parses an RSA or Elliptic Curve key file in PEM format and returns key_data.
Expand Down Expand Up @@ -379,7 +407,7 @@ def get_cert_days(self, cert_filename=None, cert_content=None, now=None):
raise BackendException('Cannot parse certificate {0}: {1}'.format(cert_filename, e))

if now is None:
now = get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE)
now = self.get_now()
elif CRYPTOGRAPHY_TIMEZONE:
now = ensure_utc_timezone(now)
return (get_not_valid_after(cert) - now).days
Expand Down
64 changes: 64 additions & 0 deletions plugins/module_utils/acme/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,23 @@

from collections import namedtuple
import abc
import datetime
import re

from ansible.module_utils import six

from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
BackendException,
)

from ansible_collections.community.crypto.plugins.module_utils.time import (
ensure_utc_timezone,
from_epoch_seconds,
get_epoch_seconds,
get_now_datetime,
remove_timezone,
)


CertificateInformation = namedtuple(
'CertificateInformation',
Expand All @@ -31,11 +41,65 @@
)


_FRACTIONAL_MATCHER = re.compile(r'^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(|\.\d+)(Z|[+-]\d{2}:?\d{2}.*)$')


def _reduce_fractional_digits(timestamp_str):
"""
Given a RFC 3339 timestamp that includes too many digits for the fractional seconds part, reduces these to at most 6.
"""
# RFC 3339 (https://www.rfc-editor.org/info/rfc3339)
m = _FRACTIONAL_MATCHER.match(timestamp_str)
if not m:
raise BackendException('Cannot parse ISO 8601 timestamp {0!r}'.format(timestamp_str))
timestamp, fractional, timezone = m.groups()
if len(fractional) > 7:
# Python does not support anything smaller than microseconds
# (Golang supports nanoseconds, Boulder often emits more fractional digits, which Python chokes on)
fractional = fractional[:7]
return '%s%s%s' % (timestamp, fractional, timezone)


def _parse_acme_timestamp(timestamp_str, with_timezone):
"""
Parses a RFC 3339 timestamp.
"""
# RFC 3339 (https://www.rfc-editor.org/info/rfc3339)
timestamp_str = _reduce_fractional_digits(timestamp_str)
for format in ('%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%S%z', '%Y-%m-%dT%H:%M:%S.%f%z'):
# Note that %z won't work with Python 2... https://stackoverflow.com/a/27829491
try:
result = datetime.datetime.strptime(timestamp_str, format)
except ValueError:
pass
else:
return ensure_utc_timezone(result) if with_timezone else remove_timezone(result)
raise BackendException('Cannot parse ISO 8601 timestamp {0!r}'.format(timestamp_str))


@six.add_metaclass(abc.ABCMeta)
class CryptoBackend(object):
def __init__(self, module):
self.module = module

def get_now(self):
return get_now_datetime(with_timezone=False)

def parse_acme_timestamp(self, timestamp_str):
# RFC 3339 (https://www.rfc-editor.org/info/rfc3339)
return _parse_acme_timestamp(timestamp_str, with_timezone=False)

def interpolate_timestamp(self, timestamp_start, timestamp_end, percentage):
start = get_epoch_seconds(timestamp_start)
end = get_epoch_seconds(timestamp_end)
return from_epoch_seconds(start + percentage * (end - start), with_timezone=False)

def get_utc_datetime(self, *args, **kwargs):
result = datetime.datetime(*args, **kwargs)
if 'tzinfo' in kwargs or len(args) >= 8:
result = remove_timezone(result)
return result

@abc.abstractmethod
def parse_key(self, key_file=None, key_content=None, passphrase=None):
'''
Expand Down
2 changes: 1 addition & 1 deletion plugins/module_utils/acme/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from ansible_collections.community.crypto.plugins.module_utils.crypto.math import convert_int_to_bytes

from ansible_collections.community.crypto.plugins.module_utils.crypto.support import get_now_datetime
from ansible_collections.community.crypto.plugins.module_utils.time import get_now_datetime


def nopad_b64(data):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
load_certificate,
get_now_datetime,
get_relative_time_option,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
Expand All @@ -34,6 +32,11 @@
CertificateProvider,
)

from ansible_collections.community.crypto.plugins.module_utils.time import (
get_now_datetime,
get_relative_time_option,
)

try:
from cryptography.x509.oid import NameOID
except ImportError:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
load_certificate,
get_fingerprint_of_bytes,
get_now_datetime,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
Expand All @@ -40,6 +39,10 @@
get_publickey_info,
)

from ansible_collections.community.crypto.plugins.module_utils.time import (
get_now_datetime,
)

MINIMAL_CRYPTOGRAPHY_VERSION = '1.6'

CRYPTOGRAPHY_IMP_ERR = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
load_privatekey,
load_certificate,
get_relative_time_option,
select_message_digest,
)

Expand All @@ -44,6 +43,10 @@
CertificateProvider,
)

from ansible_collections.community.crypto.plugins.module_utils.time import (
get_relative_time_option,
)

try:
import cryptography
from cryptography import x509
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from random import randrange

from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
get_relative_time_option,
select_message_digest,
)

Expand All @@ -34,6 +33,10 @@
CertificateProvider,
)

from ansible_collections.community.crypto.plugins.module_utils.time import (
get_relative_time_option,
)

try:
import cryptography
from cryptography import x509
Expand Down
92 changes: 9 additions & 83 deletions plugins/module_utils/crypto/support.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,25 @@


import abc
import datetime
import errno
import hashlib
import os
import re

from ansible.module_utils import six
from ansible.module_utils.common.text.converters import to_native, to_bytes
from ansible.module_utils.common.text.converters import to_bytes

from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import (
identify_pem_format,
)

from ansible_collections.community.crypto.plugins.module_utils.time import ( # noqa: F401, pylint: disable=unused-import
# These imports are for backwards compatibility
get_now_datetime,
ensure_utc_timezone,
convert_relative_to_datetime,
get_relative_time_option,
)

try:
from OpenSSL import crypto
HAS_PYOPENSSL = True
Expand Down Expand Up @@ -279,86 +285,6 @@ def parse_ordered_name_field(input_list, name_field_name):
return result


def get_now_datetime(with_timezone):
if with_timezone:
return datetime.datetime.now(tz=datetime.timezone.utc)
return datetime.datetime.utcnow()


def ensure_utc_timezone(timestamp):
if timestamp.tzinfo is not None:
return timestamp
return timestamp.astimezone(datetime.timezone.utc)


def convert_relative_to_datetime(relative_time_string, with_timezone=False):
"""Get a datetime.datetime or None from a string in the time format described in sshd_config(5)"""

parsed_result = re.match(
r"^(?P<prefix>[+-])((?P<weeks>\d+)[wW])?((?P<days>\d+)[dD])?((?P<hours>\d+)[hH])?((?P<minutes>\d+)[mM])?((?P<seconds>\d+)[sS]?)?$",
relative_time_string)

if parsed_result is None or len(relative_time_string) == 1:
# not matched or only a single "+" or "-"
return None

offset = datetime.timedelta(0)
if parsed_result.group("weeks") is not None:
offset += datetime.timedelta(weeks=int(parsed_result.group("weeks")))
if parsed_result.group("days") is not None:
offset += datetime.timedelta(days=int(parsed_result.group("days")))
if parsed_result.group("hours") is not None:
offset += datetime.timedelta(hours=int(parsed_result.group("hours")))
if parsed_result.group("minutes") is not None:
offset += datetime.timedelta(
minutes=int(parsed_result.group("minutes")))
if parsed_result.group("seconds") is not None:
offset += datetime.timedelta(
seconds=int(parsed_result.group("seconds")))

now = get_now_datetime(with_timezone=with_timezone)
if parsed_result.group("prefix") == "+":
return now + offset
else:
return now - offset


def get_relative_time_option(input_string, input_name, backend='cryptography', with_timezone=False):
"""Return an absolute timespec if a relative timespec or an ASN1 formatted
string is provided.
The return value will be a datetime object for the cryptography backend,
and a ASN1 formatted string for the pyopenssl backend."""
result = to_native(input_string)
if result is None:
raise OpenSSLObjectError(
'The timespec "%s" for %s is not valid' %
input_string, input_name)
# Relative time
if result.startswith("+") or result.startswith("-"):
result_datetime = convert_relative_to_datetime(result, with_timezone=with_timezone)
if backend == 'pyopenssl':
return result_datetime.strftime("%Y%m%d%H%M%SZ")
elif backend == 'cryptography':
return result_datetime
# Absolute time
if backend == 'cryptography':
for date_fmt in ['%Y%m%d%H%M%SZ', '%Y%m%d%H%MZ', '%Y%m%d%H%M%S%z', '%Y%m%d%H%M%z']:
try:
res = datetime.datetime.strptime(result, date_fmt)
except ValueError:
pass
else:
if with_timezone:
res = res.astimezone(datetime.timezone.utc)
return res

raise OpenSSLObjectError(
'The time spec "%s" for %s is invalid' %
(input_string, input_name)
)


def select_message_digest(digest_string):
digest = None
if digest_string == 'sha256':
Expand Down
Loading

0 comments on commit 0a15be1

Please sign in to comment.