Skip to content

Commit

Permalink
Upgrade to fastify v3 (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nilesh Mali authored May 14, 2020
1 parent 18e224a commit e0eb4bd
Show file tree
Hide file tree
Showing 12 changed files with 229 additions and 45 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,7 @@ typings/

# lock files
package-lock.json
yarn.lock

# IDE
.vscode/
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
language: node_js

node_js:
- "14"
- "12"
- "10"

Expand Down
9 changes: 4 additions & 5 deletions example.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@ fastify
.register(require('./'), {
producer: {
'metadata.broker.list': '127.0.0.1:9092',
'group.id': group,
'fetch.wait.max.ms': 10,
'fetch.error.backoff.ms': 50,
'dr_cb': true
dr_cb: true
},
consumer: {
'metadata.broker.list': '127.0.0.1:9092',
'group.id': group,
'fetch.wait.max.ms': 10,
'fetch.error.backoff.ms': 50,
'fetch.error.backoff.ms': 50
},
consumerTopicConf: {
'auto.offset.reset': 'earliest'
}
})
Expand Down
42 changes: 42 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@

import { FastifyPlugin } from 'fastify';
import { ConsumerGlobalConfig, ConsumerTopicConfig, KafkaConsumer, Message, MetadataOptions, Producer, ProducerGlobalConfig, ProducerTopicConfig } from 'node-rdkafka';

declare module 'fastify' {
interface FastifyKafkaMessage extends Pick<Message, 'topic' | 'partition' | 'key'> {
payload: unknown;
}

interface FastifyKafkaProducer {
producer: Producer;
push(message: FastifyKafkaMessage): void;
stop(done: () => void): void;
}

interface FastifyKafkaConsumer extends Pick<KafkaConsumer, 'consume' | 'subscribe'> {
consumer: KafkaConsumer;
stop(done: () => void): void;
}

interface Kafka extends Pick<FastifyKafkaProducer, 'push'>, Pick<FastifyKafkaConsumer, 'consume' | 'subscribe'> {
producer?: FastifyKafkaProducer;
consumer?: FastifyKafkaConsumer;
}
interface FastifyInstance {
kafka: Kafka;
}
}

declare namespace fastifyKafka {
export interface FastifyKafkaOptions {
producer?: ProducerGlobalConfig;
consumer?: ConsumerGlobalConfig;
producerTopicConf?: ProducerTopicConfig;
consumerTopicConf?: ConsumerTopicConfig;
metadataOptions?: MetadataOptions;
}
}

declare const fastifyKafka: FastifyPlugin<fastifyKafka.FastifyKafkaOptions>;

export default fastifyKafka;
9 changes: 6 additions & 3 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ function fastifyKafka (fastify, opts, next) {
}

function buildProducer (fastify, opts, next) {
const producer = new Producer(opts.producer, fastify.log, next, opts.producerTopicConf)
const producer = new Producer(opts.producer, fastify.log, next, opts.producerTopicConf, opts.metadataOptions)
fastify.kafka.producer = producer
fastify.kafka.push = producer.push.bind(producer)

Expand All @@ -30,7 +30,7 @@ function buildProducer (fastify, opts, next) {
}

function buildConsumer (fastify, opts, next) {
const consumer = new Consumer(opts.consumer, fastify.log, next, opts.consumerTopicConf)
const consumer = new Consumer(opts.consumer, fastify.log, next, opts.consumerTopicConf, opts.metadataOptions)
fastify.kafka.consumer = consumer
fastify.kafka.consume = consumer.consume.bind(consumer)
fastify.kafka.subscribe = consumer.subscribe.bind(consumer)
Expand All @@ -42,4 +42,7 @@ function buildConsumer (fastify, opts, next) {
}
}

module.exports = fp(fastifyKafka, '>=2.0.0')
module.exports = fp(fastifyKafka, {
fastify: '>=3',
name: 'fastify-kafka'
})
4 changes: 2 additions & 2 deletions lib/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ const EE = require('events').EventEmitter
const inherits = require('util').inherits
const Kafka = require('node-rdkafka')

function Consumer (opts, log, next, topicConf) {
function Consumer (opts, log, next, topicConf, metadatOptions) {
this._log = log
this.consumer = new Kafka.KafkaConsumer(opts, topicConf || {})
var calledNext = false

this.consumer.once('ready', onReady.bind(this))
this.consumer.on('event.error', onError.bind(this))
this.consumer.connect({}, onConnect.bind(this))
this.consumer.connect(metadatOptions || {}, onConnect.bind(this))

// register the handler only if the developer doesn't use
// the callback provided on `consume` invocation
Expand Down
12 changes: 6 additions & 6 deletions lib/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ const EE = require('events').EventEmitter
const inherits = require('util').inherits
const Kafka = require('node-rdkafka')

function Consumer (opts, log, next, topicConf) {
function Producer (opts, log, next, topicConf, metadatOptions) {
this._log = log
this.producer = new Kafka.Producer(opts, topicConf || {})

var calledNext = false

this.producer.once('ready', onReady.bind(this))
this.producer.on('event.error', onError.bind(this))
this.producer.connect({}, onConnect.bind(this))
this.producer.connect(metadatOptions || {}, onConnect.bind(this))

function onConnect (err) {
this._log.debug('kafka-producer:onConnect', err)
Expand Down Expand Up @@ -42,9 +42,9 @@ function Consumer (opts, log, next, topicConf) {
EE.call(this)
}

inherits(Consumer, EE)
inherits(Producer, EE)

Consumer.prototype.push = function push (message) {
Producer.prototype.push = function push (message) {
this._log.debug('kafka-producer:push', message)
try {
this.producer.produce(
Expand All @@ -61,10 +61,10 @@ Consumer.prototype.push = function push (message) {
return this
}

Consumer.prototype.stop = function stop (done) {
Producer.prototype.stop = function stop (done) {
this._log.debug('kafka-producer:stop')
this.producer.disconnect(done)
return this
}

module.exports = Consumer
module.exports = Producer
20 changes: 13 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
"version": "0.2.0",
"description": "Fastify plugin to interact with Apache Kafka.",
"main": "index.js",
"types": "index.d.ts",
"scripts": {
"infra:start": "docker-compose up",
"infra:stop": "docker-compose stop",
"test": "standard && tap -j=1 test.js"
"test": "standard && tsd && tap -j=1 test/*.js"
},
"repository": {
"type": "git",
Expand All @@ -25,12 +26,17 @@
},
"homepage": "https://github.com/fastify/fastify-kafka#readme",
"dependencies": {
"fastify-plugin": "^0.1.1",
"node-rdkafka": "^2.1.1"
"fastify-plugin": "^2.0.0",
"node-rdkafka": "^2.8.1"
},
"devDependencies": {
"fastify": "^2.0.0",
"standard": "^10.0.3",
"tap": "^10.7.2"
"abstract-logging": "2.0.0",
"fastify": "^3.0.0-rc.2",
"standard": "^14.3.3",
"tap": "^14.10.7",
"tsd": "^0.11.0"
},
"tsd": {
"directory": "test"
}
}
}
52 changes: 52 additions & 0 deletions test/consumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
'use strict'

const t = require('tap')
const log = require('abstract-logging')
const test = t.test

const Consumer = require('../lib/consumer')

const options = {
'metadata.broker.list': '192.0.2.1:9092',
'fetch.wait.max.ms': 10,
'fetch.error.backoff.ms': 50,
'group.id': 'new-group-id'
}

test('unreachable brokers', t => {
t.plan(1)
const consumer = new Consumer(options, log, (err) => {
t.ok(err)
}, {}, { timeout: 200 })
consumer.on('ready', t.error)
})

test('error event before connection', t => {
t.plan(1)
const consumer = new Consumer(options, log, (err) => {
t.ok(err)
}, {}, { timeout: 200 })
consumer.consumer.emit('event.error', new Error('Test Error'))
})

test('error event after connection', t => {
t.plan(2)
const opts = { ...options, 'metadata.broker.list': '127.0.0.1:9092' }
const consumer = new Consumer(opts, log, (err) => {
t.error(err)
consumer.consumer.emit('event.error', new Error('Test Error'))
})
consumer.on('error', t.ok)
t.tearDown(() => consumer.stop())
})

test('empty message with data event', t => {
t.plan(3)
const opts = { ...options, 'metadata.broker.list': '127.0.0.1:9092' }
const consumer = new Consumer(opts, log, (err) => {
t.error(err)
t.throws(() => consumer.consumer.emit('data'))
})
consumer.on('error', t.ok)
t.tearDown(() => consumer.stop())
})
35 changes: 35 additions & 0 deletions test/index.test-d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import Fastify, { FastifyKafkaConsumer, FastifyKafkaProducer, Kafka } from 'fastify';
import { expectAssignable } from 'tsd';
import fastifyKafka from '..';

const app = Fastify()

app.register(fastifyKafka, {
producer: {
'metadata.broker.list': '127.0.0.1:9092',
dr_cb: true
},
consumer: {
'metadata.broker.list': '127.0.0.1:9092',
'group.id': "new-group-1",
'fetch.wait.max.ms': 10,
'fetch.error.backoff.ms': 50,
},
consumerTopicConf: {
'auto.offset.reset': 'earliest'
},
producerTopicConf: {
"request.timeout.ms": 10
},
metadataOptions: {
timeout: 1000
}
});

// Check whether all properties are merged successfully or not
expectAssignable<Kafka>(app.kafka)
expectAssignable<FastifyKafkaProducer | undefined>(app.kafka.producer)
expectAssignable<FastifyKafkaConsumer | undefined>(app.kafka.consumer)
expectAssignable<Function>(app.kafka.push)
expectAssignable<Function>(app.kafka.consume)
expectAssignable<Function>(app.kafka.subscribe)
46 changes: 46 additions & 0 deletions test/producer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
'use strict'

const t = require('tap')
const log = require('abstract-logging')
const test = t.test

const Producer = require('../lib/producer')

const options = {
'metadata.broker.list': '192.0.2.1:9092',
'socket.timeout.ms': 10,
dr_cb: true
}

test('unreachable brokers', t => {
t.plan(2)
const producer = new Producer(options, log, (err) => {
t.ok(err)
producer.on('error', t.ok)
producer.push()
}, {}, { timeout: 200 })
})

test('error event before connection', t => {
t.plan(1)
const producer = new Producer(options, log, (err) => {
t.ok(err)
}, {}, { timeout: 200 })
producer.producer.emit('event.error', new Error('Test Error'))
})

test('error event after connection', t => {
t.plan(3)
const opts = { ...options, 'metadata.broker.list': '127.0.0.1:9092' }
const producer = new Producer(opts, log, (err) => {
t.error(err)
producer.producer.emit('event.error', new Error('Test Error'))
})
producer.on('error', t.ok)
producer.push({
topic: 'test',
payload: 'hello world!',
key: 'testKey'
})
t.tearDown(() => producer.stop())
})
Loading

0 comments on commit e0eb4bd

Please sign in to comment.