From e6fc07b4fbcb90e2201912e7a21782c227a05c9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Zugmeyer?= Date: Tue, 5 Sep 2023 10:59:07 +0200 Subject: [PATCH 01/12] =?UTF-8?q?=F0=9F=94=8A=20[RUM-253]=20customize=20de?= =?UTF-8?q?flate=20worker=20failure=20logs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/rum/src/boot/recorderApi.ts | 1 + packages/rum/src/boot/startRecording.spec.ts | 2 +- .../src/domain/deflate/deflateWorker.spec.ts | 129 ++++++++++-------- .../rum/src/domain/deflate/deflateWorker.ts | 25 ++-- 4 files changed, 91 insertions(+), 66 deletions(-) diff --git a/packages/rum/src/boot/recorderApi.ts b/packages/rum/src/boot/recorderApi.ts index 751b4d684a..74133ea479 100644 --- a/packages/rum/src/boot/recorderApi.ts +++ b/packages/rum/src/boot/recorderApi.ts @@ -126,6 +126,7 @@ export function makeRecorderApi( const worker = startDeflateWorker( configuration, + 'Datadog Session Replay', () => { stopStrategy() }, diff --git a/packages/rum/src/boot/startRecording.spec.ts b/packages/rum/src/boot/startRecording.spec.ts index 1a2d9149db..d36fc08c89 100644 --- a/packages/rum/src/boot/startRecording.spec.ts +++ b/packages/rum/src/boot/startRecording.spec.ts @@ -42,7 +42,7 @@ describe('startRecording', () => { textField = document.createElement('input') sandbox.appendChild(textField) - const worker = startDeflateWorker(configuration, noop)! + const worker = startDeflateWorker(configuration, 'Datadog Session Replay', noop)! setupBuilder = setup() .withViewContexts({ diff --git a/packages/rum/src/domain/deflate/deflateWorker.spec.ts b/packages/rum/src/domain/deflate/deflateWorker.spec.ts index a99a1281cc..a881352f3d 100644 --- a/packages/rum/src/domain/deflate/deflateWorker.spec.ts +++ b/packages/rum/src/domain/deflate/deflateWorker.spec.ts @@ -1,5 +1,5 @@ import type { RawTelemetryEvent } from '@datadog/browser-core' -import { display, isIE, noop, resetTelemetry, startFakeTelemetry } from '@datadog/browser-core' +import { display, isIE, resetTelemetry, startFakeTelemetry } from '@datadog/browser-core' import type { RumConfiguration } from '@datadog/browser-rum-core' import type { Clock } from '@datadog/browser-core/test' import { mockClock } from '@datadog/browser-core/test' @@ -14,10 +14,23 @@ describe('startDeflateWorker', () => { let mockWorker: MockWorker let createDeflateWorkerSpy: jasmine.Spy let onInitializationFailureSpy: jasmine.Spy<() => void> - let configuration: RumConfiguration + + function startDeflateWorkerWithDefaults({ + configuration = {}, + source = 'Datadog Session Replay', + }: { + configuration?: Partial + source?: string + } = {}) { + return startDeflateWorker( + configuration as RumConfiguration, + source, + onInitializationFailureSpy, + createDeflateWorkerSpy + ) + } beforeEach(() => { - configuration = {} as RumConfiguration mockWorker = new MockWorker() onInitializationFailureSpy = jasmine.createSpy('onInitializationFailureSpy') createDeflateWorkerSpy = jasmine.createSpy('createDeflateWorkerSpy').and.callFake(() => mockWorker) @@ -28,7 +41,7 @@ describe('startDeflateWorker', () => { }) it('creates a deflate worker', () => { - const worker = startDeflateWorker(configuration, onInitializationFailureSpy, createDeflateWorkerSpy) + const worker = startDeflateWorkerWithDefaults() expect(createDeflateWorkerSpy).toHaveBeenCalledTimes(1) expect(worker).toBe(mockWorker) @@ -37,17 +50,17 @@ describe('startDeflateWorker', () => { }) it('uses the previously created worker during loading', () => { - const worker1 = startDeflateWorker(configuration, noop, createDeflateWorkerSpy) - const worker2 = startDeflateWorker(configuration, noop, createDeflateWorkerSpy) + const worker1 = startDeflateWorkerWithDefaults() + const worker2 = startDeflateWorkerWithDefaults() expect(createDeflateWorkerSpy).toHaveBeenCalledTimes(1) expect(worker1).toBe(worker2) }) it('uses the previously created worker once initialized', () => { - const worker1 = startDeflateWorker(configuration, noop, createDeflateWorkerSpy) + const worker1 = startDeflateWorkerWithDefaults() mockWorker.processAllMessages() - const worker2 = startDeflateWorker(configuration, onInitializationFailureSpy, createDeflateWorkerSpy) + const worker2 = startDeflateWorkerWithDefaults() expect(createDeflateWorkerSpy).toHaveBeenCalledTimes(1) expect(worker1).toBe(worker2) @@ -60,13 +73,11 @@ describe('startDeflateWorker', () => { // mimic Chrome behavior let CSP_ERROR: DOMException let displaySpy: jasmine.Spy - let configuration: RumConfiguration beforeEach(() => { if (isIE()) { pending('IE does not support CSP blocking worker creation') } - configuration = {} as RumConfiguration displaySpy = spyOn(display, 'error') telemetryEvents = startFakeTelemetry() CSP_ERROR = new DOMException( @@ -80,40 +91,37 @@ describe('startDeflateWorker', () => { describe('Chrome and Safari behavior: exception during worker creation', () => { it('returns undefined when the worker creation throws an exception', () => { - const worker = startDeflateWorker(configuration, noop, () => { - throw CSP_ERROR - }) + createDeflateWorkerSpy.and.throwError(CSP_ERROR) + const worker = startDeflateWorkerWithDefaults() expect(worker).toBeUndefined() }) it('displays CSP instructions when the worker creation throws a CSP error', () => { - startDeflateWorker(configuration, noop, () => { - throw CSP_ERROR - }) + createDeflateWorkerSpy.and.throwError(CSP_ERROR) + startDeflateWorkerWithDefaults() expect(displaySpy).toHaveBeenCalledWith( jasmine.stringContaining('Please make sure CSP is correctly configured') ) }) it('does not report CSP errors to telemetry', () => { - startDeflateWorker(configuration, noop, () => { - throw CSP_ERROR - }) + createDeflateWorkerSpy.and.throwError(CSP_ERROR) + startDeflateWorkerWithDefaults() expect(telemetryEvents).toEqual([]) }) it('does not try to create a worker again after the creation failed', () => { - startDeflateWorker(configuration, noop, () => { - throw CSP_ERROR - }) - startDeflateWorker(configuration, noop, createDeflateWorkerSpy) + createDeflateWorkerSpy.and.throwError(CSP_ERROR) + startDeflateWorkerWithDefaults() + createDeflateWorkerSpy.calls.reset() + startDeflateWorkerWithDefaults() expect(createDeflateWorkerSpy).not.toHaveBeenCalled() }) }) describe('Firefox behavior: error during worker loading', () => { it('displays ErrorEvent as CSP error', () => { - startDeflateWorker(configuration, noop, createDeflateWorkerSpy) + startDeflateWorkerWithDefaults() mockWorker.dispatchErrorEvent() expect(displaySpy).toHaveBeenCalledWith( jasmine.stringContaining('Please make sure CSP is correctly configured') @@ -121,24 +129,28 @@ describe('startDeflateWorker', () => { }) it('calls the initialization failure callback when of an error occurs during loading', () => { - startDeflateWorker(configuration, onInitializationFailureSpy, createDeflateWorkerSpy) + startDeflateWorkerWithDefaults() mockWorker.dispatchErrorEvent() expect(onInitializationFailureSpy).toHaveBeenCalledTimes(1) }) it('returns undefined if an error occurred in a previous loading', () => { - startDeflateWorker(configuration, noop, createDeflateWorkerSpy) + startDeflateWorkerWithDefaults() mockWorker.dispatchErrorEvent() + onInitializationFailureSpy.calls.reset() - const worker = startDeflateWorker(configuration, onInitializationFailureSpy, createDeflateWorkerSpy) + const worker = startDeflateWorkerWithDefaults() expect(worker).toBeUndefined() expect(onInitializationFailureSpy).not.toHaveBeenCalled() }) it('adjusts the error message when a workerUrl is set', () => { - configuration.workerUrl = '/worker.js' - startDeflateWorker(configuration, noop, createDeflateWorkerSpy) + startDeflateWorkerWithDefaults({ + configuration: { + workerUrl: '/worker.js', + }, + }) mockWorker.dispatchErrorEvent() expect(displaySpy).toHaveBeenCalledWith( jasmine.stringContaining( @@ -148,25 +160,25 @@ describe('startDeflateWorker', () => { }) it('calls all registered callbacks when the worker initialization fails', () => { - const onInitializationFailureSpy1 = jasmine.createSpy() - const onInitializationFailureSpy2 = jasmine.createSpy() - startDeflateWorker(configuration, onInitializationFailureSpy1, createDeflateWorkerSpy) - startDeflateWorker(configuration, onInitializationFailureSpy2, createDeflateWorkerSpy) + startDeflateWorkerWithDefaults() + startDeflateWorkerWithDefaults() mockWorker.dispatchErrorEvent() - expect(onInitializationFailureSpy1).toHaveBeenCalledTimes(1) - expect(onInitializationFailureSpy2).toHaveBeenCalledTimes(1) + expect(onInitializationFailureSpy).toHaveBeenCalledTimes(2) }) }) }) describe('initialization timeout', () => { let displaySpy: jasmine.Spy - let configuration: RumConfiguration let clock: Clock beforeEach(() => { - configuration = {} as RumConfiguration displaySpy = spyOn(display, 'error') + createDeflateWorkerSpy.and.callFake( + () => + // Creates a worker that does nothing + new Worker(URL.createObjectURL(new Blob(['']))) + ) clock = mockClock() }) @@ -175,16 +187,18 @@ describe('startDeflateWorker', () => { }) it('displays an error message when the worker does not respond to the init action', () => { - startDeflateWorker( - configuration, - noop, - () => - // Creates a worker that does nothing - new Worker(URL.createObjectURL(new Blob(['']))) + startDeflateWorkerWithDefaults() + clock.tick(INITIALIZATION_TIME_OUT_DELAY) + expect(displaySpy).toHaveBeenCalledOnceWith( + 'Datadog Session Replay failed to start: a timeout occurred while initializing the Worker' ) + }) + + it('displays a customized error message', () => { + startDeflateWorkerWithDefaults({ source: 'Foo' }) clock.tick(INITIALIZATION_TIME_OUT_DELAY) expect(displaySpy).toHaveBeenCalledOnceWith( - 'Session Replay recording failed to start: a timeout occurred while initializing the Worker' + 'Foo failed to start: a timeout occurred while initializing the Worker' ) }) }) @@ -193,10 +207,8 @@ describe('startDeflateWorker', () => { let telemetryEvents: RawTelemetryEvent[] const UNKNOWN_ERROR = new Error('boom') let displaySpy: jasmine.Spy - let configuration: RumConfiguration beforeEach(() => { - configuration = {} as RumConfiguration displaySpy = spyOn(display, 'error') telemetryEvents = startFakeTelemetry() }) @@ -206,19 +218,26 @@ describe('startDeflateWorker', () => { }) it('displays an error message when the worker creation throws an unknown error', () => { - startDeflateWorker(configuration, noop, () => { - throw UNKNOWN_ERROR - }) + createDeflateWorkerSpy.and.throwError(UNKNOWN_ERROR) + startDeflateWorkerWithDefaults() expect(displaySpy).toHaveBeenCalledOnceWith( - 'Session Replay recording failed to start: an error occurred while creating the Worker:', + 'Datadog Session Replay failed to start: an error occurred while creating the Worker:', + UNKNOWN_ERROR + ) + }) + + it('displays a customized error message', () => { + createDeflateWorkerSpy.and.throwError(UNKNOWN_ERROR) + startDeflateWorkerWithDefaults({ source: 'Foo' }) + expect(displaySpy).toHaveBeenCalledOnceWith( + 'Foo failed to start: an error occurred while creating the Worker:', UNKNOWN_ERROR ) }) it('reports unknown errors to telemetry', () => { - startDeflateWorker(configuration, noop, () => { - throw UNKNOWN_ERROR - }) + createDeflateWorkerSpy.and.throwError(UNKNOWN_ERROR) + startDeflateWorkerWithDefaults() expect(telemetryEvents).toEqual([ { type: 'log', @@ -230,13 +249,13 @@ describe('startDeflateWorker', () => { }) it('does not display error messages as CSP error', () => { - startDeflateWorker(configuration, noop, createDeflateWorkerSpy) + startDeflateWorkerWithDefaults() mockWorker.dispatchErrorMessage('foo') expect(displaySpy).not.toHaveBeenCalledWith(jasmine.stringContaining('CSP')) }) it('reports errors occurring after loading to telemetry', () => { - startDeflateWorker(configuration, noop, createDeflateWorkerSpy) + startDeflateWorkerWithDefaults() mockWorker.processAllMessages() mockWorker.dispatchErrorMessage('boom', TEST_STREAM_ID) diff --git a/packages/rum/src/domain/deflate/deflateWorker.ts b/packages/rum/src/domain/deflate/deflateWorker.ts index 5a3bc437d2..17cd506882 100644 --- a/packages/rum/src/domain/deflate/deflateWorker.ts +++ b/packages/rum/src/domain/deflate/deflateWorker.ts @@ -50,12 +50,13 @@ let state: DeflateWorkerState = { status: DeflateWorkerStatus.Nil } export function startDeflateWorker( configuration: RumConfiguration, + source: string, onInitializationFailure: () => void, createDeflateWorkerImpl = createDeflateWorker ) { if (state.status === DeflateWorkerStatus.Nil) { // doStartDeflateWorker updates the state to "loading" or "error" - doStartDeflateWorker(configuration, createDeflateWorkerImpl) + doStartDeflateWorker(configuration, source, createDeflateWorkerImpl) } switch (state.status) { @@ -84,30 +85,34 @@ export function getDeflateWorkerStatus() { * * more details: https://bugzilla.mozilla.org/show_bug.cgi?id=1736865#c2 */ -export function doStartDeflateWorker(configuration: RumConfiguration, createDeflateWorkerImpl = createDeflateWorker) { +export function doStartDeflateWorker( + configuration: RumConfiguration, + source: string, + createDeflateWorkerImpl = createDeflateWorker +) { try { const worker = createDeflateWorkerImpl(configuration) addEventListener(configuration, worker, 'error', (error) => { - onError(configuration, error) + onError(configuration, source, error) }) addEventListener(configuration, worker, 'message', ({ data }: MessageEvent) => { if (data.type === 'errored') { - onError(configuration, data.error, data.streamId) + onError(configuration, source, data.error, data.streamId) } else if (data.type === 'initialized') { onInitialized(data.version) } }) worker.postMessage({ action: 'init' }) - setTimeout(onTimeout, INITIALIZATION_TIME_OUT_DELAY) + setTimeout(() => onTimeout(source), INITIALIZATION_TIME_OUT_DELAY) state = { status: DeflateWorkerStatus.Loading, worker, initializationFailureCallbacks: [] } } catch (error) { - onError(configuration, error) + onError(configuration, source, error) } } -function onTimeout() { +function onTimeout(source: string) { if (state.status === DeflateWorkerStatus.Loading) { - display.error('Session Replay recording failed to start: a timeout occurred while initializing the Worker') + display.error(`${source} failed to start: a timeout occurred while initializing the Worker`) state.initializationFailureCallbacks.forEach((callback) => callback()) state = { status: DeflateWorkerStatus.Error } } @@ -119,9 +124,9 @@ function onInitialized(version: string) { } } -function onError(configuration: RumConfiguration, error: unknown, streamId?: number) { +function onError(configuration: RumConfiguration, source: string, error: unknown, streamId?: number) { if (state.status === DeflateWorkerStatus.Loading || state.status === DeflateWorkerStatus.Nil) { - display.error('Session Replay recording failed to start: an error occurred while creating the Worker:', error) + display.error(`${source} failed to start: an error occurred while creating the Worker:`, error) if (error instanceof Event || (error instanceof Error && isMessageCspRelated(error.message))) { let baseMessage if (configuration.workerUrl) { From 8975b9ce021abe618596ca280b9f19829f44d24e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Zugmeyer?= Date: Thu, 10 Aug 2023 18:01:16 +0200 Subject: [PATCH 02/12] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20[RUM-253]=20use=20pa?= =?UTF-8?q?yload=20object=20when=20building=20URLs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In a next commit, the endpointBuilder will conditionally add a `dd-evp-encoding` URL parameter based on another payload property --- .../configuration/endpointBuilder.spec.ts | 78 +++++++++++++------ .../domain/configuration/endpointBuilder.ts | 16 +--- .../transportConfiguration.spec.ts | 45 +++++++---- packages/core/src/transport/httpRequest.ts | 28 +++---- packages/core/test/requests.ts | 2 +- packages/logs/src/boot/startLogs.spec.ts | 4 +- .../src/domain/requestCollection.spec.ts | 10 ++- 7 files changed, 114 insertions(+), 69 deletions(-) diff --git a/packages/core/src/domain/configuration/endpointBuilder.spec.ts b/packages/core/src/domain/configuration/endpointBuilder.spec.ts index 53ac1efbac..ff05d688a8 100644 --- a/packages/core/src/domain/configuration/endpointBuilder.spec.ts +++ b/packages/core/src/domain/configuration/endpointBuilder.spec.ts @@ -5,9 +5,12 @@ import { addExperimentalFeatures, } from '../../tools/experimentalFeatures' import { startsWith } from '../../tools/utils/polyfills' +import type { Payload } from '../../transport' import type { InitConfiguration } from './configuration' import { createEndpointBuilder } from './endpointBuilder' +const DEFAULT_PAYLOAD = {} as Payload + describe('endpointBuilder', () => { const clientToken = 'some_client_token' let initConfiguration: InitConfiguration @@ -20,23 +23,30 @@ describe('endpointBuilder', () => { describe('query parameters', () => { it('should add intake query parameters', () => { - expect(createEndpointBuilder(initConfiguration, 'rum', []).build('xhr')).toMatch( + expect(createEndpointBuilder(initConfiguration, 'rum', []).build('xhr', DEFAULT_PAYLOAD)).toMatch( `&dd-api-key=${clientToken}&dd-evp-origin-version=(.*)&dd-evp-origin=browser&dd-request-id=(.*)` ) }) it('should add batch_time for rum endpoint', () => { - expect(createEndpointBuilder(initConfiguration, 'rum', []).build('xhr')).toContain('&batch_time=') + expect(createEndpointBuilder(initConfiguration, 'rum', []).build('xhr', DEFAULT_PAYLOAD)).toContain( + '&batch_time=' + ) }) it('should not add batch_time for logs and replay endpoints', () => { - expect(createEndpointBuilder(initConfiguration, 'logs', []).build('xhr')).not.toContain('&batch_time=') - expect(createEndpointBuilder(initConfiguration, 'sessionReplay', []).build('xhr')).not.toContain('&batch_time=') + expect(createEndpointBuilder(initConfiguration, 'logs', []).build('xhr', DEFAULT_PAYLOAD)).not.toContain( + '&batch_time=' + ) + expect(createEndpointBuilder(initConfiguration, 'sessionReplay', []).build('xhr', DEFAULT_PAYLOAD)).not.toContain( + '&batch_time=' + ) }) it('should not start with ddsource for internal analytics mode', () => { const url = createEndpointBuilder({ ...initConfiguration, internalAnalyticsSubdomain: 'foo' }, 'rum', []).build( - 'xhr' + 'xhr', + DEFAULT_PAYLOAD ) expect(url).not.toContain('/rum?ddsource') expect(url).toContain('ddsource=browser') @@ -46,7 +56,10 @@ describe('endpointBuilder', () => { describe('proxy configuration', () => { it('should replace the intake endpoint by the proxy and set the intake path and parameters in the attribute ddforward', () => { expect( - createEndpointBuilder({ ...initConfiguration, proxy: 'https://proxy.io/path' }, 'rum', []).build('xhr') + createEndpointBuilder({ ...initConfiguration, proxy: 'https://proxy.io/path' }, 'rum', []).build( + 'xhr', + DEFAULT_PAYLOAD + ) ).toMatch( `https://proxy.io/path\\?ddforward=${encodeURIComponent( `/api/v2/rum?ddsource=(.*)&ddtags=(.*)&dd-api-key=${clientToken}` + @@ -58,7 +71,7 @@ describe('endpointBuilder', () => { it('normalizes the proxy url', () => { expect( startsWith( - createEndpointBuilder({ ...initConfiguration, proxy: '/path' }, 'rum', []).build('xhr'), + createEndpointBuilder({ ...initConfiguration, proxy: '/path' }, 'rum', []).build('xhr', DEFAULT_PAYLOAD), `${location.origin}/path?ddforward` ) ).toBeTrue() @@ -70,7 +83,7 @@ describe('endpointBuilder', () => { { ...initConfiguration, proxy: 'https://proxy.io/path', proxyUrl: 'https://legacy-proxy.io/path' }, 'rum', [] - ).build('xhr') + ).build('xhr', DEFAULT_PAYLOAD) ).toMatch(/^https:\/\/proxy.io\/path\?/) expect( @@ -78,7 +91,7 @@ describe('endpointBuilder', () => { { ...initConfiguration, proxy: false as any, proxyUrl: 'https://legacy-proxy.io/path' }, 'rum', [] - ).build('xhr') + ).build('xhr', DEFAULT_PAYLOAD) ).toMatch(/^https:\/\/rum.browser-intake-datadoghq.com\//) }) }) @@ -86,7 +99,10 @@ describe('endpointBuilder', () => { describe('deprecated proxyUrl configuration', () => { it('should replace the full intake endpoint by the proxyUrl and set it in the attribute ddforward', () => { expect( - createEndpointBuilder({ ...initConfiguration, proxyUrl: 'https://proxy.io/path' }, 'rum', []).build('xhr') + createEndpointBuilder({ ...initConfiguration, proxyUrl: 'https://proxy.io/path' }, 'rum', []).build( + 'xhr', + DEFAULT_PAYLOAD + ) ).toMatch( `https://proxy.io/path\\?ddforward=${encodeURIComponent( `https://rum.browser-intake-datadoghq.com/api/v2/rum?ddsource=(.*)&ddtags=(.*)&dd-api-key=${clientToken}` + @@ -98,7 +114,7 @@ describe('endpointBuilder', () => { it('normalizes the proxy url', () => { expect( startsWith( - createEndpointBuilder({ ...initConfiguration, proxyUrl: '/path' }, 'rum', []).build('xhr'), + createEndpointBuilder({ ...initConfiguration, proxyUrl: '/path' }, 'rum', []).build('xhr', DEFAULT_PAYLOAD), `${location.origin}/path?ddforward` ) ).toBeTrue() @@ -107,39 +123,53 @@ describe('endpointBuilder', () => { describe('tags', () => { it('should contain sdk version', () => { - expect(createEndpointBuilder(initConfiguration, 'rum', []).build('xhr')).toContain('sdk_version%3Asome_version') + expect(createEndpointBuilder(initConfiguration, 'rum', []).build('xhr', DEFAULT_PAYLOAD)).toContain( + 'sdk_version%3Asome_version' + ) }) it('should contain api', () => { - expect(createEndpointBuilder(initConfiguration, 'rum', []).build('xhr')).toContain('api%3Axhr') + expect(createEndpointBuilder(initConfiguration, 'rum', []).build('xhr', DEFAULT_PAYLOAD)).toContain('api%3Axhr') }) it('should be encoded', () => { expect( - createEndpointBuilder(initConfiguration, 'rum', ['service:bar:foo', 'datacenter:us1.prod.dog']).build('xhr') + createEndpointBuilder(initConfiguration, 'rum', ['service:bar:foo', 'datacenter:us1.prod.dog']).build( + 'xhr', + DEFAULT_PAYLOAD + ) ).toContain('service%3Abar%3Afoo%2Cdatacenter%3Aus1.prod.dog') }) it('should contain retry infos', () => { expect( - createEndpointBuilder(initConfiguration, 'rum', []).build('xhr', 'bytes_limit', { - count: 5, - lastFailureStatus: 408, + createEndpointBuilder(initConfiguration, 'rum', []).build('xhr', { + ...DEFAULT_PAYLOAD, + retry: { + count: 5, + lastFailureStatus: 408, + }, }) ).toContain('retry_count%3A5%2Cretry_after%3A408') }) it('should contain flush reason when ff collect_flush_reason is enabled', () => { addExperimentalFeatures([ExperimentalFeature.COLLECT_FLUSH_REASON]) - expect(createEndpointBuilder(initConfiguration, 'rum', []).build('xhr', 'bytes_limit')).toContain( - 'flush_reason%3Abytes_limit' - ) + expect( + createEndpointBuilder(initConfiguration, 'rum', []).build('xhr', { + ...DEFAULT_PAYLOAD, + flushReason: 'bytes_limit', + }) + ).toContain('flush_reason%3Abytes_limit') }) - it('should not contain flush reason when ff collect_flush_reason is disnabled', () => { - expect(createEndpointBuilder(initConfiguration, 'rum', []).build('xhr', 'bytes_limit')).not.toContain( - 'flush_reason' - ) + it('should not contain flush reason when ff collect_flush_reason is disabled', () => { + expect( + createEndpointBuilder(initConfiguration, 'rum', []).build('xhr', { + ...DEFAULT_PAYLOAD, + flushReason: 'bytes_limit', + }) + ).not.toContain('flush_reason') }) }) }) diff --git a/packages/core/src/domain/configuration/endpointBuilder.ts b/packages/core/src/domain/configuration/endpointBuilder.ts index f7676c77bf..bfa75596de 100644 --- a/packages/core/src/domain/configuration/endpointBuilder.ts +++ b/packages/core/src/domain/configuration/endpointBuilder.ts @@ -1,4 +1,4 @@ -import type { RetryInfo, FlushReason } from '../../transport' +import type { Payload } from '../../transport' import { timeStampNow } from '../../tools/utils/timeUtils' import { normalizeUrl } from '../../tools/utils/urlPolyfill' import { ExperimentalFeature, isExperimentalFeatureEnabled } from '../../tools/experimentalFeatures' @@ -33,15 +33,8 @@ export function createEndpointBuilder( const buildUrlWithParameters = createEndpointUrlWithParametersBuilder(initConfiguration, endpointType) return { - build(api: 'xhr' | 'fetch' | 'beacon', flushReason?: FlushReason, retry?: RetryInfo) { - const parameters = buildEndpointParameters( - initConfiguration, - endpointType, - configurationTags, - api, - flushReason, - retry - ) + build(api: 'xhr' | 'fetch' | 'beacon', payload: Payload) { + const parameters = buildEndpointParameters(initConfiguration, endpointType, configurationTags, api, payload) return buildUrlWithParameters(parameters) }, urlPrefix: buildUrlWithParameters(''), @@ -100,8 +93,7 @@ function buildEndpointParameters( endpointType: EndpointType, configurationTags: string[], api: 'xhr' | 'fetch' | 'beacon', - flushReason: FlushReason | undefined, - retry: RetryInfo | undefined + { retry, flushReason }: Payload ) { const tags = [`sdk_version:${__BUILD_ENV__SDK_VERSION__}`, `api:${api}`].concat(configurationTags) if (flushReason && isExperimentalFeatureEnabled(ExperimentalFeature.COLLECT_FLUSH_REASON)) { diff --git a/packages/core/src/domain/configuration/transportConfiguration.spec.ts b/packages/core/src/domain/configuration/transportConfiguration.spec.ts index a33da4f3b6..f9236a5c7c 100644 --- a/packages/core/src/domain/configuration/transportConfiguration.spec.ts +++ b/packages/core/src/domain/configuration/transportConfiguration.spec.ts @@ -1,18 +1,21 @@ +import type { Payload } from '../../transport' import { computeTransportConfiguration } from './transportConfiguration' +const DEFAULT_PAYLOAD = {} as Payload + describe('transportConfiguration', () => { const clientToken = 'some_client_token' const internalAnalyticsSubdomain = 'ia-rum-intake' describe('site', () => { it('should use US site by default', () => { const configuration = computeTransportConfiguration({ clientToken }) - expect(configuration.rumEndpointBuilder.build('xhr')).toContain('datadoghq.com') + expect(configuration.rumEndpointBuilder.build('xhr', DEFAULT_PAYLOAD)).toContain('datadoghq.com') expect(configuration.site).toBe('datadoghq.com') }) it('should use site value when set', () => { const configuration = computeTransportConfiguration({ clientToken, site: 'foo.com' }) - expect(configuration.rumEndpointBuilder.build('xhr')).toContain('foo.com') + expect(configuration.rumEndpointBuilder.build('xhr', DEFAULT_PAYLOAD)).toContain('foo.com') expect(configuration.site).toBe('foo.com') }) }) @@ -23,7 +26,7 @@ describe('transportConfiguration', () => { clientToken, internalAnalyticsSubdomain, }) - expect(configuration.rumEndpointBuilder.build('xhr')).toContain(internalAnalyticsSubdomain) + expect(configuration.rumEndpointBuilder.build('xhr', DEFAULT_PAYLOAD)).toContain(internalAnalyticsSubdomain) }) it('should not use internal analytics subdomain value when set for other sites', () => { @@ -32,30 +35,42 @@ describe('transportConfiguration', () => { site: 'foo.bar', internalAnalyticsSubdomain, }) - expect(configuration.rumEndpointBuilder.build('xhr')).not.toContain(internalAnalyticsSubdomain) + expect(configuration.rumEndpointBuilder.build('xhr', DEFAULT_PAYLOAD)).not.toContain(internalAnalyticsSubdomain) }) }) describe('sdk_version, env, version and service', () => { it('should not modify the logs and rum endpoints tags when not defined', () => { const configuration = computeTransportConfiguration({ clientToken }) - expect(decodeURIComponent(configuration.rumEndpointBuilder.build('xhr'))).not.toContain(',env:') - expect(decodeURIComponent(configuration.rumEndpointBuilder.build('xhr'))).not.toContain(',service:') - expect(decodeURIComponent(configuration.rumEndpointBuilder.build('xhr'))).not.toContain(',version:') - expect(decodeURIComponent(configuration.rumEndpointBuilder.build('xhr'))).not.toContain(',datacenter:') - - expect(decodeURIComponent(configuration.logsEndpointBuilder.build('xhr'))).not.toContain(',env:') - expect(decodeURIComponent(configuration.logsEndpointBuilder.build('xhr'))).not.toContain(',service:') - expect(decodeURIComponent(configuration.logsEndpointBuilder.build('xhr'))).not.toContain(',version:') - expect(decodeURIComponent(configuration.logsEndpointBuilder.build('xhr'))).not.toContain(',datacenter:') + expect(decodeURIComponent(configuration.rumEndpointBuilder.build('xhr', DEFAULT_PAYLOAD))).not.toContain(',env:') + expect(decodeURIComponent(configuration.rumEndpointBuilder.build('xhr', DEFAULT_PAYLOAD))).not.toContain( + ',service:' + ) + expect(decodeURIComponent(configuration.rumEndpointBuilder.build('xhr', DEFAULT_PAYLOAD))).not.toContain( + ',version:' + ) + expect(decodeURIComponent(configuration.rumEndpointBuilder.build('xhr', DEFAULT_PAYLOAD))).not.toContain( + ',datacenter:' + ) + + expect(decodeURIComponent(configuration.logsEndpointBuilder.build('xhr', DEFAULT_PAYLOAD))).not.toContain(',env:') + expect(decodeURIComponent(configuration.logsEndpointBuilder.build('xhr', DEFAULT_PAYLOAD))).not.toContain( + ',service:' + ) + expect(decodeURIComponent(configuration.logsEndpointBuilder.build('xhr', DEFAULT_PAYLOAD))).not.toContain( + ',version:' + ) + expect(decodeURIComponent(configuration.logsEndpointBuilder.build('xhr', DEFAULT_PAYLOAD))).not.toContain( + ',datacenter:' + ) }) it('should be set as tags in the logs and rum endpoints', () => { const configuration = computeTransportConfiguration({ clientToken, env: 'foo', service: 'bar', version: 'baz' }) - expect(decodeURIComponent(configuration.rumEndpointBuilder.build('xhr'))).toContain( + expect(decodeURIComponent(configuration.rumEndpointBuilder.build('xhr', DEFAULT_PAYLOAD))).toContain( 'env:foo,service:bar,version:baz' ) - expect(decodeURIComponent(configuration.logsEndpointBuilder.build('xhr'))).toContain( + expect(decodeURIComponent(configuration.logsEndpointBuilder.build('xhr', DEFAULT_PAYLOAD))).toContain( 'env:foo,service:bar,version:baz' ) }) diff --git a/packages/core/src/transport/httpRequest.ts b/packages/core/src/transport/httpRequest.ts index 9ef2a4e326..f340dd1290 100644 --- a/packages/core/src/transport/httpRequest.ts +++ b/packages/core/src/transport/httpRequest.ts @@ -63,13 +63,13 @@ function sendBeaconStrategy( configuration: Configuration, endpointBuilder: EndpointBuilder, bytesLimit: number, - { data, bytesCount, flushReason }: Payload + payload: Payload ) { - const canUseBeacon = !!navigator.sendBeacon && bytesCount < bytesLimit + const canUseBeacon = !!navigator.sendBeacon && payload.bytesCount < bytesLimit if (canUseBeacon) { try { - const beaconUrl = endpointBuilder.build('beacon', flushReason) - const isQueued = navigator.sendBeacon(beaconUrl, data) + const beaconUrl = endpointBuilder.build('beacon', payload) + const isQueued = navigator.sendBeacon(beaconUrl, payload.data) if (isQueued) { return @@ -79,8 +79,8 @@ function sendBeaconStrategy( } } - const xhrUrl = endpointBuilder.build('xhr', flushReason) - sendXHR(configuration, xhrUrl, data) + const xhrUrl = endpointBuilder.build('xhr', payload) + sendXHR(configuration, xhrUrl, payload.data) } let hasReportedBeaconError = false @@ -96,23 +96,23 @@ export function fetchKeepAliveStrategy( configuration: Configuration, endpointBuilder: EndpointBuilder, bytesLimit: number, - { data, bytesCount, flushReason, retry }: Payload, + payload: Payload, onResponse?: (r: HttpResponse) => void ) { - const canUseKeepAlive = isKeepAliveSupported() && bytesCount < bytesLimit + const canUseKeepAlive = isKeepAliveSupported() && payload.bytesCount < bytesLimit if (canUseKeepAlive) { - const fetchUrl = endpointBuilder.build('fetch', flushReason, retry) - fetch(fetchUrl, { method: 'POST', body: data, keepalive: true, mode: 'cors' }).then( + const fetchUrl = endpointBuilder.build('fetch', payload) + fetch(fetchUrl, { method: 'POST', body: payload.data, keepalive: true, mode: 'cors' }).then( monitor((response: Response) => onResponse?.({ status: response.status, type: response.type })), monitor(() => { - const xhrUrl = endpointBuilder.build('xhr', flushReason, retry) + const xhrUrl = endpointBuilder.build('xhr', payload) // failed to queue the request - sendXHR(configuration, xhrUrl, data, onResponse) + sendXHR(configuration, xhrUrl, payload.data, onResponse) }) ) } else { - const xhrUrl = endpointBuilder.build('xhr', flushReason, retry) - sendXHR(configuration, xhrUrl, data, onResponse) + const xhrUrl = endpointBuilder.build('xhr', payload) + sendXHR(configuration, xhrUrl, payload.data, onResponse) } } diff --git a/packages/core/test/requests.ts b/packages/core/test/requests.ts index e72ab147c1..4458987a62 100644 --- a/packages/core/test/requests.ts +++ b/packages/core/test/requests.ts @@ -13,7 +13,7 @@ export const SPEC_ENDPOINTS = { } export function stubEndpointBuilder(url: string) { - return { build: (_: any) => url } as EndpointBuilder + return { build: (..._: any) => url } as EndpointBuilder } export interface Request { diff --git a/packages/logs/src/boot/startLogs.spec.ts b/packages/logs/src/boot/startLogs.spec.ts index cca957bc5c..526d8e49f3 100644 --- a/packages/logs/src/boot/startLogs.spec.ts +++ b/packages/logs/src/boot/startLogs.spec.ts @@ -1,3 +1,4 @@ +import type { Payload } from '@datadog/browser-core' import { ErrorSource, display, stopSessionManager, getCookie, SESSION_STORE_KEY } from '@datadog/browser-core' import type { Request } from '@datadog/browser-core/test' import { @@ -36,6 +37,7 @@ const COMMON_CONTEXT = { context: {}, user: {}, } +const DEFAULT_PAYLOAD = {} as Payload describe('logs', () => { const initConfiguration = { clientToken: 'xxx', service: 'service', telemetrySampleRate: 0 } @@ -74,7 +76,7 @@ describe('logs', () => { handleLog({ message: 'message', status: StatusType.warn, context: { foo: 'bar' } }, logger, COMMON_CONTEXT) expect(requests.length).toEqual(1) - expect(requests[0].url).toContain(baseConfiguration.logsEndpointBuilder.build('xhr')) + expect(requests[0].url).toContain(baseConfiguration.logsEndpointBuilder.build('xhr', DEFAULT_PAYLOAD)) expect(getLoggedMessage(requests, 0)).toEqual({ date: jasmine.any(Number), foo: 'bar', diff --git a/packages/rum-core/src/domain/requestCollection.spec.ts b/packages/rum-core/src/domain/requestCollection.spec.ts index d3713f406c..ecb46b6223 100644 --- a/packages/rum-core/src/domain/requestCollection.spec.ts +++ b/packages/rum-core/src/domain/requestCollection.spec.ts @@ -1,3 +1,4 @@ +import type { Payload } from '@datadog/browser-core' import { isIE, RequestType } from '@datadog/browser-core' import type { FetchStub, FetchStubManager } from '@datadog/browser-core/test' import { SPEC_ENDPOINTS, stubFetch, stubXhr, withXhr } from '@datadog/browser-core/test' @@ -9,6 +10,8 @@ import { trackFetch, trackXhr } from './requestCollection' import type { Tracer } from './tracing/tracer' import { clearTracingIfNeeded, TraceIdentifier } from './tracing/tracer' +const DEFAULT_PAYLOAD = {} as Payload + describe('collect fetch', () => { let configuration: RumConfiguration const FAKE_URL = 'http://fake-url/' @@ -133,7 +136,10 @@ describe('collect fetch', () => { }) it('should ignore intake requests', (done) => { - fetchStub(SPEC_ENDPOINTS.rumEndpointBuilder.build('xhr')).resolveWith({ status: 200, responseText: 'foo' }) + fetchStub(SPEC_ENDPOINTS.rumEndpointBuilder.build('xhr', DEFAULT_PAYLOAD)).resolveWith({ + status: 200, + responseText: 'foo', + }) fetchStubManager.whenAllComplete(() => { expect(startSpy).not.toHaveBeenCalled() @@ -272,7 +278,7 @@ describe('collect xhr', () => { it('should ignore intake requests', (done) => { withXhr({ setup(xhr) { - xhr.open('GET', SPEC_ENDPOINTS.rumEndpointBuilder.build('xhr')) + xhr.open('GET', SPEC_ENDPOINTS.rumEndpointBuilder.build('xhr', DEFAULT_PAYLOAD)) xhr.send() xhr.complete(200) }, From 22132eba2000fb6e53a919bb7f3dffc80cac86c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Zugmeyer?= Date: Tue, 5 Sep 2023 11:55:04 +0200 Subject: [PATCH 03/12] [RUM-253] add the encoding to the intake URLs --- .../core/src/domain/configuration/endpointBuilder.spec.ts | 6 ++++++ packages/core/src/domain/configuration/endpointBuilder.ts | 8 +++++++- packages/core/src/transport/httpRequest.ts | 1 + 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/packages/core/src/domain/configuration/endpointBuilder.spec.ts b/packages/core/src/domain/configuration/endpointBuilder.spec.ts index ff05d688a8..78ab911230 100644 --- a/packages/core/src/domain/configuration/endpointBuilder.spec.ts +++ b/packages/core/src/domain/configuration/endpointBuilder.spec.ts @@ -43,6 +43,12 @@ describe('endpointBuilder', () => { ) }) + it('should add the provided encoding', () => { + expect( + createEndpointBuilder(initConfiguration, 'rum', []).build('xhr', { ...DEFAULT_PAYLOAD, encoding: 'deflate' }) + ).toContain('&dd-evp-encoding=deflate') + }) + it('should not start with ddsource for internal analytics mode', () => { const url = createEndpointBuilder({ ...initConfiguration, internalAnalyticsSubdomain: 'foo' }, 'rum', []).build( 'xhr', diff --git a/packages/core/src/domain/configuration/endpointBuilder.ts b/packages/core/src/domain/configuration/endpointBuilder.ts index bfa75596de..cd72b18ef1 100644 --- a/packages/core/src/domain/configuration/endpointBuilder.ts +++ b/packages/core/src/domain/configuration/endpointBuilder.ts @@ -93,7 +93,7 @@ function buildEndpointParameters( endpointType: EndpointType, configurationTags: string[], api: 'xhr' | 'fetch' | 'beacon', - { retry, flushReason }: Payload + { retry, flushReason, encoding }: Payload ) { const tags = [`sdk_version:${__BUILD_ENV__SDK_VERSION__}`, `api:${api}`].concat(configurationTags) if (flushReason && isExperimentalFeatureEnabled(ExperimentalFeature.COLLECT_FLUSH_REASON)) { @@ -102,6 +102,7 @@ function buildEndpointParameters( if (retry) { tags.push(`retry_count:${retry.count}`, `retry_after:${retry.lastFailureStatus}`) } + const parameters = [ 'ddsource=browser', `ddtags=${encodeURIComponent(tags.join(','))}`, @@ -111,9 +112,14 @@ function buildEndpointParameters( `dd-request-id=${generateUUID()}`, ] + if (encoding) { + parameters.push(`dd-evp-encoding=${encoding}`) + } + if (endpointType === 'rum') { parameters.push(`batch_time=${timeStampNow()}`) } + if (internalAnalyticsSubdomain) { parameters.reverse() } diff --git a/packages/core/src/transport/httpRequest.ts b/packages/core/src/transport/httpRequest.ts index f340dd1290..97736c68ed 100644 --- a/packages/core/src/transport/httpRequest.ts +++ b/packages/core/src/transport/httpRequest.ts @@ -28,6 +28,7 @@ export interface Payload { bytesCount: number retry?: RetryInfo flushReason?: FlushReason + encoding?: string } export interface RetryInfo { From 9d61657143217f0a1be699257abaffec3a6d5595 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Zugmeyer?= Date: Mon, 28 Aug 2023 11:00:18 +0200 Subject: [PATCH 04/12] =?UTF-8?q?=E2=9C=A8=20[RUM-253]=20introduce=20an=20?= =?UTF-8?q?abstract=20Encoder=20interface?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This new interface is slightly different than the current deflate encoder implementation. This is because it has been adjusted to take the Batch constraints into account. The deflate encoder implementation will be changed in a future commit. --- packages/core/src/index.ts | 1 + packages/core/src/tools/encoder.spec.ts | 112 ++++++++++++++++++++++++ packages/core/src/tools/encoder.ts | 102 +++++++++++++++++++++ 3 files changed, 215 insertions(+) create mode 100644 packages/core/src/tools/encoder.spec.ts create mode 100644 packages/core/src/tools/encoder.ts diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index a66c0ae13a..ae65d97343 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -62,6 +62,7 @@ export { FlushReason, } from './transport' export * from './tools/display' +export { Encoder, EncoderResult } from './tools/encoder' export * from './tools/utils/urlPolyfill' export * from './tools/utils/timeUtils' export * from './tools/utils/arrayUtils' diff --git a/packages/core/src/tools/encoder.spec.ts b/packages/core/src/tools/encoder.spec.ts new file mode 100644 index 0000000000..5f177b815d --- /dev/null +++ b/packages/core/src/tools/encoder.spec.ts @@ -0,0 +1,112 @@ +import { createIdentityEncoder } from './encoder' +import { noop } from './utils/functionUtils' + +describe('createIdentityEncoder', () => { + it('creates an encoder with initial values', () => { + const encoder = createIdentityEncoder() + + expect(encoder.isEmpty).toBe(true) + expect(encoder.finishSync()).toEqual({ + output: '', + outputBytesCount: 0, + rawBytesCount: 0, + pendingData: '', + }) + }) + + describe('write()', () => { + it('writes data to the encoder', () => { + const encoder = createIdentityEncoder() + const data = 'Hello, world!' + + encoder.write(data) + + expect(encoder.isEmpty).toBe(false) + }) + + it('calls the callback when writing data with a callback', () => { + const encoder = createIdentityEncoder() + const data = 'Callback test' + const callbackSpy = jasmine.createSpy() + + encoder.write(data, callbackSpy) + + expect(callbackSpy).toHaveBeenCalledOnceWith(data.length) + }) + }) + + describe('finish()', () => { + it('calls the callback with the result', () => { + const encoder = createIdentityEncoder() + const data = 'Final data' + encoder.write(data) + const callbackSpy = jasmine.createSpy() + + encoder.finish(callbackSpy) + + expect(callbackSpy).toHaveBeenCalledWith({ + output: data, + outputBytesCount: data.length, + rawBytesCount: data.length, + pendingData: '', + }) + }) + + it('after calling finish(), the encoder should be considered empty', () => { + const encoder = createIdentityEncoder() + encoder.write('Some data') + + encoder.finish(noop) + + expect(encoder.isEmpty).toBe(true) + expect(encoder.finishSync()).toEqual({ + output: '', + outputBytesCount: 0, + rawBytesCount: 0, + pendingData: '', + }) + }) + }) + + describe('finishSync()', () => { + it('returns the encoder result', () => { + const encoder = createIdentityEncoder() + const data = 'Hello, world!' + + encoder.write(data) + + expect(encoder.finishSync()).toEqual({ + output: data, + outputBytesCount: data.length, + rawBytesCount: data.length, + pendingData: '', + }) + }) + + it('after calling finish(), the encoder should be considered empty', () => { + const encoder = createIdentityEncoder() + encoder.write('Some data') + + encoder.finishSync() + + expect(encoder.isEmpty).toBe(true) + expect(encoder.finishSync()).toEqual({ + output: '', + outputBytesCount: 0, + rawBytesCount: 0, + pendingData: '', + }) + }) + }) + + describe('estimateEncodedBytesCount()', () => { + it('estimates encoded bytes count accurately', () => { + const encoder = createIdentityEncoder() + const data = 'Estimation test' + + const estimatedBytes = encoder.estimateEncodedBytesCount(data) + + expect(estimatedBytes).toBe(data.length) + }) + }) +}) diff --git a/packages/core/src/tools/encoder.ts b/packages/core/src/tools/encoder.ts new file mode 100644 index 0000000000..87eda4a41c --- /dev/null +++ b/packages/core/src/tools/encoder.ts @@ -0,0 +1,102 @@ +import { computeBytesCount } from './utils/byteUtils' + +export interface Encoder { + /** + * Whether this encoder might call the provided callbacks asynchronously + */ + isAsync: boolean + + /** + * Whether some data has been written since the last finish() or finishSync() call + */ + isEmpty: boolean + + /** + * Write a string to be encoded. + * + * This operation can be synchronous or asynchronous depending on the encoder implementation. + * + * If specified, the callback will be invoked when the operation finishes, unless the operation is + * asynchronous and finish() or finishSync() is called in the meantime. + */ + write(data: string, callback?: (additionalEncodedBytesCount: number) => void): void + + /** + * Waits for pending data to be encoded and resets the encoder state. + * + * This operation can be synchronous or asynchronous depending on the encoder implementation. + * + * The callback will be invoked when the operation finishes, unless the operation is asynchronous + * and another call to finish() or finishSync() occurs in the meantime. + */ + finish(callback: (result: EncoderResult) => void): void + + /** + * Resets the encoder state then returns the encoded data and any potential pending data directly, + * discarding all pending write operations. + */ + finishSync(): EncoderResult & { pendingData: string } + + /** + * Returns a rough estimation of the bytes count if the data was encoded. + */ + estimateEncodedBytesCount(data: string): number +} + +export interface EncoderResult { + output: Output + outputBytesCount: number + + /** + * An encoding type supported by HTTP Content-Encoding, if applicable. + * See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding#directives + */ + encoding?: string + + /** + * Total bytes count of the input strings encoded to UTF-8. + */ + rawBytesCount: number +} + +export function createIdentityEncoder(): Encoder { + let output = '' + let outputBytesCount = 0 + + return { + isAsync: false, + + get isEmpty() { + return !output + }, + + write(data, callback) { + const additionalEncodedBytesCount = computeBytesCount(data) + outputBytesCount += additionalEncodedBytesCount + output += data + if (callback) { + callback(additionalEncodedBytesCount) + } + }, + + finish(callback) { + callback(this.finishSync()) + }, + + finishSync() { + const result = { + output, + outputBytesCount, + rawBytesCount: outputBytesCount, + pendingData: '', + } + output = '' + outputBytesCount = 0 + return result + }, + + estimateEncodedBytesCount(data) { + return data.length + }, + } +} From 848ca83829ebdae590fa9e6680b82f3e45792786 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Zugmeyer?= Date: Mon, 28 Aug 2023 11:04:41 +0200 Subject: [PATCH 05/12] =?UTF-8?q?=E2=9C=A8=20[RUM-253]=20use=20an=20Encode?= =?UTF-8?q?r=20in=20Batch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/core/src/transport/batch.spec.ts | 213 ++++++++++++++---- packages/core/src/transport/batch.ts | 109 ++++++--- .../core/src/transport/flushController.ts | 20 +- packages/core/src/transport/httpRequest.ts | 2 +- .../src/transport/startBatchWithReplica.ts | 2 + .../core/test/emulate/mockFlushController.ts | 15 +- 6 files changed, 278 insertions(+), 83 deletions(-) diff --git a/packages/core/src/transport/batch.spec.ts b/packages/core/src/transport/batch.spec.ts index b243bc6ef1..23b5441bac 100644 --- a/packages/core/src/transport/batch.spec.ts +++ b/packages/core/src/transport/batch.spec.ts @@ -1,6 +1,8 @@ import type { MockFlushController } from '../../test' import { createMockFlushController } from '../../test' import { display } from '../tools/display' +import type { Encoder } from '../tools/encoder' +import { createIdentityEncoder } from '../tools/encoder' import { Batch } from './batch' import type { FlushReason } from './flushController' import type { HttpRequest } from './httpRequest' @@ -21,6 +23,7 @@ describe('batch', () => { const flushReason: FlushReason = 'bytes_limit' let flushController: MockFlushController + let encoder: Encoder beforeEach(() => { transport = { @@ -28,7 +31,8 @@ describe('batch', () => { sendOnExit: jasmine.createSpy(), } satisfies HttpRequest flushController = createMockFlushController() - batch = new Batch(transport, flushController, MESSAGE_BYTES_LIMIT) + encoder = createIdentityEncoder() + batch = new Batch(encoder, transport, flushController, MESSAGE_BYTES_LIMIT) }) it('should send a message', () => { @@ -40,50 +44,64 @@ describe('batch', () => { data: '{"message":"hello"}', bytesCount: SMALL_MESSAGE_BYTES_COUNT, flushReason, + encoding: undefined, }) }) - it('should add message to the flush controller', () => { - batch.add(SMALL_MESSAGE) + describe('adding a message', () => { + it('should add message to the flush controller', () => { + batch.add(SMALL_MESSAGE) - expect(flushController.notifyBeforeAddMessage).toHaveBeenCalledOnceWith(SMALL_MESSAGE_BYTES_COUNT) - expect(flushController.notifyAfterAddMessage).toHaveBeenCalledOnceWith() - }) + expect(flushController.notifyBeforeAddMessage).toHaveBeenCalledOnceWith(SMALL_MESSAGE_BYTES_COUNT) + expect(flushController.notifyAfterAddMessage).toHaveBeenCalledOnceWith(0) + }) - it('should consider separators when adding message', () => { - batch.add(SMALL_MESSAGE) - batch.add(SMALL_MESSAGE) - batch.add(SMALL_MESSAGE) + it('should consider separators when adding message', () => { + batch.add(SMALL_MESSAGE) + batch.add(SMALL_MESSAGE) + batch.add(SMALL_MESSAGE) - expect(flushController.notifyBeforeAddMessage.calls.allArgs()).toEqual([ - [SMALL_MESSAGE_BYTES_COUNT], - [SMALL_MESSAGE_BYTES_COUNT + SEPARATOR_BYTES_COUNT], - [SMALL_MESSAGE_BYTES_COUNT + SEPARATOR_BYTES_COUNT], - ]) - }) + expect(flushController.bytesCount).toEqual( + SMALL_MESSAGE_BYTES_COUNT + + SEPARATOR_BYTES_COUNT + + SMALL_MESSAGE_BYTES_COUNT + + SEPARATOR_BYTES_COUNT + + SMALL_MESSAGE_BYTES_COUNT + ) + }) - it('should consider separators when replacing messages', () => { - batch.add(SMALL_MESSAGE) - batch.upsert(SMALL_MESSAGE, 'a') + it('should remove the estimated message bytes count when replacing a message', () => { + batch.add(SMALL_MESSAGE) + batch.upsert(SMALL_MESSAGE, 'a') - flushController.notifyBeforeAddMessage.calls.reset() + flushController.notifyBeforeAddMessage.calls.reset() - batch.upsert(SMALL_MESSAGE, 'a') + batch.upsert(SMALL_MESSAGE, 'a') - expect(flushController.notifyAfterRemoveMessage).toHaveBeenCalledOnceWith( - SMALL_MESSAGE_BYTES_COUNT + SEPARATOR_BYTES_COUNT - ) - expect(flushController.notifyBeforeAddMessage).toHaveBeenCalledOnceWith( - SMALL_MESSAGE_BYTES_COUNT + SEPARATOR_BYTES_COUNT - ) - }) + expect(flushController.notifyAfterRemoveMessage).toHaveBeenCalledOnceWith(SMALL_MESSAGE_BYTES_COUNT) + expect(flushController.notifyBeforeAddMessage).toHaveBeenCalledOnceWith(SMALL_MESSAGE_BYTES_COUNT) + expect(flushController.bytesCount).toEqual( + // Note: contrary to added messages (see test above), we don't take separators into account + // when upserting messages, because it's irrelevant: upserted messages size are not yet + // encoded so the bytes count is only an estimation + SMALL_MESSAGE_BYTES_COUNT + SMALL_MESSAGE_BYTES_COUNT + ) + }) + + it('should not send a message with a bytes size above the limit', () => { + const warnSpy = spyOn(display, 'warn') + batch.add(BIG_MESSAGE_OVER_BYTES_LIMIT) - it('should not send a message with a bytes size above the limit', () => { - const warnSpy = spyOn(display, 'warn') - batch.add(BIG_MESSAGE_OVER_BYTES_LIMIT) + expect(warnSpy).toHaveBeenCalled() + expect(flushController.notifyBeforeAddMessage).not.toHaveBeenCalled() + }) - expect(warnSpy).toHaveBeenCalled() - expect(flushController.notifyBeforeAddMessage).not.toHaveBeenCalled() + it('should adjust the message size after the message has been added', () => { + const message = { message: '😤' } // JS string length = 2, but 4 bytes once encoded to UTF-8 + batch.add(message) + expect(flushController.notifyBeforeAddMessage).toHaveBeenCalledOnceWith(16) + expect(flushController.notifyAfterAddMessage).toHaveBeenCalledOnceWith(2) // 2 more bytes once encoded + }) }) it('should upsert a message for a given key', () => { @@ -97,6 +115,7 @@ describe('batch', () => { data: '{"message":"2"}\n{"message":"3"}\n{"message":"4"}', bytesCount: jasmine.any(Number), flushReason, + encoding: undefined, }) batch.upsert({ message: '5' }, 'c') @@ -108,6 +127,7 @@ describe('batch', () => { data: '{"message":"5"}\n{"message":"6"}\n{"message":"7"}', bytesCount: jasmine.any(Number), flushReason, + encoding: undefined, }) batch.upsert({ message: '8' }, 'a') @@ -120,23 +140,128 @@ describe('batch', () => { data: '{"message":"10"}\n{"message":"11"}', bytesCount: jasmine.any(Number), flushReason, + encoding: undefined, }) }) - it('should be able to use telemetry in the httpRequest.send', () => { - transport.send.and.callFake(() => { - addTelemetryDebugFake() + describe('flush messages when the page is not exiting or with a synchronous encoder', () => { + it('should send addend and upserted messages in the same request', () => { + batch.add({ message: '1' }) + batch.upsert({ message: '2' }, 'a') + + flushController.notifyFlush() + + expect(transport.send.calls.mostRecent().args[0]).toEqual({ + data: '{"message":"1"}\n{"message":"2"}', + bytesCount: jasmine.any(Number), + flushReason, + encoding: undefined, + }) }) - const addTelemetryDebugFake = () => batch.add({ message: 'telemetry message' }) - batch.add({ message: 'normal message' }) - expect(flushController.notifyBeforeAddMessage).toHaveBeenCalledTimes(1) + it('should encode upserted messages', () => { + const encoderWriteSpy = spyOn(encoder, 'write') - flushController.notifyFlush() - expect(transport.send).toHaveBeenCalledTimes(1) - expect(flushController.notifyBeforeAddMessage).toHaveBeenCalledTimes(2) + batch.upsert({ message: '2' }, 'a') - flushController.notifyFlush() - expect(transport.send).toHaveBeenCalledTimes(2) + flushController.notifyFlush() + + expect(encoderWriteSpy).toHaveBeenCalledOnceWith('{"message":"2"}') + }) + + it('should be able to use telemetry in the httpRequest.send', () => { + transport.send.and.callFake(() => { + addTelemetryDebugFake() + }) + const addTelemetryDebugFake = () => batch.add({ message: 'telemetry message' }) + + batch.add({ message: 'normal message' }) + expect(flushController.notifyBeforeAddMessage).toHaveBeenCalledTimes(1) + + flushController.notifyFlush() + expect(transport.send).toHaveBeenCalledTimes(1) + expect(flushController.notifyBeforeAddMessage).toHaveBeenCalledTimes(2) + + flushController.notifyFlush() + expect(transport.send).toHaveBeenCalledTimes(2) + }) + }) + + describe('flush messages when the page is exiting and with an asynchronous encoder', () => { + beforeEach(() => { + encoder.isAsync = true + }) + + // + ;[ + { + title: 'when adding a message, it should be sent in one request', + add: { message: 1 }, + expectedRequests: ['{"message":1}'], + }, + { + title: 'when upserting a message, it should be sent in one request', + upsert: { message: 1 }, + expectedRequests: ['{"message":1}'], + }, + { + title: 'when adding a message and upserting another, they should be sent in two separate requests', + add: { message: 1 }, + upsert: { message: 2 }, + expectedRequests: ['{"message":1}', '{"message":2}'], + }, + { + title: + 'when adding a message and another message is still pending, they should be sent in two separate requests', + add: { message: 1 }, + pending: { message: 2 }, + expectedRequests: ['{"message":1}', '{"message":2}'], + }, + { + title: 'when upserting a message and another message is still pending, they should be sent in one request', + upsert: { message: 1 }, + pending: { message: 2 }, + expectedRequests: ['{"message":2}\n{"message":1}'], + }, + ].forEach(({ title, add, upsert, pending, expectedRequests }) => { + it(title, () => { + if (add) { + batch.add(add) + } + if (upsert) { + batch.upsert(upsert, 'a') + } + if (pending) { + // eslint-disable-next-line @typescript-eslint/unbound-method + const original = encoder.finishSync + spyOn(encoder, 'finishSync').and.callFake(() => ({ + ...original(), + pendingData: JSON.stringify(pending), + })) + } + + flushController.notifyFlush('before_unload') + + expect(transport.sendOnExit.calls.allArgs().map(([payload]) => payload.data)).toEqual(expectedRequests) + }) + }) + + it('should be able to use telemetry in the httpRequest.sendOnExit', () => { + transport.sendOnExit.and.callFake(() => { + addTelemetryDebugFake() + }) + const addTelemetryDebugFake = () => batch.add({ message: 'telemetry message' }) + + batch.add({ message: 'normal message' }) + batch.upsert({ message: '2' }, 'a') + expect(flushController.notifyBeforeAddMessage).toHaveBeenCalledTimes(2) + + flushController.notifyFlush('before_unload') + expect(transport.sendOnExit).toHaveBeenCalledTimes(2) + expect(flushController.notifyBeforeAddMessage).toHaveBeenCalledTimes(4) + + flushController.notifyFlush('before_unload') + expect(transport.sendOnExit).toHaveBeenCalledTimes(3) + }) }) }) diff --git a/packages/core/src/transport/batch.ts b/packages/core/src/transport/batch.ts index e4b68727d1..89bcf4331a 100644 --- a/packages/core/src/transport/batch.ts +++ b/packages/core/src/transport/batch.ts @@ -2,16 +2,17 @@ import { display } from '../tools/display' import type { Context } from '../tools/serialisation/context' import { objectValues } from '../tools/utils/polyfills' import { isPageExitReason } from '../browser/pageExitObservable' -import { computeBytesCount } from '../tools/utils/byteUtils' import { jsonStringify } from '../tools/serialisation/jsonStringify' -import type { HttpRequest } from './httpRequest' +import type { Encoder, EncoderResult } from '../tools/encoder' +import { computeBytesCount } from '../tools/utils/byteUtils' +import type { HttpRequest, Payload } from './httpRequest' import type { FlushController, FlushEvent } from './flushController' export class Batch { - private pushOnlyBuffer: string[] = [] private upsertBuffer: { [key: string]: string } = {} constructor( + private encoder: Encoder, private request: HttpRequest, public flushController: FlushController, private messageBytesLimit: number @@ -28,23 +29,52 @@ export class Batch { } private flush(event: FlushEvent) { - const messages = this.pushOnlyBuffer.concat(objectValues(this.upsertBuffer)) - - this.pushOnlyBuffer = [] + const upsertMessages = objectValues(this.upsertBuffer).join('\n') this.upsertBuffer = {} - const payload = { data: messages.join('\n'), bytesCount: event.bytesCount, flushReason: event.reason } - if (isPageExitReason(event.reason)) { - this.request.sendOnExit(payload) + const isPageExit = isPageExitReason(event.reason) + const send = isPageExit ? this.request.sendOnExit : this.request.send + + if ( + isPageExit && + // Note: checking that the encoder is async is not strictly needed, but it's an optimization: + // if the encoder is async we need to send two requests in some cases (one for encoded data + // and the other for non-encoded data). But if it's not async, we don't have to worry about + // it and always send a single request. + this.encoder.isAsync + ) { + const encoderResult = this.encoder.finishSync() + + // Send encoded messages + if (encoderResult.outputBytesCount) { + send(formatPayloadFromEncoder(encoderResult, event)) + } + + // Send messages that are not yet encoded at this point + const pendingMessages = [encoderResult.pendingData, upsertMessages].filter(Boolean).join('\n') + if (pendingMessages) { + send({ + data: pendingMessages, + bytesCount: computeBytesCount(pendingMessages), + flushReason: event.reason, + }) + } } else { - this.request.send(payload) + if (upsertMessages) { + this.encoder.write(this.encoder.isEmpty ? upsertMessages : `\n${upsertMessages}`) + } + this.encoder.finish((encoderResult) => { + send(formatPayloadFromEncoder(encoderResult, event)) + }) } } private addOrUpdate(message: Context, key?: string) { - const { processedMessage, messageBytesCount } = this.process(message) + const serializedMessage = jsonStringify(message)! - if (messageBytesCount >= this.messageBytesLimit) { + const estimatedMessageBytesCount = this.encoder.estimateEncodedBytesCount(serializedMessage) + + if (estimatedMessageBytesCount >= this.messageBytesLimit) { display.warn( `Discarded a message whose size was bigger than the maximum allowed size ${this.messageBytesLimit}KB.` ) @@ -55,38 +85,57 @@ export class Batch { this.remove(key) } - this.push(processedMessage, messageBytesCount, key) - } - - private process(message: Context) { - const processedMessage = jsonStringify(message)! - const messageBytesCount = computeBytesCount(processedMessage) - return { processedMessage, messageBytesCount } + this.push(serializedMessage, estimatedMessageBytesCount, key) } - private push(processedMessage: string, messageBytesCount: number, key?: string) { - // If there are other messages, a '\n' will be added at serialization - const separatorBytesCount = this.flushController.messagesCount > 0 ? 1 : 0 + private push(serializedMessage: string, estimatedMessageBytesCount: number, key?: string) { + this.flushController.notifyBeforeAddMessage(estimatedMessageBytesCount) - this.flushController.notifyBeforeAddMessage(messageBytesCount + separatorBytesCount) if (key !== undefined) { - this.upsertBuffer[key] = processedMessage + this.upsertBuffer[key] = serializedMessage + this.flushController.notifyAfterAddMessage() } else { - this.pushOnlyBuffer.push(processedMessage) + this.encoder.write( + this.encoder.isEmpty ? serializedMessage : `\n${serializedMessage}`, + (realMessageBytesCount) => { + this.flushController.notifyAfterAddMessage(realMessageBytesCount - estimatedMessageBytesCount) + } + ) } - this.flushController.notifyAfterAddMessage() } private remove(key: string) { const removedMessage = this.upsertBuffer[key] delete this.upsertBuffer[key] - const messageBytesCount = computeBytesCount(removedMessage) - // If there are other messages, a '\n' will be added at serialization - const separatorBytesCount = this.flushController.messagesCount > 1 ? 1 : 0 - this.flushController.notifyAfterRemoveMessage(messageBytesCount + separatorBytesCount) + const messageBytesCount = this.encoder.estimateEncodedBytesCount(removedMessage) + this.flushController.notifyAfterRemoveMessage(messageBytesCount) } private hasMessageFor(key?: string): key is string { return key !== undefined && this.upsertBuffer[key] !== undefined } } + +function formatPayloadFromEncoder(encoderResult: EncoderResult, flushEvent: FlushEvent): Payload { + let data: string | Blob + if (typeof encoderResult.output === 'string') { + data = encoderResult.output + } else { + data = new Blob([encoderResult.output], { + // This will set the 'Content-Type: text/plain' header. Reasoning: + // * The intake rejects the request if there is no content type. + // * The browser will issue CORS preflight requests if we set it to 'application/json', which + // could induce higher intake load (and maybe has other impacts). + // * Also it's not quite JSON, since we are concatenating multiple JSON objects separated by + // new lines. + type: 'text/plain', + }) + } + + return { + data, + bytesCount: encoderResult.outputBytesCount, + encoding: encoderResult.encoding, + flushReason: flushEvent.reason, + } +} diff --git a/packages/core/src/transport/flushController.ts b/packages/core/src/transport/flushController.ts index dfbe7d26d3..71887bb630 100644 --- a/packages/core/src/transport/flushController.ts +++ b/packages/core/src/transport/flushController.ts @@ -85,16 +85,19 @@ export function createFlushController({ * * This function needs to be called synchronously, right before adding the message, so no flush * event can happen after `notifyBeforeAddMessage` and before adding the message. + * + * @param estimatedMessageBytesCount: an estimation of the message bytes count once it is + * actually added. */ - notifyBeforeAddMessage(messageBytesCount: number) { - if (currentBytesCount + messageBytesCount >= bytesLimit) { + notifyBeforeAddMessage(estimatedMessageBytesCount: number) { + if (currentBytesCount + estimatedMessageBytesCount >= bytesLimit) { flush('bytes_limit') } // Consider the message to be added now rather than in `notifyAfterAddMessage`, because if no // message was added yet and `notifyAfterAddMessage` is called asynchronously, we still want // to notify when a flush is needed (for example on page exit). currentMessagesCount += 1 - currentBytesCount += messageBytesCount + currentBytesCount += estimatedMessageBytesCount scheduleDurationLimitTimeout() }, @@ -103,8 +106,13 @@ export function createFlushController({ * * This function can be called asynchronously after the message was added, but in this case it * should not be called if a flush event occurred in between. + * + * @param messageBytesCountDiff: the difference between the estimated message bytes count and + * its actual bytes count once added to the pool. */ - notifyAfterAddMessage() { + notifyAfterAddMessage(messageBytesCountDiff = 0) { + currentBytesCount += messageBytesCountDiff + if (currentMessagesCount >= messagesLimit) { flush('messages_limit') } else if (currentBytesCount >= bytesLimit) { @@ -117,6 +125,10 @@ export function createFlushController({ * * This function needs to be called synchronously, right after removing the message, so no flush * event can happen after removing the message and before `notifyAfterRemoveMessage`. + * + * @param messageBytesCount: the message bytes count that was added to the pool. Should + * correspond to the sum of bytes counts passed to `notifyBeforeAddMessage` and + * `notifyAfterAddMessage`. */ notifyAfterRemoveMessage(messageBytesCount: number) { currentBytesCount -= messageBytesCount diff --git a/packages/core/src/transport/httpRequest.ts b/packages/core/src/transport/httpRequest.ts index 97736c68ed..a2275bf1f5 100644 --- a/packages/core/src/transport/httpRequest.ts +++ b/packages/core/src/transport/httpRequest.ts @@ -24,7 +24,7 @@ export interface HttpResponse extends Context { } export interface Payload { - data: string | FormData + data: string | FormData | Blob bytesCount: number retry?: RetryInfo flushReason?: FlushReason diff --git a/packages/core/src/transport/startBatchWithReplica.ts b/packages/core/src/transport/startBatchWithReplica.ts index 55bcea76fb..36d4e81660 100644 --- a/packages/core/src/transport/startBatchWithReplica.ts +++ b/packages/core/src/transport/startBatchWithReplica.ts @@ -3,6 +3,7 @@ import type { Context } from '../tools/serialisation/context' import type { Observable } from '../tools/observable' import type { PageExitEvent } from '../browser/pageExitObservable' import type { RawError } from '../domain/error/error.types' +import { createIdentityEncoder } from '../tools/encoder' import { Batch } from './batch' import { createHttpRequest } from './httpRequest' import { createFlushController } from './flushController' @@ -20,6 +21,7 @@ export function startBatchWithReplica( function createBatch(configuration: Configuration, endpointBuilder: EndpointBuilder) { return new Batch( + createIdentityEncoder(), createHttpRequest(configuration, endpointBuilder, configuration.batchBytesLimit, reportError), createFlushController({ messagesLimit: configuration.batchMessagesLimit, diff --git a/packages/core/test/emulate/mockFlushController.ts b/packages/core/test/emulate/mockFlushController.ts index f6d507be7d..c894908d47 100644 --- a/packages/core/test/emulate/mockFlushController.ts +++ b/packages/core/test/emulate/mockFlushController.ts @@ -1,5 +1,5 @@ import { Observable } from '../../src/tools/observable' -import type { FlushEvent, FlushController } from '../../src/transport' +import type { FlushEvent, FlushController, FlushReason } from '../../src/transport' export type MockFlushController = ReturnType @@ -15,7 +15,11 @@ export function createMockFlushController() { currentBytesCount += messageBytesCount currentMessagesCount += 1 }), - notifyAfterAddMessage: jasmine.createSpy(), + notifyAfterAddMessage: jasmine + .createSpy() + .and.callFake((messageBytesCountDiff = 0) => { + currentBytesCount += messageBytesCountDiff + }), notifyAfterRemoveMessage: jasmine .createSpy() .and.callFake((messageBytesCount) => { @@ -25,8 +29,11 @@ export function createMockFlushController() { get messagesCount() { return currentMessagesCount }, + get bytesCount() { + return currentBytesCount + }, flushObservable, - notifyFlush() { + notifyFlush(reason: FlushReason = 'bytes_limit') { if (currentMessagesCount === 0) { throw new Error( 'MockFlushController.notifyFlush(): the original FlushController would not notify flush if no message was added' @@ -40,7 +47,7 @@ export function createMockFlushController() { currentBytesCount = 0 flushObservable.notify({ - reason: 'bytes_limit', + reason, bytesCount, messagesCount, }) From 149f2b6bd766fb69997e52d1e1c21f1135372b37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Zugmeyer?= Date: Mon, 28 Aug 2023 11:05:16 +0200 Subject: [PATCH 06/12] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20[RUM-253]=20adjust?= =?UTF-8?q?=20DeflateEncoder=20implementation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/domain/deflate/deflateEncoder.spec.ts | 208 ++++++++++++++---- .../rum/src/domain/deflate/deflateEncoder.ts | 116 ++++++---- .../domain/segmentCollection/segment.spec.ts | 73 +++--- .../src/domain/segmentCollection/segment.ts | 28 ++- .../segmentCollection/segmentCollection.ts | 15 +- 5 files changed, 304 insertions(+), 136 deletions(-) diff --git a/packages/rum/src/domain/deflate/deflateEncoder.spec.ts b/packages/rum/src/domain/deflate/deflateEncoder.spec.ts index b853abac40..716b374929 100644 --- a/packages/rum/src/domain/deflate/deflateEncoder.spec.ts +++ b/packages/rum/src/domain/deflate/deflateEncoder.spec.ts @@ -1,4 +1,4 @@ -import type { RawTelemetryEvent } from '@datadog/browser-core' +import type { RawTelemetryEvent, EncoderResult } from '@datadog/browser-core' import type { RumConfiguration } from '@datadog/browser-rum-core' import { noop, startFakeTelemetry } from '@datadog/browser-core' import { MockWorker } from '../../../test' @@ -20,28 +20,175 @@ describe('createDeflateEncoder', () => { telemetryEvents = startFakeTelemetry() }) - it('initializes the encoder with correct initial state', () => { - const encoder = createDeflateEncoder(configuration, worker, DeflateEncoderStreamId.REPLAY) + describe('write()', () => { + it('invokes write callbacks', () => { + const encoder = createDeflateEncoder(configuration, worker, DeflateEncoderStreamId.REPLAY) + const writeCallbackSpy = jasmine.createSpy() + encoder.write('foo', writeCallbackSpy) + encoder.write('bar', writeCallbackSpy) + + expect(writeCallbackSpy).not.toHaveBeenCalled() + + worker.processAllMessages() - expect(encoder.encodedBytes).toEqual(new Uint8Array(0)) - expect(encoder.encodedBytesCount).toBe(0) - expect(encoder.rawBytesCount).toBe(0) + expect(writeCallbackSpy).toHaveBeenCalledTimes(2) + expect(writeCallbackSpy.calls.argsFor(0)).toEqual([3]) + expect(writeCallbackSpy.calls.argsFor(1)).toEqual([3]) + }) + + it('marks the encoder as not empty', () => { + const encoder = createDeflateEncoder(configuration, worker, DeflateEncoderStreamId.REPLAY) + encoder.write('foo') + expect(encoder.isEmpty).toBe(false) + }) }) - it('encodes data correctly', () => { - const encoder = createDeflateEncoder(configuration, worker, DeflateEncoderStreamId.REPLAY) - const writeCallbackSpy = jasmine.createSpy() - encoder.write('foo', writeCallbackSpy) - encoder.write('bar', writeCallbackSpy) + describe('finish()', () => { + it('invokes the callback with the encoded data', () => { + const encoder = createDeflateEncoder(configuration, worker, DeflateEncoderStreamId.REPLAY) + const finishCallbackSpy = jasmine.createSpy<(result: EncoderResult) => void>() + encoder.write('foo') + encoder.write('bar') + encoder.finish(finishCallbackSpy) + + worker.processAllMessages() + + expect(finishCallbackSpy).toHaveBeenCalledOnceWith({ + output: new Uint8Array([...ENCODED_FOO, ...ENCODED_BAR, ...TRAILER]), + outputBytesCount: 7, + rawBytesCount: 6, + encoding: 'deflate', + }) + }) - expect(writeCallbackSpy).not.toHaveBeenCalled() + it('invokes the callback even if nothing has been written', () => { + const encoder = createDeflateEncoder(configuration, worker, DeflateEncoderStreamId.REPLAY) + const finishCallbackSpy = jasmine.createSpy<(result: EncoderResult) => void>() + encoder.finish(finishCallbackSpy) + + expect(finishCallbackSpy).toHaveBeenCalledOnceWith({ + output: new Uint8Array(0), + outputBytesCount: 0, + rawBytesCount: 0, + encoding: 'deflate', + }) + }) - worker.processAllMessages() + it('cancels pending write callbacks', () => { + const encoder = createDeflateEncoder(configuration, worker, DeflateEncoderStreamId.REPLAY) + const writeCallbackSpy = jasmine.createSpy() + encoder.write('foo', writeCallbackSpy) + encoder.write('bar', writeCallbackSpy) + encoder.finish(noop) + + worker.processAllMessages() + + expect(writeCallbackSpy).not.toHaveBeenCalled() + }) + + it('marks the encoder as empty', () => { + const encoder = createDeflateEncoder(configuration, worker, DeflateEncoderStreamId.REPLAY) + encoder.write('foo') + encoder.finish(noop) + expect(encoder.isEmpty).toBe(true) + }) + + it('supports calling finish() while another finish() call is pending', () => { + const encoder = createDeflateEncoder(configuration, worker, DeflateEncoderStreamId.REPLAY) + const finishCallbackSpy = jasmine.createSpy<(result: EncoderResult) => void>() + encoder.write('foo') + encoder.finish(finishCallbackSpy) + encoder.write('bar') + encoder.finish(finishCallbackSpy) + + worker.processAllMessages() + + expect(finishCallbackSpy).toHaveBeenCalledTimes(2) + expect(finishCallbackSpy.calls.allArgs()).toEqual([ + [ + { + output: new Uint8Array([...ENCODED_FOO, ...TRAILER]), + outputBytesCount: 4, + rawBytesCount: 3, + encoding: 'deflate', + }, + ], + [ + { + output: new Uint8Array([...ENCODED_BAR, ...TRAILER]), + outputBytesCount: 4, + rawBytesCount: 3, + encoding: 'deflate', + }, + ], + ]) + }) + }) + + describe('finishSync()', () => { + it('returns the encoded data up to this point and any pending data', () => { + const encoder = createDeflateEncoder(configuration, worker, DeflateEncoderStreamId.REPLAY) + encoder.write('foo') + encoder.write('bar') + + worker.processNextMessage() + + expect(encoder.finishSync()).toEqual({ + output: new Uint8Array([...ENCODED_FOO, ...TRAILER]), + outputBytesCount: 4, + rawBytesCount: 3, + pendingData: 'bar', + encoding: 'deflate', + }) + }) + + it('cancels pending write callbacks', () => { + const encoder = createDeflateEncoder(configuration, worker, DeflateEncoderStreamId.REPLAY) + const writeCallbackSpy = jasmine.createSpy() + encoder.write('foo', writeCallbackSpy) + encoder.write('bar', writeCallbackSpy) + encoder.finishSync() + + worker.processAllMessages() - expect(writeCallbackSpy).toHaveBeenCalledTimes(2) - expect(encoder.encodedBytes).toEqual(new Uint8Array([...ENCODED_FOO, ...ENCODED_BAR, ...TRAILER])) - expect(encoder.encodedBytesCount).toBe(7) - expect(encoder.rawBytesCount).toBe(6) + expect(writeCallbackSpy).not.toHaveBeenCalled() + }) + + it('marks the encoder as empty', () => { + const encoder = createDeflateEncoder(configuration, worker, DeflateEncoderStreamId.REPLAY) + encoder.write('foo') + encoder.finishSync() + expect(encoder.isEmpty).toBe(true) + }) + + it('supports calling finishSync() while another finish() call is pending', () => { + const encoder = createDeflateEncoder(configuration, worker, DeflateEncoderStreamId.REPLAY) + const finishCallbackSpy = jasmine.createSpy<(result: EncoderResult) => void>() + encoder.write('foo') + encoder.finish(finishCallbackSpy) + encoder.write('bar') + expect(encoder.finishSync()).toEqual({ + output: new Uint8Array(0), + outputBytesCount: 0, + rawBytesCount: 0, + pendingData: 'bar', + encoding: 'deflate', + }) + + worker.processAllMessages() + + expect(finishCallbackSpy).toHaveBeenCalledTimes(1) + expect(finishCallbackSpy.calls.allArgs()).toEqual([ + [ + { + output: new Uint8Array([...ENCODED_FOO, ...TRAILER]), + outputBytesCount: 4, + rawBytesCount: 3, + encoding: 'deflate', + }, + ], + ]) + }) }) it('ignores messages destined to other streams', () => { @@ -58,33 +205,6 @@ describe('createDeflateEncoder', () => { expect(writeCallbackSpy).not.toHaveBeenCalled() }) - it('resets the stream', () => { - const encoder = createDeflateEncoder(configuration, worker, DeflateEncoderStreamId.REPLAY) - const writeCallbackSpy = jasmine.createSpy() - encoder.write('foo', writeCallbackSpy) - encoder.reset() - encoder.write('bar', writeCallbackSpy) - - worker.processAllMessages() - - expect(writeCallbackSpy).toHaveBeenCalledTimes(2) - expect(encoder.encodedBytes).toEqual(new Uint8Array([...ENCODED_BAR, ...TRAILER])) - expect(encoder.encodedBytesCount).toBe(4) - expect(encoder.rawBytesCount).toBe(3) - }) - - it('the encoder state stays available when the write callback is invoked', () => { - const encoder = createDeflateEncoder(configuration, worker, DeflateEncoderStreamId.REPLAY) - encoder.write('foo', () => { - expect(encoder.encodedBytes).toEqual(new Uint8Array([...ENCODED_FOO, ...TRAILER])) - expect(encoder.encodedBytesCount).toBe(4) - expect(encoder.rawBytesCount).toBe(3) - }) - encoder.reset() - - worker.processAllMessages() - }) - it('unsubscribes from the worker responses come out of order', () => { const encoder = createDeflateEncoder(configuration, worker, DeflateEncoderStreamId.REPLAY) encoder.write('foo', noop) diff --git a/packages/rum/src/domain/deflate/deflateEncoder.ts b/packages/rum/src/domain/deflate/deflateEncoder.ts index 87612afca9..d2bedf3595 100644 --- a/packages/rum/src/domain/deflate/deflateEncoder.ts +++ b/packages/rum/src/domain/deflate/deflateEncoder.ts @@ -1,15 +1,9 @@ -import type { DeflateWorkerResponse } from '@datadog/browser-core' +import type { DeflateWorkerResponse, Encoder, EncoderResult } from '@datadog/browser-core' import type { RumConfiguration } from '@datadog/browser-rum-core' -import { addEventListener, addTelemetryDebug, concatBuffers } from '@datadog/browser-core' +import { addEventListener, addTelemetryDebug, assign, concatBuffers } from '@datadog/browser-core' import type { DeflateWorker } from './deflateWorker' -export interface DeflateEncoder { - write(data: string, callback: () => void): void - reset(): void - encodedBytesCount: number - encodedBytes: Uint8Array - rawBytesCount: number -} +export type DeflateEncoder = Encoder export const enum DeflateEncoderStreamId { REPLAY = 1, @@ -25,7 +19,12 @@ export function createDeflateEncoder( let compressedDataTrailer: Uint8Array let nextWriteActionId = 0 - const pendingWriteActions: Array<{ callback: () => void; id: number }> = [] + const pendingWriteActions: Array<{ + callback?: (additionalEncodedBytesCount: number) => void + finished: boolean + id: number + data: string + }> = [] const { stop: removeMessageListener } = addEventListener( configuration, @@ -36,18 +35,15 @@ export function createDeflateEncoder( return } + rawBytesCount += data.additionalBytesCount + compressedData.push(data.result) + compressedDataTrailer = data.trailer + const nextPendingAction = pendingWriteActions.shift() if (nextPendingAction && nextPendingAction.id === data.id) { - if (data.id === 0) { - // Initial state - rawBytesCount = data.additionalBytesCount - compressedData = [data.result] - } else { - rawBytesCount += data.additionalBytesCount - compressedData.push(data.result) + if (nextPendingAction.callback) { + nextPendingAction.callback(data.result.byteLength) } - compressedDataTrailer = data.trailer - nextPendingAction.callback() } else { removeMessageListener() addTelemetryDebug('Worker responses received out of order.') @@ -55,25 +51,44 @@ export function createDeflateEncoder( } ) - return { - get encodedBytes() { - if (!compressedData.length) { - return new Uint8Array(0) - } - - return concatBuffers(compressedData.concat(compressedDataTrailer)) - }, + function consumeResult(): EncoderResult { + const output = + compressedData.length === 0 ? new Uint8Array(0) : concatBuffers(compressedData.concat(compressedDataTrailer)) + const result = { + rawBytesCount, + output, + outputBytesCount: output.byteLength, + encoding: 'deflate', + } + rawBytesCount = 0 + compressedData = [] + return result + } - get encodedBytesCount() { - if (!compressedData.length) { - return 0 + function cancelUnfinishedPendingWriteActions() { + pendingWriteActions.forEach((pendingWriteAction) => { + if (!pendingWriteAction.finished) { + pendingWriteAction.finished = true + delete pendingWriteAction.callback } + }) + } - return compressedData.reduce((total, buffer) => total + buffer.length, 0) + compressedDataTrailer.length - }, + function sendResetIfNeeded() { + if (nextWriteActionId > 0) { + worker.postMessage({ + action: 'reset', + streamId, + }) + nextWriteActionId = 0 + } + } + + return { + isAsync: true, - get rawBytesCount() { - return rawBytesCount + get isEmpty() { + return nextWriteActionId === 0 }, write(data, callback) { @@ -86,16 +101,39 @@ export function createDeflateEncoder( pendingWriteActions.push({ id: nextWriteActionId, callback, + data, + finished: false, }) nextWriteActionId += 1 }, - reset() { - worker.postMessage({ - action: 'reset', - streamId, + finish(callback) { + sendResetIfNeeded() + + if (!pendingWriteActions.length) { + callback(consumeResult()) + } else { + cancelUnfinishedPendingWriteActions() + // Wait for the last action to finish before calling the finish callback + pendingWriteActions[pendingWriteActions.length - 1].callback = () => callback(consumeResult()) + } + }, + + finishSync() { + sendResetIfNeeded() + + const pendingData = pendingWriteActions + .filter((pendingWriteAction) => !pendingWriteAction.finished) + .map((pendingWriteAction) => pendingWriteAction.data) + .join('') + cancelUnfinishedPendingWriteActions() + return assign(consumeResult(), { + pendingData, }) - nextWriteActionId = 0 + }, + + estimateEncodedBytesCount(data) { + return data.length / 7 }, } } diff --git a/packages/rum/src/domain/segmentCollection/segment.spec.ts b/packages/rum/src/domain/segmentCollection/segment.spec.ts index bf66f962da..8291925f93 100644 --- a/packages/rum/src/domain/segmentCollection/segment.spec.ts +++ b/packages/rum/src/domain/segmentCollection/segment.spec.ts @@ -7,6 +7,7 @@ import { RecordType } from '../../types' import { getReplayStats, resetReplayStats } from '../replayStats' import type { DeflateEncoder } from '../deflate' import { DeflateEncoderStreamId, createDeflateEncoder } from '../deflate' +import type { AddRecordCallback, FlushCallback } from './segment' import { Segment } from './segment' const CONTEXT: SegmentContext = { application: { id: 'a' }, view: { id: 'b' }, session: { id: 'c' } } @@ -19,8 +20,7 @@ const FULL_SNAPSHOT_RECORD: BrowserRecord = { } const ENCODED_SEGMENT_HEADER_BYTES_COUNT = 12 // {"records":[ const ENCODED_RECORD_BYTES_COUNT = 25 -const ENCODED_FULL_SNAPSHOT_RECORD_BYTES_COUNT = 35 -const ENCODED_META_BYTES_COUNT = 192 // this should stay accurate as long as less than 10 records are added +const ENCODED_META_BYTES_COUNT = 193 // this should stay accurate as long as less than 10 records are added const TRAILER_BYTES_COUNT = 1 describe('Segment', () => { @@ -44,8 +44,8 @@ describe('Segment', () => { }) it('writes a segment', () => { - const addRecordCallbackSpy = jasmine.createSpy<() => void>() - const flushCallbackSpy = jasmine.createSpy<(metadata: BrowserSegmentMetadata) => void>() + const addRecordCallbackSpy = jasmine.createSpy() + const flushCallbackSpy = jasmine.createSpy() const segment = createSegment() segment.addRecord(RECORD, addRecordCallbackSpy) @@ -58,7 +58,7 @@ describe('Segment', () => { expect(addRecordCallbackSpy).toHaveBeenCalledTimes(1) expect(flushCallbackSpy).toHaveBeenCalledTimes(1) - expect(parseSegment(encoder.encodedBytes)).toEqual({ + expect(parseSegment(flushCallbackSpy.calls.mostRecent().args[1].output)).toEqual({ source: 'browser' as const, creation_reason: 'init' as const, end: 10, @@ -77,48 +77,59 @@ describe('Segment', () => { }) it('compressed bytes count is updated when a record is added', () => { + const addRecordCallbackSpy = jasmine.createSpy() const segment = createSegment() - segment.addRecord(RECORD, noop) + segment.addRecord(RECORD, addRecordCallbackSpy) worker.processAllMessages() - expect(encoder.encodedBytesCount).toBe( - ENCODED_SEGMENT_HEADER_BYTES_COUNT + ENCODED_RECORD_BYTES_COUNT + TRAILER_BYTES_COUNT + expect(addRecordCallbackSpy).toHaveBeenCalledOnceWith( + ENCODED_SEGMENT_HEADER_BYTES_COUNT + ENCODED_RECORD_BYTES_COUNT ) }) - it('calls the flush callback with metadata as argument', () => { - const flushCallbackSpy = jasmine.createSpy<(metadata: BrowserSegmentMetadata) => void>() + it('calls the flush callback with metadata and encoder output as argument', () => { + const flushCallbackSpy = jasmine.createSpy() const segment = createSegment() segment.addRecord(RECORD, noop) segment.flush(flushCallbackSpy) worker.processAllMessages() - expect(flushCallbackSpy).toHaveBeenCalledOnceWith({ - start: 10, - end: 10, - creation_reason: 'init', - has_full_snapshot: false, - index_in_view: 0, - source: 'browser', - records_count: 1, - ...CONTEXT, - }) + expect(flushCallbackSpy).toHaveBeenCalledOnceWith( + { + start: 10, + end: 10, + creation_reason: 'init', + has_full_snapshot: false, + index_in_view: 0, + source: 'browser', + records_count: 1, + ...CONTEXT, + }, + { + output: jasmine.any(Uint8Array) as unknown as Uint8Array, + outputBytesCount: + ENCODED_SEGMENT_HEADER_BYTES_COUNT + + ENCODED_RECORD_BYTES_COUNT + + ENCODED_META_BYTES_COUNT + + TRAILER_BYTES_COUNT, + rawBytesCount: ENCODED_SEGMENT_HEADER_BYTES_COUNT + ENCODED_RECORD_BYTES_COUNT + ENCODED_META_BYTES_COUNT, + encoding: 'deflate', + } + ) }) it('resets the encoder when a segment is flushed', () => { - const encodedBytesCounts: number[] = [] + const flushCallbackSpy = jasmine.createSpy() const segment1 = createSegment({ creationReason: 'init' }) - segment1.addRecord(RECORD, () => encodedBytesCounts.push(encoder.encodedBytesCount)) - segment1.flush(noop) + segment1.addRecord(RECORD, noop) + segment1.flush(flushCallbackSpy) const segment2 = createSegment({ creationReason: 'segment_duration_limit' }) - segment2.addRecord(FULL_SNAPSHOT_RECORD, () => encodedBytesCounts.push(encoder.encodedBytesCount)) - segment2.flush(noop) + segment2.addRecord(FULL_SNAPSHOT_RECORD, noop) + segment2.flush(flushCallbackSpy) worker.processAllMessages() - expect(encodedBytesCounts).toEqual([ - ENCODED_SEGMENT_HEADER_BYTES_COUNT + ENCODED_RECORD_BYTES_COUNT + TRAILER_BYTES_COUNT, - ENCODED_SEGMENT_HEADER_BYTES_COUNT + ENCODED_FULL_SNAPSHOT_RECORD_BYTES_COUNT + TRAILER_BYTES_COUNT, - ]) + expect(parseSegment(flushCallbackSpy.calls.argsFor(0)[1].output).records.length).toBe(1) + expect(parseSegment(flushCallbackSpy.calls.argsFor(1)[1].output).records.length).toBe(1) }) it('throws when trying to flush an empty segment', () => { @@ -244,14 +255,14 @@ describe('Segment', () => { it('when flushing a segment', () => { const segment = createSegment() - segment.addRecord(FULL_SNAPSHOT_RECORD, noop) + segment.addRecord(RECORD, noop) segment.flush(noop) worker.processAllMessages() expect(getReplayStats('b')).toEqual({ segments_count: 1, records_count: 1, segments_total_raw_size: - ENCODED_SEGMENT_HEADER_BYTES_COUNT + ENCODED_FULL_SNAPSHOT_RECORD_BYTES_COUNT + ENCODED_META_BYTES_COUNT, + ENCODED_SEGMENT_HEADER_BYTES_COUNT + ENCODED_RECORD_BYTES_COUNT + ENCODED_META_BYTES_COUNT, }) }) }) diff --git a/packages/rum/src/domain/segmentCollection/segment.ts b/packages/rum/src/domain/segmentCollection/segment.ts index aa4aac5127..395bb3b432 100644 --- a/packages/rum/src/domain/segmentCollection/segment.ts +++ b/packages/rum/src/domain/segmentCollection/segment.ts @@ -1,16 +1,19 @@ +import type { Encoder, EncoderResult } from '@datadog/browser-core' import { assign, sendToExtension } from '@datadog/browser-core' import type { BrowserRecord, BrowserSegmentMetadata, CreationReason, SegmentContext } from '../../types' import { RecordType } from '../../types' import * as replayStats from '../replayStats' -import type { DeflateEncoder } from '../deflate' export type FlushReason = Exclude | 'stop' +export type FlushCallback = (metadata: BrowserSegmentMetadata, encoderResult: EncoderResult) => void +export type AddRecordCallback = (encodedBytesCount: number) => void export class Segment { private metadata: BrowserSegmentMetadata + private encodedBytesCount = 0 constructor( - private encoder: DeflateEncoder, + private encoder: Encoder, context: SegmentContext, creationReason: CreationReason ) { @@ -32,7 +35,7 @@ export class Segment { replayStats.addSegment(viewId) } - addRecord(record: BrowserRecord, callback: () => void): void { + addRecord(record: BrowserRecord, callback: AddRecordCallback): void { this.metadata.start = Math.min(this.metadata.start, record.timestamp) this.metadata.end = Math.max(this.metadata.end, record.timestamp) this.metadata.records_count += 1 @@ -41,19 +44,22 @@ export class Segment { sendToExtension('record', { record, segment: this.metadata }) replayStats.addRecord(this.metadata.view.id) - const prefix = this.metadata.records_count === 1 ? '{"records":[' : ',' - this.encoder.write(prefix + JSON.stringify(record), callback) + const prefix = this.encoder.isEmpty ? '{"records":[' : ',' + this.encoder.write(prefix + JSON.stringify(record), (additionalEncodedBytesCount) => { + this.encodedBytesCount += additionalEncodedBytesCount + callback(this.encodedBytesCount) + }) } - flush(callback: (metadata: BrowserSegmentMetadata) => void) { - if (this.metadata.records_count === 0) { + flush(callback: FlushCallback) { + if (this.encoder.isEmpty) { throw new Error('Empty segment flushed') } - this.encoder.write(`],${JSON.stringify(this.metadata).slice(1)}\n`, () => { - replayStats.addWroteData(this.metadata.view.id, this.encoder.rawBytesCount) - callback(this.metadata) + this.encoder.write(`],${JSON.stringify(this.metadata).slice(1)}\n`) + this.encoder.finish((encoderResult) => { + replayStats.addWroteData(this.metadata.view.id, encoderResult.rawBytesCount) + callback(this.metadata, encoderResult) }) - this.encoder.reset() } } diff --git a/packages/rum/src/domain/segmentCollection/segmentCollection.ts b/packages/rum/src/domain/segmentCollection/segmentCollection.ts index 29681ef366..71cee4691c 100644 --- a/packages/rum/src/domain/segmentCollection/segmentCollection.ts +++ b/packages/rum/src/domain/segmentCollection/segmentCollection.ts @@ -99,8 +99,8 @@ export function doStartSegmentCollection( function flushSegment(flushReason: FlushReason) { if (state.status === SegmentCollectionStatus.SegmentPending) { - state.segment.flush((metadata) => { - const payload = buildReplayPayload(encoder.encodedBytes, metadata, encoder.rawBytesCount) + state.segment.flush((metadata, encoderResult) => { + const payload = buildReplayPayload(encoderResult.output, metadata, encoderResult.rawBytesCount) if (isPageExitReason(flushReason)) { httpRequest.sendOnExit(payload) @@ -144,15 +144,8 @@ export function doStartSegmentCollection( } } - const segment = state.segment - - segment.addRecord(record, () => { - if ( - // the written segment is still pending - state.status === SegmentCollectionStatus.SegmentPending && - state.segment === segment && - encoder.encodedBytesCount > SEGMENT_BYTES_LIMIT - ) { + state.segment.addRecord(record, (encodedBytesCount) => { + if (encodedBytesCount > SEGMENT_BYTES_LIMIT) { flushSegment('segment_bytes_limit') } }) From 7bccb1999e9f0d1655327a0f00f45fb2e1cea20b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Zugmeyer?= Date: Mon, 28 Aug 2023 15:23:10 +0200 Subject: [PATCH 07/12] =?UTF-8?q?=F0=9F=9A=9A=20[RUM-253]=20move=20some=20?= =?UTF-8?q?deflate=20types=20to=20core?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/core/src/domain/deflate/types.ts | 12 ++++++++++++ packages/rum/src/boot/recorderApi.ts | 10 ++++++++-- packages/rum/src/boot/startRecording.spec.ts | 11 +++++++++-- packages/rum/src/boot/startRecording.ts | 3 +-- .../rum/src/domain/deflate/deflateEncoder.spec.ts | 4 ++-- packages/rum/src/domain/deflate/deflateEncoder.ts | 15 +++++++-------- packages/rum/src/domain/deflate/deflateWorker.ts | 6 +----- packages/rum/src/domain/deflate/index.ts | 3 +-- .../src/domain/segmentCollection/segment.spec.ts | 7 +++---- .../segmentCollection/segmentCollection.spec.ts | 4 ++-- .../domain/segmentCollection/segmentCollection.ts | 3 +-- packages/rum/test/mockWorker.ts | 3 +-- 12 files changed, 48 insertions(+), 33 deletions(-) diff --git a/packages/core/src/domain/deflate/types.ts b/packages/core/src/domain/deflate/types.ts index fd48b0e809..d4a57f599c 100644 --- a/packages/core/src/domain/deflate/types.ts +++ b/packages/core/src/domain/deflate/types.ts @@ -1,3 +1,5 @@ +import type { Encoder } from '../../tools/encoder' + export type DeflateWorkerAction = // Action to send when creating the worker to check if the communication is working correctly. // The worker should respond with a 'initialized' response. @@ -39,3 +41,13 @@ export type DeflateWorkerResponse = streamId?: number error: Error | string } + +export interface DeflateWorker extends Worker { + postMessage(message: DeflateWorkerAction): void +} + +export type DeflateEncoder = Encoder + +export const enum DeflateEncoderStreamId { + REPLAY = 1, +} diff --git a/packages/rum/src/boot/recorderApi.ts b/packages/rum/src/boot/recorderApi.ts index 74133ea479..0ac0ca8c18 100644 --- a/packages/rum/src/boot/recorderApi.ts +++ b/packages/rum/src/boot/recorderApi.ts @@ -1,5 +1,12 @@ import type { RelativeTime } from '@datadog/browser-core' -import { Observable, canUseEventBridge, noop, runOnReadyState, relativeNow } from '@datadog/browser-core' +import { + Observable, + canUseEventBridge, + noop, + runOnReadyState, + relativeNow, + DeflateEncoderStreamId, +} from '@datadog/browser-core' import type { LifeCycle, ViewContexts, @@ -12,7 +19,6 @@ import { getReplayStats as getReplayStatsImpl } from '../domain/replayStats' import { getSessionReplayLink } from '../domain/getSessionReplayLink' import type { CreateDeflateWorker } from '../domain/deflate' import { - DeflateEncoderStreamId, createDeflateEncoder, startDeflateWorker, DeflateWorkerStatus, diff --git a/packages/rum/src/boot/startRecording.spec.ts b/packages/rum/src/boot/startRecording.spec.ts index d36fc08c89..fe625da677 100644 --- a/packages/rum/src/boot/startRecording.spec.ts +++ b/packages/rum/src/boot/startRecording.spec.ts @@ -1,5 +1,12 @@ import type { TimeStamp, HttpRequest, ClocksState } from '@datadog/browser-core' -import { PageExitReason, DefaultPrivacyLevel, noop, isIE, timeStampNow } from '@datadog/browser-core' +import { + PageExitReason, + DefaultPrivacyLevel, + noop, + isIE, + timeStampNow, + DeflateEncoderStreamId, +} from '@datadog/browser-core' import type { LifeCycle, ViewCreatedEvent, RumConfiguration } from '@datadog/browser-rum-core' import { LifeCycleEventType } from '@datadog/browser-rum-core' import type { Clock } from '@datadog/browser-core/test' @@ -9,7 +16,7 @@ import { createRumSessionManagerMock, setup } from '../../../rum-core/test' import { recordsPerFullSnapshot, readReplayPayload } from '../../test' import { setSegmentBytesLimit } from '../domain/segmentCollection' -import { DeflateEncoderStreamId, startDeflateWorker, createDeflateEncoder } from '../domain/deflate' +import { startDeflateWorker, createDeflateEncoder } from '../domain/deflate' import { RecordType } from '../types' import { resetReplayStats } from '../domain/replayStats' diff --git a/packages/rum/src/boot/startRecording.ts b/packages/rum/src/boot/startRecording.ts index 332ca865dd..1bff158ca1 100644 --- a/packages/rum/src/boot/startRecording.ts +++ b/packages/rum/src/boot/startRecording.ts @@ -1,4 +1,4 @@ -import type { RawError, HttpRequest } from '@datadog/browser-core' +import type { RawError, HttpRequest, DeflateEncoder } from '@datadog/browser-core' import { timeStampNow, createHttpRequest, addTelemetryDebug } from '@datadog/browser-core' import type { LifeCycle, @@ -10,7 +10,6 @@ import type { import { LifeCycleEventType } from '@datadog/browser-rum-core' import { record } from '../domain/record' -import type { DeflateEncoder } from '../domain/deflate' import { startSegmentCollection, SEGMENT_BYTES_LIMIT } from '../domain/segmentCollection' import { RecordType } from '../types' diff --git a/packages/rum/src/domain/deflate/deflateEncoder.spec.ts b/packages/rum/src/domain/deflate/deflateEncoder.spec.ts index 716b374929..d90f57e415 100644 --- a/packages/rum/src/domain/deflate/deflateEncoder.spec.ts +++ b/packages/rum/src/domain/deflate/deflateEncoder.spec.ts @@ -1,8 +1,8 @@ import type { RawTelemetryEvent, EncoderResult } from '@datadog/browser-core' import type { RumConfiguration } from '@datadog/browser-rum-core' -import { noop, startFakeTelemetry } from '@datadog/browser-core' +import { noop, startFakeTelemetry, DeflateEncoderStreamId } from '@datadog/browser-core' import { MockWorker } from '../../../test' -import { DeflateEncoderStreamId, createDeflateEncoder } from './deflateEncoder' +import { createDeflateEncoder } from './deflateEncoder' const OTHER_STREAM_ID = 10 as DeflateEncoderStreamId diff --git a/packages/rum/src/domain/deflate/deflateEncoder.ts b/packages/rum/src/domain/deflate/deflateEncoder.ts index d2bedf3595..1234b882f8 100644 --- a/packages/rum/src/domain/deflate/deflateEncoder.ts +++ b/packages/rum/src/domain/deflate/deflateEncoder.ts @@ -1,13 +1,12 @@ -import type { DeflateWorkerResponse, Encoder, EncoderResult } from '@datadog/browser-core' +import type { + DeflateWorkerResponse, + DeflateEncoder, + DeflateEncoderStreamId, + DeflateWorker, + EncoderResult, +} from '@datadog/browser-core' import type { RumConfiguration } from '@datadog/browser-rum-core' import { addEventListener, addTelemetryDebug, assign, concatBuffers } from '@datadog/browser-core' -import type { DeflateWorker } from './deflateWorker' - -export type DeflateEncoder = Encoder - -export const enum DeflateEncoderStreamId { - REPLAY = 1, -} export function createDeflateEncoder( configuration: RumConfiguration, diff --git a/packages/rum/src/domain/deflate/deflateWorker.ts b/packages/rum/src/domain/deflate/deflateWorker.ts index 17cd506882..106035e400 100644 --- a/packages/rum/src/domain/deflate/deflateWorker.ts +++ b/packages/rum/src/domain/deflate/deflateWorker.ts @@ -1,4 +1,4 @@ -import type { DeflateWorkerAction, DeflateWorkerResponse } from '@datadog/browser-core' +import type { DeflateWorker, DeflateWorkerResponse } from '@datadog/browser-core' import { addTelemetryError, display, includes, addEventListener, setTimeout, ONE_SECOND } from '@datadog/browser-core' import type { RumConfiguration } from '@datadog/browser-rum-core' @@ -36,10 +36,6 @@ type DeflateWorkerState = version: string } -export interface DeflateWorker extends Worker { - postMessage(message: DeflateWorkerAction): void -} - export type CreateDeflateWorker = typeof createDeflateWorker function createDeflateWorker(configuration: RumConfiguration): DeflateWorker { diff --git a/packages/rum/src/domain/deflate/index.ts b/packages/rum/src/domain/deflate/index.ts index fb2d904535..91ce2de850 100644 --- a/packages/rum/src/domain/deflate/index.ts +++ b/packages/rum/src/domain/deflate/index.ts @@ -1,6 +1,5 @@ -export { DeflateEncoderStreamId, DeflateEncoder, createDeflateEncoder } from './deflateEncoder' +export { createDeflateEncoder } from './deflateEncoder' export { - DeflateWorker, startDeflateWorker, DeflateWorkerStatus, getDeflateWorkerStatus, diff --git a/packages/rum/src/domain/segmentCollection/segment.spec.ts b/packages/rum/src/domain/segmentCollection/segment.spec.ts index 8291925f93..312e759511 100644 --- a/packages/rum/src/domain/segmentCollection/segment.spec.ts +++ b/packages/rum/src/domain/segmentCollection/segment.spec.ts @@ -1,12 +1,11 @@ -import type { TimeStamp } from '@datadog/browser-core' -import { noop, setDebugMode, isIE } from '@datadog/browser-core' +import type { DeflateEncoder, TimeStamp } from '@datadog/browser-core' +import { noop, setDebugMode, isIE, DeflateEncoderStreamId } from '@datadog/browser-core' import type { RumConfiguration } from '@datadog/browser-rum-core' import { MockWorker } from '../../../test' import type { CreationReason, BrowserRecord, SegmentContext, BrowserSegment, BrowserSegmentMetadata } from '../../types' import { RecordType } from '../../types' import { getReplayStats, resetReplayStats } from '../replayStats' -import type { DeflateEncoder } from '../deflate' -import { DeflateEncoderStreamId, createDeflateEncoder } from '../deflate' +import { createDeflateEncoder } from '../deflate' import type { AddRecordCallback, FlushCallback } from './segment' import { Segment } from './segment' diff --git a/packages/rum/src/domain/segmentCollection/segmentCollection.spec.ts b/packages/rum/src/domain/segmentCollection/segmentCollection.spec.ts index 53d0edfbdf..ce2c614a65 100644 --- a/packages/rum/src/domain/segmentCollection/segmentCollection.spec.ts +++ b/packages/rum/src/domain/segmentCollection/segmentCollection.spec.ts @@ -1,5 +1,5 @@ import type { ClocksState, HttpRequest, TimeStamp } from '@datadog/browser-core' -import { PageExitReason, isIE } from '@datadog/browser-core' +import { DeflateEncoderStreamId, PageExitReason, isIE } from '@datadog/browser-core' import type { ViewContexts, ViewContext, RumConfiguration } from '@datadog/browser-rum-core' import { LifeCycle, LifeCycleEventType } from '@datadog/browser-rum-core' import type { Clock } from '@datadog/browser-core/test' @@ -8,7 +8,7 @@ import { createRumSessionManagerMock } from '../../../../rum-core/test' import type { BrowserRecord, SegmentContext } from '../../types' import { RecordType } from '../../types' import { MockWorker, readMetadataFromReplayPayload } from '../../../test' -import { DeflateEncoderStreamId, createDeflateEncoder } from '../deflate' +import { createDeflateEncoder } from '../deflate' import { computeSegmentContext, doStartSegmentCollection, diff --git a/packages/rum/src/domain/segmentCollection/segmentCollection.ts b/packages/rum/src/domain/segmentCollection/segmentCollection.ts index 71cee4691c..0d5d3dddf7 100644 --- a/packages/rum/src/domain/segmentCollection/segmentCollection.ts +++ b/packages/rum/src/domain/segmentCollection/segmentCollection.ts @@ -1,9 +1,8 @@ -import type { HttpRequest, TimeoutId } from '@datadog/browser-core' +import type { DeflateEncoder, HttpRequest, TimeoutId } from '@datadog/browser-core' import { isPageExitReason, ONE_SECOND, clearTimeout, setTimeout } from '@datadog/browser-core' import type { LifeCycle, ViewContexts, RumSessionManager, RumConfiguration } from '@datadog/browser-rum-core' import { LifeCycleEventType } from '@datadog/browser-rum-core' import type { BrowserRecord, CreationReason, SegmentContext } from '../../types' -import type { DeflateEncoder } from '../deflate' import { buildReplayPayload } from './buildReplayPayload' import type { FlushReason } from './segment' import { Segment } from './segment' diff --git a/packages/rum/test/mockWorker.ts b/packages/rum/test/mockWorker.ts index 0f75f36d51..4a8d41a190 100644 --- a/packages/rum/test/mockWorker.ts +++ b/packages/rum/test/mockWorker.ts @@ -1,7 +1,6 @@ -import type { DeflateWorkerAction, DeflateWorkerResponse } from '@datadog/browser-core' +import type { DeflateWorker, DeflateWorkerAction, DeflateWorkerResponse } from '@datadog/browser-core' import { string2buf } from '../../worker/src/domain/deflate' import { createNewEvent } from '../../core/test' -import type { DeflateWorker } from '../src/domain/deflate' type DeflateWorkerListener = (event: { data: DeflateWorkerResponse }) => void From 9fe74cd1cf41bb232d2384fd05994281593cc0dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Zugmeyer?= Date: Tue, 29 Aug 2023 18:28:56 +0200 Subject: [PATCH 08/12] [RUM-253] allow providing an encoder when creating batches --- .../src/transport/startBatchWithReplica.ts | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/packages/core/src/transport/startBatchWithReplica.ts b/packages/core/src/transport/startBatchWithReplica.ts index 36d4e81660..8eea30e3f6 100644 --- a/packages/core/src/transport/startBatchWithReplica.ts +++ b/packages/core/src/transport/startBatchWithReplica.ts @@ -3,26 +3,36 @@ import type { Context } from '../tools/serialisation/context' import type { Observable } from '../tools/observable' import type { PageExitEvent } from '../browser/pageExitObservable' import type { RawError } from '../domain/error/error.types' +import type { Encoder } from '../tools/encoder' import { createIdentityEncoder } from '../tools/encoder' import { Batch } from './batch' import { createHttpRequest } from './httpRequest' import { createFlushController } from './flushController' +interface BatchConfiguration { + endpoint: EndpointBuilder + encoder?: Encoder +} + +interface ReplicaBatchConfiguration extends BatchConfiguration { + transformMessage?: (message: T) => T +} + export function startBatchWithReplica( configuration: Configuration, - primary: { endpoint: EndpointBuilder }, - replica: { endpoint: EndpointBuilder; transformMessage?: (message: T) => T } | undefined, + primary: BatchConfiguration, + replica: ReplicaBatchConfiguration | undefined, reportError: (error: RawError) => void, pageExitObservable: Observable, sessionExpireObservable: Observable ) { - const primaryBatch = createBatch(configuration, primary.endpoint) - const replicaBatch = replica && createBatch(configuration, replica.endpoint) + const primaryBatch = createBatch(configuration, primary) + const replicaBatch = replica && createBatch(configuration, replica) - function createBatch(configuration: Configuration, endpointBuilder: EndpointBuilder) { + function createBatch(configuration: Configuration, { endpoint, encoder }: BatchConfiguration) { return new Batch( - createIdentityEncoder(), - createHttpRequest(configuration, endpointBuilder, configuration.batchBytesLimit, reportError), + encoder || createIdentityEncoder(), + createHttpRequest(configuration, endpoint, configuration.batchBytesLimit, reportError), createFlushController({ messagesLimit: configuration.batchMessagesLimit, bytesLimit: configuration.batchBytesLimit, From 1cd7901907043ea7b3415a3e9976eed6af075409 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Zugmeyer?= Date: Mon, 28 Aug 2023 16:12:23 +0200 Subject: [PATCH 09/12] =?UTF-8?q?=E2=9A=97=E2=9C=A8=20[RUM-253]=20use=20de?= =?UTF-8?q?flate=20encoders=20for=20RUM=20batches?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit is not pretty. To avoid having inflated packages while dogfooding, we pass some functions from rum to rum-core when creating the RUM public API. In the future, those functions will be moved to the core package, and rum-core will be able to import them directly. --- packages/core/src/domain/deflate/types.ts | 2 + .../core/src/tools/experimentalFeatures.ts | 1 + .../rum-core/src/boot/rumPublicApi.spec.ts | 75 ++++++++++++++++++- packages/rum-core/src/boot/rumPublicApi.ts | 40 +++++++++- packages/rum-core/src/boot/startRum.ts | 15 +++- .../rum-core/src/transport/startRumBatch.ts | 21 +++++- .../rum/src/domain/deflate/deflateWorker.ts | 6 +- packages/rum/src/entries/main.ts | 3 +- 8 files changed, 149 insertions(+), 14 deletions(-) diff --git a/packages/core/src/domain/deflate/types.ts b/packages/core/src/domain/deflate/types.ts index d4a57f599c..5da332a59f 100644 --- a/packages/core/src/domain/deflate/types.ts +++ b/packages/core/src/domain/deflate/types.ts @@ -50,4 +50,6 @@ export type DeflateEncoder = Encoder export const enum DeflateEncoderStreamId { REPLAY = 1, + RUM = 2, + RUM_REPLICA = 3, } diff --git a/packages/core/src/tools/experimentalFeatures.ts b/packages/core/src/tools/experimentalFeatures.ts index 9306d19c9a..dedc1a149e 100644 --- a/packages/core/src/tools/experimentalFeatures.ts +++ b/packages/core/src/tools/experimentalFeatures.ts @@ -20,6 +20,7 @@ export enum ExperimentalFeature { SCROLLMAP = 'scrollmap', INTERACTION_TO_NEXT_PAINT = 'interaction_to_next_paint', DISABLE_REPLAY_INLINE_CSS = 'disable_replay_inline_css', + COMPRESS_BATCH = 'compress_batch', } const enabledExperimentalFeatures: Set = new Set() diff --git a/packages/rum-core/src/boot/rumPublicApi.spec.ts b/packages/rum-core/src/boot/rumPublicApi.spec.ts index 4cb83c8e88..de2635232e 100644 --- a/packages/rum-core/src/boot/rumPublicApi.spec.ts +++ b/packages/rum-core/src/boot/rumPublicApi.spec.ts @@ -1,5 +1,13 @@ -import type { RelativeTime, TimeStamp, Context } from '@datadog/browser-core' -import { ONE_SECOND, getTimeStamp, display, DefaultPrivacyLevel } from '@datadog/browser-core' +import type { RelativeTime, TimeStamp, Context, DeflateWorker } from '@datadog/browser-core' +import { + ONE_SECOND, + getTimeStamp, + display, + DefaultPrivacyLevel, + resetExperimentalFeatures, + ExperimentalFeature, + noop, +} from '@datadog/browser-core' import { initEventBridgeStub, deleteEventBridgeStub, @@ -149,6 +157,69 @@ describe('rum public api', () => { expect(startRumSpy).toHaveBeenCalled() }) }) + + describe('deflate worker', () => { + let rumPublicApi: RumPublicApi + let startDeflateWorkerSpy: jasmine.Spy + + beforeEach(() => { + startDeflateWorkerSpy = jasmine.createSpy().and.returnValue({} as DeflateWorker) + + rumPublicApi = makeRumPublicApi(startRumSpy, noopRecorderApi, { + startDeflateWorker: startDeflateWorkerSpy, + createDeflateEncoder: noop as any, + }) + }) + + afterEach(() => { + resetExperimentalFeatures() + deleteEventBridgeStub() + }) + + describe('without the COMPRESS_BATCH experimental flag', () => { + it('does not create a deflate worker', () => { + rumPublicApi.init(DEFAULT_INIT_CONFIGURATION) + + expect(startDeflateWorkerSpy).not.toHaveBeenCalled() + expect(startRumSpy.calls.mostRecent().args[6]).toBeUndefined() + }) + }) + + describe('with the COMPRESS_BATCH experimental flag', () => { + it('creates a deflate worker instance', () => { + rumPublicApi.init({ + ...DEFAULT_INIT_CONFIGURATION, + enableExperimentalFeatures: [ExperimentalFeature.COMPRESS_BATCH], + }) + + expect(startDeflateWorkerSpy).toHaveBeenCalledTimes(1) + expect(startRumSpy.calls.mostRecent().args[6]).toEqual(jasmine.any(Function)) + }) + + it('aborts the initialization if it fails to create a deflate worker', () => { + startDeflateWorkerSpy.and.returnValue(undefined) + + rumPublicApi.init({ + ...DEFAULT_INIT_CONFIGURATION, + enableExperimentalFeatures: [ExperimentalFeature.COMPRESS_BATCH], + }) + + expect(startRumSpy).not.toHaveBeenCalled() + }) + + it('if message bridge is present, does not create a deflate worker instance', () => { + initEventBridgeStub() + + rumPublicApi.init({ + ...DEFAULT_INIT_CONFIGURATION, + enableExperimentalFeatures: [ExperimentalFeature.COMPRESS_BATCH], + }) + + expect(startDeflateWorkerSpy).not.toHaveBeenCalled() + expect(startRumSpy).toHaveBeenCalledTimes(1) + }) + }) + }) }) describe('getInternalContext', () => { diff --git a/packages/rum-core/src/boot/rumPublicApi.ts b/packages/rum-core/src/boot/rumPublicApi.ts index c0fe8d8aea..5e072f1dfc 100644 --- a/packages/rum-core/src/boot/rumPublicApi.ts +++ b/packages/rum-core/src/boot/rumPublicApi.ts @@ -1,4 +1,14 @@ -import type { Context, InitConfiguration, TimeStamp, RelativeTime, User, Observable } from '@datadog/browser-core' +import type { + Context, + InitConfiguration, + TimeStamp, + RelativeTime, + User, + Observable, + DeflateWorker, + DeflateEncoderStreamId, + DeflateEncoder, +} from '@datadog/browser-core' import { noop, CustomerDataType, @@ -18,6 +28,8 @@ import { checkUser, sanitizeUser, sanitize, + isExperimentalFeatureEnabled, + ExperimentalFeature, } from '@datadog/browser-core' import type { LifeCycle } from '../domain/lifeCycle' import type { ViewContexts } from '../domain/contexts/viewContexts' @@ -57,12 +69,18 @@ export interface RecorderApi { } interface RumPublicApiOptions { ignoreInitIfSyntheticsWillInjectRum?: boolean + startDeflateWorker?: (configuration: RumConfiguration, source: string) => DeflateWorker | undefined + createDeflateEncoder?: ( + configuration: RumConfiguration, + worker: DeflateWorker, + streamId: DeflateEncoderStreamId + ) => DeflateEncoder } export function makeRumPublicApi( startRumImpl: StartRum, recorderApi: RecorderApi, - { ignoreInitIfSyntheticsWillInjectRum = true }: RumPublicApiOptions = {} + { ignoreInitIfSyntheticsWillInjectRum = true, startDeflateWorker, createDeflateEncoder }: RumPublicApiOptions = {} ) { let isAlreadyInitialized = false @@ -98,6 +116,8 @@ export function makeRumPublicApi( bufferApiCalls.add(() => addFeatureFlagEvaluationStrategy(key, value)) } + let deflateWorker: DeflateWorker | undefined + function initRum(initConfiguration: RumInitConfiguration) { // This function should be available, regardless of initialization success. getInitConfigurationStrategy = () => deepClone(initConfiguration) @@ -129,6 +149,17 @@ export function makeRumPublicApi( return } + if ( + isExperimentalFeatureEnabled(ExperimentalFeature.COMPRESS_BATCH) && + !eventBridgeAvailable && + startDeflateWorker + ) { + deflateWorker = startDeflateWorker(configuration, 'Datadog RUM') + if (!deflateWorker) { + return + } + } + if (!configuration.trackViewsManually) { doStartRum(initConfiguration, configuration) } else { @@ -158,7 +189,10 @@ export function makeRumPublicApi( recorderApi, globalContextManager, userContextManager, - initialViewOptions + initialViewOptions, + deflateWorker && + createDeflateEncoder && + ((streamId) => createDeflateEncoder(configuration, deflateWorker!, streamId)) ) getSessionReplayLinkStrategy = () => recorderApi.getSessionReplayLink(configuration, startRumResults.session, startRumResults.viewContexts) diff --git a/packages/rum-core/src/boot/startRum.ts b/packages/rum-core/src/boot/startRum.ts index 6ca15c69a2..b36aa78152 100644 --- a/packages/rum-core/src/boot/startRum.ts +++ b/packages/rum-core/src/boot/startRum.ts @@ -1,4 +1,11 @@ -import type { Observable, TelemetryEvent, RawError, ContextManager } from '@datadog/browser-core' +import type { + Observable, + TelemetryEvent, + RawError, + ContextManager, + DeflateEncoderStreamId, + DeflateEncoder, +} from '@datadog/browser-core' import { sendToExtension, createPageExitObservable, @@ -45,7 +52,8 @@ export function startRum( recorderApi: RecorderApi, globalContextManager: ContextManager, userContextManager: ContextManager, - initialViewOptions?: ViewOptions + initialViewOptions?: ViewOptions, + createDeflateEncoder?: (streamId: DeflateEncoderStreamId) => DeflateEncoder ) { const lifeCycle = new LifeCycle() @@ -86,7 +94,8 @@ export function startRum( telemetry.observable, reportError, pageExitObservable, - session.expireObservable + session.expireObservable, + createDeflateEncoder ) startCustomerDataTelemetry( configuration, diff --git a/packages/rum-core/src/transport/startRumBatch.ts b/packages/rum-core/src/transport/startRumBatch.ts index 60be71bed6..cfe84b3c93 100644 --- a/packages/rum-core/src/transport/startRumBatch.ts +++ b/packages/rum-core/src/transport/startRumBatch.ts @@ -1,5 +1,17 @@ -import type { Context, TelemetryEvent, Observable, RawError, PageExitEvent } from '@datadog/browser-core' -import { combine, isTelemetryReplicationAllowed, startBatchWithReplica } from '@datadog/browser-core' +import type { + Context, + TelemetryEvent, + Observable, + RawError, + PageExitEvent, + DeflateEncoder, +} from '@datadog/browser-core' +import { + DeflateEncoderStreamId, + combine, + isTelemetryReplicationAllowed, + startBatchWithReplica, +} from '@datadog/browser-core' import type { RumConfiguration } from '../domain/configuration' import type { LifeCycle } from '../domain/lifeCycle' import { LifeCycleEventType } from '../domain/lifeCycle' @@ -12,7 +24,8 @@ export function startRumBatch( telemetryEventObservable: Observable, reportError: (error: RawError) => void, pageExitObservable: Observable, - sessionExpireObservable: Observable + sessionExpireObservable: Observable, + createDeflateEncoder?: (streamId: DeflateEncoderStreamId) => DeflateEncoder ) { const replica = configuration.replica @@ -20,10 +33,12 @@ export function startRumBatch( configuration, { endpoint: configuration.rumEndpointBuilder, + encoder: createDeflateEncoder && createDeflateEncoder(DeflateEncoderStreamId.RUM), }, replica && { endpoint: replica.rumEndpointBuilder, transformMessage: (message) => combine(message, { application: { id: replica.applicationId } }), + encoder: createDeflateEncoder && createDeflateEncoder(DeflateEncoderStreamId.RUM_REPLICA), }, reportError, pageExitObservable, diff --git a/packages/rum/src/domain/deflate/deflateWorker.ts b/packages/rum/src/domain/deflate/deflateWorker.ts index 106035e400..c91d2bc8c9 100644 --- a/packages/rum/src/domain/deflate/deflateWorker.ts +++ b/packages/rum/src/domain/deflate/deflateWorker.ts @@ -47,7 +47,7 @@ let state: DeflateWorkerState = { status: DeflateWorkerStatus.Nil } export function startDeflateWorker( configuration: RumConfiguration, source: string, - onInitializationFailure: () => void, + onInitializationFailure?: () => void, createDeflateWorkerImpl = createDeflateWorker ) { if (state.status === DeflateWorkerStatus.Nil) { @@ -57,7 +57,9 @@ export function startDeflateWorker( switch (state.status) { case DeflateWorkerStatus.Loading: - state.initializationFailureCallbacks.push(onInitializationFailure) + if (onInitializationFailure) { + state.initializationFailureCallbacks.push(onInitializationFailure) + } return state.worker case DeflateWorkerStatus.Initialized: return state.worker diff --git a/packages/rum/src/entries/main.ts b/packages/rum/src/entries/main.ts index cfae23aced..a3b08cccb4 100644 --- a/packages/rum/src/entries/main.ts +++ b/packages/rum/src/entries/main.ts @@ -5,6 +5,7 @@ import { makeRumPublicApi, startRum } from '@datadog/browser-rum-core' import { startRecording } from '../boot/startRecording' import { makeRecorderApi } from '../boot/recorderApi' +import { createDeflateEncoder, startDeflateWorker } from '../domain/deflate' export { CommonProperties, @@ -30,7 +31,7 @@ export { export { DefaultPrivacyLevel } from '@datadog/browser-core' const recorderApi = makeRecorderApi(startRecording) -export const datadogRum = makeRumPublicApi(startRum, recorderApi) +export const datadogRum = makeRumPublicApi(startRum, recorderApi, { startDeflateWorker, createDeflateEncoder }) interface BrowserWindow extends Window { DD_RUM?: RumPublicApi From a7f956bafdb13ab6db4f869f92747b66bb85f564 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Zugmeyer?= Date: Mon, 4 Sep 2023 12:04:24 +0200 Subject: [PATCH 10/12] =?UTF-8?q?=E2=9C=85=E2=99=BB=EF=B8=8F=20[RUM-253]?= =?UTF-8?q?=20move=20segment=20file=20infos=20into=20a=20property?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test/e2e/lib/framework/intakeRegistry.ts | 8 +++++--- test/e2e/lib/framework/serverApps/intake.ts | 5 +++-- test/e2e/scenario/recorder/recorder.scenario.ts | 6 +++++- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/test/e2e/lib/framework/intakeRegistry.ts b/test/e2e/lib/framework/intakeRegistry.ts index cfcc0c6cca..0ec23e3f4e 100644 --- a/test/e2e/lib/framework/intakeRegistry.ts +++ b/test/e2e/lib/framework/intakeRegistry.ts @@ -21,9 +21,11 @@ export type ReplayIntakeRequest = { isBridge: false segment: BrowserSegment metadata: BrowserSegmentMetadataAndSegmentSizes - filename: string - encoding: string - mimetype: string + segmentFile: { + filename: string + encoding: string + mimetype: string + } } export type IntakeRequest = LogsIntakeRequest | RumIntakeRequest | ReplayIntakeRequest diff --git a/test/e2e/lib/framework/serverApps/intake.ts b/test/e2e/lib/framework/serverApps/intake.ts index bcfd9b603d..23b08f69bf 100644 --- a/test/e2e/lib/framework/serverApps/intake.ts +++ b/test/e2e/lib/framework/serverApps/intake.ts @@ -108,11 +108,12 @@ function readReplayIntakeRequest(req: express.Request): Promise { Promise.all([segmentPromise, metadataPromise]) - .then(([segmentEntry, metadata]) => ({ + .then(([{ segment, ...segmentFile }, metadata]) => ({ intakeType: 'replay' as const, isBridge: false as const, + segmentFile, metadata, - ...segmentEntry, + segment, })) .then(resolve, reject) }) diff --git a/test/e2e/scenario/recorder/recorder.scenario.ts b/test/e2e/scenario/recorder/recorder.scenario.ts index d76bac6253..988bf2f03f 100644 --- a/test/e2e/scenario/recorder/recorder.scenario.ts +++ b/test/e2e/scenario/recorder/recorder.scenario.ts @@ -36,7 +36,11 @@ describe('recorder', () => { await flushEvents() expect(intakeRegistry.replaySegments.length).toBe(1) - const { segment, metadata, encoding, filename, mimetype } = intakeRegistry.replayRequests[0] + const { + segment, + metadata, + segmentFile: { encoding, filename, mimetype }, + } = intakeRegistry.replayRequests[0] expect(metadata).toEqual({ application: { id: jasmine.stringMatching(UUID_RE) }, creation_reason: 'init', From 83f9442c27903a526f9e7e206e6ba55f407405ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Zugmeyer?= Date: Mon, 4 Sep 2023 12:35:05 +0200 Subject: [PATCH 11/12] =?UTF-8?q?=E2=9C=85=20[RUM-253]=20read=20encoded=20?= =?UTF-8?q?requests=20and=20expose=20the=20'encoding'=20on=20IntakeRequest?= =?UTF-8?q?=20objects?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test/e2e/lib/framework/intakeRegistry.ts | 14 +++--- test/e2e/lib/framework/serverApps/intake.ts | 49 +++++++++++++++------ 2 files changed, 43 insertions(+), 20 deletions(-) diff --git a/test/e2e/lib/framework/intakeRegistry.ts b/test/e2e/lib/framework/intakeRegistry.ts index 0ec23e3f4e..240b450166 100644 --- a/test/e2e/lib/framework/intakeRegistry.ts +++ b/test/e2e/lib/framework/intakeRegistry.ts @@ -4,21 +4,23 @@ import type { TelemetryEvent, TelemetryErrorEvent, TelemetryConfigurationEvent } import type { BrowserSegment } from '@datadog/browser-rum/src/types' import type { BrowserSegmentMetadataAndSegmentSizes } from '@datadog/browser-rum/src/domain/segmentCollection' +type BaseIntakeRequest = { + isBridge: boolean + encoding: string | null +} + export type LogsIntakeRequest = { intakeType: 'logs' - isBridge: boolean events: LogsEvent[] -} +} & BaseIntakeRequest export type RumIntakeRequest = { intakeType: 'rum' - isBridge: boolean events: Array -} +} & BaseIntakeRequest export type ReplayIntakeRequest = { intakeType: 'replay' - isBridge: false segment: BrowserSegment metadata: BrowserSegmentMetadataAndSegmentSizes segmentFile: { @@ -26,7 +28,7 @@ export type ReplayIntakeRequest = { encoding: string mimetype: string } -} +} & BaseIntakeRequest export type IntakeRequest = LogsIntakeRequest | RumIntakeRequest | ReplayIntakeRequest diff --git a/test/e2e/lib/framework/serverApps/intake.ts b/test/e2e/lib/framework/serverApps/intake.ts index 23b08f69bf..6108661a88 100644 --- a/test/e2e/lib/framework/serverApps/intake.ts +++ b/test/e2e/lib/framework/serverApps/intake.ts @@ -1,4 +1,4 @@ -import { createInflate } from 'zlib' +import { createInflate, inflateSync } from 'zlib' import https from 'https' import connectBusboy from 'connect-busboy' import express from 'express' @@ -6,11 +6,18 @@ import express from 'express' import cors from 'cors' import type { BrowserSegmentMetadataAndSegmentSizes } from '@datadog/browser-rum/src/domain/segmentCollection' import type { BrowserSegment } from '@datadog/browser-rum/src/types' -import type { IntakeRegistry, IntakeRequest, ReplayIntakeRequest } from '../intakeRegistry' +import type { + IntakeRegistry, + IntakeRequest, + LogsIntakeRequest, + ReplayIntakeRequest, + RumIntakeRequest, +} from '../intakeRegistry' interface IntakeRequestInfos { isBridge: boolean intakeType: IntakeRequest['intakeType'] + encoding: string | null } export function createIntakeServerApp(intakeRegistry: IntakeRegistry) { @@ -42,18 +49,22 @@ function computeIntakeRequestInfos(req: express.Request): IntakeRequestInfos { if (!ddforward) { throw new Error('ddforward is missing') } + const { pathname, searchParams } = new URL(ddforward, 'https://example.org') + + const encoding = req.headers['content-encoding'] || searchParams.get('dd-evp-encoding') if (req.query.bridge === 'true') { const eventType = req.query.event_type return { isBridge: true, + encoding, intakeType: eventType === 'log' ? 'logs' : 'rum', } } let intakeType: IntakeRequest['intakeType'] - // ddforward = /api/v2/rum?key=value - const endpoint = ddforward.split(/[/?]/)[3] + // pathname = /api/v2/rum + const endpoint = pathname.split(/[/?]/)[3] if (endpoint === 'logs' || endpoint === 'rum' || endpoint === 'replay') { intakeType = endpoint } else { @@ -61,26 +72,37 @@ function computeIntakeRequestInfos(req: express.Request): IntakeRequestInfos { } return { isBridge: false, + encoding, intakeType, } } -async function readIntakeRequest(req: express.Request, infos: IntakeRequestInfos): Promise { - if (infos.intakeType === 'replay') { - return readReplayIntakeRequest(req) - } +function readIntakeRequest(req: express.Request, infos: IntakeRequestInfos): Promise { + return infos.intakeType === 'replay' + ? readReplayIntakeRequest(req, infos as IntakeRequestInfos & { intakeType: 'replay' }) + : readRumOrLogsIntakeRequest(req, infos as IntakeRequestInfos & { intakeType: 'rum' | 'logs' }) +} + +async function readRumOrLogsIntakeRequest( + req: express.Request, + infos: IntakeRequestInfos & { intakeType: 'rum' | 'logs' } +): Promise { + const rawBody = await readStream(req) + const encodedBody = infos.encoding === 'deflate' ? inflateSync(rawBody) : rawBody return { - intakeType: infos.intakeType, - isBridge: infos.isBridge, - events: (await readStream(req)) + ...infos, + events: encodedBody .toString('utf-8') .split('\n') .map((line): any => JSON.parse(line)), } } -function readReplayIntakeRequest(req: express.Request): Promise { +function readReplayIntakeRequest( + req: express.Request, + infos: IntakeRequestInfos & { intakeType: 'replay' } +): Promise { return new Promise((resolve, reject) => { let segmentPromise: Promise<{ encoding: string @@ -109,8 +131,7 @@ function readReplayIntakeRequest(req: express.Request): Promise { Promise.all([segmentPromise, metadataPromise]) .then(([{ segment, ...segmentFile }, metadata]) => ({ - intakeType: 'replay' as const, - isBridge: false as const, + ...infos, segmentFile, metadata, segment, From 64156e481783928ce50d4c9ea7e2800864949ba9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Zugmeyer?= Date: Mon, 4 Sep 2023 12:35:56 +0200 Subject: [PATCH 12/12] =?UTF-8?q?=E2=9C=85=20[RUM-253]=20add=20E2E=20tests?= =?UTF-8?q?=20related=20to=20compression?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test/e2e/scenario/transport.scenario.ts | 48 +++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 test/e2e/scenario/transport.scenario.ts diff --git a/test/e2e/scenario/transport.scenario.ts b/test/e2e/scenario/transport.scenario.ts new file mode 100644 index 0000000000..019cfd31f9 --- /dev/null +++ b/test/e2e/scenario/transport.scenario.ts @@ -0,0 +1,48 @@ +import { ExperimentalFeature } from '@datadog/browser-core' +import { createTest, flushEvents } from '../lib/framework' +import { withBrowserLogs } from '../lib/helpers/browser' + +describe('transport', () => { + describe('data compression', () => { + createTest('send RUM data compressed') + .withRum({ + enableExperimentalFeatures: [ExperimentalFeature.COMPRESS_BATCH], + }) + .run(async ({ intakeRegistry }) => { + await flushEvents() + + expect(intakeRegistry.rumRequests.length).toBe(2) + + const plainRequest = intakeRegistry.rumRequests.find((request) => request.encoding === null) + const deflateRequest = intakeRegistry.rumRequests.find((request) => request.encoding === 'deflate') + + // The last view update should be sent without compression + expect(plainRequest!.events).toEqual([ + jasmine.objectContaining({ + type: 'view', + }), + ]) + + // Other data should be sent encoded + expect(deflateRequest!.events.length).toBeGreaterThan(0) + }) + + createTest("displays a message if the worker can't be started") + .withRum({ + enableExperimentalFeatures: [ExperimentalFeature.COMPRESS_BATCH], + }) + .withBasePath('/no-blob-worker-csp') + .run(async ({ intakeRegistry }) => { + await flushEvents() + + expect(intakeRegistry.rumRequests.length).toBe(0) + + await withBrowserLogs((logs) => { + const failedToStartLog = logs.find((log) => log.message.includes('Datadog RUM failed to start')) + const cspDocLog = logs.find((log) => log.message.includes('Please make sure CSP')) + expect(failedToStartLog).withContext("'Failed to start' log").toBeTruthy() + expect(cspDocLog).withContext("'CSP doc' log").toBeTruthy() + }) + }) + }) +})