diff --git a/python/custreamz/custreamz/kafka.py b/python/custreamz/custreamz/kafka.py index a88b05c83b1..0def0ba746e 100644 --- a/python/custreamz/custreamz/kafka.py +++ b/python/custreamz/custreamz/kafka.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2022, NVIDIA CORPORATION. +# Copyright (c) 2020-2023, NVIDIA CORPORATION. import confluent_kafka as ck from cudf_kafka._lib.kafka import KafkaDatasource @@ -25,6 +25,7 @@ def __init__(self, kafka_configs): self.kafka_configs = kafka_configs self.kafka_meta_client = KafkaDatasource(kafka_configs) + self.ck_consumer = ck.Consumer(kafka_configs) def list_topics(self, specific_topic=None): """ @@ -270,3 +271,21 @@ def commit(self, offsets=None, asynchronous=True): self.kafka_meta_client.commit_offset( offs.topic.encode(), offs.partition, offs.offset ) + + def poll(self, timeout=None): + """ + Consumes a single message, calls callbacks and returns events. + + The application must check the returned Message object's + Message.error() method to distinguish between proper messages + (error() returns None), or an event or error + (see error().code() for specifics). + + Parameters + ---------- + timeout : float + Maximum time to block waiting for message, event or callback + (default: infinite (None translated into -1 in the + library)). (Seconds) + """ + return self.ck.poll(timeout)