Skip to content

Commit

Permalink
Th2 4975 - add streams option 2.1.0.0 (#72)
Browse files Browse the repository at this point in the history
* Added streams option

* Formatting fix

* Changed streams to stream in get_download_messages
  • Loading branch information
Turikotura authored Aug 17, 2023
1 parent a6bcacf commit 2611db4
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 6 deletions.
10 changes: 7 additions & 3 deletions release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,13 @@

## Improvements

1. [TH2-4922] GetMessageAliases command now takes optional start_timestamp and end_timestamp arguments and returns TH2 Data object instead of a list.
2. [TH2-4923] GetMessageGroups command now takes optional start_timestamp and end_timestamp arguments and returns TH2 Data object instead of a list.

1. [TH2-4922] GetMessageAliases command now takes optional start_timestamp and end_timestamp arguements and returns TH2 Data object instead of a list.
2. [TH2-4923] GetMessageGroups command now takes optional start_timestamp and end_timestamp arguements and returns TH2 Data object instead of a list.
3. [TH2-4924] Added GetMessagesByPages command.
4. [TH2-4926] Added GetEventsByPages command.
5. [TH2-4952] Added DownloadMessagesByPageByGroups, DownloadMessagesByPage and DownloadMessagesByBookByGroups commands.
6. [Th2-4975] Added streams parameter to download and messages by groups sse commands.

## BugFixes

1. [TH2-4925] Fix BrokenEvent and BrokenMessage.
25 changes: 25 additions & 0 deletions th2_data_services/data_source/lwdp/commands/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,7 @@ def __init__(
sort: bool = None,
response_formats: Union[List[str], str] = None,
keep_open: bool = None,
streams: List[str] = None,
# Non-data source args.
max_url_length: int = 2048,
):
Expand All @@ -1099,6 +1100,7 @@ def __init__(
sort: Enables message sorting within a group. It is not sorted between groups.
response_formats: The format of the response
keep_open: If true, keeps pulling for new message until don't have one outside the requested range.
streams: List of streams to search messages from.
max_url_length: API request url max length.
"""
response_formats = _get_response_format(response_formats)
Expand All @@ -1109,7 +1111,9 @@ def __init__(
self._sort = sort
self._response_formats = response_formats
self._keep_open = keep_open
self._streams = streams
self._max_url_length = max_url_length
_check_list_or_tuple(self._streams, var_name="streams")

def handle(self, data_source: HTTPDataSource):
page = _get_page_object(self._book_id, self._page, data_source)
Expand All @@ -1133,6 +1137,7 @@ def handle(self, data_source: HTTPDataSource):
filename=self._filename,
page=page,
groups=groups,
streams=self._streams,
book_id=self._book_id,
sort=self._sort,
response_formats=self._response_formats,
Expand Down Expand Up @@ -1163,6 +1168,7 @@ def __init__(
sort: bool = None,
response_formats: Union[List[str], str] = None,
keep_open: bool = None,
streams: List[str] = None,
# Non-data source args.
max_url_length: int = 2048,
):
Expand All @@ -1176,6 +1182,7 @@ def __init__(
sort: Enables message sorting within a group. It is not sorted between groups.
response_formats: The format of the response
keep_open: If true, keeps pulling for new message until don't have one outside the requested range.
streams: List of streams to search messages from.
max_url_length: API request url max length.
"""
response_formats = _get_response_format(response_formats)
Expand All @@ -1184,12 +1191,14 @@ def __init__(
self._page = page
self._book_id = book_id
self._groups = groups
self._streams = streams
self._sort = sort
self._response_formats = response_formats
self._keep_open = keep_open
self._max_url_length = max_url_length

_check_list_or_tuple(self._groups, var_name="groups")
_check_list_or_tuple(self._streams, var_name="streams")

def handle(self, data_source: HTTPDataSource):
page = _get_page_object(self._book_id, self._page, data_source)
Expand All @@ -1206,6 +1215,7 @@ def handle(self, data_source: HTTPDataSource):
end_timestamp=self._end_timestamp,
book_id=self._book_id,
groups=self._groups,
streams=self._streams,
sort=self._sort,
response_formats=self._response_formats,
keep_open=self._keep_open,
Expand Down Expand Up @@ -1246,6 +1256,7 @@ def __init__(
sort: bool = None,
response_formats: Union[List[str], str] = None,
keep_open: bool = None,
streams: List[str] = None,
# Non-data source args.
max_url_length: int = 2048,
):
Expand All @@ -1263,6 +1274,7 @@ def __init__(
It's possible to add it to the CradleAPI by request to dev team.)
response_formats: The format of the response
keep_open: If true, keeps pulling for new message until don't have one outside the requested range.
streams: List of streams to search messages from.
max_url_length: API request url max length.
"""
response_formats = _get_response_format(response_formats)
Expand All @@ -1273,13 +1285,15 @@ def __init__(
self._start_timestamp = DatetimeConverter.to_nanoseconds(start_timestamp)
self._end_timestamp = DatetimeConverter.to_nanoseconds(end_timestamp)
self._groups = groups
self._streams = streams
self._sort = sort
self._response_formats = response_formats
self._keep_open = keep_open
self._book_id = book_id
self._max_url_length = max_url_length

_check_list_or_tuple(self._groups, var_name="groups")
_check_list_or_tuple(self._streams, var_name="streams")

def handle(self, data_source: HTTPDataSource):
api = data_source.source_api
Expand All @@ -1288,6 +1302,7 @@ def handle(self, data_source: HTTPDataSource):
end_timestamp=self._end_timestamp,
book_id=self._book_id,
groups=self._groups,
sterams=self._streams,
sort=self._sort,
response_formats=self._response_formats,
keep_open=self._keep_open,
Expand Down Expand Up @@ -1323,6 +1338,7 @@ def __init__(
sort: bool = None,
response_formats: Union[List[str], str] = None,
keep_open: bool = None,
streams: List[str] = None,
# Non-data source args.
max_url_length: int = 2048,
char_enc: str = "utf-8",
Expand All @@ -1343,6 +1359,7 @@ def __init__(
It's possible to add it to the CradleAPI by request to dev team.)
response_formats: The format of the response
keep_open: If true, keeps pulling for new message until don't have one outside the requested range.
streams: List of streams to search messages from.
char_enc: Encoding for the byte stream.
decode_error_handler: Registered decode error handler.
cache: If True, all requested data from lw-data-provider will be saved to cache.
Expand All @@ -1366,20 +1383,23 @@ def __init__(
self._start_timestamp = DatetimeConverter.to_nanoseconds(start_timestamp)
self._end_timestamp = DatetimeConverter.to_nanoseconds(end_timestamp)
self._groups = groups
self._streams = streams
self._sort = sort
self._response_formats = response_formats
self._keep_open = keep_open
self._book_id = book_id
self._max_url_length = max_url_length

_check_list_or_tuple(self._groups, var_name="groups")
_check_list_or_tuple(self._streams, var_name="streams")

def _get_urls(self, data_source: HTTPDataSource):
api = data_source.source_api
return api.get_url_search_messages_by_groups(
start_timestamp=self._start_timestamp,
end_timestamp=self._end_timestamp,
groups=self._groups,
streams=self._streams,
response_formats=self._response_formats,
keep_open=self._keep_open,
sort=self._sort,
Expand Down Expand Up @@ -1571,6 +1591,7 @@ def __init__(
sort: bool = None,
response_formats: Union[List[str], str] = None,
keep_open: bool = None,
streams: List[str] = None,
# Non-data source args.
max_url_length: int = 2048,
char_enc: str = "utf-8",
Expand All @@ -1587,6 +1608,7 @@ def __init__(
sort: Enables message sorting within a group. It is not sorted between groups.
response_formats: The format of the response
keep_open: If true, keeps pulling for new message until don't have one outside the requested range.
streams: List of streams to search messages from.
char_enc: Encoding for the byte stream.
decode_error_handler: Registered decode error handler.
cache: If True, all requested data from lw-data-provider will be saved to cache.
Expand All @@ -1608,12 +1630,14 @@ def __init__(
self._page = page
self._book_id = book_id
self._groups = groups
self._streams = streams
self._sort = sort
self._response_formats = response_formats
self._keep_open = keep_open
self._max_url_length = max_url_length

_check_list_or_tuple(self._groups, var_name="groups")
_check_list_or_tuple(self._streams, var_name="streams")

def _get_urls(self, data_source: HTTPDataSource):
page = _get_page_object(self._book_id, self._page, data_source)
Expand All @@ -1629,6 +1653,7 @@ def _get_urls(self, data_source: HTTPDataSource):
start_timestamp=self._start_timestamp,
end_timestamp=self._end_timestamp,
groups=self._groups,
streams=self._streams,
response_formats=self._response_formats,
keep_open=self._keep_open,
sort=self._sort,
Expand Down
12 changes: 9 additions & 3 deletions th2_data_services/data_source/lwdp/source_api/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ def get_url_search_messages_by_groups(
end_timestamp: int,
book_id: str,
groups: List[str],
streams: List[str] = None,
sort: bool = None,
response_formats: List[str] = None,
keep_open: bool = None,
Expand All @@ -248,6 +249,7 @@ def get_url_search_messages_by_groups(
Expected in nanoseconds.
book_id: book ID for requested groups.
groups: List of groups to search messages by
streams: List of streams to search messages by.
sort: Enables message sorting in the request
response_formats: Response format
keep_open: If true, keeps pulling for new message until don't have one outside the requested range
Expand All @@ -263,6 +265,7 @@ def get_url_search_messages_by_groups(
"sort": sort,
"responseFormat": response_formats,
"keepOpen": keep_open,
"stream": streams,
}
groups = [f"&group={x}" for x in groups] # "&group=".join(groups) #
options = []
Expand All @@ -271,7 +274,7 @@ def get_url_search_messages_by_groups(
for k, v in kwargs.items():
if v is None:
continue
if k in ["responseFormat"]:
if k in ["responseFormat", "stream"]:
for item in v:
options.append(self._option(k, item))
else:
Expand All @@ -288,6 +291,7 @@ def get_download_messages(
end_timestamp: int,
book_id: str,
groups: List[str],
stream: List[str],
sort: bool = None,
response_formats: List[str] = None,
keep_open: bool = None,
Expand All @@ -302,6 +306,7 @@ def get_download_messages(
Expected in nanoseconds.
book_id: book ID for requested groups.
groups: List of groups to search messages by
stream: List of streams to search messages by.
sort: Enables message sorting in the request
response_formats: Response format
keep_open: If true, keeps pulling for new message until don't have one outside the requested range
Expand All @@ -317,6 +322,7 @@ def get_download_messages(
"sort": sort,
"responseFormat": response_formats,
"keepOpen": keep_open,
"stream": stream,
}
groups = [f"&group={x}" for x in groups] # "&group=".join(groups) #
options = []
Expand All @@ -325,7 +331,7 @@ def get_download_messages(
for k, v in kwargs.items():
if v is None:
continue
if k in ["responseFormat"]:
if k in ["responseFormat", "stream"]:
for item in v:
options.append(self._option(k, item))
else:
Expand Down Expand Up @@ -375,7 +381,7 @@ def execute_request(self, url: str, headers: dict = None, stream=False) -> Respo
return requests.get(url, headers=headers, stream=stream)

def __split_requests(self, fixed_url: str, optional: List[str], max_url_len: int):
if len(fixed_url) >= max_url_len:
if len(fixed_url + max(optional, key=len)) >= max_url_len:
raise Exception(
f"Fixed url part ({len(fixed_url)}) >= than max url len ({max_url_len})"
)
Expand Down

0 comments on commit 2611db4

Please sign in to comment.