From 8cba9d60d8ebdee32ae1d7668b05d4030fb19948 Mon Sep 17 00:00:00 2001 From: Bill Kim Date: Wed, 15 Mar 2017 11:21:05 -0400 Subject: [PATCH] Add custom partitioner (#625) * adding custom partitioner * CustomPartitioner takes partition as constructor argument * adding test for CustomPartitioner * style issues * exporting CustomPartitinoer * updated README * added exception if pass in customPartitioner with setting customPartitioner type * style --- README.md | 8 ++++---- kafka.js | 1 + lib/baseProducer.js | 18 ++++++++++++++---- lib/highLevelProducer.js | 4 ++-- lib/partitioner.js | 6 ++++++ lib/producer.js | 4 ++-- test/test.partitioner.js | 17 +++++++++++++++++ 7 files changed, 46 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 4e93cf80..95729bfa 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,7 @@ Closes the connection to Zookeeper and the brokers so that the node process can * `cb`: **Function**, the callback ## Producer -### Producer(client, [options]) +### Producer(client, [options], [customPartitioner]) * `client`: client which keeps a connection with the Kafka server. * `options`: options for producer, @@ -76,7 +76,7 @@ Closes the connection to Zookeeper and the brokers so that the node process can requireAcks: 1, // The amount of time in milliseconds to wait for all acks before considered, default 100ms ackTimeoutMs: 100, - // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3), default 0 + // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0 partitionerType: 2 } ``` @@ -162,7 +162,7 @@ producer.createTopics(['t'], function (err, data) {});// Simply omit 2nd arg ## HighLevelProducer -### HighLevelProducer(client, [options]) +### HighLevelProducer(client, [options], [customPartitioner]) * `client`: client which keeps a connection with the Kafka server. Round-robins produce requests to the available topic partitions * `options`: options for producer, @@ -172,7 +172,7 @@ producer.createTopics(['t'], function (err, data) {});// Simply omit 2nd arg requireAcks: 1, // The amount of time in milliseconds to wait for all acks before considered, default 100ms ackTimeoutMs: 100, - // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3), default 2 + // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 2 partitionerType: 3 } ``` diff --git a/kafka.js b/kafka.js index 6dad804f..dc638bc0 100644 --- a/kafka.js +++ b/kafka.js @@ -10,3 +10,4 @@ exports.DefaultPartitioner = require('./lib/partitioner').DefaultPartitioner; exports.CyclicPartitioner = require('./lib/partitioner').CyclicPartitioner; exports.RandomPartitioner = require('./lib/partitioner').RandomPartitioner; exports.KeyedPartitioner = require('./lib/partitioner').KeyedPartitioner; +exports.CustomPartitioner = require('./lib/partitioner').CustomPartitioner; diff --git a/lib/baseProducer.js b/lib/baseProducer.js index aec0fcb4..78197957 100644 --- a/lib/baseProducer.js +++ b/lib/baseProducer.js @@ -13,19 +13,22 @@ var DefaultPartitioner = partitioner.DefaultPartitioner; var RandomPartitioner = partitioner.RandomPartitioner; var CyclicPartitioner = partitioner.CyclicPartitioner; var KeyedPartitioner = partitioner.KeyedPartitioner; +var CustomPartitioner = partitioner.CustomPartitioner; var PARTITIONER_TYPES = { default: 0, random: 1, cyclic: 2, - keyed: 3 + keyed: 3, + custom: 4 }; var PARTITIONER_MAP = { 0: DefaultPartitioner, 1: RandomPartitioner, 2: CyclicPartitioner, - 3: KeyedPartitioner + 3: KeyedPartitioner, + 4: CustomPartitioner }; var DEFAULTS = { @@ -46,9 +49,10 @@ var DEFAULTS = { * @param {Number} [options.ackTimeoutMs=100] The amount of time in milliseconds to wait for all acks before considered * the message as errored * @param {Number} [defaultPartitionType] The default partitioner type + * @param {Object} [customPartitioner] a custom partitinoer to use of the form: function (partitions, key) * @constructor */ -function BaseProducer (client, options, defaultPartitionerType) { +function BaseProducer (client, options, defaultPartitionerType, customPartitioner) { options = options || {}; this.ready = false; @@ -61,10 +65,16 @@ function BaseProducer (client, options, defaultPartitionerType) { ? DEFAULTS.ackTimeoutMs : options.ackTimeoutMs; + if (customPartitioner !== undefined && options.partitionerType !== PARTITIONER_TYPES.custom) { + throw new Error('Partitioner Type must be custom if providing a customPartitioner.'); + } else if (customPartitioner === undefined && options.partitionerType === PARTITIONER_TYPES.custom) { + throw new Error('No customer partitioner defined'); + } + var partitionerType = PARTITIONER_MAP[options.partitionerType] || PARTITIONER_MAP[defaultPartitionerType]; // eslint-disable-next-line - this.partitioner = new partitionerType(); + this.partitioner = new partitionerType(customPartitioner); this.connect(); } diff --git a/lib/highLevelProducer.js b/lib/highLevelProducer.js index 139a927c..8091f9b6 100644 --- a/lib/highLevelProducer.js +++ b/lib/highLevelProducer.js @@ -4,8 +4,8 @@ var util = require('util'); var BaseProducer = require('./baseProducer'); /** @inheritdoc */ -function HighLevelProducer (client, options) { - BaseProducer.call(this, client, options, BaseProducer.PARTITIONER_TYPES.cyclic); +function HighLevelProducer (client, options, customPartitioner) { + BaseProducer.call(this, client, options, BaseProducer.PARTITIONER_TYPES.cyclic, customPartitioner); } util.inherits(HighLevelProducer, BaseProducer); diff --git a/lib/partitioner.js b/lib/partitioner.js index e5d95814..5feae553 100644 --- a/lib/partitioner.js +++ b/lib/partitioner.js @@ -56,7 +56,13 @@ KeyedPartitioner.prototype.getPartition = function (partitions, key) { return partitions[index]; }; +var CustomPartitioner = function (partitioner) { + this.getPartition = partitioner; +}; +util.inherits(CustomPartitioner, Partitioner); + module.exports.DefaultPartitioner = DefaultPartitioner; module.exports.CyclicPartitioner = CyclicPartitioner; module.exports.RandomPartitioner = RandomPartitioner; module.exports.KeyedPartitioner = KeyedPartitioner; +module.exports.CustomPartitioner = CustomPartitioner; diff --git a/lib/producer.js b/lib/producer.js index e6fcaa29..0507c1d3 100644 --- a/lib/producer.js +++ b/lib/producer.js @@ -4,8 +4,8 @@ var util = require('util'); var BaseProducer = require('./baseProducer'); /** @inheritdoc */ -function Producer (client, options) { - BaseProducer.call(this, client, options, BaseProducer.PARTITIONER_TYPES.default); +function Producer (client, options, customPartitioner) { + BaseProducer.call(this, client, options, BaseProducer.PARTITIONER_TYPES.default, customPartitioner); } util.inherits(Producer, BaseProducer); diff --git a/test/test.partitioner.js b/test/test.partitioner.js index ff65983e..0522a160 100644 --- a/test/test.partitioner.js +++ b/test/test.partitioner.js @@ -6,6 +6,7 @@ var DefaultPartitioner = kafka.DefaultPartitioner; var RandomPartitioner = kafka.RandomPartitioner; var CyclicPartitioner = kafka.CyclicPartitioner; var KeyedPartitioner = kafka.KeyedPartitioner; +var CustomPartitioner = kafka.CustomPartitioner; function getPartitions (partitioner, partitions, count) { var arr = []; @@ -80,4 +81,20 @@ describe('Partitioner', function () { }); }); }); + + describe('CustomPartitioner', function () { + function getPartition (partitions, key) { + return partitions[partitions.length - 1]; + } + + var partitioner = new CustomPartitioner(getPartition); + + describe('#getPartition', function () { + it('should always return the last partition', function () { + var partitions = _.uniq(getPartitions(partitioner, [0, 1, 2, 3], 100)); + partitions.should.have.length(1); + partitions.should.containEql(3); + }); + }); + }); });