From 4cc15c6589ad95b0b2ffea84ee5a6cba17d4c59c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Zugmeyer?= Date: Thu, 6 Apr 2023 19:26:18 +0200 Subject: [PATCH 1/7] =?UTF-8?q?=E2=9C=A8=20[RUMF-1533]=20implement=20a=20f?= =?UTF-8?q?lush=20controller?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/src/browser/pageExitObservable.ts | 2 +- .../src/transport/flushController.spec.ts | 246 ++++++++++++++++++ .../core/src/transport/flushController.ts | 101 +++++++ 3 files changed, 348 insertions(+), 1 deletion(-) create mode 100644 packages/core/src/transport/flushController.spec.ts create mode 100644 packages/core/src/transport/flushController.ts diff --git a/packages/core/src/browser/pageExitObservable.ts b/packages/core/src/browser/pageExitObservable.ts index c75980aa9e..bbc15811f2 100644 --- a/packages/core/src/browser/pageExitObservable.ts +++ b/packages/core/src/browser/pageExitObservable.ts @@ -11,7 +11,7 @@ export const PageExitReason = { FROZEN: 'page_frozen', } as const -type PageExitReason = (typeof PageExitReason)[keyof typeof PageExitReason] +export type PageExitReason = (typeof PageExitReason)[keyof typeof PageExitReason] export interface PageExitEvent { reason: PageExitReason diff --git a/packages/core/src/transport/flushController.spec.ts b/packages/core/src/transport/flushController.spec.ts new file mode 100644 index 0000000000..236066499e --- /dev/null +++ b/packages/core/src/transport/flushController.spec.ts @@ -0,0 +1,246 @@ +import type { Clock } from '../../test' +import { mockClock } from '../../test' +import type { PageExitEvent } from '../browser/pageExitObservable' +import { Observable } from '../tools/observable' +import type { Duration } from '../tools/utils/timeUtils' +import type { FlushController, FlushEvent } from './flushController' +import { createFlushController } from './flushController' + +const BYTES_LIMIT = 100 +const MESSAGES_LIMIT = 5 +const DURATION_LIMIT = 100 as Duration +// Arbitrary message size that is below the BYTES_LIMIT +const SMALL_MESSAGE_BYTE_COUNT = 2 + +describe('flushController', () => { + let clock: Clock + let flushController: FlushController + let flushSpy: jasmine.Spy<(event: FlushEvent) => void> + let pageExitObservable: Observable + + beforeEach(() => { + clock = mockClock() + pageExitObservable = new Observable() + flushController = createFlushController({ + bytesLimit: BYTES_LIMIT, + messagesLimit: MESSAGES_LIMIT, + durationLimit: DURATION_LIMIT, + pageExitObservable, + }) + flushSpy = jasmine.createSpy() + flushController.flushObservable.subscribe(flushSpy) + }) + + afterEach(() => { + clock.cleanup() + }) + + it('when flushing, the event contains a reason, the bytes count and the messages count', () => { + const messagesCount = 3 + for (let i = 0; i < messagesCount; i += 1) { + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.didAddMessage() + } + + pageExitObservable.notify({ reason: 'before_unload' }) + + expect(flushSpy).toHaveBeenCalledOnceWith({ + reason: jasmine.any(String), + bytesCount: messagesCount * SMALL_MESSAGE_BYTE_COUNT, + messagesCount, + }) + }) + + describe('page exit', () => { + it('notifies when the page is exiting', () => { + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.didAddMessage() + pageExitObservable.notify({ reason: 'before_unload' }) + expect(flushSpy).toHaveBeenCalled() + }) + + it('flush reason should be the page exit reason', () => { + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.didAddMessage() + pageExitObservable.notify({ reason: 'before_unload' }) + expect(flushSpy.calls.first().args[0].reason).toBe('before_unload') + }) + + it('does not notify if no message was added', () => { + pageExitObservable.notify({ reason: 'before_unload' }) + expect(flushSpy).not.toHaveBeenCalled() + }) + + it('notifies when the page is exiting even if no message have been fully added yet', () => { + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + pageExitObservable.notify({ reason: 'before_unload' }) + expect(flushSpy).toHaveBeenCalled() + }) + }) + + describe('bytes limit', () => { + it('notifies when the bytes limit is reached after adding a message', () => { + flushController.willAddMessage(BYTES_LIMIT) + flushController.didAddMessage() + expect(flushSpy).toHaveBeenCalled() + }) + + it('flush reason should be "bytes_limit"', () => { + flushController.willAddMessage(BYTES_LIMIT) + flushController.didAddMessage() + expect(flushSpy.calls.first().args[0].reason).toBe('bytes_limit') + }) + + it('notifies when the bytes limit will be reached before adding a message', () => { + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.didAddMessage() + flushController.willAddMessage(BYTES_LIMIT) + expect(flushSpy).toHaveBeenCalled() + }) + + it('does not take removed messages into account', () => { + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.didAddMessage() + flushController.didRemoveMessage(SMALL_MESSAGE_BYTE_COUNT) + + flushController.willAddMessage(BYTES_LIMIT - SMALL_MESSAGE_BYTE_COUNT) + flushController.didAddMessage() + + expect(flushSpy).not.toHaveBeenCalled() + }) + + it('does not notify when the bytes limit will be reached if no message was added yet', () => { + flushController.willAddMessage(BYTES_LIMIT) + expect(flushSpy).not.toHaveBeenCalled() + }) + + it('resets the current bytes count once flushed', () => { + flushController.willAddMessage(BYTES_LIMIT) + flushController.didAddMessage() + + flushSpy.calls.reset() + + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.didAddMessage() + expect(flushSpy).not.toHaveBeenCalled() + }) + }) + + describe('messages limit', () => { + it('notifies when the messages limit is reached', () => { + for (let i = 0; i < MESSAGES_LIMIT; i += 1) { + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.didAddMessage() + } + expect(flushSpy).toHaveBeenCalled() + }) + + it('flush reason should be "bytes_limit"', () => { + for (let i = 0; i < MESSAGES_LIMIT; i += 1) { + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.didAddMessage() + } + expect(flushSpy.calls.first().args[0].reason).toBe('bytes_limit') + }) + + it('does not flush when the message was not fully added yet', () => { + for (let i = 0; i < MESSAGES_LIMIT - 1; i += 1) { + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.didAddMessage() + } + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + expect(flushSpy).not.toHaveBeenCalled() + }) + + it('does not take removed messages into account', () => { + for (let i = 0; i < MESSAGES_LIMIT - 1; i += 1) { + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.didAddMessage() + } + + flushController.didRemoveMessage(SMALL_MESSAGE_BYTE_COUNT) + + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.didAddMessage() + + expect(flushSpy).not.toHaveBeenCalled() + }) + + it('resets the messages count when flushed', () => { + for (let i = 0; i < MESSAGES_LIMIT; i += 1) { + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.didAddMessage() + } + + flushSpy.calls.reset() + + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.didAddMessage() + expect(flushSpy).not.toHaveBeenCalled() + }) + }) + + describe('duration limit', () => { + it('notifies when the duration limit is reached after adding a message', () => { + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.didAddMessage() + clock.tick(DURATION_LIMIT) + expect(flushSpy).toHaveBeenCalled() + }) + + it('flush reason should be "duration_limit"', () => { + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.didAddMessage() + clock.tick(DURATION_LIMIT) + expect(flushSpy.calls.first().args[0].reason).toBe('duration_limit') + }) + + it('does not postpone the duration limit when another message was added', () => { + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.didAddMessage() + clock.tick(DURATION_LIMIT / 2) + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.didAddMessage() + clock.tick(DURATION_LIMIT / 2) + expect(flushSpy).toHaveBeenCalled() + }) + + it('does not notify if no message was added yet', () => { + clock.tick(DURATION_LIMIT) + expect(flushSpy).not.toHaveBeenCalled() + }) + + it('does not notify if a message was added then removed', () => { + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.didAddMessage() + flushController.didRemoveMessage(SMALL_MESSAGE_BYTE_COUNT) + clock.tick(DURATION_LIMIT) + expect(flushSpy).not.toHaveBeenCalled() + }) + + it('notifies if a message was added, and another was added then removed', () => { + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.didAddMessage() + + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.didAddMessage() + flushController.didRemoveMessage(SMALL_MESSAGE_BYTE_COUNT) + + clock.tick(DURATION_LIMIT) + expect(flushSpy).toHaveBeenCalled() + }) + + it('does not notify prematurely if a message was added then removed then another was added', () => { + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.didAddMessage() + flushController.didRemoveMessage(SMALL_MESSAGE_BYTE_COUNT) + clock.tick(DURATION_LIMIT / 2) + flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.didAddMessage() + clock.tick(DURATION_LIMIT / 2) + expect(flushSpy).not.toHaveBeenCalled() + clock.tick(DURATION_LIMIT / 2) + expect(flushSpy).toHaveBeenCalled() + }) + }) +}) diff --git a/packages/core/src/transport/flushController.ts b/packages/core/src/transport/flushController.ts new file mode 100644 index 0000000000..e2fdf013de --- /dev/null +++ b/packages/core/src/transport/flushController.ts @@ -0,0 +1,101 @@ +import type { PageExitEvent, PageExitReason } from '../browser/pageExitObservable' +import { Observable } from '../tools/observable' +import type { TimeoutId } from '../tools/timer' +import { clearTimeout, setTimeout } from '../tools/timer' +import type { Duration } from '../tools/utils/timeUtils' + +export type FlushReason = PageExitReason | 'duration_limit' | 'bytes_limit' + +export type FlushController = ReturnType +export interface FlushEvent { + reason: FlushReason + bytesCount: number + messagesCount: number +} + +interface FlushControllerOptions { + messagesLimit: number + bytesLimit: number + durationLimit: Duration + pageExitObservable: Observable +} + +export function createFlushController({ + messagesLimit, + bytesLimit, + durationLimit, + pageExitObservable, +}: FlushControllerOptions) { + const flushObservable = new Observable() + + pageExitObservable.subscribe((event) => flush(event.reason)) + + let currentBytesCount = 0 + let currentMessagesCount = 0 + + function flush(flushReason: FlushReason) { + if (currentMessagesCount === 0) { + return + } + + const messagesCount = currentMessagesCount + const bytesCount = currentBytesCount + + currentMessagesCount = 0 + currentBytesCount = 0 + cancelFlushTimeout() + + flushObservable.notify({ + reason: flushReason, + messagesCount, + bytesCount, + }) + } + + let flushTimeoutId: TimeoutId | undefined + function scheduleFlushTimeout() { + if (flushTimeoutId === undefined) { + flushTimeoutId = setTimeout(() => { + flush('duration_limit') + }, durationLimit) + } + } + + function cancelFlushTimeout() { + clearTimeout(flushTimeoutId) + flushTimeoutId = undefined + } + + return { + flushObservable, + get messagesCount() { + return currentMessagesCount + }, + + willAddMessage(messageBytesCount: number) { + if (currentBytesCount + messageBytesCount >= bytesLimit) { + flush('bytes_limit') + } + // Consider the message to be added now rather than in `didAddMessage`, because if no message + // was added yet and `didAddMessage` is called asynchronously, we still want to notify when a + // flush is needed (for example on page exit). + currentMessagesCount += 1 + currentBytesCount += messageBytesCount + scheduleFlushTimeout() + }, + + didAddMessage() { + if (currentMessagesCount >= messagesLimit || currentBytesCount >= bytesLimit) { + flush('bytes_limit') + } + }, + + didRemoveMessage(messageBytesCount: number) { + currentBytesCount -= messageBytesCount + currentMessagesCount -= 1 + if (currentMessagesCount === 0) { + cancelFlushTimeout() + } + }, + } +} From 1fd3dc076e0142997a1fd71f1348bb0522c2901d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Zugmeyer?= Date: Thu, 6 Apr 2023 19:27:23 +0200 Subject: [PATCH 2/7] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20[RUMF-1533]=20use=20th?= =?UTF-8?q?e=20flush=20controller=20in=20Batch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/domain/configuration/configuration.ts | 5 +- .../configuration/endpointBuilder.spec.ts | 8 +- packages/core/src/index.ts | 3 + packages/core/src/transport/batch.spec.ts | 201 +++++++----------- packages/core/src/transport/batch.ts | 94 +++----- packages/core/src/transport/httpRequest.ts | 2 +- packages/core/src/transport/index.ts | 3 +- .../src/transport/startBatchWithReplica.ts | 13 +- .../core/test/emulate/mockFlushController.ts | 45 ++++ packages/core/test/index.ts | 1 + .../rum-core/src/transport/startRumBatch.ts | 22 +- 11 files changed, 183 insertions(+), 214 deletions(-) create mode 100644 packages/core/test/emulate/mockFlushController.ts diff --git a/packages/core/src/domain/configuration/configuration.ts b/packages/core/src/domain/configuration/configuration.ts index 2eb671749b..c2f57f639e 100644 --- a/packages/core/src/domain/configuration/configuration.ts +++ b/packages/core/src/domain/configuration/configuration.ts @@ -4,6 +4,7 @@ import { catchUserErrors } from '../../tools/catchUserErrors' import { display } from '../../tools/display' import type { RawTelemetryConfiguration } from '../telemetry' import { ExperimentalFeature, addExperimentalFeatures } from '../../tools/experimentalFeatures' +import type { Duration } from '../../tools/utils/timeUtils' import { ONE_SECOND } from '../../tools/utils/timeUtils' import { isPercentage } from '../../tools/utils/numberUtils' import { ONE_KIBI_BYTE } from '../../tools/utils/byteUtils' @@ -85,7 +86,7 @@ export interface Configuration extends TransportConfiguration { // Batch configuration batchBytesLimit: number - flushTimeout: number + flushTimeout: Duration batchMessagesLimit: number messageBytesLimit: number } @@ -148,7 +149,7 @@ export function validateAndBuildConfiguration(initConfiguration: InitConfigurati * flush automatically, aim to be lower than ALB connection timeout * to maximize connection reuse. */ - flushTimeout: 30 * ONE_SECOND, + flushTimeout: (30 * ONE_SECOND) as Duration, /** * Logs intake limit diff --git a/packages/core/src/domain/configuration/endpointBuilder.spec.ts b/packages/core/src/domain/configuration/endpointBuilder.spec.ts index 51daa579b9..53ac1efbac 100644 --- a/packages/core/src/domain/configuration/endpointBuilder.spec.ts +++ b/packages/core/src/domain/configuration/endpointBuilder.spec.ts @@ -122,7 +122,7 @@ describe('endpointBuilder', () => { it('should contain retry infos', () => { expect( - createEndpointBuilder(initConfiguration, 'rum', []).build('xhr', 'batch_bytes_limit', { + createEndpointBuilder(initConfiguration, 'rum', []).build('xhr', 'bytes_limit', { count: 5, lastFailureStatus: 408, }) @@ -131,13 +131,13 @@ describe('endpointBuilder', () => { it('should contain flush reason when ff collect_flush_reason is enabled', () => { addExperimentalFeatures([ExperimentalFeature.COLLECT_FLUSH_REASON]) - expect(createEndpointBuilder(initConfiguration, 'rum', []).build('xhr', 'batch_bytes_limit')).toContain( - 'flush_reason%3Abatch_bytes_limit' + expect(createEndpointBuilder(initConfiguration, 'rum', []).build('xhr', 'bytes_limit')).toContain( + 'flush_reason%3Abytes_limit' ) }) it('should not contain flush reason when ff collect_flush_reason is disnabled', () => { - expect(createEndpointBuilder(initConfiguration, 'rum', []).build('xhr', 'batch_bytes_limit')).not.toContain( + expect(createEndpointBuilder(initConfiguration, 'rum', []).build('xhr', 'bytes_limit')).not.toContain( 'flush_reason' ) }) diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 563d07ce14..907969c608 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -55,6 +55,9 @@ export { canUseEventBridge, getEventBridge, startBatchWithReplica, + createFlushController, + FlushEvent, + FlushReason, } from './transport' export * from './tools/display' export * from './tools/utils/urlPolyfill' diff --git a/packages/core/src/transport/batch.spec.ts b/packages/core/src/transport/batch.spec.ts index 99dc321d30..8464ebd77a 100644 --- a/packages/core/src/transport/batch.spec.ts +++ b/packages/core/src/transport/batch.spec.ts @@ -1,141 +1,87 @@ -import sinon from 'sinon' -import type { PageExitEvent } from '../browser/pageExitObservable' -import { PageExitReason } from '../browser/pageExitObservable' -import { Observable } from '../tools/observable' -import { noop } from '../tools/utils/functionUtils' -import type { FlushReason } from './batch' +import type { MockFlushController } from '../../test' +import { createMockFlushController } from '../../test' +import { display } from '../tools/display' import { Batch } from './batch' +import type { FlushReason } from './flushController' import type { HttpRequest } from './httpRequest' describe('batch', () => { - const BATCH_MESSAGES_LIMIT = 3 - const BATCH_BYTES_LIMIT = 100 - const MESSAGE_BYTES_LIMIT = 50 * 1024 - const FLUSH_TIMEOUT = 60 * 1000 + const MESSAGE_BYTES_LIMIT = 50 + + const BIG_MESSAGE_OVER_BYTES_LIMIT = { message: 'hello xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' } + const SMALL_MESSAGE = { message: 'hello' } + const SMALL_MESSAGE_BYTES_COUNT = 19 + const SEPARATOR_BYTES_COUNT = 1 + let batch: Batch - let transport: HttpRequest - let sendSpy: jasmine.Spy - let pageExitObservable: Observable + let transport: { + send: jasmine.Spy + sendOnExit: jasmine.Spy + } + let flushNotifySpy: jasmine.Spy - const flushReason: FlushReason = 'batch_bytes_limit' + const flushReason: FlushReason = 'bytes_limit' + let flushController: MockFlushController beforeEach(() => { - transport = { send: noop } as unknown as HttpRequest - sendSpy = spyOn(transport, 'send') - pageExitObservable = new Observable() - batch = new Batch( - transport, - BATCH_MESSAGES_LIMIT, - BATCH_BYTES_LIMIT, - MESSAGE_BYTES_LIMIT, - FLUSH_TIMEOUT, - pageExitObservable - ) + transport = { + send: jasmine.createSpy(), + sendOnExit: jasmine.createSpy(), + } satisfies HttpRequest + flushController = createMockFlushController() + batch = new Batch(transport, flushController, MESSAGE_BYTES_LIMIT) flushNotifySpy = spyOn(batch.flushObservable, 'notify') }) - it('should add context to message', () => { - batch.add({ message: 'hello' }) + it('should send a message', () => { + batch.add(SMALL_MESSAGE) - batch.flush(flushReason) + flushController.notifyFlush() - expect(sendSpy.calls.mostRecent().args[0]).toEqual({ + expect(transport.send.calls.mostRecent().args[0]).toEqual({ data: '{"message":"hello"}', - bytesCount: jasmine.any(Number), + bytesCount: SMALL_MESSAGE_BYTES_COUNT, flushReason, }) }) - it('should empty the batch after a flush', () => { - batch.add({ message: 'hello' }) + it('should add message to the flush controller', () => { + batch.add(SMALL_MESSAGE) - batch.flush(flushReason) - sendSpy.calls.reset() - batch.flush(flushReason) - - expect(transport.send).not.toHaveBeenCalled() + expect(flushController.willAddMessage).toHaveBeenCalledOnceWith(SMALL_MESSAGE_BYTES_COUNT) + expect(flushController.didAddMessage).toHaveBeenCalledOnceWith() }) - it('should flush when the message count limit is reached', () => { - batch.add({ message: '1' }) - batch.add({ message: '2' }) - batch.add({ message: '3' }) - expect(sendSpy.calls.mostRecent().args[0]).toEqual({ - data: '{"message":"1"}\n{"message":"2"}\n{"message":"3"}', - bytesCount: jasmine.any(Number), - flushReason, - }) - }) - - it('should flush when a new message will overflow the bytes limit', () => { - batch.add({ message: '50 bytes - xxxxxxxxxxxxxxxxxxxxxxxxx' }) - expect(sendSpy).not.toHaveBeenCalled() + it('should consider separators when adding message', () => { + batch.add(SMALL_MESSAGE) + batch.add(SMALL_MESSAGE) + batch.add(SMALL_MESSAGE) - batch.add({ message: '60 bytes - xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' }) - expect(sendSpy).toHaveBeenCalledWith({ - data: '{"message":"50 bytes - xxxxxxxxxxxxxxxxxxxxxxxxx"}', - bytesCount: 50, - flushReason, - }) - - batch.flush(flushReason) - expect(sendSpy).toHaveBeenCalledWith({ - data: '{"message":"60 bytes - xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"}', - bytesCount: 60, - flushReason, - }) + expect(flushController.willAddMessage.calls.allArgs()).toEqual([ + [SMALL_MESSAGE_BYTES_COUNT], + [SMALL_MESSAGE_BYTES_COUNT + SEPARATOR_BYTES_COUNT], + [SMALL_MESSAGE_BYTES_COUNT + SEPARATOR_BYTES_COUNT], + ]) }) - it('should consider separators when computing the byte count', () => { - batch.add({ message: '30 bytes - xxxxx' }) // batch: 30 sep: 0 - batch.add({ message: '30 bytes - xxxxx' }) // batch: 60 sep: 1 - batch.add({ message: '39 bytes - xxxxxxxxxxxxxx' }) // batch: 99 sep: 2 + it('should consider separators when replacing messages', () => { + batch.add(SMALL_MESSAGE) + batch.upsert(SMALL_MESSAGE, 'a') - expect(sendSpy).toHaveBeenCalledWith({ - data: '{"message":"30 bytes - xxxxx"}\n{"message":"30 bytes - xxxxx"}', - bytesCount: 61, - flushReason, - }) - }) - - it('should call send one time when the byte count is too high and the batch is empty', () => { - const message = '101 bytes - xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' - batch.add({ message }) - expect(sendSpy).toHaveBeenCalledWith({ data: `{"message":"${message}"}`, bytesCount: 101, flushReason }) - }) - - it('should flush the batch and send the message when the message is too heavy', () => { - const message = '101 bytes - xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' - - batch.add({ message: '50 bytes - xxxxxxxxxxxxxxxxxxxxxxxxx' }) - batch.add({ message }) - expect(sendSpy).toHaveBeenCalledTimes(2) - }) - - it('should flush after timeout', () => { - const clock = sinon.useFakeTimers() - batch = new Batch(transport, BATCH_MESSAGES_LIMIT, BATCH_BYTES_LIMIT, MESSAGE_BYTES_LIMIT, 10, pageExitObservable) - batch.add({ message: '50 bytes - xxxxxxxxxxxxxxxxxxxxxxxxx' }) - clock.tick(100) + flushController.willAddMessage.calls.reset() - expect(sendSpy).toHaveBeenCalled() - - clock.restore() - }) + batch.upsert(SMALL_MESSAGE, 'a') - it('should flush on page exit', () => { - batch.add({ message: '1' }) - pageExitObservable.notify({ reason: PageExitReason.UNLOADING }) - expect(sendSpy).toHaveBeenCalledTimes(1) + expect(flushController.didRemoveMessage).toHaveBeenCalledOnceWith(SMALL_MESSAGE_BYTES_COUNT + SEPARATOR_BYTES_COUNT) + expect(flushController.willAddMessage).toHaveBeenCalledOnceWith(SMALL_MESSAGE_BYTES_COUNT + SEPARATOR_BYTES_COUNT) }) it('should not send a message with a bytes size above the limit', () => { - const warnStub = sinon.stub(console, 'warn') - batch = new Batch(transport, BATCH_MESSAGES_LIMIT, BATCH_BYTES_LIMIT, 50, FLUSH_TIMEOUT, pageExitObservable) - batch.add({ message: '50 bytes - xxxxxxxxxxxxx' }) + const warnSpy = spyOn(display, 'warn') + batch.add(BIG_MESSAGE_OVER_BYTES_LIMIT) - expect(sendSpy).not.toHaveBeenCalled() - warnStub.restore() + expect(warnSpy).toHaveBeenCalled() + expect(flushController.willAddMessage).not.toHaveBeenCalled() }) it('should upsert a message for a given key', () => { @@ -143,8 +89,9 @@ describe('batch', () => { batch.upsert({ message: '2' }, 'a') batch.upsert({ message: '3' }, 'b') batch.upsert({ message: '4' }, 'c') + flushController.notifyFlush() - expect(sendSpy.calls.mostRecent().args[0]).toEqual({ + expect(transport.send.calls.mostRecent().args[0]).toEqual({ data: '{"message":"2"}\n{"message":"3"}\n{"message":"4"}', bytesCount: jasmine.any(Number), flushReason, @@ -153,8 +100,9 @@ describe('batch', () => { batch.upsert({ message: '5' }, 'c') batch.upsert({ message: '6' }, 'b') batch.upsert({ message: '7' }, 'a') + flushController.notifyFlush() - expect(sendSpy.calls.mostRecent().args[0]).toEqual({ + expect(transport.send.calls.mostRecent().args[0]).toEqual({ data: '{"message":"5"}\n{"message":"6"}\n{"message":"7"}', bytesCount: jasmine.any(Number), flushReason, @@ -164,9 +112,9 @@ describe('batch', () => { batch.upsert({ message: '9' }, 'b') batch.upsert({ message: '10' }, 'a') batch.upsert({ message: '11' }, 'b') - batch.flush(flushReason) + flushController.notifyFlush() - expect(sendSpy.calls.mostRecent().args[0]).toEqual({ + expect(transport.send.calls.mostRecent().args[0]).toEqual({ data: '{"message":"10"}\n{"message":"11"}', bytesCount: jasmine.any(Number), flushReason, @@ -174,32 +122,25 @@ describe('batch', () => { }) it('should be able to use telemetry in the httpRequest.send', () => { - const fakeRequest = { - send(data: string) { - addTelemetryDebugFake() - transport.send({ data, bytesCount: BATCH_BYTES_LIMIT, flushReason }) - }, - } as unknown as HttpRequest - const batch = new Batch( - fakeRequest, - BATCH_MESSAGES_LIMIT, - BATCH_BYTES_LIMIT, - MESSAGE_BYTES_LIMIT, - FLUSH_TIMEOUT, - pageExitObservable - ) + transport.send.and.callFake(() => { + addTelemetryDebugFake() + }) const addTelemetryDebugFake = () => batch.add({ message: 'telemetry message' }) batch.add({ message: 'normal message' }) - batch.flush(flushReason) - expect(sendSpy).toHaveBeenCalledTimes(1) - batch.flush(flushReason) - expect(sendSpy).toHaveBeenCalledTimes(2) + expect(flushController.willAddMessage).toHaveBeenCalledTimes(1) + + flushController.notifyFlush() + expect(transport.send).toHaveBeenCalledTimes(1) + expect(flushController.willAddMessage).toHaveBeenCalledTimes(2) + + flushController.notifyFlush() + expect(transport.send).toHaveBeenCalledTimes(2) }) it('should notify when the batch is flushed', () => { batch.add({}) - batch.flush(flushReason) + flushController.notifyFlush() expect(flushNotifySpy).toHaveBeenCalledOnceWith({ bufferBytesCount: 2, bufferMessagesCount: 1 }) }) }) diff --git a/packages/core/src/transport/batch.ts b/packages/core/src/transport/batch.ts index 7ff98382f2..c1a96101e5 100644 --- a/packages/core/src/transport/batch.ts +++ b/packages/core/src/transport/batch.ts @@ -1,44 +1,30 @@ import { display } from '../tools/display' import type { Context } from '../tools/serialisation/context' import { Observable } from '../tools/observable' -import { setTimeout } from '../tools/timer' -import type { PageExitEvent } from '../browser/pageExitObservable' import { objectValues } from '../tools/utils/polyfills' +import { isPageExitReason } from '../browser/pageExitObservable' import { computeBytesCount } from '../tools/utils/byteUtils' import { jsonStringify } from '../tools/serialisation/jsonStringify' import type { HttpRequest } from './httpRequest' +import type { FlushController, FlushEvent } from './flushController' export interface BatchFlushEvent { bufferBytesCount: number bufferMessagesCount: number } -export type FlushReason = - | 'batch_duration_limit' - | 'batch_bytes_limit' - | 'before_unload' - | 'page_hide' - | 'visibility_hidden' - | 'page_frozen' - export class Batch { flushObservable = new Observable() private pushOnlyBuffer: string[] = [] private upsertBuffer: { [key: string]: string } = {} - private bufferBytesCount = 0 - private bufferMessagesCount = 0 constructor( private request: HttpRequest, - private batchMessagesLimit: number, - private batchBytesLimit: number, - private messageBytesLimit: number, - private flushTimeout: number, - private pageExitObservable: Observable + private flushController: FlushController, + private messageBytesLimit: number ) { - pageExitObservable.subscribe((event) => this.flush(event.reason, this.request.sendOnExit)) - this.flushPeriodically() + this.flushController.flushObservable.subscribe((event) => this.flush(event)) } add(message: Context) { @@ -49,44 +35,41 @@ export class Batch { this.addOrUpdate(message, key) } - flush(flushReason: FlushReason, sendFn = this.request.send) { - if (this.bufferMessagesCount !== 0) { - const messages = this.pushOnlyBuffer.concat(objectValues(this.upsertBuffer)) - const bytesCount = this.bufferBytesCount + flush(event: FlushEvent) { + const messages = this.pushOnlyBuffer.concat(objectValues(this.upsertBuffer)) - this.flushObservable.notify({ - bufferBytesCount: this.bufferBytesCount, - bufferMessagesCount: this.bufferMessagesCount, - }) + this.flushObservable.notify({ + // TODO + bufferBytesCount: event.bytesCount, + bufferMessagesCount: event.messagesCount, + }) - this.pushOnlyBuffer = [] - this.upsertBuffer = {} - this.bufferBytesCount = 0 - this.bufferMessagesCount = 0 + this.pushOnlyBuffer = [] + this.upsertBuffer = {} - sendFn({ data: messages.join('\n'), bytesCount, flushReason }) + const payload = { data: messages.join('\n'), bytesCount: event.bytesCount, flushReason: event.reason } + if (isPageExitReason(event.reason)) { + this.request.sendOnExit(payload) + } else { + this.request.send(payload) } } private addOrUpdate(message: Context, key?: string) { const { processedMessage, messageBytesCount } = this.process(message) + if (messageBytesCount >= this.messageBytesLimit) { display.warn( `Discarded a message whose size was bigger than the maximum allowed size ${this.messageBytesLimit}KB.` ) return } + if (this.hasMessageFor(key)) { this.remove(key) } - if (this.willReachedBytesLimitWith(messageBytesCount)) { - this.flush('batch_bytes_limit') - } this.push(processedMessage, messageBytesCount, key) - if (this.isFull()) { - this.flush('batch_bytes_limit') - } } private process(message: Context) { @@ -96,47 +79,28 @@ export class Batch { } private push(processedMessage: string, messageBytesCount: number, key?: string) { - if (this.bufferMessagesCount > 0) { - // \n separator at serialization - this.bufferBytesCount += 1 - } + // If there are other messages, a '\n' will be added at serialization + const separatorBytesCount = this.flushController.messagesCount > 0 ? 1 : 0 + + this.flushController.willAddMessage(messageBytesCount + separatorBytesCount) if (key !== undefined) { this.upsertBuffer[key] = processedMessage } else { this.pushOnlyBuffer.push(processedMessage) } - this.bufferBytesCount += messageBytesCount - this.bufferMessagesCount += 1 + this.flushController.didAddMessage() } private remove(key: string) { const removedMessage = this.upsertBuffer[key] delete this.upsertBuffer[key] const messageBytesCount = computeBytesCount(removedMessage) - this.bufferBytesCount -= messageBytesCount - this.bufferMessagesCount -= 1 - if (this.bufferMessagesCount > 0) { - this.bufferBytesCount -= 1 - } + // If there are other messages, a '\n' will be added at serialization + const separatorBytesCount = this.flushController.messagesCount > 1 ? 1 : 0 + this.flushController.didRemoveMessage(messageBytesCount + separatorBytesCount) } private hasMessageFor(key?: string): key is string { return key !== undefined && this.upsertBuffer[key] !== undefined } - - private willReachedBytesLimitWith(messageBytesCount: number) { - // byte of the separator at the end of the message - return this.bufferBytesCount + messageBytesCount + 1 >= this.batchBytesLimit - } - - private isFull() { - return this.bufferMessagesCount === this.batchMessagesLimit || this.bufferBytesCount >= this.batchBytesLimit - } - - private flushPeriodically() { - setTimeout(() => { - this.flush('batch_duration_limit') - this.flushPeriodically() - }, this.flushTimeout) - } } diff --git a/packages/core/src/transport/httpRequest.ts b/packages/core/src/transport/httpRequest.ts index 7f07a4f1f5..a069ec1546 100644 --- a/packages/core/src/transport/httpRequest.ts +++ b/packages/core/src/transport/httpRequest.ts @@ -5,7 +5,7 @@ import { monitor } from '../tools/monitor' import type { RawError } from '../domain/error/error' import { addEventListener } from '../browser/addEventListener' import { newRetryState, sendWithRetryStrategy } from './sendWithRetryStrategy' -import type { FlushReason } from './batch' +import type { FlushReason } from './flushController' /** * Use POST request without content type to: diff --git a/packages/core/src/transport/index.ts b/packages/core/src/transport/index.ts index e61eacfe2f..924888ee8b 100644 --- a/packages/core/src/transport/index.ts +++ b/packages/core/src/transport/index.ts @@ -1,4 +1,5 @@ export { HttpRequest, createHttpRequest, Payload, RetryInfo } from './httpRequest' -export { Batch, BatchFlushEvent, FlushReason } from './batch' +export { Batch, BatchFlushEvent } from './batch' export { canUseEventBridge, getEventBridge, BrowserWindowWithEventBridge } from './eventBridge' export { startBatchWithReplica } from './startBatchWithReplica' +export { createFlushController, FlushController, FlushEvent, FlushReason } from './flushController' diff --git a/packages/core/src/transport/startBatchWithReplica.ts b/packages/core/src/transport/startBatchWithReplica.ts index 8e928e86d4..4925a4c7ba 100644 --- a/packages/core/src/transport/startBatchWithReplica.ts +++ b/packages/core/src/transport/startBatchWithReplica.ts @@ -5,6 +5,7 @@ import type { Observable } from '../tools/observable' import type { PageExitEvent } from '../browser/pageExitObservable' import { Batch } from './batch' import { createHttpRequest } from './httpRequest' +import { createFlushController } from './flushController' export function startBatchWithReplica( configuration: Configuration, @@ -22,11 +23,13 @@ export function startBatchWithReplica( function createBatch(endpointBuilder: EndpointBuilder) { return new Batch( createHttpRequest(endpointBuilder, configuration.batchBytesLimit, reportError), - configuration.batchMessagesLimit, - configuration.batchBytesLimit, - configuration.messageBytesLimit, - configuration.flushTimeout, - pageExitObservable + createFlushController({ + messagesLimit: configuration.batchMessagesLimit, + bytesLimit: configuration.batchBytesLimit, + durationLimit: configuration.flushTimeout, + pageExitObservable, + }), + configuration.messageBytesLimit ) } diff --git a/packages/core/test/emulate/mockFlushController.ts b/packages/core/test/emulate/mockFlushController.ts new file mode 100644 index 0000000000..dac7cf23c5 --- /dev/null +++ b/packages/core/test/emulate/mockFlushController.ts @@ -0,0 +1,45 @@ +import { Observable } from '../../src/tools/observable' +import type { FlushEvent, FlushController } from '../../src/transport' + +export type MockFlushController = ReturnType + +export function createMockFlushController() { + const flushObservable = new Observable() + let currentMessagesCount = 0 + let currentBytesCount = 0 + + return { + willAddMessage: jasmine.createSpy().and.callFake((messageBytesCount) => { + currentBytesCount += messageBytesCount + currentMessagesCount += 1 + }), + didAddMessage: jasmine.createSpy(), + didRemoveMessage: jasmine.createSpy().and.callFake((messageBytesCount) => { + currentBytesCount -= messageBytesCount + currentMessagesCount -= 1 + }), + get messagesCount() { + return currentMessagesCount + }, + flushObservable, + notifyFlush() { + if (currentMessagesCount === 0) { + throw new Error( + 'MockFlushController.notifyFlush(): the original FlushController would not notify flush if no message was added' + ) + } + + const messagesCount = currentMessagesCount + const bytesCount = currentBytesCount + + currentMessagesCount = 0 + currentBytesCount = 0 + + flushObservable.notify({ + reason: 'bytes_limit', + bytesCount, + messagesCount, + }) + }, + } satisfies Record & FlushController +} diff --git a/packages/core/test/index.ts b/packages/core/test/index.ts index cdffa284cc..452e6062c6 100644 --- a/packages/core/test/index.ts +++ b/packages/core/test/index.ts @@ -15,3 +15,4 @@ export * from './emulate/eventBridge' export * from './emulate/eventBridge' export * from './emulate/windowOnError' export * from './emulate/cookie' +export * from './emulate/mockFlushController' diff --git a/packages/rum-core/src/transport/startRumBatch.ts b/packages/rum-core/src/transport/startRumBatch.ts index 0694f0396e..3b2b76a73d 100644 --- a/packages/rum-core/src/transport/startRumBatch.ts +++ b/packages/rum-core/src/transport/startRumBatch.ts @@ -7,7 +7,13 @@ import type { PageExitEvent, BatchFlushEvent, } from '@datadog/browser-core' -import { Batch, combine, createHttpRequest, isTelemetryReplicationAllowed } from '@datadog/browser-core' +import { + createFlushController, + Batch, + combine, + createHttpRequest, + isTelemetryReplicationAllowed, +} from '@datadog/browser-core' import type { RumConfiguration } from '../domain/configuration' import type { LifeCycle } from '../domain/lifeCycle' import { LifeCycleEventType } from '../domain/lifeCycle' @@ -55,13 +61,17 @@ function makeRumBatch( } function createRumBatch(endpointBuilder: EndpointBuilder) { + const flushController = createFlushController({ + messagesLimit: configuration.batchMessagesLimit, + bytesLimit: configuration.batchBytesLimit, + durationLimit: configuration.flushTimeout, + pageExitObservable, + }) + return new Batch( createHttpRequest(endpointBuilder, configuration.batchBytesLimit, reportError), - configuration.batchMessagesLimit, - configuration.batchBytesLimit, - configuration.messageBytesLimit, - configuration.flushTimeout, - pageExitObservable + flushController, + configuration.messageBytesLimit ) } From 07ef2fb7345d88623bde6479316542a26c26d0dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Zugmeyer?= Date: Thu, 6 Apr 2023 11:01:18 +0200 Subject: [PATCH 3/7] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20[RUMF-1533]=20use=20th?= =?UTF-8?q?e=20flush=20controller=20for=20customer=20data=20telemetry?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/core/src/index.ts | 1 - packages/core/src/transport/batch.spec.ts | 8 -------- packages/core/src/transport/batch.ts | 14 -------------- packages/core/src/transport/index.ts | 2 +- .../domain/startCustomerDataTelemetry.spec.ts | 14 +++++++++----- .../src/domain/startCustomerDataTelemetry.ts | 10 +++++----- .../rum-core/src/transport/startRumBatch.ts | 19 +++++++++++++------ 7 files changed, 28 insertions(+), 40 deletions(-) diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 907969c608..5be122f49d 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -51,7 +51,6 @@ export { Payload, createHttpRequest, Batch, - BatchFlushEvent, canUseEventBridge, getEventBridge, startBatchWithReplica, diff --git a/packages/core/src/transport/batch.spec.ts b/packages/core/src/transport/batch.spec.ts index 8464ebd77a..8f7a8dafe7 100644 --- a/packages/core/src/transport/batch.spec.ts +++ b/packages/core/src/transport/batch.spec.ts @@ -19,7 +19,6 @@ describe('batch', () => { sendOnExit: jasmine.Spy } - let flushNotifySpy: jasmine.Spy const flushReason: FlushReason = 'bytes_limit' let flushController: MockFlushController @@ -30,7 +29,6 @@ describe('batch', () => { } satisfies HttpRequest flushController = createMockFlushController() batch = new Batch(transport, flushController, MESSAGE_BYTES_LIMIT) - flushNotifySpy = spyOn(batch.flushObservable, 'notify') }) it('should send a message', () => { @@ -137,10 +135,4 @@ describe('batch', () => { flushController.notifyFlush() expect(transport.send).toHaveBeenCalledTimes(2) }) - - it('should notify when the batch is flushed', () => { - batch.add({}) - flushController.notifyFlush() - expect(flushNotifySpy).toHaveBeenCalledOnceWith({ bufferBytesCount: 2, bufferMessagesCount: 1 }) - }) }) diff --git a/packages/core/src/transport/batch.ts b/packages/core/src/transport/batch.ts index c1a96101e5..e95ad16107 100644 --- a/packages/core/src/transport/batch.ts +++ b/packages/core/src/transport/batch.ts @@ -1,6 +1,5 @@ import { display } from '../tools/display' import type { Context } from '../tools/serialisation/context' -import { Observable } from '../tools/observable' import { objectValues } from '../tools/utils/polyfills' import { isPageExitReason } from '../browser/pageExitObservable' import { computeBytesCount } from '../tools/utils/byteUtils' @@ -8,14 +7,7 @@ import { jsonStringify } from '../tools/serialisation/jsonStringify' import type { HttpRequest } from './httpRequest' import type { FlushController, FlushEvent } from './flushController' -export interface BatchFlushEvent { - bufferBytesCount: number - bufferMessagesCount: number -} - export class Batch { - flushObservable = new Observable() - private pushOnlyBuffer: string[] = [] private upsertBuffer: { [key: string]: string } = {} @@ -38,12 +30,6 @@ export class Batch { flush(event: FlushEvent) { const messages = this.pushOnlyBuffer.concat(objectValues(this.upsertBuffer)) - this.flushObservable.notify({ - // TODO - bufferBytesCount: event.bytesCount, - bufferMessagesCount: event.messagesCount, - }) - this.pushOnlyBuffer = [] this.upsertBuffer = {} diff --git a/packages/core/src/transport/index.ts b/packages/core/src/transport/index.ts index 924888ee8b..8fb69d59fc 100644 --- a/packages/core/src/transport/index.ts +++ b/packages/core/src/transport/index.ts @@ -1,5 +1,5 @@ export { HttpRequest, createHttpRequest, Payload, RetryInfo } from './httpRequest' -export { Batch, BatchFlushEvent } from './batch' +export { Batch } from './batch' export { canUseEventBridge, getEventBridge, BrowserWindowWithEventBridge } from './eventBridge' export { startBatchWithReplica } from './startBatchWithReplica' export { createFlushController, FlushController, FlushEvent, FlushReason } from './flushController' diff --git a/packages/rum-core/src/domain/startCustomerDataTelemetry.spec.ts b/packages/rum-core/src/domain/startCustomerDataTelemetry.spec.ts index 41ce3c1513..37e43b21fa 100644 --- a/packages/rum-core/src/domain/startCustomerDataTelemetry.spec.ts +++ b/packages/rum-core/src/domain/startCustomerDataTelemetry.spec.ts @@ -1,4 +1,4 @@ -import type { BatchFlushEvent, Context, ContextManager, TelemetryEvent } from '@datadog/browser-core' +import type { FlushEvent, Context, ContextManager, TelemetryEvent } from '@datadog/browser-core' import { resetExperimentalFeatures, TelemetryService, startTelemetry, Observable } from '@datadog/browser-core' import type { TestSetupBuilder } from '../../test' import { setup } from '../../test' @@ -10,7 +10,7 @@ import { MEASURES_PERIOD_DURATION, startCustomerDataTelemetry } from './startCus describe('customerDataTelemetry', () => { let setupBuilder: TestSetupBuilder - let batchFlushObservable: Observable + let batchFlushObservable: Observable let telemetryEvents: TelemetryEvent[] let fakeContext: Context let fakeContextBytesCount: number @@ -35,7 +35,11 @@ describe('customerDataTelemetry', () => { for (let index = 0; index < eventNumber; index++) { lifeCycle.notify(LifeCycleEventType.RUM_EVENT_COLLECTED, viewEvent) } - batchFlushObservable.notify({ bufferBytesCount: batchBytesCount, bufferMessagesCount: eventNumber }) + batchFlushObservable.notify({ + reason: 'duration_limit', + bytesCount: batchBytesCount, + messagesCount: eventNumber, + }) } function spyOnContextManager(contextManager: ContextManager) { @@ -125,7 +129,7 @@ describe('customerDataTelemetry', () => { it('should collect customer data only if batches contains rum events, no just telemetry', () => { const { clock } = setupBuilder.build() - batchFlushObservable.notify({ bufferBytesCount: 1, bufferMessagesCount: 1 }) + batchFlushObservable.notify({ reason: 'duration_limit', bytesCount: 1, messagesCount: 1 }) clock.tick(MEASURES_PERIOD_DURATION) @@ -136,7 +140,7 @@ describe('customerDataTelemetry', () => { const { clock } = setupBuilder.build() lifeCycle.notify(LifeCycleEventType.RUM_EVENT_COLLECTED, viewEvent) - batchFlushObservable.notify({ bufferBytesCount: 1, bufferMessagesCount: 1 }) + batchFlushObservable.notify({ reason: 'duration_limit', bytesCount: 1, messagesCount: 1 }) lifeCycle.notify(LifeCycleEventType.RUM_EVENT_COLLECTED, viewEvent) clock.tick(MEASURES_PERIOD_DURATION) diff --git a/packages/rum-core/src/domain/startCustomerDataTelemetry.ts b/packages/rum-core/src/domain/startCustomerDataTelemetry.ts index 49352bea18..3158019e1e 100644 --- a/packages/rum-core/src/domain/startCustomerDataTelemetry.ts +++ b/packages/rum-core/src/domain/startCustomerDataTelemetry.ts @@ -1,4 +1,4 @@ -import type { BatchFlushEvent, Context, ContextManager, Observable, Telemetry } from '@datadog/browser-core' +import type { Context, ContextManager, FlushEvent, Observable, Telemetry } from '@datadog/browser-core' import { isEmptyObject, includes, performDraw, ONE_SECOND, addTelemetryDebug, setInterval } from '@datadog/browser-core' import { RumEventType } from '../rawRumEvent.types' import type { RumEvent } from '../rumEvent.types' @@ -41,7 +41,7 @@ export function startCustomerDataTelemetry( globalContextManager: ContextManager, userContextManager: ContextManager, featureFlagContexts: FeatureFlagContexts, - batchFlushObservable: Observable + batchFlushObservable: Observable ) { const customerDataTelemetryEnabled = telemetry.enabled && performDraw(configuration.customerDataTelemetrySampleRate) if (!customerDataTelemetryEnabled) { @@ -76,15 +76,15 @@ export function startCustomerDataTelemetry( ) }) - batchFlushObservable.subscribe(({ bufferBytesCount, bufferMessagesCount }) => { + batchFlushObservable.subscribe(({ bytesCount, messagesCount }) => { // Don't measure batch that only contains telemetry events to avoid batch sending loop // It could happen because after each batch we are adding a customer data measures telemetry event to the next one if (!batchHasRumEvent) { return } currentPeriodMeasures.batchCount += 1 - updateMeasure(currentPeriodMeasures.batchBytesCount, bufferBytesCount) - updateMeasure(currentPeriodMeasures.batchMessagesCount, bufferMessagesCount) + updateMeasure(currentPeriodMeasures.batchBytesCount, bytesCount) + updateMeasure(currentPeriodMeasures.batchMessagesCount, messagesCount) mergeMeasure(currentPeriodMeasures.globalContextBytes, currentBatchMeasures.globalContextBytes) mergeMeasure(currentPeriodMeasures.userContextBytes, currentBatchMeasures.userContextBytes) mergeMeasure(currentPeriodMeasures.featureFlagBytes, currentBatchMeasures.featureFlagBytes) diff --git a/packages/rum-core/src/transport/startRumBatch.ts b/packages/rum-core/src/transport/startRumBatch.ts index 3b2b76a73d..89cd242086 100644 --- a/packages/rum-core/src/transport/startRumBatch.ts +++ b/packages/rum-core/src/transport/startRumBatch.ts @@ -5,7 +5,7 @@ import type { Observable, RawError, PageExitEvent, - BatchFlushEvent, + FlushEvent, } from '@datadog/browser-core' import { createFlushController, @@ -43,7 +43,7 @@ export function startRumBatch( } export interface RumBatch { - flushObservable: Observable + flushObservable: Observable add: (message: Context, replicated?: boolean) => void upsert: (message: Context, key: string) => void } @@ -53,11 +53,13 @@ function makeRumBatch( reportError: (error: RawError) => void, pageExitObservable: Observable ): RumBatch { - const primaryBatch = createRumBatch(configuration.rumEndpointBuilder) + const { batch: primaryBatch, flushController: primaryFlushController } = createRumBatch( + configuration.rumEndpointBuilder + ) let replicaBatch: Batch | undefined const replica = configuration.replica if (replica !== undefined) { - replicaBatch = createRumBatch(replica.rumEndpointBuilder) + replicaBatch = createRumBatch(replica.rumEndpointBuilder).batch } function createRumBatch(endpointBuilder: EndpointBuilder) { @@ -68,11 +70,16 @@ function makeRumBatch( pageExitObservable, }) - return new Batch( + const batch = new Batch( createHttpRequest(endpointBuilder, configuration.batchBytesLimit, reportError), flushController, configuration.messageBytesLimit ) + + return { + batch, + flushController, + } } function withReplicaApplicationId(message: Context) { @@ -80,7 +87,7 @@ function makeRumBatch( } return { - flushObservable: primaryBatch.flushObservable, + flushObservable: primaryFlushController.flushObservable, add: (message: Context, replicated = true) => { primaryBatch.add(message) if (replicaBatch && replicated) { From e1c0b017cecf902980e488babb4c9b0cd16f393c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Zugmeyer?= Date: Tue, 11 Apr 2023 12:06:40 +0200 Subject: [PATCH 4/7] =?UTF-8?q?=F0=9F=91=8C=20rename=20'flush=20timeout'?= =?UTF-8?q?=20to=20'duration=20limit=20timeout'?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/src/transport/flushController.ts | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/packages/core/src/transport/flushController.ts b/packages/core/src/transport/flushController.ts index e2fdf013de..11d243effd 100644 --- a/packages/core/src/transport/flushController.ts +++ b/packages/core/src/transport/flushController.ts @@ -43,7 +43,7 @@ export function createFlushController({ currentMessagesCount = 0 currentBytesCount = 0 - cancelFlushTimeout() + cancelDurationLimitTimeout() flushObservable.notify({ reason: flushReason, @@ -52,18 +52,18 @@ export function createFlushController({ }) } - let flushTimeoutId: TimeoutId | undefined - function scheduleFlushTimeout() { - if (flushTimeoutId === undefined) { - flushTimeoutId = setTimeout(() => { + let durationLimitTimeoutId: TimeoutId | undefined + function scheduleDurationLimitTimeout() { + if (durationLimitTimeoutId === undefined) { + durationLimitTimeoutId = setTimeout(() => { flush('duration_limit') }, durationLimit) } } - function cancelFlushTimeout() { - clearTimeout(flushTimeoutId) - flushTimeoutId = undefined + function cancelDurationLimitTimeout() { + clearTimeout(durationLimitTimeoutId) + durationLimitTimeoutId = undefined } return { @@ -81,7 +81,7 @@ export function createFlushController({ // flush is needed (for example on page exit). currentMessagesCount += 1 currentBytesCount += messageBytesCount - scheduleFlushTimeout() + scheduleDurationLimitTimeout() }, didAddMessage() { @@ -94,7 +94,7 @@ export function createFlushController({ currentBytesCount -= messageBytesCount currentMessagesCount -= 1 if (currentMessagesCount === 0) { - cancelFlushTimeout() + cancelDurationLimitTimeout() } }, } From b8a7a04bddcb9721e406894187be6d565f563273 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Zugmeyer?= Date: Tue, 11 Apr 2023 12:12:40 +0200 Subject: [PATCH 5/7] =?UTF-8?q?=F0=9F=91=8C=20rename=20{will,did}{Add,Remo?= =?UTF-8?q?ve}Message?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/core/src/transport/batch.spec.ts | 22 ++-- packages/core/src/transport/batch.ts | 6 +- .../src/transport/flushController.spec.ts | 122 +++++++++--------- .../core/src/transport/flushController.ts | 10 +- .../core/test/emulate/mockFlushController.ts | 22 ++-- 5 files changed, 95 insertions(+), 87 deletions(-) diff --git a/packages/core/src/transport/batch.spec.ts b/packages/core/src/transport/batch.spec.ts index 8f7a8dafe7..b243bc6ef1 100644 --- a/packages/core/src/transport/batch.spec.ts +++ b/packages/core/src/transport/batch.spec.ts @@ -46,8 +46,8 @@ describe('batch', () => { it('should add message to the flush controller', () => { batch.add(SMALL_MESSAGE) - expect(flushController.willAddMessage).toHaveBeenCalledOnceWith(SMALL_MESSAGE_BYTES_COUNT) - expect(flushController.didAddMessage).toHaveBeenCalledOnceWith() + expect(flushController.notifyBeforeAddMessage).toHaveBeenCalledOnceWith(SMALL_MESSAGE_BYTES_COUNT) + expect(flushController.notifyAfterAddMessage).toHaveBeenCalledOnceWith() }) it('should consider separators when adding message', () => { @@ -55,7 +55,7 @@ describe('batch', () => { batch.add(SMALL_MESSAGE) batch.add(SMALL_MESSAGE) - expect(flushController.willAddMessage.calls.allArgs()).toEqual([ + expect(flushController.notifyBeforeAddMessage.calls.allArgs()).toEqual([ [SMALL_MESSAGE_BYTES_COUNT], [SMALL_MESSAGE_BYTES_COUNT + SEPARATOR_BYTES_COUNT], [SMALL_MESSAGE_BYTES_COUNT + SEPARATOR_BYTES_COUNT], @@ -66,12 +66,16 @@ describe('batch', () => { batch.add(SMALL_MESSAGE) batch.upsert(SMALL_MESSAGE, 'a') - flushController.willAddMessage.calls.reset() + flushController.notifyBeforeAddMessage.calls.reset() batch.upsert(SMALL_MESSAGE, 'a') - expect(flushController.didRemoveMessage).toHaveBeenCalledOnceWith(SMALL_MESSAGE_BYTES_COUNT + SEPARATOR_BYTES_COUNT) - expect(flushController.willAddMessage).toHaveBeenCalledOnceWith(SMALL_MESSAGE_BYTES_COUNT + SEPARATOR_BYTES_COUNT) + expect(flushController.notifyAfterRemoveMessage).toHaveBeenCalledOnceWith( + SMALL_MESSAGE_BYTES_COUNT + SEPARATOR_BYTES_COUNT + ) + expect(flushController.notifyBeforeAddMessage).toHaveBeenCalledOnceWith( + SMALL_MESSAGE_BYTES_COUNT + SEPARATOR_BYTES_COUNT + ) }) it('should not send a message with a bytes size above the limit', () => { @@ -79,7 +83,7 @@ describe('batch', () => { batch.add(BIG_MESSAGE_OVER_BYTES_LIMIT) expect(warnSpy).toHaveBeenCalled() - expect(flushController.willAddMessage).not.toHaveBeenCalled() + expect(flushController.notifyBeforeAddMessage).not.toHaveBeenCalled() }) it('should upsert a message for a given key', () => { @@ -126,11 +130,11 @@ describe('batch', () => { const addTelemetryDebugFake = () => batch.add({ message: 'telemetry message' }) batch.add({ message: 'normal message' }) - expect(flushController.willAddMessage).toHaveBeenCalledTimes(1) + expect(flushController.notifyBeforeAddMessage).toHaveBeenCalledTimes(1) flushController.notifyFlush() expect(transport.send).toHaveBeenCalledTimes(1) - expect(flushController.willAddMessage).toHaveBeenCalledTimes(2) + expect(flushController.notifyBeforeAddMessage).toHaveBeenCalledTimes(2) flushController.notifyFlush() expect(transport.send).toHaveBeenCalledTimes(2) diff --git a/packages/core/src/transport/batch.ts b/packages/core/src/transport/batch.ts index e95ad16107..c7bd9d5bd3 100644 --- a/packages/core/src/transport/batch.ts +++ b/packages/core/src/transport/batch.ts @@ -68,13 +68,13 @@ export class Batch { // If there are other messages, a '\n' will be added at serialization const separatorBytesCount = this.flushController.messagesCount > 0 ? 1 : 0 - this.flushController.willAddMessage(messageBytesCount + separatorBytesCount) + this.flushController.notifyBeforeAddMessage(messageBytesCount + separatorBytesCount) if (key !== undefined) { this.upsertBuffer[key] = processedMessage } else { this.pushOnlyBuffer.push(processedMessage) } - this.flushController.didAddMessage() + this.flushController.notifyAfterAddMessage() } private remove(key: string) { @@ -83,7 +83,7 @@ export class Batch { const messageBytesCount = computeBytesCount(removedMessage) // If there are other messages, a '\n' will be added at serialization const separatorBytesCount = this.flushController.messagesCount > 1 ? 1 : 0 - this.flushController.didRemoveMessage(messageBytesCount + separatorBytesCount) + this.flushController.notifyAfterRemoveMessage(messageBytesCount + separatorBytesCount) } private hasMessageFor(key?: string): key is string { diff --git a/packages/core/src/transport/flushController.spec.ts b/packages/core/src/transport/flushController.spec.ts index 236066499e..7e839b4942 100644 --- a/packages/core/src/transport/flushController.spec.ts +++ b/packages/core/src/transport/flushController.spec.ts @@ -38,8 +38,8 @@ describe('flushController', () => { it('when flushing, the event contains a reason, the bytes count and the messages count', () => { const messagesCount = 3 for (let i = 0; i < messagesCount; i += 1) { - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.didAddMessage() + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterAddMessage() } pageExitObservable.notify({ reason: 'before_unload' }) @@ -53,15 +53,15 @@ describe('flushController', () => { describe('page exit', () => { it('notifies when the page is exiting', () => { - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.didAddMessage() + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterAddMessage() pageExitObservable.notify({ reason: 'before_unload' }) expect(flushSpy).toHaveBeenCalled() }) it('flush reason should be the page exit reason', () => { - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.didAddMessage() + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterAddMessage() pageExitObservable.notify({ reason: 'before_unload' }) expect(flushSpy.calls.first().args[0].reason).toBe('before_unload') }) @@ -72,7 +72,7 @@ describe('flushController', () => { }) it('notifies when the page is exiting even if no message have been fully added yet', () => { - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) pageExitObservable.notify({ reason: 'before_unload' }) expect(flushSpy).toHaveBeenCalled() }) @@ -80,48 +80,48 @@ describe('flushController', () => { describe('bytes limit', () => { it('notifies when the bytes limit is reached after adding a message', () => { - flushController.willAddMessage(BYTES_LIMIT) - flushController.didAddMessage() + flushController.notifyBeforeAddMessage(BYTES_LIMIT) + flushController.notifyAfterAddMessage() expect(flushSpy).toHaveBeenCalled() }) it('flush reason should be "bytes_limit"', () => { - flushController.willAddMessage(BYTES_LIMIT) - flushController.didAddMessage() + flushController.notifyBeforeAddMessage(BYTES_LIMIT) + flushController.notifyAfterAddMessage() expect(flushSpy.calls.first().args[0].reason).toBe('bytes_limit') }) it('notifies when the bytes limit will be reached before adding a message', () => { - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.didAddMessage() - flushController.willAddMessage(BYTES_LIMIT) + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterAddMessage() + flushController.notifyBeforeAddMessage(BYTES_LIMIT) expect(flushSpy).toHaveBeenCalled() }) it('does not take removed messages into account', () => { - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.didAddMessage() - flushController.didRemoveMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterAddMessage() + flushController.notifyAfterRemoveMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.willAddMessage(BYTES_LIMIT - SMALL_MESSAGE_BYTE_COUNT) - flushController.didAddMessage() + flushController.notifyBeforeAddMessage(BYTES_LIMIT - SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterAddMessage() expect(flushSpy).not.toHaveBeenCalled() }) it('does not notify when the bytes limit will be reached if no message was added yet', () => { - flushController.willAddMessage(BYTES_LIMIT) + flushController.notifyBeforeAddMessage(BYTES_LIMIT) expect(flushSpy).not.toHaveBeenCalled() }) it('resets the current bytes count once flushed', () => { - flushController.willAddMessage(BYTES_LIMIT) - flushController.didAddMessage() + flushController.notifyBeforeAddMessage(BYTES_LIMIT) + flushController.notifyAfterAddMessage() flushSpy.calls.reset() - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.didAddMessage() + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterAddMessage() expect(flushSpy).not.toHaveBeenCalled() }) }) @@ -129,78 +129,78 @@ describe('flushController', () => { describe('messages limit', () => { it('notifies when the messages limit is reached', () => { for (let i = 0; i < MESSAGES_LIMIT; i += 1) { - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.didAddMessage() + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterAddMessage() } expect(flushSpy).toHaveBeenCalled() }) it('flush reason should be "bytes_limit"', () => { for (let i = 0; i < MESSAGES_LIMIT; i += 1) { - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.didAddMessage() + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterAddMessage() } expect(flushSpy.calls.first().args[0].reason).toBe('bytes_limit') }) it('does not flush when the message was not fully added yet', () => { for (let i = 0; i < MESSAGES_LIMIT - 1; i += 1) { - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.didAddMessage() + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterAddMessage() } - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) expect(flushSpy).not.toHaveBeenCalled() }) it('does not take removed messages into account', () => { for (let i = 0; i < MESSAGES_LIMIT - 1; i += 1) { - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.didAddMessage() + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterAddMessage() } - flushController.didRemoveMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterRemoveMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.didAddMessage() + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterAddMessage() expect(flushSpy).not.toHaveBeenCalled() }) it('resets the messages count when flushed', () => { for (let i = 0; i < MESSAGES_LIMIT; i += 1) { - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.didAddMessage() + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterAddMessage() } flushSpy.calls.reset() - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.didAddMessage() + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterAddMessage() expect(flushSpy).not.toHaveBeenCalled() }) }) describe('duration limit', () => { it('notifies when the duration limit is reached after adding a message', () => { - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.didAddMessage() + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterAddMessage() clock.tick(DURATION_LIMIT) expect(flushSpy).toHaveBeenCalled() }) it('flush reason should be "duration_limit"', () => { - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.didAddMessage() + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterAddMessage() clock.tick(DURATION_LIMIT) expect(flushSpy.calls.first().args[0].reason).toBe('duration_limit') }) it('does not postpone the duration limit when another message was added', () => { - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.didAddMessage() + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterAddMessage() clock.tick(DURATION_LIMIT / 2) - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.didAddMessage() + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterAddMessage() clock.tick(DURATION_LIMIT / 2) expect(flushSpy).toHaveBeenCalled() }) @@ -211,32 +211,32 @@ describe('flushController', () => { }) it('does not notify if a message was added then removed', () => { - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.didAddMessage() - flushController.didRemoveMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterAddMessage() + flushController.notifyAfterRemoveMessage(SMALL_MESSAGE_BYTE_COUNT) clock.tick(DURATION_LIMIT) expect(flushSpy).not.toHaveBeenCalled() }) it('notifies if a message was added, and another was added then removed', () => { - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.didAddMessage() + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterAddMessage() - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.didAddMessage() - flushController.didRemoveMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterAddMessage() + flushController.notifyAfterRemoveMessage(SMALL_MESSAGE_BYTE_COUNT) clock.tick(DURATION_LIMIT) expect(flushSpy).toHaveBeenCalled() }) it('does not notify prematurely if a message was added then removed then another was added', () => { - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.didAddMessage() - flushController.didRemoveMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterAddMessage() + flushController.notifyAfterRemoveMessage(SMALL_MESSAGE_BYTE_COUNT) clock.tick(DURATION_LIMIT / 2) - flushController.willAddMessage(SMALL_MESSAGE_BYTE_COUNT) - flushController.didAddMessage() + flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) + flushController.notifyAfterAddMessage() clock.tick(DURATION_LIMIT / 2) expect(flushSpy).not.toHaveBeenCalled() clock.tick(DURATION_LIMIT / 2) diff --git a/packages/core/src/transport/flushController.ts b/packages/core/src/transport/flushController.ts index 11d243effd..9d3192bac6 100644 --- a/packages/core/src/transport/flushController.ts +++ b/packages/core/src/transport/flushController.ts @@ -72,25 +72,25 @@ export function createFlushController({ return currentMessagesCount }, - willAddMessage(messageBytesCount: number) { + notifyBeforeAddMessage(messageBytesCount: number) { if (currentBytesCount + messageBytesCount >= bytesLimit) { flush('bytes_limit') } - // Consider the message to be added now rather than in `didAddMessage`, because if no message - // was added yet and `didAddMessage` is called asynchronously, we still want to notify when a + // Consider the message to be added now rather than in `notifyAfterAddMessage`, because if no message + // was added yet and `notifyAfterAddMessage` is called asynchronously, we still want to notify when a // flush is needed (for example on page exit). currentMessagesCount += 1 currentBytesCount += messageBytesCount scheduleDurationLimitTimeout() }, - didAddMessage() { + notifyAfterAddMessage() { if (currentMessagesCount >= messagesLimit || currentBytesCount >= bytesLimit) { flush('bytes_limit') } }, - didRemoveMessage(messageBytesCount: number) { + notifyAfterRemoveMessage(messageBytesCount: number) { currentBytesCount -= messageBytesCount currentMessagesCount -= 1 if (currentMessagesCount === 0) { diff --git a/packages/core/test/emulate/mockFlushController.ts b/packages/core/test/emulate/mockFlushController.ts index dac7cf23c5..f6d507be7d 100644 --- a/packages/core/test/emulate/mockFlushController.ts +++ b/packages/core/test/emulate/mockFlushController.ts @@ -9,15 +9,19 @@ export function createMockFlushController() { let currentBytesCount = 0 return { - willAddMessage: jasmine.createSpy().and.callFake((messageBytesCount) => { - currentBytesCount += messageBytesCount - currentMessagesCount += 1 - }), - didAddMessage: jasmine.createSpy(), - didRemoveMessage: jasmine.createSpy().and.callFake((messageBytesCount) => { - currentBytesCount -= messageBytesCount - currentMessagesCount -= 1 - }), + notifyBeforeAddMessage: jasmine + .createSpy() + .and.callFake((messageBytesCount) => { + currentBytesCount += messageBytesCount + currentMessagesCount += 1 + }), + notifyAfterAddMessage: jasmine.createSpy(), + notifyAfterRemoveMessage: jasmine + .createSpy() + .and.callFake((messageBytesCount) => { + currentBytesCount -= messageBytesCount + currentMessagesCount -= 1 + }), get messagesCount() { return currentMessagesCount }, From 381a4f6d1823bb9f7073c172d738b0d2a87258e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Zugmeyer?= Date: Tue, 11 Apr 2023 12:13:38 +0200 Subject: [PATCH 6/7] =?UTF-8?q?=F0=9F=91=8C=20make=20batch.flush=20private?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/core/src/transport/batch.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core/src/transport/batch.ts b/packages/core/src/transport/batch.ts index c7bd9d5bd3..8a3a8ee428 100644 --- a/packages/core/src/transport/batch.ts +++ b/packages/core/src/transport/batch.ts @@ -27,7 +27,7 @@ export class Batch { this.addOrUpdate(message, key) } - flush(event: FlushEvent) { + private flush(event: FlushEvent) { const messages = this.pushOnlyBuffer.concat(objectValues(this.upsertBuffer)) this.pushOnlyBuffer = [] From ba4178590b7e7971fea7ab4842080a6daadbf48c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Zugmeyer?= Date: Tue, 11 Apr 2023 12:14:55 +0200 Subject: [PATCH 7/7] =?UTF-8?q?=F0=9F=91=8C=20introduce=20a=20`messages=5F?= =?UTF-8?q?limit`=20flush=20reason?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/core/src/transport/flushController.spec.ts | 4 ++-- packages/core/src/transport/flushController.ts | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/core/src/transport/flushController.spec.ts b/packages/core/src/transport/flushController.spec.ts index 7e839b4942..193687ced6 100644 --- a/packages/core/src/transport/flushController.spec.ts +++ b/packages/core/src/transport/flushController.spec.ts @@ -135,12 +135,12 @@ describe('flushController', () => { expect(flushSpy).toHaveBeenCalled() }) - it('flush reason should be "bytes_limit"', () => { + it('flush reason should be "messages_limit"', () => { for (let i = 0; i < MESSAGES_LIMIT; i += 1) { flushController.notifyBeforeAddMessage(SMALL_MESSAGE_BYTE_COUNT) flushController.notifyAfterAddMessage() } - expect(flushSpy.calls.first().args[0].reason).toBe('bytes_limit') + expect(flushSpy.calls.first().args[0].reason).toBe('messages_limit') }) it('does not flush when the message was not fully added yet', () => { diff --git a/packages/core/src/transport/flushController.ts b/packages/core/src/transport/flushController.ts index 9d3192bac6..c94ca3eaa6 100644 --- a/packages/core/src/transport/flushController.ts +++ b/packages/core/src/transport/flushController.ts @@ -4,7 +4,7 @@ import type { TimeoutId } from '../tools/timer' import { clearTimeout, setTimeout } from '../tools/timer' import type { Duration } from '../tools/utils/timeUtils' -export type FlushReason = PageExitReason | 'duration_limit' | 'bytes_limit' +export type FlushReason = PageExitReason | 'duration_limit' | 'bytes_limit' | 'messages_limit' export type FlushController = ReturnType export interface FlushEvent { @@ -85,7 +85,9 @@ export function createFlushController({ }, notifyAfterAddMessage() { - if (currentMessagesCount >= messagesLimit || currentBytesCount >= bytesLimit) { + if (currentMessagesCount >= messagesLimit) { + flush('messages_limit') + } else if (currentBytesCount >= bytesLimit) { flush('bytes_limit') } },