Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix linting #58

Merged
merged 3 commits into from
Jan 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ jobs:
uses: actions/setup-node@v2
with:
node-version: ${{ matrix.node-version }}
- name: Start Infrastructure
run: |
docker-compose up -d
darkgl0w marked this conversation as resolved.
Show resolved Hide resolved
- name: Install Dependencies
run: |
npm install --ignore-scripts
npm install
darkgl0w marked this conversation as resolved.
Show resolved Hide resolved
- name: Run Tests
run: |
npm run infra:start
npm test
automerge:
needs: test
Expand Down
1 change: 1 addition & 0 deletions .taprc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
check-coverage: false
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ fastify

fastify.post('/data', (req, reply) => {
fastify.kafka.push({
topic: 'api-data',
topic: 'updates',
payload: req.body,
key: 'dataKey'
})
Expand Down
15 changes: 12 additions & 3 deletions index.d.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@

import { FastifyPlugin } from 'fastify';
import { ConsumerGlobalConfig, ConsumerTopicConfig, KafkaConsumer, Message, MetadataOptions, Producer, ProducerGlobalConfig, ProducerTopicConfig } from 'node-rdkafka';
import { FastifyPluginCallback } from 'fastify';
import {
ConsumerGlobalConfig,
ConsumerTopicConfig,
KafkaConsumer,
Message,
MetadataOptions,
Producer,
ProducerGlobalConfig,
ProducerTopicConfig
} from 'node-rdkafka';

declare module 'fastify' {
interface FastifyKafkaMessage extends Pick<Message, 'topic' | 'partition' | 'key'> {
Expand Down Expand Up @@ -37,6 +46,6 @@ declare namespace fastifyKafka {
}
}

declare const fastifyKafka: FastifyPlugin<fastifyKafka.FastifyKafkaOptions>;
export const fastifyKafka: FastifyPluginCallback<fastifyKafka.FastifyKafkaOptions>;

export default fastifyKafka;
6 changes: 3 additions & 3 deletions lib/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ const EE = require('events').EventEmitter
const inherits = require('util').inherits
const Kafka = require('node-rdkafka')

function Consumer (opts, log, next, topicConf, metadatOptions) {
function Consumer (opts, log, next, topicConf, metadataOptions) {
this._log = log
this.consumer = new Kafka.KafkaConsumer(opts, topicConf || {})
var calledNext = false
let calledNext = false

this.consumer.once('ready', onReady.bind(this))
this.consumer.on('event.error', onError.bind(this))
this.consumer.connect(metadatOptions || {}, onConnect.bind(this))
this.consumer.connect(metadataOptions || {}, onConnect.bind(this))

// register the handler only if the developer doesn't use
// the callback provided on `consume` invocation
Expand Down
6 changes: 3 additions & 3 deletions lib/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ const EE = require('events').EventEmitter
const inherits = require('util').inherits
const Kafka = require('node-rdkafka')

function Producer (opts, log, next, topicConf, metadatOptions) {
function Producer (opts, log, next, topicConf, metadataOptions) {
this._log = log
this.producer = new Kafka.Producer(opts, topicConf || {})

var calledNext = false
let calledNext = false

this.producer.once('ready', onReady.bind(this))
this.producer.on('event.error', onError.bind(this))
this.producer.connect(metadatOptions || {}, onConnect.bind(this))
this.producer.connect(metadataOptions || {}, onConnect.bind(this))

function onConnect (err) {
this._log.debug('kafka-producer:onConnect', err)
Expand Down
10 changes: 6 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
"node-rdkafka": "^2.8.1"
},
"devDependencies": {
"@types/node": "^17.0.8",
"abstract-logging": "2.0.1",
"fastify": "^3.0.0-rc.2",
"standard": "^16.0.1",
"tap": "^14.10.7",
"tsd": "^0.11.0"
"fastify": "^3.25.3",
"standard": "^16.0.4",
"tap": "^15.1.6",
"tsd": "^0.19.1",
"typescript": "^4.5.4"
},
"tsd": {
"directory": "test"
Expand Down
4 changes: 2 additions & 2 deletions test/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ test('error event after connection', t => {
consumer.consumer.emit('event.error', new Error('Test Error'))
})
consumer.on('error', t.ok)
t.tearDown(() => consumer.stop())
t.teardown(() => consumer.stop())
})

test('empty message with data event', t => {
Expand All @@ -48,5 +48,5 @@ test('empty message with data event', t => {
t.throws(() => consumer.consumer.emit('data'))
})
consumer.on('error', t.ok)
t.tearDown(() => consumer.stop())
t.teardown(() => consumer.stop())
})
2 changes: 1 addition & 1 deletion test/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,5 @@ test('error event after connection', t => {
payload: 'hello world!',
key: 'testKey'
})
t.tearDown(() => producer.stop())
t.teardown(() => producer.stop())
})
82 changes: 50 additions & 32 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,21 @@ const test = t.test
const crypto = require('crypto')
const Fastify = require('fastify')
const fastifyKafka = require('..')
const defaultExport = require('..').default
const { fastifyKafka: namedExport } = require('..')

const defaultOptions = {
producer: {
'metadata.broker.list': '127.0.0.1:9092',
'allow.auto.create.topics': true,
dr_cb: true
},
consumer: {
'metadata.broker.list': '127.0.0.1:9092',
'fetch.wait.max.ms': 10,
'fetch.error.backoff.ms': 50
'fetch.error.backoff.ms': 50,
'topic.metadata.refresh.interval.ms': 100,
'allow.auto.create.topics': true
},
consumerTopicConf: {
'auto.offset.reset': 'beginning'
Expand All @@ -24,6 +29,25 @@ const defaultOptions = {
}
}

test('export', function (t) {
t.plan(3)

t.test('module export', function (t) {
t.plan(1)
t.equal(typeof fastifyKafka, 'function')
})

t.test('default export', function (t) {
t.plan(1)
t.equal(typeof defaultExport, 'function')
})

t.test('named export', function (t) {
t.plan(1)
t.equal(typeof namedExport, 'function')
})
})

test('communication', t => {
t.plan(7)
const options = copyPlainObject(defaultOptions)
Expand All @@ -35,8 +59,8 @@ test('communication', t => {
const producerFastify = Fastify()
const consumerFastify = Fastify()

t.tearDown(() => producerFastify.close())
t.tearDown(() => consumerFastify.close())
t.teardown(() => producerFastify.close())
t.teardown(() => consumerFastify.close())

consumerFastify
.register(fastifyKafka, { ...options, producer: undefined })
Expand All @@ -47,7 +71,7 @@ test('communication', t => {
consumerFastify.kafka.subscribe(topicName)

consumerFastify.kafka.on(topicName, (msg, commit) => {
t.strictEqual(msg.value.toString(), 'hello world!')
t.equal(msg.value.toString(), 'hello world!')
commit()

t.ok(true)
Expand All @@ -72,10 +96,10 @@ test('communication', t => {
})

producerFastify.ready(err => {
t.ifError(err)
t.error(err)

consumerFastify.ready(err => {
t.ifError(err)
t.error(err)
})
})
})
Expand All @@ -92,8 +116,8 @@ test('multiple topics', t => {
const producerFastify = Fastify()
const consumerFastify = Fastify()

t.tearDown(() => producerFastify.close())
t.tearDown(() => consumerFastify.close())
t.teardown(() => producerFastify.close())
t.teardown(() => consumerFastify.close())

consumerFastify
.register(fastifyKafka, { ...options, producer: undefined })
Expand All @@ -104,13 +128,13 @@ test('multiple topics', t => {
consumerFastify.kafka.subscribe([topicName1, topicName2])

consumerFastify.kafka.on(topicName1, (msg, commit) => {
t.strictEqual(msg.value.toString(), 'topic1')
t.equal(msg.value.toString(), 'topic1')
commit()
t.ok(true)
})

consumerFastify.kafka.on(topicName2, (msg, commit) => {
t.strictEqual(msg.value.toString(), 'topic2')
t.equal(msg.value.toString(), 'topic2')
commit()
t.ok(true)
})
Expand Down Expand Up @@ -139,10 +163,10 @@ test('multiple topics', t => {
})

producerFastify.ready(err => {
t.ifError(err)
t.error(err)

consumerFastify.ready(err => {
t.ifError(err)
t.error(err)
})
})
})
Expand All @@ -158,8 +182,8 @@ test('consume callback', t => {
const producerFastify = Fastify()
const consumerFastify = Fastify()

t.tearDown(() => producerFastify.close())
t.tearDown(() => consumerFastify.close())
t.teardown(() => producerFastify.close())
t.teardown(() => consumerFastify.close())

consumerFastify
.register(fastifyKafka, { ...options, producer: undefined })
Expand All @@ -171,24 +195,18 @@ test('consume callback', t => {

consumerFastify.kafka.on(topicName, t.fail)

function onConsume (err, messages) {
t.ifError(err)

if (messages && messages.length > 0) {
t.equal(messages.length, 1)
t.match(messages[0], {
topic: topicName,
value: Buffer.from('hello world!'),
key: Buffer.from('testKey')
})
t.end()
return
}

setTimeout(() => consumerFastify.kafka.consume(10, onConsume), 10)
function onConsume (err, message) {
t.error(err)
t.match(message, {
topic: topicName,
value: Buffer.from('hello world!'),
key: Buffer.from('testKey')
})

t.end()
}

consumerFastify.kafka.consume(10, onConsume)
consumerFastify.kafka.consume(onConsume)
})

producerFastify
Expand All @@ -207,10 +225,10 @@ test('consume callback', t => {
})

producerFastify.ready(err => {
t.ifError(err)
t.error(err)

consumerFastify.ready(err => {
t.ifError(err)
t.error(err)
})
})
})
Expand Down