Skip to content

Commit

Permalink
refactor: move onSpanEndCallback logic out of batch processor
Browse files Browse the repository at this point in the history
  • Loading branch information
yousif-bugsnag committed Dec 20, 2024
1 parent c9a6313 commit 770b1e4
Show file tree
Hide file tree
Showing 11 changed files with 60 additions and 65 deletions.
41 changes: 3 additions & 38 deletions packages/core/lib/batch-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@ import type ProbabilityManager from './probability-manager'
import type { Processor } from './processor'
import type { RetryQueue } from './retry-queue'
import type { ReadonlySampler } from './sampler'
import type { Span, SpanEnded } from './span'
import type { SpanEnded } from './span'

import { millisecondsToNanoseconds } from './clock'
import { spanEndedToSpan } from './span'

export type OnSpanEndCallback = (span: Span) => boolean | Promise<boolean>
export type OnSpanEndCallbacks = OnSpanEndCallback[]
import { runSpanEndCallbacks } from './span'

type MinimalProbabilityManager = Pick<ProbabilityManager, 'setProbability' | 'ensureFreshProbability'>

Expand Down Expand Up @@ -114,37 +110,6 @@ export class BatchProcessor<C extends Configuration> implements Processor {
await this.flushQueue
}

async runCallbacks (span: Span): Promise<boolean> {
if (this.configuration.onSpanEnd) {
const callbackStartTime = performance.now()
let continueToBatch = true
for (const callback of this.configuration.onSpanEnd) {
try {
let result = callback(span)

// @ts-expect-error result may or may not be a promise
if (typeof result.then === 'function') {
result = await result
}

if (result === false) {
continueToBatch = false
break
}
} catch (err) {
this.configuration.logger.error('Error in onSpanEnd callback: ' + err)
}
}
if (continueToBatch) {
const duration = millisecondsToNanoseconds(performance.now() - callbackStartTime)
span.setAttribute('bugsnag.span.callbacks_duration', duration)
}
return continueToBatch
} else {
return true
}
}

private async prepareBatch (): Promise<SpanEnded[] | undefined> {
if (this.spans.length === 0) {
return
Expand All @@ -165,7 +130,7 @@ export class BatchProcessor<C extends Configuration> implements Processor {
if (this.sampler.sample(span)) {
// Run any callbacks that have been registered before batching
// as callbacks could cause the span to be discarded
const shouldAddToBatch = await this.runCallbacks(spanEndedToSpan(span))
const shouldAddToBatch = await runSpanEndCallbacks(span, this.configuration.logger, this.configuration.onSpanEnd)
if (shouldAddToBatch) batch.push(span)
}
}
Expand Down
5 changes: 4 additions & 1 deletion packages/core/lib/config.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { OnSpanEndCallbacks } from './batch-processor'
import {
ATTRIBUTE_ARRAY_LENGTH_LIMIT_DEFAULT,
ATTRIBUTE_COUNT_LIMIT_DEFAULT,
Expand All @@ -8,6 +7,7 @@ import {
ATTRIBUTE_STRING_VALUE_LIMIT_MAX
} from './custom-attribute-limits'
import type { Plugin } from './plugin'
import type { Span } from './span'
import { isLogger, isNumber, isObject, isOnSpanEndCallbacks, isPluginArray, isString, isStringArray, isStringWithLength } from './validation'

type SetTraceCorrelation = (traceId: string, spanId: string) => void
Expand Down Expand Up @@ -39,6 +39,9 @@ export interface Logger {
error: (message: string) => void
}

export type OnSpanEndCallback = (span: Span) => boolean | Promise<boolean>
export type OnSpanEndCallbacks = OnSpanEndCallback[]

export interface Configuration {
apiKey: string
endpoint?: string
Expand Down
7 changes: 1 addition & 6 deletions packages/core/lib/processor.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import type { InternalConfiguration, Configuration } from './config'
import type { Span, SpanEnded } from './span'
import type { SpanEnded } from './span'

// processor.add is called by a Span when 'Span.end' is called
// it can then add to a queue or send immediately
export interface Processor {
add: (span: SpanEnded) => void
runCallbacks: (span: Span) => Promise<boolean>
}

export interface ProcessorFactory<C extends Configuration> {
Expand All @@ -27,8 +26,4 @@ export class BufferingProcessor implements Processor {
add (span: SpanEnded): void {
this.spans.push(span)
}

async runCallbacks (span: Span) {
return true
}
}
8 changes: 5 additions & 3 deletions packages/core/lib/span-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type { SpanAttribute, SpanAttributesLimits, SpanAttributesSource } from '
import { SpanAttributes } from './attributes'
import type { BackgroundingListener, BackgroundingListenerState } from './backgrounding-listener'
import type { Clock } from './clock'
import type { Configuration, InternalConfiguration, Logger } from './config'
import type { Configuration, InternalConfiguration, Logger, OnSpanEndCallbacks } from './config'
import { defaultSpanAttributeLimits } from './custom-attribute-limits'
import type { IdGenerator } from './id-generator'
import type { NetworkSpanOptions } from './network-span'
Expand All @@ -21,14 +21,15 @@ export type SpanFactoryConstructor<C extends Configuration> = new (
) => InstanceType<typeof SpanFactory<C>>

export class SpanFactory<C extends Configuration> {
protected processor: Processor
private processor: Processor
readonly sampler: ReadonlySampler
private readonly idGenerator: IdGenerator
private readonly spanAttributesSource: SpanAttributesSource<C>
protected readonly clock: Clock
private readonly spanContextStorage: SpanContextStorage
private logger: Logger
protected logger: Logger
private spanAttributeLimits: SpanAttributesLimits = defaultSpanAttributeLimits
protected onSpanEndCallbacks?: OnSpanEndCallbacks

private openSpans: WeakSet<SpanInternal> = new WeakSet<SpanInternal>()
private isInForeground: boolean = true
Expand Down Expand Up @@ -124,6 +125,7 @@ export class SpanFactory<C extends Configuration> {
attributeCountLimit: configuration.attributeCountLimit,
attributeStringValueLimit: configuration.attributeStringValueLimit
}
this.onSpanEndCallbacks = configuration.onSpanEnd
}

endSpan (
Expand Down
32 changes: 32 additions & 0 deletions packages/core/lib/span.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { SpanAttribute, SpanAttributes } from './attributes'
import { millisecondsToNanoseconds } from './clock'
import type { Clock } from './clock'
import type { Logger, OnSpanEndCallbacks } from './config'
import type { DeliverySpan } from './delivery'
import { SpanEvents } from './events'
import type { SpanContext } from './span-context'
Expand Down Expand Up @@ -86,6 +88,36 @@ export function spanEndedToSpan (span: SpanEnded): Span {
}
}

export async function runSpanEndCallbacks (spanEnded: SpanEnded, logger: Logger, callbacks?: OnSpanEndCallbacks) {
if (!callbacks) return true

const span = spanEndedToSpan(spanEnded)
const callbackStartTime = performance.now()
let shouldSample = true
for (const callback of callbacks) {
try {
let result = callback(span)

// @ts-expect-error result may or may not be a promise
if (typeof result.then === 'function') {
result = await result
}

if (result === false) {
shouldSample = false
break
}
} catch (err) {
logger.error('Error in onSpanEnd callback: ' + err)
}
}
if (shouldSample) {
const duration = millisecondsToNanoseconds(performance.now() - callbackStartTime)
span.setAttribute('bugsnag.span.callbacks_duration', duration)
}
return shouldSample
}

export class SpanInternal implements SpanContext {
readonly id: string
readonly traceId: string
Expand Down
3 changes: 1 addition & 2 deletions packages/core/lib/validation.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { OnSpanEndCallback } from './batch-processor'
import type { Configuration, Logger } from './config'
import type { Configuration, Logger, OnSpanEndCallback } from './config'
import type { PersistedProbability } from './persistence'
import type { Plugin } from './plugin'
import type { ParentContext } from './span'
Expand Down
8 changes: 4 additions & 4 deletions packages/core/tests/span-factory.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ describe('SpanFactory', () => {
const clock = new IncrementingClock('1970-01-01T00:00:00.000Z')
const sampler = new Sampler(0.5)
const delivery = { send: jest.fn() }
const processor = { add: (span: SpanEnded) => delivery.send(spanToJson(span, clock)), runCallbacks: async () => true }
const processor = { add: (span: SpanEnded) => delivery.send(spanToJson(span, clock)) }
const backgroundingListener = new ControllableBackgroundingListener()
const spanFactory = new SpanFactory(
processor,
Expand All @@ -169,7 +169,7 @@ describe('SpanFactory', () => {
const clock = new IncrementingClock('1970-01-01T00:00:00.000Z')
const sampler = new Sampler(0.5)
const delivery = { send: jest.fn() }
const processor = { add: (span: SpanEnded) => delivery.send(spanToJson(span, clock)), runCallbacks: async () => true }
const processor = { add: (span: SpanEnded) => delivery.send(spanToJson(span, clock)) }
const backgroundingListener = new ControllableBackgroundingListener()
const spanFactory = new SpanFactory(
processor,
Expand All @@ -193,7 +193,7 @@ describe('SpanFactory', () => {
const clock = new IncrementingClock('1970-01-01T00:00:00.000Z')
const sampler = new Sampler(0.5)
const delivery = { send: jest.fn() }
const processor = { add: (span: SpanEnded) => delivery.send(spanToJson(span, clock)), runCallbacks: async () => true }
const processor = { add: (span: SpanEnded) => delivery.send(spanToJson(span, clock)) }
const backgroundingListener = new ControllableBackgroundingListener()
const spanFactory = new SpanFactory(
processor,
Expand All @@ -217,7 +217,7 @@ describe('SpanFactory', () => {
const clock = new IncrementingClock('1970-01-01T00:00:00.000Z')
const sampler = new Sampler(0.5)
const delivery = { send: jest.fn() }
const processor = { add: (span: SpanEnded) => delivery.send(spanToJson(span, clock)), runCallbacks: async () => true }
const processor = { add: (span: SpanEnded) => delivery.send(spanToJson(span, clock)) }
const backgroundingListener = new ControllableBackgroundingListener()
const spanFactory = new SpanFactory(
processor,
Expand Down
4 changes: 2 additions & 2 deletions packages/core/tests/span.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ describe('SpanInternal', () => {
const clock = new IncrementingClock()
const sampler = new Sampler(0.5)
const delivery = { send: jest.fn() }
const processor = { add: (span: SpanEnded) => delivery.send(spanToJson(span, clock)), runCallbacks: async () => true }
const processor = { add: (span: SpanEnded) => delivery.send(spanToJson(span, clock)) }
const backgroundingListener = new ControllableBackgroundingListener()
const spanFactory = new SpanFactory(
processor,
Expand Down Expand Up @@ -80,7 +80,7 @@ describe('SpanInternal', () => {
const clock = new IncrementingClock('1970-01-01T00:00:00.000Z')
const sampler = new Sampler(0.5)
const delivery = { send: jest.fn() }
const processor = { add: (span: SpanEnded) => delivery.send(spanToJson(span, clock)), runCallbacks: async () => true }
const processor = { add: (span: SpanEnded) => delivery.send(spanToJson(span, clock)) }
const backgroundingListener = new ControllableBackgroundingListener()
const spanFactory = new SpanFactory(
processor,
Expand Down
4 changes: 2 additions & 2 deletions packages/platforms/react-native/lib/span-factory.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { spanEndedToSpan, SpanFactory, SpanInternal } from '@bugsnag/core-performance'
import { runSpanEndCallbacks, SpanFactory, SpanInternal } from '@bugsnag/core-performance'
import type { SpanAttributes, ParentContext } from '@bugsnag/core-performance'
import type { ReactNativeConfiguration } from './config'
import type { NativeSettings } from './NativeBugsnagPerformance'
Expand Down Expand Up @@ -41,7 +41,7 @@ export class ReactNativeSpanFactory extends SpanFactory<ReactNativeConfiguration

private async processNativeSpan (span: NativeSpanInternal, endTime: number) {
const spanEnded = span.end(endTime, this.sampler.spanProbability)
const shouldSend = await this.processor.runCallbacks(spanEndedToSpan(spanEnded))
const shouldSend = await runSpanEndCallbacks(spanEnded, this.logger, this.onSpanEndCallbacks)

if (shouldSend) {
const unixEndTimeNanos = (this.clock as ReactNativeClock).toUnixNanoseconds(endTime)
Expand Down
9 changes: 6 additions & 3 deletions packages/platforms/react-native/tests/span-factory.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ import { ReactNativeSpanFactory } from '../lib/span-factory'
import NativeBugsnagPerformance from '../lib/native'
import { ControllableBackgroundingListener, InMemoryProcessor, spanAttributesSource, StableIdGenerator } from '@bugsnag/js-performance-test-utilities'
import { DefaultSpanContextStorage, Sampler } from '@bugsnag/core-performance'
import type { InternalConfiguration } from '@bugsnag/core-performance'
import createClock from '../lib/clock'
import type { ReactNativeClock } from '../lib/clock'
import type { ReactNativeConfiguration } from '../lib/config'

let clock: ReactNativeClock
let spanFactory: ReactNativeSpanFactory
Expand Down Expand Up @@ -149,10 +151,11 @@ describe('ReactNativeSpanFactory', () => {
const nativeSettings = NativeBugsnagPerformance!.initialise()
spanFactory.attach(nativeSettings)

const runCalbacksSpy = jest.spyOn(processor, 'runCallbacks').mockImplementation((span) => {
const onSpanEndCallback = jest.fn((span) => {
return Promise.resolve(span.name === 'should send')
})

spanFactory.configure(processor, { logger: jestLogger, onSpanEnd: [onSpanEndCallback] } as unknown as InternalConfiguration<ReactNativeConfiguration>)
const startTime = clock.now()
const validSpan = spanFactory.startSpan('should send', { startTime, isFirstClass: true })
expect(NativeBugsnagPerformance!.startNativeSpan).toHaveBeenCalledWith('should send', expect.objectContaining({ startTime: clock.toUnixNanoseconds(startTime) }))
Expand All @@ -166,14 +169,14 @@ describe('ReactNativeSpanFactory', () => {
spanFactory.endSpan(invalidSpan, endTime)
await jest.runOnlyPendingTimersAsync()

expect(runCalbacksSpy).toHaveBeenCalledTimes(1)
expect(onSpanEndCallback).toHaveBeenCalledTimes(1)
expect(NativeBugsnagPerformance!.endNativeSpan).not.toHaveBeenCalled()
expect(NativeBugsnagPerformance!.discardNativeSpan).toHaveBeenCalledTimes(1)

spanFactory.endSpan(validSpan, endTime)
await jest.runOnlyPendingTimersAsync()

expect(runCalbacksSpy).toHaveBeenCalledTimes(2)
expect(onSpanEndCallback).toHaveBeenCalledTimes(2)
expect(NativeBugsnagPerformance!.endNativeSpan).toHaveBeenCalledTimes(1)
expect(NativeBugsnagPerformance!.discardNativeSpan).toHaveBeenCalledTimes(1)

Expand Down
4 changes: 0 additions & 4 deletions packages/test-utilities/lib/in-memory-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ class InMemoryProcessor implements Processor {
}

configure (): void {}

async runCallbacks (span: Span) {
return true
}
}

export default InMemoryProcessor

0 comments on commit 770b1e4

Please sign in to comment.