diff --git a/.gitignore b/.gitignore index f86a224..4e8397f 100644 --- a/.gitignore +++ b/.gitignore @@ -65,3 +65,7 @@ typings/ # lock files package-lock.json +yarn.lock + +# IDE +.vscode/ \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index f4665f6..8f0b6b1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,7 @@ language: node_js node_js: + - "14" - "12" - "10" diff --git a/example.js b/example.js index c6bef11..2434031 100644 --- a/example.js +++ b/example.js @@ -13,16 +13,15 @@ fastify .register(require('./'), { producer: { 'metadata.broker.list': '127.0.0.1:9092', - 'group.id': group, - 'fetch.wait.max.ms': 10, - 'fetch.error.backoff.ms': 50, - 'dr_cb': true + dr_cb: true }, consumer: { 'metadata.broker.list': '127.0.0.1:9092', 'group.id': group, 'fetch.wait.max.ms': 10, - 'fetch.error.backoff.ms': 50, + 'fetch.error.backoff.ms': 50 + }, + consumerTopicConf: { 'auto.offset.reset': 'earliest' } }) diff --git a/index.d.ts b/index.d.ts new file mode 100644 index 0000000..64a3cfc --- /dev/null +++ b/index.d.ts @@ -0,0 +1,42 @@ + +import { FastifyPlugin } from 'fastify'; +import { ConsumerGlobalConfig, ConsumerTopicConfig, KafkaConsumer, Message, MetadataOptions, Producer, ProducerGlobalConfig, ProducerTopicConfig } from 'node-rdkafka'; + +declare module 'fastify' { + interface FastifyKafkaMessage extends Pick { + payload: unknown; + } + + interface FastifyKafkaProducer { + producer: Producer; + push(message: FastifyKafkaMessage): void; + stop(done: () => void): void; + } + + interface FastifyKafkaConsumer extends Pick { + consumer: KafkaConsumer; + stop(done: () => void): void; + } + + interface Kafka extends Pick, Pick { + producer?: FastifyKafkaProducer; + consumer?: FastifyKafkaConsumer; + } + interface FastifyInstance { + kafka: Kafka; + } +} + +declare namespace fastifyKafka { + export interface FastifyKafkaOptions { + producer?: ProducerGlobalConfig; + consumer?: ConsumerGlobalConfig; + producerTopicConf?: ProducerTopicConfig; + consumerTopicConf?: ConsumerTopicConfig; + metadataOptions?: MetadataOptions; + } +} + +declare const fastifyKafka: FastifyPlugin; + +export default fastifyKafka; \ No newline at end of file diff --git a/index.js b/index.js index aa88358..2b8dd83 100644 --- a/index.js +++ b/index.js @@ -19,7 +19,7 @@ function fastifyKafka (fastify, opts, next) { } function buildProducer (fastify, opts, next) { - const producer = new Producer(opts.producer, fastify.log, next, opts.producerTopicConf) + const producer = new Producer(opts.producer, fastify.log, next, opts.producerTopicConf, opts.metadataOptions) fastify.kafka.producer = producer fastify.kafka.push = producer.push.bind(producer) @@ -30,7 +30,7 @@ function buildProducer (fastify, opts, next) { } function buildConsumer (fastify, opts, next) { - const consumer = new Consumer(opts.consumer, fastify.log, next, opts.consumerTopicConf) + const consumer = new Consumer(opts.consumer, fastify.log, next, opts.consumerTopicConf, opts.metadataOptions) fastify.kafka.consumer = consumer fastify.kafka.consume = consumer.consume.bind(consumer) fastify.kafka.subscribe = consumer.subscribe.bind(consumer) @@ -42,4 +42,7 @@ function buildConsumer (fastify, opts, next) { } } -module.exports = fp(fastifyKafka, '>=2.0.0') +module.exports = fp(fastifyKafka, { + fastify: '>=3', + name: 'fastify-kafka' +}) diff --git a/lib/consumer.js b/lib/consumer.js index cbf9594..6b534ef 100644 --- a/lib/consumer.js +++ b/lib/consumer.js @@ -4,14 +4,14 @@ const EE = require('events').EventEmitter const inherits = require('util').inherits const Kafka = require('node-rdkafka') -function Consumer (opts, log, next, topicConf) { +function Consumer (opts, log, next, topicConf, metadatOptions) { this._log = log this.consumer = new Kafka.KafkaConsumer(opts, topicConf || {}) var calledNext = false this.consumer.once('ready', onReady.bind(this)) this.consumer.on('event.error', onError.bind(this)) - this.consumer.connect({}, onConnect.bind(this)) + this.consumer.connect(metadatOptions || {}, onConnect.bind(this)) // register the handler only if the developer doesn't use // the callback provided on `consume` invocation diff --git a/lib/producer.js b/lib/producer.js index aef7dc3..0f7aab6 100644 --- a/lib/producer.js +++ b/lib/producer.js @@ -4,7 +4,7 @@ const EE = require('events').EventEmitter const inherits = require('util').inherits const Kafka = require('node-rdkafka') -function Consumer (opts, log, next, topicConf) { +function Producer (opts, log, next, topicConf, metadatOptions) { this._log = log this.producer = new Kafka.Producer(opts, topicConf || {}) @@ -12,7 +12,7 @@ function Consumer (opts, log, next, topicConf) { this.producer.once('ready', onReady.bind(this)) this.producer.on('event.error', onError.bind(this)) - this.producer.connect({}, onConnect.bind(this)) + this.producer.connect(metadatOptions || {}, onConnect.bind(this)) function onConnect (err) { this._log.debug('kafka-producer:onConnect', err) @@ -42,9 +42,9 @@ function Consumer (opts, log, next, topicConf) { EE.call(this) } -inherits(Consumer, EE) +inherits(Producer, EE) -Consumer.prototype.push = function push (message) { +Producer.prototype.push = function push (message) { this._log.debug('kafka-producer:push', message) try { this.producer.produce( @@ -61,10 +61,10 @@ Consumer.prototype.push = function push (message) { return this } -Consumer.prototype.stop = function stop (done) { +Producer.prototype.stop = function stop (done) { this._log.debug('kafka-producer:stop') this.producer.disconnect(done) return this } -module.exports = Consumer +module.exports = Producer diff --git a/package.json b/package.json index 9895a57..133c29c 100644 --- a/package.json +++ b/package.json @@ -3,10 +3,11 @@ "version": "0.2.0", "description": "Fastify plugin to interact with Apache Kafka.", "main": "index.js", + "types": "index.d.ts", "scripts": { "infra:start": "docker-compose up", "infra:stop": "docker-compose stop", - "test": "standard && tap -j=1 test.js" + "test": "standard && tsd && tap -j=1 test/*.js" }, "repository": { "type": "git", @@ -25,12 +26,17 @@ }, "homepage": "https://github.com/fastify/fastify-kafka#readme", "dependencies": { - "fastify-plugin": "^0.1.1", - "node-rdkafka": "^2.1.1" + "fastify-plugin": "^2.0.0", + "node-rdkafka": "^2.8.1" }, "devDependencies": { - "fastify": "^2.0.0", - "standard": "^10.0.3", - "tap": "^10.7.2" + "abstract-logging": "2.0.0", + "fastify": "^3.0.0-rc.2", + "standard": "^14.3.3", + "tap": "^14.10.7", + "tsd": "^0.11.0" + }, + "tsd": { + "directory": "test" } -} +} \ No newline at end of file diff --git a/test/consumer.js b/test/consumer.js new file mode 100644 index 0000000..a4e4cb4 --- /dev/null +++ b/test/consumer.js @@ -0,0 +1,52 @@ +'use strict' + +const t = require('tap') +const log = require('abstract-logging') +const test = t.test + +const Consumer = require('../lib/consumer') + +const options = { + 'metadata.broker.list': '192.0.2.1:9092', + 'fetch.wait.max.ms': 10, + 'fetch.error.backoff.ms': 50, + 'group.id': 'new-group-id' +} + +test('unreachable brokers', t => { + t.plan(1) + const consumer = new Consumer(options, log, (err) => { + t.ok(err) + }, {}, { timeout: 200 }) + consumer.on('ready', t.error) +}) + +test('error event before connection', t => { + t.plan(1) + const consumer = new Consumer(options, log, (err) => { + t.ok(err) + }, {}, { timeout: 200 }) + consumer.consumer.emit('event.error', new Error('Test Error')) +}) + +test('error event after connection', t => { + t.plan(2) + const opts = { ...options, 'metadata.broker.list': '127.0.0.1:9092' } + const consumer = new Consumer(opts, log, (err) => { + t.error(err) + consumer.consumer.emit('event.error', new Error('Test Error')) + }) + consumer.on('error', t.ok) + t.tearDown(() => consumer.stop()) +}) + +test('empty message with data event', t => { + t.plan(3) + const opts = { ...options, 'metadata.broker.list': '127.0.0.1:9092' } + const consumer = new Consumer(opts, log, (err) => { + t.error(err) + t.throws(() => consumer.consumer.emit('data')) + }) + consumer.on('error', t.ok) + t.tearDown(() => consumer.stop()) +}) diff --git a/test/index.test-d.ts b/test/index.test-d.ts new file mode 100644 index 0000000..6f79e26 --- /dev/null +++ b/test/index.test-d.ts @@ -0,0 +1,35 @@ +import Fastify, { FastifyKafkaConsumer, FastifyKafkaProducer, Kafka } from 'fastify'; +import { expectAssignable } from 'tsd'; +import fastifyKafka from '..'; + +const app = Fastify() + +app.register(fastifyKafka, { + producer: { + 'metadata.broker.list': '127.0.0.1:9092', + dr_cb: true + }, + consumer: { + 'metadata.broker.list': '127.0.0.1:9092', + 'group.id': "new-group-1", + 'fetch.wait.max.ms': 10, + 'fetch.error.backoff.ms': 50, + }, + consumerTopicConf: { + 'auto.offset.reset': 'earliest' + }, + producerTopicConf: { + "request.timeout.ms": 10 + }, + metadataOptions: { + timeout: 1000 + } +}); + +// Check whether all properties are merged successfully or not +expectAssignable(app.kafka) +expectAssignable(app.kafka.producer) +expectAssignable(app.kafka.consumer) +expectAssignable(app.kafka.push) +expectAssignable(app.kafka.consume) +expectAssignable(app.kafka.subscribe) diff --git a/test/producer.js b/test/producer.js new file mode 100644 index 0000000..73be349 --- /dev/null +++ b/test/producer.js @@ -0,0 +1,46 @@ +'use strict' + +const t = require('tap') +const log = require('abstract-logging') +const test = t.test + +const Producer = require('../lib/producer') + +const options = { + 'metadata.broker.list': '192.0.2.1:9092', + 'socket.timeout.ms': 10, + dr_cb: true +} + +test('unreachable brokers', t => { + t.plan(2) + const producer = new Producer(options, log, (err) => { + t.ok(err) + producer.on('error', t.ok) + producer.push() + }, {}, { timeout: 200 }) +}) + +test('error event before connection', t => { + t.plan(1) + const producer = new Producer(options, log, (err) => { + t.ok(err) + }, {}, { timeout: 200 }) + producer.producer.emit('event.error', new Error('Test Error')) +}) + +test('error event after connection', t => { + t.plan(3) + const opts = { ...options, 'metadata.broker.list': '127.0.0.1:9092' } + const producer = new Producer(opts, log, (err) => { + t.error(err) + producer.producer.emit('event.error', new Error('Test Error')) + }) + producer.on('error', t.ok) + producer.push({ + topic: 'test', + payload: 'hello world!', + key: 'testKey' + }) + t.tearDown(() => producer.stop()) +}) diff --git a/test.js b/test/test.js similarity index 82% rename from test.js rename to test/test.js index c4ecdcd..fcd3d51 100644 --- a/test.js +++ b/test/test.js @@ -4,16 +4,12 @@ const t = require('tap') const test = t.test const crypto = require('crypto') const Fastify = require('fastify') -const fastifyKafka = require('./') - -const logger = { level: 'trace' } +const fastifyKafka = require('..') const defaultOptions = { producer: { 'metadata.broker.list': '127.0.0.1:9092', - 'fetch.wait.max.ms': 10, - 'fetch.error.backoff.ms': 50, - 'dr_cb': true + dr_cb: true }, consumer: { 'metadata.broker.list': '127.0.0.1:9092', @@ -22,6 +18,9 @@ const defaultOptions = { }, consumerTopicConf: { 'auto.offset.reset': 'beginning' + }, + metadataOptions: { + timeout: 2000 } } @@ -30,18 +29,17 @@ test('communication', t => { const options = copyPlainObject(defaultOptions) const group = generateGroupId() options.consumer['group.id'] = group - options.producer['group.id'] = group const topicName = generateTopicName() - const producerFastify = Fastify({ logger }) - const consumerFastify = Fastify({ logger }) + const producerFastify = Fastify() + const consumerFastify = Fastify() t.tearDown(() => producerFastify.close()) t.tearDown(() => consumerFastify.close()) consumerFastify - .register(fastifyKafka, {...options, producer: undefined}) + .register(fastifyKafka, { ...options, producer: undefined }) .after(err => { t.error(err) @@ -59,7 +57,7 @@ test('communication', t => { }) producerFastify - .register(fastifyKafka, {...options, consumer: undefined}) + .register(fastifyKafka, { ...options, consumer: undefined }) .after(err => { t.error(err) @@ -87,19 +85,18 @@ test('multiple topics', t => { const options = copyPlainObject(defaultOptions) const group = generateGroupId() options.consumer['group.id'] = group - options.producer['group.id'] = group const topicName1 = generateTopicName() const topicName2 = generateTopicName() - const producerFastify = Fastify({ logger }) - const consumerFastify = Fastify({ logger }) + const producerFastify = Fastify() + const consumerFastify = Fastify() t.tearDown(() => producerFastify.close()) t.tearDown(() => consumerFastify.close()) consumerFastify - .register(fastifyKafka, {...options, producer: undefined}) + .register(fastifyKafka, { ...options, producer: undefined }) .after(err => { t.error(err) @@ -122,7 +119,7 @@ test('multiple topics', t => { }) producerFastify - .register(fastifyKafka, {...options, consumer: undefined}) + .register(fastifyKafka, { ...options, consumer: undefined }) .after(err => { t.error(err) @@ -154,19 +151,18 @@ test('consume callback', t => { const options = copyPlainObject(defaultOptions) const group = generateGroupId() options.consumer['group.id'] = group - options.producer['group.id'] = group - options.consumer['event_cb'] = true + options.consumer.event_cb = true const topicName = generateTopicName() - const producerFastify = Fastify({ logger }) - const consumerFastify = Fastify({ logger }) + const producerFastify = Fastify() + const consumerFastify = Fastify() t.tearDown(() => producerFastify.close()) t.tearDown(() => consumerFastify.close()) consumerFastify - .register(fastifyKafka, {...options, producer: undefined}) + .register(fastifyKafka, { ...options, producer: undefined }) .after(err => { t.error(err) @@ -196,7 +192,7 @@ test('consume callback', t => { }) producerFastify - .register(fastifyKafka, {...options, consumer: undefined}) + .register(fastifyKafka, { ...options, consumer: undefined }) .after(err => { t.error(err)