diff --git a/package.json b/package.json index 282ca9376..6ab9fa918 100644 --- a/package.json +++ b/package.json @@ -10,7 +10,7 @@ "packages/*" ], "scripts": { - "test": "yarn lint && yarn lerna exec yarn test", + "test": "export PGDATABASE=data && export PGUSER=user && export PGPASSWORD=pass && yarn lint && yarn lerna exec yarn test", "build": "yarn lerna exec --scope pg-protocol yarn build", "pretest": "yarn build", "lint": "if [ -x ./node_modules/.bin/prettier ]; then eslint '*/**/*.{js,ts,tsx}'; fi;" diff --git a/packages/pg-protocol/src/parser.ts b/packages/pg-protocol/src/parser.ts index 1827c3d1f..a00dabec9 100644 --- a/packages/pg-protocol/src/parser.ts +++ b/packages/pg-protocol/src/parser.ts @@ -73,18 +73,10 @@ const enum MessageCodes { export type MessageCallback = (msg: BackendMessage) => void -interface CombinedBuffer { - combinedBuffer: Buffer - combinedBufferOffset: number - combinedBufferLength: number - combinedBufferFullLength: number - reuseRemainingBuffer: boolean -} - export class Parser { - private remainingBuffer: Buffer = emptyBuffer - private remainingBufferLength: number = 0 - private remainingBufferOffset: number = 0 + private buffer: Buffer = emptyBuffer + private bufferLength: number = 0 + private bufferOffset: number = 0 private reader = new BufferReader() private mode: Mode @@ -96,111 +88,65 @@ export class Parser { } public parse(buffer: Buffer, callback: MessageCallback) { - const { - combinedBuffer, - combinedBufferOffset, - combinedBufferLength, - reuseRemainingBuffer, - combinedBufferFullLength, - } = this.mergeBuffer(buffer) - let offset = combinedBufferOffset - while (offset + HEADER_LENGTH <= combinedBufferFullLength) { + this.mergeBuffer(buffer) + const bufferFullLength = this.bufferOffset + this.bufferLength + let offset = this.bufferOffset + while (offset + HEADER_LENGTH <= bufferFullLength) { // code is 1 byte long - it identifies the message type - const code = combinedBuffer[offset] - + const code = this.buffer[offset] // length is 1 Uint32BE - it is the length of the message EXCLUDING the code - const length = combinedBuffer.readUInt32BE(offset + CODE_LENGTH) - + const length = this.buffer.readUInt32BE(offset + CODE_LENGTH) const fullMessageLength = CODE_LENGTH + length - - if (fullMessageLength + offset <= combinedBufferFullLength) { - const message = this.handlePacket(offset + HEADER_LENGTH, code, length, combinedBuffer) + if (fullMessageLength + offset <= bufferFullLength) { + const message = this.handlePacket(offset + HEADER_LENGTH, code, length, this.buffer) callback(message) offset += fullMessageLength } else { break } } - this.consumeBuffer({ - combinedBuffer, - combinedBufferOffset: offset, - combinedBufferLength, - reuseRemainingBuffer, - combinedBufferFullLength, - }) + if (offset === bufferFullLength) { + // No more use for the buffer + this.buffer = emptyBuffer + this.bufferLength = 0 + this.bufferOffset = 0 + } else { + // Adjust the cursors of remainingBuffer + this.bufferLength = bufferFullLength - offset + this.bufferOffset = offset + } } - private mergeBuffer(buffer: Buffer): CombinedBuffer { - let combinedBuffer = buffer - let combinedBufferLength = buffer.byteLength - let combinedBufferOffset = 0 - let reuseRemainingBuffer = this.remainingBufferLength > 0 - if (reuseRemainingBuffer) { - const newLength = this.remainingBufferLength + combinedBufferLength - const newFullLength = newLength + this.remainingBufferOffset - if (newFullLength > this.remainingBuffer.byteLength) { + private mergeBuffer(buffer: Buffer): void { + if (this.bufferLength > 0) { + const newLength = this.bufferLength + buffer.byteLength + const newFullLength = newLength + this.bufferOffset + if (newFullLength > this.buffer.byteLength) { // We can't concat the new buffer with the remaining one let newBuffer: Buffer - if (newLength <= this.remainingBuffer.byteLength && this.remainingBufferOffset >= this.remainingBufferLength) { + if (newLength <= this.buffer.byteLength && this.bufferOffset >= this.bufferLength) { // We can move the relevant part to the beginning of the buffer instead of allocating a new buffer - newBuffer = this.remainingBuffer + newBuffer = this.buffer } else { // Allocate a new larger buffer - let newBufferLength = this.remainingBuffer.byteLength * 2 + let newBufferLength = this.buffer.byteLength * 2 while (newLength >= newBufferLength) { newBufferLength *= 2 } newBuffer = Buffer.allocUnsafe(newBufferLength) } // Move the remaining buffer to the new one - this.remainingBuffer.copy( - newBuffer, - 0, - this.remainingBufferOffset, - this.remainingBufferOffset + this.remainingBufferLength - ) - this.remainingBuffer = newBuffer - this.remainingBufferOffset = 0 + this.buffer.copy(newBuffer, 0, this.bufferOffset, this.bufferOffset + this.bufferLength) + this.buffer = newBuffer + this.bufferOffset = 0 } // Concat the new buffer with the remaining one - buffer.copy(this.remainingBuffer, this.remainingBufferOffset + this.remainingBufferLength) - combinedBuffer = this.remainingBuffer - combinedBufferLength = this.remainingBufferLength = newLength - combinedBufferOffset = this.remainingBufferOffset - } - const combinedBufferFullLength = combinedBufferOffset + combinedBufferLength - return { - combinedBuffer, - combinedBufferOffset, - combinedBufferLength, - reuseRemainingBuffer, - combinedBufferFullLength, - } - } - - private consumeBuffer({ - combinedBufferOffset, - combinedBufferFullLength, - reuseRemainingBuffer, - combinedBuffer, - combinedBufferLength, - }: CombinedBuffer) { - if (combinedBufferOffset === combinedBufferFullLength) { - // No more use for the buffer - this.remainingBuffer = emptyBuffer - this.remainingBufferLength = 0 - this.remainingBufferOffset = 0 + buffer.copy(this.buffer, this.bufferOffset + this.bufferLength) + this.bufferLength = newLength } else { - this.remainingBufferLength = combinedBufferFullLength - combinedBufferOffset - if (reuseRemainingBuffer) { - // Adjust the cursors of remainingBuffer - this.remainingBufferOffset = combinedBufferOffset - } else { - // To avoid side effects, copy the remaining part of the new buffer to remainingBuffer with extra space for next buffer - this.remainingBuffer = Buffer.allocUnsafe(combinedBufferLength * 2) - combinedBuffer.copy(this.remainingBuffer, 0, combinedBufferOffset) - this.remainingBufferOffset = 0 - } + this.buffer = buffer + this.bufferOffset = 0 + this.bufferLength = buffer.byteLength } } diff --git a/packages/pg/bench.js b/packages/pg/bench.js index 80c07dc19..c861c3ae6 100644 --- a/packages/pg/bench.js +++ b/packages/pg/bench.js @@ -1,5 +1,4 @@ const pg = require('./lib') -const pool = new pg.Pool() const params = { text: @@ -17,7 +16,7 @@ const seq = { } const exec = async (client, q) => { - const result = await client.query({ + await client.query({ text: q.text, values: q.values, rowMode: 'array', @@ -40,6 +39,7 @@ const run = async () => { const client = new pg.Client() await client.connect() await client.query('CREATE TEMP TABLE foobar(name TEXT, age NUMERIC)') + await client.query('CREATE TEMP TABLE buf(name TEXT, data BYTEA)') await bench(client, params, 1000) console.log('warmup done') const seconds = 5 @@ -61,7 +61,21 @@ const run = async () => { console.log('insert queries:', queries) console.log('qps', queries / seconds) console.log('on my laptop best so far seen 5799 qps') - console.log() + + console.log('') + console.log('Warming up bytea test') + await client.query({ + text: 'INSERT INTO buf(name, data) VALUES ($1, $2)', + values: ['test', Buffer.allocUnsafe(104857600)], + }) + console.log('bytea warmup done') + const start = Date.now() + const results = await client.query('SELECT * FROM buf') + const time = Date.now() - start + console.log('bytea time:', time, 'ms') + console.log('bytea length:', results.rows[0].data.byteLength, 'bytes') + console.log('on my laptop best so far seen 1107ms and 104857600 bytes') + await client.end() await client.end() }