Skip to content

Commit

Permalink
fix(enterprise): fix batch sizing logic
Browse files Browse the repository at this point in the history
We now correctly handle the case where a single event or log entry
payload exceeds `MAX_BATCH_BYTES` (dropping the event / log entry and
logging an error).

Also wrote a couple of unit tests for the batching logic.

In addition, the log level for error messages generated when posting to
targets failed was raised, to avoid cluttering the log.
  • Loading branch information
thsig committed Oct 5, 2020
1 parent 951031f commit da36730
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 15 deletions.
30 changes: 16 additions & 14 deletions core/src/enterprise/buffered-event-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,6 @@ export interface ApiLogBatch extends ApiBatchBase {

export const controlEventNames: Set<EventName> = new Set(["_workflowRunRegistered"])

/**
* We use 600 kilobytes as the maximum combined size of the events / log entries in a given batch. This number
* was chosen to fit comfortably below e.g. nginx' default max request size, while still being able to carry a decent
* number of records.
*/
export const MAX_BATCH_BYTES = 600 * 1000 // 600 kilobytes

/**
* Buffers events and log entries and periodically POSTs them to Garden Enterprise or another Garden service.
*
Expand Down Expand Up @@ -107,6 +100,13 @@ export class BufferedEventStream {
private bufferedLogEntries: LogEntryEvent[]
protected intervalMsec = 1000

/**
* We use 600 kilobytes as the maximum combined size of the events / log entries in a given batch. This number
* was chosen to fit comfortably below e.g. nginx' default max request size, while still being able to carry a decent
* number of records.
*/
private maxBatchBytes = 600 * 1024 // 600 kilobytes

constructor(log: LogEntry, sessionId: string) {
this.sessionId = sessionId
this.log = log
Expand Down Expand Up @@ -255,7 +255,7 @@ export class BufferedEventStream {
* We don't throw an exception here, since a failure to stream events and log entries doesn't mean that the
* command failed.
*/
this.log.error(`Error while flushing events and log entries: ${err.message}`)
this.log.debug(`Error while flushing events and log entries: ${err.message}`)
}
}

Expand Down Expand Up @@ -300,14 +300,16 @@ export class BufferedEventStream {
makeBatch<B>(buffered: B[]): B[] {
const batch: B[] = []
let batchBytes = 0
while (batchBytes < MAX_BATCH_BYTES && buffered.length > 0) {
const nextRecordBytes = Buffer.from(JSON.stringify(buffered[0])).length
if (batchBytes + nextRecordBytes > MAX_BATCH_BYTES) {
break
}
if (nextRecordBytes > MAX_BATCH_BYTES) {
while (batchBytes < this.maxBatchBytes && buffered.length > 0) {
let nextRecordBytes = Buffer.from(JSON.stringify(buffered[0])).length
if (nextRecordBytes > this.maxBatchBytes) {
this.log.error(`Event or log entry too large to flush, dropping it.`)
this.log.debug(JSON.stringify(buffered[0]))
buffered.shift() // Drop first record.
nextRecordBytes = Buffer.from(JSON.stringify(buffered[0])).length
}
if (batchBytes + nextRecordBytes > this.maxBatchBytes) {
break
}
batch.push(buffered.shift() as B)
batchBytes += nextRecordBytes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ import { StreamEvent, LogEntryEvent, BufferedEventStream } from "../../../../src
import { getLogger } from "../../../../src/logger/logger"
import { Garden } from "../../../../src/garden"
import { makeTestGardenA } from "../../../helpers"
import { find, isMatch } from "lodash"
import { find, isMatch, range, repeat } from "lodash"

function makeDummyRecord(sizeKb: number) {
return { someKey: repeat("a", sizeKb * 1024) }
}

describe("BufferedEventStream", () => {
const getConnectionParams = (garden: Garden) => ({
Expand Down Expand Up @@ -89,4 +93,42 @@ describe("BufferedEventStream", () => {

expect(find(flushedEvents, (e) => isMatch(e, { name: "_test", payload: "event" }))).to.exist
})

describe("makeBatch", () => {
const maxBatchBytes = 3 * 1024 // Set this to a low value (3 Kb) to keep the memory use of the test suite low.
it("should pick records until the batch size reaches MAX_BATCH_BYTES", async () => {
const recordSizeKb = 0.5
const log = getLogger().placeholder()
const bufferedEventStream = new BufferedEventStream(log, "dummy-session-id")
bufferedEventStream["maxBatchBytes"] = maxBatchBytes
// Total size is ~3MB, which exceeds MAX_BATCH_BYTES
const records = range(100).map((_) => makeDummyRecord(recordSizeKb))
const batch = bufferedEventStream.makeBatch(records)
const batchSize = Buffer.from(JSON.stringify(batch)).length
expect(batch.length).to.be.lte(records.length)
expect(batch.length).to.be.lte(maxBatchBytes / (recordSizeKb * 1024))
expect(batchSize).to.be.lte(maxBatchBytes)
})

it("should drop individual records whose payload size exceeds MAX_BATCH_BYTES", async () => {
const recordSizeKb = 0.5
const log = getLogger().placeholder()
const bufferedEventStream = new BufferedEventStream(log, "dummy-session-id")
bufferedEventStream["maxBatchBytes"] = maxBatchBytes
// This record's size, exceeds MAX_BATCH_BYTES, so it should be dropped by `makeBatch`.
const tooLarge = {
...makeDummyRecord(maxBatchBytes / 1024 + 3),
tag: "tooLarge",
}
const records = [tooLarge, ...range(100).map((_) => makeDummyRecord(recordSizeKb))]
const batch = bufferedEventStream.makeBatch(records)
const batchSize = Buffer.from(JSON.stringify(batch)).length

expect(batch.find((r) => r["tag"] === "tooLarge")).to.be.undefined // We expect `tooLarge` to have been dropped.
expect(batch.length).to.be.gte(3)
expect(batch.length).to.be.lte(records.length)
expect(batch.length).to.be.lte(maxBatchBytes / (recordSizeKb * 1024))
expect(batchSize).to.be.lte(maxBatchBytes)
})
})
})

0 comments on commit da36730

Please sign in to comment.