-
-
Notifications
You must be signed in to change notification settings - Fork 19
/
multiple-consumers.js
92 lines (79 loc) · 2.09 KB
/
multiple-consumers.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
'use strict'
const crypto = require('node:crypto')
const fastifyKafka = require('..')
const fastify = require('fastify')
// Random 40-character hex string used as the consumers' `group.id`.
const groupId = crypto.randomBytes(20).toString('hex')

// Logger settings handed to every fastify() instance in this script.
const logger = { level: 'debug' }

// Broker address used by both the consumer and the producer settings.
const brokerList = '127.0.0.1:9092'

// Plugin options for the consuming instances.
const consumerOptions = {
  consumer: {
    'metadata.broker.list': brokerList,
    'group.id': groupId,
    'fetch.wait.max.ms': 10,
    'topic.metadata.refresh.interval.ms': 1000,
    'fetch.error.backoff.ms': 50
  },
  consumerTopicConf: { 'auto.offset.reset': 'earliest' }
}

// Plugin options for the producing instance.
const producerOptions = {
  producer: {
    'metadata.broker.list': brokerList,
    dr_cb: true
  }
}
// Fastify instance that subscribes to the `cats` and `dogs` topics.
const petConsumer = fastify({ logger })
  .register(fastifyKafka, consumerOptions)
  .after(registrationError => {
    if (registrationError) throw registrationError

    // Each handler logs the message value and then calls the supplied commit callback.
    const onCatMessage = (message, commit) => {
      petConsumer.log.info('received message in cats topic: %s', message.value.toString())
      commit()
    }
    const onDogMessage = (message, commit) => {
      petConsumer.log.info('received message in dogs topic: %s', message.value.toString())
      commit()
    }

    const subscription = petConsumer.kafka.subscribe(['cats', 'dogs'])
    subscription
      .on('cats', onCatMessage)
      .on('dogs', onDogMessage)

    petConsumer.kafka.consume()
  })
// Fastify instance that subscribes to the `flowers` topic only.
const plantConsumer = fastify({ logger })
  .register(fastifyKafka, consumerOptions)
  .after(registrationError => {
    if (registrationError) throw registrationError

    // Logs the message value and then calls the supplied commit callback.
    const onFlowerMessage = (message, commit) => {
      plantConsumer.log.info('received message in flowers topic: %s', message.value.toString())
      commit()
    }

    const subscription = plantConsumer.kafka.subscribe('flowers')
    subscription.on('flowers', onFlowerMessage)

    plantConsumer.kafka.consume()
  })
// Fastify instance that pushes one message to each topic the consumers listen on.
const producer = fastify({ logger })
  .register(fastifyKafka, producerOptions)
  .after(registrationError => {
    if (registrationError) throw registrationError

    // Pushed in this order: cats, dogs, flowers.
    const messages = [
      { topic: 'cats', payload: 'message about cats', key: 'testKey' },
      { topic: 'dogs', payload: 'message about dogs', key: 'testKey' },
      { topic: 'flowers', payload: 'message about flowers', key: 'testKey' }
    ]

    for (const message of messages) {
      producer.kafka.push(message)
    }
  })
// Rethrows the error passed to a ready callback, if there is one.
const throwIfError = err => {
  if (err) throw err
}

// Start the producer first; the consumers are started from inside its ready callback.
producer.ready(producerError => {
  if (producerError) throw producerError

  petConsumer.ready(throwIfError)
  plantConsumer.ready(throwIfError)
})