From f8854c97543b1d672ccaa05a640a5580924bd7de Mon Sep 17 00:00:00 2001 From: Thorarinn Sigurdsson Date: Thu, 25 Jun 2020 17:39:04 +0200 Subject: [PATCH] fix(enterprise): wait for event stream to flush We now wait for `BufferedEventStream` to finish flushing any buffered events and log entries before exiting a command. --- garden-service/src/cli/cli.ts | 2 + .../src/enterprise/buffered-event-stream.ts | 40 +++++++++++-------- .../src/platform/buffered-event-stream.ts | 4 ++ 3 files changed, 30 insertions(+), 16 deletions(-) diff --git a/garden-service/src/cli/cli.ts b/garden-service/src/cli/cli.ts index b543f5e7a6..be1a41df06 100644 --- a/garden-service/src/cli/cli.ts +++ b/garden-service/src/cli/cli.ts @@ -396,6 +396,8 @@ export class GardenCli { } } while (result.restartRequired) + await bufferedEventStream.close() + // We attach the action result to cli context so that we can process it in the parse method cliContext.details.result = result } diff --git a/garden-service/src/enterprise/buffered-event-stream.ts b/garden-service/src/enterprise/buffered-event-stream.ts index 4fd1ba8c95..3fa3a835f0 100644 --- a/garden-service/src/enterprise/buffered-event-stream.ts +++ b/garden-service/src/enterprise/buffered-event-stream.ts @@ -6,7 +6,7 @@ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -import { registerCleanupFunction } from "../util/util" +import Bluebird from "bluebird" import { Events, EventName, EventBus, eventNames } from "../events" import { LogEntryMetadata, LogEntry } from "../logger/log-entry" import { chainMessages } from "../logger/renderers" @@ -122,18 +122,22 @@ export class BufferedEventStream { this.intervalId = setInterval(() => { this.flushBuffered({ flushAll: false }) }, FLUSH_INTERVAL_MSEC) - - registerCleanupFunction("flushAllBufferedEventsAndLogEntries", () => { - this.close() - }) } - close() { + async close() { if (this.intervalId) { clearInterval(this.intervalId) this.intervalId = null } - this.flushBuffered({ flushAll: true }) + try { + await this.flushBuffered({ flushAll: true }) + } catch (err) { + /** + * We don't throw an exception here, since a failure to stream events and log entries doesn't mean that the + * command failed. + */ + this.log.error(`Error while flushing events and log entries: ${err.message}`) + } } streamEvent(name: T, payload: Events[T]) { @@ -148,7 +152,11 @@ export class BufferedEventStream { this.bufferedLogEntries.push(logEntry) } + // Note: Returns a promise. flushEvents(events: StreamEvent[]) { + if (events.length === 0) { + return + } const data = { events, workflowRunUid, @@ -160,12 +168,16 @@ export class BufferedEventStream { this.log.silly(`--------`) this.log.silly(`data: ${JSON.stringify(data)}`) this.log.silly(`--------`) - got.post(`${this.enterpriseDomain}/events`, { json: data, headers }).catch((err) => { + return got.post(`${this.enterpriseDomain}/events`, { json: data, headers }).catch((err) => { this.log.error(err) }) } + // Note: Returns a promise. flushLogEntries(logEntries: LogEntryEvent[]) { + if (logEntries.length === 0) { + return + } const data = { logEntries, workflowRunUid, @@ -185,15 +197,11 @@ export class BufferedEventStream { flushBuffered({ flushAll = false }) { const eventsToFlush = this.bufferedEvents.splice(0, flushAll ? this.bufferedEvents.length : MAX_BATCH_SIZE) - if (eventsToFlush.length > 0) { - this.flushEvents(eventsToFlush) - } - - const logEntryFlushCount = flushAll ? this.bufferedLogEntries.length : MAX_BATCH_SIZE - eventsToFlush.length + const logEntryFlushCount = flushAll + ? this.bufferedLogEntries.length + : MAX_BATCH_SIZE - this.bufferedLogEntries.length const logEntriesToFlush = this.bufferedLogEntries.splice(0, logEntryFlushCount) - if (logEntriesToFlush.length > 0) { - this.flushLogEntries(logEntriesToFlush) - } + return Bluebird.all([this.flushEvents(eventsToFlush), this.flushLogEntries(logEntriesToFlush)]) } } diff --git a/garden-service/test/unit/src/platform/buffered-event-stream.ts b/garden-service/test/unit/src/platform/buffered-event-stream.ts index bb3990ef3d..518833a2a5 100644 --- a/garden-service/test/unit/src/platform/buffered-event-stream.ts +++ b/garden-service/test/unit/src/platform/buffered-event-stream.ts @@ -22,9 +22,11 @@ describe("BufferedEventStream", () => { bufferedEventStream["flushEvents"] = (events: StreamEvent[]) => { flushedEvents.push(...events) + return Promise.resolve() } bufferedEventStream["flushLogEntries"] = (logEntries: LogEntryEvent[]) => { flushedLogEntries.push(...logEntries) + return Promise.resolve() } const eventBus = new EventBus() @@ -49,9 +51,11 @@ describe("BufferedEventStream", () => { bufferedEventStream["flushEvents"] = (events: StreamEvent[]) => { flushedEvents.push(...events) + return Promise.resolve() } bufferedEventStream["flushLogEntries"] = (logEntries: LogEntryEvent[]) => { flushedLogEntries.push(...logEntries) + return Promise.resolve() } const oldEventBus = new EventBus()