def test_convert_stream_to_dict_format():
    """Every accepted stream notation is flattened into one list of dicts.

    The input mixes a "alias:direction" string, a ready-made dict, a
    `Stream` object and a `Streams` object; the result must keep the
    input order.
    """
    ready_made = {"sessionAlias": "s2", "direction": ["IN"]}
    mixed_input = ["s1:1", ready_made, Stream("s3", 2), Streams(["s4"])]

    converted = _convert_stream_to_dict_format(mixed_input)

    assert converted == [
        {"sessionAlias": "s1", "direction": ["IN"]},
        {"sessionAlias": "s2", "direction": ["IN"]},
        {"sessionAlias": "s3", "direction": ["OUT"]},
        {"sessionAlias": "s4"},
    ]
b/th2_data_services/data_source/lwdp/commands/http.py @@ -12,8 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. import asyncio +import warnings from abc import abstractmethod -from typing import List, Optional, Union, Generator, Any +from typing import List, Optional, Union, Generator, Any, Dict from datetime import datetime from functools import partial from shutil import copyfileobj @@ -37,7 +38,7 @@ from th2_data_services.data_source.lwdp.interfaces.command import IHTTPCommand from th2_data_services.data_source.lwdp.data_source.http import DataSource from th2_data_services.data_source.lwdp.source_api.http import API -from th2_data_services.data_source.lwdp.streams import Streams, Stream +from th2_data_services.data_source.lwdp.streams import Streams, Stream, _convert_stream_to_dict_format from th2_data_services.utils.sse_client import SSEClient from th2_data_services.data_source.lwdp.adapters.adapter_sse import ( SSEAdapter, @@ -60,6 +61,21 @@ Event = dict +# Available stream formats: +# 1) str +# `['stream_abc:1']`, `['stream_abc']`, where 1 - IN, 2 - OUT. +# +# 2) dict +# ``` +# [ +# { +# "sessionAlias": "stream_abc", +# "directions": ["IN", "OUT"] +# } +# ] +# ``` +T_streams = Union[str, Stream, Streams, Dict, List[Union[str, Stream, Streams, Dict]]] + # LOG import logging @@ -1111,7 +1127,7 @@ def __init__( self, start_timestamp: Union[datetime, str, int], book_id: str, - streams: Union[List[Union[str, Streams, Stream]], Streams], + streams: T_streams, message_ids: List[str] = None, search_direction: str = "next", result_count_limit: int = None, @@ -1247,7 +1263,7 @@ def __init__( sort: bool = None, response_formats: Union[List[str], str] = None, keep_open: bool = None, - streams: List[str] = None, + streams: Optional[T_streams] = None, fast_fail: bool = True, ): """DownloadMessagesByPageGzip Constructor. 
def _download_messages_old(api, urls, headers, filename):
    """Request every URL through ``api`` and store the raw responses as ``.gz`` files.

    A single URL is stored as ``<filename>.gz``. Several URLs are stored as
    ``<filename>.1.gz``, ``<filename>.2.gz``, ... in the order they are given.
    A trailing ``.gz`` in ``filename`` is removed before the names are built.

    Args:
        api: Object providing ``execute_request(url, headers=..., stream=True)``.
        urls: Sequence of URLs to request.
        headers: Headers sent with every request.
        filename: Base name of the output file(s), with or without ``.gz``.

    Returns:
        None

    Raises:
        requests.exceptions.HTTPError: Printed and then re-raised when the
            request or its status check raises it.
    """
    base_name = filename[:-3] if filename.endswith(".gz") else filename

    def fetch_into(target_path, source_url):
        # The target file is opened before the request is issued, so a failed
        # request still leaves the (empty) file on disk.
        with open(target_path, "wb") as out_file:
            try:
                resp = api.execute_request(source_url, headers=headers, stream=True)
                resp.raise_for_status()
                copyfileobj(resp.raw, out_file)
            except requests.exceptions.HTTPError as http_error:
                print(http_error)
                print()
                raise

    if len(urls) == 1:
        fetch_into(f"{base_name}.gz", urls[0])
        return

    # Numbering of the part files starts from 1.
    for part_number, source_url in enumerate(urls, start=1):
        fetch_into(f"{base_name}.{part_number}.gz", source_url)
You can specify direction for your streams. - e.g. ['stream_abc:1']. 1 - IN, 2 - OUT. + e.g.: + ['stream_abc:1']. 1 - IN, 2 - OUT. + [ + { + "sessionAlias": "stream_abc", + "directions": ["IN"] + } + ] fast_fail: If true, stops task execution right after first error. + limit: Limit for messages in the response. No limit if not specified. + search_direction: Defines the order of the messages. """ + if sort is not None: + warnings.warn( + "The 'sort' parameter is deprecated and will be removed in a future version.", + DeprecationWarning, + ) + response_formats = _get_response_format(response_formats) _check_response_formats(response_formats) self._filename = filename @@ -1457,10 +1533,12 @@ def __init__( self._page = page self._book_id = book_id self._groups = groups - self._streams = streams + self._streams = _convert_stream_to_dict_format(streams) self._sort = sort self._response_formats = response_formats self._fast_fail = fast_fail + self._limit = limit + self._search_direction = search_direction _check_list_or_tuple(self._groups, var_name="groups") if streams is not None: @@ -1475,20 +1553,22 @@ def handle(self, data_source: DataSource) -> Data: else ProtobufTimestampConverter.to_nanoseconds(page.end_timestamp) ) self._book_id = page.book + api = data_source.source_api + headers = {"Accept": "application/stream+json", "Accept-Encoding": "gzip, deflate"} + url, body = api.post_download_messages( start_timestamp=self._start_timestamp, end_timestamp=self._end_timestamp, book_id=self._book_id, groups=self._groups, streams=self._streams, - sort=self._sort, response_formats=self._response_formats, fast_fail=self._fast_fail, + limit=self._limit, + search_direction=self._search_direction, ) - headers = {"Accept": "application/stream+json", "Accept-Encoding": "gzip, deflate"} - status = _download_messages(api, url, body, headers, self._filename) return Data.from_json(f"{self._filename}.gz", gzip=True).update_metadata( @@ -1522,8 +1602,10 @@ def __init__( groups: 
List[str], sort: bool = None, response_formats: Union[List[str], str] = None, - streams: List[str] = [], + streams: Optional[T_streams] = [], fast_fail: bool = True, + limit: Optional[int] = None, + search_direction: str = "next", ): """DownloadMessagesByBookByGroupsGzip Constructor. @@ -1541,9 +1623,24 @@ def __init__( streams: List of streams to search messages from the specified groups. You will receive only the specified streams and directions for them. You can specify direction for your streams. - e.g. ['stream_abc:1']. 1 - IN, 2 - OUT. + e.g.: + ['stream_abc:1']. 1 - IN, 2 - OUT. + [ + { + "sessionAlias": "stream_abc", + "directions": ["IN"] + } + ] fast_fail: If true, stops task execution right after first error. + limit: Limit for messages in the response. No limit if not specified. + search_direction: Defines the order of the messages. """ + if sort is not None: + warnings.warn( + "The 'sort' parameter is deprecated and will be removed in a future version.", + DeprecationWarning, + ) + response_formats = _get_response_format(response_formats) _check_response_formats(response_formats) _check_timestamp(start_timestamp) @@ -1564,11 +1661,13 @@ def __init__( if isinstance(end_timestamp, int): self._end_timestamp = UnixTimestampConverter.to_nanoseconds(end_timestamp) self._groups = groups - self._streams = streams + self._streams = _convert_stream_to_dict_format(streams) self._sort = sort self._response_formats = response_formats self._book_id = book_id self._fast_fail = fast_fail + self._limit = limit + self._search_direction = search_direction _check_list_or_tuple(self._groups, var_name="groups") if streams is not None: @@ -1576,17 +1675,19 @@ def __init__( def handle(self, data_source: DataSource): api = data_source.source_api + headers = {"Accept": "application/stream+json", "Accept-Encoding": "gzip, deflate"} + url, body = api.post_download_messages( start_timestamp=self._start_timestamp, end_timestamp=self._end_timestamp, book_id=self._book_id, 
groups=self._groups, streams=self._streams, - sort=self._sort, response_formats=self._response_formats, fast_fail=self._fast_fail, + limit=self._limit, + search_direction=self._search_direction, ) - headers = {"Accept": "application/stream+json", "Accept-Encoding": "gzip, deflate"} status = _download_messages(api, url, body, headers, self._filename) @@ -1613,7 +1714,7 @@ def __init__( sort: bool = None, response_formats: Union[List[str], str] = None, keep_open: bool = None, - streams: List[str] = None, + streams: Optional[T_streams] = None, # Non-data source args. max_url_length: int = 2048, char_enc: str = "utf-8", @@ -1713,9 +1814,11 @@ def __init__( groups: List[str], sort: bool = None, response_formats: Union[List[str], str] = None, - streams: List[str] = [], + streams: Optional[T_streams] = [], fast_fail: bool = True, cache: bool = False, + limit: Optional[int] = None, + search_direction: str = "next", ): """GetMessagesByBookByGroupsJson Constructor. @@ -1731,10 +1834,25 @@ def __init__( streams: List of streams to search messages from the specified groups. You will receive only the specified streams and directions for them. You can specify direction for your streams. - e.g. ['stream_abc:1']. 1 - IN, 2 - OUT. + e.g.: + ['stream_abc:1']. 1 - IN, 2 - OUT. + [ + { + "sessionAlias": "stream_abc", + "directions": ["IN"] + } + ] fast_fail: If true, stops task execution right after first error. cache: If True, all requested data from lw-data-provider will be saved to cache. + limit: Limit for messages in the response. No limit if not specified. + search_direction: Defines the order of the messages. 
""" + if sort is not None: + warnings.warn( + "The 'sort' parameter is deprecated and will be removed in a future version.", + DeprecationWarning, + ) + response_formats = _get_response_format(response_formats) _check_response_formats(response_formats) _check_timestamp(start_timestamp) @@ -1752,12 +1870,14 @@ def __init__( if isinstance(end_timestamp, int): self._end_timestamp = UnixTimestampConverter.to_nanoseconds(end_timestamp) self._groups = groups - self._streams = streams + self._streams = _convert_stream_to_dict_format(streams) self._sort = sort self._response_formats = response_formats self._book_id = book_id self._fast_fail = fast_fail self._cache = cache + self._limit = limit + self._search_direction = search_direction _check_list_or_tuple(self._groups, var_name="groups") if streams is not None: @@ -1771,9 +1891,10 @@ def handle(self, data_source: DataSource): book_id=self._book_id, groups=self._groups, streams=self._streams, - sort=self._sort, response_formats=self._response_formats, fast_fail=self._fast_fail, + limit=self._limit, + search_direction=self._search_direction, ) headers = {"Accept": "application/stream+json", "Accept-Encoding": "gzip, deflate"} @@ -1803,7 +1924,7 @@ def __init__( sort: bool = None, response_formats: Union[List[str], str] = None, keep_open: bool = None, - streams: List[str] = [], + streams: Optional[T_streams] = [], max_url_length: int = None, char_enc: str = None, decode_error_handler: str = None, @@ -2106,7 +2227,7 @@ def __init__( sort: bool = None, response_formats: Union[List[str], str] = None, keep_open: bool = None, - streams: List[str] = None, + streams: Optional[T_streams] = None, # Non-data source args. 
max_url_length: int = 2048, char_enc: str = "utf-8", @@ -2197,9 +2318,11 @@ def __init__( book_id: str = None, sort: bool = None, response_formats: Union[List[str], str] = None, - streams: List[str] = [], + streams: Optional[T_streams] = [], fast_fail: bool = True, cache: bool = False, + limit: Optional[int] = None, + search_direction: str = "next", ): """GetMessagesByPageByGroupsJson Constructor. @@ -2212,20 +2335,37 @@ def __init__( streams: List of streams to search messages from the specified groups. You will receive only the specified streams and directions for them. You can specify direction for your streams. - e.g. ['stream_abc:1']. 1 - IN, 2 - OUT. + e.g.: + ['stream_abc:1']. 1 - IN, 2 - OUT. + [ + { + "sessionAlias": "stream_abc", + "directions": ["IN"] + } + ] fast_fail: If true, stops task execution right after first error. cache: If True, all requested data from lw-data-provider will be saved to cache. + limit: Limit for messages in the response. No limit if not specified. + search_direction: Defines the order of the messages. 
""" + if sort is not None: + warnings.warn( + "The 'sort' parameter is deprecated and will be removed in a future version.", + DeprecationWarning, + ) + response_formats = _get_response_format(response_formats) _check_response_formats(response_formats) self._page = page self._book_id = book_id self._groups = groups - self._streams = streams + self._streams = _convert_stream_to_dict_format(streams) self._sort = sort self._response_formats = response_formats self._fast_fail = fast_fail self._cache = cache + self._limit = limit + self._search_direction = search_direction _check_list_or_tuple(self._groups, var_name="groups") if streams is not None: @@ -2247,9 +2387,10 @@ def handle(self, data_source: DataSource) -> Data: book_id=self._book_id, groups=self._groups, streams=self._streams, - sort=self._sort, response_formats=self._response_formats, fast_fail=self._fast_fail, + limit=self._limit, + search_direction=self._search_direction, ) headers = {"Accept": "application/stream+json", "Accept-Encoding": "gzip, deflate"} @@ -2279,7 +2420,7 @@ def __init__( sort: bool = None, response_formats: Union[List[str], str] = None, keep_open: bool = None, - streams: List[str] = [], + streams: Optional[T_streams] = [], max_url_length: int = None, char_enc: str = None, decode_error_handler: str = None, diff --git a/th2_data_services/data_source/lwdp/source_api/http.py b/th2_data_services/data_source/lwdp/source_api/http.py index 40419a7..140d33f 100644 --- a/th2_data_services/data_source/lwdp/source_api/http.py +++ b/th2_data_services/data_source/lwdp/source_api/http.py @@ -14,14 +14,15 @@ # LOG import logging from http import HTTPStatus -from typing import List, Generator, Optional, Union, Tuple +from typing import Generator, List, Optional, Tuple, Union +from urllib.parse import quote import requests from requests import Response from urllib3 import PoolManager, exceptions -from urllib.parse import quote from th2_data_services.data_source.lwdp.interfaces.source_api import 
def get_download_messages(
    self,
    start_timestamp: int,
    end_timestamp: int,
    book_id: str,
    groups: List[str],
    sort: bool = None,
    response_formats: List[str] = None,
    stream: List[str] = None,
    keep_open: bool = None,
    max_url_length=2048,
) -> List[str]:
    """Compose the URL(s) for a GET request to the REST-API `download/messages` endpoint.

    Every argument that is not None is rendered as a query option through
    `self._option`; `response_formats` and `stream` contribute one option per
    element. The groups are turned into `&group=<name>` parts and handed,
    together with `max_url_length`, to `self.__split_requests`
    (presumably it spreads the groups over several URLs to respect that
    length - confirm against its implementation).

    Args:
        start_timestamp: Sets the search starting point. Expected in nanoseconds.
        end_timestamp: Sets the timestamp to which the search will be performed,
            starting with 'start_timestamp'. Expected in nanoseconds.
        book_id: Book ID for the requested groups.
        groups: List of groups to search messages by.
        sort: Enables message sorting in the request.
        response_formats: Response formats.
        stream: List of streams (optionally with direction) to include in the response.
        keep_open: If true, keeps pulling for new messages until one is outside
            the requested range.
        max_url_length: API request url max length.

    Returns:
        List of URLs for downloading messages, each passed through
        `self.__encode_url`.
    """
    multi_value_keys = ("responseFormat", "stream")
    query_params = {
        "startTimestamp": start_timestamp,
        "endTimestamp": end_timestamp,
        "bookId": book_id,
        "sort": sort,
        "responseFormat": response_formats,
        "keepOpen": keep_open,
        "stream": stream,
    }
    group_parts = [f"&group={group}" for group in groups]
    endpoint = f"{self._url}/download/messages?"

    rendered = []
    for name, value in query_params.items():
        # Only None means "not set"; False and 0 are still sent.
        if value is None:
            continue
        values = value if name in multi_value_keys else (value,)
        rendered.extend(self._option(name, single) for single in values)

    base_url = f"{endpoint}{'&'.join(rendered)}"
    split_urls = self.__split_requests(base_url, group_parts, max_url_length)
    return [self.__encode_url(split_url) for split_url in split_urls]
def convert_to_dict_format(self) -> dict:
    """Return this stream as a dict with its alias and, if set, its direction.

    The result always contains ``"sessionAlias"``. When a direction is set,
    ``"direction"`` holds ``["IN"]`` for direction 1 and ``["OUT"]`` for
    direction 2.

    Returns:
        dict: ``{"sessionAlias": <alias>}`` optionally extended with
        ``"direction"``.

    Raises:
        KeyError: If the direction is set to anything other than 1 or 2.
    """
    direction_mapper = {
        "1": ["IN"],
        "2": ["OUT"],
    }
    result = {"sessionAlias": self._alias}
    if self._direction:
        # The direction may be held as an int (e.g. `Stream("s3", 2)`) or as a
        # str (e.g. parsed from "s3:2"); normalise before the lookup so both
        # forms resolve instead of the int raising KeyError.
        # NOTE(review): the stream format documented in commands/http.py uses
        # the key "directions" (plural) - confirm which key LwDP expects.
        result["direction"] = direction_mapper[str(self._direction)]
    return result
def _convert_stream_to_dict_format(streams):
    """Normalise any supported stream notation to a flat list of stream dicts.

    Accepted items are:
      * ``dict`` - returned unchanged (assumed to already be in dict format);
      * ``Stream`` - converted with ``Stream.convert_to_dict_format``;
      * ``Streams`` - expanded with ``Streams.convert_to_dict_format``;
      * ``str`` - ``"alias"`` or ``"alias:direction"``, converted via ``Stream``.
    Any other item is passed through unchanged.

    Args:
        streams: ``None``, a single item, or a list/tuple of items.

    Returns:
        list: Flat list of converted items, in input order. ``None`` yields an
        empty list.
    """

    def to_dicts(item):
        # Dicts are checked first: they need no conversion at all.
        if isinstance(item, dict):
            return [item]
        if isinstance(item, Stream):
            return [item.convert_to_dict_format()]
        if isinstance(item, Streams):
            return item.convert_to_dict_format()
        if isinstance(item, str):
            stream = Stream(*item.split(":")) if ":" in item else Stream(item)
            return [stream.convert_to_dict_format()]
        return [item]

    # Callers declare `streams` as Optional; "no streams" must not become [None].
    if streams is None:
        return []
    # Tuples are flattened like lists instead of being wrapped as one item.
    if isinstance(streams, (list, tuple)):
        return [converted for item in streams for converted in to_dicts(item)]
    return to_dicts(streams)