Skip to content

Commit

Permalink
[service-utils] CJS import for kafkajs-lz4 (#6127)
Browse files Browse the repository at this point in the history
  • Loading branch information
arcoraven authored Jan 31, 2025
1 parent 7217e77 commit ca75681
Showing 1 changed file with 19 additions and 11 deletions.
30 changes: 19 additions & 11 deletions packages/service-utils/src/node/usageV2.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
import { randomUUID } from "node:crypto";
import { checkServerIdentity } from "node:tls";
import * as KafkaJS from "kafkajs";
import {
CompressionTypes,
Kafka,
type Producer,
type ProducerConfig,
} from "kafkajs";
import LZ4Codec from "kafkajs-lz4";
import type { ServiceName } from "../core/services.js";
import { type UsageV2Event, getTopicName } from "../core/usageV2.js";

// Import KafkaJS with CJS pattern (source: https://github.com/tulios/kafkajs/issues/1391)
import KafkaJS from "kafkajs";
const { CompressionCodecs } = KafkaJS;

/**
* Creates a UsageV2Producer which opens a persistent TCP connection.
* This class is thread-safe so your service should re-use one instance.
Expand All @@ -19,10 +28,10 @@ import { type UsageV2Event, getTopicName } from "../core/usageV2.js";
* ```
*/
export class UsageV2Producer {
private kafka: KafkaJS.Kafka;
private producer: KafkaJS.Producer | null = null;
private kafka: Kafka;
private producer: Producer | null = null;
private topic: string;
private compression: KafkaJS.CompressionTypes;
private compression: CompressionTypes;

constructor(config: {
/**
Expand All @@ -40,7 +49,7 @@ export class UsageV2Producer {
/**
* The compression algorithm to use.
*/
compression?: KafkaJS.CompressionTypes;
compression?: CompressionTypes;

username: string;
password: string;
Expand All @@ -49,12 +58,12 @@ export class UsageV2Producer {
producerName,
environment,
productName,
compression = KafkaJS.CompressionTypes.LZ4,
compression = CompressionTypes.LZ4,
username,
password,
} = config;

this.kafka = new KafkaJS.Kafka({
this.kafka = new Kafka({
clientId: `${producerName}-${environment}`,
brokers:
environment === "production"
Expand All @@ -80,10 +89,9 @@ export class UsageV2Producer {
* Connect the producer.
* This must be called before calling `sendEvents()`.
*/
async init(configOverrides?: KafkaJS.ProducerConfig) {
if (this.compression === KafkaJS.CompressionTypes.LZ4) {
KafkaJS.CompressionCodecs[KafkaJS.CompressionTypes.LZ4] =
new LZ4Codec().codec;
async init(configOverrides?: ProducerConfig) {
if (this.compression === CompressionTypes.LZ4) {
CompressionCodecs[CompressionTypes.LZ4] = new LZ4Codec().codec;
}

this.producer = this.kafka.producer({
Expand Down

0 comments on commit ca75681

Please sign in to comment.