Skip to content

Commit

Permalink
Properly map dateutil timezones to tzid
Browse files Browse the repository at this point in the history
  • Loading branch information
niccokunzmann committed Nov 15, 2024
1 parent 28308bc commit 2a639dd
Show file tree
Hide file tree
Showing 3 changed files with 879 additions and 18,954 deletions.
180 changes: 67 additions & 113 deletions src/icalendar/timezone/equivalent_timezone_ids.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
See also:
- https://stackoverflow.com/questions/79185519/which-timezones-are-equivalent
"""
from __future__ import annotations

Expand All @@ -19,13 +20,15 @@
from pathlib import Path
from pprint import pprint
from time import time
from typing import Callable
from typing import Callable, NamedTuple, Optional

from zoneinfo import ZoneInfo, available_timezones

from pytz import AmbiguousTimeError, NonExistentTimeError


def check(dt, tz:tzinfo):
return (dt, tz.utcoffset(dt), tz.tzname(dt))
return (dt, tz.utcoffset(dt))

def checks(tz:tzinfo) -> tuple:
result = []
Expand All @@ -38,13 +41,13 @@ def checks(tz:tzinfo) -> tuple:


START = datetime(1970, 1, 1) # noqa: DTZ001
END = datetime(2030, 1, 1) # noqa: DTZ001
END = datetime(2000, 1, 1) # noqa: DTZ001

DTS = []
dt = START
while dt <= END:
DTS.append(dt)
dt += timedelta(hours=1)
dt += timedelta(hours=25) # This must be big enough to be fast and small enough to identify the timeszones before it is the present year
del dt

def main(
Expand All @@ -58,119 +61,71 @@ def main(
if we mix timezone implementations.
"""
print(create_timezones, name)
dtids2tzids = defaultdict(tuple) # checks -> tzids
# tzids without timezones that change
tzids = [
tzid for tzid in sorted(available_timezones())
if tzid.lower() not in ("factory", "localtime")
unsorted_tzids = available_timezones()
unsorted_tzids.remove("localtime")
unsorted_tzids.remove("Factory")

class TZ(NamedTuple):
tz: tzinfo
id:str

tzs = [
TZ(create_timezone(tzid), tzid)
for create_timezone in create_timezones
for tzid in unsorted_tzids
]
print("Press Control+C for partial computation.")
write_to_result_file = True

try:
start = time()
if pool_size > 1:
pool = Pool(pool_size)
pmap = pool.map
else:
pmap = map
iter = enumerate(sorted(tzids))
for i, tzid in iter:
this_go_tzids = [tzid]
if pool_size > 1:
for i, tzid in iter:
this_go_tzids.append(tzid)
if len(this_go_tzids) >= pool_size:
break
for create_timezone in create_timezones:
tzid_checks = pmap(checks, list(map(create_timezone, this_go_tzids)))
for tzid_check in tzid_checks:
tzid_check = tuple(tzid_check)
_tzids = dtids2tzids[tzid_check]
if tzid not in _tzids:
dtids2tzids[tzid_check] = _tzids + (tzid,)
duration = time() - start
print(f"{i+1}/{len(tzids)} timezones, {timedelta(seconds=int(duration * len(tzids) / (i+1) - duration))} remaining.")
except KeyboardInterrupt:
write_to_result_file = False
pass

print("The following ids are equivalent")
for tzids in dtids2tzids.values():
print(tzids)


# def generate_count(ids_list:list[list]):
# """-> dt_id -> count"""
# counts = defaultdict(int)
# for ids in ids_list:
# for dt_id in ids:
# counts[dt_id] += 1
# return counts

print("counting tzids for dtids")
# datetime -> dtid -> tzids
dates : list[datetime] = []
lookup :dict[datetime, dict[tuple, list[str]]]= defaultdict(dict)
tzids2dtids : dict[tuple, tuple] = {
v:k for k, v in dtids2tzids.items()
}
# uniquedtid2tzids = dtid -> [tzids, ...]
n = len(dtids2tzids)
uniquedtid2tzids : dict[tuple, list[tuple|str]] = defaultdict(list)
for i, (dtids, tzids) in enumerate(dtids2tzids.items()):
for dtid in dtids:
uniquedtid2tzids[dtid].append(tzids)
print(f"{i+1}/{n}")
print("finding identifying ids")
n = len(uniquedtid2tzids.items())
p = 0
for i, (dtid, tzids) in enumerate(list(uniquedtid2tzids.items())):
if len(tzids) == 1:
uniquedtid2tzids[dtid] = tzids[0]
else:
del uniquedtid2tzids[dtid]
p_ = int((i+1)/n*100)
if p_ != p:
p = p_
print(f"{p}%")
# uniquedtid2tzids = dtid -> tuple[tzids]
dt2uniquedtids = defaultdict(set)
for dtid in uniquedtid2tzids:
dt2uniquedtids[dtid[0]].add(dtid)

print("Computing tree to find tzids")
n = len(dt2uniquedtids)
p = 0
while dt2uniquedtids:
dt = min(dt2uniquedtids)
uniquedtids = dt2uniquedtids.pop(dt)
dates.append(dt)
for uniquedtid in uniquedtids:
tzids = uniquedtid2tzids[uniquedtid]
lookup[dt][uniquedtid[1:]] = tzids
for dtid in tzids2dtids[tzids]:
dtids_to_shrink = dt2uniquedtids[dtid[0]]
dtids_to_shrink-= {dtid}
if not dtids_to_shrink:
del dt2uniquedtids[dtid[0]]
p_ = int(100 - len(dt2uniquedtids)/n*100)
if p_ != p:
p = p_
print(f"{p}%")
def generate_tree(
tzs: list[TZ],
step:timedelta=timedelta(hours=1),
start:datetime=START,
end:datetime=END,
todo:Optional[set[str]]=None
) -> tuple[datetime, dict[timedelta, set[str]]] | set[str]: # should be recursive
"""Generate a lookup tree."""
if todo is None:
todo = [tz.id for tz in tzs]
print(f"{len(todo)} left to compute")
print(len(tzs))
if len(tzs) == 0:
raise ValueError("tzs cannot be empty")
if len(tzs) == 1:
todo.remove(tzs[0].id)
return {tzs[0].id}
while start < end:
offsets : dict[timedelta, list[TZ]] = defaultdict(list)
try:
for tz in tzs:
offsets[tz.tz.utcoffset(start)].append(tz)
except (NonExistentTimeError, AmbiguousTimeError):
start += step
continue
if len(offsets) == 1:
start += step
continue
lookup = {}
for offset, tzs in offsets.items():
lookup[offset] = generate_tree(tzs=tzs, step=step, start=start + step, end=end, todo=todo)
return start, lookup
result = set()
for tz in tzs:
result.add(tz.id)
todo.remove(tz.id)
return result


lookup = generate_tree(tzs, step=timedelta(hours=33))

file = Path(__file__).parent / f"equivalent_timezone_ids_{name}.py"
print(f"The result is written to {file}.")
lookup = dict(lookup)
print("lookup = ", end="")
pprint(lookup)
if write_to_result_file:
with file.open("w") as f:
f.write(f"'''This file is automatically generated by {Path(__file__).name}'''\n")
f.write("import datetime\n\n")
f.write(f"lookup = ")
pprint(lookup, stream=f)
f.write("\n\n__all__ = ['lookup']\n")
with file.open("w") as f:
f.write(f"'''This file is automatically generated by {Path(__file__).name}'''\n")
f.write("import datetime\n\n")
f.write(f"\nlookup = ")
pprint(lookup, stream=f)
f.write("\n\n__all__ = ['lookup', 'dates']\n")

return lookup

Expand All @@ -180,8 +135,7 @@ def main(
from dateutil.tz import gettz
from pytz import timezone
from zoneinfo import ZoneInfo
main([ZoneInfo, timezone, gettz], "test",
# add more timezone implementations if you like
main([ZoneInfo, timezone, gettz], "result",
pool_size=1
)
# main([timezone], "pytz")
# main([gettz], "dateutil")
Loading

0 comments on commit 2a639dd

Please sign in to comment.