-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpublisher.service.ts
80 lines (72 loc) · 2.24 KB
/
publisher.service.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import {
Inject,
Injectable,
Logger,
OnApplicationShutdown,
} from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { InjectModel } from '@nestjs/sequelize';
import { firstValueFrom } from 'rxjs';
import { Sequelize } from 'sequelize-typescript';
import { Outbox } from 'src/db/models/outbox';
import { KAFKA_CLIENT } from './publisher.kafka';
@Injectable()
export class PublisherService implements OnApplicationShutdown {
private logger: Logger = new Logger(PublisherService.name);
constructor(
@Inject(KAFKA_CLIENT) private readonly client: ClientProxy,
private readonly sequelize: Sequelize,
@InjectModel(Outbox) private outbox: typeof Outbox,
) {}
async poll() {
while (!this.shutdownRequested) {
const count = await this.outbox.count({
where: { sentAt: null },
});
if (count <= 0) {
await new Promise((resolve) => setTimeout(resolve, 1000));
continue;
}
const transaction = await this.sequelize.transaction();
const outbox = await this.outbox.findAll({
where: { sentAt: null },
order: [['createdAt', 'ASC']],
limit: 1,
transaction,
lock: transaction.LOCK.UPDATE,
});
try {
for (const message of outbox) {
await this.publishMessage(message);
await this.outbox.update(
{ sentAt: new Date() },
{ where: { id: message.id }, transaction },
);
}
await transaction.commit();
} catch (e) {
this.logger.error('PublisherService.listen() error: ' + e);
await transaction.rollback();
}
}
}
async publishMessage(message: Outbox) {
const topic = `${message.eventName}.v${message.eventVersion}`;
console.log('Publishing message: ', message.payload, ' to topic: ', topic);
const result = await firstValueFrom(
this.client.emit(topic, {
key: message.id,
value: {
event_name: message.eventName,
event_version: message.eventVersion,
payload: message.payload,
},
}),
);
console.log('Publishing result: ', result);
}
private shutdownRequested = false;
onApplicationShutdown() {
this.shutdownRequested = true;
}
}