diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e30b0bc..7024c89 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -24,12 +24,14 @@ jobs: uses: actions/setup-node@v2 with: node-version: ${{ matrix.node-version }} + - name: Start Infrastructure + run: | + docker-compose up -d - name: Install Dependencies run: | - npm install --ignore-scripts + npm install - name: Run Tests run: | - npm run infra:start npm test automerge: needs: test diff --git a/.taprc b/.taprc new file mode 100644 index 0000000..381688a --- /dev/null +++ b/.taprc @@ -0,0 +1 @@ +check-coverage: false \ No newline at end of file diff --git a/README.md b/README.md index 02e36f2..3270db8 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,7 @@ fastify fastify.post('/data', (req, reply) => { fastify.kafka.push({ - topic: 'api-data', + topic: 'updates', payload: req.body, key: 'dataKey' }) diff --git a/index.d.ts b/index.d.ts index 64a3cfc..506a1b8 100644 --- a/index.d.ts +++ b/index.d.ts @@ -1,6 +1,15 @@ -import { FastifyPlugin } from 'fastify'; -import { ConsumerGlobalConfig, ConsumerTopicConfig, KafkaConsumer, Message, MetadataOptions, Producer, ProducerGlobalConfig, ProducerTopicConfig } from 'node-rdkafka'; +import { FastifyPluginCallback } from 'fastify'; +import { + ConsumerGlobalConfig, + ConsumerTopicConfig, + KafkaConsumer, + Message, + MetadataOptions, + Producer, + ProducerGlobalConfig, + ProducerTopicConfig +} from 'node-rdkafka'; declare module 'fastify' { interface FastifyKafkaMessage extends Pick { @@ -37,6 +46,6 @@ declare namespace fastifyKafka { } } -declare const fastifyKafka: FastifyPlugin; +export const fastifyKafka: FastifyPluginCallback; export default fastifyKafka; \ No newline at end of file diff --git a/lib/consumer.js b/lib/consumer.js index 6b534ef..98352f0 100644 --- a/lib/consumer.js +++ b/lib/consumer.js @@ -4,14 +4,14 @@ const EE = require('events').EventEmitter const inherits = require('util').inherits const Kafka = require('node-rdkafka') -function Consumer (opts, log, next, topicConf, metadatOptions) { +function Consumer (opts, log, next, topicConf, metadataOptions) { this._log = log this.consumer = new Kafka.KafkaConsumer(opts, topicConf || {}) - var calledNext = false + let calledNext = false this.consumer.once('ready', onReady.bind(this)) this.consumer.on('event.error', onError.bind(this)) - this.consumer.connect(metadatOptions || {}, onConnect.bind(this)) + this.consumer.connect(metadataOptions || {}, onConnect.bind(this)) // register the handler only if the developer doesn't use // the callback provided on `consume` invocation diff --git a/lib/producer.js b/lib/producer.js index 0f7aab6..3c87893 100644 --- a/lib/producer.js +++ b/lib/producer.js @@ -4,15 +4,15 @@ const EE = require('events').EventEmitter const inherits = require('util').inherits const Kafka = require('node-rdkafka') -function Producer (opts, log, next, topicConf, metadatOptions) { +function Producer (opts, log, next, topicConf, metadataOptions) { this._log = log this.producer = new Kafka.Producer(opts, topicConf || {}) - var calledNext = false + let calledNext = false this.producer.once('ready', onReady.bind(this)) this.producer.on('event.error', onError.bind(this)) - this.producer.connect(metadatOptions || {}, onConnect.bind(this)) + this.producer.connect(metadataOptions || {}, onConnect.bind(this)) function onConnect (err) { this._log.debug('kafka-producer:onConnect', err) diff --git a/package.json b/package.json index 32c542b..8aed0d1 100644 --- a/package.json +++ b/package.json @@ -30,11 +30,13 @@ "node-rdkafka": "^2.8.1" }, "devDependencies": { + "@types/node": "^17.0.8", "abstract-logging": "2.0.1", - "fastify": "^3.0.0-rc.2", - "standard": "^16.0.1", - "tap": "^14.10.7", - "tsd": "^0.11.0" + "fastify": "^3.25.3", + "standard": "^16.0.4", + "tap": "^15.1.6", + "tsd": "^0.19.1", + "typescript": "^4.5.4" }, "tsd": { "directory": "test" diff --git a/test/consumer.js b/test/consumer.js index a4e4cb4..517c606 100644 --- a/test/consumer.js +++ b/test/consumer.js @@ -37,7 +37,7 @@ test('error event after connection', t => { consumer.consumer.emit('event.error', new Error('Test Error')) }) consumer.on('error', t.ok) - t.tearDown(() => consumer.stop()) + t.teardown(() => consumer.stop()) }) test('empty message with data event', t => { @@ -48,5 +48,5 @@ test('empty message with data event', t => { t.throws(() => consumer.consumer.emit('data')) }) consumer.on('error', t.ok) - t.tearDown(() => consumer.stop()) + t.teardown(() => consumer.stop()) }) diff --git a/test/producer.js b/test/producer.js index 73be349..b701583 100644 --- a/test/producer.js +++ b/test/producer.js @@ -42,5 +42,5 @@ test('error event after connection', t => { payload: 'hello world!', key: 'testKey' }) - t.tearDown(() => producer.stop()) + t.teardown(() => producer.stop()) }) diff --git a/test/test.js b/test/test.js index fcd3d51..9a64492 100644 --- a/test/test.js +++ b/test/test.js @@ -5,16 +5,21 @@ const test = t.test const crypto = require('crypto') const Fastify = require('fastify') const fastifyKafka = require('..') +const defaultExport = require('..').default +const { fastifyKafka: namedExport } = require('..') const defaultOptions = { producer: { 'metadata.broker.list': '127.0.0.1:9092', + 'allow.auto.create.topics': true, dr_cb: true }, consumer: { 'metadata.broker.list': '127.0.0.1:9092', 'fetch.wait.max.ms': 10, - 'fetch.error.backoff.ms': 50 + 'fetch.error.backoff.ms': 50, + 'topic.metadata.refresh.interval.ms': 100, + 'allow.auto.create.topics': true }, consumerTopicConf: { 'auto.offset.reset': 'beginning' @@ -24,6 +29,25 @@ const defaultOptions = { } } +test('export', function (t) { + t.plan(3) + + t.test('module export', function (t) { + t.plan(1) + t.equal(typeof fastifyKafka, 'function') + }) + + t.test('default export', function (t) { + t.plan(1) + t.equal(typeof defaultExport, 'function') + }) + + t.test('named export', function (t) { + t.plan(1) + t.equal(typeof namedExport, 'function') + }) +}) + test('communication', t => { t.plan(7) const options = copyPlainObject(defaultOptions) @@ -35,8 +59,8 @@ test('communication', t => { const producerFastify = Fastify() const consumerFastify = Fastify() - t.tearDown(() => producerFastify.close()) - t.tearDown(() => consumerFastify.close()) + t.teardown(() => producerFastify.close()) + t.teardown(() => consumerFastify.close()) consumerFastify .register(fastifyKafka, { ...options, producer: undefined }) @@ -47,7 +71,7 @@ test('communication', t => { consumerFastify.kafka.subscribe(topicName) consumerFastify.kafka.on(topicName, (msg, commit) => { - t.strictEqual(msg.value.toString(), 'hello world!') + t.equal(msg.value.toString(), 'hello world!') commit() t.ok(true) @@ -72,10 +96,10 @@ test('communication', t => { }) producerFastify.ready(err => { - t.ifError(err) + t.error(err) consumerFastify.ready(err => { - t.ifError(err) + t.error(err) }) }) }) @@ -92,8 +116,8 @@ test('multiple topics', t => { const producerFastify = Fastify() const consumerFastify = Fastify() - t.tearDown(() => producerFastify.close()) - t.tearDown(() => consumerFastify.close()) + t.teardown(() => producerFastify.close()) + t.teardown(() => consumerFastify.close()) consumerFastify .register(fastifyKafka, { ...options, producer: undefined }) @@ -104,13 +128,13 @@ test('multiple topics', t => { consumerFastify.kafka.subscribe([topicName1, topicName2]) consumerFastify.kafka.on(topicName1, (msg, commit) => { - t.strictEqual(msg.value.toString(), 'topic1') + t.equal(msg.value.toString(), 'topic1') commit() t.ok(true) }) consumerFastify.kafka.on(topicName2, (msg, commit) => { - t.strictEqual(msg.value.toString(), 'topic2') + t.equal(msg.value.toString(), 'topic2') commit() t.ok(true) }) @@ -139,10 +163,10 @@ test('multiple topics', t => { }) producerFastify.ready(err => { - t.ifError(err) + t.error(err) consumerFastify.ready(err => { - t.ifError(err) + t.error(err) }) }) }) @@ -158,8 +182,8 @@ test('consume callback', t => { const producerFastify = Fastify() const consumerFastify = Fastify() - t.tearDown(() => producerFastify.close()) - t.tearDown(() => consumerFastify.close()) + t.teardown(() => producerFastify.close()) + t.teardown(() => consumerFastify.close()) consumerFastify .register(fastifyKafka, { ...options, producer: undefined }) @@ -171,24 +195,18 @@ test('consume callback', t => { consumerFastify.kafka.on(topicName, t.fail) - function onConsume (err, messages) { - t.ifError(err) - - if (messages && messages.length > 0) { - t.equal(messages.length, 1) - t.match(messages[0], { - topic: topicName, - value: Buffer.from('hello world!'), - key: Buffer.from('testKey') - }) - t.end() - return - } - - setTimeout(() => consumerFastify.kafka.consume(10, onConsume), 10) + function onConsume (err, message) { + t.error(err) + t.match(message, { + topic: topicName, + value: Buffer.from('hello world!'), + key: Buffer.from('testKey') + }) + + t.end() } - consumerFastify.kafka.consume(10, onConsume) + consumerFastify.kafka.consume(onConsume) }) producerFastify @@ -207,10 +225,10 @@ test('consume callback', t => { }) producerFastify.ready(err => { - t.ifError(err) + t.error(err) consumerFastify.ready(err => { - t.ifError(err) + t.error(err) }) }) })