Skip to content

Commit

Permalink
Add Unit test for convert_stream_to_dict_format
Browse files Browse the repository at this point in the history
  • Loading branch information
davitmamrikishvili committed Nov 27, 2024
1 parent 86dfd28 commit ee3b806
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 26 deletions.
14 changes: 14 additions & 0 deletions tests/tests_unit/tests_common/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
import pytest
from datetime import datetime

from th2_data_services.data_source.lwdp import Stream, Streams
from th2_data_services.data_source.lwdp.streams import _convert_stream_to_dict_format
from th2_data_services.data_source.lwdp.utils import _check_timestamp


def test_datetime_invalid_type():
with pytest.raises(Exception) as err:
_check_timestamp(datetime.now().timestamp())
assert "Provided timestamp should be `datetime`, `str` or `int` object in UTC time" in str(err)


def test_convert_stream_to_dict_format():
streams = ['s1:1', {'sessionAlias': 's2', 'direction': ['IN']}, Stream('s3', 2), Streams(['s4'])]
expected = [
{'sessionAlias': 's1', 'direction': ['IN']},
{'sessionAlias': 's2', 'direction': ['IN']},
{'sessionAlias': 's3', 'direction': ['OUT']},
{'sessionAlias': 's4'}
]
assert _convert_stream_to_dict_format(streams) == expected
13 changes: 12 additions & 1 deletion th2_data_services/data_source/lwdp/commands/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -1277,7 +1277,14 @@ def __init__(
streams: List of streams to search messages from the specified groups.
You will receive only the specified streams and directions for them.
You can specify direction for your streams.
e.g. ['stream_abc:1']. 1 - IN, 2 - OUT.
e.g.:
['stream_abc:1']. 1 - IN, 2 - OUT.
[
{
"sessionAlias": "stream_abc",
"directions": ["IN"]
}
]
fast_fail: If true, stops task execution right after first error.
"""
response_formats = _get_response_format(response_formats)
Expand Down Expand Up @@ -1498,6 +1505,7 @@ def __init__(
You will receive only the specified streams and directions for them.
You can specify direction for your streams.
e.g.:
['stream_abc:1']. 1 - IN, 2 - OUT.
[
{
"sessionAlias": "stream_abc",
Expand Down Expand Up @@ -1600,6 +1608,7 @@ def __init__(
You will receive only the specified streams and directions for them.
You can specify direction for your streams.
e.g.:
['stream_abc:1']. 1 - IN, 2 - OUT.
[
{
"sessionAlias": "stream_abc",
Expand Down Expand Up @@ -1797,6 +1806,7 @@ def __init__(
You will receive only the specified streams and directions for them.
You can specify direction for your streams.
e.g.:
['stream_abc:1']. 1 - IN, 2 - OUT.
[
{
"sessionAlias": "stream_abc",
Expand Down Expand Up @@ -2284,6 +2294,7 @@ def __init__(
You will receive only the specified streams and directions for them.
You can specify direction for your streams.
e.g.:
['stream_abc:1']. 1 - IN, 2 - OUT.
[
{
"sessionAlias": "stream_abc",
Expand Down
29 changes: 4 additions & 25 deletions th2_data_services/data_source/lwdp/source_api/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@
# LOG import logging
from http import HTTPStatus
from typing import Generator, List, Optional, Tuple, Union
from urllib.parse import quote

import requests
from requests import Response
from urllib3 import PoolManager, exceptions
from urllib.parse import quote

from th2_data_services.data_source.lwdp import Stream, Streams
from th2_data_services.data_source.lwdp.commands.http import T_streams
from th2_data_services.data_source.lwdp.interfaces.source_api import IHTTPSourceAPI
from th2_data_services.data_source.lwdp.streams import _convert_stream_to_dict_format


# LOG logger = logging.getLogger("th2_data_services")
Expand Down Expand Up @@ -353,7 +352,7 @@ def post_download_messages(
groups: List[str],
sort: bool = False,
response_formats: List[str] = None,
streams: T_streams = [],
streams: List[str] = [],
fast_fail: bool = True,
) -> Tuple[str, dict]:
"""REST-API `download` call downloads messages in specified time range in json format.
Expand All @@ -374,27 +373,7 @@ def post_download_messages(
URL for downloading messages and dictionary for request body.
"""

def convert_stream_to_dict_format():
def map_func(s):
if isinstance(s, Stream):
return [s.convert_to_dict_format()]

if isinstance(s, Streams):
return s.convert_to_dict_format()

if isinstance(s, str):
if ":" in s:
stream_obj = Stream(*s.split(":"))
else:
stream_obj = Stream(s)
return [stream_obj.convert_to_dict_format()]
return [s]

if isinstance(streams, List):
return list(map(map_func, streams))
return map_func(streams)

streams = sum(convert_stream_to_dict_format(), [])
streams = _convert_stream_to_dict_format(streams)

kwargs = {
"resource": "MESSAGES",
Expand Down
21 changes: 21 additions & 0 deletions th2_data_services/data_source/lwdp/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,24 @@ def convert_to_dict_format(self) -> List[dict]:
return [
Stream(stream, self._direction).convert_to_dict_format() for stream in self._aliases
]


def _convert_stream_to_dict_format(streams):
def map_func(s):
if isinstance(s, Stream):
return [s.convert_to_dict_format()]

if isinstance(s, Streams):
return s.convert_to_dict_format()

if isinstance(s, str):
if ":" in s:
stream_obj = Stream(*s.split(":"))
else:
stream_obj = Stream(s)
return [stream_obj.convert_to_dict_format()]
return [s]

if isinstance(streams, List):
return [item for sublist in map(map_func, streams) for item in sublist]
return map_func(streams)

0 comments on commit ee3b806

Please sign in to comment.