diff --git a/dev/src/rate-limiter.ts b/dev/src/rate-limiter.ts new file mode 100644 index 000000000..e2d7be390 --- /dev/null +++ b/dev/src/rate-limiter.ts @@ -0,0 +1,153 @@ +/*! + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import * as assert from 'assert'; + +import {Timestamp} from './timestamp'; + +/** + * A helper that uses the Token Bucket algorithm to rate limit the number of + * operations that can be made in a second. + * + * Before a given request containing a number of operations can proceed, + * RateLimiter determines doing so stays under the provided rate limits. It can + * also determine how much time is required before a request can be made. + * + * RateLimiter can also implement a gradually increasing rate limit. This is + * used to enforce the 500/50/5 rule + * (https://cloud.google.com/datastore/docs/best-practices#ramping_up_traffic). + * + * @private + */ +export class RateLimiter { + // Number of tokens available. Each operation consumes one token. + availableTokens: number; + + // When the token bucket was last refilled. + lastRefillTimeMillis: number; + + /** + * @param initialCapacity Initial maximum number of operations per second. + * @param multiplier Rate by which to increase the capacity. + * @param multiplierMillis How often the capacity should increase in + * milliseconds. + * @param startTimeMillis The starting time in epoch milliseconds that the + * rate limit is based on. Used for testing the limiter. + */ + constructor( + private readonly initialCapacity: number, + private readonly multiplier: number, + private readonly multiplierMillis: number, + private readonly startTimeMillis = Date.now() + ) { + this.availableTokens = initialCapacity; + this.lastRefillTimeMillis = startTimeMillis; + } + + /** + * Tries to make the number of operations. Returns true if the request + * succeeded and false otherwise. + * + * @param requestTimeMillis The date used to calculate the number of available + * tokens. Used for testing the limiter. + * @private + */ + tryMakeRequest( + numOperations: number, + requestTimeMillis = Date.now() + ): boolean { + this.refillTokens(requestTimeMillis); + if (numOperations <= this.availableTokens) { + this.availableTokens -= numOperations; + return true; + } + return false; + } + + /** + * Returns the number of ms needed to make a request with the provided number + * of operations. Returns 0 if the request can be made with the existing + * capacity. Returns -1 if the request is not possible with the current + * capacity. + * + * @param requestTimeMillis The date used to calculate the number of available + * tokens. Used for testing the limiter. + * @private + */ + getNextRequestDelayMs( + numOperations: number, + requestTimeMillis = Date.now() + ): number { + if (numOperations < this.availableTokens) { + return 0; + } + + const capacity = this.calculateCapacity(requestTimeMillis); + if (capacity < numOperations) { + return -1; + } + + const requiredTokens = numOperations - this.availableTokens; + return Math.ceil((requiredTokens * 1000) / capacity); + } + + /** + * Refills the number of available tokens based on how much time has elapsed + * since the last time the tokens were refilled. + * + * @param requestTimeMillis The date used to calculate the number of available + * tokens. Used for testing the limiter. + * @private + */ + private refillTokens(requestTimeMillis = Date.now()): void { + if (requestTimeMillis >= this.lastRefillTimeMillis) { + const elapsedTime = requestTimeMillis - this.lastRefillTimeMillis; + const capacity = this.calculateCapacity(requestTimeMillis); + const tokensToAdd = Math.floor((elapsedTime * capacity) / 1000); + if (tokensToAdd > 0) { + this.availableTokens = Math.min( + capacity, + this.availableTokens + tokensToAdd + ); + this.lastRefillTimeMillis = requestTimeMillis; + } + } else { + throw new Error( + 'Request time should not be before the last token refill time.' + ); + } + } + + /** + * Calculates the maximum capacity based on the provided date. + * + * @private + */ + // Visible for testing. + calculateCapacity(requestTimeMillis: number): number { + assert( + requestTimeMillis >= this.startTimeMillis, + 'startTime cannot be after currentTime' + ); + const millisElapsed = requestTimeMillis - this.startTimeMillis; + const operationsPerSecond = Math.floor( + Math.pow( + this.multiplier, + Math.floor(millisElapsed / this.multiplierMillis) + ) * this.initialCapacity + ); + return operationsPerSecond; + } +} diff --git a/dev/test/rate-limiter.ts b/dev/test/rate-limiter.ts new file mode 100644 index 000000000..e3271303d --- /dev/null +++ b/dev/test/rate-limiter.ts @@ -0,0 +1,109 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {expect} from 'chai'; + +import {RateLimiter} from '../src/rate-limiter'; + +describe('RateLimiter', () => { + let limiter: RateLimiter; + + beforeEach(() => { + limiter = new RateLimiter( + /* initialCapacity= */ 500, + /* multiplier= */ 1.5, + /* multiplierMillis= */ 5 * 60 * 1000, + /* startTime= */ new Date(0).getTime() + ); + }); + + it('accepts and rejects requests based on capacity', () => { + expect(limiter.tryMakeRequest(250, new Date(0).getTime())).to.be.true; + expect(limiter.tryMakeRequest(250, new Date(0).getTime())).to.be.true; + + // Once tokens have been used, further requests should fail. + expect(limiter.tryMakeRequest(1, new Date(0).getTime())).to.be.false; + + // Tokens will only refill up to max capacity. + expect(limiter.tryMakeRequest(501, new Date(1 * 1000).getTime())).to.be + .false; + expect(limiter.tryMakeRequest(500, new Date(1 * 1000).getTime())).to.be + .true; + + // Tokens will refill incrementally based on the number of ms elapsed. + expect(limiter.tryMakeRequest(250, new Date(1 * 1000 + 499).getTime())).to + .be.false; + expect(limiter.tryMakeRequest(249, new Date(1 * 1000 + 500).getTime())).to + .be.true; + + // Scales with multiplier. + expect(limiter.tryMakeRequest(751, new Date((5 * 60 - 1) * 1000).getTime())) + .to.be.false; + expect(limiter.tryMakeRequest(751, new Date(5 * 60 * 1000).getTime())).to.be + .false; + expect(limiter.tryMakeRequest(750, new Date(5 * 60 * 1000).getTime())).to.be + .true; + + // Tokens will never exceed capacity. + expect(limiter.tryMakeRequest(751, new Date((5 * 60 + 3) * 1000).getTime())) + .to.be.false; + + // Rejects requests made before lastRefillTime + expect(() => + limiter.tryMakeRequest(751, new Date((5 * 60 + 2) * 1000).getTime()) + ).to.throw('Request time should not be before the last token refill time.'); + }); + + it('calculates the number of ms needed to place the next request', () => { + // Should return 0 if there are enough tokens for the request to be made. + let timestamp = new Date(0).getTime(); + expect(limiter.getNextRequestDelayMs(500, timestamp)).to.equal(0); + + // Should factor in remaining tokens when calculating the time. + expect(limiter.tryMakeRequest(250, timestamp)); + expect(limiter.getNextRequestDelayMs(500, timestamp)).to.equal(500); + + // Once tokens have been used, should calculate time before next request. + timestamp = new Date(1 * 1000).getTime(); + expect(limiter.tryMakeRequest(500, timestamp)).to.be.true; + expect(limiter.getNextRequestDelayMs(100, timestamp)).to.equal(200); + expect(limiter.getNextRequestDelayMs(250, timestamp)).to.equal(500); + expect(limiter.getNextRequestDelayMs(500, timestamp)).to.equal(1000); + expect(limiter.getNextRequestDelayMs(501, timestamp)).to.equal(-1); + + // Scales with multiplier. + timestamp = new Date(5 * 60 * 1000).getTime(); + expect(limiter.tryMakeRequest(750, timestamp)).to.be.true; + expect(limiter.getNextRequestDelayMs(250, timestamp)).to.equal(334); + expect(limiter.getNextRequestDelayMs(500, timestamp)).to.equal(667); + expect(limiter.getNextRequestDelayMs(750, timestamp)).to.equal(1000); + expect(limiter.getNextRequestDelayMs(751, timestamp)).to.equal(-1); + }); + + it('calculates the maximum number of operations correctly', async () => { + expect(limiter.calculateCapacity(new Date(0).getTime())).to.equal(500); + expect( + limiter.calculateCapacity(new Date(5 * 60 * 1000).getTime()) + ).to.equal(750); + expect( + limiter.calculateCapacity(new Date(10 * 60 * 1000).getTime()) + ).to.equal(1125); + expect( + limiter.calculateCapacity(new Date(15 * 60 * 1000).getTime()) + ).to.equal(1687); + expect( + limiter.calculateCapacity(new Date(90 * 60 * 1000).getTime()) + ).to.equal(738945); + }); +});