Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Confluent Cloud client example: Rust #170

Merged
merged 1 commit into from
Jun 11, 2019

Conversation

frankgreco
Copy link

@frankgreco frankgreco commented Jun 5, 2019

@confluentinc/devx

@ybyzek
Copy link
Contributor

ybyzek commented Jun 5, 2019

@frankgreco can you please add Rust to https://github.com/confluentinc/examples/blob/5.2.1-post/clients/cloud/README.md ?


```bash
$ RUSTFLAGS='-C link-args=-lzstd' cargo build
$ ./target/debug/producer --config ~/.ccloud/example.config --topic i_love_rust
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency with all the other client examples, let's have topic name test1

@ybyzek ybyzek requested review from ybyzek and a team June 5, 2019 11:04
@ybyzek ybyzek changed the title [WIP] Adding Rust Client Example [WIP] Confluent Cloud client example: Rust Jun 5, 2019
Produce messages to produce and consume messages from [Confluent Cloud](https://www.confluent.io/confluent-cloud/) using the [rust-rdkafka client for Apache Kafka](https://github.com/fede1024/rust-rdkafka).

# Prerequisites
* [Rust Client for Apache Kafka](https://github.com/fede1024/rust-rdkafka#installation) installed on your machine
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Client -> client

fn log_produce_result(result: Result<(i32, i64), (KafkaError, OwnedMessage)>) -> Result<(), ()> {
result
.and_then(|(p, o)| {
println!("produced to partition {} with offset {}", p, o);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency with the other client examples, can this print topic name as well? Specifically, this is what the other clients print:

Produced record to topic test1 partition [0] @ offset 0
...


let messages = (0..9)
.map(|msg| {
producer.send(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency with the other client examples, can this print that it's about to send? Specifically, this is what the other clients print:

Producing record: alice 	 {"count": 0}
...

@frankgreco frankgreco changed the title [WIP] Confluent Cloud client example: Rust Confluent Cloud client example: Rust Jun 5, 2019
@@ -22,7 +22,7 @@ The following subset includes examples with Confluent Cloud Schema Registry and
| [![](images/kafka-connect-datagen.png)](kafka-connect-datagen/) | [ksql-datagen](ksql-datagen/) | |


# Other Confluent Cloud Demos
# Other Confluent Cloud Dem
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankgreco -- typo

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@frankgreco frankgreco force-pushed the rust branch 2 times, most recently from 08f9040 to 7fee79a Compare June 5, 2019 15:58
@@ -10,7 +10,7 @@ This directory includes examples of Kafka client applications connecting to [Con
| [![](images/scala.png)](scala/) | [![](images/confluent-cli.png)](confluent-cli/) | [![](images/ruby.png)](ruby/) |
| [![](images/groovy.png)](groovy/) | [![](images/kotlin.png)](kotlin/) | [![](images/nodejs.png)](nodejs/) |
| [![](images/kafkacat.jpg)](kafkacat/) | [![](images/dotnet.png)](csharp/) | [![](images/c.png)](c/) |
| [![](images/kafka-connect-datagen.png)](kafka-connect-datagen/) | [ksql-datagen](ksql-datagen/) | |
| [![](images/kafka-connect-datagen.png)](kafka-connect-datagen/) | [ksql-datagen](ksql-datagen/) | [<img src="images/rust.png" width="100" />](rust/) |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This image import should probably not be different from the existing ones.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the reason it is is because this is the only way in GH markdown that I know of to resize an image.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, can we resize the actual image instead? We did this with the other ones

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea sure I could do that - 256x256 seems like it should fit

@@ -0,0 +1,2 @@
/target
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be just target, right? /target is outside the git repo

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this .gitignore was generated by the cargo cli

@@ -0,0 +1,61 @@
# Overview

Produce messages to produce and consume messages from [Confluent Cloud](https://www.confluent.io/confluent-cloud/) using the [rust-rdkafka client for Apache Kafka](https://github.com/fede1024/rust-rdkafka).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Produce messages to produce .." is redundant, should probably be "Produce messages to and consume messages from .."

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy/paste from other client examples - I left it as is for uniformity

```

# Example 1: Hello World!
In this example, the producer writes Kafka data to a topic in Confluent Cloud.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"writes data to a Kafka topic"

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy/paste from other client examples - I left it as is for uniformity

I guess I could change in all client example readme's

1. Run the producer, passing in arguments for (a) the local file with configuration parameters to connect to your Confluent Cloud instance and (b) the topic name:

```bash
$ RUSTFLAGS='-C link-args=-lzstd' cargo build
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are these RUSTFLAGS needed? librdkafka should already be linked with -lzstd

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would love to figure out how to do a build without specifying this but I spend too much time trying to get it to work and this was the workaround. Here is the error I get:

  = note: Undefined symbols for architecture x86_64:
            "_ZSTD_createCStream", referenced from:
                _rd_kafka_zstd_compress in librdkafka_sys-347d93b86e02028e.rlib(rdkafka_zstd.o)
            "_ZSTD_initCStream", referenced from:
                _rd_kafka_zstd_compress in librdkafka_sys-347d93b86e02028e.rlib(rdkafka_zstd.o)
            "_ZSTD_endStream", referenced from:
                _rd_kafka_zstd_compress in librdkafka_sys-347d93b86e02028e.rlib(rdkafka_zstd.o)
            "_ZSTD_decompress", referenced from:
                _rd_kafka_zstd_decompress in librdkafka_sys-347d93b86e02028e.rlib(rdkafka_zstd.o)
            "_ZSTD_compressBound", referenced from:
                _rd_kafka_zstd_compress in librdkafka_sys-347d93b86e02028e.rlib(rdkafka_zstd.o)
            "_ZSTD_isError", referenced from:
                _rd_kafka_zstd_decompress in librdkafka_sys-347d93b86e02028e.rlib(rdkafka_zstd.o)
                _rd_kafka_zstd_compress in librdkafka_sys-347d93b86e02028e.rlib(rdkafka_zstd.o)
            "_ZSTD_compressStream", referenced from:
                _rd_kafka_zstd_compress in librdkafka_sys-347d93b86e02028e.rlib(rdkafka_zstd.o)
            "_ZSTD_freeCStream", referenced from:
                _rd_kafka_zstd_compress in librdkafka_sys-347d93b86e02028e.rlib(rdkafka_zstd.o)
            "_ZSTD_getFrameContentSize", referenced from:
                _rd_kafka_zstd_decompress in librdkafka_sys-347d93b86e02028e.rlib(rdkafka_zstd.o)
            "_ZSTD_getErrorCode", referenced from:
                _rd_kafka_zstd_decompress in librdkafka_sys-347d93b86e02028e.rlib(rdkafka_zstd.o)
            "_ZSTD_getErrorName", referenced from:
                _rd_kafka_zstd_decompress in librdkafka_sys-347d93b86e02028e.rlib(rdkafka_zstd.o)
                _rd_kafka_zstd_compress in librdkafka_sys-347d93b86e02028e.rlib(rdkafka_zstd.o)
          ld: symbol(s) not found for architecture x86_64
          clang: error: linker command failed with exit code 1 (use -v to see invocation)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a bug, it should be possible to automate this with a build.rs file
fede1024/rust-rdkafka#125

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gardnervickers good find! let me try that 💯

@@ -0,0 +1,53 @@
extern crate clap;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copyright blobbing

.get_matches();

let mut kafka_config = ClientConfig::new();
kafka_config.set_log_level(RDKafkaLogLevel::Debug);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed. No debug is logged unless you specify a "debug" config property, in which case it also raises the log level.

let file = File::open(matches.value_of("config").ok_or("error parsing config")?)?;
for line in BufReader::new(&file).lines() {
let cur_line = line?;
if cur_line.starts_with('#') || cur_line.chars().count() < 1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might want to l/r trim the line.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

continue;
}
let key_value: Vec<&str> = cur_line.split("=").collect();
kafka_config.set(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this propagate errors?

Copy link
Author

@frankgreco frankgreco Jun 5, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

anywhere where there is an error, a result is returned and errors are propagated


fn main() -> Result<(), Box<std::error::Error>> {
let (topic, mut config) = utils::get_config()?;
let consumer: StreamConsumer = config.set("group.id", "rust_example_group_1").create()?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it needs some more config for cloud, such as "security.protocol" and "sasl.mechanisms". Same for the producer

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

those are provided by the passed in config

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the config file example above does not have them.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point - i'll make it clear in the readme that they should be there by adding them to the example

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@frankgreco frankgreco force-pushed the rust branch 3 times, most recently from 39372a5 to 3822821 Compare June 5, 2019 21:32
fn echo_message<M: Message>(msg: M) -> Result<(), std::str::Utf8Error> {
let deserialize = |o| match o {
None => Ok("".to_string()),
Some(val) => Ok(std::str::from_utf8(val)?.to_string()),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can just be String::from_utf8(val)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I do that, then I need to use .to_vec() which then means I need add type annotation to the closure. Hence, it's less code to have it as is.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm on second thought, we're taking a &[u8] here, converting it to a &str and then to a String. I think we should be able to avoid the to_string() copying the &str to a String here right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah good observation - no need to convert to a String

None
}
})
.for_each(move |msg| {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm do we need the move here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nope :) good catch - I wish the compiler told you this because at one point I probably did but then when I no longer needed it it's still valid code and doesn't warn.

@@ -0,0 +1,22 @@
[package]
name = "rust_kafka_client_example"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the convention is to use dashes instead of underscores here

1. Run the producer, passing in arguments for (a) the local file with configuration parameters to connect to your Confluent Cloud instance and (b) the topic name:

```bash
$ RUSTFLAGS='-C link-args=-lzstd' cargo build

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a bug, it should be possible to automate this with a build.rs file
fede1024/rust-rdkafka#125

*/
extern crate futures;
extern crate rdkafka;
extern crate tokio;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be worth considering using Rust 2018 and removing these extern crate statements.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! Didn't know these weren't required in Rust 2018 :)

Ok(())
}

fn main() -> Result<(), Box<std::error::Error>> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be dyn std::error::Error?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got the inspiration from here.

mod utils;

fn log_produce_result(
topic: String,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does topic here need to be owned or can it be &str?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, it doesn't need to be owned!

for msg in messages {
msg.wait()
.map_err(|err| eprintln!("error producing message: {}", err))
.and_then(|result| log_produce_result(topic.to_owned(), result))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Im not sure topic needs to be cloned here if log_produce_result does not need an owned String.

@frankgreco frankgreco force-pushed the rust branch 2 times, most recently from 1959e9b to 8e79558 Compare June 6, 2019 00:24
@frankgreco
Copy link
Author

frankgreco commented Jun 6, 2019

@edenhill @ybyzek @gardnervickers I believe I have addressed all your suggestions 😄

Copy link
Contributor

@ybyzek ybyzek left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankgreco thanks for the PR! Can you resize the Rust graphic just a tough smaller (shave off ~15%), but otherwise LGTM! I approve.

@frankgreco frankgreco force-pushed the rust branch 5 times, most recently from c5aa187 to 99cb68d Compare June 6, 2019 15:49
Copy link
Contributor

@edenhill edenhill left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

name = "rust-kafka-client-example"
version = "0.1.0"
authors = ["GRECO, FRANK <[email protected]>"]
edition = "2018"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2019

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's not the current year, but the rust edition

Copy link

@gardnervickers gardnervickers left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, one thing to watch out for is that trait objects need a dyn before them, bare trait objects are deprecated.
So Box<std::error::Error> should become Box<dyn std::error::Error>

fn log_produce_result(
topic: &str,
result: Result<(i32, i64), (KafkaError, OwnedMessage)>,
) -> Result<(), ()> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is nothing to return for the result, do we even need to return anything?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to if I want to use it in an and_then like I do

msg.wait()
.map_err(|err| eprintln!("error producing message: {}", err))
.and_then(|result| log_produce_result(&topic, result))
.ok();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this .ok() necessary?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes because i'm using and_then which returns a Result which means that there could be an error. We know there won't be because the result we're returning doesn't throw any error but we still want to make the compiler happy

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I follow, essentially we're going from Result<Result<(i32, i64), (KafkaError, OwnedMessage)>> here to Option<()> but not consuming the option. That should be equivalent to Result<Result<(i32, i64), (KafkaError, OwnedMessage)>> -> () (since we don't use the option. Calling expect() here would be fine since it's illegal for log_produce_result to return any error.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm using Result::and_then not Option::and_then so a Result will be returned, not an Option

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, the .ok() here will consume the result and return an option.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea basically I'm saying, "even if the and_then has an error in the result (which it won't), it's okay anyways". That way, the compile won't complain about an unchecked result.

msg.topic(),
msg.partition(),
msg.offset(),
deserialize(msg.key())?,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could just be

    msg.key()
        .map(|s| std::str::from_utf8(s)?)
        .unwrap_or("");

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea but then I have to have that same code for the value which is why I abstracted it out into a function.

@ybyzek ybyzek merged commit f731310 into confluentinc:5.2.1-post Jun 11, 2019
@frankgreco frankgreco deleted the rust branch June 11, 2019 23:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants