fix(enterprise): enforce stream batch size
Changed `BufferedEventStream`'s batching logic to fill batches with
events / log entries up to a constant maximum byte count.

Unlike the previous count-based batching, the new implementation bounds
request size by construction, and it generally drains its buffer much
faster.

Also streamlined some event payloads to omit unnecessary and potentially
very large fields.
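
To illustrate the batching idea at a glance, here is a minimal, self-contained sketch (illustrative only, not the committed implementation; the committed `makeBatch` is in `core/src/enterprise/buffered-event-stream.ts` below):

```ts
// Byte-capped batching: pull records off the front of the buffer until adding
// the next record would push the batch's serialized size past the cap, so each
// POST body stays bounded no matter how large individual records are.
// (A production version must also drop records that alone exceed the cap.)
const MAX_BATCH_BYTES = 600 * 1000 // 600 kilobytes

function makeBatch<B>(buffered: B[]): B[] {
  const batch: B[] = []
  let batchBytes = 0
  while (buffered.length > 0) {
    const nextRecordBytes = Buffer.from(JSON.stringify(buffered[0])).length
    if (batchBytes + nextRecordBytes > MAX_BATCH_BYTES) {
      break // batch is full; the next flush picks up the remainder
    }
    batch.push(buffered.shift()!)
    batchBytes += nextRecordBytes
  }
  return batch
}

// Why a record count can't bound request size: 100 records of ~50 KB each is a
// ~5 MB request, while the byte cap keeps every request under 600 KB.
const buffer = Array.from({ length: 100 }, () => ({ payload: "x".repeat(50 * 1000) }))
while (buffer.length > 0) {
  console.log(`batch of ${makeBatch(buffer).length} records`) // ~11 records per batch
}
```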
thsig committed Sep 9, 2020
1 parent e2f0d8d commit 90ab9d0
Showing 7 changed files with 83 additions and 28 deletions.
4 changes: 2 additions & 2 deletions core/src/actions.ts
@@ -421,7 +421,7 @@ export class ActionRouter implements TypeGuard {
     const { result } = await this.callServiceHandler({ params, actionType: "getServiceStatus" })
     this.garden.events.emit("serviceStatus", {
       serviceName: params.service.name,
-      status: result,
+      status: omit(result, "detail"),
     })
     this.validateServiceOutputs(params.service, result)
     return result
@@ -431,7 +431,7 @@ export class ActionRouter implements TypeGuard {
     const { result } = await this.callServiceHandler({ params, actionType: "deployService" })
     this.garden.events.emit("serviceStatus", {
       serviceName: params.service.name,
-      status: result,
+      status: omit(result, "detail"),
     })
     this.validateServiceOutputs(params.service, result)
     return result
84 changes: 69 additions & 15 deletions core/src/enterprise/buffered-event-stream.ts
@@ -72,6 +72,13 @@ export interface ApiLogBatch extends ApiBatchBase {

 export const controlEventNames: Set<EventName> = new Set(["_workflowRunRegistered"])

+/**
+ * We use 600 kilobytes as the maximum combined size of the events / log entries in a given batch. This number
+ * was chosen to fit comfortably below e.g. nginx' default max request size, while still being able to carry a decent
+ * number of records.
+ */
+export const MAX_BATCH_BYTES = 600 * 1000 // 600 kilobytes
+
 /**
  * Buffers events and log entries and periodically POSTs them to Garden Enterprise or another Garden service.
  *
@@ -99,9 +106,7 @@ export class BufferedEventStream {
   private intervalId: NodeJS.Timer | null
   private bufferedEvents: StreamEvent[]
   private bufferedLogEntries: LogEntryEvent[]
-
   protected intervalMsec = 1000
-  protected maxBatchSize = 100

   constructor(log: LogEntry, sessionId: string) {
     this.sessionId = sessionId
@@ -155,7 +160,7 @@

   startInterval() {
     this.intervalId = setInterval(() => {
-      this.flushBuffered({ flushAll: false }).catch((err) => {
+      this.flushBuffered().catch((err) => {
        this.log.error(err)
       })
     }, this.intervalMsec)
@@ -167,7 +172,7 @@
       this.intervalId = null
     }
     try {
-      await this.flushBuffered({ flushAll: true })
+      await this.flushAll()
     } catch (err) {
       /**
        * We don't throw an exception here, since a failure to stream events and log entries doesn't mean that the
@@ -230,7 +235,7 @@
     await this.postToTargets(`${logEntries.length} log entries`, "log-entries", data)
   }

-  private async postToTargets(description: string, path: string, data: any) {
+  private async postToTargets(description: string, path: string, data: ApiEventBatch | ApiLogBatch) {
     if (this.targets.length === 0) {
       this.log.silly("No targets to send events to. Dropping them.")
     }
@@ -241,27 +246,76 @@
     this.log.silly(`data: ${JSON.stringify(data)}`)
     this.log.silly(`--------`)

-    await Bluebird.map(this.targets, (target) => {
-      const headers = this.getHeaders(target)
-      return got.post(`${target.host}/${path}`, { json: data, headers })
-    })
+    try {
+      await Bluebird.map(this.targets, (target) => {
+        const headers = this.getHeaders(target)
+        return got.post(`${target.host}/${path}`, { json: data, headers })
+      })
+    } catch (err) {
+      /**
+       * We don't throw an exception here, since a failure to stream events and log entries doesn't mean that the
+       * command failed.
+       */
+      this.log.error(`Error while flushing events and log entries: ${err.message}`)
+    }
   }

-  async flushBuffered({ flushAll = false }) {
+  /**
+   * Flushes all events and log entries until none remain, and returns a promise that resolves when all of them
+   * have been posted to their targets.
+   */
+  async flushAll() {
     if (!this.garden || this.targets.length === 0) {
       return
     }

-    const eventsToFlush = this.bufferedEvents.splice(0, flushAll ? this.bufferedEvents.length : this.maxBatchSize)
+    this.log.silly(`Flushing all remaining events and log entries`)
+    const flushPromises: Promise<any>[] = []
+    try {
+      while (this.bufferedEvents.length > 0 || this.bufferedLogEntries.length > 0) {
+        this.log.silly(`remaining: ${this.bufferedEvents.length} events, ${this.bufferedLogEntries.length} log entries`)
+        // while (this.bufferedEvents.length > 0 || this.bufferedLogEntries.length > 0) {
+        flushPromises.push(this.flushBuffered())
+      }
+    } catch (err) {
+      throw err
+    }
+    return Bluebird.all(flushPromises)
+  }
+
+  async flushBuffered() {
+    if (!this.garden || this.targets.length === 0) {
+      return
+    }
+
-    const logEntryFlushCount = flushAll
-      ? this.bufferedLogEntries.length
-      : this.maxBatchSize - this.bufferedLogEntries.length
-    const logEntriesToFlush = this.bufferedLogEntries.splice(0, logEntryFlushCount)
+    const eventsToFlush = this.makeBatch(this.bufferedEvents)
+    const logEntriesToFlush = this.makeBatch(this.bufferedLogEntries)

     return Bluebird.all([this.flushEvents(eventsToFlush), this.flushLogEntries(logEntriesToFlush)])
   }

+  /**
+   * Adds buffered records (events or log entries) to a batch until none remain or until their combined size
+   * would exceed `MAX_BATCH_BYTES`, and returns the batch.
+   */
+  makeBatch<B>(buffered: B[]): B[] {
+    const batch: B[] = []
+    let batchBytes = 0
+    while (batchBytes < MAX_BATCH_BYTES && buffered.length > 0) {
+      const nextRecordBytes = Buffer.from(JSON.stringify(buffered[0])).length
+      if (batchBytes + nextRecordBytes > MAX_BATCH_BYTES) {
+        break
+      }
+      if (nextRecordBytes > MAX_BATCH_BYTES) {
+        this.log.error(`Event or log entry too large to flush, dropping it.`)
+        this.log.debug(JSON.stringify(buffered[0]))
+      }
+      batch.push(buffered.shift() as B)
+      batchBytes += nextRecordBytes
+    }
+    return batch
+  }

   getWorkflowRunUid(): string | undefined {
     return gardenEnv.GARDEN_WORKFLOW_RUN_UID || this.workflowRunUid
   }
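
One detail of the change worth noting: `flushAll()` drains the buffers by calling `flushBuffered()` in a plain `while` loop, which terminates because `makeBatch()` removes records from the buffer synchronously (via `shift()`) before any of the resulting POST promises are awaited. A condensed sketch of that drain pattern, reusing the `makeBatch` sketch from the top of this page (`post` is a hypothetical stand-in for the real `got.post` call):

```ts
// Illustrative drain loop (not the committed code): each iteration removes a
// byte-capped batch from the buffer synchronously, then all POSTs settle together.
async function drainAll<B>(buffered: B[], post: (batch: B[]) => Promise<void>): Promise<void> {
  const flushPromises: Promise<void>[] = []
  while (buffered.length > 0) {
    flushPromises.push(post(makeBatch(buffered)))
  }
  await Promise.all(flushPromises)
}
```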
8 changes: 4 additions & 4 deletions core/src/events.ts
@@ -7,11 +7,11 @@
  */

 import { EventEmitter2 } from "eventemitter2"
-import { ModuleVersion } from "./vcs/vcs"
 import { GraphResult } from "./task-graph"
 import { LogEntryEvent } from "./enterprise/buffered-event-stream"
 import { ServiceStatus } from "./types/service"
 import { RunStatus } from "./types/plugin/base"
+import { Omit } from "./util/util"

 export type GardenEventListener<T extends EventName> = (payload: Events[T]) => void

@@ -112,9 +112,9 @@ export interface Events extends LoggerEvents {
     key: string
     type: string
     name: string
-    version: ModuleVersion
+    versionString: string
   }
-  taskComplete: GraphResult
+  taskComplete: GraphResult // TODO: Omit dependencyResults in this payload type?
   taskError: GraphResult
   taskCancelled: {
     cancelledAt: Date
@@ -143,7 +143,7 @@ export interface Events extends LoggerEvents {
   }
   serviceStatus: {
     serviceName: string
-    status: ServiceStatus
+    status: Omit<ServiceStatus, "detail">
   }

   // Workflow events
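
The payload-slimming half of the commit is visible above: `ServiceStatus.detail` can carry arbitrarily large provider-specific data, so the streamed event type drops it. A hedged illustration (the `state`/`detail` shape here is a simplified stand-in for the real `ServiceStatus`):

```ts
// Illustrative only: slimming an event payload with Omit before streaming.
interface ServiceStatus {
  state: string
  detail?: unknown // potentially very large, provider-specific
}

type StreamedServiceStatus = Omit<ServiceStatus, "detail">

const status: ServiceStatus = { state: "ready", detail: { hugeBlob: "..." } }
const payload: StreamedServiceStatus = { state: status.state } // `detail` dropped before emitting
```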
2 changes: 1 addition & 1 deletion core/src/logger/log-entry.ts
@@ -39,7 +39,7 @@ export interface WorkflowStepMetadata {
   index: number
 }

-export const EVENT_LOG_LEVEL = LogLevel.debug
+export const EVENT_LOG_LEVEL = LogLevel.info

 interface MessageBase {
   msg?: string
2 changes: 1 addition & 1 deletion core/src/task-graph.ts
@@ -402,7 +402,7 @@ export class TaskGraph extends EventEmitter2 {
         key,
         batchId,
         startedAt: new Date(),
-        version: task.version,
+        versionString: task.version.versionString,
       })
       result = await node.process(dependencyResults)
       result.startedAt = startedAt
6 changes: 3 additions & 3 deletions core/test/unit/src/platform/buffered-event-stream.ts
@@ -47,7 +47,7 @@ describe("BufferedEventStream", () => {
     garden.events.emit("_test", "event")
     log.root.events.emit("_test", "log")

-    await bufferedEventStream.flushBuffered({ flushAll: true })
+    await bufferedEventStream.flushAll()

     expect(find(flushedEvents, (e) => isMatch(e, { name: "_test", payload: "event" }))).to.exist
     expect(flushedLogEntries).to.include("log")
@@ -79,13 +79,13 @@
     log.root.events.emit("_test", "log")
     gardenA.events.emit("_test", "event")

-    await bufferedEventStream.flushBuffered({ flushAll: true })
+    await bufferedEventStream.flushAll()

     expect(flushedEvents.length).to.eql(0)
     expect(flushedLogEntries).to.include("log")

     gardenB.events.emit("_test", "event")
-    await bufferedEventStream.flushBuffered({ flushAll: true })
+    await bufferedEventStream.flushAll()

     expect(find(flushedEvents, (e) => isMatch(e, { name: "_test", payload: "event" }))).to.exist
   })
5 changes: 3 additions & 2 deletions core/test/unit/src/task-graph.ts
@@ -161,7 +161,7 @@ describe("task-graph", () => {
           key: task.getKey(),
           name: task.name,
           type: task.type,
-          version: task.version,
+          versionString: task.version.versionString,
         },
       },
       { name: "taskComplete", payload: result["a"] },
@@ -250,7 +250,7 @@
           key: task.getKey(),
           name: task.name,
           type: task.type,
-          version: task.version,
+          versionString: task.version.versionString,
         },
       },
       { name: "taskError", payload: result["a"] },
@@ -740,6 +740,7 @@

     const filteredKeys: Set<string | number> = new Set([
       "version",
+      "versionString",
       "error",
       "addedAt",
       "startedAt",
