[FEA] Add a poll method to cuStreamz for single messages #13600

Closed
chinmaychandak opened this issue Jun 21, 2023 · 6 comments · Fixed by #13782
Labels: 0 - Backlog (In queue waiting for assignment), feature request (New feature or request)

Comments

@chinmaychandak
Contributor

I am using cuStreamz with cudf 23.06.

When using the Confluent Kafka engine, things work as expected. When using the accelerated cudf_kafka datasource, calling start() on the stream fails with the following error:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
/tmp/ipykernel_1704/3564192403.py in <module>
----> 1 stream_cudf_kafka.start()

/opt/conda/envs/rapids/lib/python3.9/site-packages/streamz/core.py in start(self)
    340         """ Start any upstream sources """
    341         for upstream in self.upstreams:
--> 342             upstream.start()
    343 
    344     def stop(self):

/opt/conda/envs/rapids/lib/python3.9/site-packages/streamz/sources.py in start(self)
    595             # blocks for consumer thread to come up and invoke poll to establish
    596             # connection with broker to fetch oauth token for kafka
--> 597             self.consumer.poll(timeout=1)
    598             self.consumer.get_watermark_offsets(tp)
    599             self.loop.add_callback(self.poll_kafka)

AttributeError: 'Consumer' object has no attribute 'poll'

@jdye64 / @randerzander - Can you please help take a look?

@chinmaychandak added the labels Needs Triage (Need team to review and classify) and bug (Something isn't working) on Jun 21, 2023
@GregoryKimball
Contributor

Thank you @chinmaychandak for reporting this. Would you please share a bit more background? Have you been relying on the cudf_kafka datasource for some time, or are you trying to use it for the first time? Are there any old versions of cuDF that work correctly for you?

@chinmaychandak
Contributor Author

Hey @GregoryKimball!

We're resuming development on cuStreamz after 1.5+ years. cuStreamz uses cudf_kafka to fetch batches of data from Kafka in an accelerated manner: data is passed from librdkafka to libcudf's JSON reader in the C/C++ layer, bypassing the Python layer and, effectively, the GIL. cudf_kafka has the same API as confluent_kafka.

This worked well with versions around 0.20/21.06 (a long time ago, I know!).

Here's a smaller MRE:

import confluent_kafka as ck
from custreamz import kafka

kafka_brokers = "somekafkabroker:9092"
kafka_consumer_configs = {
    "bootstrap.servers": kafka_brokers,
    "group.id": "mre-checkpoint",
    "auto.offset.reset": "earliest"
}
topic = "topic"

# initializing both consumers works
cudf_kafka_consumer = kafka.Consumer(kafka_consumer_configs)
ck_consumer = ck.Consumer(kafka_consumer_configs)

# fetching watermark offsets works
tp = ck.TopicPartition(topic, 0, 0)
w1 = ck_consumer.get_watermark_offsets(tp) 
w2 = cudf_kafka_consumer.get_watermark_offsets(tp)
print(w1, w2)

# reading batch of data from Kafka into cuDF dataframe using cudf.read_json works
gdf = cudf_kafka_consumer.read_gdf(topic=topic, partition=0, lines=True, start=0, end=10) 

# polling topic fails in cudf_kafka but works in confluent_kafka
ck_consumer.poll(timeout=1) # works
cudf_kafka_consumer.poll(timeout=1) # fails with AttributeError: 'Consumer' object has no attribute 'poll'

Since the poll fails, this streamz code fails: https://github.com/python-streamz/streamz/blob/master/streamz/sources.py#L581-L599.

@GregoryKimball added the labels 0 - Backlog (In queue waiting for assignment) and cuStreamz, and removed Needs Triage (Need team to review and classify), on Jul 22, 2023
@jdye64
Contributor

jdye64 commented Jul 24, 2023

@chinmaychandak, @randerzander, & @GregoryKimball I just started looking into this. I wanted to make everyone aware that the "poll" method returns only a single Kafka message, as described in the Confluent Kafka documentation.

This means cudf would receive only a single message each time the "poll" method is invoked in custreamz. cuDF gains its speed advantage by operating on bulk reads of data, so fetching one message at a time through custreamz is likely to be slower than simply using the Confluent Kafka Python library directly ("ck_consumer.poll" in the MRE above).

I'm willing to add the function, but it will not be fast and may in fact be slower than calling Confluent Kafka directly. My personal recommendation is to use the existing Confluent Kafka Python library for this operation. I will wait for feedback from others before I continue.

@chinmaychandak
Contributor Author

poll would only be used once per consumer upon startup: https://github.com/python-streamz/streamz/blob/master/streamz/sources.py#L581-L599. This poll is needed to establish a connection with brokers to fetch a token if we're using Kafka with OAuth.

Bulk reads are still the way to go, no changes there: https://github.com/python-streamz/streamz/blob/master/streamz/sources.py#L746-L758.
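The startup sequence described above (one poll to establish the broker connection and fetch the OAuth token, then bulk reads) can be sketched as follows. This is a simplified, hypothetical reconstruction, not the actual streamz source: StubConsumer, start_source and read_batch are illustrative names, and the stub stands in for a real consumer so the sketch runs without a broker. It also shows why a consumer object with no poll method fails at startup with the AttributeError from the traceback.

```python
from typing import Any, List, Tuple


class StubConsumer:
    """Hypothetical stand-in for a Kafka consumer; needs no broker."""

    def __init__(self, records: List[str]):
        self.records = records
        self.polled = 0

    def poll(self, timeout=None):
        # On a real consumer, the first poll() drives the broker handshake
        # (e.g. the OAuth token fetch). The stub only counts calls.
        self.polled += 1
        return None

    def get_watermark_offsets(self, tp: Any) -> Tuple[int, int]:
        # (low, high) offsets of the partition.
        return 0, len(self.records)

    def read_batch(self, start: int, end: int) -> List[str]:
        # Bulk read of offsets [start, end); plays the role read_gdf plays
        # in custreamz, minus the GPU DataFrame.
        return self.records[start:end]


def start_source(consumer: Any, tp: Any = None) -> List[str]:
    # Step 1: poll exactly once at startup. A consumer without a poll
    # method raises AttributeError right here, which is the reported failure.
    consumer.poll(timeout=1)
    # Step 2: look up the offsets, then fetch everything available in one batch.
    low, high = consumer.get_watermark_offsets(tp)
    return consumer.read_batch(low, high)


consumer = StubConsumer(['{"a": 1}', '{"a": 2}'])
print(start_source(consumer))  # ['{"a": 1}', '{"a": 2}']

try:
    start_source(object())  # object() has no poll attribute
except AttributeError as exc:
    print(exc)  # 'object' object has no attribute 'poll'
```

The single poll is the only per-message call in the sketch; all data still flows through the one bulk read.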

@jdye64
Contributor

jdye64 commented Jul 25, 2023

That makes sense. Given that, what if I created a function in cudf_kafka/custreamz that, behind the scenes, used the Confluent Kafka Python library to perform the actual operation? That way, the only library you would need to deal with would be cudf_kafka. Does that sound reasonable? I could try to get a rough working PR up in the next few days, and you could test it on your end before we merge to make sure it works for you.
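A minimal sketch of that delegation idea, assuming the cudf_kafka-side consumer simply holds a confluent_kafka Consumer and forwards poll to it. ProxyingConsumer, StubDelegate and the _ck attribute are hypothetical names, not the merged implementation. In real use the delegate would be confluent_kafka.Consumer(kafka_configs), whose poll returns a single Message or None; a stub delegate is used here so the example runs offline.

```python
from typing import Any, Optional


class ProxyingConsumer:
    """Hypothetical cudf_kafka-side consumer that forwards poll() to a delegate."""

    def __init__(self, delegate: Any):
        # In real use this would be confluent_kafka.Consumer(kafka_configs);
        # any object with a compatible poll() works for this sketch.
        self._ck = delegate

    def poll(self, timeout: Optional[float] = None):
        # Pure pass-through with no cudf involvement, so a single-message poll
        # costs about the same as calling the delegate directly. With no
        # timeout we call poll() bare and let the delegate apply its default.
        if timeout is None:
            return self._ck.poll()
        return self._ck.poll(timeout=timeout)


class StubDelegate:
    """Stands in for confluent_kafka.Consumer so the example runs offline."""

    def __init__(self):
        self.timeouts = []

    def poll(self, timeout: float = -1):
        self.timeouts.append(timeout)
        return None  # a real consumer returns a Message or None


stub = StubDelegate()
consumer = ProxyingConsumer(stub)
consumer.poll(timeout=1)
consumer.poll()
print(stub.timeouts)  # [1, -1]
```

Forwarding one explicit method keeps the proxied surface small, while bulk reads keep going through the cudf path unchanged.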

@GregoryKimball added the labels libcudf (Affects libcudf (C++/CUDA) code.) and feature request (New feature or request), and removed the labels bug (Something isn't working) and libcudf, on Jul 25, 2023
@GregoryKimball changed the title from "[BUG] cudf_kafka Consumer not able to read data" to "[FEA] Add a poll method to cuStreamz for single messages" on Jul 25, 2023
@chinmaychandak
Contributor Author

Yes, that sounds reasonable. Thanks Jeremy!

rapids-bot pushed a commit that referenced this issue on Aug 14, 2023
Streamz has updated its codebase to call the Confluent Kafka Consumer method 'poll'. custreamz currently does not include this method. This PR adds a 'poll' method to custreamz that simply proxies the call to the underlying Confluent Kafka library, so that streamz is no longer broken for end users. Without this method, end users cannot use custreamz with newer versions of the streamz library.

This closes: #13600

Authors:
  - Jeremy Dyer (https://github.com/jdye64)

Approvers:
  - Bradley Dice (https://github.com/bdice)

URL: #13782
3 participants