diff --git a/morpheus/cli/register_stage.py b/morpheus/cli/register_stage.py index d0c5a78925..f1b47e67a9 100644 --- a/morpheus/cli/register_stage.py +++ b/morpheus/cli/register_stage.py @@ -181,12 +181,7 @@ def set_options_param_type(options_kwargs: dict, annotation, doc_type: str): if (is_union_type(annotation)): raise RuntimeError("Union types are not supported for auto registering stages.") - if (issubtype(annotation, typing.List)): - # For variable length array, use multiple=True - options_kwargs["multiple"] = True - options_kwargs["type"] = get_args(annotation)[0] - - elif (issubtype(annotation, pathlib.Path)): + if (issubtype(annotation, pathlib.Path)): # For paths, use the Path option and apply any kwargs options_kwargs["type"] = partial_pop_kwargs(click.Path, doc_type_kwargs)() @@ -216,6 +211,13 @@ def set_options_param_type(options_kwargs: dict, annotation, doc_type: str): options_kwargs["type"] = click.Tuple([str, str]) options_kwargs["callback"] = lambda ctx, param, value: dict(value) + elif (issubtype(annotation, typing.List) or issubtype(annotation, typing.Tuple) + or issubtype(annotation, typing.Iterable)) and not (issubtype(annotation, str)): + + # For variable length array, use multiple=True + options_kwargs["multiple"] = True + options_kwargs["type"] = get_args(annotation)[0] + else: options_kwargs["type"] = annotation diff --git a/morpheus/stages/input/http_client_source_stage.py b/morpheus/stages/input/http_client_source_stage.py index 5b6ab45252..4a101e0992 100644 --- a/morpheus/stages/input/http_client_source_stage.py +++ b/morpheus/stages/input/http_client_source_stage.py @@ -68,8 +68,8 @@ class HttpClientSourceStage(PreallocatorMixin, SingleOutputSource): Number of seconds to wait for the server to send data before giving up and raising an exception. max_errors : int, default 10 Maximum number of consequtive errors to receive before raising an error. 
- accept_status_codes : typing.List[HTTPStatus], optional, multiple = True - List of status codes to accept. If the response status code is not in this tuple, then the request will be + accept_status_codes : typing.Iterable[int], optional, multiple = True + List of status codes to accept. If the response status code is not in this collection, then the request will be considered an error max_retries : int, default 10 Maximum number of times to retry the request fails, receives a redirect or returns a status in the @@ -80,6 +80,9 @@ class HttpClientSourceStage(PreallocatorMixin, SingleOutputSource): to contain a JSON objects separated by end-of-line characters. stop_after : int, default 0 Stops ingesting after emitting `stop_after` records (rows in the dataframe). Useful for testing. Disabled if `0` + payload_to_df_fn : callable, default None + A callable that takes the HTTP payload bytes as the first argument and the `lines` parameter as + the second argument, and returns a cudf.DataFrame. If unset, cudf.read_json is used. **request_kwargs : dict Additional arguments to pass to the `requests.request` function. """ @@ -94,10 +97,11 @@ def __init__(self, error_sleep_time: float = 0.1, respect_retry_after_header: bool = True, request_timeout_secs: int = 30, - accept_status_codes: typing.List[HTTPStatus] = (HTTPStatus.OK, ), + accept_status_codes: typing.Iterable[int] = (HTTPStatus.OK, ), max_retries: int = 10, lines: bool = False, stop_after: int = 0, + payload_to_df_fn: typing.Callable[[bytes, bool], cudf.DataFrame] = None, **request_kwargs): super().__init__(config) self._url = http_utils.prepare_url(url) @@ -135,6 +139,7 @@ def __init__(self, self._stop_after = stop_after self._lines = lines + self._payload_to_df_fn = payload_to_df_fn self._requst_kwargs = request_kwargs @property @@ -154,10 +159,11 @@ def _parse_response(self, response: requests.Response) -> typing.Union[cudf.Data Returns a DataFrame parsed from the response payload. 
If the response payload is empty, then `None` is returned. """ payload = response.content - if len(payload) > 2: # work-around for https://github.com/rapidsai/cudf/issues/5712 - return cudf.read_json(payload, lines=self._lines, engine='cudf') - return None + if self._payload_to_df_fn is not None: + return self._payload_to_df_fn(payload, self._lines) + + return cudf.read_json(payload, lines=self._lines, engine='cudf') def _generate_frames(self) -> typing.Iterator[MessageMeta]: # Running counter of the number of messages emitted by this source diff --git a/morpheus/stages/input/http_server_source_stage.py b/morpheus/stages/input/http_server_source_stage.py index 70e47eb873..2a555be464 100644 --- a/morpheus/stages/input/http_server_source_stage.py +++ b/morpheus/stages/input/http_server_source_stage.py @@ -77,6 +77,10 @@ class HttpServerSourceStage(PreallocatorMixin, SingleOutputSource): expect each request to be a JSON object per line. stop_after : int, default 0 Stops ingesting after emitting `stop_after` records (rows in the dataframe). Useful for testing. Disabled if `0` + payload_to_df_fn : callable, default None + A callable that takes the HTTP payload string as the first argument and the `lines` parameter as + the second argument, and returns a cudf.DataFrame. When supplied, the C++ implementation of this stage is + disabled, and the Python implementation is used. 
""" def __init__(self, @@ -93,7 +97,8 @@ def __init__(self, max_payload_size: int = 10, request_timeout_secs: int = 30, lines: bool = False, - stop_after: int = 0): + stop_after: int = 0, + payload_to_df_fn: typing.Callable[[str, bool], cudf.DataFrame] = None): super().__init__(config) self._bind_address = bind_address self._port = port @@ -108,6 +113,7 @@ def __init__(self, self._request_timeout_secs = request_timeout_secs self._lines = lines self._stop_after = stop_after + self._payload_to_df_fn = payload_to_df_fn # These are only used when C++ mode is disabled self._queue = None @@ -134,8 +140,12 @@ def compute_schema(self, schema: StageSchema): def _parse_payload(self, payload: str) -> HttpParseResponse: try: - # engine='cudf' is needed when lines=False to avoid using pandas - df = cudf.read_json(payload, lines=self._lines, engine='cudf') + if self._payload_to_df_fn is not None: + df = self._payload_to_df_fn(payload, self._lines) + else: + # engine='cudf' is needed when lines=False to avoid using pandas + df = cudf.read_json(payload, lines=self._lines, engine='cudf') + except Exception as e: err_msg = "Error occurred converting HTTP payload to Dataframe" logger.error("%s: %s", err_msg, e) @@ -206,7 +216,7 @@ def _generate_frames(self) -> typing.Iterator[MessageMeta]: self._processing = False def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject: - if self._build_cpp_node(): + if self._build_cpp_node() and self._payload_to_df_fn is None: import morpheus._lib.stages as _stages node = _stages.HttpServerSourceStage(builder, self.unique_name, diff --git a/morpheus/stages/output/http_client_sink_stage.py b/morpheus/stages/output/http_client_sink_stage.py index d3cd8dbba3..a9cb872b4c 100644 --- a/morpheus/stages/output/http_client_sink_stage.py +++ b/morpheus/stages/output/http_client_sink_stage.py @@ -96,7 +96,7 @@ class HttpClientSinkStage(PassThruTypeMixin, SinglePortStage): perform an exponential backoff starting at `error_sleep_time`. 
request_timeout_secs : int, optional Number of seconds to wait for the server to send data before giving up and raising an exception. - accept_status_codes : typing.List[HTTPStatus], optional, multiple = True + accept_status_codes : typing.Iterable[int], optional, multiple = True List of acceptable status codes, by default (200, 201, 202). max_retries : int, default 10 Maximum number of times to retry the request fails, receives a redirect or returns a status in the @@ -140,7 +140,7 @@ def __init__(self, error_sleep_time: float = 0.1, respect_retry_after_header: bool = True, request_timeout_secs: int = 30, - accept_status_codes: typing.List[HTTPStatus] = ( + accept_status_codes: typing.Iterable[int] = ( HTTPStatus.OK, HTTPStatus.CREATED, HTTPStatus.ACCEPTED, diff --git a/tests/mock_rest_server/mocks/api/v1/invalid/GET.mock b/tests/mock_rest_server/mocks/api/v1/invalid/GET.mock new file mode 100644 index 0000000000..75d4bed110 --- /dev/null +++ b/tests/mock_rest_server/mocks/api/v1/invalid/GET.mock @@ -0,0 +1,4 @@ +HTTP/1.1 200 OK +Content-Type: application/json + +{"not_valid":"json diff --git a/tests/stages/test_http_client_source_stage_pipe.py b/tests/stages/test_http_client_source_stage_pipe.py new file mode 100644 index 0000000000..26bc1f911e --- /dev/null +++ b/tests/stages/test_http_client_source_stage_pipe.py @@ -0,0 +1,112 @@ +# Copyright (c) 2023-2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +from unittest import mock + +import pytest + +from _utils import TEST_DIRS +from _utils import assert_results +from _utils.dataset_manager import DatasetManager +from morpheus.config import Config +from morpheus.pipeline import LinearPipeline +from morpheus.stages.input.http_client_source_stage import HttpClientSourceStage +from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage + + +@pytest.mark.slow +@pytest.mark.use_cudf +@pytest.mark.parametrize("lines", [False, True], ids=["json", "lines"]) +@pytest.mark.parametrize("use_payload_to_df_fn", [False, True], ids=["no_payload_to_df_fn", "payload_to_df_fn"]) +def test_http_client_source_stage_pipe(config: Config, + dataset: DatasetManager, + mock_rest_server: str, + lines: bool, + use_payload_to_df_fn: bool): + """ + Test the HttpClientSourceStage against a mock REST server which will return JSON data which can be deserialized + into a DataFrame. + """ + source_df = dataset['filter_probs.csv'] + + if lines: + endpoint = "data-lines" + else: + endpoint = "data" + + if use_payload_to_df_fn: + if lines: + payload_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.jsonlines") + else: + payload_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.json") + + with open(payload_file, "rb") as f: + expected_payload = f.read() + + def payload_to_df_fn(payload, lines_arg): + assert payload == expected_payload + assert lines_arg == lines + return source_df[['v2', 'v3']].copy(deep=True) + + expected_df = source_df[['v2', 'v3']].copy(deep=True) + + else: + payload_to_df_fn = None + expected_payload = None + expected_df = source_df.copy(deep=True) + + url = f"{mock_rest_server}/api/v1/{endpoint}" + + num_records = len(expected_df) + + pipe = LinearPipeline(config) + pipe.set_source( + HttpClientSourceStage(config=config, + url=url, + max_retries=1, + lines=lines, + stop_after=num_records, + payload_to_df_fn=payload_to_df_fn)) + comp_stage = 
pipe.add_stage(CompareDataFrameStage(config, expected_df)) + pipe.run() + + assert_results(comp_stage.get_results()) + + +@pytest.mark.slow +@pytest.mark.use_cudf +@pytest.mark.parametrize( + "lines", + [False, pytest.param(True, marks=pytest.mark.skip(reason="https://github.com/rapidsai/cudf/issues/15820"))], + ids=["json", "lines"]) +@pytest.mark.parametrize("use_payload_to_df_fn", [False, True], ids=["no_payload_to_df_fn", "payload_to_df_fn"]) +def test_parse_errors(config: Config, mock_rest_server: str, lines: bool, use_payload_to_df_fn: bool): + url = f"{mock_rest_server}/api/v1/invalid" + + if use_payload_to_df_fn: + payload_to_df_fn = mock.MagicMock(side_effect=ValueError("Invalid payload")) + else: + payload_to_df_fn = None + + pipe = LinearPipeline(config) + pipe.set_source( + HttpClientSourceStage(config=config, url=url, max_retries=1, lines=lines, payload_to_df_fn=payload_to_df_fn)) + + # cudf raises a RuntimeError when it should raise a ValueError; this is also part of cudf issue #15820 + with pytest.raises(Exception): + pipe.run() + + if use_payload_to_df_fn: + payload_to_df_fn.assert_called_once() diff --git a/tests/test_http_server_source_stage.py b/tests/stages/test_http_server_source_stage.py similarity index 54% rename from tests/test_http_server_source_stage.py rename to tests/stages/test_http_server_source_stage.py index 62b8e3f0bc..0e7a034f68 100644 --- a/tests/test_http_server_source_stage.py +++ b/tests/stages/test_http_server_source_stage.py @@ -18,6 +18,7 @@ import typing from http import HTTPStatus from io import StringIO +from unittest import mock import pytest import requests @@ -56,8 +57,9 @@ def join(self, timeout=None): @pytest.mark.slow @pytest.mark.use_python -@pytest.mark.parametrize("lines", [False, True]) -def test_generate_frames(config: Config, dataset_pandas: DatasetManager, lines: bool): +@pytest.mark.parametrize("lines", [False, True], ids=["json", "lines"]) +@pytest.mark.parametrize("use_payload_to_df_fn", [False, True], 
ids=["no_payload_to_df_fn", "payload_to_df_fn"]) +def test_generate_frames(config: Config, dataset_pandas: DatasetManager, lines: bool, use_payload_to_df_fn: bool): # The _generate_frames() method is only used when C++ mode is disabled endpoint = '/test' port = 8088 @@ -65,12 +67,19 @@ def test_generate_frames(config: Config, dataset_pandas: DatasetManager, lines: accept_status = HTTPStatus.OK url = make_url(port, endpoint) + df = dataset_pandas['filter_probs.csv'] + if lines: content_type = MimeTypes.TEXT.value else: content_type = MimeTypes.JSON.value - df = dataset_pandas['filter_probs.csv'] + if use_payload_to_df_fn: + mock_results = df[['v2', 'v3']].copy(deep=True) + payload_to_df_fn = mock.MagicMock(return_value=mock_results) + else: + payload_to_df_fn = None + buf = df_to_stream_json(df, StringIO(), lines=lines) buf.seek(0) @@ -81,7 +90,8 @@ def test_generate_frames(config: Config, dataset_pandas: DatasetManager, lines: endpoint=endpoint, method=method, accept_status=accept_status, - lines=lines) + lines=lines, + payload_to_df_fn=payload_to_df_fn) generate_frames = stage._generate_frames() msg_queue = queue.SimpleQueue() @@ -111,7 +121,13 @@ def test_generate_frames(config: Config, dataset_pandas: DatasetManager, lines: assert response.headers["Content-Type"] == MimeTypes.TEXT.value assert response.text == "" - dataset_pandas.assert_compare_df(df, result_msg.df) + if use_payload_to_df_fn: + payload_to_df_fn.assert_called_once_with(payload, lines) + expected_df = df[['v2', 'v3']] + else: + expected_df = df + + dataset_pandas.assert_compare_df(expected_df, result_msg.df) @pytest.mark.parametrize("invalid_method", [HTTPMethod.GET, HTTPMethod.PATCH]) @@ -124,3 +140,77 @@ def test_constructor_invalid_method(config: Config, invalid_method: HTTPMethod): def test_constructor_invalid_accept_status(config: Config, invalid_accept_status: HTTPStatus): with pytest.raises(ValueError): HttpServerSourceStage(config=config, accept_status=invalid_accept_status) + + 
+@pytest.mark.slow +@pytest.mark.use_python +@pytest.mark.parametrize( + "lines", + [False, pytest.param(True, marks=pytest.mark.skip(reason="https://github.com/rapidsai/cudf/issues/15820"))], + ids=["json", "lines"]) +@pytest.mark.parametrize("use_payload_to_df_fn", [False, True], ids=["no_payload_to_df_fn", "payload_to_df_fn"]) +def test_parse_errors(config: Config, lines: bool, use_payload_to_df_fn: bool): + expected_status = HTTPStatus.BAD_REQUEST + + endpoint = '/test' + port = 8088 + method = HTTPMethod.POST + accept_status = HTTPStatus.OK + url = make_url(port, endpoint) + + if lines: + content_type = MimeTypes.TEXT.value + else: + content_type = MimeTypes.JSON.value + + if use_payload_to_df_fn: + payload_to_df_fn = mock.MagicMock(side_effect=ValueError("Invalid payload")) + else: + payload_to_df_fn = None + + payload = '{"not_valid":"json' + + stage = HttpServerSourceStage(config=config, + port=port, + endpoint=endpoint, + method=method, + accept_status=accept_status, + lines=lines, + payload_to_df_fn=payload_to_df_fn) + + generate_frames = stage._generate_frames() + msg_queue = queue.SimpleQueue() + + get_next_thread = GetNext(msg_queue, generate_frames) + get_next_thread.start() + + attempt = 0 + while not stage._processing and get_next_thread.is_alive() and attempt < 2: + time.sleep(0.1) + attempt += 1 + + assert stage._processing + assert get_next_thread.is_alive() + + response = requests.request(method=method.value, + url=url, + data=payload, + timeout=5.0, + allow_redirects=False, + headers={"Content-Type": content_type}) + + assert msg_queue.empty() + assert get_next_thread.is_alive() + + assert response.status_code == expected_status.value + assert response.headers["Content-Type"] == MimeTypes.TEXT.value + assert "error" in response.text.lower() # just verify that we got some sort of error message + + if use_payload_to_df_fn: + payload_to_df_fn.assert_called_once_with(payload, lines) + + # get_next_thread will block until it processes a valid 
message or the queue is closed + stage._queue.close() + + with pytest.raises(StopIteration): + get_next_thread.join() diff --git a/tests/test_http_server_source_stage_pipe.py b/tests/stages/test_http_server_source_stage_pipe.py similarity index 100% rename from tests/test_http_server_source_stage_pipe.py rename to tests/stages/test_http_server_source_stage_pipe.py diff --git a/tests/test_http_client_source_stage_pipe.py b/tests/test_http_client_source_stage_pipe.py deleted file mode 100644 index 08e8d22d4b..0000000000 --- a/tests/test_http_client_source_stage_pipe.py +++ /dev/null @@ -1,48 +0,0 @@ -# Copyright (c) 2023-2024, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import pytest - -from _utils import assert_results -from _utils.dataset_manager import DatasetManager -from morpheus.config import Config -from morpheus.pipeline import LinearPipeline -from morpheus.stages.input.http_client_source_stage import HttpClientSourceStage -from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage - - -@pytest.mark.slow -@pytest.mark.use_cudf -@pytest.mark.parametrize("lines", [False, True]) -def test_http_client_source_stage_pipe(config: Config, dataset: DatasetManager, mock_rest_server: str, lines: bool): - """ - Test the HttpClientSourceStage against a mock REST server which will return JSON data which can be deserialized - into a DataFrame. 
- """ - if lines: - endpoint = "data-lines" - else: - endpoint = "data" - - url = f"{mock_rest_server}/api/v1/{endpoint}" - - expected_df = dataset['filter_probs.csv'] - num_records = len(expected_df) - - pipe = LinearPipeline(config) - pipe.set_source(HttpClientSourceStage(config=config, url=url, max_retries=1, lines=lines, stop_after=num_records)) - comp_stage = pipe.add_stage(CompareDataFrameStage(config, expected_df)) - pipe.run() - - assert_results(comp_stage.get_results())