Skip to content

Commit

Permalink
fix(targets): Deserialize floats as decimal.Decimal
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Jul 5, 2023
1 parent 5f59f38 commit f1de868
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 9 deletions.
29 changes: 20 additions & 9 deletions singer_sdk/io_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import abc
import decimal
import json
import logging
import sys
Expand Down Expand Up @@ -49,6 +50,24 @@ def _assert_line_requires(line_dict: dict, requires: set[str]) -> None:
msg = f"Line is missing required {', '.join(missing)} key(s): {line_dict}"
raise Exception(msg)

def deserialize_json(self, line: str) -> dict:
"""Deserialize a line of json.
Args:
line: A single line of json.
Returns:
A dictionary of the deserialized json.
Raises:
json.decoder.JSONDecodeError: raised if any lines are not valid json
"""
try:
return json.loads(line, parse_float=decimal.Decimal)
except json.decoder.JSONDecodeError as exc:
logger.error("Unable to parse:\n%s", line, exc_info=exc)
raise

def _process_lines(self, file_input: t.IO[str]) -> t.Counter[str]:
"""Internal method to process jsonl lines from a Singer tap.
Expand All @@ -57,18 +76,10 @@ def _process_lines(self, file_input: t.IO[str]) -> t.Counter[str]:
Returns:
A counter object for the processed lines.
Raises:
json.decoder.JSONDecodeError: raised if any lines are not valid json
"""
stats: dict[str, int] = defaultdict(int)
for line in file_input:
try:
line_dict = json.loads(line)
except json.decoder.JSONDecodeError as exc:
logger.error("Unable to parse:\n%s", line, exc_info=exc)
raise

line_dict = self.deserialize_json(line)
self._assert_line_requires(line_dict, requires={"type"})

record_type: SingerMessageType = line_dict["type"]
Expand Down
45 changes: 45 additions & 0 deletions tests/core/test_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Test IO operations."""

from __future__ import annotations

import decimal

import pytest

from singer_sdk.io_base import SingerReader


class DummyReader(SingerReader):
def _process_activate_version_message(self, message_dict: dict) -> None:
pass

def _process_batch_message(self, message_dict: dict) -> None:
pass

def _process_record_message(self, message_dict: dict) -> None:
pass

def _process_schema_message(self, message_dict: dict) -> None:
pass

def _process_state_message(self, message_dict: dict) -> None:
pass


@pytest.mark.parametrize(
"line,expected",
[
pytest.param(
b'{"type": "RECORD", "stream": "users", "record": {"id": 1, "value": 1.23}}', # noqa: E501
{
"type": "RECORD",
"stream": "users",
"record": {"id": 1, "value": decimal.Decimal("1.23")},
},
id="record",
),
],
)
def test_deserialize(line, expected):
reader = DummyReader()
assert reader.deserialize_json(line) == expected

0 comments on commit f1de868

Please sign in to comment.