diff --git a/README.md b/README.md index 3270db8..2b8be40 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,8 @@ fastify.listen(3000, err => { console.log(`server listening on ${fastify.server.address().port}`) }) ``` + +For more examples on how to use this plugin you can take a look at the [examples directory](./examples). ### API This module exposes the following APIs: diff --git a/example.js b/examples/basic.js similarity index 92% rename from example.js rename to examples/basic.js index 2434031..6f383a5 100644 --- a/example.js +++ b/examples/basic.js @@ -7,17 +7,17 @@ const fastify = require('fastify')({ } }) -const group = crypto.randomBytes(20).toString('hex') +const groupId = crypto.randomBytes(20).toString('hex') fastify - .register(require('./'), { + .register(require('..'), { producer: { 'metadata.broker.list': '127.0.0.1:9092', dr_cb: true }, consumer: { 'metadata.broker.list': '127.0.0.1:9092', - 'group.id': group, + 'group.id': groupId, 'fetch.wait.max.ms': 10, 'fetch.error.backoff.ms': 50 }, diff --git a/examples/multiple-consumers.js b/examples/multiple-consumers.js new file mode 100644 index 0000000..8a4463c --- /dev/null +++ b/examples/multiple-consumers.js @@ -0,0 +1,92 @@ +'use strict' + +const crypto = require('crypto') +const fastifyKafka = require('..') +const fastify = require('fastify') + +const groupId = crypto.randomBytes(20).toString('hex') +const logger = { level: 'debug' } +const consumerOptions = { + consumer: { + 'metadata.broker.list': '127.0.0.1:9092', + 'group.id': groupId, + 'fetch.wait.max.ms': 10, + 'topic.metadata.refresh.interval.ms': 1000, + 'fetch.error.backoff.ms': 50 + }, + consumerTopicConf: { + 'auto.offset.reset': 'earliest' + } +} + +const producerOptions = { + producer: { + 'metadata.broker.list': '127.0.0.1:9092', + dr_cb: true + } +} + +const petConsumer = fastify({ logger }) + .register(fastifyKafka, consumerOptions) + .after(err => { + if (err) throw err + + petConsumer.kafka.subscribe(['cats', 'dogs']) + .on('cats', (msg, commit) => { + petConsumer.log.info('received message in cats topic: %s', msg.value.toString()) + commit() + }) + .on('dogs', (msg, commit) => { + petConsumer.log.info('received message in dogs topic: %s', msg.value.toString()) + commit() + }) + + petConsumer.kafka.consume() + }) + +const plantConsumer = fastify({ logger }) + .register(fastifyKafka, consumerOptions) + .after(err => { + if (err) throw err + + plantConsumer.kafka.subscribe('flowers') + .on('flowers', (msg, commit) => { + plantConsumer.log.info('received message in flowers topic: %s', msg.value.toString()) + commit() + }) + + plantConsumer.kafka.consume() + }) + +const producer = fastify({ logger }) + .register(fastifyKafka, producerOptions) + .after(err => { + if (err) throw err + + producer.kafka.push({ + topic: 'cats', + payload: 'message about cats', + key: 'testKey' + }) + producer.kafka.push({ + topic: 'dogs', + payload: 'message about dogs', + key: 'testKey' + }) + producer.kafka.push({ + topic: 'flowers', + payload: 'message about flowers', + key: 'testKey' + }) + }) + +producer.ready(err => { + if (err) throw err + + petConsumer.ready(err => { + if (err) throw err + }) + plantConsumer.ready(err => { + if (err) throw err + }) +}) diff --git a/package.json b/package.json index 8aed0d1..135f955 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,10 @@ "scripts": { "infra:start": "docker-compose up -d", "infra:stop": "docker-compose stop", - "test": "standard && tsd && tap -j=1 test/*.js" + "lint": "standard", + "lint:fix": "standard --fix", + "test": "npm run lint && tap \"test/*.js\" && npm run test:typescript", + "test:typescript": "tsd" }, "repository": { "type": "git",