From 2611db42c82b8a4d5bbc9141365649fa77fc409f Mon Sep 17 00:00:00 2001 From: Turikotura <68775241+Turikotura@users.noreply.github.com> Date: Thu, 17 Aug 2023 14:51:03 +0400 Subject: [PATCH] Th2 4975 - add streams option 2.1.0.0 (#72) * Added streams option * Formatting fix * Changed streams to stream in get_download_messages --- release_notes.md | 10 +++++--- .../data_source/lwdp/commands/http.py | 25 +++++++++++++++++++ .../data_source/lwdp/source_api/http.py | 12 ++++++--- 3 files changed, 41 insertions(+), 6 deletions(-) diff --git a/release_notes.md b/release_notes.md index 89f1020..e5d8d87 100644 --- a/release_notes.md +++ b/release_notes.md @@ -94,9 +94,13 @@ ## Improvements -1. [TH2-4922] GetMessageAliases command now takes optional start_timestamp and end_timestamp arguments and returns TH2 Data object instead of a list. -2. [TH2-4923] GetMessageGroups command now takes optional start_timestamp and end_timestamp arguments and returns TH2 Data object instead of a list. - +1. [TH2-4922] GetMessageAliases command now takes optional start_timestamp and end_timestamp arguments and returns TH2 Data object instead of a list. +2. [TH2-4923] GetMessageGroups command now takes optional start_timestamp and end_timestamp arguments and returns TH2 Data object instead of a list. +3. [TH2-4924] Added GetMessagesByPages command. +4. [TH2-4926] Added GetEventsByPages command. +5. [TH2-4952] Added DownloadMessagesByPageByGroups, DownloadMessagesByPage and DownloadMessagesByBookByGroups commands. +6. [TH2-4975] Added streams parameter to download and messages by groups SSE commands. + ## BugFixes 1. [TH2-4925] Fix BrokenEvent and BrokenMessage. 
\ No newline at end of file diff --git a/th2_data_services/data_source/lwdp/commands/http.py b/th2_data_services/data_source/lwdp/commands/http.py index 5ecf530..eb34737 100644 --- a/th2_data_services/data_source/lwdp/commands/http.py +++ b/th2_data_services/data_source/lwdp/commands/http.py @@ -1087,6 +1087,7 @@ def __init__( sort: bool = None, response_formats: Union[List[str], str] = None, keep_open: bool = None, + streams: List[str] = None, # Non-data source args. max_url_length: int = 2048, ): @@ -1099,6 +1100,7 @@ def __init__( sort: Enables message sorting within a group. It is not sorted between groups. response_formats: The format of the response keep_open: If true, keeps pulling for new message until don't have one outside the requested range. + streams: List of streams to search messages from. max_url_length: API request url max length. """ response_formats = _get_response_format(response_formats) @@ -1109,7 +1111,9 @@ def __init__( self._sort = sort self._response_formats = response_formats self._keep_open = keep_open + self._streams = streams self._max_url_length = max_url_length + _check_list_or_tuple(self._streams, var_name="streams") def handle(self, data_source: HTTPDataSource): page = _get_page_object(self._book_id, self._page, data_source) @@ -1133,6 +1137,7 @@ def handle(self, data_source: HTTPDataSource): filename=self._filename, page=page, groups=groups, + streams=self._streams, book_id=self._book_id, sort=self._sort, response_formats=self._response_formats, @@ -1163,6 +1168,7 @@ def __init__( sort: bool = None, response_formats: Union[List[str], str] = None, keep_open: bool = None, + streams: List[str] = None, # Non-data source args. max_url_length: int = 2048, ): @@ -1176,6 +1182,7 @@ def __init__( sort: Enables message sorting within a group. It is not sorted between groups. response_formats: The format of the response keep_open: If true, keeps pulling for new message until don't have one outside the requested range. 
+ streams: List of streams to search messages from. max_url_length: API request url max length. """ response_formats = _get_response_format(response_formats) @@ -1184,12 +1191,14 @@ def __init__( self._page = page self._book_id = book_id self._groups = groups + self._streams = streams self._sort = sort self._response_formats = response_formats self._keep_open = keep_open self._max_url_length = max_url_length _check_list_or_tuple(self._groups, var_name="groups") + _check_list_or_tuple(self._streams, var_name="streams") def handle(self, data_source: HTTPDataSource): page = _get_page_object(self._book_id, self._page, data_source) @@ -1206,6 +1215,7 @@ def handle(self, data_source: HTTPDataSource): end_timestamp=self._end_timestamp, book_id=self._book_id, groups=self._groups, + streams=self._streams, sort=self._sort, response_formats=self._response_formats, keep_open=self._keep_open, @@ -1246,6 +1256,7 @@ def __init__( sort: bool = None, response_formats: Union[List[str], str] = None, keep_open: bool = None, + streams: List[str] = None, # Non-data source args. max_url_length: int = 2048, ): @@ -1263,6 +1274,7 @@ def __init__( It's possible to add it to the CradleAPI by request to dev team.) response_formats: The format of the response keep_open: If true, keeps pulling for new message until don't have one outside the requested range. + streams: List of streams to search messages from. max_url_length: API request url max length. 
""" response_formats = _get_response_format(response_formats) @@ -1273,6 +1285,7 @@ def __init__( self._start_timestamp = DatetimeConverter.to_nanoseconds(start_timestamp) self._end_timestamp = DatetimeConverter.to_nanoseconds(end_timestamp) self._groups = groups + self._streams = streams self._sort = sort self._response_formats = response_formats self._keep_open = keep_open @@ -1280,6 +1293,7 @@ def __init__( self._max_url_length = max_url_length _check_list_or_tuple(self._groups, var_name="groups") + _check_list_or_tuple(self._streams, var_name="streams") def handle(self, data_source: HTTPDataSource): api = data_source.source_api @@ -1288,6 +1302,7 @@ def handle(self, data_source: HTTPDataSource): end_timestamp=self._end_timestamp, book_id=self._book_id, groups=self._groups, + sterams=self._streams, sort=self._sort, response_formats=self._response_formats, keep_open=self._keep_open, @@ -1323,6 +1338,7 @@ def __init__( sort: bool = None, response_formats: Union[List[str], str] = None, keep_open: bool = None, + streams: List[str] = None, # Non-data source args. max_url_length: int = 2048, char_enc: str = "utf-8", @@ -1343,6 +1359,7 @@ def __init__( It's possible to add it to the CradleAPI by request to dev team.) response_formats: The format of the response keep_open: If true, keeps pulling for new message until don't have one outside the requested range. + streams: List of streams to search messages from. char_enc: Encoding for the byte stream. decode_error_handler: Registered decode error handler. cache: If True, all requested data from lw-data-provider will be saved to cache. 
@@ -1366,6 +1383,7 @@ def __init__( self._start_timestamp = DatetimeConverter.to_nanoseconds(start_timestamp) self._end_timestamp = DatetimeConverter.to_nanoseconds(end_timestamp) self._groups = groups + self._streams = streams self._sort = sort self._response_formats = response_formats self._keep_open = keep_open @@ -1373,6 +1391,7 @@ def __init__( self._max_url_length = max_url_length _check_list_or_tuple(self._groups, var_name="groups") + _check_list_or_tuple(self._streams, var_name="streams") def _get_urls(self, data_source: HTTPDataSource): api = data_source.source_api @@ -1380,6 +1399,7 @@ def _get_urls(self, data_source: HTTPDataSource): start_timestamp=self._start_timestamp, end_timestamp=self._end_timestamp, groups=self._groups, + streams=self._streams, response_formats=self._response_formats, keep_open=self._keep_open, sort=self._sort, @@ -1571,6 +1591,7 @@ def __init__( sort: bool = None, response_formats: Union[List[str], str] = None, keep_open: bool = None, + streams: List[str] = None, # Non-data source args. max_url_length: int = 2048, char_enc: str = "utf-8", @@ -1587,6 +1608,7 @@ def __init__( sort: Enables message sorting within a group. It is not sorted between groups. response_formats: The format of the response keep_open: If true, keeps pulling for new message until don't have one outside the requested range. + streams: List of streams to search messages from. char_enc: Encoding for the byte stream. decode_error_handler: Registered decode error handler. cache: If True, all requested data from lw-data-provider will be saved to cache. 
@@ -1608,12 +1630,14 @@ def __init__( self._page = page self._book_id = book_id self._groups = groups + self._streams = streams self._sort = sort self._response_formats = response_formats self._keep_open = keep_open self._max_url_length = max_url_length _check_list_or_tuple(self._groups, var_name="groups") + _check_list_or_tuple(self._streams, var_name="streams") def _get_urls(self, data_source: HTTPDataSource): page = _get_page_object(self._book_id, self._page, data_source) @@ -1629,6 +1653,7 @@ def _get_urls(self, data_source: HTTPDataSource): start_timestamp=self._start_timestamp, end_timestamp=self._end_timestamp, groups=self._groups, + streams=self._streams, response_formats=self._response_formats, keep_open=self._keep_open, sort=self._sort, diff --git a/th2_data_services/data_source/lwdp/source_api/http.py b/th2_data_services/data_source/lwdp/source_api/http.py index 50c274c..ae14867 100644 --- a/th2_data_services/data_source/lwdp/source_api/http.py +++ b/th2_data_services/data_source/lwdp/source_api/http.py @@ -234,6 +234,7 @@ def get_url_search_messages_by_groups( end_timestamp: int, book_id: str, groups: List[str], + streams: List[str] = None, sort: bool = None, response_formats: List[str] = None, keep_open: bool = None, @@ -248,6 +249,7 @@ def get_url_search_messages_by_groups( Expected in nanoseconds. book_id: book ID for requested groups. groups: List of groups to search messages by + streams: List of streams to search messages by. 
sort: Enables message sorting in the request response_formats: Response format keep_open: If true, keeps pulling for new message until don't have one outside the requested range @@ -263,6 +265,7 @@ def get_url_search_messages_by_groups( "sort": sort, "responseFormat": response_formats, "keepOpen": keep_open, + "stream": streams, } groups = [f"&group={x}" for x in groups] # "&group=".join(groups) # options = [] @@ -271,7 +274,7 @@ def get_url_search_messages_by_groups( for k, v in kwargs.items(): if v is None: continue - if k in ["responseFormat"]: + if k in ["responseFormat", "stream"]: for item in v: options.append(self._option(k, item)) else: @@ -288,6 +291,7 @@ def get_download_messages( end_timestamp: int, book_id: str, groups: List[str], + stream: List[str], sort: bool = None, response_formats: List[str] = None, keep_open: bool = None, @@ -302,6 +306,7 @@ def get_download_messages( Expected in nanoseconds. book_id: book ID for requested groups. groups: List of groups to search messages by + stream: List of streams to search messages by. 
sort: Enables message sorting in the request response_formats: Response format keep_open: If true, keeps pulling for new message until don't have one outside the requested range @@ -317,6 +322,7 @@ def get_download_messages( "sort": sort, "responseFormat": response_formats, "keepOpen": keep_open, + "stream": stream, } groups = [f"&group={x}" for x in groups] # "&group=".join(groups) # options = [] @@ -325,7 +331,7 @@ def get_download_messages( for k, v in kwargs.items(): if v is None: continue - if k in ["responseFormat"]: + if k in ["responseFormat", "stream"]: for item in v: options.append(self._option(k, item)) else: @@ -375,7 +381,7 @@ def execute_request(self, url: str, headers: dict = None, stream=False) -> Respo return requests.get(url, headers=headers, stream=stream) def __split_requests(self, fixed_url: str, optional: List[str], max_url_len: int): - if len(fixed_url) >= max_url_len: + if len(fixed_url + max(optional, key=len)) >= max_url_len: raise Exception( f"Fixed url part ({len(fixed_url)}) >= than max url len ({max_url_len})" )