Skip to content

Commit

Permalink
fix: major performance issues with bytea performance brianc#2240
Browse files Browse the repository at this point in the history
  • Loading branch information
regevbr committed Jul 3, 2020
1 parent 5e0d684 commit 64c78b0
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 95 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"packages/*"
],
"scripts": {
"test": "yarn lint && yarn lerna exec yarn test",
"test": "export PGDATABASE=data && export PGUSER=user && export PGPASSWORD=pass && yarn lint && yarn lerna exec yarn test",
"build": "yarn lerna exec --scope pg-protocol yarn build",
"pretest": "yarn build",
"lint": "if [ -x ./node_modules/.bin/prettier ]; then eslint '*/**/*.{js,ts,tsx}'; fi;"
Expand Down
128 changes: 37 additions & 91 deletions packages/pg-protocol/src/parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,10 @@ const enum MessageCodes {

export type MessageCallback = (msg: BackendMessage) => void

interface CombinedBuffer {
combinedBuffer: Buffer
combinedBufferOffset: number
combinedBufferLength: number
combinedBufferFullLength: number
reuseRemainingBuffer: boolean
}

export class Parser {
private remainingBuffer: Buffer = emptyBuffer
private remainingBufferLength: number = 0
private remainingBufferOffset: number = 0
private buffer: Buffer = emptyBuffer
private bufferLength: number = 0
private bufferOffset: number = 0
private reader = new BufferReader()
private mode: Mode

Expand All @@ -96,111 +88,65 @@ export class Parser {
}

public parse(buffer: Buffer, callback: MessageCallback) {
const {
combinedBuffer,
combinedBufferOffset,
combinedBufferLength,
reuseRemainingBuffer,
combinedBufferFullLength,
} = this.mergeBuffer(buffer)
let offset = combinedBufferOffset
while (offset + HEADER_LENGTH <= combinedBufferFullLength) {
this.mergeBuffer(buffer)
const bufferFullLength = this.bufferOffset + this.bufferLength
let offset = this.bufferOffset
while (offset + HEADER_LENGTH <= bufferFullLength) {
// code is 1 byte long - it identifies the message type
const code = combinedBuffer[offset]

const code = this.buffer[offset]
// length is 1 Uint32BE - it is the length of the message EXCLUDING the code
const length = combinedBuffer.readUInt32BE(offset + CODE_LENGTH)

const length = this.buffer.readUInt32BE(offset + CODE_LENGTH)
const fullMessageLength = CODE_LENGTH + length

if (fullMessageLength + offset <= combinedBufferFullLength) {
const message = this.handlePacket(offset + HEADER_LENGTH, code, length, combinedBuffer)
if (fullMessageLength + offset <= bufferFullLength) {
const message = this.handlePacket(offset + HEADER_LENGTH, code, length, this.buffer)
callback(message)
offset += fullMessageLength
} else {
break
}
}
this.consumeBuffer({
combinedBuffer,
combinedBufferOffset: offset,
combinedBufferLength,
reuseRemainingBuffer,
combinedBufferFullLength,
})
if (offset === bufferFullLength) {
// No more use for the buffer
this.buffer = emptyBuffer
this.bufferLength = 0
this.bufferOffset = 0
} else {
// Adjust the cursors of remainingBuffer
this.bufferLength = bufferFullLength - offset
this.bufferOffset = offset
}
}

private mergeBuffer(buffer: Buffer): CombinedBuffer {
let combinedBuffer = buffer
let combinedBufferLength = buffer.byteLength
let combinedBufferOffset = 0
let reuseRemainingBuffer = this.remainingBufferLength > 0
if (reuseRemainingBuffer) {
const newLength = this.remainingBufferLength + combinedBufferLength
const newFullLength = newLength + this.remainingBufferOffset
if (newFullLength > this.remainingBuffer.byteLength) {
private mergeBuffer(buffer: Buffer): void {
if (this.bufferLength > 0) {
const newLength = this.bufferLength + buffer.byteLength
const newFullLength = newLength + this.bufferOffset
if (newFullLength > this.buffer.byteLength) {
// We can't concat the new buffer with the remaining one
let newBuffer: Buffer
if (newLength <= this.remainingBuffer.byteLength && this.remainingBufferOffset >= this.remainingBufferLength) {
if (newLength <= this.buffer.byteLength && this.bufferOffset >= this.bufferLength) {
// We can move the relevant part to the beginning of the buffer instead of allocating a new buffer
newBuffer = this.remainingBuffer
newBuffer = this.buffer
} else {
// Allocate a new larger buffer
let newBufferLength = this.remainingBuffer.byteLength * 2
let newBufferLength = this.buffer.byteLength * 2
while (newLength >= newBufferLength) {
newBufferLength *= 2
}
newBuffer = Buffer.allocUnsafe(newBufferLength)
}
// Move the remaining buffer to the new one
this.remainingBuffer.copy(
newBuffer,
0,
this.remainingBufferOffset,
this.remainingBufferOffset + this.remainingBufferLength
)
this.remainingBuffer = newBuffer
this.remainingBufferOffset = 0
this.buffer.copy(newBuffer, 0, this.bufferOffset, this.bufferOffset + this.bufferLength)
this.buffer = newBuffer
this.bufferOffset = 0
}
// Concat the new buffer with the remaining one
buffer.copy(this.remainingBuffer, this.remainingBufferOffset + this.remainingBufferLength)
combinedBuffer = this.remainingBuffer
combinedBufferLength = this.remainingBufferLength = newLength
combinedBufferOffset = this.remainingBufferOffset
}
const combinedBufferFullLength = combinedBufferOffset + combinedBufferLength
return {
combinedBuffer,
combinedBufferOffset,
combinedBufferLength,
reuseRemainingBuffer,
combinedBufferFullLength,
}
}

private consumeBuffer({
combinedBufferOffset,
combinedBufferFullLength,
reuseRemainingBuffer,
combinedBuffer,
combinedBufferLength,
}: CombinedBuffer) {
if (combinedBufferOffset === combinedBufferFullLength) {
// No more use for the buffer
this.remainingBuffer = emptyBuffer
this.remainingBufferLength = 0
this.remainingBufferOffset = 0
buffer.copy(this.buffer, this.bufferOffset + this.bufferLength)
this.bufferLength = newLength
} else {
this.remainingBufferLength = combinedBufferFullLength - combinedBufferOffset
if (reuseRemainingBuffer) {
// Adjust the cursors of remainingBuffer
this.remainingBufferOffset = combinedBufferOffset
} else {
// To avoid side effects, copy the remaining part of the new buffer to remainingBuffer with extra space for next buffer
this.remainingBuffer = Buffer.allocUnsafe(combinedBufferLength * 2)
combinedBuffer.copy(this.remainingBuffer, 0, combinedBufferOffset)
this.remainingBufferOffset = 0
}
this.buffer = buffer
this.bufferOffset = 0
this.bufferLength = buffer.byteLength
}
}

Expand Down
20 changes: 17 additions & 3 deletions packages/pg/bench.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
const pg = require('./lib')
const pool = new pg.Pool()

const params = {
text:
Expand All @@ -17,7 +16,7 @@ const seq = {
}

const exec = async (client, q) => {
const result = await client.query({
await client.query({
text: q.text,
values: q.values,
rowMode: 'array',
Expand All @@ -40,6 +39,7 @@ const run = async () => {
const client = new pg.Client()
await client.connect()
await client.query('CREATE TEMP TABLE foobar(name TEXT, age NUMERIC)')
await client.query('CREATE TEMP TABLE buf(name TEXT, data BYTEA)')
await bench(client, params, 1000)
console.log('warmup done')
const seconds = 5
Expand All @@ -61,7 +61,21 @@ const run = async () => {
console.log('insert queries:', queries)
console.log('qps', queries / seconds)
console.log('on my laptop best so far seen 5799 qps')
console.log()

console.log('')
console.log('Warming up bytea test')
await client.query({
text: 'INSERT INTO buf(name, data) VALUES ($1, $2)',
values: ['test', Buffer.allocUnsafe(104857600)],
})
console.log('bytea warmup done')
const start = Date.now()
const results = await client.query('SELECT * FROM buf')
const time = Date.now() - start
console.log('bytea time:', time, 'ms')
console.log('bytea length:', results.rows[0].data.byteLength, 'bytes')
console.log('on my laptop best so far seen 1107ms and 104857600 bytes')

await client.end()
await client.end()
}
Expand Down

0 comments on commit 64c78b0

Please sign in to comment.