Spurious EoF from C++ API #182

Closed
vencik opened this issue Jan 16, 2015 · 35 comments

@vencik

vencik commented Jan 16, 2015

I'm having trouble reading from a queue: sometimes (the problem doesn't seem to be deterministic; it emerges erratically) I get into a situation where I can only read one message and a second attempt ends with EoF (empty queue), although the queue is definitely full of messages.

I read the queue starting from the saved (stored) offset.

When I re-create the consumer, I again read one message and then get an EoF.
When I restart the whole process, I can read normally.

@edenhill
Contributor

Sounds like you are starting to read from the last offset, so you will only get the last message and then EOF. Is this correct?

@vencik
Author

vencik commented Jan 16, 2015

Nope; I start reading from the unread messages. A model situation:
There are 100 msgs in the queue; during the last batch run, I read 50 of them.
I create the consumer with RdKafka::Topic::OFFSET_STORED.
I read messages #51 to #100.
Then I get EoF.
I wait for some time, during which, let's say, another 50 messages are produced.
I create the consumer and get msg #101, then EoF.
I create the consumer again and get msg #102, then EoF, and so on...

Sometimes it happens after several successful batches...

@edenhill
Contributor

Huhm, are you creating and destroying (or starting/stopping) your consumer between batches?
Can you explain your program a bit more and maybe show a code sample of the relevant Kafka parts?

@vencik
Author

vencik commented Jan 16, 2015

No, sorry, I misled you there; the topic and consumer are cached for the whole life of the process. Is that wrong, maybe?

Basically, my program is a system service: each time a client triggers it, a new processing instance reads all the newly produced messages and does some work.
The topic & consumer creation:

    std::string errstr;  // filled in by create() on failure

    std::unique_ptr<RdKafka::Consumer> in_consumer(
        RdKafka::Consumer::create(m_global_conf.get(), errstr));

    if (!in_consumer) {
        error("Failed to create input consumer: %s", errstr.c_str());

        throw std::runtime_error(
            "messaging::mq_client: failed to create input consumer");
    }

    std::unique_ptr<RdKafka::Topic> in_topic(RdKafka::Topic::create(
        in_consumer.get(), input_q, m_topic_conf.get(), errstr));

    if (!in_topic) {
        error("Failed to create input consumer topic: %s", errstr.c_str());

        throw std::runtime_error(
            "messaging::mq_client: failed to create input consumer topic");
    }

    // Start fetching at the offset stored for this partition
    RdKafka::ErrorCode in_start = in_consumer->start(
        in_topic.get(), m_partition, RdKafka::Topic::OFFSET_STORED);

    if (RdKafka::ERR_NO_ERROR != in_start) {
        error("Failed to start signal receiver: %s",
              RdKafka::err2str(in_start).c_str());

        throw std::runtime_error(
            "messaging::mq_client: failed to start signal receiver");
    }

Receiving a message:

    debug("Receiving message");

    std::unique_ptr<RdKafka::Message> msg(
        consumer->consume(topic, m_partition, timeout));

    switch (msg->err()) {
        case RdKafka::ERR_NO_ERROR:  // message ready
            debug("Received message '%.*s' at offset %lld",
                  static_cast<int>(msg->len()),
                  static_cast<const char *>(msg->payload()),
                  static_cast<long long>(msg->offset()));  // offset is int64_t

            msg_data.assign(
                static_cast<const char *>(msg->payload()), msg->len());

            return true;

        case RdKafka::ERR__TIMED_OUT:  // timeout
            debug("Timed out (%d ms)", timeout);

            msg_data.clear();

            return true;

        case RdKafka::ERR__PARTITION_EOF:  // queue end reached
            debug("Message queue is empty");

            return false;

        default:  // error
            error("Error receiving message: %s", msg->errstr().c_str());

            throw std::runtime_error(
                "messaging::mq_client: error receiving message");
    }

    // Unreachable code
    throw std::logic_error(
        "messaging::mq_client: Unresolved receive error code");

@vencik
Author

vencik commented Jan 19, 2015

Another weird observation: if there's no new message in the queue between batches, I mostly get the last message (over and over again) and sometimes a timeout instead.

@vencik
Author

vencik commented Jan 19, 2015

kafkacat doesn't seem to suffer from such problems, however.
I know that the C++ API is considered experimental...

@edenhill
Contributor

Is this reproducible with examples/rdkafka_example_cpp?

@vencik
Author

vencik commented Jan 19, 2015

Doesn't seem so; let me go through the example code and compare it with mine...

@edenhill edenhill reopened this Jan 19, 2015
@vencik
Author

vencik commented Jan 19, 2015

Yes, every 5 minutes (i.e. every batch). Is that right or wrong?

@edenhill
Contributor

I guess that was a reply to the comment I mistakenly removed, asking whether you were starting/stopping your consumer repeatedly?

@vencik
Author

vencik commented Jan 19, 2015

And, btw, I'm consuming/producing from/to multiple different queues (topics).

@vencik
Author

vencik commented Jan 19, 2015

Yup, I start/stop the consumer before/after each batch.
But I didn't use to; when the problem originally emerged, I just started the consumer at process startup and never stopped it after that.

@edenhill
Contributor

Unless you are going to jump to a different offset (rather than the next one) you shouldn't start/stop your consumer, just keep it idling until you need it again.

@vencik
Author

vencik commented Jan 19, 2015

OK

@vencik
Author

vencik commented Jan 19, 2015

A stupid, but perhaps key, question: do I need a separate topic config object for every topic?

@edenhill
Contributor

In the C API: yes, the API takes ownership of the config objects when you pass them in.
If you want to reuse config objects, you can use the _dup() function.

In the C++ API: No, config objects are reusable.

@vencik
Author

vencik commented Jan 19, 2015

So the problem with repeatedly reading the last message in the queue seems to have disappeared (although I now seem to get an RdKafka::ERR__TIMED_OUT status rather than the RdKafka::ERR__PARTITION_EOF I'd expect).

@vencik
Author

vencik commented Jan 19, 2015

But I guess that's because I use a timeout of 0.

@edenhill
Contributor

Is getting TIMED_OUT rather than PARTITION_EOF a problem for you?
I'm guessing you don't want to use a timeout of 1?

@vencik
Author

vencik commented Jan 19, 2015

No, it's not a problem (I just noted it in case it's relevant).

@edenhill
Contributor

Okay, so the message replay is gone now; do you have any idea why?

@vencik
Author

vencik commented Jan 19, 2015

Well, I did what you advised: I now start the consumer just once and leave it idle between batches...

@edenhill
Contributor

Okay, so there is a bug in the restart code somewhere, I'll look into it.
Thanks

@vencik
Author

vencik commented Jan 19, 2015

Unfortunately, the component that feeds me the messages is currently down, so I can't see whether the original issue still emerges (no new messages are coming).
However, apart from a bit of refactoring, the code I have now is pretty much the same as what I started with...

@vencik
Author

vencik commented Jan 19, 2015

Thanks a lot

@vencik
Author

vencik commented Jan 19, 2015

I was just able to reproduce the behaviour described above with a small modification of the rdkafka_example.cpp code (see the -B and -T options added):

diff --git a/examples/rdkafka_example.cpp b/examples/rdkafka_example.cpp
index 9e03a01..e1d7203 100644
--- a/examples/rdkafka_example.cpp
+++ b/examples/rdkafka_example.cpp
@@ -39,7 +39,10 @@
 #include <csignal>
 #include <cstring>

+extern "C" {
 #include <getopt.h>
+#include <unistd.h>
+}


 /*
@@ -132,6 +135,8 @@ int main (int argc, char **argv) {
   bool do_conf_dump = false;
   char opt;
   MyHashPartitionerCb hash_partitioner;
+  int batch_delay = 0;
+  int timeout = 1000;

   /*
    * Create configuration objects
@@ -140,7 +145,7 @@ int main (int argc, char **argv) {
   RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);


-  while ((opt = getopt(argc, argv, "PCt:p:b:z:qd:o:eX:AM:")) != -1) {
+  while ((opt = getopt(argc, argv, "PCt:p:b:z:qd:o:eX:AM:B:T:")) != -1) {
     switch (opt) {
     case 'P':
     case 'C':
@@ -229,6 +234,14 @@ int main (int argc, char **argv) {
       }
       break;

+    case 'B':  // consumer batch mode on
+      batch_delay = std::atoi(optarg);
+      break;
+
+    case 'T':  // consumer timeout
+      timeout = std::atoi(optarg);
+      break;
+
     default:
       goto usage;
     }
@@ -263,6 +276,10 @@ int main (int argc, char **argv) {
             "will be set on topic object.\n"
             "                  Use '-X list' to see the full list\n"
             "                  of supported properties.\n"
+            "  -B <delay>      Consumer batch mode (will sleep for\n"
+            "                  <delay> seconds when queue end is reached\n"
+            "                  and then continue)\n"
+            "  -T <timeout>    Consumer timeout [ms], default: 1000\n"
             "\n"
             " In Consumer mode:\n"
             "  writes fetched messages to stdout\n"
@@ -429,37 +446,46 @@ int main (int argc, char **argv) {
     /*
      * Consume messages
      */
-    while (run) {
-      RdKafka::Message *msg = consumer->consume(topic, partition, 1000);
-
-      switch (msg->err())
-      {
-        case RdKafka::ERR__TIMED_OUT:
-          break;
-
-        case RdKafka::ERR_NO_ERROR:
-     /* Real message */
-     std::cerr << "Read msg at offset " << msg->offset() << std::endl;
-          printf("%.*s\n",
-                 static_cast<int>(msg->len()),
-                 static_cast<const char *>(msg->payload()));
-     break;
-
-   case RdKafka::ERR__PARTITION_EOF:
-     /* Last message */
-     if (exit_eof)
-       run = false;
-     break;
-
-   default:
-     /* Errors */
-     std::cerr << "Consume failed: " << msg->errstr() << std::endl;
-     run = false;
-   }
-
-      delete msg;
+    for (;;) {
+      while (run) {
+        RdKafka::Message *msg = consumer->consume(topic, partition, timeout);
+
+        switch (msg->err())
+        {
+          case RdKafka::ERR__TIMED_OUT:
+            break;
+
+          case RdKafka::ERR_NO_ERROR:
+            /* Real message */
+            std::cerr << "Read msg at offset " << msg->offset() << std::endl;
+            printf("%.*s\n",
+                   static_cast<int>(msg->len()),
+                   static_cast<const char *>(msg->payload()));
+            break;
+
+          case RdKafka::ERR__PARTITION_EOF:
+            /* Last message */
+            if (exit_eof || 0 != batch_delay)
+              run = false;
+            break;
+
+          default:
+            /* Errors */
+            std::cerr << "Consume failed: " << msg->errstr() << std::endl;
+            run = false;
+          }
+
+        delete msg;
+
+        consumer->poll(0);
+      }

-      consumer->poll(0);
+      if (0 != batch_delay) {
+        std::cerr << "Sleeping for " << batch_delay << " seconds..." << std::endl;
+        sleep(batch_delay);
+        run = true;
+        continue;
+      }
     }

     /*

@vencik
Author

vencik commented Jan 19, 2015

I run the consumer with -B 100 (a 100-second wait between batches) and -T 0 (a timeout of 0).
I push, e.g., 3 messages to the queue while it sleeps.
After resuming from the sleep, I only get the next message and then EOF.

@edenhill
Contributor

Big thanks for providing a reproducible patch, much kudos!

@vencik
Author

vencik commented Jan 26, 2015

Hi, sorry to bother you; any news on the issue? Can I help in any way?

@edenhill
Contributor

Sorry for the delay, I have reproduced the issue but have not sorted it out yet. Will take a closer look tomorrow.
Thanks for your patience

@vencik
Author

vencik commented Jan 29, 2015

No problem, thanks.

@edenhill
Contributor

Fixed on master

@maharajan1

I am facing the same issue: the C++ consumer API returns EOF after consuming only a few (1 or 2) messages, although the queue has many messages. Can you let me know which version contains the fix for this problem? Any help is much appreciated.

@edenhill
Contributor

@maharajan1 what version are you on now?

@maharajan1

I am using version librdkafka_2.10-0.8.2.1. Thanks for the reply!


3 participants