From 13153d5318223bbec9c74d9f28ccefd691992af6 Mon Sep 17 00:00:00 2001 From: VBetsun Date: Wed, 12 Jan 2022 09:35:19 +0200 Subject: [PATCH 1/2] docs(examples): add multiple consumers usage --- README.md | 2 + example.js => examples/basic.js | 2 +- examples/multiple-consumers.js | 92 +++++++++++++++++++++++++++++++++ package.json | 3 +- 4 files changed, 97 insertions(+), 2 deletions(-) rename example.js => examples/basic.js (97%) create mode 100644 examples/multiple-consumers.js diff --git a/README.md b/README.md index 3270db8..ad76c88 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,8 @@ fastify.listen(3000, err => { console.log(`server listening on ${fastify.server.address().port}`) }) ``` + +for more examples please take a look at the [examples directory](./examples) ### API This module exposes the following APIs: diff --git a/example.js b/examples/basic.js similarity index 97% rename from example.js rename to examples/basic.js index 2434031..8f07958 100644 --- a/example.js +++ b/examples/basic.js @@ -10,7 +10,7 @@ const fastify = require('fastify')({ const group = crypto.randomBytes(20).toString('hex') fastify - .register(require('./'), { + .register(require('..'), { producer: { 'metadata.broker.list': '127.0.0.1:9092', dr_cb: true diff --git a/examples/multiple-consumers.js b/examples/multiple-consumers.js new file mode 100644 index 0000000..ee2dd14 --- /dev/null +++ b/examples/multiple-consumers.js @@ -0,0 +1,92 @@ +'use strict' + +const crypto = require('crypto') +const fastifyKafka = require('..') +const fastify = require('fastify') + +const group = crypto.randomBytes(20).toString('hex') +const logger = { level: 'debug' } +const consumerOptions = { + consumer: { + 'metadata.broker.list': '127.0.0.1:9092', + 'group.id': group, + 'fetch.wait.max.ms': 10, + 'topic.metadata.refresh.interval.ms': 1000, + 'fetch.error.backoff.ms': 50 + }, + consumerTopicConf: { + 'auto.offset.reset': 'earliest' + } +} + +const producerOptions = { + producer: { + 'metadata.broker.list': '127.0.0.1:9092', + dr_cb: true + } +} + +const petConsumer = fastify({ logger }) + .register(fastifyKafka, consumerOptions) + .after(err => { + if (err) throw err + + petConsumer.kafka.subscribe(['cats', 'dogs']) + .on('cats', (msg, commit) => { + petConsumer.log.info('received message in cats topic: %s', msg.value.toString()) + commit() + }) + .on('dogs', (msg, commit) => { + petConsumer.log.info('received message in dogs topic: %s', msg.value.toString()) + commit() + }) + + petConsumer.kafka.consume() + }) + +const plantConsumer = fastify({ logger }) + .register(fastifyKafka, consumerOptions) + .after(err => { + if (err) throw err + + plantConsumer.kafka.subscribe('flowers') + .on('flowers', (msg, commit) => { + plantConsumer.log.info('received message in flowers topic: %s', msg.value.toString()) + commit() + }) + + plantConsumer.kafka.consume() + }) + +const producer = fastify({ logger }) + .register(fastifyKafka, producerOptions) + .after(err => { + if (err) throw err + + producer.kafka.push({ + topic: 'cats', + payload: 'message about cats', + key: 'testKey' + }) + producer.kafka.push({ + topic: 'dogs', + payload: 'message about dogs', + key: 'testKey' + }) + producer.kafka.push({ + topic: 'flowers', + payload: 'message about flowers', + key: 'testKey' + }) + }) + +producer.ready(err => { + if (err) throw err + + petConsumer.ready(err => { + if (err) throw err + }) + plantConsumer.ready(err => { + if (err) throw err + }) +}) diff --git a/package.json b/package.json index 8aed0d1..b0cd83c 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,8 @@ "scripts": { "infra:start": "docker-compose up -d", "infra:stop": "docker-compose stop", - "test": "standard && tsd && tap -j=1 test/*.js" + "lint": "standard --fix", + "test": "standard && tsd && tap test/*.js" }, "repository": { "type": "git", From b1c485293f26a4cdc05221683b784161830a3fc4 Mon Sep 17 00:00:00 2001 From: VBetsun Date: Wed, 12 Jan 2022 10:03:49 +0200 Subject: [PATCH 2/2] fix: accept code-review suggestions --- README.md | 2 +- examples/basic.js | 4 ++-- examples/multiple-consumers.js | 4 ++-- package.json | 6 ++++-- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index ad76c88..2b8be40 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,7 @@ fastify.listen(3000, err => { }) ``` -for more examples please take a look at the [examples directory](./examples) +For more examples on how to use this plugin you can take a look at the [examples directory](./examples). ### API This module exposes the following APIs: diff --git a/examples/basic.js b/examples/basic.js index 8f07958..6f383a5 100644 --- a/examples/basic.js +++ b/examples/basic.js @@ -7,7 +7,7 @@ const fastify = require('fastify')({ } }) -const group = crypto.randomBytes(20).toString('hex') +const groupId = crypto.randomBytes(20).toString('hex') fastify .register(require('..'), { @@ -17,7 +17,7 @@ fastify }, consumer: { 'metadata.broker.list': '127.0.0.1:9092', - 'group.id': group, + 'group.id': groupId, 'fetch.wait.max.ms': 10, 'fetch.error.backoff.ms': 50 }, diff --git a/examples/multiple-consumers.js b/examples/multiple-consumers.js index ee2dd14..8a4463c 100644 --- a/examples/multiple-consumers.js +++ b/examples/multiple-consumers.js @@ -4,12 +4,12 @@ const crypto = require('crypto') const fastifyKafka = require('..') const fastify = require('fastify') -const group = crypto.randomBytes(20).toString('hex') +const groupId = crypto.randomBytes(20).toString('hex') const logger = { level: 'debug' } const consumerOptions = { consumer: { 'metadata.broker.list': '127.0.0.1:9092', - 'group.id': group, + 'group.id': groupId, 'fetch.wait.max.ms': 10, 'topic.metadata.refresh.interval.ms': 1000, 'fetch.error.backoff.ms': 50 diff --git a/package.json b/package.json index b0cd83c..135f955 100644 --- a/package.json +++ b/package.json @@ -7,8 +7,10 @@ "scripts": { "infra:start": "docker-compose up -d", "infra:stop": "docker-compose stop", - "lint": "standard --fix", - "test": "standard && tsd && tap test/*.js" + "lint": "standard", + "lint:fix": "standard --fix", + "test": "npm run lint && tap \"test/*.js\" && npm run test:typescript", + "test:typescript": "tsd" }, "repository": { "type": "git",