Skip to content

Commit

Permalink
Adjust Kafka message estimator on MESSAGE_TOO_LARGE (#6374)
Browse files Browse the repository at this point in the history
  • Loading branch information
kamilkisiela authored Jan 22, 2025
1 parent 0e4be14 commit 393ece7
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 2 deletions.
5 changes: 5 additions & 0 deletions .changeset/ka-f-ka.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'hive': patch
---

Adjust the Kafka message size estimation only when Kafka gives back `MESSAGE_TOO_LARGE` error
9 changes: 9 additions & 0 deletions packages/services/usage/__tests__/buffer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ test('increase the defaultBytesPerOperation estimation by 5% when over 100 calls
limitInBytes: eventHubLimitInBytes,
useEstimator: true,
onRetry,
isTooLargePayloadError() {
return true;
},
calculateReportSize(report) {
return report.size;
},
Expand Down Expand Up @@ -150,6 +153,9 @@ test('buffer should split the report into multiple reports when the estimated si
interval,
limitInBytes: eventHubLimitInBytes,
useEstimator: true,
isTooLargePayloadError() {
return true;
},
calculateReportSize(report) {
return report.size;
},
Expand Down Expand Up @@ -262,6 +268,9 @@ test('buffer create two chunks out of one buffer when actual buffer size is too
interval,
limitInBytes: eventHubLimitInBytes,
useEstimator: true,
isTooLargePayloadError() {
return true;
},
calculateReportSize(report) {
return report.size;
},
Expand Down
7 changes: 5 additions & 2 deletions packages/services/usage/src/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ export function createKVBuffer<T>(config: {
calculateReportSize(report: T): number;
split(report: T, numOfChunks: number): readonly T[];
onRetry(reports: readonly T[]): void;
isTooLargePayloadError(error: unknown): boolean;
sender(
reports: readonly T[],
estimatedSizeInBytes: number,
Expand Down Expand Up @@ -254,8 +255,10 @@ export function createKVBuffer<T>(config: {
await flushBuffer(reports, size, batchId);
} catch (error) {
logger.error(error);
// the payload size was most likely too big
estimator.overflowed(batchId);
if (config.isTooLargePayloadError(error)) {
// the payload size was most likely too big
estimator.overflowed(batchId);
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions packages/services/usage/src/usage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ export function createUsage(config: {
interval: config.kafka.buffer.interval,
limitInBytes: 990_000, // 1MB is the limit of a single request to EventHub, let's keep it below that
useEstimator: config.kafka.buffer.dynamic,
isTooLargePayloadError(error) {
return error instanceof Error && 'type' in error && error.type === 'MESSAGE_TOO_LARGE';
},
calculateReportSize(report) {
return Object.keys(report.map).length;
},
Expand Down

0 comments on commit 393ece7

Please sign in to comment.