0.7 branch can't seem to actually connect to kafka 0.7.0 ? #107

Closed
dfellis opened this issue Apr 22, 2014 · 3 comments

@dfellis

dfellis commented Apr 22, 2014

Hi, I'd like to use librdkafka for consuming from some high-volume streams, as I'd rather write C than Java (and oh god, the Java build chain needed for production...), but the example program can't seem to connect to Kafka 0.7.0:

damocles@kafka-yo-kafka:~/librdkafka/examples$ ./rdkafka_example -C -t rt-san_francisco -b localhost:9093
%7|1398125017.755|CONNECTED|localhost:9093#consumer-0| connected to localhost:9093
% Error: Broker: Offset out of range (0)
% Error: Broker: Offset out of range (0)
% Error: Broker: Offset out of range (0)
^Cdamocles@kafka-yo-kafka:~/librdkafka/examples$ cd ../../kafka-0.7.0-incubating-src/
damocles@kafka-yo-kafka:~/kafka-0.7.0-incubating-src$ ./bin/kafka-console-consumer.sh --topic rt-san_francisco --zookeeper localhost:2182
[2014-04-22 00:04:07,133] INFO Connecting to zookeeper instance at localhost:2182 (kafka.consumer.ZookeeperConsumerConnector)
[2014-04-22 00:04:07,153] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2014-04-22 00:04:07,163] INFO Client environment:zookeeper.version=3.3.3-1073969, built on 02/23/2011 22:27 GMT (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,164] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,164] INFO Client environment:java.version=1.6.0_30 (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,164] INFO Client environment:java.vendor=Sun Microsystems Inc. (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,164] INFO Client environment:java.home=/usr/lib/jvm/java-6-openjdk-amd64/jre (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,164] INFO Client environment:java.class.path=:./bin/../project/boot/scala-2.8.0/lib/scala-compiler.jar:./bin/../project/boot/scala-2.8.0/lib/scala-library.jar:./bin/../core/target/scala_2.8.0/kafka-0.7.0.jar:./bin/../core/lib/zkclient-20110412.jar:./bin/../core/lib_managed/scala_2.8.0/compile/jopt-simple-3.2.jar:./bin/../core/lib_managed/scala_2.8.0/compile/log4j-1.2.15.jar:./bin/../core/lib_managed/scala_2.8.0/compile/zookeeper-3.3.3.jar (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,164] INFO Client environment:java.library.path=/usr/lib/jvm/java-6-openjdk-amd64/jre/lib/amd64/server:/usr/lib/jvm/java-6-openjdk-amd64/jre/lib/amd64:/usr/lib/jvm/java-6-openjdk-amd64/jre/../lib/amd64:/usr/java/packages/lib/amd64:/usr/lib/jni:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,164] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,164] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,164] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,164] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,165] INFO Client environment:os.version=3.2.0-54-virtual (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,165] INFO Client environment:user.name=damocles (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,165] INFO Client environment:user.home=/home/damocles (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,165] INFO Client environment:user.dir=/home/damocles/kafka-0.7.0-incubating-src (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,166] INFO Initiating client connection, connectString=localhost:2182 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@2a74c252 (org.apache.zookeeper.ZooKeeper)
[2014-04-22 00:04:07,197] INFO Opening socket connection to server localhost/127.0.0.1:2182 (org.apache.zookeeper.ClientCnxn)
[2014-04-22 00:04:07,207] INFO Socket connection established to localhost/127.0.0.1:2182, initiating session (org.apache.zookeeper.ClientCnxn)
[2014-04-22 00:04:07,215] INFO Session establishment complete on server localhost/127.0.0.1:2182, sessionid = 0x1451fcfd9314693, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2014-04-22 00:04:07,218] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2014-04-22 00:04:07,221] INFO starting auto committer every 10000 ms (kafka.consumer.ZookeeperConsumerConnector)
[2014-04-22 00:04:07,313] INFO begin registering consumer console-consumer-82345_kafka-yo-kafka-1398125047273-388d7684 in ZK (kafka.consumer.ZookeeperConsumerConnector)
[2014-04-22 00:04:07,356] INFO end registering consumer console-consumer-82345_kafka-yo-kafka-1398125047273-388d7684 in ZK (kafka.consumer.ZookeeperConsumerConnector)
[2014-04-22 00:04:07,403] INFO begin rebalancing consumer console-consumer-82345_kafka-yo-kafka-1398125047273-388d7684 try #0 (kafka.consumer.ZookeeperConsumerConnector)
[2014-04-22 00:04:07,651] INFO Committing all offsets (kafka.consumer.ZookeeperConsumerConnector)
[2014-04-22 00:04:07,654] INFO Releasing partition ownership (kafka.consumer.ZookeeperConsumerConnector)
[2014-04-22 00:04:07,657] INFO Consumer console-consumer-82345_kafka-yo-kafka-1398125047273-388d7684 rebalancing the following partitions: List(0-0) for topic rt-san_francisco with consumers: List(console-consumer-82345_kafka-yo-kafka-1398125047273-388d7684-0) (kafka.consumer.ZookeeperConsumerConnector)
[2014-04-22 00:04:07,660] INFO console-consumer-82345_kafka-yo-kafka-1398125047273-388d7684-0 attempting to claim partition 0-0 (kafka.consumer.ZookeeperConsumerConnector)
[2014-04-22 00:04:07,704] INFO Consumer console-consumer-82345_kafka-yo-kafka-1398125047273-388d7684 selected partitions : rt-san_francisco:0-0: fetched offset = 1402874789: consumed offset = 1402874789 (kafka.consumer.ZookeeperConsumerConnector)
[2014-04-22 00:04:07,710] INFO end rebalancing consumer console-consumer-82345_kafka-yo-kafka-1398125047273-388d7684 try #0 (kafka.consumer.ZookeeperConsumerConnector)
[2014-04-22 00:04:07,711] INFO FetchRunnable-0 start fetching topic: rt-san_francisco part: 0 offset: 1402874789 from localhost:9093 (kafka.consumer.FetcherRunnable)
{"host":"kafka-yo-kafka","level":"warn","msg":"Request Timed Out {}","ts":1398125047.859,"isodate":"2014-04-22T00:04:07.859Z","peerId":2,"workerId":2}
{"host":"kafka-yo-kafka","level":"info","msg":"NCarClient consecutiveErrorCount: 110151 {}","ts":1398125047.859,"isodate":"2014-04-22T00:04:07.859Z","peerId":2,"workerId":2}
{"host":"kafka-yo-kafka","level":"info","msg":"NCarClient.makeNonOperational: false {}","ts":1398125047.859,"isodate":"2014-04-22T00:04:07.859Z","peerId":2,"workerId":2}
{"host":"kafka-yo-kafka","level":"warn","msg":"Request Timed Out {}","ts":1398125047.914,"isodate":"2014-04-22T00:04:07.914Z","peerId":1,"workerId":1}
{"host":"kafka-yo-kafka","level":"info","msg":"NCarClient consecutiveErrorCount: 110216 {}","ts":1398125047.915,"isodate":"2014-04-22T00:04:07.915Z","peerId":1,"workerId":1}
{"host":"kafka-yo-kafka","level":"info","msg":"NCarClient.makeNonOperational: false {}","ts":1398125047.915,"isodate":"2014-04-22T00:04:07.915Z","peerId":1,"workerId":1}
{"host":"kafka-yo-kafka","level":"info","msg":"Received response for request, url: /cities/san_francisco, statusCode: 200, x-powered-by: Express, cache-control: s-maxage=0, content-type: application/json; charset=utf-8, content-length: 801372, etag: \"1409535808\", date: Tue, 22 Apr 2014 00:04:08 GMT, connection: keep-alive {}","ts":1398125048.255,"isodate":"2014-04-22T00:04:08.255Z","peerId":1,"workerId":1}

The highlight reel:

  • ZooKeeper and Kafka are running on non-default ports (literally each default port number plus 1: ZooKeeper on 2182, Kafka on 9093).
  • The official client correctly identifies the current offset, while rdkafka_example seems to think the offset is 0. Compare "[2014-04-22 00:04:07,704] INFO Consumer console-consumer-82345_kafka-yo-kafka-1398125047273-388d7684 selected partitions : rt-san_francisco:0-0: fetched offset = 1402874789: consumed offset = 1402874789 (kafka.consumer.ZookeeperConsumerConnector)" with "% Error: Broker: Offset out of range (0)".
  • After the ZooKeeper handshake, the official client ends up talking to Kafka on exactly the same host:port combination that I gave rdkafka_example: "[2014-04-22 00:04:07,711] INFO FetchRunnable-0 start fetching topic: rt-san_francisco part: 0 offset: 1402874789 from localhost:9093 (kafka.consumer.FetcherRunnable)".

I started digging into the code, but I'm not familiar enough with the codebase to tell where this error originates.

@edenhill
Contributor

Hi,

Broker addresses
In Apache Kafka 0.7 the clients (producers and consumers) need to integrate with ZooKeeper to automatically find the proper broker for a topic and partition; in 0.8 this changed to a pure Kafka protocol mechanism. Kafka is moving away from depending on ZooKeeper on the client side, and this is already done for the producer side.

Solution:
Since librdkafka 0.7 does not have ZooKeeper support, the application/user needs to provide the proper address of the leader broker for the desired topic and partition. The alternative is to use virtual IPs to solve this at the network addressing layer.
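
As a rough illustration of the first option: the application could read each broker's registration from ZooKeeper itself (in 0.7 these are assumed to live under /brokers/ids/<id>; check the actual values on your cluster, e.g. with zkCli.sh) and hand the resulting address to librdkafka. The helper below is a hypothetical sketch, not part of librdkafka. It only assumes that the registration value ends in "host:port", skips anything before that (such as a creator id), and does not handle IPv6 literals.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper (not part of librdkafka): extract host and port from a
 * broker registration string assumed to end in "host:port". Anything before
 * the second-to-last colon (e.g. a creator id) is skipped; IPv6 literals are
 * not handled. Returns 0 on success, -1 on malformed input. */
int parse_broker_addr(const char *reg, char *host, size_t host_size, int *port)
{
    const char *last = strrchr(reg, ':');
    const char *start = reg;
    const char *p;
    char *end;
    long val;
    size_t len;

    if (last == NULL)
        return -1;
    for (p = reg; p < last; p++)      /* host starts after the previous colon */
        if (*p == ':')
            start = p + 1;
    len = (size_t)(last - start);
    if (len == 0 || len >= host_size)
        return -1;
    val = strtol(last + 1, &end, 10); /* port must be all digits, 1..65535 */
    if (end == last + 1 || *end != '\0' || val <= 0 || val > 65535)
        return -1;
    memcpy(host, start, len);
    host[len] = '\0';
    *port = (int)val;
    return 0;
}
```

The resulting host and port would then be given to librdkafka as the broker address, just as localhost:9093 is passed with -b in the command above.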

In librdkafka 0.8 (master) this is not needed, since the same information is fetched automatically, directly from the brokers, without ZooKeeper.

Initial offset
librdkafka 0.7 requires the application to provide the proper starting offset and defaults to 0 if none is provided, which may be an offset that has already expired, as in your case (the official client started at offset 1402874789).
While the 0.7 protocol supports querying the broker for the oldest and newest offsets, this is not implemented in librdkafka 0.7 (but it is in 0.8).
If the offset store is enabled, librdkafka stores the last fetched offset in an offset file and uses it at the next restart, so the initial offset only needs to be provided the first time the application runs.
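
For reference, here is a sketch of what encoding such an offsets query might look like, based on my understanding of the Kafka 0.7 wire format: a 4-byte big-endian length prefix, request type 4 (OFFSETS), the topic as a 2-byte length plus bytes, a 4-byte partition, an 8-byte time value where -1 is assumed to ask for the newest offset and -2 for the oldest, and a 4-byte maximum number of offsets to return. The constants and the function name are assumptions to verify against the 0.7 broker source; this is not existing librdkafka code.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Assumed Kafka 0.7 protocol constants: verify against the 0.7 broker source. */
#define KAFKA07_REQ_OFFSETS   4      /* request type id of OFFSETS */
#define KAFKA07_TIME_LATEST   (-1LL) /* ask for the newest offset */
#define KAFKA07_TIME_EARLIEST (-2LL) /* ask for the oldest offset still on disk */

static void put_be16(unsigned char *p, uint16_t v)
{
    p[0] = (unsigned char)(v >> 8);
    p[1] = (unsigned char)v;
}

static void put_be32(unsigned char *p, uint32_t v)
{
    int i;
    for (i = 0; i < 4; i++)
        p[i] = (unsigned char)(v >> (24 - 8 * i));
}

static void put_be64(unsigned char *p, uint64_t v)
{
    int i;
    for (i = 0; i < 8; i++)
        p[i] = (unsigned char)(v >> (56 - 8 * i));
}

/* Hypothetical helper: encode an OFFSETS request for one topic/partition
 * into buf. Returns the number of bytes written, or 0 if buf is too small
 * or the topic name is too long. */
size_t kafka07_encode_offsets_req(unsigned char *buf, size_t size,
                                  const char *topic, int32_t partition,
                                  int64_t time, int32_t max_offsets)
{
    size_t tlen = strlen(topic);
    /* type + topic length + topic + partition + time + max offsets */
    size_t body = 2 + 2 + tlen + 4 + 8 + 4;
    unsigned char *p = buf;

    if (tlen > (size_t)INT16_MAX || size < 4 + body)
        return 0;
    put_be32(p, (uint32_t)body);        p += 4; /* length, excluding itself */
    put_be16(p, KAFKA07_REQ_OFFSETS);   p += 2;
    put_be16(p, (uint16_t)tlen);        p += 2;
    memcpy(p, topic, tlen);             p += tlen;
    put_be32(p, (uint32_t)partition);   p += 4;
    put_be64(p, (uint64_t)time);        p += 8;
    put_be32(p, (uint32_t)max_offsets); p += 4;
    return (size_t)(p - buf);
}
```

Asking for a single offset with KAFKA07_TIME_EARLIEST would give a consumer the oldest offset still available instead of defaulting to 0.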

Solution:

  • implement the OFFSETS request in the librdkafka 0.7 code base,
  • or provide the initial offset by other means.
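
For the first option, the reply also has to be decoded. The sketch below assumes the 0.7 OFFSETS response is laid out as a 4-byte length, a 2-byte error code, a 4-byte offset count and then that many 8-byte offsets, all big-endian; the layout and the function name are assumptions, not librdkafka code. A consumer would then start fetching at the first returned offset rather than at 0.

```c
#include <stddef.h>
#include <stdint.h>

static uint32_t get_be32(const unsigned char *p)
{
    return ((uint32_t)p[0] << 24) | ((uint32_t)p[1] << 16) |
           ((uint32_t)p[2] << 8) | (uint32_t)p[3];
}

static uint64_t get_be64(const unsigned char *p)
{
    return ((uint64_t)get_be32(p) << 32) | get_be32(p + 4);
}

/* Hypothetical helper: decode an OFFSETS response, assumed to be laid out as
 *   int32 length | int16 error code | int32 count | int64 offsets[count]
 * Stores up to max_out offsets in out and returns how many were stored.
 * Returns -1 if the buffer is malformed or the broker reported an error
 * (the error code is left in *err). */
int kafka07_decode_offsets_resp(const unsigned char *buf, size_t size,
                                int16_t *err, int64_t *out, int max_out)
{
    uint32_t len, count, i;

    *err = 0;
    if (size < 10 || max_out < 0)
        return -1;
    len = get_be32(buf);               /* bytes following the length field */
    if (len < 6 || len > size - 4)
        return -1;
    *err = (int16_t)(uint16_t)((buf[4] << 8) | buf[5]);
    if (*err != 0)
        return -1;
    count = get_be32(buf + 6);
    if (count > (len - 6) / 8)         /* offsets must fit inside the frame */
        return -1;
    for (i = 0; i < count && i < (uint32_t)max_out; i++)
        out[i] = (int64_t)get_be64(buf + 10 + 8 * i);
    return (int)i;
}
```

Returning -1 on a non-zero error code while still exposing it through *err lets the caller distinguish broker errors (err != 0) from truncated or malformed frames (err == 0).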

There is a big difference in functionality between librdkafka 0.7 and 0.8: while 0.7 is used in some limited production setups, 0.8 is used at large scale (Wikipedia, for example) and is where all development happens.

@edenhill
Contributor

It is of course also feasible to add the required ZooKeeper support to the application itself and pass the broker leader information on to librdkafka 0.7.
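
One small piece of that glue is interpreting the partition names used by the 0.7 high-level consumer, which the console consumer log above prints as "0-0" and which appear to mean broker id 0, partition 0. The sketch below, with hypothetical names throughout, splits such a name and looks the broker id up in an address table that the application maintains itself (for example from its own ZooKeeper lookups), yielding the host:port to give to librdkafka 0.7.

```c
#include <limits.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical: one entry per broker, filled in by the application. */
struct broker_entry {
    int id;
    const char *addr; /* "host:port" */
};

/* Split a 0.7-style partition name "brokerId-partitionId" (e.g. "0-0").
 * Returns 0 on success, -1 if the name is malformed. */
int parse_partition_name(const char *name, int *broker_id, int *partition)
{
    char *end;
    long b, p;

    b = strtol(name, &end, 10);
    if (end == name || *end != '-' || b < 0 || b > INT_MAX)
        return -1;
    name = end + 1;
    p = strtol(name, &end, 10);
    if (end == name || *end != '\0' || p < 0 || p > INT_MAX)
        return -1;
    *broker_id = (int)b;
    *partition = (int)p;
    return 0;
}

/* Return the address of the broker that owns the named partition (and store
 * the partition number), or NULL if the name is bad or the broker unknown. */
const char *leader_addr_for(const struct broker_entry *tab, size_t n,
                            const char *partition_name, int *partition)
{
    int broker_id;
    size_t i;

    if (parse_partition_name(partition_name, &broker_id, partition) != 0)
        return NULL;
    for (i = 0; i < n; i++)
        if (tab[i].id == broker_id)
            return tab[i].addr;
    return NULL;
}
```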

@edenhill
Contributor

Closing due to inactivity. Reopen if this is still an issue.
