Skip to content

Commit

Permalink
feat(core): add event and log streaming
Browse files Browse the repository at this point in the history
* Added an event bus to Logger, which emits events when log entries are
created or updated.

* Added the `BufferedEventStream` class. This is used for batching and
streaming events from the Logger and the active Garden instance to the
platform when the user is logged in. Later, we can use this class for
streaming to the dashboard as well.
  • Loading branch information
thsig committed Apr 14, 2020
1 parent 5a44da5 commit ffc1094
Show file tree
Hide file tree
Showing 18 changed files with 379 additions and 28 deletions.
7 changes: 6 additions & 1 deletion garden-service/src/analytics/analytics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { Events, EventName } from "../events"
import { AnalyticsType } from "./analytics-types"
import dedent from "dedent"
import { getGitHubUrl } from "../docs/common"
import { InternalError } from "../exceptions"

const API_KEY = process.env.ANALYTICS_DEV ? SEGMENT_DEV_API_KEY : SEGMENT_PROD_API_KEY

Expand Down Expand Up @@ -134,14 +135,18 @@ export class AnalyticsHandler {
private ciName = ci.name
private systemConfig: SystemInfo
private isCI = ci.isCI
private sessionId = uuidv4()
private sessionId: string
protected garden: Garden
private projectMetadata: ProjectMetadata

private constructor(garden: Garden, log: LogEntry) {
if (!garden.sessionId) {
throw new InternalError(`Garden instance with null sessionId passed to AnalyticsHandler constructor.`, {})
}
this.segment = new segmentClient(API_KEY, { flushAt: 20, flushInterval: 300 })
this.log = log
this.garden = garden
this.sessionId = garden.sessionId
this.globalConfigStore = new GlobalConfigStore()
this.analyticsConfig = {
userId: "",
Expand Down
8 changes: 8 additions & 0 deletions garden-service/src/cli/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import { AnalyticsHandler } from "../analytics/analytics"
import { defaultDotIgnoreFiles } from "../util/fs"
import { renderError } from "../logger/renderers"
import { getDefaultProfiler } from "../util/profiling"
import { BufferedEventStream } from "../cloud/buffered-event-stream"

const OUTPUT_RENDERERS = {
json: (data: DeepPrimitiveMap) => {
Expand Down Expand Up @@ -309,7 +310,9 @@ export class GardenCli {
logger.info("")
const footerLog = logger.placeholder()

// Init event & log streaming.
const sessionId = uuidv4()
const bufferedEventStream = new BufferedEventStream(log, sessionId)

const contextOpts: GardenOpts = {
commandInfo: {
Expand Down Expand Up @@ -342,6 +345,11 @@ export class GardenCli {
} else {
garden = await Garden.factory(root, contextOpts)
}

if (garden.clientAuthToken && garden.platformUrl) {
bufferedEventStream.connect(garden.events, garden.clientAuthToken, garden.platformUrl, garden.projectName)
}

// Register log file writers. We need to do this after the Garden class is initialised because
// the file writers depend on the project root.
await this.initFileWriters(logger, garden.projectRoot, garden.gardenDirPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import { LogEntry } from "../logger/log-entry"
import { got } from "../util/http"
import { RuntimeError } from "../exceptions"

export const makeAuthHeader = (clientAuthToken: string) => ({ "x-access-auth-token": clientAuthToken })

// TODO: Add error handling and tests for all of this

/**
Expand Down Expand Up @@ -65,7 +67,7 @@ async function checkClientAuthToken(token: string, platformUrl: string, log: Log
await got({
method: "get",
url: `${platformUrl}/token/verify`,
headers: { "x-access-auth-token": token },
headers: makeAuthHeader(token),
})
valid = true
} catch (err) {
Expand Down
187 changes: 187 additions & 0 deletions garden-service/src/cloud/buffered-event-stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Copyright (C) 2018-2020 Garden Technologies, Inc. <[email protected]>
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

import { registerCleanupFunction } from "../util/util"
import { Events, EventName, EventBus, eventNames } from "../events"
import { LogEntryMetadata, LogEntry } from "../logger/log-entry"
import { chainMessages } from "../logger/renderers"
import { got } from "../util/http"
import { makeAuthHeader } from "./auth"

export type StreamEvent = {
name: EventName
payload: Events[EventName]
timestamp: Date
}

export interface LogEntryEvent {
key: string
parentKey: string | null
revision: number
msg: string | string[]
timestamp: Date
data?: any
section?: string
metadata?: LogEntryMetadata
}

export function formatForEventStream(entry: LogEntry): LogEntryEvent {
const { section, data } = entry.getMessageState()
const { key, revision } = entry
const parentKey = entry.parent ? entry.parent.key : null
const metadata = entry.getMetadata()
const msg = chainMessages(entry.getMessageStates() || [])
const timestamp = new Date()
return { key, parentKey, revision, msg, data, metadata, section, timestamp }
}

export const FLUSH_INTERVAL_MSEC = 1000
export const MAX_BATCH_SIZE = 100

/**
* Buffers events and log entries and periodically POSTs them to the platform.
*
* Subscribes to logger events once, in the constructor.
*
* Subscribes to Garden events via the connect method, since we need to subscribe to the event bus of
* new Garden instances (and unsubscribe from events from the previously connected Garden instance, if
* any) e.g. when config changes during a watch-mode command.
*/
export class BufferedEventStream {
private log: LogEntry
private eventBus: EventBus
public sessionId: string
private platformUrl: string
private clientAuthToken: string
private projectName: string

/**
* We maintain this map to facilitate unsubscribing from a previously connected event bus
* when a new event bus is connected.
*/
private gardenEventListeners: { [eventName: string]: (payload: any) => void }

private intervalId: NodeJS.Timer | null
private bufferedEvents: StreamEvent[]
private bufferedLogEntries: LogEntryEvent[]

constructor(log: LogEntry, sessionId: string) {
this.sessionId = sessionId
this.log = log
this.log.root.events.onAny((_name: string, payload: LogEntryEvent) => {
this.streamLogEntry(payload)
})
this.bufferedEvents = []
this.bufferedLogEntries = []
}

// TODO: Replace projectName with projectId once we've figured out the flow for that.
connect(eventBus: EventBus, clientAuthToken: string, platformUrl: string, projectName: string) {
this.clientAuthToken = clientAuthToken
this.platformUrl = platformUrl
this.projectName = projectName

if (!this.intervalId) {
this.startInterval()
}

if (this.eventBus) {
// We unsubscribe from the old event bus' events.
this.unsubscribeFromGardenEvents(this.eventBus)
}

this.eventBus = eventBus
this.subscribeToGardenEvents(this.eventBus)
}

subscribeToGardenEvents(eventBus: EventBus) {
// We maintain this map to facilitate unsubscribing from events when the Garden instance is closed.
const gardenEventListeners = {}
for (const gardenEventName of eventNames) {
const listener = (payload: LogEntryEvent) => this.streamEvent(gardenEventName, payload)
gardenEventListeners[gardenEventName] = listener
eventBus.on(gardenEventName, listener)
}
this.gardenEventListeners = gardenEventListeners
}

unsubscribeFromGardenEvents(eventBus: EventBus) {
for (const [gardenEventName, listener] of Object.entries(this.gardenEventListeners)) {
eventBus.removeListener(gardenEventName, listener)
}
}

startInterval() {
this.intervalId = setInterval(() => {
this.flushBuffered({ flushAll: false })
}, FLUSH_INTERVAL_MSEC)

registerCleanupFunction("flushAllBufferedEventsAndLogEntries", () => {
this.close()
})
}

close() {
if (this.intervalId) {
clearInterval(this.intervalId)
this.intervalId = null
}
this.flushBuffered({ flushAll: true })
}

streamEvent<T extends EventName>(name: T, payload: Events[T]) {
this.bufferedEvents.push({
name,
payload,
timestamp: new Date(),
})
}

streamLogEntry(logEntry: LogEntryEvent) {
this.bufferedLogEntries.push(logEntry)
}

flushEvents(events: StreamEvent[]) {
const data = {
events,
sessionId: this.sessionId,
projectName: this.projectName,
}
const headers = makeAuthHeader(this.clientAuthToken)
got.post(`${this.platformUrl}/events`, { json: data, headers }).catch((err) => {
this.log.error(err)
})
}

flushLogEntries(logEntries: LogEntryEvent[]) {
const data = {
logEntries,
sessionId: this.sessionId,
projectName: this.projectName,
}
const headers = makeAuthHeader(this.clientAuthToken)
got.post(`${this.platformUrl}/log-entries`, { json: data, headers }).catch((err) => {
this.log.error(err)
})
}

flushBuffered({ flushAll = false }) {
const eventsToFlush = this.bufferedEvents.splice(0, flushAll ? this.bufferedEvents.length : MAX_BATCH_SIZE)

if (eventsToFlush.length > 0) {
this.flushEvents(eventsToFlush)
}

const logEntryFlushCount = flushAll ? this.bufferedLogEntries.length : MAX_BATCH_SIZE - eventsToFlush.length
const logEntriesToFlush = this.bufferedLogEntries.splice(0, logEntryFlushCount)

if (logEntriesToFlush.length > 0) {
this.flushLogEntries(logEntriesToFlush)
}
}
}
2 changes: 1 addition & 1 deletion garden-service/src/commands/login.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import { Command, CommandParams, CommandResult } from "./base"
import { printHeader } from "../logger/util"
import dedent = require("dedent")
import { login } from "../platform/auth"
import { login } from "../cloud/auth"

export class LoginCommand extends Command {
name = "login"
Expand Down
2 changes: 1 addition & 1 deletion garden-service/src/commands/logout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import { Command, CommandParams, CommandResult } from "./base"
import { printHeader } from "../logger/util"
import dedent = require("dedent")
import { clearAuthToken } from "../platform/auth"
import { clearAuthToken } from "../cloud/auth"

export class LogOutCommand extends Command {
name = "logout"
Expand Down
1 change: 0 additions & 1 deletion garden-service/src/db/base-entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ export class GardenEntity extends BaseEntity {
/**
* Helper method to avoid circular import issues.
*/

static getConnection() {
return getConnection()
}
Expand Down
42 changes: 36 additions & 6 deletions garden-service/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
*/

import { EventEmitter2 } from "eventemitter2"
import { LogEntry } from "./logger/log-entry"
import { ModuleVersion } from "./vcs/vcs"
import { TaskResult } from "./task-graph"
import { LogEntryEvent } from "./cloud/buffered-event-stream"

/**
* This simple class serves as the central event bus for a Garden instance. Its function
Expand All @@ -18,7 +18,7 @@ import { TaskResult } from "./task-graph"
* See below for the event interfaces.
*/
export class EventBus extends EventEmitter2 {
constructor(private log: LogEntry) {
constructor() {
super({
wildcard: false,
newListener: false,
Expand All @@ -27,7 +27,6 @@ export class EventBus extends EventEmitter2 {
}

emit<T extends EventName>(name: T, payload: Events[T]) {
this.log.silly(`Emit event '${name}'`)
return super.emit(name, payload)
}

Expand All @@ -47,9 +46,19 @@ export class EventBus extends EventEmitter2 {
}

/**
* The supported events and their interfaces.
* Supported logger events and their interfaces.
*/
export type Events = {
export interface LoggerEvents {
_test: any
logEntry: LogEntryEvent
}

export type LoggerEventName = keyof LoggerEvents

/**
* Supported Garden events and their interfaces.
*/
export interface Events extends LoggerEvents {
// Internal test/control events
_exit: {}
_restart: {}
Expand Down Expand Up @@ -108,8 +117,29 @@ export type Events = {
taskGraphComplete: {
completedAt: Date
}

watchingForChanges: {}
}

export type EventName = keyof Events

// Note: Does not include logger events.
export const eventNames: EventName[] = [
"_exit",
"_restart",
"_test",
"configAdded",
"configRemoved",
"internalError",
"projectConfigChanged",
"moduleConfigChanged",
"moduleSourcesChanged",
"moduleRemoved",
"taskPending",
"taskProcessing",
"taskComplete",
"taskError",
"taskCancelled",
"taskGraphProcessing",
"taskGraphComplete",
"watchingForChanges",
]
5 changes: 3 additions & 2 deletions garden-service/src/garden.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ import { deline, naturalList } from "./util/string"
import { ensureConnected } from "./db/connection"
import { DependencyValidationGraph } from "./util/validate-dependencies"
import { Profile } from "./util/profiling"
import { readAuthToken, login } from "./platform/auth"
import { readAuthToken, login } from "./cloud/auth"
import { ResolveModuleTask, getResolvedModules } from "./tasks/resolve-module"
import username from "username"

Expand Down Expand Up @@ -220,7 +220,7 @@ export class Garden {
this.resolvedProviders = {}

this.taskGraph = new TaskGraph(this, this.log)
this.events = new EventBus(this.log)
this.events = new EventBus()

// Register plugins
for (const plugin of [...builtinPlugins, ...params.plugins]) {
Expand Down Expand Up @@ -336,6 +336,7 @@ export class Garden {
* Clean up before shutting down.
*/
async close() {
this.events.removeAllListeners()
this.watcher && (await this.watcher.stop())
}

Expand Down
Loading

0 comments on commit ffc1094

Please sign in to comment.