Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pilot rebase update #91

Merged
merged 6 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .eslintcache

This file was deleted.

1 change: 1 addition & 0 deletions .eslintignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
node_modules/*
Binary file modified config/ipms_facility_mappings.xlsx
Binary file not shown.
Binary file added config/ipms_facility_mappings_v1.xlsx
Binary file not shown.
54 changes: 29 additions & 25 deletions src/lib/kafkaProducerUtil.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { Kafka, KafkaConfig, Producer, ProducerRecord, Transaction } from 'kafkajs';
import logger from './winston';
import { Kafka, KafkaConfig, Producer, ProducerRecord, Transaction } from 'kafkajs'
import logger from './winston'

type DeliveryReportCallback = (report: any) => void;
type DeliveryReportCallback = (report: any) => void

/**
* KafkaUtil class provides utility functions to interact with Kafka producer.
*/
export class KafkaProducerUtil {
private producer: Producer | null = null;
private producer: Producer | null = null

/**
* Creates an instance of KafkaUtil.
Expand All @@ -23,10 +23,10 @@ export class KafkaProducerUtil {
*/
public async init(): Promise<void> {
try {
this.producer = await this.createProducer();
this.producer = await this.createProducer()
} catch (err) {
console.error('Failed to initialize producer:', err);
throw err;
console.error('Failed to initialize producer:', err)
throw err
}
}

Expand All @@ -35,13 +35,17 @@ export class KafkaProducerUtil {
* @returns {Promise<Producer>} Promise that resolves with Kafka producer instance.
*/
private async createProducer(): Promise<Producer> {
logger.info('Creating Kafka producer...');
const kafka = new Kafka(this.config);
const producer = kafka.producer({transactionalId: 'shr-producer-transaction', idempotent: true, maxInFlightRequests: 1});
await producer.connect();
return producer;
logger.info('Creating Kafka producer...')
const kafka = new Kafka(this.config)
const producer = kafka.producer({
transactionalId: 'shr-producer-transaction',
idempotent: true,
maxInFlightRequests: 1,
})
await producer.connect()
return producer
}

/**
* Sends message using transaction.
* @param {ProducerRecord[]} records - Array of producer records to send.
Expand All @@ -51,22 +55,22 @@ export class KafkaProducerUtil {
public async sendMessageTransactionally(records: ProducerRecord[]): Promise<void> {
if (!this.producer) {
logger.error('Producer is not initialized.')
throw new Error('Producer is not initialized.');
throw new Error('Producer is not initialized.')
}

const transaction: Transaction = await this.producer.transaction();
const transaction: Transaction = await this.producer.transaction()
try {
logger.info('Sending the following records transactionally:');
logger.info(JSON.stringify(records, null, 2));
//logger.info('Sending the following records transactionally:');
//logger.info(JSON.stringify(records, null, 2));
for (const record of records) {
await transaction.send(record);
await transaction.send(record)
}
await transaction.commit();
this.onDeliveryReport({ status: 'committed' });
await transaction.commit()
this.onDeliveryReport({ status: 'committed' })
} catch (err) {
await transaction.abort();
this.onDeliveryReport({ status: 'aborted' });
throw err;
await transaction.abort()
this.onDeliveryReport({ status: 'aborted' })
throw err
}
}

Expand All @@ -75,9 +79,9 @@ export class KafkaProducerUtil {
* @returns {Promise<void>} Promise that resolves when producer is disconnected.
*/
public async shutdown(): Promise<void> {
logger.info('Shutting down Kafka producer...');
logger.info('Shutting down Kafka producer...')
if (this.producer) {
await this.producer.disconnect();
await this.producer.disconnect()
}
}
}
Loading