From aad2c7fdb399e746275398ca18f2406856fbc1e0 Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Fri, 28 Jul 2023 10:36:17 -0400 Subject: [PATCH 1/3] Add poll function to custreamz kafka consumer that proxies functionality to underlying confluent kafka consumer library --- python/custreamz/custreamz/kafka.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/python/custreamz/custreamz/kafka.py b/python/custreamz/custreamz/kafka.py index a88b05c83b1..b60f2703376 100644 --- a/python/custreamz/custreamz/kafka.py +++ b/python/custreamz/custreamz/kafka.py @@ -25,6 +25,7 @@ def __init__(self, kafka_configs): self.kafka_configs = kafka_configs self.kafka_meta_client = KafkaDatasource(kafka_configs) + self.ck_consumer = ck.Consumer(kafka_configs) def list_topics(self, specific_topic=None): """ @@ -270,3 +271,20 @@ def commit(self, offsets=None, asynchronous=True): self.kafka_meta_client.commit_offset( offs.topic.encode(), offs.partition, offs.offset ) + + def poll(self, timeout=None): + """ + Consumes a single message, calls callbacks and returns events. + + The application must check the returned Message object's Message.error() + method to distinguish between proper messages (error() returns None), + or an event or error (see error().code() for specifics). + + Parameters + ---------- + timeout : float, + Maximum time to block waiting for message, event or callback + (default: infinite (None translated into -1 in the library)). (Seconds) + """ + return self.ck.poll(timeout) + From 734284e552da2eb00c33eda3da03bb7b67da8953 Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Fri, 28 Jul 2023 15:57:00 -0400 Subject: [PATCH 2/3] linter fixes for kafka.py --- python/custreamz/custreamz/kafka.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/python/custreamz/custreamz/kafka.py b/python/custreamz/custreamz/kafka.py index b60f2703376..3883936f55a 100644 --- a/python/custreamz/custreamz/kafka.py +++ b/python/custreamz/custreamz/kafka.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2022, NVIDIA CORPORATION. +# Copyright (c) 2020-2023, NVIDIA CORPORATION. import confluent_kafka as ck from cudf_kafka._lib.kafka import KafkaDatasource @@ -276,15 +276,16 @@ def poll(self, timeout=None): """ Consumes a single message, calls callbacks and returns events. - The application must check the returned Message object's Message.error() - method to distinguish between proper messages (error() returns None), - or an event or error (see error().code() for specifics). + The application must check the returned Message object's + Message.error() method to distinguish between proper messages + (error() returns None), or an event or error + (see error().code() for specifics). Parameters ---------- timeout : float, Maximum time to block waiting for message, event or callback - (default: infinite (None translated into -1 in the library)). (Seconds) + (default: infinite (None translated into -1 in the + library)). (Seconds) """ return self.ck.poll(timeout) - From 63524484071b83a6f0e5445032f094390efd5be8 Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Mon, 14 Aug 2023 09:45:54 -0400 Subject: [PATCH 3/3] Update python/custreamz/custreamz/kafka.py Co-authored-by: Bradley Dice --- python/custreamz/custreamz/kafka.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/custreamz/custreamz/kafka.py b/python/custreamz/custreamz/kafka.py index 3883936f55a..0def0ba746e 100644 --- a/python/custreamz/custreamz/kafka.py +++ b/python/custreamz/custreamz/kafka.py @@ -283,7 +283,7 @@ def poll(self, timeout=None): Parameters ---------- - timeout : float, + timeout : float Maximum time to block waiting for message, event or callback (default: infinite (None translated into -1 in the library)). (Seconds)