From 02c94ed78af3885bf63463fd0965e6db6cb14eb3 Mon Sep 17 00:00:00 2001 From: bmacnaughton Date: Tue, 21 Jan 2025 18:46:04 -0800 Subject: [PATCH] refactoring + stream processing --- lib/log-processor/log-processor.mjs | 4 +- lib/log-processor/utils/index.mjs | 205 +++++++++++++++--- lib/log-processor/utils/index.test.mjs | 93 ++++++++ lib/log-processor/utils/read-log.mjs | 34 +-- .../utils/types/metaKeyedRoute.mjs | 98 +++++++++ .../utils/types/metaParseError.mjs | 48 ++++ 6 files changed, 422 insertions(+), 60 deletions(-) create mode 100644 lib/log-processor/utils/index.test.mjs create mode 100644 lib/log-processor/utils/types/metaKeyedRoute.mjs create mode 100644 lib/log-processor/utils/types/metaParseError.mjs diff --git a/lib/log-processor/log-processor.mjs b/lib/log-processor/log-processor.mjs index 7064785..d53ac59 100644 --- a/lib/log-processor/log-processor.mjs +++ b/lib/log-processor/log-processor.mjs @@ -112,7 +112,9 @@ export default class LogProcessor { proc.cpuUserPercent = proc.cpuUserTotal / elapsedMicros; proc.cpuSystemPercent = proc.cpuSystemTotal / elapsedMicros; } - this.routeMetricsResults.push(new RouteMetricsResults(this.summary.lines)); + const rmr = new RouteMetricsResults(); + rmr.indexArray(this.summary.lines); + this.routeMetricsResults.push(rmr); } /** diff --git a/lib/log-processor/utils/index.mjs b/lib/log-processor/utils/index.mjs index 1dea4a8..aa57ed8 100644 --- a/lib/log-processor/utils/index.mjs +++ b/lib/log-processor/utils/index.mjs @@ -1,4 +1,6 @@ -import {createHistogram} from 'node:perf_hooks'; +// import {createHistogram} from 'node:perf_hooks'; +// import assert from 'node:assert'; +import readline from 'node:readline'; import TypeHeader from './types/typeHeader.mjs'; import TypeProc from './types/typeProc.mjs'; @@ -9,16 +11,22 @@ import TypeLoad from './types/typeLoad.mjs'; import TypeRoute from './types/typeRoute.mjs'; import TypeStatus from './types/typeStatus.mjs'; +import ParseError from './types/metaParseError.mjs'; + 
+// import KeyedRoute from './types/metaKeyedRoute.mjs'; + export default class RouteMetricsResults { // // invoke with the logObjects from a route-metrics log file. // - constructor(logObjects, options = {}) { + constructor(options = {}) { // no longer needed as TypeBase inserts in sorted order //logObjects.sort((a, b) => a.ts - b.ts); - this.logObjects = logObjects; + this.logObjects = null; + this.first = null; + this.last = null; // type 'route' - this.routesIndex = Object.create(null); + //this.routesIndex = Object.create(null); // every type other than 'route' this.typesIndex = Object.create(null); @@ -35,47 +43,60 @@ export default class RouteMetricsResults { this.route = this.typesIndex.route = new TypeRoute(); this.status = this.typesIndex.status = new TypeStatus(); - // create the indexes and find the earliest and latest timestamps. - this.preprocess(); + // for groups of routes. if no foldRules are specified, this is the same + // as each individual route. if foldRules are specified, multiple routes + // are grouped according to the foldRules. + this.keyedRoutes = Object.create(null); + + // for parse errors + this.parseErrors = new ParseError(); } // - // - build an index by route name. a name is defined by 'METHOD path'. locust - // names are where the path is found, but they sometimes have a description, - // e.g., '/api/users (register)', so this combines the method and name after - // truncating the name at the first space. + // index an array of preparsed logObjects. // - // - add the route key to the route object - // - // - find the earliest and latest timestamps in locust output. this is the - // only way to estimate the start and end times of the locust run. - // - preprocess() { - for (const logObject of this.logObjects) { + indexArray(logObjects) { + // not clear the benefit of keeping all the log objects separately. 
+ //this.logObjects = logObjects; + + for (const logObject of logObjects) { + if (!this.first || logObject.ts < this.first.ts) { + this.first = logObject; + } + if (!this.last || logObject.ts > this.last.ts) { + this.last = logObject; + } + // routes get indexed by a key that includes elements of the url and // whether the user specifies fold rules. fold rules are used to merge // multiple urls into a single name, e.g., /api/users/1, /api/users/2. + // TODO NODE-3712 fold rules are currently implemented in route-metrics + // log-processor but should be moved to this preprocessing step. // // routes are also stored like any other type, but no other types are // handled multiple ways like routes. - // - // TODO NODE-3712 currently implemented in route-metrics log-processor - // but should be here (fold rules). if (logObject.type === 'route') { let url = logObject.entry.url; + // there are no fold rules yet, but here's where i think the logic will + // go when i move them from the csv/json reporters. foldRules will result + // in KeyedRoute objects being created for each foldRule a route matches. 
+ // for (const foldRule of this.foldRules) { if (foldRule.pat.test(logObject.entry.url)) { url = foldRule.fold; break; } } - const routeKey = RouteMetricsRoute.makeKey(logObject.entry.method, url); - let route = this.routesIndex[routeKey]; - if (!route) { - route = new RouteMetricsRoute(logObject.entry.method, routeKey, logObject.entry.url); - this.routesIndex[routeKey] = route; - } - route.add(logObject); + + // // + // // left as an example of how to use KeyedRoute + // // + // const rk = KeyedRoute.makeKey(logObject.entry.method, logObject.entry.url); + // const keyedRoute = this.keyedRoutes[rk]; + // if (!keyedRoute) { + // this.keyedRoutes[rk] = new KeyedRoute(rk, logObject.entry.method, logObject.entry.url); + // } + // this.keyedRoutes[rk].add(logObject); } else if (logObject.type === 'unknown-config-items') { continue; } @@ -89,6 +110,118 @@ export default class RouteMetricsResults { } type.add(logObject); } + + // // integrity check while debugging + // for (const key in this.keyedRoutes) { + // const k = this.keyedRoutes[key]; + // const i = this.routesIndex[key]; + // check(k, i); + // } + } + + async indexStream(stream) { + const generator = readline.createInterface({ + input: stream, + crlfDelay: Infinity, + }); + + const gen = generator[Symbol.asyncIterator](); + + // console.log(await gen.next()); + + // for await (const line of gen) { + // console.log(line); + // } + return this.indexAsyncGenerator(gen); + } + + async indexAsyncGenerator(gen) { + let ix = -1; + let lastLogObject = null; + + // verify the first line is a header. 
+ try { + const {done, value: line} = await gen.next(); + if (done) { + throw new Error('empty log file'); + } + lastLogObject = JSON.parse(line); + + if (!lastLogObject.ts || !lastLogObject.type || !lastLogObject.entry) { + throw new Error('invalid log entry'); + } + if (lastLogObject.type !== 'header') { + throw new Error('first log entry must be a header'); + } + this.header.add(lastLogObject); + this.first = this.last = lastLogObject; + } catch (e) { + throw new Error(`error parsing header: ${e.message}`); + } + + for await (const line of gen) { + ix += 1; + try { + const logObject = JSON.parse(line); + if (!logObject.ts || !logObject.type || !logObject.entry) { + throw new Error('invalid log entry'); + } + + if (logObject.ts < this.first.ts) { + this.first = logObject; + } + if (logObject.ts > this.last.ts) { + this.last = logObject; + } + + // routes get indexed by a key that includes elements of the url and + // whether the user specifies fold rules. fold rules are used to merge + // multiple urls into a single name, e.g., /api/users/1, /api/users/2. + // TODO NODE-3712 fold rules are currently implemented in route-metrics + // log-processor but should be moved to this preprocessing step. + // + // routes are also stored like any other type, but no other types are + // handled multiple ways like routes. + if (logObject.type === 'route') { + let url = logObject.entry.url; + // there are no fold rules yet, but here's where i think the logic will + // go when i move them from the csv/json reporters. foldRules will result + // in KeyedRoute objects being created for each foldRule a route matches. 
+ // + for (const foldRule of this.foldRules) { + if (foldRule.pat.test(logObject.entry.url)) { + url = foldRule.fold; + break; + } + } + + // // + // // left as an example of how to use KeyedRoute + // // + // const rk = KeyedRoute.makeKey(logObject.entry.method, logObject.entry.url); + // const keyedRoute = this.keyedRoutes[rk]; + // if (!keyedRoute) { + // this.keyedRoutes[rk] = new KeyedRoute(rk, logObject.entry.method, logObject.entry.url); + // } + // this.keyedRoutes[rk].add(logObject); + } else if (logObject.type === 'unknown-config-items') { + continue; + } + + // now each type is captured in its own index with type-specific + // behavior. + const type = this.typesIndex[logObject.type]; + if (!type) { + // it's a bug in the code; fix it. + throw new Error(`no instance for ${logObject.type}`); + } + type.add(logObject); + } catch (e) { + // here when the line couldn't be parsed as JSON (or some unanticipated + // error in the try block). + this.parseErrors.add(e, line); + } + } } makeRouteKey(route) { @@ -112,11 +245,11 @@ export default class RouteMetricsResults { } get earliestTimestamp() { - return this.logObjects[0].ts; + return this.first.ts; } get latestTimestamp() { - return this.logObjects.at(-1).ts; + return this.last.ts; } *iterateRoutes() { @@ -143,6 +276,7 @@ export async function readRouteMetricsLog(filepath) { // // class to hold the route metrics logObjects for a single route. 
// +/* class RouteMetricsRoute { static makeKey(method, url) { return `${method} ${url}`; @@ -225,3 +359,16 @@ class RouteMetricsRoute { return intervals; } } +// */ + +/* +function check(left, right) { + const things = [ + 'key', 'method', 'url', 'logObjects', 'min', 'max', 'totalEt', '_successCount', 'histogram', + 'earliestTimestamp', 'latestTimestamp', 'earliest', 'latest', 'count', 'successCount', 'mean', + ]; + for (const thing of things) { + assert.deepStrictEqual(left[thing], right[thing]); + } +} +// */ diff --git a/lib/log-processor/utils/index.test.mjs b/lib/log-processor/utils/index.test.mjs new file mode 100644 index 0000000..44b68e6 --- /dev/null +++ b/lib/log-processor/utils/index.test.mjs @@ -0,0 +1,93 @@ +import assert from 'node:assert'; +import fs from 'node:fs'; + +import RouteMetricsResults from './index.mjs'; +import read from './read-log.mjs'; + +const files = [ + {name: 'minimal-route-metrics.log', counts: { + lines: 16, + header: 1, + patch: 3, + status: 1, + proc: 4, + eventloop: 4, + gc: 0, + route: 3, + }}, + {name: 'nrwb-route-metrics.log', counts: { + lines: 2075, + header: 1, + patch: 1, + status: 1, + proc: 13, + eventloop: 0, + gc: 11, + route: 2048, + }}, +]; + +describe('RouteMetricsResults', function() { + let arrayRmr; + let streamRmr; + + for (const {name, counts} of files) { + const {lines, header, patch, status, proc, eventloop, gc, route} = counts; + const file = `./test/log-files/${name}`; + it(`${name}: read an array of log lines`, async function() { + const logLines = await read(file); + // captured with wc -l + assert.equal(logLines.length, lines); + logLines.forEach(line => assert.equal(typeof line, 'string')); + }); + + it(`${name}: should convert lines to logObjects`, async function() { + const logObjects = await read(file, {convert: true}); + assert.equal(logObjects.length, lines); + logObjects.forEach(line => assert.equal(typeof line, 'object')); + }); + + it(`${name}: indexes an array of logObjects`, async 
function() { + const logObjects = await read(file, {convert: true}); + const rmr = new RouteMetricsResults(); + rmr.indexArray(logObjects); + + assert.equal(rmr.header.count, header, `headers expected: ${header}`); + assert.equal(rmr.patch.count, patch, `patches expected: ${patch}`); + assert.equal(rmr.status.count, status, `statuses expected: ${status}`); + assert.equal(rmr.proc.count, proc, `procs expected: ${proc}`); + assert.equal(rmr.eventloop.count, eventloop, `eventloops expected: ${eventloop}`); + assert.equal(rmr.gc.count, gc, `gc expected: ${gc}`); + assert.equal(rmr.route.count, route, `routes expected: ${route}`); + + arrayRmr = rmr; + }); + + it(`${name}: indexes a log file stream`, async function() { + const stream = fs.createReadStream(file); + const rmr = new RouteMetricsResults(); + await rmr.indexStream(stream); + + assert.equal(rmr.header.count, header, `headers expected: ${header}`); + assert.equal(rmr.patch.count, patch, `patches expected: ${patch}`); + assert.equal(rmr.status.count, status, `statuses expected: ${status}`); + assert.equal(rmr.proc.count, proc, `procs expected: ${proc}`); + assert.equal(rmr.eventloop.count, eventloop, `eventloops expected: ${eventloop}`); + assert.equal(rmr.gc.count, gc, `gc expected: ${gc}`); + assert.equal(rmr.route.count, route, `routes expected: ${route}`); + + streamRmr = rmr; + }); + + it('creates identical RouteMetricsResults from array and stream', function() { + // this first loop is redundant with the last deepEqual, but it will give + // better messages if a test fails. + const things = ['first', 'last', 'typesIndex', 'foldRules', 'keyedRoutes', 'parseErrors']; + for (const thing of things) { + assert.deepEqual(arrayRmr[thing], streamRmr[thing], `expected ${thing} to be the same`); + } + // now check everything. 
+ assert.deepEqual(arrayRmr, streamRmr); + }); + } +}); diff --git a/lib/log-processor/utils/read-log.mjs b/lib/log-processor/utils/read-log.mjs index d3a7b5e..c680e33 100644 --- a/lib/log-processor/utils/read-log.mjs +++ b/lib/log-processor/utils/read-log.mjs @@ -6,43 +6,17 @@ import fsp from 'node:fs/promises'; // much more complex to handle all this with streaming data. // export default async function getRouteMetrics(filename, options = {}) { - const {convert = false, check = false} = options; + const {convert = false} = options; // read the route-metrics log file - const logText = await fsp.readFile(filename, {encoding: 'utf8'}); + let logText = await fsp.readFile(filename, {encoding: 'utf8'}); + logText = logText.split('\n').slice(0, -1); if (!convert) { return logText; } - const logObjects = convertLogText(logText); - - check && checkLogObjects(logObjects); + const logObjects = logText.map(JSON.parse); return logObjects; } - -export function convertLogText(logText) { - // convert the route metrics log entries to objects. - const logObjects = logText.split('\n').slice(0, -1).map(JSON.parse); - - return logObjects; -} - -export function checkLogObjects(logObjects) { - if (logObjects.length === 0) { - return false; - } - - if (logObjects[0].type !== 'header') { - return false; - } - - for (const logObject of logObjects) { - if (!logObject.ts || !logObject.type || !logObject.entry) { - return false; - } - } - - return true; -} diff --git a/lib/log-processor/utils/types/metaKeyedRoute.mjs b/lib/log-processor/utils/types/metaKeyedRoute.mjs new file mode 100644 index 0000000..9514066 --- /dev/null +++ b/lib/log-processor/utils/types/metaKeyedRoute.mjs @@ -0,0 +1,98 @@ +import {createHistogram} from 'node:perf_hooks'; + +import TypeBase from './_typeBase.mjs'; + +// +// class to hold the route metrics logObjects for a single route. 
+// +export default class RouteMetricsRoute extends TypeBase { + static makeKey(method, url) { + return `${method} ${url}`; + } + + constructor(key, method, url) { + // the key is the type for these instances + super(key); + + this.key = key; + this.method = method; + this.url = url; + this.logObjects = []; + this.min = Infinity; + this.max = 0; + this.totalEt = 0; + this._successCount = null; + this.histogram = createHistogram(); + } + + add(logObject) { + // logObject is a route, but the type of this instance is the key, so a bit + // of sleight-of-hand is needed. + const type = logObject.type; + logObject.type = this.key; + super.add(logObject); + logObject.type = type; + + this.totalEt += logObject.entry.et; + this.min = Math.min(this.min, logObject.entry.et); + this.max = Math.max(this.max, logObject.entry.et); + this.histogram.record(logObject.entry.et); + } + + get earliestTimestamp() { + return this.logObjects[0].ts; + } + + get latestTimestamp() { + return this.logObjects.at(-1).ts; + } + + get earliest() { + return this.logObjects[0]; + } + get latest() { + return this.logObjects.at(-1); + } + + get count() { + return this.logObjects.length; + } + + get successCount() { + if (this._successCount === null) { + this._successCount = 0; + for (const logObject of this.logObjects) { + if (logObject.entry.statusCode < 400) { + this._successCount += 1; + } + } + } + + return this._successCount; + } + + get mean() { + return this.totalEt / this.count; + } + + makePercentiles(percentiles = [50, 75, 90, 95, 99]) { + return percentiles.map(p => this.histogram.percentile(p)); + } + + makeRequestsPerSecond() { + const intervals = Object.create(null); + const intervalCount = Math.ceil((this.latestTimestamp - this.earliestTimestamp) / 1000); + const firstInterval = Math.floor(this.earliestTimestamp / 1000); + + for (let i = firstInterval; i <= firstInterval + intervalCount; i++) { + intervals[i] = 0; + } + + for (const logObject of this.logObjects) { + const interval = 
Math.floor(logObject.ts / 1000); + intervals[interval] += 1; + } + + return intervals; + } +} diff --git a/lib/log-processor/utils/types/metaParseError.mjs b/lib/log-processor/utils/types/metaParseError.mjs new file mode 100644 index 0000000..0d92280 --- /dev/null +++ b/lib/log-processor/utils/types/metaParseError.mjs @@ -0,0 +1,48 @@ +import TypeBase from './_typeBase.mjs'; + +// +// class to hold the errors captured while parsing route-metrics log lines. +// +export default class ParseError extends TypeBase { + static makeKey(method, url) { + return `${method} ${url}`; + } + + constructor() { + super('log-error'); + } + + add(error, logLine, previousLogObject = null) { + // we make a pseudo-logObject to capture the error + const logObject = { + ts: previousLogObject?.ts, + type: this.key, + tid: 0, + entry: { + error, + logLine, + previousLogObject, + } + }; + super.add(logObject); + } + + get earliestTimestamp() { + return this.logObjects[0].ts; + } + + get latestTimestamp() { + return this.logObjects.at(-1).ts; + } + + get earliest() { + return this.logObjects[0]; + } + get latest() { + return this.logObjects.at(-1); + } + + get count() { + return this.logObjects.length; + } +}