Skip to content

Commit

Permalink
tries fixing #127 and #55 again
Browse files Browse the repository at this point in the history
  • Loading branch information
WolfgangFahl committed Aug 20, 2024
1 parent aeba301 commit 00f97fe
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 192 deletions.
183 changes: 2 additions & 181 deletions lodstorage/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import datetime
import io
import logging
import re

# python standard library
Expand All @@ -15,6 +14,7 @@
import time

from lodstorage.lod import LOD
from lodstorage.sqlite_api import SQLiteApiFixer


class SQLDB(object):
Expand Down Expand Up @@ -52,6 +52,7 @@ def __init__(
self.dbname = dbname
self.debug = debug
self.errorDebug = errorDebug
SQLiteApiFixer.install(lenient=debug)
if connection is None:
self.c = sqlite3.connect(
dbname,
Expand Down Expand Up @@ -652,183 +653,3 @@ def fixDates(self, resultList):
dt = datetime.datetime.strptime(record[key], "%Y-%m-%d")
dateValue = dt.date()
record[key] = dateValue


# sqlite2 adapters as needed as of python 3.12
def adapt_date_iso(val: datetime.date):
"""Adapt datetime.date to ISO 8601 date."""
return val.isoformat()


def adapt_datetime_iso(val: datetime.datetime):
"""Adapt datetime.datetime to timezone-naive ISO 8601 date."""
return val.isoformat()


def adapt_datetime_epoch(val: datetime.datetime):
"""Adapt datetime.datetime to Unix timestamp."""
return float(val.timestamp()) * 10**6


def adapt_boolean(val: bool):
"""Adapt boolean to int"""
return 1 if val else 0


sqlite3.register_adapter(datetime.date, adapt_date_iso)
sqlite3.register_adapter(datetime.datetime, adapt_datetime_iso)
# alternative way of storing datetimes
# sqlite3.register_adapter(datetime.datetime, adapt_datetime_epoch)
sqlite3.register_adapter(bool, adapt_boolean)


class DatetimeAdapter:
"""Singleton class for converting date and time formats with optional lenient error handling.
Attributes:
lenient (bool): Whether to handle conversion errors leniently, returning None and logging a warning.
"""

_instance = None

def __new__(cls, lenient: bool = False):
"""Ensure only one instance of the adapter exists.
Args:
lenient (bool): If True, the adapter will not raise exceptions on conversion failures.
Returns:
DatetimeAdapter: The singleton instance of the adapter.
"""
if cls._instance is None:
cls._instance = super(DatetimeAdapter, cls).__new__(cls)
cls._instance.lenient = lenient
return cls._instance

def _handle_input(self, val: bytes) -> str:
"""Validate and decode the input bytes into string.
Args:
val (bytes): The bytes input to validate and decode.
Returns:
str: The decoded string from bytes.
Raises:
TypeError: If the input is not bytes.
"""
if not isinstance(val, bytes):
raise TypeError("Input must be a byte string.")
return val.decode()

def _handle_error(self, error: Exception, val: bytes):
"""Handle errors based on the lenient mode.
Args:
error (Exception): The exception that was raised.
val (bytes): The input value that caused the error.
Returns:
None: If lenient mode is True and an error occurs.
Raises:
Exception: If lenient mode is False and an error occurs.
"""
if self.lenient:
logging.warning(f"Failed to convert {val}: {error}")
return None
else:
raise error

def convert_date(self, val: bytes) -> datetime.date:
"""Convert ISO 8601 date byte string to a datetime.date object.
Args:
val (bytes): The ISO 8601 date string in bytes.
Returns:
datetime.date: The converted date object.
"""
try:
decoded_date = self._handle_input(val)
return datetime.date.fromisoformat(decoded_date)
except Exception as e:
return self._handle_error(e, val)

def convert_datetime(self, val: bytes) -> datetime.datetime:
"""Convert ISO 8601 datetime byte string to a datetime.datetime object.
Args:
val (bytes): The ISO 8601 datetime string in bytes.
Returns:
datetime.datetime: The converted datetime object.
"""
try:
decoded_datetime = self._handle_input(val)
return datetime.datetime.fromisoformat(decoded_datetime)
except Exception as e:
return self._handle_error(e, val)

def convert_timestamp(self, val: bytes) -> datetime.datetime:
"""Convert Unix epoch timestamp byte string to a datetime.datetime object.
Args:
val (bytes): The Unix epoch timestamp in bytes.
Returns:
datetime.datetime: The converted datetime object.
"""
try:
decoded_string = self._handle_input(val)
timestamp_float = float(decoded_string) / 10**6
return datetime.datetime.fromtimestamp(timestamp_float)
except ValueError as _ve:
try:
# If not, try to parse it as a datetime string
dt = datetime.datetime.fromisoformat(decoded_string)
return dt
except Exception as e:
return self._handle_error(e, val)
except Exception as e:
return self._handle_error(e, val)

def set_lenient(self, lenient: bool):
"""Set the lenient mode of the adapter.
Args:
lenient (bool): True to enable lenient mode, False to disable it.
"""
self.lenient = lenient


# Usage Functions
def convert_date(val: bytes) -> datetime.date:
"""Convert byte string to date using the DatetimeAdapter."""
adapter = DatetimeAdapter()
return adapter.convert_date(val)


def convert_datetime(val: bytes) -> datetime.datetime:
"""Convert byte string to datetime using the DatetimeAdapter."""
adapter = DatetimeAdapter()
return adapter.convert_datetime(val)


def convert_timestamp(val: bytes) -> datetime.datetime:
"""Convert byte string to timestamp using the DatetimeAdapter."""
adapter = DatetimeAdapter()
return adapter.convert_timestamp(val)


def convert_boolean(val: bytes):
"""
Convert 0 or 1 to boolean
"""
return True if int(val) == 1 else False


sqlite3.register_converter("date", convert_date)
sqlite3.register_converter("datetime", convert_datetime)
sqlite3.register_converter("timestamp", convert_timestamp)
sqlite3.register_converter("boolean", convert_boolean)
115 changes: 115 additions & 0 deletions lodstorage/sqlite_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
'''
Created on 2024-08-24
@author: wf
'''
import sqlite3
import datetime
import logging

class DatetimeAdapter:
"""Class for converting date and time formats with optional lenient error handling."""

def __init__(self, lenient: bool = False):
"""Initialize with optional lenient error handling."""
self.lenient = lenient

def _handle_input(self, val: bytes) -> str:
"""Validate and decode the input bytes into string."""
if not isinstance(val, bytes):
raise TypeError("Input must be a byte string.")
return val.decode()

def _handle_error(self, error: Exception, val: bytes):
"""Handle errors based on the lenient mode."""
if self.lenient:
logging.warning(f"Failed to convert {val}: {error}")
return None
else:
raise error

def convert_date(self, val: bytes) -> datetime.date:
"""Convert ISO 8601 date byte string to a datetime.date object."""
try:
decoded_date = self._handle_input(val)
return datetime.date.fromisoformat(decoded_date)
except Exception as e:
return self._handle_error(e, val)

def convert_datetime(self, val: bytes) -> datetime.datetime:
"""Convert ISO 8601 datetime byte string to a datetime.datetime object."""
try:
decoded_datetime = self._handle_input(val)
return datetime.datetime.fromisoformat(decoded_datetime)
except Exception as e:
return self._handle_error(e, val)

def convert_timestamp(self, val: bytes) -> datetime.datetime:
"""Convert Unix epoch timestamp byte string to a datetime.datetime object."""
try:
decoded_string = self._handle_input(val)
timestamp_float = float(decoded_string) / 10**6
return datetime.datetime.fromtimestamp(timestamp_float)
except ValueError as _ve:
try:
dt = datetime.datetime.fromisoformat(decoded_string)
return dt
except Exception as e:
return self._handle_error(e, val)
except Exception as e:
return self._handle_error(e, val)


class SQLiteApiFixer:
"""
Class to register SQLite adapters
and converters using a DatetimeAdapter instance.
"""
_instance = None # Singleton instance

def __init__(self, lenient: bool = True):
"""Private constructor to initialize the singleton instance."""
self.adapter = DatetimeAdapter(lenient=lenient)
self.register_converters()
self.register_adapters()

@classmethod
def install(cls, lenient: bool = True):
"""Install the singleton instance and register SQLite adapters and converters."""
if cls._instance is None:
cls._instance = cls(lenient=lenient)
return cls._instance

def register_adapters(self):
"""Register the necessary SQLite adapters."""
sqlite3.register_adapter(datetime.date, self.adapt_date_iso)
sqlite3.register_adapter(datetime.datetime, self.adapt_datetime_iso)
sqlite3.register_adapter(bool, self.adapt_boolean)

def register_converters(self):
"""Register the necessary SQLite converters."""
sqlite3.register_converter("date", self.adapter.convert_date)
sqlite3.register_converter("datetime", self.adapter.convert_datetime)
sqlite3.register_converter("timestamp", self.adapter.convert_timestamp)
sqlite3.register_converter("boolean", self.convert_boolean)

@staticmethod
def adapt_date_iso(val: datetime.date):
"""Adapt datetime.date to ISO 8601 date."""
return val.isoformat()

@staticmethod
def adapt_datetime_iso(val: datetime.datetime):
"""Adapt datetime.datetime to timezone-naive ISO 8601 date."""
return val.isoformat()

@staticmethod
def adapt_boolean(val: bool):
"""Adapt boolean to int."""
return 1 if val else 0

@staticmethod
def convert_boolean(val: bytes):
"""Convert 0 or 1 to boolean."""
return bool(int(val))

22 changes: 11 additions & 11 deletions tests/test_sqlite3.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@

from lodstorage.sample import Sample
from lodstorage.schema import Schema
from lodstorage.sql import SQLDB, DatetimeAdapter, EntityInfo, convert_timestamp
from lodstorage.sql import SQLDB, EntityInfo
from lodstorage.sqlite_api import SQLiteApiFixer
from lodstorage.uml import UML
from tests.basetest import Basetest

Expand Down Expand Up @@ -466,33 +467,31 @@ def testIssue127And55(self):
https://github.com/WolfgangFahl/pyLoDStorage/issues/55
datetime handling sqlite error should lead to warning and not raise an exception
"""
# Initialize the SQLiteApiFixer singleton
SQLiteApiFixer.install(lenient=True)

log_stream = StringIO()
handler = logging.StreamHandler(log_stream)
logger = logging.getLogger()
logger.addHandler(handler)
logger.setLevel(logging.WARNING)
_dta = DatetimeAdapter(lenient=True)

examples = [
(b"not-a-timestamp", None),
(
b"725811479000000",
datetime.fromtimestamp(725811479),
), # Correct microseconds to seconds
(b"725811479000000", datetime.fromtimestamp(725811479)), # Correct microseconds to seconds
(b"1995-04-07 00:00:00", datetime(1995, 4, 7, 0, 0)),
]

for val, expected in examples:
with self.subTest(val=val):
result = convert_timestamp(val)
result = SQLiteApiFixer._instance.adapter.convert_timestamp(val) # Use the correct method from the singleton
if expected is None:
self.assertIsNone(
result, "Expected None for invalid timestamp input"
)
# Check if the expected log message is in log_stream
log_content=log_stream.getvalue()

log_content = log_stream.getvalue()
self.assertIn("Failed to convert", log_content)
# Clear log stream after checking
log_stream.truncate(0)
Expand All @@ -503,10 +502,11 @@ def testIssue127And55(self):
expected,
f"Expected correct datetime conversion for {val}",
)

# Remove the handler after the test to clean up
logger.removeHandler(handler)
log_stream.close()
DatetimeAdapter._instance=None


def testMultipleAdapters(self):
"""
Expand Down

0 comments on commit 00f97fe

Please sign in to comment.