-
-
Notifications
You must be signed in to change notification settings - Fork 19
/
basic.js
66 lines (55 loc) · 1.36 KB
/
basic.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
'use strict'
const crypto = require('node:crypto')
const fastify = require('fastify')({
logger: {
level: 'debug'
}
})
const groupId = crypto.randomBytes(20).toString('hex')
fastify
.register(require('..'), {
producer: {
'metadata.broker.list': '127.0.0.1:9092',
dr_cb: true
},
consumer: {
'metadata.broker.list': '127.0.0.1:9092',
'group.id': groupId,
'fetch.wait.max.ms': 10,
'fetch.error.backoff.ms': 50
},
consumerTopicConf: {
'auto.offset.reset': 'earliest'
}
})
.after(err => {
if (err) throw err
fastify.kafka.producer.on('error', err => { if (err) throw err })
fastify.kafka.consumer.on('error', err => { if (err) throw err })
fastify.kafka.subscribe(['test', 'kafka'])
fastify.kafka.on('test', (msg, commit) => {
console.log(msg.value.toString())
commit()
fastify.kafka.push({
topic: 'kafka',
payload: 'proooova',
key: 'testKey'
})
})
fastify.kafka.on('kafka', (msg, commit) => {
console.log(msg.value.toString())
commit()
setImmediate(fastify.close)
})
fastify.kafka.consume()
setTimeout(() => {
fastify.kafka.push({
topic: 'test',
payload: 'hello world!',
key: 'testKey'
})
}, 1000)
})
process.once('SIGINT', function () {
fastify.close()
})