Skip to content

Commit

Permalink
Merge 093e058 in staging-30
Browse files Browse the repository at this point in the history
Co-authored-by: Benoît Zugmeyer <[email protected]>
  • Loading branch information
dd-mergequeue[bot] and BenoitZugmeyer authored Jul 24, 2023
2 parents c86a90d + 093e058 commit b8fa9df
Show file tree
Hide file tree
Showing 9 changed files with 412 additions and 258 deletions.
5 changes: 3 additions & 2 deletions packages/core/src/domain/telemetry/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,15 @@ export function addTelemetryDebug(message: string, context?: Context) {
)
}

export function addTelemetryError(e: unknown) {
export function addTelemetryError(e: unknown, context?: Context) {
addTelemetry(
assign(
{
type: TelemetryType.log,
status: StatusType.error,
},
formatError(e)
formatError(e),
context
)
)
}
Expand Down
50 changes: 41 additions & 9 deletions packages/rum/src/domain/segmentCollection/segment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { RecordType } from '../../types'
import * as replayStats from '../replayStats'
import type { DeflateWorker } from './startDeflateWorker'

// Arbitrary id, will be replaced when we have multiple parallel streams.
const STREAM_ID = 1
let nextId = 0

export type FlushReason = Exclude<CreationReason, 'init'> | 'stop'
Expand All @@ -15,6 +17,7 @@ export class Segment {
public readonly metadata: BrowserSegmentMetadata

private id = nextId++
private pendingWriteCount = 0

constructor(
private worker: DeflateWorker,
Expand All @@ -41,22 +44,30 @@ export class Segment {

replayStats.addSegment(viewId)
replayStats.addRecord(viewId)
let rawBytesCount = 0
let compressedBytesCount = 0
const compressedData: Uint8Array[] = []

const { stop: removeMessageListener } = addEventListener(
worker,
'message',
({ data }: MessageEvent<DeflateWorkerResponse>) => {
if (data.type === 'errored' || data.type === 'initialized') {
if (data.type !== 'wrote') {
return
}

if (data.id === this.id) {
this.pendingWriteCount -= 1
replayStats.addWroteData(viewId, data.additionalBytesCount)
if (data.type === 'flushed') {
onFlushed(data.result, data.rawBytesCount)
rawBytesCount += data.additionalBytesCount
compressedBytesCount += data.result.length
compressedData.push(data.result)
if (this.flushReason && this.pendingWriteCount === 0) {
compressedData.push(data.trailer)
onFlushed(concatBuffers(compressedData), rawBytesCount)
removeMessageListener()
} else {
onWrote(data.compressedBytesCount)
onWrote(compressedBytesCount)
}
} else if (data.id > this.id) {
// Messages should be received in the same order as they are sent, so if we receive a
Expand All @@ -73,7 +84,7 @@ export class Segment {
}
)
sendToExtension('record', { record: initialRecord, segment: this.metadata })
this.worker.postMessage({ data: `{"records":[${JSON.stringify(initialRecord)}`, id: this.id, action: 'write' })
this.write(`{"records":[${JSON.stringify(initialRecord)}`)
}

addRecord(record: BrowserRecord): void {
Expand All @@ -83,15 +94,36 @@ export class Segment {
replayStats.addRecord(this.metadata.view.id)
this.metadata.has_full_snapshot ||= record.type === RecordType.FullSnapshot
sendToExtension('record', { record, segment: this.metadata })
this.worker.postMessage({ data: `,${JSON.stringify(record)}`, id: this.id, action: 'write' })
this.write(`,${JSON.stringify(record)}`)
}

flush(reason: FlushReason) {
this.write(`],${JSON.stringify(this.metadata).slice(1)}\n`)
this.worker.postMessage({
data: `],${JSON.stringify(this.metadata).slice(1)}\n`,
id: this.id,
action: 'flush',
action: 'reset',
streamId: STREAM_ID,
})
this.flushReason = reason
}

private write(data: string) {
this.pendingWriteCount += 1
this.worker.postMessage({
data,
id: this.id,
streamId: STREAM_ID,
action: 'write',
})
}
}

function concatBuffers(buffers: Uint8Array[]) {
const length = buffers.reduce((total, buffer) => total + buffer.length, 0)
const result = new Uint8Array(length)
let offset = 0
for (const buffer of buffers) {
result.set(buffer, offset)
offset += buffer.length
}
return result
}
125 changes: 8 additions & 117 deletions packages/rum/src/domain/segmentCollection/startDeflateWorker.spec.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import type { RawTelemetryEvent } from '@datadog/browser-core'
import { display, isIE, noop, resetTelemetry, startFakeTelemetry } from '@datadog/browser-core'
import type { DeflateWorkerResponse } from '@datadog/browser-worker'
import { MockWorker } from '../../../test'
import type { DeflateWorker } from './startDeflateWorker'
import { startDeflateWorker, resetDeflateWorkerState, createDeflateWorker } from './startDeflateWorker'
import type { createDeflateWorker } from './startDeflateWorker'
import { startDeflateWorker, resetDeflateWorkerState } from './startDeflateWorker'

// Arbitrary stream ids used for tests
const TEST_STREAM_ID = 5

describe('startDeflateWorker', () => {
let deflateWorker: MockWorker
Expand Down Expand Up @@ -164,128 +166,17 @@ describe('startDeflateWorker', () => {
startDeflateWorker(noop, createDeflateWorkerSpy)
deflateWorker.processAllMessages()

deflateWorker.dispatchErrorMessage('boom')
deflateWorker.dispatchErrorMessage('boom', TEST_STREAM_ID)
expect(telemetryEvents).toEqual([
{
type: 'log',
status: 'error',
message: 'Uncaught "boom"',
error: { stack: jasmine.any(String) },
worker_version: 'dev',
stream_id: TEST_STREAM_ID,
},
])
})
})
})

describe('createDeflateWorker', () => {
beforeEach(() => {
if (isIE()) {
pending('no TextEncoder support')
}
})
it('buffers data and responds with the buffer deflated compressedBytesCount when writing', (done) => {
const deflateWorker = createDeflateWorker()
listen(deflateWorker, 3, (events) => {
expect(events).toEqual([
{ type: 'wrote', id: 0, compressedBytesCount: 11, additionalBytesCount: 3 },
{ type: 'wrote', id: 1, compressedBytesCount: 20, additionalBytesCount: 3 },
{ type: 'wrote', id: 2, compressedBytesCount: 29, additionalBytesCount: 3 },
])
done()
})
deflateWorker.postMessage({ id: 0, action: 'write', data: 'foo' })
deflateWorker.postMessage({ id: 1, action: 'write', data: 'bar' })
deflateWorker.postMessage({ id: 2, action: 'write', data: 'baz' })
})

it('responds with the resulting bytes when completing', (done) => {
const deflateWorker = createDeflateWorker()
listen(deflateWorker, 2, (events) => {
expect(events).toEqual([
{ type: 'wrote', id: 0, compressedBytesCount: 11, additionalBytesCount: 3 },
{
type: 'flushed',
id: 1,
result: new Uint8Array([120, 156, 74, 203, 207, 7, 0, 0, 0, 255, 255, 3, 0, 2, 130, 1, 69]),
additionalBytesCount: 0,
rawBytesCount: 3,
},
])
done()
})
deflateWorker.postMessage({ id: 0, action: 'write', data: 'foo' })
deflateWorker.postMessage({ id: 1, action: 'flush' })
})

it('writes the remaining data specified by "flush"', (done) => {
const deflateWorker = createDeflateWorker()
listen(deflateWorker, 1, (events) => {
expect(events).toEqual([
{
type: 'flushed',
id: 0,
result: new Uint8Array([120, 156, 74, 203, 207, 7, 0, 0, 0, 255, 255, 3, 0, 2, 130, 1, 69]),
additionalBytesCount: 3,
rawBytesCount: 3,
},
])
done()
})
deflateWorker.postMessage({ id: 0, action: 'flush', data: 'foo' })
})

it('flushes several deflates one after the other', (done) => {
const deflateWorker = createDeflateWorker()
listen(deflateWorker, 4, (events) => {
expect(events).toEqual([
{
type: 'wrote',
id: 0,
compressedBytesCount: 11,
additionalBytesCount: 3,
},
{
type: 'flushed',
id: 1,
result: new Uint8Array([120, 156, 74, 203, 207, 7, 0, 0, 0, 255, 255, 3, 0, 2, 130, 1, 69]),
additionalBytesCount: 0,
rawBytesCount: 3,
},
{
type: 'wrote',
id: 2,
compressedBytesCount: 11,
additionalBytesCount: 3,
},
{
type: 'flushed',
id: 3,
result: new Uint8Array([120, 156, 74, 74, 44, 2, 0, 0, 0, 255, 255, 3, 0, 2, 93, 1, 54]),
additionalBytesCount: 0,
rawBytesCount: 3,
},
])
done()
})
deflateWorker.postMessage({ id: 0, action: 'write', data: 'foo' })
deflateWorker.postMessage({ id: 1, action: 'flush' })
deflateWorker.postMessage({ id: 2, action: 'write', data: 'bar' })
deflateWorker.postMessage({ id: 3, action: 'flush' })
})

function listen(
deflateWorker: DeflateWorker,
expectedResponseCount: number,
onDone: (responses: DeflateWorkerResponse[]) => void
) {
const responses: DeflateWorkerResponse[] = []
const listener = (event: { data: DeflateWorkerResponse }) => {
const responsesCount = responses.push(event.data)
if (responsesCount === expectedResponseCount) {
deflateWorker.removeEventListener('message', listener)
onDone(responses)
}
}
deflateWorker.addEventListener('message', listener)
}
})
16 changes: 10 additions & 6 deletions packages/rum/src/domain/segmentCollection/startDeflateWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type DeflateWorkerState =
| {
status: DeflateWorkerStatus.Initialized
worker: DeflateWorker
version: string
}

export interface DeflateWorker extends Worker {
Expand Down Expand Up @@ -86,9 +87,9 @@ export function doStartDeflateWorker(createDeflateWorkerImpl = createDeflateWork
addEventListener(worker, 'error', onError)
addEventListener(worker, 'message', ({ data }: MessageEvent<DeflateWorkerResponse>) => {
if (data.type === 'errored') {
onError(data.error)
onError(data.error, data.streamId)
} else if (data.type === 'initialized') {
onInitialized(worker)
onInitialized(worker, data.version)
}
})
worker.postMessage({ action: 'init' })
Expand All @@ -98,14 +99,14 @@ export function doStartDeflateWorker(createDeflateWorkerImpl = createDeflateWork
}
}

function onInitialized(worker: DeflateWorker) {
function onInitialized(worker: DeflateWorker, version: string) {
if (state.status === DeflateWorkerStatus.Loading) {
state.callbacks.forEach((callback) => callback(worker))
state = { status: DeflateWorkerStatus.Initialized, worker }
state = { status: DeflateWorkerStatus.Initialized, worker, version }
}
}

function onError(error: unknown) {
function onError(error: unknown, streamId?: number) {
if (state.status === DeflateWorkerStatus.Loading) {
display.error('Session Replay recording failed to start: an error occurred while creating the Worker:', error)
if (error instanceof Event || (error instanceof Error && isMessageCspRelated(error.message))) {
Expand All @@ -119,7 +120,10 @@ function onError(error: unknown) {
state.callbacks.forEach((callback) => callback())
state = { status: DeflateWorkerStatus.Error }
} else {
addTelemetryError(error)
addTelemetryError(error, {
worker_version: state.status === DeflateWorkerStatus.Initialized && state.version,
stream_id: streamId,
})
}
}

Expand Down
Loading

0 comments on commit b8fa9df

Please sign in to comment.