Skip to content

Commit

Permalink
🐛[batch] fix wrong index strategy
Browse files Browse the repository at this point in the history
store separately add/upsert messages and join them on flush
  • Loading branch information
bcaudan committed Jan 9, 2020
1 parent 2adeaa2 commit 07e7e3f
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 52 deletions.
77 changes: 44 additions & 33 deletions packages/core/src/transport.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import lodashMerge from 'lodash.merge'

import { monitor } from './internalMonitoring'
import { Context, jsonStringify } from './utils'
import { Context, jsonStringify, objectValues } from './utils'

/**
* Use POST request without content type to:
Expand Down Expand Up @@ -29,9 +29,10 @@ export class HttpRequest {

export class Batch<T> {
private beforeFlushOnUnloadHandlers: Array<() => void> = []
private buffer: string[] = []
private pushOnlyBuffer: string[] = []
private upsertBuffer: { [key: string]: string } = {}
private bufferBytesSize = 0
private bufferIndexByKey: { [key: string]: number } = {}
private bufferMessageCount = 0

constructor(
private request: HttpRequest,
Expand All @@ -53,25 +54,18 @@ export class Batch<T> {
this.addOrUpdate(message, key)
}

remove(index: number) {
const [removedMessage] = this.buffer.splice(index, 1)
const messageBytesSize = this.sizeInBytes(removedMessage)
this.bufferBytesSize -= messageBytesSize
if (this.buffer.length > 0) {
this.bufferBytesSize -= 1
}
}

beforeFlushOnUnload(handler: () => void) {
this.beforeFlushOnUnloadHandlers.push(handler)
}

flush() {
if (this.buffer.length !== 0) {
this.request.send(this.buffer.join('\n'), this.bufferBytesSize)
this.buffer = []
if (this.bufferMessageCount !== 0) {
const messages = [...this.pushOnlyBuffer, ...objectValues(this.upsertBuffer)]
this.request.send(messages.join('\n'), this.bufferBytesSize)
this.pushOnlyBuffer = []
this.upsertBuffer = {}
this.bufferBytesSize = 0
this.bufferIndexByKey = {}
this.bufferMessageCount = 0
}
}

Expand All @@ -81,58 +75,75 @@ export class Batch<T> {
console.warn(`Discarded a message whose size was bigger than the maximum allowed size ${this.maxMessageSize}KB.`)
return
}
if (key && this.bufferIndexByKey[key] !== undefined) {
this.remove(this.bufferIndexByKey[key])
if (this.hasMessageFor(key)) {
this.remove(key)
}
if (this.willReachedBytesLimitWith(messageBytesSize)) {
this.flush()
}
this.push(processedMessage, messageBytesSize)
if (key) {
this.bufferIndexByKey[key] = this.buffer.length - 1
}
this.push(processedMessage, messageBytesSize, key)
if (this.isFull()) {
this.flush()
}
}

private flushPeriodically() {
setTimeout(() => {
this.flush()
this.flushPeriodically()
}, this.flushTimeout)
}

private process(message: T) {
const contextualizedMessage = lodashMerge({}, this.contextProvider(), message) as Context
const processedMessage = jsonStringify(contextualizedMessage)!
const messageBytesSize = this.sizeInBytes(processedMessage)
return { processedMessage, messageBytesSize }
}

private push(processedMessage: string, messageBytesSize: number) {
if (this.buffer.length > 0) {
private push(processedMessage: string, messageBytesSize: number, key?: string) {
if (this.bufferMessageCount > 0) {
// \n separator at serialization
this.bufferBytesSize += 1
}
this.buffer.push(processedMessage)
if (key !== undefined) {
this.upsertBuffer[key] = processedMessage
} else {
this.pushOnlyBuffer.push(processedMessage)
}
this.bufferMessageCount += 1
this.bufferBytesSize += messageBytesSize
}

private remove(key: string) {
const removedMessage = this.upsertBuffer[key]
delete this.upsertBuffer[key]
const messageBytesSize = this.sizeInBytes(removedMessage)
this.bufferBytesSize -= messageBytesSize
this.bufferMessageCount -= 1
if (this.bufferMessageCount > 0) {
this.bufferBytesSize -= 1
}
}

private hasMessageFor(key?: string): key is string {
return key !== undefined && this.upsertBuffer[key] !== undefined
}

private willReachedBytesLimitWith(messageBytesSize: number) {
// byte of the separator at the end of the message
return this.bufferBytesSize + messageBytesSize + 1 >= this.bytesLimit
}

private isFull() {
return this.buffer.length === this.maxSize || this.bufferBytesSize >= this.bytesLimit
return this.bufferMessageCount === this.maxSize || this.bufferBytesSize >= this.bytesLimit
}

private sizeInBytes(candidate: string) {
// tslint:disable-next-line no-bitwise
return ~-encodeURI(candidate).split(/%..|./).length
}

private flushPeriodically() {
setTimeout(() => {
this.flush()
this.flushPeriodically()
}, this.flushTimeout)
}

private flushOnVisibilityHidden() {
/**
* With sendBeacon, requests are guaranteed to be successfully sent during document unload
Expand Down
8 changes: 8 additions & 0 deletions packages/core/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,11 @@ export function isPercentage(value: unknown) {
export function getRelativeTime(timestamp: number) {
return timestamp - performance.timing.navigationStart
}

export function objectValues(object: { [key: string]: unknown }) {
const values: unknown[] = []
Object.keys(object).forEach((key) => {
values.push(object[key])
})
return values
}
30 changes: 11 additions & 19 deletions packages/core/test/transport.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,25 +163,6 @@ describe('batch', () => {
warnStub.restore()
})

it('should remove a message', () => {
batch.add({ message: '1' })
const firstMessageSize = (batch as any).bufferBytesSize

batch.add({ message: '2' })
batch.remove(1)

expect((batch as any).bufferBytesSize).toBe(firstMessageSize)

batch.add({ message: '3' })
expect(transport.send).not.toHaveBeenCalled()

batch.add({ message: '4' })
expect(transport.send).toHaveBeenCalledWith(
'{"foo":"bar","message":"1"}\n{"foo":"bar","message":"3"}\n{"foo":"bar","message":"4"}',
jasmine.any(Number)
)
})

it('should upsert a message for a given key', () => {
batch.upsert({ message: '1' }, 'a')
batch.upsert({ message: '2' }, 'a')
Expand All @@ -201,5 +182,16 @@ describe('batch', () => {
'{"foo":"bar","message":"5"}\n{"foo":"bar","message":"6"}\n{"foo":"bar","message":"7"}',
jasmine.any(Number)
)

batch.upsert({ message: '8' }, 'a')
batch.upsert({ message: '9' }, 'b')
batch.upsert({ message: '10' }, 'a')
batch.upsert({ message: '11' }, 'b')
batch.flush()

expect(transport.send).toHaveBeenCalledWith(
'{"foo":"bar","message":"10"}\n{"foo":"bar","message":"11"}',
jasmine.any(Number)
)
})
})

0 comments on commit 07e7e3f

Please sign in to comment.