Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

♻️ [RUM-249] update worker protocol #2346

Merged
merged 15 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions packages/core/src/domain/telemetry/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,15 @@ export function addTelemetryDebug(message: string, context?: Context) {
)
}

export function addTelemetryError(e: unknown) {
export function addTelemetryError(e: unknown, context?: Context) {
addTelemetry(
assign(
{
type: TelemetryType.log,
status: StatusType.error,
},
formatError(e)
formatError(e),
context
)
)
}
Expand Down
50 changes: 41 additions & 9 deletions packages/rum/src/domain/segmentCollection/segment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { RecordType } from '../../types'
import * as replayStats from '../replayStats'
import type { DeflateWorker } from './startDeflateWorker'

// Arbitrary id, will be replaced when we have multiple parallel streams.
const STREAM_ID = 1
let nextId = 0

export type FlushReason = Exclude<CreationReason, 'init'> | 'stop'
Expand All @@ -15,6 +17,7 @@ export class Segment {
public readonly metadata: BrowserSegmentMetadata

private id = nextId++
private pendingWriteCount = 0

constructor(
private worker: DeflateWorker,
Expand All @@ -41,22 +44,30 @@ export class Segment {

replayStats.addSegment(viewId)
replayStats.addRecord(viewId)
let rawBytesCount = 0
let compressedBytesCount = 0
const compressedData: Uint8Array[] = []

const { stop: removeMessageListener } = addEventListener(
worker,
'message',
({ data }: MessageEvent<DeflateWorkerResponse>) => {
if (data.type === 'errored' || data.type === 'initialized') {
if (data.type !== 'wrote') {
return
}

if (data.id === this.id) {
this.pendingWriteCount -= 1
replayStats.addWroteData(viewId, data.additionalBytesCount)
if (data.type === 'flushed') {
onFlushed(data.result, data.rawBytesCount)
rawBytesCount += data.additionalBytesCount
compressedBytesCount += data.result.length
compressedData.push(data.result)
if (this.flushReason && this.pendingWriteCount === 0) {
compressedData.push(data.trailer)
onFlushed(concatBuffers(compressedData), rawBytesCount)
removeMessageListener()
} else {
onWrote(data.compressedBytesCount)
onWrote(compressedBytesCount)
}
} else if (data.id > this.id) {
// Messages should be received in the same order as they are sent, so if we receive a
Expand All @@ -73,7 +84,7 @@ export class Segment {
}
)
sendToExtension('record', { record: initialRecord, segment: this.metadata })
this.worker.postMessage({ data: `{"records":[${JSON.stringify(initialRecord)}`, id: this.id, action: 'write' })
this.write(`{"records":[${JSON.stringify(initialRecord)}`)
}

addRecord(record: BrowserRecord): void {
Expand All @@ -83,15 +94,36 @@ export class Segment {
replayStats.addRecord(this.metadata.view.id)
this.metadata.has_full_snapshot ||= record.type === RecordType.FullSnapshot
sendToExtension('record', { record, segment: this.metadata })
this.worker.postMessage({ data: `,${JSON.stringify(record)}`, id: this.id, action: 'write' })
this.write(`,${JSON.stringify(record)}`)
}

flush(reason: FlushReason) {
this.write(`],${JSON.stringify(this.metadata).slice(1)}\n`)
this.worker.postMessage({
data: `],${JSON.stringify(this.metadata).slice(1)}\n`,
id: this.id,
action: 'flush',
action: 'reset',
streamId: STREAM_ID,
})
this.flushReason = reason
}

private write(data: string) {
this.pendingWriteCount += 1
this.worker.postMessage({
data,
id: this.id,
streamId: STREAM_ID,
action: 'write',
})
}
}

function concatBuffers(buffers: Uint8Array[]) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💭 thought: ‏this function is needed in both the worker and RUM. I duplicated it, it's not great, but it avoids adding a dependency between the worker code and main packages. Maybe the worker can expose some utility function like that, but we'd need to ensure that it is tree-shakable to a pulling too much of the worker code in main packages.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be simpler if the utility code was exposed from core?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's try it. I put it in core, so now the worker depends on core. I was hesitating about it, because in the future core might very well depend on the worker. We'll see how it goes!

const length = buffers.reduce((total, buffer) => total + buffer.length, 0)
const result = new Uint8Array(length)
let offset = 0
for (const buffer of buffers) {
result.set(buffer, offset)
offset += buffer.length
}
return result
}
125 changes: 8 additions & 117 deletions packages/rum/src/domain/segmentCollection/startDeflateWorker.spec.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import type { RawTelemetryEvent } from '@datadog/browser-core'
import { display, isIE, noop, resetTelemetry, startFakeTelemetry } from '@datadog/browser-core'
import type { DeflateWorkerResponse } from '@datadog/browser-worker'
import { MockWorker } from '../../../test'
import type { DeflateWorker } from './startDeflateWorker'
import { startDeflateWorker, resetDeflateWorkerState, createDeflateWorker } from './startDeflateWorker'
import type { createDeflateWorker } from './startDeflateWorker'
import { startDeflateWorker, resetDeflateWorkerState } from './startDeflateWorker'

// Arbitrary stream ids used for tests
const TEST_STREAM_ID = 5

describe('startDeflateWorker', () => {
let deflateWorker: MockWorker
Expand Down Expand Up @@ -164,128 +166,17 @@ describe('startDeflateWorker', () => {
startDeflateWorker(noop, createDeflateWorkerSpy)
deflateWorker.processAllMessages()

deflateWorker.dispatchErrorMessage('boom')
deflateWorker.dispatchErrorMessage('boom', TEST_STREAM_ID)
expect(telemetryEvents).toEqual([
{
type: 'log',
status: 'error',
message: 'Uncaught "boom"',
error: { stack: jasmine.any(String) },
worker_version: 'dev',
stream_id: TEST_STREAM_ID,
},
])
})
})
})

describe('createDeflateWorker', () => {
beforeEach(() => {
if (isIE()) {
pending('no TextEncoder support')
}
})
it('buffers data and responds with the buffer deflated compressedBytesCount when writing', (done) => {
const deflateWorker = createDeflateWorker()
listen(deflateWorker, 3, (events) => {
expect(events).toEqual([
{ type: 'wrote', id: 0, compressedBytesCount: 11, additionalBytesCount: 3 },
{ type: 'wrote', id: 1, compressedBytesCount: 20, additionalBytesCount: 3 },
{ type: 'wrote', id: 2, compressedBytesCount: 29, additionalBytesCount: 3 },
])
done()
})
deflateWorker.postMessage({ id: 0, action: 'write', data: 'foo' })
deflateWorker.postMessage({ id: 1, action: 'write', data: 'bar' })
deflateWorker.postMessage({ id: 2, action: 'write', data: 'baz' })
})

it('responds with the resulting bytes when completing', (done) => {
const deflateWorker = createDeflateWorker()
listen(deflateWorker, 2, (events) => {
expect(events).toEqual([
{ type: 'wrote', id: 0, compressedBytesCount: 11, additionalBytesCount: 3 },
{
type: 'flushed',
id: 1,
result: new Uint8Array([120, 156, 74, 203, 207, 7, 0, 0, 0, 255, 255, 3, 0, 2, 130, 1, 69]),
additionalBytesCount: 0,
rawBytesCount: 3,
},
])
done()
})
deflateWorker.postMessage({ id: 0, action: 'write', data: 'foo' })
deflateWorker.postMessage({ id: 1, action: 'flush' })
})

it('writes the remaining data specified by "flush"', (done) => {
const deflateWorker = createDeflateWorker()
listen(deflateWorker, 1, (events) => {
expect(events).toEqual([
{
type: 'flushed',
id: 0,
result: new Uint8Array([120, 156, 74, 203, 207, 7, 0, 0, 0, 255, 255, 3, 0, 2, 130, 1, 69]),
additionalBytesCount: 3,
rawBytesCount: 3,
},
])
done()
})
deflateWorker.postMessage({ id: 0, action: 'flush', data: 'foo' })
})

it('flushes several deflates one after the other', (done) => {
const deflateWorker = createDeflateWorker()
listen(deflateWorker, 4, (events) => {
expect(events).toEqual([
{
type: 'wrote',
id: 0,
compressedBytesCount: 11,
additionalBytesCount: 3,
},
{
type: 'flushed',
id: 1,
result: new Uint8Array([120, 156, 74, 203, 207, 7, 0, 0, 0, 255, 255, 3, 0, 2, 130, 1, 69]),
additionalBytesCount: 0,
rawBytesCount: 3,
},
{
type: 'wrote',
id: 2,
compressedBytesCount: 11,
additionalBytesCount: 3,
},
{
type: 'flushed',
id: 3,
result: new Uint8Array([120, 156, 74, 74, 44, 2, 0, 0, 0, 255, 255, 3, 0, 2, 93, 1, 54]),
additionalBytesCount: 0,
rawBytesCount: 3,
},
])
done()
})
deflateWorker.postMessage({ id: 0, action: 'write', data: 'foo' })
deflateWorker.postMessage({ id: 1, action: 'flush' })
deflateWorker.postMessage({ id: 2, action: 'write', data: 'bar' })
deflateWorker.postMessage({ id: 3, action: 'flush' })
})

function listen(
deflateWorker: DeflateWorker,
expectedResponseCount: number,
onDone: (responses: DeflateWorkerResponse[]) => void
) {
const responses: DeflateWorkerResponse[] = []
const listener = (event: { data: DeflateWorkerResponse }) => {
const responsesCount = responses.push(event.data)
if (responsesCount === expectedResponseCount) {
deflateWorker.removeEventListener('message', listener)
onDone(responses)
}
}
deflateWorker.addEventListener('message', listener)
}
})
16 changes: 10 additions & 6 deletions packages/rum/src/domain/segmentCollection/startDeflateWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type DeflateWorkerState =
| {
status: DeflateWorkerStatus.Initialized
worker: DeflateWorker
version: string
}

export interface DeflateWorker extends Worker {
Expand Down Expand Up @@ -86,9 +87,9 @@ export function doStartDeflateWorker(createDeflateWorkerImpl = createDeflateWork
addEventListener(worker, 'error', onError)
addEventListener(worker, 'message', ({ data }: MessageEvent<DeflateWorkerResponse>) => {
if (data.type === 'errored') {
onError(data.error)
onError(data.error, data.streamId)
} else if (data.type === 'initialized') {
onInitialized(worker)
onInitialized(worker, data.version)
}
})
worker.postMessage({ action: 'init' })
Expand All @@ -98,14 +99,14 @@ export function doStartDeflateWorker(createDeflateWorkerImpl = createDeflateWork
}
}

function onInitialized(worker: DeflateWorker) {
function onInitialized(worker: DeflateWorker, version: string) {
if (state.status === DeflateWorkerStatus.Loading) {
state.callbacks.forEach((callback) => callback(worker))
state = { status: DeflateWorkerStatus.Initialized, worker }
state = { status: DeflateWorkerStatus.Initialized, worker, version }
}
}

function onError(error: unknown) {
function onError(error: unknown, streamId?: number) {
if (state.status === DeflateWorkerStatus.Loading) {
display.error('Session Replay recording failed to start: an error occurred while creating the Worker:', error)
if (error instanceof Event || (error instanceof Error && isMessageCspRelated(error.message))) {
Expand All @@ -119,7 +120,10 @@ function onError(error: unknown) {
state.callbacks.forEach((callback) => callback())
state = { status: DeflateWorkerStatus.Error }
} else {
addTelemetryError(error)
addTelemetryError(error, {
worker_version: state.status === DeflateWorkerStatus.Initialized && state.version,
stream_id: streamId,
})
}
}

Expand Down
Loading