Morpheus - input/output kafka stages #1563
-
Hello, I need to connect to a Kafka server (source stage) over an SSL connection. As far as I can see in the default morpheus/stages/input/kafka_source_stage.py stage file, there is no way to pass SSL-related parameters to the consumer instance. You appear to be using the confluent_kafka Python library for the client side, which, if I'm not mistaken, supports SSL. Am I missing something, or do I need to implement my own Kafka source stage to allow SSL parameters? The same question goes for the output Kafka stage. Thank you. BTW: I'm pretty new to Morpheus, and Python isn't my first language.
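For reference, this is roughly what I would like to be able to pass down to the underlying consumer (a minimal sketch based on the confluent_kafka configuration documentation; the host name and file paths are placeholders):

```python
# Sketch of a confluent_kafka consumer configured for SSL.
# Host name and certificate paths below are placeholders.
import confluent_kafka as ck

consumer = ck.Consumer({
    'bootstrap.servers': 'broker.example.com:9094',
    'group.id': 'morpheus',
    'security.protocol': 'SSL',
    # CA certificate used to verify the broker's certificate.
    'ssl.ca.location': '/path/to/ca-cert.pem',
    # Client certificate and private key for mutual TLS.
    'ssl.certificate.location': '/path/to/client-cert.pem',
    'ssl.key.location': '/path/to/client-key.pem',
})
```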
-
@nuxwin You are correct. Our Kafka input and output stages only accept minimal arguments for connecting to a local Kafka cluster. We could possibly update the stages to accept a config dictionary to allow more flexibility. For now, you should be able to easily update the existing stages to accept the config options required for SSL.
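The idea would be roughly the following (a sketch only; `make_consumer` and `consumer_config` are hypothetical names, not existing Morpheus APIs):

```python
import typing

import confluent_kafka as ck


def make_consumer(bootstrap_servers: str,
                  group_id: str,
                  consumer_config: typing.Dict[str, typing.Any] = None) -> ck.Consumer:
    # Base settings mirroring what the stages already build internally.
    params = {'bootstrap.servers': bootstrap_servers, 'group.id': group_id}
    if consumer_config is not None:
        # Caller-supplied options (e.g. SSL settings) take precedence.
        params.update(consumer_config)
    return ck.Consumer(params)
```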
-
@efajardo-nv Thank you for your fast answer. I'll update the existing stages, then.
-
@nuxwin We're open to pull requests for this functionality. If you make updates to the existing stages, it would be great to get them integrated into the repository.
-
Would something like this be OK for you? Here is the updated Kafka input stage with SSL support. I've added a dedicated `ssl_config` dictionary as suggested. For the output stage, the change is almost identical.

```python
# Copyright (c) 2021-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SSL support added by Laurent DECLERCQ, Konzeptplus AG <[email protected]>
# Version: 20230315
import logging
import os
import time
import typing
from enum import Enum
from io import StringIO
import confluent_kafka as ck
import mrc
import pandas as pd
import cudf
import morpheus._lib.stages as _stages
from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.config import auto_determine_bootstrap
from morpheus.messages import MessageMeta
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stage_schema import StageSchema
logger = logging.getLogger(__name__)
class AutoOffsetReset(Enum):
"""The supported offset options in Kafka"""
EARLIEST = "earliest"
LATEST = "latest"
NONE = "none"
class SSLConfigError(Exception):
"""Custom exception for SSL configuration errors."""
pass
@register_stage("from-kafka", modes=[PipelineModes.FIL, PipelineModes.NLP, PipelineModes.OTHER])
class KafkaSourceStage(PreallocatorMixin, SingleOutputSource):
"""
Load messages from a Kafka cluster.
Parameters
----------
c : `morpheus.config.Config`
Pipeline configuration instance.
bootstrap_servers : str
Comma-separated list of bootstrap servers. If using Kafka created via `docker-compose`, this can be set to
'auto' to automatically determine the cluster IPs and ports
input_topic : typing.List[str], default = ["test_pcap"]
Name of the Kafka topic from which messages will be consumed. To consume from multiple topics,
repeat the same option multiple times.
group_id : str
Specifies the name of the consumer group a Kafka consumer belongs to.
client_id : str, default = None
An optional identifier of the consumer.
poll_interval : str
Seconds that elapse between polling Kafka for new messages. Follows the pandas interval format.
disable_commit : bool, default = False
Enabling this option will skip committing messages as they are pulled off the server. This is only useful for
debugging, allowing the user to process the same messages multiple times.
disable_pre_filtering : bool, default = False
Enabling this option will skip pre-filtering of json messages. This is only useful when inputs are known to be
valid json.
auto_offset_reset : `AutoOffsetReset`, case_sensitive = False
Sets the value for the configuration option 'auto.offset.reset'. See the kafka documentation for more
        information on the effects of each value.
stop_after: int, default = 0
Stops ingesting after emitting `stop_after` records (rows in the dataframe). Useful for testing. Disabled if `0`
async_commits: bool, default = True
Enable commits to be performed asynchronously. Ignored if `disable_commit` is `True`.
    ssl_config: typing.Dict[str, typing.Union[str, bool]], optional
A dictionary containing SSL configuration options for secure Kafka communication.
Keys include:
ssl.ca.location: Path to the CA (Certificate Authority) certificate file (PEM format) used to verify the
Kafka broker's SSL certificate.
ssl.certificate.location: Path to the client's certificate file (PEM format) for client authentication.
ssl.key.location: Path to the client's private key file (PEM format) used in conjunction with the
client's certificate.
enable.ssl.certificate.verification : Boolean to enable/disable SSL certificate verification.
ssl.key.password: Optional password for the client's private key (if the key is password-protected).
"""
def __init__(self,
config: Config,
bootstrap_servers: str,
input_topic: typing.List[str] = None,
group_id: str = "morpheus",
client_id: str = None,
poll_interval: str = "10millis",
disable_commit: bool = False,
disable_pre_filtering: bool = False,
auto_offset_reset: AutoOffsetReset = AutoOffsetReset.LATEST,
stop_after: int = 0,
async_commits: bool = True,
ssl_config: typing.Dict[str, typing.Union[str, bool]] = None):
super().__init__(config)
if input_topic is None:
input_topic = ["test_pcap"]
if isinstance(auto_offset_reset, AutoOffsetReset):
auto_offset_reset = auto_offset_reset.value
if bootstrap_servers == "auto":
bootstrap_servers = auto_determine_bootstrap()
self._consumer_params = {
'bootstrap.servers': bootstrap_servers,
'group.id': group_id,
'session.timeout.ms': "60000",
"auto.offset.reset": auto_offset_reset
}
if client_id is not None:
self._consumer_params['client.id'] = client_id
if ssl_config is not None:
self.ssl_config = ssl_config
self._validate_ssl_config()
self._consumer_params['security.protocol'] = 'SSL'
self._consumer_params.update(self.ssl_config)
if isinstance(input_topic, str):
input_topic = [input_topic]
# Remove duplicate topics if there are any.
self._topics = list(set(input_topic))
self._max_batch_size = config.pipeline_batch_size
self._max_concurrent = config.num_threads
self._disable_commit = disable_commit
self._disable_pre_filtering = disable_pre_filtering
self._stop_after = stop_after
self._async_commits = async_commits
self._client = None
# Flag to indicate whether or not we should stop
self._stop_requested = False
self._poll_interval = pd.Timedelta(poll_interval).total_seconds()
self._started = False
self._records_emitted = 0
self._num_messages = 0
@property
def name(self) -> str:
return "from-kafka"
def supports_cpp_node(self):
return True
def compute_schema(self, schema: StageSchema):
schema.output_schema.set_type(MessageMeta)
def stop(self):
"""
Performs cleanup steps when pipeline is stopped.
"""
# Indicate we need to stop
self._stop_requested = True
return super().stop()
def _validate_ssl_config(self):
"""
        Validates the SSL configuration. It ensures that all required parameters are present and that the files they
        point to (CA certificate, client certificate, and client key) exist, to avoid issues during Kafka consumer
        initialization.
If any required parameter is missing or if any specified file does not exist, an SSLConfigError exception is
raised.
"""
for key in ['ssl.ca.location', 'ssl.certificate.location', 'ssl.key.location']:
if key not in self.ssl_config:
raise SSLConfigError(f"{key} is missing for SSL configuration")
if not os.path.exists(self.ssl_config[key]):
raise SSLConfigError(f"SSL file for {key} not found at {self.ssl_config.get(key)}")
def _process_batch(self, consumer, batch):
message_meta = None
if len(batch):
buffer = StringIO()
for msg in batch:
payload = msg.value()
if payload is not None:
buffer.write(payload.decode("utf-8"))
buffer.write("\n")
df = None
try:
buffer.seek(0)
df = cudf.io.read_json(buffer, engine='cudf', lines=True, orient='records')
except Exception as e:
logger.error("Error parsing payload into a dataframe : %s", e)
finally:
if (not self._disable_commit):
for msg in batch:
consumer.commit(message=msg, asynchronous=self._async_commits)
if df is not None:
num_records = len(df)
message_meta = MessageMeta(df)
self._records_emitted += num_records
self._num_messages += 1
if self._stop_after > 0 and self._records_emitted >= self._stop_after:
self._stop_requested = True
batch.clear()
return message_meta
def _source_generator(self):
consumer = None
try:
consumer = ck.Consumer(self._consumer_params)
consumer.subscribe(self._topics)
batch = []
while not self._stop_requested:
do_process_batch = False
do_sleep = False
msg = consumer.poll(timeout=1.0)
if msg is None:
do_process_batch = True
do_sleep = True
else:
msg_error = msg.error()
if msg_error is None:
batch.append(msg)
if len(batch) == self._max_batch_size:
do_process_batch = True
elif msg_error == ck.KafkaError._PARTITION_EOF:
do_process_batch = True
do_sleep = True
else:
raise ck.KafkaException(msg_error)
if do_process_batch:
message_meta = self._process_batch(consumer, batch)
if message_meta is not None:
yield message_meta
if do_sleep and not self._stop_requested:
time.sleep(self._poll_interval)
message_meta = self._process_batch(consumer, batch)
if message_meta is not None:
yield message_meta
finally:
# Close the consumer and call on_completed
if (consumer):
consumer.close()
def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject:
if (self._build_cpp_node()):
source = _stages.KafkaSourceStage(builder,
self.unique_name,
self._max_batch_size,
self._topics,
int(self._poll_interval * 1000),
self._consumer_params,
self._disable_commit,
self._disable_pre_filtering,
self._stop_after,
self._async_commits)
# Only use multiple progress engines with C++. The python implementation will duplicate messages with
# multiple threads
source.launch_options.pe_count = self._max_concurrent
else:
source = builder.make_source(self.unique_name, self._source_generator)
        return source
```

Usage example:

```bash
#!/bin/bash --login
# Activate the `morpheus` conda environment.
. /opt/conda/etc/profile.d/conda.sh
conda activate morpheus
KAFKA_SERVER_HOST=${KAFKA_SERVER_HOST:-morpheus-konzeptplus.i-hms.net}
KAFKA_SERVER_PORT=${KAFKA_SERVER_PORT:-9094}
SSL_CA_LOCATION=${SSL_CA_LOCATION:-/.certs/ca/ca-cert.pem}
SSL_CERTIFICATE_LOCATION=${SSL_CERTIFICATE_LOCATION:-/.certs/client/client-cert.pem}
SSL_KEY_LOCATION=${SSL_KEY_LOCATION:-/.certs/client/client-key.pem}
SSL_KEY_PASSWORD=${SSL_KEY_PASSWORD:-}
ENABLE_SSL_CERTIFICATE_VERIFICATION=${ENABLE_SSL_CERTIFICATE_VERIFICATION:-False}
morpheus --log_level=DEBUG \
run pipeline-nlp \
from-kafka \
--input_topic ai_pf_001_input_topic \
--bootstrap_servers "${KAFKA_SERVER_HOST}:${KAFKA_SERVER_PORT}" \
--ssl_config ssl.ca.location "${SSL_CA_LOCATION}" \
--ssl_config ssl.certificate.location "${SSL_CERTIFICATE_LOCATION}" \
--ssl_config ssl.key.location "${SSL_KEY_LOCATION}" \
--ssl_config ssl.key.password "${SSL_KEY_PASSWORD}" \
--ssl_config enable.ssl.certificate.verification "${ENABLE_SSL_CERTIFICATE_VERIFICATION}" \
deserialize \
serialize \
to-kafka \
--output_topic ai_pf_001_output_topic \
--bootstrap_servers "${KAFKA_SERVER_HOST}:${KAFKA_SERVER_PORT}" \
--ssl_config ssl.ca.location "${SSL_CA_LOCATION}" \
--ssl_config ssl.certificate.location "${SSL_CERTIFICATE_LOCATION}" \
--ssl_config ssl.key.location "${SSL_KEY_LOCATION}" \
--ssl_config ssl.key.password "${SSL_KEY_PASSWORD}" \
  --ssl_config enable.ssl.certificate.verification "${ENABLE_SSL_CERTIFICATE_VERIFICATION}"
```

Output example:

```
nuxwin@morpheus-konzeptplus:/srv/projects/AI_PF_M_001/.docker$ docker compose up ai-pf-001-morpheus-pipeline
[+] Running 3/0
✔ Container ai-pf-001-triton-server Created 0.0s
✔ Container ai-pf-001-kafka-server Running 0.0s
✔ Container ai-pf-001-morpheus-pipeline Created 0.0s
Attaching to ai-pf-001-morpheus-pipeline
ai-pf-001-morpheus-pipeline | Configuring Pipeline via CLI
ai-pf-001-morpheus-pipeline | Parameter, 'labels_file', with relative path, 'data/labels_nlp.txt', does not exist. Using package relative location: '/opt/conda/envs/morpheus/lib/python3.10/site-packages/morpheus/data/labels_nlp.txt'
ai-pf-001-morpheus-pipeline | Loaded labels file. Current labels: [['address', 'bank_acct', 'credit_card', 'email', 'govt_id', 'name', 'password', 'phone_num', 'secret_keys', 'user']]
ai-pf-001-morpheus-pipeline | Module 'FileBatcher' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'FileToDF' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'FilterCmFailed' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'FilterControlMessage' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'FilterDetections' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'FromControlMessage' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'MLFlowModelWriter' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'PayloadBatcher' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'Serialize' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'ToControlMessage' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'WriteToElasticsearch' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'WriteToFile' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | Module 'deserialize' was successfully registered with 'morpheus' namespace.
ai-pf-001-morpheus-pipeline | ====Pipeline Pre-build====
ai-pf-001-morpheus-pipeline | ====Pre-Building Segment: linear_segment_0====
ai-pf-001-morpheus-pipeline | ====Pre-Building Segment Complete!====
ai-pf-001-morpheus-pipeline | ====Pipeline Pre-build Complete!====
ai-pf-001-morpheus-pipeline | ====Registering Pipeline====
ai-pf-001-morpheus-pipeline | Starting pipeline via CLI... Ctrl+C to Quit
ai-pf-001-morpheus-pipeline | ====Building Pipeline====
ai-pf-001-morpheus-pipeline | ====Building Pipeline Complete!====
ai-pf-001-morpheus-pipeline | ====Registering Pipeline Complete!====
ai-pf-001-morpheus-pipeline | Config:
ai-pf-001-morpheus-pipeline | {
ai-pf-001-morpheus-pipeline | "ae": null,
ai-pf-001-morpheus-pipeline | "class_labels": [
ai-pf-001-morpheus-pipeline | "address",
ai-pf-001-morpheus-pipeline | "bank_acct",
ai-pf-001-morpheus-pipeline | "credit_card",
ai-pf-001-morpheus-pipeline | "email",
ai-pf-001-morpheus-pipeline | "govt_id",
ai-pf-001-morpheus-pipeline | "name",
ai-pf-001-morpheus-pipeline | "password",
ai-pf-001-morpheus-pipeline | "phone_num",
ai-pf-001-morpheus-pipeline | "secret_keys",
ai-pf-001-morpheus-pipeline | "user"
ai-pf-001-morpheus-pipeline | ],
ai-pf-001-morpheus-pipeline | "debug": false,
ai-pf-001-morpheus-pipeline | "edge_buffer_size": 128,
ai-pf-001-morpheus-pipeline | "feature_length": 256,
ai-pf-001-morpheus-pipeline | "fil": null,
ai-pf-001-morpheus-pipeline | "log_config_file": null,
ai-pf-001-morpheus-pipeline | "log_level": 10,
ai-pf-001-morpheus-pipeline | "mode": "NLP",
ai-pf-001-morpheus-pipeline | "model_max_batch_size": 8,
ai-pf-001-morpheus-pipeline | "num_threads": 20,
ai-pf-001-morpheus-pipeline | "pipeline_batch_size": 256,
ai-pf-001-morpheus-pipeline | "plugins": []
ai-pf-001-morpheus-pipeline | }
ai-pf-001-morpheus-pipeline | CPP Enabled: True
ai-pf-001-morpheus-pipeline | ====Starting Pipeline====
ai-pf-001-morpheus-pipeline | ====Building Segment: linear_segment_0====
ai-pf-001-morpheus-pipeline | Added source: <from-kafka-0; KafkaSourceStage(bootstrap_servers=morpheus-konzeptplus.i-hms.net:9094, input_topic=('ai_pf_001_input_topic',), group_id=morpheus, client_id=None, poll_interval=10millis, disable_commit=False, disable_pre_filtering=False, auto_offset_reset=AutoOffsetReset.LATEST, stop_after=0, async_commits=True, ssl_config={'ssl.ca.location': '/.certs/ca/ca-cert.pem', 'ssl.certificate.location': '/.certs/client/client-cert.pem', 'ssl.key.location': '/.certs/client/client-key.pem', 'ssl.key.password': '', 'enable.ssl.certificate.verification': 'False'})>
ai-pf-001-morpheus-pipeline | └─> morpheus.MessageMeta
ai-pf-001-morpheus-pipeline | Module 'deserialize' with namespace 'morpheus' is successfully loaded.
ai-pf-001-morpheus-pipeline | Added stage: <deserialize-1; DeserializeStage(ensure_sliceable_index=True, message_type=<class 'morpheus.messages.multi_message.MultiMessage'>, task_type=None, task_payload=None)>
ai-pf-001-morpheus-pipeline | └─ morpheus.MessageMeta -> morpheus.MultiMessage
ai-pf-001-morpheus-pipeline | Added stage: <serialize-2; SerializeStage(include=(), exclude=(), fixed_columns=True)>
ai-pf-001-morpheus-pipeline | └─ morpheus.MultiMessage -> morpheus.MessageMeta
ai-pf-001-morpheus-pipeline | Added stage: <to-kafka-3; WriteToKafkaStage(bootstrap_servers=morpheus-konzeptplus.i-hms.net:9094, output_topic=ai_pf_001_output_topic, client_id=None, ssl_config={'ssl.ca.location': '/.certs/ca/ca-cert.pem', 'ssl.certificate.location': '/.certs/client/client-cert.pem', 'ssl.key.location': '/.certs/client/client-key.pem', 'ssl.key.password': '', 'enable.ssl.certificate.verification': 'False'})>
ai-pf-001-morpheus-pipeline | └─ morpheus.MessageMeta -> morpheus.MessageMeta
ai-pf-001-morpheus-pipeline | ====Pipeline Started====
ai-pf-001-morpheus-pipeline | ====Building Segment Complete!====
```
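For completeness, the same pipeline can also be built from the Python API instead of the CLI (a sketch only; it assumes the modified KafkaSourceStage shown above, and the module paths follow the current Morpheus layout, so they may differ between versions):

```python
from morpheus.config import Config
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.postprocess.serialize_stage import SerializeStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
# KafkaSourceStage is the modified class from the listing above;
# import it from wherever you placed the updated stage.

config = Config()

# SSL options forwarded verbatim to the confluent_kafka consumer.
ssl_config = {
    'ssl.ca.location': '/.certs/ca/ca-cert.pem',
    'ssl.certificate.location': '/.certs/client/client-cert.pem',
    'ssl.key.location': '/.certs/client/client-key.pem',
}

pipeline = LinearPipeline(config)
pipeline.set_source(
    KafkaSourceStage(config,
                     bootstrap_servers='morpheus-konzeptplus.i-hms.net:9094',
                     input_topic=['ai_pf_001_input_topic'],
                     ssl_config=ssl_config))
pipeline.add_stage(DeserializeStage(config))
pipeline.add_stage(SerializeStage(config))
pipeline.run()
```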
-
@nuxwin Thank you for doing this! Could you please submit this in a PR? That would be a better place to have review/discussion on this to get it merged.