Skip to content

Commit

Permalink
Support passing a custom parser to HttpServerSourceStage and `HttpC…
Browse files Browse the repository at this point in the history
…lientSourceStage` stages (#1705)

* Add a new constructor argument to `HttpServerSourceStage` & `HttpClientSourceStage` called `payload_to_df_fn`, allowing users to specify a custom payload parser.
* Remove the work-around for rapidsai/cudf#5712; this bug is fixed in our current version of cudf.
* Relocate updated tests to `tests/stages`

Closes #1703

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1705
  • Loading branch information
dagardner-nv authored May 30, 2024
1 parent ca14433 commit 12abdce
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 71 deletions.
14 changes: 8 additions & 6 deletions morpheus/cli/register_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,7 @@ def set_options_param_type(options_kwargs: dict, annotation, doc_type: str):
if (is_union_type(annotation)):
raise RuntimeError("Union types are not supported for auto registering stages.")

if (issubtype(annotation, typing.List)):
# For variable length array, use multiple=True
options_kwargs["multiple"] = True
options_kwargs["type"] = get_args(annotation)[0]

elif (issubtype(annotation, pathlib.Path)):
if (issubtype(annotation, pathlib.Path)):
# For paths, use the Path option and apply any kwargs
options_kwargs["type"] = partial_pop_kwargs(click.Path, doc_type_kwargs)()

Expand Down Expand Up @@ -216,6 +211,13 @@ def set_options_param_type(options_kwargs: dict, annotation, doc_type: str):
options_kwargs["type"] = click.Tuple([str, str])
options_kwargs["callback"] = lambda ctx, param, value: dict(value)

elif (issubtype(annotation, typing.List) or issubtype(annotation, typing.Tuple)
or issubtype(annotation, typing.Iterable)) and not (issubtype(annotation, str)):

# For variable length array, use multiple=True
options_kwargs["multiple"] = True
options_kwargs["type"] = get_args(annotation)[0]

else:
options_kwargs["type"] = annotation

Expand Down
18 changes: 12 additions & 6 deletions morpheus/stages/input/http_client_source_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ class HttpClientSourceStage(PreallocatorMixin, SingleOutputSource):
Number of seconds to wait for the server to send data before giving up and raising an exception.
max_errors : int, default 10
Maximum number of consecutive errors to receive before raising an error.
accept_status_codes : typing.List[HTTPStatus], optional, multiple = True
List of status codes to accept. If the response status code is not in this tuple, then the request will be
accept_status_codes : typing.Iterable[int], optional, multiple = True
List of status codes to accept. If the response status code is not in this collection, then the request will be
considered an error
max_retries : int, default 10
Maximum number of times to retry the request fails, receives a redirect or returns a status in the
Expand All @@ -80,6 +80,9 @@ class HttpClientSourceStage(PreallocatorMixin, SingleOutputSource):
to contain JSON objects separated by end-of-line characters.
stop_after : int, default 0
Stops ingesting after emitting `stop_after` records (rows in the dataframe). Useful for testing. Disabled if `0`
payload_to_df_fn : callable, default None
A callable that takes the HTTP payload bytes as the first argument and the `lines` parameter as the second
argument, and returns a cudf.DataFrame. If unset, cudf.read_json is used.
**request_kwargs : dict
Additional arguments to pass to the `requests.request` function.
"""
Expand All @@ -94,10 +97,11 @@ def __init__(self,
error_sleep_time: float = 0.1,
respect_retry_after_header: bool = True,
request_timeout_secs: int = 30,
accept_status_codes: typing.List[HTTPStatus] = (HTTPStatus.OK, ),
accept_status_codes: typing.Iterable[int] = (HTTPStatus.OK, ),
max_retries: int = 10,
lines: bool = False,
stop_after: int = 0,
payload_to_df_fn: typing.Callable[[bytes, bool], cudf.DataFrame] = None,
**request_kwargs):
super().__init__(config)
self._url = http_utils.prepare_url(url)
Expand Down Expand Up @@ -135,6 +139,7 @@ def __init__(self,

self._stop_after = stop_after
self._lines = lines
self._payload_to_df_fn = payload_to_df_fn
self._requst_kwargs = request_kwargs

@property
Expand All @@ -154,10 +159,11 @@ def _parse_response(self, response: requests.Response) -> typing.Union[cudf.Data
Returns a DataFrame parsed from the response payload. If the response payload is empty, then `None` is returned.
"""
payload = response.content
if len(payload) > 2: # work-around for https://github.com/rapidsai/cudf/issues/5712
return cudf.read_json(payload, lines=self._lines, engine='cudf')

return None
if self._payload_to_df_fn is not None:
return self._payload_to_df_fn(payload, self._lines)

return cudf.read_json(payload, lines=self._lines, engine='cudf')

def _generate_frames(self) -> typing.Iterator[MessageMeta]:
# Running counter of the number of messages emitted by this source
Expand Down
18 changes: 14 additions & 4 deletions morpheus/stages/input/http_server_source_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ class HttpServerSourceStage(PreallocatorMixin, SingleOutputSource):
expect each request to be a JSON object per line.
stop_after : int, default 0
Stops ingesting after emitting `stop_after` records (rows in the dataframe). Useful for testing. Disabled if `0`
payload_to_df_fn : callable, default None
A callable that takes the HTTP payload string as the first argument and the `lines` parameter as the second
argument, and returns a cudf.DataFrame. When supplied, the C++ implementation of this stage is disabled and the
Python implementation is used.
"""

def __init__(self,
Expand All @@ -93,7 +97,8 @@ def __init__(self,
max_payload_size: int = 10,
request_timeout_secs: int = 30,
lines: bool = False,
stop_after: int = 0):
stop_after: int = 0,
payload_to_df_fn: typing.Callable[[str, bool], cudf.DataFrame] = None):
super().__init__(config)
self._bind_address = bind_address
self._port = port
Expand All @@ -108,6 +113,7 @@ def __init__(self,
self._request_timeout_secs = request_timeout_secs
self._lines = lines
self._stop_after = stop_after
self._payload_to_df_fn = payload_to_df_fn

# These are only used when C++ mode is disabled
self._queue = None
Expand All @@ -134,8 +140,12 @@ def compute_schema(self, schema: StageSchema):

def _parse_payload(self, payload: str) -> HttpParseResponse:
try:
# engine='cudf' is needed when lines=False to avoid using pandas
df = cudf.read_json(payload, lines=self._lines, engine='cudf')
if self._payload_to_df_fn is not None:
df = self._payload_to_df_fn(payload, self._lines)
else:
# engine='cudf' is needed when lines=False to avoid using pandas
df = cudf.read_json(payload, lines=self._lines, engine='cudf')

except Exception as e:
err_msg = "Error occurred converting HTTP payload to Dataframe"
logger.error("%s: %s", err_msg, e)
Expand Down Expand Up @@ -206,7 +216,7 @@ def _generate_frames(self) -> typing.Iterator[MessageMeta]:
self._processing = False

def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject:
if self._build_cpp_node():
if self._build_cpp_node() and self._payload_to_df_fn is None:
import morpheus._lib.stages as _stages
node = _stages.HttpServerSourceStage(builder,
self.unique_name,
Expand Down
4 changes: 2 additions & 2 deletions morpheus/stages/output/http_client_sink_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class HttpClientSinkStage(PassThruTypeMixin, SinglePortStage):
perform an exponential backoff starting at `error_sleep_time`.
request_timeout_secs : int, optional
Number of seconds to wait for the server to send data before giving up and raising an exception.
accept_status_codes : typing.List[HTTPStatus], optional, multiple = True
accept_status_codes : typing.Iterable[int], optional, multiple = True
List of acceptable status codes, by default (200, 201, 202).
max_retries : int, default 10
Maximum number of times to retry the request fails, receives a redirect or returns a status in the
Expand Down Expand Up @@ -140,7 +140,7 @@ def __init__(self,
error_sleep_time: float = 0.1,
respect_retry_after_header: bool = True,
request_timeout_secs: int = 30,
accept_status_codes: typing.List[HTTPStatus] = (
accept_status_codes: typing.Iterable[int] = (
HTTPStatus.OK,
HTTPStatus.CREATED,
HTTPStatus.ACCEPTED,
Expand Down
4 changes: 4 additions & 0 deletions tests/mock_rest_server/mocks/api/v1/invalid/GET.mock
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
HTTP/1.1 200 OK
Content-Type: application/json

{"not_valid":"json
112 changes: 112 additions & 0 deletions tests/stages/test_http_client_source_stage_pipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Copyright (c) 2023-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from unittest import mock

import pytest

from _utils import TEST_DIRS
from _utils import assert_results
from _utils.dataset_manager import DatasetManager
from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.input.http_client_source_stage import HttpClientSourceStage
from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage


@pytest.mark.slow
@pytest.mark.use_cudf
@pytest.mark.parametrize("lines", [False, True], ids=["json", "lines"])
@pytest.mark.parametrize("use_payload_to_df_fn", [False, True], ids=["no_payload_to_df_fn", "payload_to_df_fn"])
def test_http_client_source_stage_pipe(config: Config,
                                       dataset: DatasetManager,
                                       mock_rest_server: str,
                                       lines: bool,
                                       use_payload_to_df_fn: bool):
    """
    Run a pipeline whose source is an `HttpClientSourceStage` pointed at the mock REST server, and compare the
    emitted DataFrame against the expected one.

    When `use_payload_to_df_fn` is set, a custom parser is handed to the stage. The parser asserts that it receives
    the raw bytes of the matching `filter_probs` payload file together with the `lines` flag, and returns only the
    `v2` and `v3` columns, so the comparison also shows that the parser's return value is what the stage emits.
    """
    full_df = dataset['filter_probs.csv']
    endpoint = "data-lines" if lines else "data"

    if use_payload_to_df_fn:
        subset_cols = ['v2', 'v3']
        payload_name = "filter_probs.jsonlines" if lines else "filter_probs.json"

        with open(os.path.join(TEST_DIRS.tests_data_dir, payload_name), "rb") as payload_fh:
            expected_payload = payload_fh.read()

        def _custom_parser(payload, lines_arg):
            # The stage must forward the untouched payload and the `lines` flag it was constructed with.
            assert payload == expected_payload
            assert lines_arg == lines
            return full_df[subset_cols].copy(deep=True)

        payload_to_df_fn = _custom_parser
        expected_df = full_df[subset_cols].copy(deep=True)
    else:
        payload_to_df_fn = None
        expected_payload = None
        expected_df = full_df.copy(deep=True)

    pipe = LinearPipeline(config)
    source_stage = HttpClientSourceStage(config=config,
                                         url=f"{mock_rest_server}/api/v1/{endpoint}",
                                         max_retries=1,
                                         lines=lines,
                                         stop_after=len(expected_df),
                                         payload_to_df_fn=payload_to_df_fn)
    pipe.set_source(source_stage)
    comp_stage = pipe.add_stage(CompareDataFrameStage(config, expected_df))
    pipe.run()

    assert_results(comp_stage.get_results())


@pytest.mark.slow
@pytest.mark.use_cudf
@pytest.mark.parametrize(
    "lines",
    [False, pytest.param(True, marks=pytest.mark.skip(reason="https://github.com/rapidsai/cudf/issues/15820"))],
    ids=["json", "lines"])
@pytest.mark.parametrize("use_payload_to_df_fn", [False, True], ids=["no_payload_to_df_fn", "payload_to_df_fn"])
def test_parse_errors(config: Config, mock_rest_server: str, lines: bool, use_payload_to_df_fn: bool):
    """
    Verify that running the pipeline raises when the payload from the mock server's `invalid` endpoint cannot be
    converted into a DataFrame.

    With `use_payload_to_df_fn` set, the custom parser is a mock that always raises `ValueError`; the test then also
    checks that the parser was invoked exactly once.
    """
    invalid_url = f"{mock_rest_server}/api/v1/invalid"

    payload_to_df_fn = mock.MagicMock(side_effect=ValueError("Invalid payload")) if use_payload_to_df_fn else None

    pipe = LinearPipeline(config)
    source_stage = HttpClientSourceStage(config=config,
                                         url=invalid_url,
                                         max_retries=1,
                                         lines=lines,
                                         payload_to_df_fn=payload_to_df_fn)
    pipe.set_source(source_stage)

    # The broad exception type is deliberate: cudf currently raises a RuntimeError where a ValueError would be
    # expected, which is also tracked by https://github.com/rapidsai/cudf/issues/15820
    with pytest.raises(Exception):
        pipe.run()

    if not use_payload_to_df_fn:
        return

    payload_to_df_fn.assert_called_once()
Loading

0 comments on commit 12abdce

Please sign in to comment.