Skip to content

Commit

Permalink
fix(enterprise): wait for event stream to flush
Browse files Browse the repository at this point in the history
We now wait for `BufferedEventStream` to finish flushing any buffered
events and log entries before exiting a command.
  • Loading branch information
thsig authored and edvald committed Jun 25, 2020
1 parent dc302a3 commit f8854c9
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 16 deletions.
2 changes: 2 additions & 0 deletions garden-service/src/cli/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,8 @@ export class GardenCli {
}
} while (result.restartRequired)

await bufferedEventStream.close()

// We attach the action result to cli context so that we can process it in the parse method
cliContext.details.result = result
}
Expand Down
40 changes: 24 additions & 16 deletions garden-service/src/enterprise/buffered-event-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

import { registerCleanupFunction } from "../util/util"
import Bluebird from "bluebird"
import { Events, EventName, EventBus, eventNames } from "../events"
import { LogEntryMetadata, LogEntry } from "../logger/log-entry"
import { chainMessages } from "../logger/renderers"
Expand Down Expand Up @@ -122,18 +122,22 @@ export class BufferedEventStream {
this.intervalId = setInterval(() => {
this.flushBuffered({ flushAll: false })
}, FLUSH_INTERVAL_MSEC)

registerCleanupFunction("flushAllBufferedEventsAndLogEntries", () => {
this.close()
})
}

close() {
async close() {
if (this.intervalId) {
clearInterval(this.intervalId)
this.intervalId = null
}
this.flushBuffered({ flushAll: true })
try {
await this.flushBuffered({ flushAll: true })
} catch (err) {
/**
* We don't throw an exception here, since a failure to stream events and log entries doesn't mean that the
* command failed.
*/
this.log.error(`Error while flushing events and log entries: ${err.message}`)
}
}

streamEvent<T extends EventName>(name: T, payload: Events[T]) {
Expand All @@ -148,7 +152,11 @@ export class BufferedEventStream {
this.bufferedLogEntries.push(logEntry)
}

// Note: Returns a promise.
flushEvents(events: StreamEvent[]) {
if (events.length === 0) {
return
}
const data = {
events,
workflowRunUid,
Expand All @@ -160,12 +168,16 @@ export class BufferedEventStream {
this.log.silly(`--------`)
this.log.silly(`data: ${JSON.stringify(data)}`)
this.log.silly(`--------`)
got.post(`${this.enterpriseDomain}/events`, { json: data, headers }).catch((err) => {
return got.post(`${this.enterpriseDomain}/events`, { json: data, headers }).catch((err) => {
this.log.error(err)
})
}

// Note: Returns a promise.
flushLogEntries(logEntries: LogEntryEvent[]) {
if (logEntries.length === 0) {
return
}
const data = {
logEntries,
workflowRunUid,
Expand All @@ -185,15 +197,11 @@ export class BufferedEventStream {
flushBuffered({ flushAll = false }) {
const eventsToFlush = this.bufferedEvents.splice(0, flushAll ? this.bufferedEvents.length : MAX_BATCH_SIZE)

if (eventsToFlush.length > 0) {
this.flushEvents(eventsToFlush)
}

const logEntryFlushCount = flushAll ? this.bufferedLogEntries.length : MAX_BATCH_SIZE - eventsToFlush.length
const logEntryFlushCount = flushAll
? this.bufferedLogEntries.length
: MAX_BATCH_SIZE - this.bufferedLogEntries.length
const logEntriesToFlush = this.bufferedLogEntries.splice(0, logEntryFlushCount)

if (logEntriesToFlush.length > 0) {
this.flushLogEntries(logEntriesToFlush)
}
return Bluebird.all([this.flushEvents(eventsToFlush), this.flushLogEntries(logEntriesToFlush)])
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ describe("BufferedEventStream", () => {

bufferedEventStream["flushEvents"] = (events: StreamEvent[]) => {
flushedEvents.push(...events)
return Promise.resolve()
}
bufferedEventStream["flushLogEntries"] = (logEntries: LogEntryEvent[]) => {
flushedLogEntries.push(...logEntries)
return Promise.resolve()
}

const eventBus = new EventBus()
Expand All @@ -49,9 +51,11 @@ describe("BufferedEventStream", () => {

bufferedEventStream["flushEvents"] = (events: StreamEvent[]) => {
flushedEvents.push(...events)
return Promise.resolve()
}
bufferedEventStream["flushLogEntries"] = (logEntries: LogEntryEvent[]) => {
flushedLogEntries.push(...logEntries)
return Promise.resolve()
}

const oldEventBus = new EventBus()
Expand Down

0 comments on commit f8854c9

Please sign in to comment.