Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Missing 'key' for message with 'key' field sends by Producer #699

Closed
twang2218 opened this issue Jul 5, 2017 · 7 comments
Closed

Missing 'key' for message with 'key' field sends by Producer #699

twang2218 opened this issue Jul 5, 2017 · 7 comments

Comments

@twang2218
Copy link

Bug Report

In producer.send(payload), if the payload is something like:

{
      topic: 'node-test4',
      messages: 'my test message',
      key: 'lowkey',
}

the key should be the message key, ref: https://github.com/SOHU-Co/kafka-node#sendpayloads-cb-1

However, the after checking the Kafka side, the key is missing.

Environment

  • Node version: v8.1.3
  • Kafka-node version: 1.6.2
  • Kafka version: v0.10.2.1

For specific cases also provide

  • Number of Brokers: 1
  • Number partitions for topic: 1

Include Sample Code to reproduce behavior

const kafka = require('kafka-node');

const { HighLevelProducer, KeyedMessage, Client } = kafka;

const client = new Client('localhost:2181', 'my-client-id');

client.on('error', error => console.error(error));

const producer = new HighLevelProducer(client, {
  requireAcks: 1,
  partitionerType: 3,
});

let count = 0;
producer.on('ready', () => {
  client.refreshMetadata(['node-test5'], (error) => {
    if (error) {
      console.error(error);
      return;
    }
    setInterval(() => {
      count += 1;
      // Create a new payload
      const payload = [
        {
          topic: 'node-test5',
          messages: `message ${count}`,
          key: 'lowkey',
        },
        {
          topic: 'node-test5',
          messages: new KeyedMessage('highkey', `message ${count}`),
        },
      ];
      producer.send(payload, (err, result) => {
        console.info('Sent payload to Kafka: ', payload);
        if (err) {
          console.error(err);
        } else {
          const formattedResult = result[0];
          console.log(`result: ${formattedResult}`);
        }
      });
    }, 2000);
  });
});

producer.on('error', error => console.error(error));

For the comparison, I also send a message which is KeyedMessage object.

Here is the output of the node index.js

  kafka-node:Client refresh metadata currentAttempt 1 +0ms
Sent payload to Kafka:  [ { topic: 'node-test5',
    messages: 'message 1',
    key: 'lowkey',
    partition: 0,
    attributes: 0 },
  { topic: 'node-test5',
    messages: KeyedMessage { magic: 0, attributes: 0, key: 'highkey', value: 'message 1' },
    partition: 0,
    attributes: 0 } ]
result: undefined
Sent payload to Kafka:  [ { topic: 'node-test5',
    messages: 'message 2',
    key: 'lowkey',
    partition: 0,
    attributes: 0 },
  { topic: 'node-test5',
    messages: KeyedMessage { magic: 0, attributes: 0, key: 'highkey', value: 'message 2' },
    partition: 0,
    attributes: 0 } ]
result: undefined
Sent payload to Kafka:  [ { topic: 'node-test5',
    messages: 'message 3',
    key: 'lowkey',
    partition: 0,
    attributes: 0 },
  { topic: 'node-test5',
    messages: KeyedMessage { magic: 0, attributes: 0, key: 'highkey', value: 'message 3' },
    partition: 0,
    attributes: 0 } ]
result: undefined

However, on the Kafka side, the result is different:

[
  {
    "topic": "node-test5",
    "key": "�ée",
    "value": "message 1",
    "partition": 0,
    "offset": 0
  },
  {
    "topic": "node-test5",
    "key": "highkey",
    "value": "message 1",
    "partition": 0,
    "offset": 1
  },
  {
    "topic": "node-test5",
    "key": "�ée",
    "value": "message 2",
    "partition": 0,
    "offset": 2
  },
  {
    "topic": "node-test5",
    "key": "highkey",
    "value": "message 2",
    "partition": 0,
    "offset": 3
  },
  {
    "topic": "node-test5",
    "key": "�ée",
    "value": "message 3",
    "partition": 0,
    "offset": 4
  },
  {
    "topic": "node-test5",
    "key": "highkey",
    "value": "message 3",
    "partition": 0,
    "offset": 5
  },
  {
    "topic": "node-test5",
    "key": "�ée",
    "value": "message 1",
    "partition": 0,
    "offset": 6
  },
  {
    "topic": "node-test5",
    "key": "highkey",
    "value": "message 1",
    "partition": 0,
    "offset": 7
  },
  {
    "topic": "node-test5",
    "key": "�ée",
    "value": "message 2",
    "partition": 0,
    "offset": 8
  },
  {
    "topic": "node-test5",
    "key": "highkey",
    "value": "message 2",
    "partition": 0,
    "offset": 9
  }
]

As you can see, all the lowkey key are missing, only key in the KeyedMessage are there.
Thanks for your contribution!

@twang2218
Copy link
Author

Is it the bug here?
https://github.com/SOHU-Co/kafka-node/blob/master/lib/baseProducer.js#L133-L138

    messages = messages.map(function (message) {
      if (message instanceof KeyedMessage) {
        return message;
      }
-      return new Message(0, 0, '', message);
+      return new Message(0, 0, p.key || '', message);
    });

I think it might because the p.key is missing here.

@twang2218
Copy link
Author

After I changed it to p.key || '', the result seems right.

[
  {
    "topic": "node-test6",
    "key": "lowkey",
    "value": "message 1",
    "partition": 0,
    "offset": 0
  },
  {
    "topic": "node-test6",
    "key": "highkey",
    "value": "message 1",
    "partition": 0,
    "offset": 1
  },
  {
    "topic": "node-test6",
    "key": "lowkey",
    "value": "message 2",
    "partition": 0,
    "offset": 2
  },
  {
    "topic": "node-test6",
    "key": "highkey",
    "value": "message 2",
    "partition": 0,
    "offset": 3
  },
  {
    "topic": "node-test6",
    "key": "lowkey",
    "value": "message 3",
    "partition": 0,
    "offset": 4
  },
  {
    "topic": "node-test6",
    "key": "highkey",
    "value": "message 3",
    "partition": 0,
    "offset": 5
  }
]

twang2218 added a commit to twang2218/kafka-node that referenced this issue Jul 5, 2017
@twang2218
Copy link
Author

I created a PR #700 for this issue.

twang2218 added a commit to twang2218/kafka-node that referenced this issue Jul 6, 2017
@hyperlink
Copy link
Collaborator

Hi @twang2218 lets continue our discussion here instead of on the PR.

I believe we should allow any binary data for the key. In the past I thought perhaps a string key would be best (see #361) but since there's no restriction in the Kafka world we should treat it the same as messages are treated-- which is to let the user produce any binary data and the consumer define the output using the encoding option. What do you think?

@twang2218
Copy link
Author

If we allow the key can be anything, then KeyedPartitioner.hashCode() also has to be changed as well, as it assumed the key is string:

KeyedPartitioner.prototype.hashCode = function (string) {
var hash = 0;
var length = string.length;
for (var i = 0; i < length; i++) {
hash = ((hash * 31) + string.charCodeAt(i)) & 0x7fffffff;
}
return (hash === 0) ? 1 : hash;
};

KeyedPartitioner.prototype.hashCode = function (string) {
  var hash = 0;
  var length = string.length;

  for (var i = 0; i < length; i++) {
    hash = ((hash * 31) + string.charCodeAt(i)) & 0x7fffffff;
  }

  return (hash === 0) ? 1 : hash;
};

But I do think the key should be able to be any type, the default type might still be string, however, it should be able to be changed by the user.

In Java client, when creating the KafkaProducer object, both key and value's serializer should be defined.

https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

We can do the same, add 2 options in the BaseProducer for key and value serializers, and use string as default ones.

As well as Consumer, just like #702, it has to return the Buffer because there is no indication in the options for how to decode the key.

@hyperlink
Copy link
Collaborator

If a user is setting a Buffer as the key they must provide their own custom partitioner to handle that case.

I propose making the key consistent with how messages are treated.

  • If the key is null or undefined we set the length to -1. (will be consumed as null)
  • If the key is explicitly set as a Buffer we store it as is otherwise we store it as a binary string.

Decoding it on the consumer side will depending on the keyEncoding option and if it doesn't exist we could fallback to using encoding.

@hyperlink
Copy link
Collaborator

Fixed in #704 and published as 2.0.0. Thanks for your help.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants