diff --git a/packages/core/src/domain/telemetry/telemetry.ts b/packages/core/src/domain/telemetry/telemetry.ts
index 6d383b347d..3fff1b08dc 100644
--- a/packages/core/src/domain/telemetry/telemetry.ts
+++ b/packages/core/src/domain/telemetry/telemetry.ts
@@ -143,14 +143,15 @@ export function addTelemetryDebug(message: string, context?: Context) {
   )
 }
 
-export function addTelemetryError(e: unknown) {
+export function addTelemetryError(e: unknown, context?: Context) {
   addTelemetry(
     assign(
       {
         type: TelemetryType.log,
         status: StatusType.error,
       },
-      formatError(e)
+      formatError(e),
+      context
     )
   )
 }
diff --git a/packages/rum/src/domain/segmentCollection/segment.ts b/packages/rum/src/domain/segmentCollection/segment.ts
index debdeff26a..022fdbd44d 100644
--- a/packages/rum/src/domain/segmentCollection/segment.ts
+++ b/packages/rum/src/domain/segmentCollection/segment.ts
@@ -5,6 +5,8 @@ import { RecordType } from '../../types'
 import * as replayStats from '../replayStats'
 import type { DeflateWorker } from './startDeflateWorker'
 
+// Arbitrary id, will be replaced when we have multiple parallel streams.
+const STREAM_ID = 1
 let nextId = 0
 
 export type FlushReason = Exclude | 'stop'
@@ -15,6 +17,7 @@ export class Segment {
   public readonly metadata: BrowserSegmentMetadata
 
   private id = nextId++
+  private pendingWriteCount = 0
 
   constructor(
     private worker: DeflateWorker,
@@ -41,22 +44,30 @@ export class Segment {
     replayStats.addSegment(viewId)
     replayStats.addRecord(viewId)
 
+    let rawBytesCount = 0
+    let compressedBytesCount = 0
+    const compressedData: Uint8Array[] = []
     const { stop: removeMessageListener } = addEventListener(
       worker,
       'message',
       ({ data }: MessageEvent) => {
-        if (data.type === 'errored' || data.type === 'initialized') {
+        if (data.type !== 'wrote') {
           return
         }
 
         if (data.id === this.id) {
+          this.pendingWriteCount -= 1
           replayStats.addWroteData(viewId, data.additionalBytesCount)
-          if (data.type === 'flushed') {
-            onFlushed(data.result, data.rawBytesCount)
+          rawBytesCount += data.additionalBytesCount
+          compressedBytesCount += data.result.length
+          compressedData.push(data.result)
+          if (this.flushReason && this.pendingWriteCount === 0) {
+            compressedData.push(data.trailer)
+            onFlushed(concatBuffers(compressedData), rawBytesCount)
             removeMessageListener()
           } else {
-            onWrote(data.compressedBytesCount)
+            onWrote(compressedBytesCount)
          }
         } else if (data.id > this.id) {
           // Messages should be received in the same order as they are sent, so if we receive a
@@ -73,7 +84,7 @@
       }
     )
     sendToExtension('record', { record: initialRecord, segment: this.metadata })
-    this.worker.postMessage({ data: `{"records":[${JSON.stringify(initialRecord)}`, id: this.id, action: 'write' })
+    this.write(`{"records":[${JSON.stringify(initialRecord)}`)
   }
 
   addRecord(record: BrowserRecord): void {
@@ -83,15 +94,36 @@
     replayStats.addRecord(this.metadata.view.id)
     this.metadata.has_full_snapshot ||= record.type === RecordType.FullSnapshot
     sendToExtension('record', { record, segment: this.metadata })
-    this.worker.postMessage({ data: `,${JSON.stringify(record)}`, id: this.id, action: 'write' })
+    this.write(`,${JSON.stringify(record)}`)
   }
 
   flush(reason: FlushReason) {
+    this.write(`],${JSON.stringify(this.metadata).slice(1)}\n`)
     this.worker.postMessage({
-      data: `],${JSON.stringify(this.metadata).slice(1)}\n`,
-      id: this.id,
-      action: 'flush',
+      action: 'reset',
+      streamId: STREAM_ID,
     })
     this.flushReason = reason
   }
+
+  private write(data: string) {
+    this.pendingWriteCount += 1
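+    // Each 'write' is answered by a 'wrote' message from the worker; the message listener above
+    // decrements pendingWriteCount, so a flushed segment is only finalized once the data written
+    // by flush() has been compressed and no write is left pending.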
+    this.worker.postMessage({
+      data,
+      id: this.id,
+      streamId: STREAM_ID,
+      action: 'write',
+    })
+  }
+}
+
+function concatBuffers(buffers: Uint8Array[]) {
+  const length = buffers.reduce((total, buffer) => total + buffer.length, 0)
+  const result = new Uint8Array(length)
+  let offset = 0
+  for (const buffer of buffers) {
+    result.set(buffer, offset)
+    offset += buffer.length
+  }
+  return result
 }
diff --git a/packages/rum/src/domain/segmentCollection/startDeflateWorker.spec.ts b/packages/rum/src/domain/segmentCollection/startDeflateWorker.spec.ts
index 24c1069718..8f814915a8 100644
--- a/packages/rum/src/domain/segmentCollection/startDeflateWorker.spec.ts
+++ b/packages/rum/src/domain/segmentCollection/startDeflateWorker.spec.ts
@@ -1,9 +1,11 @@
 import type { RawTelemetryEvent } from '@datadog/browser-core'
 import { display, isIE, noop, resetTelemetry, startFakeTelemetry } from '@datadog/browser-core'
-import type { DeflateWorkerResponse } from '@datadog/browser-worker'
 import { MockWorker } from '../../../test'
-import type { DeflateWorker } from './startDeflateWorker'
-import { startDeflateWorker, resetDeflateWorkerState, createDeflateWorker } from './startDeflateWorker'
+import type { createDeflateWorker } from './startDeflateWorker'
+import { startDeflateWorker, resetDeflateWorkerState } from './startDeflateWorker'
+
+// Arbitrary stream id used for tests
+const TEST_STREAM_ID = 5
 
 describe('startDeflateWorker', () => {
   let deflateWorker: MockWorker
@@ -164,128 +166,17 @@
       startDeflateWorker(noop, createDeflateWorkerSpy)
       deflateWorker.processAllMessages()
 
-      deflateWorker.dispatchErrorMessage('boom')
+      deflateWorker.dispatchErrorMessage('boom', TEST_STREAM_ID)
 
       expect(telemetryEvents).toEqual([
         {
           type: 'log',
           status: 'error',
           message: 'Uncaught "boom"',
           error: { stack: jasmine.any(String) },
+          worker_version: 'dev',
+          stream_id: TEST_STREAM_ID,
         },
       ])
     })
   })
 })
-
-describe('createDeflateWorker', () => {
-  beforeEach(() => {
-    if (isIE()) {
-      pending('no TextEncoder support')
-    }
-  })
-  it('buffers data and responds with the buffer deflated compressedBytesCount when writing', (done) => {
-    const deflateWorker = createDeflateWorker()
-    listen(deflateWorker, 3, (events) => {
-      expect(events).toEqual([
-        { type: 'wrote', id: 0, compressedBytesCount: 11, additionalBytesCount: 3 },
-        { type: 'wrote', id: 1, compressedBytesCount: 20, additionalBytesCount: 3 },
-        { type: 'wrote', id: 2, compressedBytesCount: 29, additionalBytesCount: 3 },
-      ])
-      done()
-    })
-    deflateWorker.postMessage({ id: 0, action: 'write', data: 'foo' })
-    deflateWorker.postMessage({ id: 1, action: 'write', data: 'bar' })
-    deflateWorker.postMessage({ id: 2, action: 'write', data: 'baz' })
-  })
-
-  it('responds with the resulting bytes when completing', (done) => {
-    const deflateWorker = createDeflateWorker()
-    listen(deflateWorker, 2, (events) => {
-      expect(events).toEqual([
-        { type: 'wrote', id: 0, compressedBytesCount: 11, additionalBytesCount: 3 },
-        {
-          type: 'flushed',
-          id: 1,
-          result: new Uint8Array([120, 156, 74, 203, 207, 7, 0, 0, 0, 255, 255, 3, 0, 2, 130, 1, 69]),
-          additionalBytesCount: 0,
-          rawBytesCount: 3,
-        },
-      ])
-      done()
-    })
-    deflateWorker.postMessage({ id: 0, action: 'write', data: 'foo' })
-    deflateWorker.postMessage({ id: 1, action: 'flush' })
-  })
-
-  it('writes the remaining data specified by "flush"', (done) => {
-    const deflateWorker = createDeflateWorker()
-    listen(deflateWorker, 1, (events) => {
-      expect(events).toEqual([
-        {
-          type: 'flushed',
-          id: 0,
-          result: new Uint8Array([120, 156, 74, 203, 207, 7, 0, 0, 0, 255, 255, 3, 0, 2, 130, 1, 69]),
-          additionalBytesCount: 3,
-          rawBytesCount: 3,
-        },
-      ])
-      done()
-    })
-    deflateWorker.postMessage({ id: 0, action: 'flush', data: 'foo' })
-  })
-
-  it('flushes several deflates one after the other', (done) => {
-    const deflateWorker = createDeflateWorker()
-    listen(deflateWorker, 4, (events) => {
-      expect(events).toEqual([
-        {
-          type: 'wrote',
-          id: 0,
-          compressedBytesCount: 11,
-          additionalBytesCount: 3,
-        },
-        {
-          type: 'flushed',
-          id: 1,
-          result: new Uint8Array([120, 156, 74, 203, 207, 7, 0, 0, 0, 255, 255, 3, 0, 2, 130, 1, 69]),
-          additionalBytesCount: 0,
-          rawBytesCount: 3,
-        },
-        {
-          type: 'wrote',
-          id: 2,
-          compressedBytesCount: 11,
-          additionalBytesCount: 3,
-        },
-        {
-          type: 'flushed',
-          id: 3,
-          result: new Uint8Array([120, 156, 74, 74, 44, 2, 0, 0, 0, 255, 255, 3, 0, 2, 93, 1, 54]),
-          additionalBytesCount: 0,
-          rawBytesCount: 3,
-        },
-      ])
-      done()
-    })
-    deflateWorker.postMessage({ id: 0, action: 'write', data: 'foo' })
-    deflateWorker.postMessage({ id: 1, action: 'flush' })
-    deflateWorker.postMessage({ id: 2, action: 'write', data: 'bar' })
-    deflateWorker.postMessage({ id: 3, action: 'flush' })
-  })
-
-  function listen(
-    deflateWorker: DeflateWorker,
-    expectedResponseCount: number,
-    onDone: (responses: DeflateWorkerResponse[]) => void
-  ) {
-    const responses: DeflateWorkerResponse[] = []
-    const listener = (event: { data: DeflateWorkerResponse }) => {
-      const responsesCount = responses.push(event.data)
-      if (responsesCount === expectedResponseCount) {
-        deflateWorker.removeEventListener('message', listener)
-        onDone(responses)
-      }
-    }
-    deflateWorker.addEventListener('message', listener)
-  }
-})
diff --git a/packages/rum/src/domain/segmentCollection/startDeflateWorker.ts b/packages/rum/src/domain/segmentCollection/startDeflateWorker.ts
index 507f5b6e0f..8cb4dd1883 100644
--- a/packages/rum/src/domain/segmentCollection/startDeflateWorker.ts
+++ b/packages/rum/src/domain/segmentCollection/startDeflateWorker.ts
@@ -28,6 +28,7 @@ type DeflateWorkerState =
   | {
       status: DeflateWorkerStatus.Initialized
       worker: DeflateWorker
+      version: string
     }
 
 export interface DeflateWorker extends Worker {
@@ -86,9 +87,9 @@ export function doStartDeflateWorker(createDeflateWorkerImpl = createDeflateWork
     addEventListener(worker, 'error', onError)
     addEventListener(worker, 'message', ({ data }: MessageEvent) => {
       if (data.type === 'errored') {
-        onError(data.error)
+        onError(data.error, data.streamId)
       } else if (data.type === 'initialized') {
-        onInitialized(worker)
+        onInitialized(worker, data.version)
       }
     })
     worker.postMessage({ action: 'init' })
@@ -98,14 +99,14 @@ export function doStartDeflateWorker(createDeflateWorkerImpl = createDeflateWork
   }
 }
 
-function onInitialized(worker: DeflateWorker) {
+function onInitialized(worker: DeflateWorker, version: string) {
   if (state.status === DeflateWorkerStatus.Loading) {
     state.callbacks.forEach((callback) => callback(worker))
-    state = { status: DeflateWorkerStatus.Initialized, worker }
+    state = { status: DeflateWorkerStatus.Initialized, worker, version }
   }
 }
 
-function onError(error: unknown) {
+function onError(error: unknown, streamId?: number) {
   if (state.status === DeflateWorkerStatus.Loading) {
     display.error('Session Replay recording failed to start: an error occurred while creating the Worker:', error)
     if (error instanceof Event || (error instanceof Error && isMessageCspRelated(error.message))) {
@@ -119,7 +120,10 @@
     state.callbacks.forEach((callback) => callback())
     state = { status: DeflateWorkerStatus.Error }
   } else {
-    addTelemetryError(error)
+    addTelemetryError(error, {
+      worker_version: state.status === DeflateWorkerStatus.Initialized && state.version,
+      stream_id: streamId,
+    })
   }
 }
diff --git a/packages/rum/test/mockWorker.ts b/packages/rum/test/mockWorker.ts
index 1bf324fabb..2cf0d695ec 100644
--- a/packages/rum/test/mockWorker.ts
+++ b/packages/rum/test/mockWorker.ts
@@ -9,8 +9,8 @@ export class MockWorker implements DeflateWorker {
   public onerror = null
 
   readonly pendingMessages: DeflateWorkerAction[] = []
-  private rawBytesCount = 0
-  private deflatedData: Uint8Array[] = []
+
+  private streams = new Map()
   private listeners: {
     message: DeflateWorkerListener[]
     error: Array<(error: unknown) => void>
@@ -70,42 +70,38 @@ export class MockWorker implements DeflateWorker {
           listener({
             data: {
               type: 'initialized',
+              version: 'dev',
             },
           })
         )
         break
       case 'write':
         {
-          const additionalBytesCount = this.pushData(message.data)
+          let stream = this.streams.get(message.streamId)
+          if (!stream) {
+            stream = []
+            this.streams.set(message.streamId, stream)
+          }
+          // In the mock worker, for simplicity, we'll just use the UTF-8 encoded string instead of deflating it.
+          const binaryData = new TextEncoder().encode(message.data)
+          stream.push(binaryData)
+
           this.listeners.message.forEach((listener) =>
            listener({
              data: {
                type: 'wrote',
                id: message.id,
-                compressedBytesCount: uint8ArraysSize(this.deflatedData),
-                additionalBytesCount,
+                streamId: message.streamId,
+                result: binaryData,
+                trailer: new Uint8Array([32]), // emulate a trailer with a single space
+                additionalBytesCount: binaryData.length,
              },
            })
          )
        }
        break
-      case 'flush':
-        {
-          const additionalBytesCount = this.pushData(message.data)
-          this.listeners.message.forEach((listener) =>
-            listener({
-              data: {
-                type: 'flushed',
-                id: message.id,
-                result: mergeUint8Arrays(this.deflatedData),
-                rawBytesCount: this.rawBytesCount,
-                additionalBytesCount,
-              },
-            })
-          )
-          this.deflatedData.length = 0
-          this.rawBytesCount = 0
-        }
+      case 'reset':
+        this.streams.delete(message.streamId)
        break
    }
  }
@@ -116,29 +112,7 @@
    this.listeners.error.forEach((listener) => listener(error))
  }
 
-  dispatchErrorMessage(error: Error | string) {
-    this.listeners.message.forEach((listener) => listener({ data: { type: 'errored', error } }))
-  }
-
-  private pushData(data?: string) {
-    const encodedData = new TextEncoder().encode(data)
-    this.rawBytesCount += encodedData.length
-    // In the mock worker, for simplicity, we'll just use the UTF-8 encoded string instead of deflating it.
-    this.deflatedData.push(encodedData)
-    return encodedData.length
-  }
-}
-
-function uint8ArraysSize(arrays: Uint8Array[]) {
-  return arrays.reduce((sum, bytes) => sum + bytes.length, 0)
-}
-
-function mergeUint8Arrays(arrays: Uint8Array[]) {
-  const result = new Uint8Array(uint8ArraysSize(arrays))
-  let offset = 0
-  for (const bytes of arrays) {
-    result.set(bytes, offset)
-    offset += bytes.byteLength
+  dispatchErrorMessage(error: Error | string, streamId?: number) {
+    this.listeners.message.forEach((listener) => listener({ data: { type: 'errored', error, streamId } }))
   }
-  return result
 }
diff --git a/packages/worker/src/boot/startWorker.spec.ts b/packages/worker/src/boot/startWorker.spec.ts
new file mode 100644
index 0000000000..e6108c7a7e
--- /dev/null
+++ b/packages/worker/src/boot/startWorker.spec.ts
@@ -0,0 +1,207 @@
+import type { DeflateWorkerAction, DeflateWorkerResponse } from '../types'
+import type { WorkerScope } from './startWorker'
+import { startWorker } from './startWorker'
+
+// Arbitrary stream ids used for tests
+const TEST_STREAM_ID = 5
+const OTHER_TEST_STREAM_ID = 6
+
+// Zlib streams using the default compression level start with the bytes 120 156 (0x78 0x9c)
+// https://stackoverflow.com/a/9050274
+const STREAM_START = [120, 156]
+
+// Deflate block generated when compressing "foo" alone
+const FOO_COMPRESSED = [74, 203, 207, 7, 0, 0, 0, 255, 255]
+// Zlib trailer when finishing the stream after compressing "foo"
+const FOO_COMPRESSED_TRAILER = [3, 0, 2, 130, 1, 69] // empty deflate block + adler32 checksum
+
+// Deflate block generated when compressing "bar" alone
+const BAR_COMPRESSED = [74, 74, 44, 2, 0, 0, 0, 255, 255]
+// Zlib trailer when finishing the stream after compressing "bar"
+const BAR_COMPRESSED_TRAILER = [3, 0, 2, 93, 1, 54]
+
+// Deflate block generated when compressing "baz" alone
+const BAZ_COMPRESSED = [74, 74, 172, 2, 0, 0, 0, 255, 255]
+
+// Zlib trailer when finishing the stream after compressing "foo" then "bar"
+const FOO_BAR_COMPRESSED_TRAILER = [3, 0, 8, 171, 2, 122]
+// Zlib trailer when finishing the stream after compressing "foo" then "bar" then "baz"
+const FOO_BAR_BAZ_COMPRESSED_TRAILER = [3, 0, 18, 123, 3, 183]
+// Zlib trailer when finishing the stream after compressing "foo" then "baz"
+const FOO_BAZ_COMPRESSED_TRAILER = [3, 0, 8, 179, 2, 130]
+
+describe('startWorker', () => {
+  let workerScope: {
+    addEventListener: jasmine.Spy
+    postMessage: jasmine.Spy
+  }
+
+  beforeEach(() => {
+    workerScope = {
+      addEventListener: jasmine.createSpy(),
+      postMessage: jasmine.createSpy(),
+    }
+    startWorker(workerScope)
+  })
+
+  function emulateAction(message: DeflateWorkerAction): DeflateWorkerResponse {
+    workerScope.postMessage.calls.reset()
+    workerScope.addEventListener.calls.allArgs().forEach(([eventName, listener]) => {
+      if (eventName === 'message') {
+        listener({ data: message } as MessageEvent)
+      }
+    })
+    return workerScope.postMessage.calls.mostRecent()?.args[0]
+  }
+
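+  // Note: only the first write on a stream includes the 2-byte zlib header (STREAM_START); later
+  // writes return only the newly produced deflate block, because the stream stays open between
+  // writes until a 'reset'.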
+  it('buffers data and responds with the deflated result when writing', () => {
+    expect(emulateAction({ id: 0, streamId: TEST_STREAM_ID, action: 'write', data: 'foo' })).toEqual({
+      type: 'wrote',
+      id: 0,
+      streamId: TEST_STREAM_ID,
+      result: new Uint8Array([...STREAM_START, ...FOO_COMPRESSED]),
+      trailer: new Uint8Array(FOO_COMPRESSED_TRAILER),
+      additionalBytesCount: 3,
+    })
+
+    expect(emulateAction({ id: 1, streamId: TEST_STREAM_ID, action: 'write', data: 'bar' })).toEqual({
+      type: 'wrote',
+      id: 1,
+      streamId: TEST_STREAM_ID,
+      result: new Uint8Array(BAR_COMPRESSED),
+      trailer: new Uint8Array(FOO_BAR_COMPRESSED_TRAILER),
+      additionalBytesCount: 3,
+    })
+
+    expect(emulateAction({ id: 2, streamId: TEST_STREAM_ID, action: 'write', data: 'baz' })).toEqual({
+      type: 'wrote',
+      id: 2,
+      streamId: TEST_STREAM_ID,
+      result: new Uint8Array(BAZ_COMPRESSED),
+      trailer: new Uint8Array(FOO_BAR_BAZ_COMPRESSED_TRAILER),
+      additionalBytesCount: 3,
+    })
+  })
+
+  it('resets the stream state', () => {
+    expect(emulateAction({ action: 'write', id: 0, streamId: TEST_STREAM_ID, data: 'foo' })).toEqual({
+      type: 'wrote',
+      id: 0,
+      streamId: TEST_STREAM_ID,
+      result: new Uint8Array([...STREAM_START, ...FOO_COMPRESSED]),
+      trailer: new Uint8Array(FOO_COMPRESSED_TRAILER),
+      additionalBytesCount: 3,
+    })
+    expect(emulateAction({ action: 'reset', streamId: TEST_STREAM_ID })).toBeUndefined()
+    expect(emulateAction({ action: 'write', id: 1, streamId: TEST_STREAM_ID, data: 'bar' })).toEqual({
+      type: 'wrote',
+      id: 1,
+      streamId: TEST_STREAM_ID,
+      // As the result starts with the beginning of a stream, we are sure that `reset` was
+      // effective
+      result: new Uint8Array([...STREAM_START, ...BAR_COMPRESSED]),
+      trailer: new Uint8Array(BAR_COMPRESSED_TRAILER),
+      additionalBytesCount: 3,
+    })
+    expect(emulateAction({ action: 'reset', streamId: TEST_STREAM_ID })).toBeUndefined()
+  })
+
+  it('supports writing to different streams at the same time', () => {
+    expect(
+      emulateAction({
+        id: 0,
+        streamId: TEST_STREAM_ID,
+        action: 'write',
+        data: 'foo',
+      })
+    ).toEqual({
+      type: 'wrote',
+      id: 0,
+      streamId: TEST_STREAM_ID,
+      result: new Uint8Array([...STREAM_START, ...FOO_COMPRESSED]),
+      trailer: new Uint8Array(FOO_COMPRESSED_TRAILER),
+      additionalBytesCount: 3,
+    })
+
+    expect(
+      emulateAction({
+        id: 1,
+        streamId: OTHER_TEST_STREAM_ID,
+        action: 'write',
+        data: 'bar',
+      })
+    ).toEqual({
+      type: 'wrote',
+      id: 1,
+      streamId: OTHER_TEST_STREAM_ID,
+      result: new Uint8Array([...STREAM_START, ...BAR_COMPRESSED]),
+      trailer: new Uint8Array(BAR_COMPRESSED_TRAILER),
+      additionalBytesCount: 3,
+    })
+
+    expect(
+      emulateAction({
+        streamId: OTHER_TEST_STREAM_ID,
+        action: 'reset',
+      })
+    ).toBeUndefined()
+
+    expect(
+      emulateAction({
+        id: 2,
+        streamId: TEST_STREAM_ID,
+        action: 'write',
+        data: 'baz',
+      })
+    ).toEqual({
+      type: 'wrote',
+      id: 2,
+      streamId: TEST_STREAM_ID,
+      result: new Uint8Array(BAZ_COMPRESSED),
+      trailer: new Uint8Array(FOO_BAZ_COMPRESSED_TRAILER),
+      additionalBytesCount: 3,
+    })
+  })
+
+  it('reports an error when an unexpected exception occurs', () => {
+    expect(emulateAction(null as any)).toEqual({
+      type: 'errored',
+      error: jasmine.any(TypeError),
+      streamId: undefined,
+    })
+  })
+
+  it('reports an error when an unexpected exception occurs while writing on a stream', () => {
+    if (!window.TextEncoder) {
+      pending('No TextEncoder support')
+    }
+    spyOn(TextEncoder.prototype, 'encode').and.callFake(() => {
+      throw new Error('Something went wrong!')
+    })
+    expect(
+      emulateAction({
+        id: 2,
+        streamId: TEST_STREAM_ID,
+        action: 'write',
+        data: 'baz',
+      })
+    ).toEqual({
+      type: 'errored',
+      error: new Error('Something went wrong!'),
+      streamId: TEST_STREAM_ID,
+    })
+  })
+
+  it('uses the string representation of the error when it fails to send it through postMessage', () => {
+    workerScope.postMessage.and.callFake((response) => {
+      if (response.type === 'errored' && response.error instanceof Error) {
+        throw new DOMException("Failed to execute 'postMessage' on 'WorkerScope'")
+      }
+    })
+    expect(emulateAction(null as any)).toEqual({
+      type: 'errored',
+      error: jasmine.stringContaining('TypeError'),
+      streamId: undefined,
+    })
+  })
+})
diff --git a/packages/worker/src/boot/startWorker.ts b/packages/worker/src/boot/startWorker.ts
index 66481c6201..baa4f3ba5f 100644
--- a/packages/worker/src/boot/startWorker.ts
+++ b/packages/worker/src/boot/startWorker.ts
@@ -1,76 +1,126 @@
 /* eslint-disable local-rules/disallow-zone-js-patched-values */
 import { Deflate, constants, string2buf } from '../domain/deflate'
-import type { DeflateWorkerAction } from '../types'
+import type { DeflateWorkerAction, DeflateWorkerResponse } from '../types'
 
-export function startWorker() {
-  monitor(() => {
-    let deflate = new Deflate()
-    let rawBytesCount = 0
-    self.addEventListener(
-      'message',
-      monitor((event: MessageEvent) => {
-        const data = event.data
-        switch (data.action) {
-          case 'init':
-            self.postMessage({
-              type: 'initialized',
-            })
-            break
-          case 'write': {
-            const additionalBytesCount = pushData(data.data)
-            self.postMessage({
-              type: 'wrote',
-              id: data.id,
-              compressedBytesCount: deflate.chunks.reduce((total, chunk) => total + chunk.length, 0),
-              additionalBytesCount,
-            })
-            break
-          }
-          case 'flush': {
-            const additionalBytesCount = data.data ? pushData(data.data) : 0
-            deflate.push('', constants.Z_FINISH)
-            self.postMessage({
-              type: 'flushed',
-              id: data.id,
-              result: deflate.result,
-              additionalBytesCount,
-              rawBytesCount,
-            })
-            deflate = new Deflate()
-            rawBytesCount = 0
-            break
-          }
+declare const __BUILD_ENV__SDK_VERSION__: string
+
+export interface WorkerScope {
+  addEventListener(eventName: 'message', listener: (event: MessageEvent) => void): void
+  postMessage(response: DeflateWorkerResponse): void
+}
+
+export function startWorker(workerScope: WorkerScope = self) {
+  try {
+    const streams = new Map<number, Deflate>()
+    workerScope.addEventListener('message', (event: MessageEvent) => {
+      try {
+        const response = handleAction(streams, event.data)
+        if (response) {
+          workerScope.postMessage(response)
         }
-      })
-    )
+      } catch (error) {
+        sendError(workerScope, error, event.data && 'streamId' in event.data ? event.data.streamId : undefined)
+      }
+    })
+  } catch (error) {
+    sendError(workerScope, error)
+  }
+}
+
+function sendError(workerScope: WorkerScope, error: unknown, streamId?: number) {
+  try {
+    workerScope.postMessage({
+      type: 'errored',
+      error: error as Error,
+      streamId,
+    })
+  } catch (_) {
+    // DATA_CLONE_ERR, cf https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm
+    workerScope.postMessage({
+      type: 'errored',
+      error: String(error),
+      streamId,
+    })
+  }
+}
+
+function handleAction(streams: Map<number, Deflate>, message: DeflateWorkerAction): DeflateWorkerResponse | undefined {
+  switch (message.action) {
+    case 'init':
+      return {
+        type: 'initialized',
+        version: __BUILD_ENV__SDK_VERSION__,
+      }
+
+    case 'write': {
+      let deflate = streams.get(message.streamId)
+      if (!deflate) {
+        deflate = new Deflate()
+        streams.set(message.streamId, deflate)
+      }
+      const previousChunksLength = deflate.chunks.length
-    function pushData(data: string) {
       // TextEncoder is not supported on old browser version like Edge 18, therefore we use string2buf
-      const binaryData = string2buf(data)
+      const binaryData = string2buf(message.data)
       deflate.push(binaryData, constants.Z_SYNC_FLUSH)
-      rawBytesCount += binaryData.length
-      return binaryData.length
-    }
-  })()
-}
 
-function monitor<Args extends any[], Result>(fn: (...args: Args) => Result): (...args: Args) => Result | undefined {
-  return (...args) => {
-    try {
-      return fn(...args)
-    } catch (e) {
-      try {
-        self.postMessage({
-          type: 'errored',
-          error: e,
-        })
-      } catch (_) {
-        // DATA_CLONE_ERR, cf https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm
-        self.postMessage({
-          type: 'errored',
-          error: String(e),
-        })
+      return {
+        type: 'wrote',
+        id: message.id,
+        streamId: message.streamId,
+        result: concatBuffers(deflate.chunks.slice(previousChunksLength)),
+        trailer: makeTrailer(deflate),
+        additionalBytesCount: binaryData.length,
       }
     }
+
+    case 'reset':
+      streams.delete(message.streamId)
+      break
   }
 }
+
+function concatBuffers(buffers: Uint8Array[]) {
+  const length = buffers.reduce((total, buffer) => total + buffer.length, 0)
+  const result = new Uint8Array(length)
+  let offset = 0
+  for (const buffer of buffers) {
+    result.set(buffer, offset)
+    offset += buffer.length
+  }
+  return result
+}
+
+/**
+ * Creates a buffer of bytes to append to the end of the Zlib stream to finish it. It is composed of
+ * two parts:
+ * * an empty deflate block as specified in https://www.rfc-editor.org/rfc/rfc1951.html#page-13 ,
+ *   which happens to always be 3, 0
+ * * an adler32 checksum as specified in https://www.rfc-editor.org/rfc/rfc1950.html#page-4
+ *
+ * This is essentially what pako writes to the stream when invoking the `deflate.push('',
+ * constants.Z_FINISH)` operation after some data has been pushed with "Z_SYNC_FLUSH", but doing so
+ * ends the stream and no more data can be pushed into it.
+ *
+ * Since we want to let the main thread end the stream synchronously at any point without needing to
+ * send a message to the worker to flush it, we send back a trailer in each "wrote" response so the
+ * main thread can just append it to the compressed data to end the stream.
+ *
+ * Besides creating a valid zlib stream, those 6 bytes are expected to be here so the Datadog backend
+ * can merge streams together (see internal doc).
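+ *
+ * For example, after compressing "foo" alone, the adler32 checksum is 0x02820145, so the trailer is
+ * [3, 0, 0x02, 0x82, 0x01, 0x45] = [3, 0, 2, 130, 1, 69], which is the value of
+ * FOO_COMPRESSED_TRAILER in startWorker.spec.ts.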
+ */
+function makeTrailer(deflate: Deflate): Uint8Array {
+  /* eslint-disable no-bitwise */
+  const adler = deflate.strm.adler
+  return new Uint8Array([
+    // Empty deflate block
+    3,
+    0,
+    // Adler32 checksum
+    (adler >>> 24) & 0xff,
+    (adler >>> 16) & 0xff,
+    (adler >>> 8) & 0xff,
+    adler & 0xff,
+  ])
+  /* eslint-enable no-bitwise */
+}
diff --git a/packages/worker/src/domain/deflate.d.ts b/packages/worker/src/domain/deflate.d.ts
index 7e4dbc80e2..1cd22de574 100644
--- a/packages/worker/src/domain/deflate.d.ts
+++ b/packages/worker/src/domain/deflate.d.ts
@@ -1,6 +1,7 @@
 export class Deflate {
   chunks: Uint8Array[]
   result: Uint8Array
+  strm: { adler: number }
   push(data: Uint8Array | ArrayBuffer | string, flushMode: number | boolean): boolean
 }
 
diff --git a/packages/worker/src/types.ts b/packages/worker/src/types.ts
index 0dfe3cd842..fd48b0e809 100644
--- a/packages/worker/src/types.ts
+++ b/packages/worker/src/types.ts
@@ -9,39 +9,33 @@ export type DeflateWorkerAction =
   | {
       action: 'write'
       id: number
+      streamId: number
       data: string
     }
-  // Action to send when finishing to write some data. The worker will respond with a 'flushed'
-  // response, with the same id, measurements of the wrote data bytes count and the complete deflate
-  // data.
+  // Action to send when all data has been written and the state of the stream needs to be reset.
   | {
-      action: 'flush'
-      id: number
-      data?: string
+      action: 'reset'
+      streamId: number
     }
 
 export type DeflateWorkerResponse =
   // Response to 'init' action
   | {
       type: 'initialized'
+      version: string
     }
   // Response to 'write' action
   | {
       type: 'wrote'
       id: number
-      compressedBytesCount: number
-      additionalBytesCount: number
-    }
-  // Response to 'flush' action
-  | {
-      type: 'flushed'
-      id: number
+      streamId: number
       result: Uint8Array
+      trailer: Uint8Array
       additionalBytesCount: number
-      rawBytesCount: number
     }
   // Could happen at any time when something goes wrong in the worker
   | {
       type: 'errored'
+      streamId?: number
       error: Error | string
     }