Skip to content

Commit

Permalink
improvement(enterprise): include message metadata with log entries
Browse files Browse the repository at this point in the history
  • Loading branch information
eysi09 authored and thsig committed Jan 7, 2021
1 parent 2eaac5f commit e9710e5
Show file tree
Hide file tree
Showing 23 changed files with 186 additions and 160 deletions.
40 changes: 27 additions & 13 deletions core/src/enterprise/buffered-event-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
*/

import Bluebird from "bluebird"
import { omit } from "lodash"

import { Events, EventName, EventBus, eventNames } from "../events"
import { LogEntryMetadata, LogEntry } from "../logger/log-entry"
import { LogEntryMetadata, LogEntry, LogEntryMessage } from "../logger/log-entry"
import { chainMessages } from "../logger/renderers"
import { got } from "../util/http"
import { makeAuthHeader } from "./auth"
Expand All @@ -21,26 +23,38 @@ export type StreamEvent = {
timestamp: Date
}

export interface LogEntryEvent {
// TODO: Remove data, section, timestamp and msg once we've updated GE (it's included in the message)
export interface LogEntryEventPayload {
key: string
parentKey: string | null
revision: number
msg: string | string[]
timestamp: Date
level: LogLevel
message: Omit<LogEntryMessage, "timestamp">
data?: any
section?: string
metadata?: LogEntryMetadata
}

export function formatLogEntryForEventStream(entry: LogEntry): LogEntryEvent {
const { section, data } = entry.getMessageState()
export function formatLogEntryForEventStream(entry: LogEntry): LogEntryEventPayload {
const message = entry.getLatestMessage()
const { key, revision, level } = entry
const parentKey = entry.parent ? entry.parent.key : null
const metadata = entry.getMetadata()
const msg = chainMessages(entry.getMessageStates() || [])
const timestamp = new Date()
return { key, parentKey, revision, msg, data, metadata, section, timestamp, level }
const msg = chainMessages(entry.getMessages() || [])
return {
key,
parentKey,
revision,
msg,
data: message.data,
metadata,
section: message.section,
timestamp: message.timestamp,
level,
message: omit(message, "timestamp"),
}
}

interface StreamTarget {
Expand Down Expand Up @@ -70,7 +84,7 @@ export interface ApiEventBatch extends ApiBatchBase {
}

export interface ApiLogBatch extends ApiBatchBase {
logEntries: LogEntryEvent[]
logEntries: LogEntryEventPayload[]
}

export const controlEventNames: Set<EventName> = new Set(["_workflowRunRegistered"])
Expand Down Expand Up @@ -103,7 +117,7 @@ export class BufferedEventStream {

private intervalId: NodeJS.Timer | null
private bufferedEvents: StreamEvent[]
private bufferedLogEntries: LogEntryEvent[]
private bufferedLogEntries: LogEntryEventPayload[]
protected intervalMsec = 1000

/**
Expand All @@ -116,7 +130,7 @@ export class BufferedEventStream {
constructor(log: LogEntry, sessionId: string) {
this.sessionId = sessionId
this.log = log
this.log.root.events.onAny((_name: string, payload: LogEntryEvent) => {
this.log.root.events.onAny((_name: string, payload: LogEntryEventPayload) => {
this.streamLogEntry(payload)
})
this.bufferedEvents = []
Expand Down Expand Up @@ -153,7 +167,7 @@ export class BufferedEventStream {
// We maintain this map to facilitate unsubscribing from events when the Garden instance is closed.
const gardenEventListeners = {}
for (const gardenEventName of eventNames) {
const listener = (payload: LogEntryEvent) => this.streamEvent(gardenEventName, payload)
const listener = (payload: LogEntryEventPayload) => this.streamEvent(gardenEventName, payload)
gardenEventListeners[gardenEventName] = listener
eventBus.on(gardenEventName, listener)
}
Expand Down Expand Up @@ -205,7 +219,7 @@ export class BufferedEventStream {
}
}

streamLogEntry(logEntry: LogEntryEvent) {
streamLogEntry(logEntry: LogEntryEventPayload) {
if (this.streamLogEntries) {
this.bufferedLogEntries.push(logEntry)
}
Expand All @@ -232,7 +246,7 @@ export class BufferedEventStream {
await this.postToTargets(`${events.length} events`, "events", data)
}

async flushLogEntries(logEntries: LogEntryEvent[]) {
async flushLogEntries(logEntries: LogEntryEventPayload[]) {
if (logEntries.length === 0 || !this.garden) {
return
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import { omit } from "lodash"
import { EventEmitter2 } from "eventemitter2"
import { GraphResult } from "./task-graph"
import { LogEntryEvent } from "./enterprise/buffered-event-stream"
import { LogEntryEventPayload } from "./enterprise/buffered-event-stream"
import { ServiceStatus } from "./types/service"
import { RunStatus } from "./types/plugin/base"
import { Omit } from "./util/util"
Expand Down Expand Up @@ -55,7 +55,7 @@ export class EventBus extends EventEmitter2 {
*/
export interface LoggerEvents {
_test: any
logEntry: LogEntryEvent
logEntry: LogEntryEventPayload
}

export type LoggerEventName = keyof LoggerEvents
Expand Down
70 changes: 34 additions & 36 deletions core/src/logger/log-entry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ interface MessageBase {
maxSectionWidth?: number
}

export interface MessageState extends MessageBase {
timestamp: number
export interface LogEntryMessage extends MessageBase {
timestamp: Date
}

export interface UpdateLogEntryParams extends MessageBase {
Expand All @@ -63,8 +63,6 @@ export interface UpdateLogEntryParams extends MessageBase {

export interface LogEntryParams extends UpdateLogEntryParams {
error?: GardenError
data?: any // to be rendered as e.g. YAML or JSON
dataFormat?: "json" | "yaml" // how to render the data object
indent?: number
childEntriesInheritLevel?: boolean
fromStdStream?: boolean
Expand All @@ -89,7 +87,7 @@ function resolveParams(params?: string | UpdateLogEntryParams): UpdateLogEntryPa
}

export class LogEntry extends LogNode {
private messageStates?: MessageState[]
private messages?: LogEntryMessage[]
private metadata?: LogEntryMetadata
public readonly root: Logger
public readonly fromStdStream?: boolean
Expand Down Expand Up @@ -130,38 +128,38 @@ export class LogEntry extends LogNode {
/**
* Updates the log entry with a few invariants:
* 1. msg, emoji, section, status, and symbol can only be replaced with a value of same type, not removed
* 2. append is always set explicitly (the next message state does not inherit the previous value)
* 2. append is always set explicitly (the next message does not inherit the previous value)
* 3. next metadata is merged with the previous metadata
*/
protected update(updateParams: UpdateLogEntryParams): void {
this.revision = this.revision + 1
const messageState = this.getMessageState()
const latestMessage = this.getLatestMessage()

// Explicitly set all the fields so the shape stays consistent
const nextMessageState: MessageState = {
const nextMessage: LogEntryMessage = {
// Ensure empty string gets set
msg: typeof updateParams.msg === "string" ? updateParams.msg : messageState.msg,
emoji: updateParams.emoji || messageState.emoji,
section: updateParams.section || messageState.section,
status: updateParams.status || messageState.status,
symbol: updateParams.symbol || messageState.symbol,
data: updateParams.data || messageState.data,
dataFormat: updateParams.dataFormat || messageState.dataFormat,
// Next state does not inherit the append field
msg: typeof updateParams.msg === "string" ? updateParams.msg : latestMessage.msg,
emoji: updateParams.emoji || latestMessage.emoji,
section: updateParams.section || latestMessage.section,
status: updateParams.status || latestMessage.status,
symbol: updateParams.symbol || latestMessage.symbol,
data: updateParams.data || latestMessage.data,
dataFormat: updateParams.dataFormat || latestMessage.dataFormat,
// Next message does not inherit the append field
append: updateParams.append,
timestamp: Date.now(),
timestamp: new Date(),
maxSectionWidth:
updateParams.maxSectionWidth !== undefined ? updateParams.maxSectionWidth : messageState.maxSectionWidth,
updateParams.maxSectionWidth !== undefined ? updateParams.maxSectionWidth : latestMessage.maxSectionWidth,
}

// Hack to preserve section alignment if spinner disappears
const hadSpinner = messageState.status === "active"
const hasSymbolOrSpinner = nextMessageState.symbol || nextMessageState.status === "active"
if (nextMessageState.section && hadSpinner && !hasSymbolOrSpinner) {
nextMessageState.symbol = "empty"
const hadSpinner = latestMessage.status === "active"
const hasSymbolOrSpinner = nextMessage.symbol || nextMessage.status === "active"
if (nextMessage.section && hadSpinner && !hasSymbolOrSpinner) {
nextMessage.symbol = "empty"
}

this.messageStates = [...(this.messageStates || []), nextMessageState]
this.messages = [...(this.messages || []), nextMessage]

if (updateParams.metadata) {
this.metadata = { ...(this.metadata || {}), ...updateParams.metadata }
Expand All @@ -170,14 +168,14 @@ export class LogEntry extends LogNode {

// Update node and child nodes
private deepUpdate(updateParams: UpdateLogEntryParams): void {
const wasActive = this.getMessageState().status === "active"
const wasActive = this.getLatestMessage().status === "active"

this.update(updateParams)

// Stop active child nodes if no longer active
if (wasActive && updateParams.status !== "active") {
getChildEntries(this).forEach((entry) => {
if (entry.getMessageState().status === "active") {
if (entry.getLatestMessage().status === "active") {
entry.update({ status: "done" })
}
})
Expand Down Expand Up @@ -210,23 +208,23 @@ export class LogEntry extends LogNode {
return this.metadata
}

getMessageStates() {
return this.messageStates
getMessages() {
return this.messages
}

/**
* Returns a deep copy of the latest message state, if availble.
* Otherwise return an empty object of type MessageState for convenience.
* Returns a deep copy of the latest message, if availble.
* Otherwise returns an empty object of type LogEntryMessage for convenience.
*/
getMessageState() {
if (!this.messageStates) {
return <MessageState>{}
getLatestMessage() {
if (!this.messages) {
return <LogEntryMessage>{}
}

// Use spread operator to clone the array
const msgState = [...this.messageStates][this.messageStates.length - 1]
const message = [...this.messages][this.messages.length - 1]
// ...and the object itself
return { ...msgState }
return { ...message }
}

placeholder({
Expand Down Expand Up @@ -296,7 +294,7 @@ export class LogEntry extends LogNode {

stop() {
// Stop gracefully if still in active state
if (this.getMessageState().status === "active") {
if (this.getLatestMessage().status === "active") {
this.update({ symbol: "empty", status: "done" })
this.onGraphChange(this)
}
Expand All @@ -316,7 +314,7 @@ export class LogEntry extends LogNode {
toString(filter?: (log: LogEntry) => boolean) {
return this.getChildEntries()
.filter((entry) => (filter ? filter(entry) : true))
.flatMap((entry) => entry.getMessageStates()?.map((state) => state.msg))
.flatMap((entry) => entry.getMessages()?.map((message) => message.msg))
.join("\n")
}
}
6 changes: 3 additions & 3 deletions core/src/logger/log-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ export function resolveParams(level: LogLevel, params: string | LogEntryParams):
}

export abstract class LogNode {
public readonly timestamp: number
public readonly timestamp: Date
public readonly key: string
public readonly children: LogEntry[]

constructor(public readonly level: LogLevel, public readonly parent?: LogEntry, public readonly id?: string) {
this.key = uniqid()
this.timestamp = Date.now()
this.timestamp = new Date()
this.children = []
}

Expand Down Expand Up @@ -94,6 +94,6 @@ export abstract class LogNode {
* Returns the duration in seconds, defaults to 2 decimal precision
*/
getDuration(precision: number = 2): number {
return round((Date.now() - this.timestamp) / 1000, precision)
return round((new Date().getTime() - this.timestamp.getTime()) / 1000, precision)
}
}
2 changes: 1 addition & 1 deletion core/src/logger/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ export class Logger extends LogNode {
}

filterBySection(section: string): LogEntry[] {
return getChildEntries(this).filter((entry) => entry.getMessageState().section === section)
return getChildEntries(this).filter((entry) => entry.getLatestMessage().section === section)
}

findById(id: string): LogEntry | void {
Expand Down
Loading

0 comments on commit e9710e5

Please sign in to comment.