Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to fastify v3 #7

Merged
merged 1 commit into from May 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,7 @@ typings/

# lock files
package-lock.json
yarn.lock

# IDE
.vscode/
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
language: node_js

node_js:
- "14"
- "12"
- "10"

Expand Down
9 changes: 4 additions & 5 deletions example.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@ fastify
.register(require('./'), {
producer: {
'metadata.broker.list': '127.0.0.1:9092',
'group.id': group,
'fetch.wait.max.ms': 10,
'fetch.error.backoff.ms': 50,
'dr_cb': true
dr_cb: true
},
consumer: {
'metadata.broker.list': '127.0.0.1:9092',
'group.id': group,
'fetch.wait.max.ms': 10,
'fetch.error.backoff.ms': 50,
'fetch.error.backoff.ms': 50
},
consumerTopicConf: {
'auto.offset.reset': 'earliest'
}
})
Expand Down
42 changes: 42 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@

import { FastifyPlugin } from 'fastify';
import { ConsumerGlobalConfig, ConsumerTopicConfig, KafkaConsumer, Message, MetadataOptions, Producer, ProducerGlobalConfig, ProducerTopicConfig } from 'node-rdkafka';

declare module 'fastify' {
interface FastifyKafkaMessage extends Pick<Message, 'topic' | 'partition' | 'key'> {
payload: unknown;
}

interface FastifyKafkaProducer {
producer: Producer;
push(message: FastifyKafkaMessage): void;
stop(done: () => void): void;
}

interface FastifyKafkaConsumer extends Pick<KafkaConsumer, 'consume' | 'subscribe'> {
consumer: KafkaConsumer;
stop(done: () => void): void;
}

interface Kafka extends Pick<FastifyKafkaProducer, 'push'>, Pick<FastifyKafkaConsumer, 'consume' | 'subscribe'> {
producer?: FastifyKafkaProducer;
consumer?: FastifyKafkaConsumer;
}
interface FastifyInstance {
kafka: Kafka;
}
}

declare namespace fastifyKafka {
export interface FastifyKafkaOptions {
producer?: ProducerGlobalConfig;
consumer?: ConsumerGlobalConfig;
producerTopicConf?: ProducerTopicConfig;
consumerTopicConf?: ConsumerTopicConfig;
metadataOptions?: MetadataOptions;
}
}

declare const fastifyKafka: FastifyPlugin<fastifyKafka.FastifyKafkaOptions>;

export default fastifyKafka;
9 changes: 6 additions & 3 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ function fastifyKafka (fastify, opts, next) {
}

function buildProducer (fastify, opts, next) {
const producer = new Producer(opts.producer, fastify.log, next, opts.producerTopicConf)
const producer = new Producer(opts.producer, fastify.log, next, opts.producerTopicConf, opts.metadataOptions)
fastify.kafka.producer = producer
fastify.kafka.push = producer.push.bind(producer)

Expand All @@ -30,7 +30,7 @@ function buildProducer (fastify, opts, next) {
}

function buildConsumer (fastify, opts, next) {
const consumer = new Consumer(opts.consumer, fastify.log, next, opts.consumerTopicConf)
const consumer = new Consumer(opts.consumer, fastify.log, next, opts.consumerTopicConf, opts.metadataOptions)
fastify.kafka.consumer = consumer
fastify.kafka.consume = consumer.consume.bind(consumer)
fastify.kafka.subscribe = consumer.subscribe.bind(consumer)
Expand All @@ -42,4 +42,7 @@ function buildConsumer (fastify, opts, next) {
}
}

module.exports = fp(fastifyKafka, '>=2.0.0')
module.exports = fp(fastifyKafka, {
fastify: '>=3',
name: 'fastify-kafka'
})
4 changes: 2 additions & 2 deletions lib/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ const EE = require('events').EventEmitter
const inherits = require('util').inherits
const Kafka = require('node-rdkafka')

function Consumer (opts, log, next, topicConf) {
function Consumer (opts, log, next, topicConf, metadatOptions) {
this._log = log
this.consumer = new Kafka.KafkaConsumer(opts, topicConf || {})
var calledNext = false

this.consumer.once('ready', onReady.bind(this))
this.consumer.on('event.error', onError.bind(this))
this.consumer.connect({}, onConnect.bind(this))
this.consumer.connect(metadatOptions || {}, onConnect.bind(this))

// register the handler only if the developer doesn't use
// the callback provided on `consume` invocation
Expand Down
12 changes: 6 additions & 6 deletions lib/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ const EE = require('events').EventEmitter
const inherits = require('util').inherits
const Kafka = require('node-rdkafka')

function Consumer (opts, log, next, topicConf) {
function Producer (opts, log, next, topicConf, metadatOptions) {
this._log = log
this.producer = new Kafka.Producer(opts, topicConf || {})

var calledNext = false

this.producer.once('ready', onReady.bind(this))
this.producer.on('event.error', onError.bind(this))
this.producer.connect({}, onConnect.bind(this))
this.producer.connect(metadatOptions || {}, onConnect.bind(this))

function onConnect (err) {
this._log.debug('kafka-producer:onConnect', err)
Expand Down Expand Up @@ -42,9 +42,9 @@ function Consumer (opts, log, next, topicConf) {
EE.call(this)
}

inherits(Consumer, EE)
inherits(Producer, EE)

Consumer.prototype.push = function push (message) {
Producer.prototype.push = function push (message) {
this._log.debug('kafka-producer:push', message)
try {
this.producer.produce(
Expand All @@ -61,10 +61,10 @@ Consumer.prototype.push = function push (message) {
return this
}

Consumer.prototype.stop = function stop (done) {
Producer.prototype.stop = function stop (done) {
this._log.debug('kafka-producer:stop')
this.producer.disconnect(done)
return this
}

module.exports = Consumer
module.exports = Producer
20 changes: 13 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
"version": "0.2.0",
"description": "Fastify plugin to interact with Apache Kafka.",
"main": "index.js",
"types": "index.d.ts",
"scripts": {
"infra:start": "docker-compose up",
"infra:stop": "docker-compose stop",
"test": "standard && tap -j=1 test.js"
"test": "standard && tsd && tap -j=1 test/*.js"
},
"repository": {
"type": "git",
Expand All @@ -25,12 +26,17 @@
},
"homepage": "https://github.com/fastify/fastify-kafka#readme",
"dependencies": {
"fastify-plugin": "^0.1.1",
"node-rdkafka": "^2.1.1"
"fastify-plugin": "^2.0.0",
"node-rdkafka": "^2.8.1"
},
"devDependencies": {
"fastify": "^2.0.0",
"standard": "^10.0.3",
"tap": "^10.7.2"
"abstract-logging": "2.0.0",
"fastify": "^3.0.0-rc.2",
"standard": "^14.3.3",
"tap": "^14.10.7",
"tsd": "^0.11.0"
},
"tsd": {
"directory": "test"
}
}
}
52 changes: 52 additions & 0 deletions test/consumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
'use strict'

const t = require('tap')
const log = require('abstract-logging')
const test = t.test

const Consumer = require('../lib/consumer')

const options = {
'metadata.broker.list': '192.0.2.1:9092',
'fetch.wait.max.ms': 10,
'fetch.error.backoff.ms': 50,
'group.id': 'new-group-id'
}

test('unreachable brokers', t => {
t.plan(1)
const consumer = new Consumer(options, log, (err) => {
t.ok(err)
}, {}, { timeout: 200 })
consumer.on('ready', t.error)
})

test('error event before connection', t => {
t.plan(1)
const consumer = new Consumer(options, log, (err) => {
t.ok(err)
}, {}, { timeout: 200 })
consumer.consumer.emit('event.error', new Error('Test Error'))
})

test('error event after connection', t => {
t.plan(2)
const opts = { ...options, 'metadata.broker.list': '127.0.0.1:9092' }
const consumer = new Consumer(opts, log, (err) => {
t.error(err)
consumer.consumer.emit('event.error', new Error('Test Error'))
})
consumer.on('error', t.ok)
t.tearDown(() => consumer.stop())
})

test('empty message with data event', t => {
t.plan(3)
const opts = { ...options, 'metadata.broker.list': '127.0.0.1:9092' }
const consumer = new Consumer(opts, log, (err) => {
t.error(err)
t.throws(() => consumer.consumer.emit('data'))
})
consumer.on('error', t.ok)
t.tearDown(() => consumer.stop())
})
35 changes: 35 additions & 0 deletions test/index.test-d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import Fastify, { FastifyKafkaConsumer, FastifyKafkaProducer, Kafka } from 'fastify';
import { expectAssignable } from 'tsd';
import fastifyKafka from '..';

const app = Fastify()

app.register(fastifyKafka, {
producer: {
'metadata.broker.list': '127.0.0.1:9092',
dr_cb: true
},
consumer: {
'metadata.broker.list': '127.0.0.1:9092',
'group.id': "new-group-1",
'fetch.wait.max.ms': 10,
'fetch.error.backoff.ms': 50,
},
consumerTopicConf: {
'auto.offset.reset': 'earliest'
},
producerTopicConf: {
"request.timeout.ms": 10
},
metadataOptions: {
timeout: 1000
}
});

// Check whether all properties are merged successfully or not
expectAssignable<Kafka>(app.kafka)
expectAssignable<FastifyKafkaProducer | undefined>(app.kafka.producer)
expectAssignable<FastifyKafkaConsumer | undefined>(app.kafka.consumer)
expectAssignable<Function>(app.kafka.push)
expectAssignable<Function>(app.kafka.consume)
expectAssignable<Function>(app.kafka.subscribe)
46 changes: 46 additions & 0 deletions test/producer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
'use strict'

const t = require('tap')
const log = require('abstract-logging')
const test = t.test

const Producer = require('../lib/producer')

const options = {
'metadata.broker.list': '192.0.2.1:9092',
'socket.timeout.ms': 10,
dr_cb: true
}

test('unreachable brokers', t => {
t.plan(2)
const producer = new Producer(options, log, (err) => {
t.ok(err)
producer.on('error', t.ok)
producer.push()
}, {}, { timeout: 200 })
})

test('error event before connection', t => {
t.plan(1)
const producer = new Producer(options, log, (err) => {
t.ok(err)
}, {}, { timeout: 200 })
producer.producer.emit('event.error', new Error('Test Error'))
})

test('error event after connection', t => {
t.plan(3)
const opts = { ...options, 'metadata.broker.list': '127.0.0.1:9092' }
const producer = new Producer(opts, log, (err) => {
t.error(err)
producer.producer.emit('event.error', new Error('Test Error'))
})
producer.on('error', t.ok)
producer.push({
topic: 'test',
payload: 'hello world!',
key: 'testKey'
})
t.tearDown(() => producer.stop())
})
Loading