Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

♻️ [RUMF-1533] extract the Flush logic into a reusable component #2144

Merged
merged 7 commits into from
Apr 11, 2023
2 changes: 1 addition & 1 deletion packages/core/src/browser/pageExitObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export const PageExitReason = {
FROZEN: 'page_frozen',
} as const

type PageExitReason = (typeof PageExitReason)[keyof typeof PageExitReason]
export type PageExitReason = (typeof PageExitReason)[keyof typeof PageExitReason]

export interface PageExitEvent {
reason: PageExitReason
Expand Down
5 changes: 3 additions & 2 deletions packages/core/src/domain/configuration/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { catchUserErrors } from '../../tools/catchUserErrors'
import { display } from '../../tools/display'
import type { RawTelemetryConfiguration } from '../telemetry'
import { ExperimentalFeature, addExperimentalFeatures } from '../../tools/experimentalFeatures'
import type { Duration } from '../../tools/utils/timeUtils'
import { ONE_SECOND } from '../../tools/utils/timeUtils'
import { isPercentage } from '../../tools/utils/numberUtils'
import { ONE_KIBI_BYTE } from '../../tools/utils/byteUtils'
Expand Down Expand Up @@ -85,7 +86,7 @@ export interface Configuration extends TransportConfiguration {

// Batch configuration
batchBytesLimit: number
flushTimeout: number
flushTimeout: Duration
batchMessagesLimit: number
messageBytesLimit: number
}
Expand Down Expand Up @@ -148,7 +149,7 @@ export function validateAndBuildConfiguration(initConfiguration: InitConfigurati
* flush automatically, aim to be lower than ALB connection timeout
* to maximize connection reuse.
*/
flushTimeout: 30 * ONE_SECOND,
flushTimeout: (30 * ONE_SECOND) as Duration,

/**
* Logs intake limit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ describe('endpointBuilder', () => {

it('should contain retry infos', () => {
expect(
createEndpointBuilder(initConfiguration, 'rum', []).build('xhr', 'batch_bytes_limit', {
createEndpointBuilder(initConfiguration, 'rum', []).build('xhr', 'bytes_limit', {
count: 5,
lastFailureStatus: 408,
})
Expand All @@ -131,13 +131,13 @@ describe('endpointBuilder', () => {

it('should contain flush reason when ff collect_flush_reason is enabled', () => {
addExperimentalFeatures([ExperimentalFeature.COLLECT_FLUSH_REASON])
expect(createEndpointBuilder(initConfiguration, 'rum', []).build('xhr', 'batch_bytes_limit')).toContain(
'flush_reason%3Abatch_bytes_limit'
expect(createEndpointBuilder(initConfiguration, 'rum', []).build('xhr', 'bytes_limit')).toContain(
'flush_reason%3Abytes_limit'
)
})

it('should not contain flush reason when ff collect_flush_reason is disnabled', () => {
expect(createEndpointBuilder(initConfiguration, 'rum', []).build('xhr', 'batch_bytes_limit')).not.toContain(
expect(createEndpointBuilder(initConfiguration, 'rum', []).build('xhr', 'bytes_limit')).not.toContain(
'flush_reason'
)
})
Expand Down
4 changes: 3 additions & 1 deletion packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ export {
Payload,
createHttpRequest,
Batch,
BatchFlushEvent,
canUseEventBridge,
getEventBridge,
startBatchWithReplica,
createFlushController,
FlushEvent,
FlushReason,
} from './transport'
export * from './tools/display'
export * from './tools/utils/urlPolyfill'
Expand Down
205 changes: 69 additions & 136 deletions packages/core/src/transport/batch.spec.ts
Original file line number Diff line number Diff line change
@@ -1,150 +1,95 @@
import sinon from 'sinon'
import type { PageExitEvent } from '../browser/pageExitObservable'
import { PageExitReason } from '../browser/pageExitObservable'
import { Observable } from '../tools/observable'
import { noop } from '../tools/utils/functionUtils'
import type { FlushReason } from './batch'
import type { MockFlushController } from '../../test'
import { createMockFlushController } from '../../test'
import { display } from '../tools/display'
import { Batch } from './batch'
import type { FlushReason } from './flushController'
import type { HttpRequest } from './httpRequest'

describe('batch', () => {
const BATCH_MESSAGES_LIMIT = 3
const BATCH_BYTES_LIMIT = 100
const MESSAGE_BYTES_LIMIT = 50 * 1024
const FLUSH_TIMEOUT = 60 * 1000
let batch: Batch
let transport: HttpRequest
let sendSpy: jasmine.Spy<HttpRequest['send']>
let pageExitObservable: Observable<PageExitEvent>
let flushNotifySpy: jasmine.Spy
const flushReason: FlushReason = 'batch_bytes_limit'

beforeEach(() => {
transport = { send: noop } as unknown as HttpRequest
sendSpy = spyOn(transport, 'send')
pageExitObservable = new Observable()
batch = new Batch(
transport,
BATCH_MESSAGES_LIMIT,
BATCH_BYTES_LIMIT,
MESSAGE_BYTES_LIMIT,
FLUSH_TIMEOUT,
pageExitObservable
)
flushNotifySpy = spyOn(batch.flushObservable, 'notify')
})
const MESSAGE_BYTES_LIMIT = 50

it('should add context to message', () => {
batch.add({ message: 'hello' })
const BIG_MESSAGE_OVER_BYTES_LIMIT = { message: 'hello xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' }
const SMALL_MESSAGE = { message: 'hello' }
const SMALL_MESSAGE_BYTES_COUNT = 19
const SEPARATOR_BYTES_COUNT = 1

batch.flush(flushReason)

expect(sendSpy.calls.mostRecent().args[0]).toEqual({
data: '{"message":"hello"}',
bytesCount: jasmine.any(Number),
flushReason,
})
})

it('should empty the batch after a flush', () => {
batch.add({ message: 'hello' })

batch.flush(flushReason)
sendSpy.calls.reset()
batch.flush(flushReason)
let batch: Batch
let transport: {
send: jasmine.Spy<HttpRequest['send']>
sendOnExit: jasmine.Spy<HttpRequest['sendOnExit']>
}

expect(transport.send).not.toHaveBeenCalled()
})
const flushReason: FlushReason = 'bytes_limit'
let flushController: MockFlushController

it('should flush when the message count limit is reached', () => {
batch.add({ message: '1' })
batch.add({ message: '2' })
batch.add({ message: '3' })
expect(sendSpy.calls.mostRecent().args[0]).toEqual({
data: '{"message":"1"}\n{"message":"2"}\n{"message":"3"}',
bytesCount: jasmine.any(Number),
flushReason,
})
beforeEach(() => {
transport = {
send: jasmine.createSpy(),
sendOnExit: jasmine.createSpy(),
} satisfies HttpRequest
flushController = createMockFlushController()
batch = new Batch(transport, flushController, MESSAGE_BYTES_LIMIT)
})

it('should flush when a new message will overflow the bytes limit', () => {
batch.add({ message: '50 bytes - xxxxxxxxxxxxxxxxxxxxxxxxx' })
expect(sendSpy).not.toHaveBeenCalled()
it('should send a message', () => {
batch.add(SMALL_MESSAGE)

batch.add({ message: '60 bytes - xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' })
expect(sendSpy).toHaveBeenCalledWith({
data: '{"message":"50 bytes - xxxxxxxxxxxxxxxxxxxxxxxxx"}',
bytesCount: 50,
flushReason,
})
flushController.notifyFlush()

batch.flush(flushReason)
expect(sendSpy).toHaveBeenCalledWith({
data: '{"message":"60 bytes - xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"}',
bytesCount: 60,
expect(transport.send.calls.mostRecent().args[0]).toEqual({
data: '{"message":"hello"}',
bytesCount: SMALL_MESSAGE_BYTES_COUNT,
flushReason,
})
})

it('should consider separators when computing the byte count', () => {
batch.add({ message: '30 bytes - xxxxx' }) // batch: 30 sep: 0
batch.add({ message: '30 bytes - xxxxx' }) // batch: 60 sep: 1
batch.add({ message: '39 bytes - xxxxxxxxxxxxxx' }) // batch: 99 sep: 2

expect(sendSpy).toHaveBeenCalledWith({
data: '{"message":"30 bytes - xxxxx"}\n{"message":"30 bytes - xxxxx"}',
bytesCount: 61,
flushReason,
})
})
it('should add message to the flush controller', () => {
batch.add(SMALL_MESSAGE)

it('should call send one time when the byte count is too high and the batch is empty', () => {
const message = '101 bytes - xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
batch.add({ message })
expect(sendSpy).toHaveBeenCalledWith({ data: `{"message":"${message}"}`, bytesCount: 101, flushReason })
expect(flushController.willAddMessage).toHaveBeenCalledOnceWith(SMALL_MESSAGE_BYTES_COUNT)
expect(flushController.didAddMessage).toHaveBeenCalledOnceWith()
})

it('should flush the batch and send the message when the message is too heavy', () => {
const message = '101 bytes - xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
it('should consider separators when adding message', () => {
batch.add(SMALL_MESSAGE)
batch.add(SMALL_MESSAGE)
batch.add(SMALL_MESSAGE)

batch.add({ message: '50 bytes - xxxxxxxxxxxxxxxxxxxxxxxxx' })
batch.add({ message })
expect(sendSpy).toHaveBeenCalledTimes(2)
expect(flushController.willAddMessage.calls.allArgs()).toEqual([
[SMALL_MESSAGE_BYTES_COUNT],
[SMALL_MESSAGE_BYTES_COUNT + SEPARATOR_BYTES_COUNT],
[SMALL_MESSAGE_BYTES_COUNT + SEPARATOR_BYTES_COUNT],
])
})

it('should flush after timeout', () => {
const clock = sinon.useFakeTimers()
batch = new Batch(transport, BATCH_MESSAGES_LIMIT, BATCH_BYTES_LIMIT, MESSAGE_BYTES_LIMIT, 10, pageExitObservable)
batch.add({ message: '50 bytes - xxxxxxxxxxxxxxxxxxxxxxxxx' })
clock.tick(100)
it('should consider separators when replacing messages', () => {
batch.add(SMALL_MESSAGE)
batch.upsert(SMALL_MESSAGE, 'a')

expect(sendSpy).toHaveBeenCalled()
flushController.willAddMessage.calls.reset()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ question: ‏Why do you need this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to call reset to reset the number of calls after the test setup, so I can use expect().toHaveBeenCalledOnceWith(...) (only the last call is relevent to this test case)


clock.restore()
})
batch.upsert(SMALL_MESSAGE, 'a')

it('should flush on page exit', () => {
batch.add({ message: '1' })
pageExitObservable.notify({ reason: PageExitReason.UNLOADING })
expect(sendSpy).toHaveBeenCalledTimes(1)
expect(flushController.didRemoveMessage).toHaveBeenCalledOnceWith(SMALL_MESSAGE_BYTES_COUNT + SEPARATOR_BYTES_COUNT)
expect(flushController.willAddMessage).toHaveBeenCalledOnceWith(SMALL_MESSAGE_BYTES_COUNT + SEPARATOR_BYTES_COUNT)
})

it('should not send a message with a bytes size above the limit', () => {
const warnStub = sinon.stub(console, 'warn')
batch = new Batch(transport, BATCH_MESSAGES_LIMIT, BATCH_BYTES_LIMIT, 50, FLUSH_TIMEOUT, pageExitObservable)
batch.add({ message: '50 bytes - xxxxxxxxxxxxx' })
const warnSpy = spyOn(display, 'warn')
batch.add(BIG_MESSAGE_OVER_BYTES_LIMIT)

expect(sendSpy).not.toHaveBeenCalled()
warnStub.restore()
expect(warnSpy).toHaveBeenCalled()
expect(flushController.willAddMessage).not.toHaveBeenCalled()
})

it('should upsert a message for a given key', () => {
batch.upsert({ message: '1' }, 'a')
batch.upsert({ message: '2' }, 'a')
batch.upsert({ message: '3' }, 'b')
batch.upsert({ message: '4' }, 'c')
flushController.notifyFlush()

expect(sendSpy.calls.mostRecent().args[0]).toEqual({
expect(transport.send.calls.mostRecent().args[0]).toEqual({
data: '{"message":"2"}\n{"message":"3"}\n{"message":"4"}',
bytesCount: jasmine.any(Number),
flushReason,
Expand All @@ -153,8 +98,9 @@ describe('batch', () => {
batch.upsert({ message: '5' }, 'c')
batch.upsert({ message: '6' }, 'b')
batch.upsert({ message: '7' }, 'a')
flushController.notifyFlush()

expect(sendSpy.calls.mostRecent().args[0]).toEqual({
expect(transport.send.calls.mostRecent().args[0]).toEqual({
data: '{"message":"5"}\n{"message":"6"}\n{"message":"7"}',
bytesCount: jasmine.any(Number),
flushReason,
Expand All @@ -164,42 +110,29 @@ describe('batch', () => {
batch.upsert({ message: '9' }, 'b')
batch.upsert({ message: '10' }, 'a')
batch.upsert({ message: '11' }, 'b')
batch.flush(flushReason)
flushController.notifyFlush()

expect(sendSpy.calls.mostRecent().args[0]).toEqual({
expect(transport.send.calls.mostRecent().args[0]).toEqual({
data: '{"message":"10"}\n{"message":"11"}',
bytesCount: jasmine.any(Number),
flushReason,
})
})

it('should be able to use telemetry in the httpRequest.send', () => {
const fakeRequest = {
send(data: string) {
addTelemetryDebugFake()
transport.send({ data, bytesCount: BATCH_BYTES_LIMIT, flushReason })
},
} as unknown as HttpRequest
const batch = new Batch(
fakeRequest,
BATCH_MESSAGES_LIMIT,
BATCH_BYTES_LIMIT,
MESSAGE_BYTES_LIMIT,
FLUSH_TIMEOUT,
pageExitObservable
)
transport.send.and.callFake(() => {
addTelemetryDebugFake()
})
const addTelemetryDebugFake = () => batch.add({ message: 'telemetry message' })

batch.add({ message: 'normal message' })
batch.flush(flushReason)
expect(sendSpy).toHaveBeenCalledTimes(1)
batch.flush(flushReason)
expect(sendSpy).toHaveBeenCalledTimes(2)
})
expect(flushController.willAddMessage).toHaveBeenCalledTimes(1)

flushController.notifyFlush()
expect(transport.send).toHaveBeenCalledTimes(1)
expect(flushController.willAddMessage).toHaveBeenCalledTimes(2)

it('should notify when the batch is flushed', () => {
batch.add({})
batch.flush(flushReason)
expect(flushNotifySpy).toHaveBeenCalledOnceWith({ bufferBytesCount: 2, bufferMessagesCount: 1 })
flushController.notifyFlush()
expect(transport.send).toHaveBeenCalledTimes(2)
})
})
Loading