Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce BufferPool to replace BufferList #2669

Merged
merged 4 commits into from
Dec 10, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
"bson-ext": "^2.0.0"
},
"dependencies": {
"bl": "^2.2.1",
"bson": "^4.0.4",
"denque": "^1.4.1",
"lodash": "^4.17.20"
Expand Down
21 changes: 8 additions & 13 deletions src/cmap/message_stream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import BufferList = require('bl');
import { Duplex, DuplexOptions } from 'stream';
import { Response, Msg, BinMsg, Query, WriteProtocolMessageType, MessageHeader } from './commands';
import { MongoError, MongoParseError } from '../error';
Expand All @@ -11,7 +10,7 @@ import {
CompressorName
} from './wire_protocol/compression';
import type { Document, BSONSerializeOptions } from '../bson';
import type { Callback } from '../utils';
import { BufferPool, Callback } from '../utils';
import type { ClientSession } from '../sessions';

const MESSAGE_HEADER_SIZE = 16;
Expand Down Expand Up @@ -48,21 +47,19 @@ export interface OperationDescription extends BSONSerializeOptions {
* @internal
*/
export class MessageStream extends Duplex {
/** @internal */
maxBsonMessageSize: number;
[kBuffer]: BufferList;
/** @internal */
[kBuffer]: BufferPool;

constructor(options: MessageStreamOptions = {}) {
super(options);

this.maxBsonMessageSize = options.maxBsonMessageSize || kDefaultMaxBsonMessageSize;

this[kBuffer] = new BufferList();
this[kBuffer] = new BufferPool();
}

_write(chunk: Buffer, _: unknown, callback: Callback<Buffer>): void {
const buffer = this[kBuffer];
buffer.append(chunk);

this[kBuffer].append(chunk);
processIncomingData(this, callback);
}

Expand Down Expand Up @@ -135,7 +132,7 @@ function processIncomingData(stream: MessageStream, callback: Callback<Buffer>)
return;
}

const sizeOfMessage = buffer.readInt32LE(0);
const sizeOfMessage = buffer.peek(4).readInt32LE();
if (sizeOfMessage < 0) {
callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`));
return;
Expand All @@ -155,9 +152,7 @@ function processIncomingData(stream: MessageStream, callback: Callback<Buffer>)
return;
}

const message = buffer.slice(0, sizeOfMessage);
buffer.consume(sizeOfMessage);

const message = buffer.read(sizeOfMessage);
const messageHeader: MessageHeader = {
length: message.readInt32LE(0),
requestId: message.readInt32LE(4),
Expand Down
3 changes: 2 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,8 @@ export type {
ClientMetadata,
ClientMetadataOptions,
MongoDBNamespace,
InterruptableAsyncInterval
InterruptibleAsyncInterval,
BufferPool
} from './utils';
export type { WriteConcern, W, WriteConcernOptions } from './write_concern';
export type { ExecutionResult } from './operations/execute_operation';
Expand Down
10 changes: 5 additions & 5 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
now,
makeStateMachine,
calculateDurationInMs,
makeInterruptableAsyncInterval
makeInterruptibleAsyncInterval
} from '../utils';
import { EventEmitter } from 'events';
import { connect } from '../cmap/connect';
Expand All @@ -17,7 +17,7 @@ import {
} from './events';

import { Server } from './server';
import type { InterruptableAsyncInterval, Callback } from '../utils';
import type { InterruptibleAsyncInterval, Callback } from '../utils';
import type { TopologyVersion } from './server_description';
import type { ConnectionOptions } from '../cmap/connection';

Expand Down Expand Up @@ -65,7 +65,7 @@ export class Monitor extends EventEmitter {
[kConnection]?: Connection;
[kCancellationToken]: EventEmitter;
/** @internal */
[kMonitorId]?: InterruptableAsyncInterval;
[kMonitorId]?: InterruptibleAsyncInterval;
[kRTTPinger]?: RTTPinger;

constructor(server: Server, options?: Partial<MonitorOptions>) {
Expand Down Expand Up @@ -123,7 +123,7 @@ export class Monitor extends EventEmitter {
// start
const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
this[kMonitorId] = makeInterruptableAsyncInterval(monitorServer(this), {
this[kMonitorId] = makeInterruptibleAsyncInterval(monitorServer(this), {
interval: heartbeatFrequencyMS,
minInterval: minHeartbeatFrequencyMS,
immediate: true
Expand Down Expand Up @@ -153,7 +153,7 @@ export class Monitor extends EventEmitter {
// restart monitoring
const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
this[kMonitorId] = makeInterruptableAsyncInterval(monitorServer(this), {
this[kMonitorId] = makeInterruptibleAsyncInterval(monitorServer(this), {
interval: heartbeatFrequencyMS,
minInterval: minHeartbeatFrequencyMS
});
Expand Down
106 changes: 101 additions & 5 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ export function calculateDurationInMs(started: number): number {
return elapsed < 0 ? 0 : elapsed;
}

export interface InterruptableAsyncIntervalOptions {
export interface InterruptibleAsyncIntervalOptions {
/** The interval to execute a method on */
interval: number;
/** A minimum interval that must elapse before the method is called */
Expand All @@ -977,7 +977,7 @@ export interface InterruptableAsyncIntervalOptions {
}

/** @internal */
export interface InterruptableAsyncInterval {
export interface InterruptibleAsyncInterval {
wake(): void;
stop(): void;
}
Expand All @@ -991,10 +991,10 @@ export interface InterruptableAsyncInterval {
*
* @param fn - An async function to run on an interval, must accept a `callback` as its only parameter
*/
export function makeInterruptableAsyncInterval(
export function makeInterruptibleAsyncInterval(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This import needs to be updated in test/unit/utils.test.js to fix CI

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops thanks, fixed

fn: (callback: Callback) => void,
options?: Partial<InterruptableAsyncIntervalOptions>
): InterruptableAsyncInterval {
options?: Partial<InterruptibleAsyncIntervalOptions>
): InterruptibleAsyncInterval {
let timerId: NodeJS.Timeout | undefined;
let lastCallTime: number;
let lastWakeTime: number;
Expand Down Expand Up @@ -1155,3 +1155,99 @@ export function isRecord(

return isRecord;
}

const kBuffers = Symbol('buffers');
const kLength = Symbol('length');

/**
* A pool of Buffers which allow you to read them as if they were one
* @internal
*/
export class BufferPool {
[kBuffers]: Buffer[];
[kLength]: number;

constructor() {
this[kBuffers] = [];
this[kLength] = 0;
}

get length(): number {
return this[kLength];
}

/** Adds a buffer to the internal buffer pool list */
append(buffer: Buffer): void {
this[kBuffers].push(buffer);
this[kLength] += buffer.length;
}

/** Returns the requested number of bytes without consuming them */
peek(size: number): Buffer {
return this.read(size, false);
}

/** Reads the requested number of bytes, optionally consuming them */
read(size: number, consume = true): Buffer {
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
if (typeof size !== 'number' || size < 0) {
throw new TypeError('Parameter size must be a non-negative number');
}

if (size > this[kLength]) {
return Buffer.alloc(0);
}

let result: Buffer;

// read the whole buffer
if (size === this.length) {
result = Buffer.concat(this[kBuffers]);

if (consume) {
this[kBuffers] = [];
this[kLength] = 0;
}
}

// size is within first buffer, no need to concat
else if (size <= this[kBuffers][0].length) {
result = this[kBuffers][0].slice(0, size);
if (consume) {
this[kBuffers][0] = this[kBuffers][0].slice(size);
this[kLength] -= size;
}
}

// size is beyond first buffer, need to track and copy
else {
result = Buffer.allocUnsafe(size);

let idx;
let offset = 0;
let bytesToCopy = size;
for (idx = 0; idx < this[kBuffers].length; ++idx) {
let bytesCopied;
if (bytesToCopy > this[kBuffers][idx].length) {
bytesCopied = this[kBuffers][idx].copy(result, offset, 0);
offset += bytesCopied;
} else {
bytesCopied = this[kBuffers][idx].copy(result, offset, 0, bytesToCopy);
if (consume) {
this[kBuffers][idx] = this[kBuffers][idx].slice(0, bytesCopied);
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
}
offset += bytesCopied;
break;
}

bytesToCopy -= bytesCopied;
}

// compact the internal buffer array
if (consume) {
this[kBuffers] = this[kBuffers].slice(idx);
}
}

return result;
}
}
96 changes: 90 additions & 6 deletions test/unit/utils.test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
'use strict';
const { eachAsync, now, makeInterruptableAsyncInterval } = require('../../src/utils');
const { eachAsync, now, makeInterruptibleAsyncInterval, BufferPool } = require('../../src/utils');
const { expect } = require('chai');
const sinon = require('sinon');

Expand Down Expand Up @@ -35,7 +35,7 @@ describe('utils', function () {
});
});

context('makeInterruptableAsyncInterval', function () {
context('makeInterruptibleAsyncInterval', function () {
before(function () {
this.clock = sinon.useFakeTimers();
});
Expand All @@ -47,7 +47,7 @@ describe('utils', function () {
it('should execute a method in an repeating interval', function (done) {
let lastTime = now();
const marks = [];
const executor = makeInterruptableAsyncInterval(
const executor = makeInterruptibleAsyncInterval(
callback => {
marks.push(now() - lastTime);
lastTime = now();
Expand All @@ -69,7 +69,7 @@ describe('utils', function () {
it('should schedule execution sooner if requested within min interval threshold', function (done) {
let lastTime = now();
const marks = [];
const executor = makeInterruptableAsyncInterval(
const executor = makeInterruptibleAsyncInterval(
callback => {
marks.push(now() - lastTime);
lastTime = now();
Expand All @@ -93,7 +93,7 @@ describe('utils', function () {
it('should debounce multiple requests to wake the interval sooner', function (done) {
let lastTime = now();
const marks = [];
const executor = makeInterruptableAsyncInterval(
const executor = makeInterruptibleAsyncInterval(
callback => {
marks.push(now() - lastTime);
lastTime = now();
Expand All @@ -119,7 +119,7 @@ describe('utils', function () {
let clockCalled = 0;
let lastTime = now();
const marks = [];
const executor = makeInterruptableAsyncInterval(
const executor = makeInterruptibleAsyncInterval(
callback => {
marks.push(now() - lastTime);
lastTime = now();
Expand Down Expand Up @@ -161,4 +161,88 @@ describe('utils', function () {
this.clock.tick(250);
});
});

context('BufferPool', function () {
it('should report the correct length', function () {
const buffer = new BufferPool();
buffer.append(Buffer.from([0, 1]));
buffer.append(Buffer.from([2, 3]));
buffer.append(Buffer.from([2, 3]));
expect(buffer).property('length').to.equal(6);
});

it('return an empty buffer if too many bytes requested', function () {
const buffer = new BufferPool();
buffer.append(Buffer.from([0, 1, 2, 3]));
const data = buffer.read(6);
expect(data).to.have.length(0);
expect(buffer).property('length').to.equal(4);
});

context('peek', function () {
it('exact size', function () {
const buffer = new BufferPool();
buffer.append(Buffer.from([0, 1]));
const data = buffer.peek(2);
expect(data).to.eql(Buffer.from([0, 1]));
expect(buffer).property('length').to.equal(2);
});

it('within first buffer', function () {
const buffer = new BufferPool();
buffer.append(Buffer.from([0, 1, 2, 3]));
const data = buffer.peek(2);
expect(data).to.eql(Buffer.from([0, 1]));
expect(buffer).property('length').to.equal(4);
});

it('across multiple buffers', function () {
const buffer = new BufferPool();
buffer.append(Buffer.from([0, 1]));
buffer.append(Buffer.from([2, 3]));
buffer.append(Buffer.from([4, 5]));
const data = buffer.peek(6);
expect(data).to.eql(Buffer.from([0, 1, 2, 3, 4, 5]));
expect(buffer).property('length').to.equal(6);
});
});

context('read', function () {
it('should throw an error if a negative size is requested', function () {
const buffer = new BufferPool();
expect(() => buffer.read(-1)).to.throw(/Parameter size must be a non-negative number/);
});

it('should throw an error if a non-number size is requested', function () {
const buffer = new BufferPool();
expect(() => buffer.read('256')).to.throw(/Parameter size must be a non-negative number/);
});

it('exact size', function () {
const buffer = new BufferPool();
buffer.append(Buffer.from([0, 1]));
const data = buffer.read(2);
expect(data).to.eql(Buffer.from([0, 1]));
expect(buffer).property('length').to.equal(0);
});

it('within first buffer', function () {
const buffer = new BufferPool();
buffer.append(Buffer.from([0, 1, 2, 3]));
const data = buffer.read(2);
expect(data).to.eql(Buffer.from([0, 1]));
expect(buffer).property('length').to.equal(2);
});

it('across multiple buffers', function () {
const buffer = new BufferPool();
buffer.append(Buffer.from([0, 1]));
buffer.append(Buffer.from([2, 3]));
buffer.append(Buffer.from([4, 5]));
const data = buffer.read(6);
expect(data).to.eql(Buffer.from([0, 1, 2, 3, 4, 5]));
expect(buffer).property('length').to.equal(0);
});
});
});
});