Skip to content

Commit

Permalink
Add custom partitioner (#625)
Browse files Browse the repository at this point in the history
* adding custom partitioner

* CustomPartitioner takes partition as constructor argument

* adding test for CustomPartitioner

* style issues

* exporting CustomPartitinoer

* updated README

* added exception if pass in customPartitioner with setting customPartitioner type

* style
  • Loading branch information
bkim54 authored and hyperlink committed Mar 15, 2017
1 parent e4fcfe1 commit 8cba9d6
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 12 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Closes the connection to Zookeeper and the brokers so that the node process can
* `cb`: **Function**, the callback

## Producer
### Producer(client, [options])
### Producer(client, [options], [customPartitioner])
* `client`: client which keeps a connection with the Kafka server.
* `options`: options for producer,

Expand All @@ -76,7 +76,7 @@ Closes the connection to Zookeeper and the brokers so that the node process can
requireAcks: 1,
// The amount of time in milliseconds to wait for all acks before considered, default 100ms
ackTimeoutMs: 100,
// Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3), default 0
// Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
partitionerType: 2
}
```
Expand Down Expand Up @@ -162,7 +162,7 @@ producer.createTopics(['t'], function (err, data) {});// Simply omit 2nd arg


## HighLevelProducer
### HighLevelProducer(client, [options])
### HighLevelProducer(client, [options], [customPartitioner])
* `client`: client which keeps a connection with the Kafka server. Round-robins produce requests to the available topic partitions
* `options`: options for producer,

Expand All @@ -172,7 +172,7 @@ producer.createTopics(['t'], function (err, data) {});// Simply omit 2nd arg
requireAcks: 1,
// The amount of time in milliseconds to wait for all acks before considered, default 100ms
ackTimeoutMs: 100,
// Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3), default 2
// Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 2
partitionerType: 3
}
```
Expand Down
1 change: 1 addition & 0 deletions kafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ exports.DefaultPartitioner = require('./lib/partitioner').DefaultPartitioner;
exports.CyclicPartitioner = require('./lib/partitioner').CyclicPartitioner;
exports.RandomPartitioner = require('./lib/partitioner').RandomPartitioner;
exports.KeyedPartitioner = require('./lib/partitioner').KeyedPartitioner;
exports.CustomPartitioner = require('./lib/partitioner').CustomPartitioner;
18 changes: 14 additions & 4 deletions lib/baseProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,22 @@ var DefaultPartitioner = partitioner.DefaultPartitioner;
var RandomPartitioner = partitioner.RandomPartitioner;
var CyclicPartitioner = partitioner.CyclicPartitioner;
var KeyedPartitioner = partitioner.KeyedPartitioner;
var CustomPartitioner = partitioner.CustomPartitioner;

var PARTITIONER_TYPES = {
default: 0,
random: 1,
cyclic: 2,
keyed: 3
keyed: 3,
custom: 4
};

var PARTITIONER_MAP = {
0: DefaultPartitioner,
1: RandomPartitioner,
2: CyclicPartitioner,
3: KeyedPartitioner
3: KeyedPartitioner,
4: CustomPartitioner
};

var DEFAULTS = {
Expand All @@ -46,9 +49,10 @@ var DEFAULTS = {
* @param {Number} [options.ackTimeoutMs=100] The amount of time in milliseconds to wait for all acks before considered
* the message as errored
* @param {Number} [defaultPartitionType] The default partitioner type
* @param {Object} [customPartitioner] a custom partitinoer to use of the form: function (partitions, key)
* @constructor
*/
function BaseProducer (client, options, defaultPartitionerType) {
function BaseProducer (client, options, defaultPartitionerType, customPartitioner) {
options = options || {};

this.ready = false;
Expand All @@ -61,10 +65,16 @@ function BaseProducer (client, options, defaultPartitionerType) {
? DEFAULTS.ackTimeoutMs
: options.ackTimeoutMs;

if (customPartitioner !== undefined && options.partitionerType !== PARTITIONER_TYPES.custom) {
throw new Error('Partitioner Type must be custom if providing a customPartitioner.');
} else if (customPartitioner === undefined && options.partitionerType === PARTITIONER_TYPES.custom) {
throw new Error('No customer partitioner defined');
}

var partitionerType = PARTITIONER_MAP[options.partitionerType] || PARTITIONER_MAP[defaultPartitionerType];

// eslint-disable-next-line
this.partitioner = new partitionerType();
this.partitioner = new partitionerType(customPartitioner);

this.connect();
}
Expand Down
4 changes: 2 additions & 2 deletions lib/highLevelProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ var util = require('util');
var BaseProducer = require('./baseProducer');

/** @inheritdoc */
function HighLevelProducer (client, options) {
BaseProducer.call(this, client, options, BaseProducer.PARTITIONER_TYPES.cyclic);
function HighLevelProducer (client, options, customPartitioner) {
BaseProducer.call(this, client, options, BaseProducer.PARTITIONER_TYPES.cyclic, customPartitioner);
}

util.inherits(HighLevelProducer, BaseProducer);
Expand Down
6 changes: 6 additions & 0 deletions lib/partitioner.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,13 @@ KeyedPartitioner.prototype.getPartition = function (partitions, key) {
return partitions[index];
};

var CustomPartitioner = function (partitioner) {
this.getPartition = partitioner;
};
util.inherits(CustomPartitioner, Partitioner);

module.exports.DefaultPartitioner = DefaultPartitioner;
module.exports.CyclicPartitioner = CyclicPartitioner;
module.exports.RandomPartitioner = RandomPartitioner;
module.exports.KeyedPartitioner = KeyedPartitioner;
module.exports.CustomPartitioner = CustomPartitioner;
4 changes: 2 additions & 2 deletions lib/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ var util = require('util');
var BaseProducer = require('./baseProducer');

/** @inheritdoc */
function Producer (client, options) {
BaseProducer.call(this, client, options, BaseProducer.PARTITIONER_TYPES.default);
function Producer (client, options, customPartitioner) {
BaseProducer.call(this, client, options, BaseProducer.PARTITIONER_TYPES.default, customPartitioner);
}

util.inherits(Producer, BaseProducer);
Expand Down
17 changes: 17 additions & 0 deletions test/test.partitioner.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ var DefaultPartitioner = kafka.DefaultPartitioner;
var RandomPartitioner = kafka.RandomPartitioner;
var CyclicPartitioner = kafka.CyclicPartitioner;
var KeyedPartitioner = kafka.KeyedPartitioner;
var CustomPartitioner = kafka.CustomPartitioner;

function getPartitions (partitioner, partitions, count) {
var arr = [];
Expand Down Expand Up @@ -80,4 +81,20 @@ describe('Partitioner', function () {
});
});
});

describe('CustomPartitioner', function () {
function getPartition (partitions, key) {
return partitions[partitions.length - 1];
}

var partitioner = new CustomPartitioner(getPartition);

describe('#getPartition', function () {
it('should always return the last partition', function () {
var partitions = _.uniq(getPartitions(partitioner, [0, 1, 2, 3], 100));
partitions.should.have.length(1);
partitions.should.containEql(3);
});
});
});
});

0 comments on commit 8cba9d6

Please sign in to comment.