Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 'poll' function to custreamz kafka consumer #13782

Merged
merged 7 commits into from
Aug 14, 2023
Merged
21 changes: 20 additions & 1 deletion python/custreamz/custreamz/kafka.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
import confluent_kafka as ck
from cudf_kafka._lib.kafka import KafkaDatasource

Expand All @@ -25,6 +25,7 @@ def __init__(self, kafka_configs):

self.kafka_configs = kafka_configs
self.kafka_meta_client = KafkaDatasource(kafka_configs)
self.ck_consumer = ck.Consumer(kafka_configs)

def list_topics(self, specific_topic=None):
"""
Expand Down Expand Up @@ -270,3 +271,21 @@ def commit(self, offsets=None, asynchronous=True):
self.kafka_meta_client.commit_offset(
offs.topic.encode(), offs.partition, offs.offset
)

def poll(self, timeout=None):
"""
Consumes a single message, calls callbacks and returns events.

The application must check the returned Message object's
Message.error() method to distinguish between proper messages
(error() returns None), or an event or error
(see error().code() for specifics).

Parameters
----------
timeout : float
Maximum time to block waiting for message, event or callback
(default: infinite (None translated into -1 in the
library)). (Seconds)
"""
return self.ck.poll(timeout)