Skip to content

Commit

Permalink
feat(concurrency): ability to limit max simultaneous requests (#76)
Browse files Browse the repository at this point in the history
* rebase to devel current, yahooFinanceFetch typescript
* implement our own Queue that relays promises
* coverage
  • Loading branch information
gadicc authored Apr 12, 2021
1 parent 28664fa commit 3424d44
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 4 deletions.
17 changes: 16 additions & 1 deletion src/lib/options.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
export default {
// TODO, keep defaults there too?
import type { ValidationOptions } from "./validateAndCoerceTypes";
import type { QueueOptions } from "./queue";

export interface Options {
queue: QueueOptions;
validation: ValidationOptions;
}

const options: Options = {
queue: {
concurrency: 4, // Min: 1, Max: Infinity
timeout: 60,
},
validation: {
logErrors: true,
logOptionsErrors: true,
},
};

export default options;
48 changes: 48 additions & 0 deletions src/lib/queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
interface Job {
func: Function;
resolve: Function;
reject: Function;
}

export interface QueueOptions {
_queue?: Queue;
concurrency?: number;
timeout?: number; // TODO
}

export default class Queue {
concurrency: number = 1;

_running: number = 0;
_queue: Array<Job> = [];

constructor(opts: QueueOptions = {}) {
if (opts.concurrency) this.concurrency = opts.concurrency;
}

runNext() {
const job = this._queue.shift();
if (!job) return;

this._running++;
job
.func()
.then((result: any) => job.resolve(result))
.catch((error: any) => job.reject(error))
.finally(() => {
this._running--;
this.checkQueue();
});
}

checkQueue() {
if (this._running < this.concurrency) this.runNext();
}

add(func: Function) {
return new Promise((resolve, reject) => {
this._queue.push({ func, resolve, reject });
this.checkQueue();
});
}
}
2 changes: 1 addition & 1 deletion src/lib/validateAndCoerceTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ export function resolvePath(obj: any, instancePath: string) {
return ref;
}

interface ValidationOptions {
export interface ValidationOptions {
logErrors: boolean;
logOptionsErrors: boolean;
}
Expand Down
118 changes: 118 additions & 0 deletions src/lib/yahooFinanceFetch.spec.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
import * as util from "util";

import Queue from "./queue";
import _yahooFinanceFetch from "./yahooFinanceFetch";
import errors from "./errors";

import _env from "../env-node";
import _opts from "./options";

// https://dev.to/devcrafter91/elegant-way-to-check-if-a-promise-is-pending-577g
function isPending(promise: any) {
return util.inspect(promise).includes("pending");
}

describe("yahooFinanceFetch", () => {
const yahooFinanceFetch = _yahooFinanceFetch.bind({ _env, _opts });

Expand Down Expand Up @@ -50,4 +58,114 @@ describe("yahooFinanceFetch", () => {
)
).rejects.toBeInstanceOf(Error);
});

describe("concurrency", () => {
/*
process.on("unhandledRejection", (up) => {
console.error("Unhandled promise rejection!");
throw up;
});
*/

function immediate() {
return new Promise((resolve) => {
setImmediate(resolve);
});
}

function makeFetch() {
function fetch() {
return new Promise((resolve, reject) => {
fetch.fetches.push({
resolve,
reject,
resolveWith(obj: any) {
resolve({
ok: true,
async json() {
return obj;
},
});
return immediate();
},
});
});
}
fetch.fetches = [] as any[];
fetch.reset = function reset() {
// TODO check that all are resolved/rejected
fetch.fetches = [];
};
return fetch;
}

let env: any;
let yahooFinanceFetch: any;
let moduleOpts: any;

beforeEach(() => {
env = { ..._env, fetch: makeFetch() };
yahooFinanceFetch = _yahooFinanceFetch.bind({ _env: env, _opts });
moduleOpts = { queue: { _queue: new Queue() } };
});

it("Queue takes options in constructor", () => {
const queue = new Queue({ concurrency: 5 });
expect(queue.concurrency).toBe(5);
});

it("yahooFinanceFetch branch check for alternate queue", () => {
const promises = [
yahooFinanceFetch("", {}),
yahooFinanceFetch("", {}, {}),
yahooFinanceFetch("", {}, { queue: {} }),
];

env.fetch.fetches[0].resolveWith({ ok: true });
env.fetch.fetches[1].resolveWith({ ok: true });
env.fetch.fetches[2].resolveWith({ ok: true });

return Promise.all(promises);
});

it("assert defualts to {} for empty queue opts", () => {
moduleOpts.queue.concurrency = 1;
const opts = { ..._opts };
// @ts-ignore: intentional to test runtime failures
delete opts.queue;
const yahooFinanceFetch = _yahooFinanceFetch.bind({ _env: env, _opts });

const promise = yahooFinanceFetch("", {}, moduleOpts);
env.fetch.fetches[0].resolveWith({ ok: true });
return expect(promise).resolves.toMatchObject({ ok: true });
});

it("single item in queue", () => {
moduleOpts.queue.concurrency = 1;

const promise = yahooFinanceFetch("", {}, moduleOpts);
env.fetch.fetches[0].resolveWith({ ok: true });
return expect(promise).resolves.toMatchObject({ ok: true });
});

it("waits if exceeding concurrency max", async () => {
moduleOpts.queue.concurrency = 1;

const promises = [
yahooFinanceFetch("", {}, moduleOpts),
yahooFinanceFetch("", {}, moduleOpts),
];

// Second func should not be called until 1st reoslves (limit 1)
expect(env.fetch.fetches.length).toBe(1);

await env.fetch.fetches[0].resolveWith({ ok: true });
expect(env.fetch.fetches.length).toBe(2);

await env.fetch.fetches[1].resolveWith({ ok: true });
await Promise.all(promises);
});

// TODO, timeout test
});
});
27 changes: 25 additions & 2 deletions src/lib/yahooFinanceFetch.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
import Queue from "./queue";

import type { Options } from "./options";
import type { QueueOptions } from "./queue";

import errors from "./errors";
import pkg from "../../package.json";

Expand All @@ -13,12 +18,27 @@ interface YahooFinanceFetchThisEnv {
interface YahooFinanceFetchThis {
[key: string]: any;
_env: YahooFinanceFetchThisEnv;
_opts: Object;
_opts: Options;
}

interface YahooFinanceFetchModuleOptions {
devel?: string | boolean;
fetchOptions?: Object;
queue?: QueueOptions;
}

const _queue = new Queue();

function assertQueueOptions(queue: any, opts: any) {
opts; //?
if (
typeof opts.concurrency === "number" &&
queue.concurrency !== opts.concurrency
)
queue.concurrency = opts.concurrency;

if (typeof opts.timeout === "number" && queue.timeout !== opts.timeout)
queue.timeout = opts.timeout;
}

async function yahooFinanceFetch(
Expand All @@ -33,6 +53,9 @@ async function yahooFinanceFetch(
"yahooFinanceFetch called without this._env set"
);

const queue = moduleOpts.queue?._queue || _queue;
assertQueueOptions(queue, { ...this._opts.queue, ...moduleOpts.queue });

const { URLSearchParams, fetch, fetchDevel } = this._env;

// @ts-ignore TODO copy interface? @types lib?
Expand All @@ -52,7 +75,7 @@ async function yahooFinanceFetch(
// used in moduleExec.ts
if (func === "csv") func = "text";

const res = await fetchFunc(url, fetchOptions);
const res = (await queue.add(() => fetchFunc(url, fetchOptions))) as any;
const result = await res[func]();

/*
Expand Down

0 comments on commit 3424d44

Please sign in to comment.