Using messages beyond lifetime of consumer hangs thread #48

Closed
thijsc opened this issue Jun 6, 2017 · 2 comments

thijsc commented Jun 6, 2017

If you collect messages and use them after dropping the consumer, the thread hangs indefinitely.

This test (in produce_consume_base_test.rs) reproduces the issue:

// All produced messages should be consumed.
#[test]
fn test_produce_consume_messages_beyond_consumer_lifetime() {
    let _r = env_logger::init();

    let topic_name = rand_test_topic();
    let _message_map = produce_messages(&topic_name, 100, &value_fn, &key_fn, None, None);
    let mut messages = Vec::new();

    // Drop consumer before checking vector contents
    {
        let mut consumer = create_stream_consumer(&rand_test_group(), None);
        consumer.subscribe(&vec![topic_name.as_str()]).unwrap();

        let _consumer_future = consumer.start()
            .take(100)
            .for_each(|message| {
                match message {
                    Ok(m) => messages.push(m),
                    Err(e) => panic!("Error receiving message: {:?}", e)
                };
                Ok(())
            })
            .wait();
    }

    assert_eq!(100, messages.len());
}

fede1024 commented Jun 9, 2017

Thanks for reporting the issue! This is probably because the correct termination sequence for librdkafka requires the messages to be destroyed before the consumer or the client is destroyed. rust-rdkafka does not currently enforce this ordering.
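To make the ordering concrete, here is a minimal sketch that records the order in which values are dropped. `FakeConsumer`, `FakeMessage`, and `DropLog` are hypothetical stand-ins invented for illustration. They are not rust-rdkafka or librdkafka types, and the sketch only shows the two orderings, not the hang itself.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Shared log that records the order in which values are dropped.
type DropLog = Rc<RefCell<Vec<&'static str>>>;

// Hypothetical stand-ins, NOT the real rust-rdkafka types.
struct FakeConsumer {
    log: DropLog,
}

struct FakeMessage {
    log: DropLog,
}

impl Drop for FakeConsumer {
    fn drop(&mut self) {
        self.log.borrow_mut().push("consumer");
    }
}

impl Drop for FakeMessage {
    fn drop(&mut self) {
        self.log.borrow_mut().push("message");
    }
}

// The ordering described above: messages are destroyed first.
fn safe_teardown_order() -> Vec<&'static str> {
    let log: DropLog = Rc::new(RefCell::new(Vec::new()));
    let consumer = FakeConsumer { log: log.clone() };
    let messages = vec![
        FakeMessage { log: log.clone() },
        FakeMessage { log: log.clone() },
    ];
    drop(messages);
    drop(consumer);
    let order = log.borrow().clone();
    order
}

// The ordering from the reported test: the consumer goes away first,
// while the collected messages are still alive.
fn reported_teardown_order() -> Vec<&'static str> {
    let log: DropLog = Rc::new(RefCell::new(Vec::new()));
    let consumer = FakeConsumer { log: log.clone() };
    let messages = vec![
        FakeMessage { log: log.clone() },
        FakeMessage { log: log.clone() },
    ];
    drop(consumer);
    drop(messages);
    let order = log.borrow().clone();
    order
}
```

Nothing in these stand-in types prevents the second ordering, which mirrors the gap in enforcement described in this comment.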

The right approach here is probably to bind the lifetime of messages to that of the consumer. I hope it won't make the API much more complicated.

@fede1024

Each Message now holds a (phantom) reference to the consumer that created it, so your example no longer compiles.
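For readers unfamiliar with the pattern, a phantom reference can be sketched roughly as follows. This is an illustration under stated assumptions, not the actual rust-rdkafka implementation: `Consumer`, `Message`, `poll`, and `total_payload_len` are hypothetical names, and the real types wrap librdkafka handles rather than owned strings.

```rust
use std::marker::PhantomData;

// Hypothetical stand-in for a consumer, NOT the rust-rdkafka type.
struct Consumer {
    payloads: Vec<String>,
}

// Hypothetical message that owns its payload but is still tied to the
// consumer's lifetime through a zero-sized marker.
struct Message<'c> {
    payload: String,
    // PhantomData stores nothing at runtime; it only tells the borrow
    // checker that a Message borrows from the Consumer that created it.
    _consumer: PhantomData<&'c Consumer>,
}

impl Consumer {
    fn new(payloads: Vec<String>) -> Consumer {
        Consumer { payloads }
    }

    // The returned Message carries the lifetime of `&self`.
    fn poll(&self, index: usize) -> Option<Message<'_>> {
        self.payloads.get(index).map(|p| Message {
            payload: p.clone(),
            _consumer: PhantomData,
        })
    }
}

impl<'c> Message<'c> {
    fn payload(&self) -> &str {
        &self.payload
    }
}

// Compiles: `messages` is declared after `consumer`, so it is dropped first.
fn total_payload_len() -> usize {
    let consumer = Consumer::new(vec!["a".to_string(), "bb".to_string()]);
    let mut messages = Vec::new();
    let mut index = 0;
    while let Some(message) = consumer.poll(index) {
        messages.push(message);
        index += 1;
    }
    let total: usize = messages.iter().map(|m| m.payload().len()).sum();
    total
}

// Rejected by the borrow checker (kept as a comment so the block compiles):
//
//     let mut messages = Vec::new();
//     {
//         let consumer = Consumer::new(vec!["a".to_string()]);
//         messages.push(consumer.poll(0).unwrap());
//     } // `consumer` is dropped here while `messages` still borrows it
//     assert_eq!(1, messages.len());
```

With the marker in place, the shape of the original test (messages collected into a vector that outlives the consumer) is rejected at compile time instead of hanging at runtime.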
