Skip to content

Commit

Permalink
mypy: common.py [BF-1560]
Browse files Browse the repository at this point in the history
Also in this commit:
- More unit tests;
- Removed redundant `()` in regex.
  • Loading branch information
Samuel Giffard committed Mar 22, 2023
1 parent ca5f52e commit e8702ac
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 26 deletions.
37 changes: 24 additions & 13 deletions pglookout/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,51 @@
Copyright (c) 2015 Ohmu Ltd
See LICENSE for details
"""
import datetime
from __future__ import annotations

from datetime import datetime, timedelta
from typing import Final

import re


def convert_xlog_location_to_offset(wal_location):
def convert_xlog_location_to_offset(wal_location: str) -> int:
log_id, offset = wal_location.split("/")
return int(log_id, 16) << 32 | int(offset, 16)


ISO_EXT_RE = re.compile(
ISO_EXT_RE: Final[re.Pattern[str]] = re.compile(
r"(?P<year>\d{4})-(?P<month>\d\d)-(?P<day>\d\d)(T(?P<hour>\d\d):(?P<minute>\d\d)"
r"(:(?P<second>\d\d)(.(?P<microsecond>\d{6}))?)?Z)?$"
)
ISO_BASIC_RE = re.compile(
ISO_BASIC_RE: Final[re.Pattern[str]] = re.compile(
r"(?P<year>\d{4})(?P<month>\d\d)(?P<day>\d\d)(T(?P<hour>\d\d)(?P<minute>\d\d)"
r"((?P<second>\d\d)((?P<microsecond>\d{6}))?)?Z)?$"
)
ISO_GROUP_NAMES: Final[tuple[str, ...]] = (
"year",
"month",
"day",
"hour",
"minute",
"second",
"microsecond",
)


def parse_iso_datetime(value):
def parse_iso_datetime(value: str) -> datetime:
match = ISO_EXT_RE.match(value)
if not match:
match = ISO_BASIC_RE.match(value)
if not match:
raise ValueError(f"Invalid ISO timestamp {value!r}")
parts = dict(
(key, int(match.group(key) or "0")) for key in ("year", "month", "day", "hour", "minute", "second", "microsecond")
)
return datetime.datetime(tzinfo=None, **parts)
parts = {key: int(match.group(key) or "0") for key in ISO_GROUP_NAMES}
return datetime(tzinfo=None, **parts)


def get_iso_timestamp(fetch_time=None):
def get_iso_timestamp(fetch_time: datetime | None = None) -> str:
if not fetch_time:
fetch_time = datetime.datetime.utcnow()
elif fetch_time.tzinfo:
fetch_time = fetch_time.replace(tzinfo=None) - datetime.timedelta(seconds=fetch_time.utcoffset().seconds)
fetch_time = datetime.utcnow()
elif (offset := fetch_time.utcoffset()) is not None:
fetch_time = fetch_time.replace(tzinfo=None) - timedelta(seconds=offset.seconds)
return fetch_time.isoformat() + "Z"
2 changes: 0 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ exclude = [
# Implementation.
'pglookout/__main__.py',
'pglookout/cluster_monitor.py',
'pglookout/common.py',
'pglookout/current_master.py',
'pglookout/logutil.py',
'pglookout/pglookout.py',
Expand All @@ -43,7 +42,6 @@ exclude = [
# Tests.
'test/conftest.py',
'test/test_cluster_monitor.py',
'test/test_common.py',
'test/test_lookout.py',
'test/test_pgutil.py',
'test/test_webserver.py',
Expand Down
66 changes: 55 additions & 11 deletions test/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,77 @@
Copyright (c) 2015 Ohmu Ltd
See LICENSE for details
"""

from datetime import datetime
from pglookout.common import convert_xlog_location_to_offset, get_iso_timestamp, ISO_EXT_RE, parse_iso_datetime
from pytest import raises

import datetime
import pytest


def test_convert_xlog_location_to_offset():
def test_convert_xlog_location_to_offset() -> None:
assert convert_xlog_location_to_offset("1/00000000") == 1 << 32
assert convert_xlog_location_to_offset("F/AAAAAAAA") == (0xF << 32) | 0xAAAAAAAA
with raises(ValueError):
with pytest.raises(ValueError):
convert_xlog_location_to_offset("x")
with raises(ValueError):
with pytest.raises(ValueError):
convert_xlog_location_to_offset("x/y")


def test_parse_iso_datetime():
date = datetime.datetime.utcnow()
def test_parse_iso_datetime() -> None:
date = datetime.utcnow()
date.replace(microsecond=0)
assert date == parse_iso_datetime(date.isoformat() + "Z")
with raises(ValueError):
with pytest.raises(ValueError):
parse_iso_datetime("foobar")


def test_get_iso_timestamp():
def test_get_iso_timestamp() -> None:
v = get_iso_timestamp()
assert ISO_EXT_RE.match(v)
ts = datetime.datetime.now()
ts = datetime.now()
v = get_iso_timestamp(ts)
assert parse_iso_datetime(v) == ts


@pytest.mark.parametrize(
"timestamp",
[
datetime(2021, 1, 1, 23, 42, 11, 123456),
datetime(2021, 1, 1, 23, 42, 11),
datetime(2021, 1, 1, 23, 42),
datetime(2021, 1, 1, 23),
datetime(2021, 1, 1),
],
)
def test_roundtrip(timestamp: datetime) -> None:
ts2 = parse_iso_datetime(get_iso_timestamp(timestamp))

assert ts2 == timestamp


@pytest.mark.parametrize(
("value", "normalized_value"),
# fmt: off
[
# Extended format
("2021-01-01T00:00:00.000000Z", "2021-01-01T00:00:00Z"), # noqa: E241
("2021-01-01T23:42:11.123456Z", "2021-01-01T23:42:11.123456Z"), # noqa: E241
("2021-01-01T00:00:00Z", "2021-01-01T00:00:00Z"), # noqa: E241
("2021-01-01T23:42:11Z", "2021-01-01T23:42:11Z"), # noqa: E241
("2021-01-01T00:00Z", "2021-01-01T00:00:00Z"), # noqa: E241
("2021-01-01T23:42Z", "2021-01-01T23:42:00Z"), # noqa: E241
("2021-01-01", "2021-01-01T00:00:00Z"), # noqa: E241
# Basic format
("20210101T000000Z", "2021-01-01T00:00:00Z"), # noqa: E241
("20210101T234211123456Z", "2021-01-01T23:42:11.123456Z"), # noqa: E241
("20210101T000000Z", "2021-01-01T00:00:00Z"), # noqa: E241
("20210101T234211Z", "2021-01-01T23:42:11Z"), # noqa: E241
("20210101T0000Z", "2021-01-01T00:00:00Z"), # noqa: E241
("20210101T2342Z", "2021-01-01T23:42:00Z"), # noqa: E241
("20210101", "2021-01-01T00:00:00Z"), # noqa: E241
],
# fmt: on
)
def test_reverse_roundtrip(value: str, normalized_value: str) -> None:
v2 = get_iso_timestamp(parse_iso_datetime(value))

assert v2 == normalized_value

0 comments on commit e8702ac

Please sign in to comment.