From ffc1094311f48f221ae86c28456838aa540e581f Mon Sep 17 00:00:00 2001 From: Thorarinn Sigurdsson Date: Mon, 10 Feb 2020 15:12:15 +0100 Subject: [PATCH] feat(core): add event and log streaming * Added an event bus to Logger, which emits events when log entries are created or updated. * Added the `BufferedEventStream` class. This is used for batching and streaming events from the Logger and the active Garden instance to the platform when the user is logged in. Later, we can use this class for streaming to the dashboard as well. --- garden-service/src/analytics/analytics.ts | 7 +- garden-service/src/cli/cli.ts | 8 + .../src/{platform => cloud}/auth.ts | 4 +- .../src/cloud/buffered-event-stream.ts | 187 ++++++++++++++++++ garden-service/src/commands/login.ts | 2 +- garden-service/src/commands/logout.ts | 2 +- garden-service/src/db/base-entity.ts | 1 - garden-service/src/events.ts | 42 +++- garden-service/src/garden.ts | 5 +- garden-service/src/logger/log-entry.ts | 17 +- garden-service/src/logger/logger.ts | 9 +- .../logger/writers/json-terminal-writer.ts | 1 - garden-service/src/process.ts | 1 + garden-service/test/helpers.ts | 6 +- garden-service/test/unit/src/events.ts | 3 +- garden-service/test/unit/src/logger/logger.ts | 33 ++++ garden-service/test/unit/src/platform/auth.ts | 4 +- .../src/platform/buffered-event-stream.ts | 75 +++++++ 18 files changed, 379 insertions(+), 28 deletions(-) rename garden-service/src/{platform => cloud}/auth.ts (97%) create mode 100644 garden-service/src/cloud/buffered-event-stream.ts create mode 100644 garden-service/test/unit/src/platform/buffered-event-stream.ts diff --git a/garden-service/src/analytics/analytics.ts b/garden-service/src/analytics/analytics.ts index 7946cf7268..b957e31b29 100644 --- a/garden-service/src/analytics/analytics.ts +++ b/garden-service/src/analytics/analytics.ts @@ -20,6 +20,7 @@ import { Events, EventName } from "../events" import { AnalyticsType } from "./analytics-types" import dedent from "dedent" import { getGitHubUrl } from "../docs/common" +import { InternalError } from "../exceptions" const API_KEY = process.env.ANALYTICS_DEV ? SEGMENT_DEV_API_KEY : SEGMENT_PROD_API_KEY @@ -134,14 +135,18 @@ export class AnalyticsHandler { private ciName = ci.name private systemConfig: SystemInfo private isCI = ci.isCI - private sessionId = uuidv4() + private sessionId: string protected garden: Garden private projectMetadata: ProjectMetadata private constructor(garden: Garden, log: LogEntry) { + if (!garden.sessionId) { + throw new InternalError(`Garden instance with null sessionId passed to AnalyticsHandler constructor.`, {}) + } this.segment = new segmentClient(API_KEY, { flushAt: 20, flushInterval: 300 }) this.log = log this.garden = garden + this.sessionId = garden.sessionId this.globalConfigStore = new GlobalConfigStore() this.analyticsConfig = { userId: "", diff --git a/garden-service/src/cli/cli.ts b/garden-service/src/cli/cli.ts index d24a921706..dd8e87a7d1 100644 --- a/garden-service/src/cli/cli.ts +++ b/garden-service/src/cli/cli.ts @@ -55,6 +55,7 @@ import { AnalyticsHandler } from "../analytics/analytics" import { defaultDotIgnoreFiles } from "../util/fs" import { renderError } from "../logger/renderers" import { getDefaultProfiler } from "../util/profiling" +import { BufferedEventStream } from "../cloud/buffered-event-stream" const OUTPUT_RENDERERS = { json: (data: DeepPrimitiveMap) => { @@ -309,7 +310,9 @@ export class GardenCli { logger.info("") const footerLog = logger.placeholder() + // Init event & log streaming. const sessionId = uuidv4() + const bufferedEventStream = new BufferedEventStream(log, sessionId) const contextOpts: GardenOpts = { commandInfo: { @@ -342,6 +345,11 @@ export class GardenCli { } else { garden = await Garden.factory(root, contextOpts) } + + if (garden.clientAuthToken && garden.platformUrl) { + bufferedEventStream.connect(garden.events, garden.clientAuthToken, garden.platformUrl, garden.projectName) + } + // Register log file writers. We need to do this after the Garden class is initialised because // the file writers depend on the project root. await this.initFileWriters(logger, garden.projectRoot, garden.gardenDirPath) diff --git a/garden-service/src/platform/auth.ts b/garden-service/src/cloud/auth.ts similarity index 97% rename from garden-service/src/platform/auth.ts rename to garden-service/src/cloud/auth.ts index 62465d3b6e..b8ace9741a 100644 --- a/garden-service/src/platform/auth.ts +++ b/garden-service/src/cloud/auth.ts @@ -19,6 +19,8 @@ import { LogEntry } from "../logger/log-entry" import { got } from "../util/http" import { RuntimeError } from "../exceptions" +export const makeAuthHeader = (clientAuthToken: string) => ({ "x-access-auth-token": clientAuthToken }) + // TODO: Add error handling and tests for all of this /** @@ -65,7 +67,7 @@ async function checkClientAuthToken(token: string, platformUrl: string, log: Log await got({ method: "get", url: `${platformUrl}/token/verify`, - headers: { "x-access-auth-token": token }, + headers: makeAuthHeader(token), }) valid = true } catch (err) { diff --git a/garden-service/src/cloud/buffered-event-stream.ts b/garden-service/src/cloud/buffered-event-stream.ts new file mode 100644 index 0000000000..8190f58749 --- /dev/null +++ b/garden-service/src/cloud/buffered-event-stream.ts @@ -0,0 +1,187 @@ +/* + * Copyright (C) 2018-2020 Garden Technologies, Inc. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +import { registerCleanupFunction } from "../util/util" +import { Events, EventName, EventBus, eventNames } from "../events" +import { LogEntryMetadata, LogEntry } from "../logger/log-entry" +import { chainMessages } from "../logger/renderers" +import { got } from "../util/http" +import { makeAuthHeader } from "./auth" + +export type StreamEvent = { + name: EventName + payload: Events[EventName] + timestamp: Date +} + +export interface LogEntryEvent { + key: string + parentKey: string | null + revision: number + msg: string | string[] + timestamp: Date + data?: any + section?: string + metadata?: LogEntryMetadata +} + +export function formatForEventStream(entry: LogEntry): LogEntryEvent { + const { section, data } = entry.getMessageState() + const { key, revision } = entry + const parentKey = entry.parent ? entry.parent.key : null + const metadata = entry.getMetadata() + const msg = chainMessages(entry.getMessageStates() || []) + const timestamp = new Date() + return { key, parentKey, revision, msg, data, metadata, section, timestamp } +} + +export const FLUSH_INTERVAL_MSEC = 1000 +export const MAX_BATCH_SIZE = 100 + +/** + * Buffers events and log entries and periodically POSTs them to the platform. + * + * Subscribes to logger events once, in the constructor. + * + * Subscribes to Garden events via the connect method, since we need to subscribe to the event bus of + * new Garden instances (and unsubscribe from events from the previously connected Garden instance, if + * any) e.g. when config changes during a watch-mode command. + */ +export class BufferedEventStream { + private log: LogEntry + private eventBus: EventBus + public sessionId: string + private platformUrl: string + private clientAuthToken: string + private projectName: string + + /** + * We maintain this map to facilitate unsubscribing from a previously connected event bus + * when a new event bus is connected. + */ + private gardenEventListeners: { [eventName: string]: (payload: any) => void } + + private intervalId: NodeJS.Timer | null + private bufferedEvents: StreamEvent[] + private bufferedLogEntries: LogEntryEvent[] + + constructor(log: LogEntry, sessionId: string) { + this.sessionId = sessionId + this.log = log + this.log.root.events.onAny((_name: string, payload: LogEntryEvent) => { + this.streamLogEntry(payload) + }) + this.bufferedEvents = [] + this.bufferedLogEntries = [] + } + + // TODO: Replace projectName with projectId once we've figured out the flow for that. + connect(eventBus: EventBus, clientAuthToken: string, platformUrl: string, projectName: string) { + this.clientAuthToken = clientAuthToken + this.platformUrl = platformUrl + this.projectName = projectName + + if (!this.intervalId) { + this.startInterval() + } + + if (this.eventBus) { + // We unsubscribe from the old event bus' events. + this.unsubscribeFromGardenEvents(this.eventBus) + } + + this.eventBus = eventBus + this.subscribeToGardenEvents(this.eventBus) + } + + subscribeToGardenEvents(eventBus: EventBus) { + // We maintain this map to facilitate unsubscribing from events when the Garden instance is closed. + const gardenEventListeners = {} + for (const gardenEventName of eventNames) { + const listener = (payload: LogEntryEvent) => this.streamEvent(gardenEventName, payload) + gardenEventListeners[gardenEventName] = listener + eventBus.on(gardenEventName, listener) + } + this.gardenEventListeners = gardenEventListeners + } + + unsubscribeFromGardenEvents(eventBus: EventBus) { + for (const [gardenEventName, listener] of Object.entries(this.gardenEventListeners)) { + eventBus.removeListener(gardenEventName, listener) + } + } + + startInterval() { + this.intervalId = setInterval(() => { + this.flushBuffered({ flushAll: false }) + }, FLUSH_INTERVAL_MSEC) + + registerCleanupFunction("flushAllBufferedEventsAndLogEntries", () => { + this.close() + }) + } + + close() { + if (this.intervalId) { + clearInterval(this.intervalId) + this.intervalId = null + } + this.flushBuffered({ flushAll: true }) + } + + streamEvent(name: T, payload: Events[T]) { + this.bufferedEvents.push({ + name, + payload, + timestamp: new Date(), + }) + } + + streamLogEntry(logEntry: LogEntryEvent) { + this.bufferedLogEntries.push(logEntry) + } + + flushEvents(events: StreamEvent[]) { + const data = { + events, + sessionId: this.sessionId, + projectName: this.projectName, + } + const headers = makeAuthHeader(this.clientAuthToken) + got.post(`${this.platformUrl}/events`, { json: data, headers }).catch((err) => { + this.log.error(err) + }) + } + + flushLogEntries(logEntries: LogEntryEvent[]) { + const data = { + logEntries, + sessionId: this.sessionId, + projectName: this.projectName, + } + const headers = makeAuthHeader(this.clientAuthToken) + got.post(`${this.platformUrl}/log-entries`, { json: data, headers }).catch((err) => { + this.log.error(err) + }) + } + + flushBuffered({ flushAll = false }) { + const eventsToFlush = this.bufferedEvents.splice(0, flushAll ? this.bufferedEvents.length : MAX_BATCH_SIZE) + + if (eventsToFlush.length > 0) { + this.flushEvents(eventsToFlush) + } + + const logEntryFlushCount = flushAll ? this.bufferedLogEntries.length : MAX_BATCH_SIZE - eventsToFlush.length + const logEntriesToFlush = this.bufferedLogEntries.splice(0, logEntryFlushCount) + + if (logEntriesToFlush.length > 0) { + this.flushLogEntries(logEntriesToFlush) + } + } +} diff --git a/garden-service/src/commands/login.ts b/garden-service/src/commands/login.ts index 1843171d0e..5ddf912039 100644 --- a/garden-service/src/commands/login.ts +++ b/garden-service/src/commands/login.ts @@ -9,7 +9,7 @@ import { Command, CommandParams, CommandResult } from "./base" import { printHeader } from "../logger/util" import dedent = require("dedent") -import { login } from "../platform/auth" +import { login } from "../cloud/auth" export class LoginCommand extends Command { name = "login" diff --git a/garden-service/src/commands/logout.ts b/garden-service/src/commands/logout.ts index dc700f4782..c54615eb18 100644 --- a/garden-service/src/commands/logout.ts +++ b/garden-service/src/commands/logout.ts @@ -9,7 +9,7 @@ import { Command, CommandParams, CommandResult } from "./base" import { printHeader } from "../logger/util" import dedent = require("dedent") -import { clearAuthToken } from "../platform/auth" +import { clearAuthToken } from "../cloud/auth" export class LogOutCommand extends Command { name = "logout" diff --git a/garden-service/src/db/base-entity.ts b/garden-service/src/db/base-entity.ts index a7b691d315..49342af03e 100644 --- a/garden-service/src/db/base-entity.ts +++ b/garden-service/src/db/base-entity.ts @@ -45,7 +45,6 @@ export class GardenEntity extends BaseEntity { /** * Helper method to avoid circular import issues. */ - static getConnection() { return getConnection() } diff --git a/garden-service/src/events.ts b/garden-service/src/events.ts index c055ca3ac7..31d3a066fb 100644 --- a/garden-service/src/events.ts +++ b/garden-service/src/events.ts @@ -7,9 +7,9 @@ */ import { EventEmitter2 } from "eventemitter2" -import { LogEntry } from "./logger/log-entry" import { ModuleVersion } from "./vcs/vcs" import { TaskResult } from "./task-graph" +import { LogEntryEvent } from "./cloud/buffered-event-stream" /** * This simple class serves as the central event bus for a Garden instance. Its function @@ -18,7 +18,7 @@ import { TaskResult } from "./task-graph" * See below for the event interfaces. */ export class EventBus extends EventEmitter2 { - constructor(private log: LogEntry) { + constructor() { super({ wildcard: false, newListener: false, @@ -27,7 +27,6 @@ export class EventBus extends EventEmitter2 { } emit(name: T, payload: Events[T]) { - this.log.silly(`Emit event '${name}'`) return super.emit(name, payload) } @@ -47,9 +46,19 @@ export class EventBus extends EventEmitter2 { } /** - * The supported events and their interfaces. + * Supported logger events and their interfaces. */ -export type Events = { +export interface LoggerEvents { + _test: any + logEntry: LogEntryEvent +} + +export type LoggerEventName = keyof LoggerEvents + +/** + * Supported Garden events and their interfaces. + */ +export interface Events extends LoggerEvents { // Internal test/control events _exit: {} _restart: {} @@ -108,8 +117,29 @@ export type Events = { taskGraphComplete: { completedAt: Date } - watchingForChanges: {} } export type EventName = keyof Events + +// Note: Does not include logger events. +export const eventNames: EventName[] = [ + "_exit", + "_restart", + "_test", + "configAdded", + "configRemoved", + "internalError", + "projectConfigChanged", + "moduleConfigChanged", + "moduleSourcesChanged", + "moduleRemoved", + "taskPending", + "taskProcessing", + "taskComplete", + "taskError", + "taskCancelled", + "taskGraphProcessing", + "taskGraphComplete", + "watchingForChanges", +] diff --git a/garden-service/src/garden.ts b/garden-service/src/garden.ts index aed477cda0..35e4c69c31 100644 --- a/garden-service/src/garden.ts +++ b/garden-service/src/garden.ts @@ -61,7 +61,7 @@ import { deline, naturalList } from "./util/string" import { ensureConnected } from "./db/connection" import { DependencyValidationGraph } from "./util/validate-dependencies" import { Profile } from "./util/profiling" -import { readAuthToken, login } from "./platform/auth" +import { readAuthToken, login } from "./cloud/auth" import { ResolveModuleTask, getResolvedModules } from "./tasks/resolve-module" import username from "username" @@ -220,7 +220,7 @@ export class Garden { this.resolvedProviders = {} this.taskGraph = new TaskGraph(this, this.log) - this.events = new EventBus(this.log) + this.events = new EventBus() // Register plugins for (const plugin of [...builtinPlugins, ...params.plugins]) { @@ -336,6 +336,7 @@ export class Garden { * Clean up before shutting down. */ async close() { + this.events.removeAllListeners() this.watcher && (await this.watcher.stop()) } diff --git a/garden-service/src/logger/log-entry.ts b/garden-service/src/logger/log-entry.ts index de4093ba89..8f1ca51e92 100644 --- a/garden-service/src/logger/log-entry.ts +++ b/garden-service/src/logger/log-entry.ts @@ -33,6 +33,8 @@ export interface TaskMetadata { durationMs?: number } +export const EVENT_LOG_LEVEL = LogLevel.debug + interface MessageBase { msg?: string emoji?: EmojiName @@ -90,6 +92,7 @@ export class LogEntry extends LogNode { public readonly childEntriesInheritLevel?: boolean public readonly id?: string public isPlaceholder?: boolean + public revision: number constructor(params: LogEntryConstructor) { super(params.level, params.parent, params.id) @@ -102,6 +105,7 @@ export class LogEntry extends LogNode { this.metadata = params.metadata this.id = params.id this.isPlaceholder = params.isPlaceholder + this.revision = -1 if (!params.isPlaceholder) { this.update({ @@ -124,6 +128,7 @@ export class LogEntry extends LogNode { * 3. next metadata is merged with the previous metadata */ protected update(updateParams: UpdateLogEntryParams): void { + this.revision = this.revision + 1 const messageState = this.getMessageState() // Explicitly set all the fields so the shape stays consistent @@ -232,13 +237,13 @@ export class LogEntry extends LogNode { setState(params?: string | UpdateLogEntryParams): LogEntry { this.isPlaceholder = false this.deepUpdate({ ...resolveParams(params) }) - this.root.onGraphChange(this) + this.onGraphChange(this) return this } setDone(params?: string | Omit): LogEntry { this.deepUpdate({ ...resolveParams(params), status: "done" }) - this.root.onGraphChange(this) + this.onGraphChange(this) return this } @@ -248,7 +253,7 @@ export class LogEntry extends LogNode { symbol: "success", status: "success", }) - this.root.onGraphChange(this) + this.onGraphChange(this) return this } @@ -258,7 +263,7 @@ export class LogEntry extends LogNode { symbol: "error", status: "error", }) - this.root.onGraphChange(this) + this.onGraphChange(this) return this } @@ -268,7 +273,7 @@ export class LogEntry extends LogNode { symbol: "warning", status: "warn", }) - this.root.onGraphChange(this) + this.onGraphChange(this) return this } @@ -280,7 +285,7 @@ export class LogEntry extends LogNode { // Stop gracefully if still in active state if (this.getMessageState().status === "active") { this.update({ symbol: "empty", status: "done" }) - this.root.onGraphChange(this) + this.onGraphChange(this) } return this } diff --git a/garden-service/src/logger/logger.ts b/garden-service/src/logger/logger.ts index d05bca3c6a..b36a22da72 100644 --- a/garden-service/src/logger/logger.ts +++ b/garden-service/src/logger/logger.ts @@ -7,7 +7,7 @@ */ import { LogNode, CreateNodeParams } from "./log-node" -import { LogEntry } from "./log-entry" +import { LogEntry, EVENT_LOG_LEVEL } from "./log-entry" import { getChildEntries, findLogNode } from "./util" import { Writer } from "./writers/base" import { InternalError, ParameterError } from "../exceptions" @@ -17,6 +17,8 @@ import { FancyTerminalWriter } from "./writers/fancy-terminal-writer" import { JsonTerminalWriter } from "./writers/json-terminal-writer" import { parseLogLevel } from "../cli/helpers" import { FullscreenTerminalWriter } from "./writers/fullscreen-terminal-writer" +import { EventBus } from "../events" +import { formatForEventStream } from "../cloud/buffered-event-stream" export type LoggerType = "quiet" | "basic" | "fancy" | "fullscreen" | "json" export const LOGGER_TYPES = new Set(["quiet", "basic", "fancy", "fullscreen", "json"]) @@ -44,6 +46,7 @@ export interface LoggerConfig { export class Logger extends LogNode { public writers: Writer[] + public events: EventBus public useEmoji: boolean private static instance: Logger @@ -102,6 +105,7 @@ export class Logger extends LogNode { super(config.level) this.writers = config.writers || [] this.useEmoji = config.useEmoji === false ? false : true + this.events = new EventBus() } protected createNode(params: CreateNodeParams): LogEntry { @@ -114,6 +118,9 @@ export class Logger extends LogNode { } onGraphChange(entry: LogEntry) { + if (entry.level <= EVENT_LOG_LEVEL) { + this.events.emit("logEntry", formatForEventStream(entry)) + } for (const writer of this.writers) { if (entry.level <= writer.level) { writer.onGraphChange(entry, this) diff --git a/garden-service/src/logger/writers/json-terminal-writer.ts b/garden-service/src/logger/writers/json-terminal-writer.ts index e2e3736e6e..063149751a 100644 --- a/garden-service/src/logger/writers/json-terminal-writer.ts +++ b/garden-service/src/logger/writers/json-terminal-writer.ts @@ -15,7 +15,6 @@ export interface JsonLogEntry { msg: string data?: any section?: string - durationMs?: number metadata?: LogEntryMetadata } diff --git a/garden-service/src/process.ts b/garden-service/src/process.ts index 8bad7b45de..2a6dcb614a 100644 --- a/garden-service/src/process.ts +++ b/garden-service/src/process.ts @@ -211,6 +211,7 @@ async function validateConfigChange( try { const nextGarden = await Garden.factory(garden.projectRoot, garden.opts) await nextGarden.getConfigGraph(log) + await nextGarden.close() } catch (error) { if (error instanceof ConfigurationError) { const msg = dedent` diff --git a/garden-service/test/helpers.ts b/garden-service/test/helpers.ts index 77e9165f34..d92b7ad98f 100644 --- a/garden-service/test/helpers.ts +++ b/garden-service/test/helpers.ts @@ -320,8 +320,8 @@ interface EventLogEntry { class TestEventBus extends EventBus { public eventLog: EventLogEntry[] - constructor(log: LogEntry) { - super(log) + constructor() { + super() this.eventLog = [] } @@ -342,7 +342,7 @@ export class TestGarden extends Garden { constructor(params: GardenParams) { super(params) - this.events = new TestEventBus(this.log) + this.events = new TestEventBus() } setModuleConfigs(moduleConfigs: ModuleConfig[]) { diff --git a/garden-service/test/unit/src/events.ts b/garden-service/test/unit/src/events.ts index 9dcce56645..44c631fcca 100644 --- a/garden-service/test/unit/src/events.ts +++ b/garden-service/test/unit/src/events.ts @@ -8,13 +8,12 @@ import { EventBus } from "../../../src/events" import { expect } from "chai" -import { getLogger } from "../../../src/logger/logger" describe("EventBus", () => { let events: EventBus beforeEach(() => { - events = new EventBus(getLogger().placeholder()) + events = new EventBus() }) it("should send+receive events", (done) => { diff --git a/garden-service/test/unit/src/logger/logger.ts b/garden-service/test/unit/src/logger/logger.ts index 9ccc19d80d..7d10a4e4f9 100644 --- a/garden-service/test/unit/src/logger/logger.ts +++ b/garden-service/test/unit/src/logger/logger.ts @@ -7,9 +7,11 @@ */ import { expect } from "chai" +import { omit } from "lodash" import { LogLevel } from "../../../../src/logger/log-node" import { getLogger } from "../../../../src/logger/logger" +import { LogEntryEvent, formatForEventStream } from "../../../../src/cloud/buffered-event-stream" const logger: any = getLogger() @@ -18,6 +20,37 @@ beforeEach(() => { }) describe("Logger", () => { + describe("events", () => { + let loggerEvents: LogEntryEvent[] = [] + let listener = (event: LogEntryEvent) => loggerEvents.push(event) + + before(() => logger.events.on("logEntry", listener)) + after(() => logger.events.off("logEntry", listener)) + + beforeEach(() => { + loggerEvents = [] + }) + + it("should emit a loggerEvent event when an entry is created", () => { + const log = logger.info({ msg: "0" }) + const e = loggerEvents[0] + expect(loggerEvents.length).to.eql(1) + expect(e.revision).to.eql(0) + expect(omit(formatForEventStream(log), "timestamp")).to.eql(omit(e, "timestamp")) + }) + + it("should emit a loggerEvent with a bumped revision when an entry is updated", () => { + const log = logger.info({ msg: "0" }) + log.setState("1") + logger.info({ msg: "0" }) + const [e1, e2, e3] = loggerEvents + expect(loggerEvents.length).to.eql(3) + expect(e1.revision).to.eql(0) + expect(e2.revision).to.eql(1) + expect(e3.revision).to.eql(0) + }) + }) + describe("findById", () => { it("should return the first log entry with a matching id and undefined otherwise", () => { logger.info({ msg: "0" }) diff --git a/garden-service/test/unit/src/platform/auth.ts b/garden-service/test/unit/src/platform/auth.ts index 7d7ebad338..a056eed511 100644 --- a/garden-service/test/unit/src/platform/auth.ts +++ b/garden-service/test/unit/src/platform/auth.ts @@ -10,9 +10,9 @@ import Bluebird from "bluebird" import { expect } from "chai" import { ClientAuthToken } from "../../../../src/db/entities/client-auth-token" import { makeTestGardenA } from "../../../helpers" -import { saveAuthToken, readAuthToken, clearAuthToken } from "../../../../src/platform/auth" +import { saveAuthToken, readAuthToken, clearAuthToken } from "../../../../src/cloud/auth" -async function cleanupAuthTokens() { +export async function cleanupAuthTokens() { await ClientAuthToken.createQueryBuilder() .delete() .execute() diff --git a/garden-service/test/unit/src/platform/buffered-event-stream.ts b/garden-service/test/unit/src/platform/buffered-event-stream.ts new file mode 100644 index 0000000000..0302c41a45 --- /dev/null +++ b/garden-service/test/unit/src/platform/buffered-event-stream.ts @@ -0,0 +1,75 @@ +/* + * Copyright (C) 2018-2020 Garden Technologies, Inc. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +import { expect } from "chai" +import { StreamEvent, LogEntryEvent, BufferedEventStream } from "../../../../src/cloud/buffered-event-stream" +import { getLogger } from "../../../../src/logger/logger" +import { EventBus } from "../../../../src/events" + +describe("BufferedEventStream", () => { + it("should flush events and log entries emitted by a connected event emitter", async () => { + const flushedEvents: StreamEvent[] = [] + const flushedLogEntries: LogEntryEvent[] = [] + + const log = getLogger().placeholder() + + const bufferedEventStream = new BufferedEventStream(log, "dummy-session-id") + + bufferedEventStream["flushEvents"] = (events: StreamEvent[]) => { + flushedEvents.push(...events) + } + bufferedEventStream["flushLogEntries"] = (logEntries: LogEntryEvent[]) => { + flushedLogEntries.push(...logEntries) + } + + const eventBus = new EventBus() + bufferedEventStream.connect(eventBus, "dummy-client-token", "dummy-platform_url", "myproject") + + eventBus.emit("_test", {}) + log.root.events.emit("_test", {}) + + bufferedEventStream.flushBuffered({ flushAll: true }) + + expect(flushedEvents.length).to.eql(1) + expect(flushedLogEntries.length).to.eql(1) + }) + + it("should only flush events or log entries emitted by the last connected event emitter", async () => { + const flushedEvents: StreamEvent[] = [] + const flushedLogEntries: LogEntryEvent[] = [] + + const log = getLogger().placeholder() + + const bufferedEventStream = new BufferedEventStream(log, "dummy-session-id") + + bufferedEventStream["flushEvents"] = (events: StreamEvent[]) => { + flushedEvents.push(...events) + } + bufferedEventStream["flushLogEntries"] = (logEntries: LogEntryEvent[]) => { + flushedLogEntries.push(...logEntries) + } + + const oldEventBus = new EventBus() + bufferedEventStream.connect(oldEventBus, "dummy-client-token", "dummy-platform_url", "myproject") + const newEventBus = new EventBus() + bufferedEventStream.connect(newEventBus, "dummy-client-token", "dummy-platform_url", "myproject") + + log.root.events.emit("_test", {}) + oldEventBus.emit("_test", {}) + + bufferedEventStream.flushBuffered({ flushAll: true }) + + expect(flushedEvents.length).to.eql(0) + expect(flushedLogEntries.length).to.eql(1) + + newEventBus.emit("_test", {}) + bufferedEventStream.flushBuffered({ flushAll: true }) + + expect(flushedEvents.length).to.eql(1) + }) +})