Skip to content

Commit

Permalink
refactoring + stream processing
Browse files Browse the repository at this point in the history
  • Loading branch information
bmacnaughton committed Jan 22, 2025
1 parent c61d8b9 commit 02c94ed
Show file tree
Hide file tree
Showing 6 changed files with 422 additions and 60 deletions.
4 changes: 3 additions & 1 deletion lib/log-processor/log-processor.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ export default class LogProcessor {
proc.cpuUserPercent = proc.cpuUserTotal / elapsedMicros;
proc.cpuSystemPercent = proc.cpuSystemTotal / elapsedMicros;
}
this.routeMetricsResults.push(new RouteMetricsResults(this.summary.lines));
const rmr = new RouteMetricsResults();
rmr.indexArray(this.summary.lines);
this.routeMetricsResults.push(rmr);
}

/**
Expand Down
205 changes: 176 additions & 29 deletions lib/log-processor/utils/index.mjs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import {createHistogram} from 'node:perf_hooks';
// import {createHistogram} from 'node:perf_hooks';
// import assert from 'node:assert';
import readline from 'node:readline';

import TypeHeader from './types/typeHeader.mjs';
import TypeProc from './types/typeProc.mjs';
Expand All @@ -9,16 +11,22 @@ import TypeLoad from './types/typeLoad.mjs';
import TypeRoute from './types/typeRoute.mjs';
import TypeStatus from './types/typeStatus.mjs';

import ParseError from './types/metaParseError.mjs';

// import KeyedRoute from './types/metaKeyedRoute.mjs';

export default class RouteMetricsResults {
//
// invoke with the logObjects from a route-metrics log file.
//
constructor(logObjects, options = {}) {
constructor(options = {}) {
// no longer needed as TypeBase inserts in sorted order
//logObjects.sort((a, b) => a.ts - b.ts);
this.logObjects = logObjects;
this.logObjects = null;
this.first = null;
this.last = null;
// type 'route'
this.routesIndex = Object.create(null);
//this.routesIndex = Object.create(null);
// every type other than 'route'
this.typesIndex = Object.create(null);

Expand All @@ -35,47 +43,60 @@ export default class RouteMetricsResults {
this.route = this.typesIndex.route = new TypeRoute();
this.status = this.typesIndex.status = new TypeStatus();

// create the indexes and find the earliest and latest timestamps.
this.preprocess();
// for groups of routes. if no foldRules are specified, this is the same
// as each individual route. if foldRules are specified, multiple routes
// are grouped according to the foldRules.
this.keyedRoutes = Object.create(null);

// for parse errors
this.parseErrors = new ParseError();
}

//
// - build an index by route name. a name is defined by 'METHOD path'. locust
// names are where the path is found, but they sometimes have a description,
// e.g., '/api/users (register)', so this combines the method and name after
// truncating the name at the first space.
// index an array of preparsed logObjects.
//
// - add the route key to the route object
//
// - find the earliest and latest timestamps in locust output. this is the
// only way to estimate the start and end times of the locust run.
//
preprocess() {
for (const logObject of this.logObjects) {
indexArray(logObjects) {
// not clear the benefit of keeping all the log objects separately.
//this.logObjects = logObjects;

for (const logObject of logObjects) {
if (!this.first || logObject.ts < this.first.ts) {
this.first = logObject;
}
if (!this.last || logObject.ts > this.last.ts) {
this.last = logObject;
}

// routes get indexed by a key that includes elements of the url and
// whether the user specifies fold rules. fold rules are used to merge
// multiple urls into a single name, e.g., /api/users/1, /api/users/2.
// TODO NODE-3712 fold rules are currently implemented in route-metrics
// log-processor but should be moved to this preprocessing step.
//
// routes are also stored like any other type, but no other types are
// handled multiple ways like routes.
//
// TODO NODE-3712 currently implemented in route-metrics log-processor
// but should be here (fold rules).
if (logObject.type === 'route') {
let url = logObject.entry.url;
// there are no fold rules yet, but here's where i think the logic will
// go when i move them from the csv/json reporters. foldRules will result
// in KeyedRoute objects being created for each foldRule a route matches.
//
for (const foldRule of this.foldRules) {
if (foldRule.pat.test(logObject.entry.url)) {
url = foldRule.fold;
break;
}
}
const routeKey = RouteMetricsRoute.makeKey(logObject.entry.method, url);
let route = this.routesIndex[routeKey];
if (!route) {
route = new RouteMetricsRoute(logObject.entry.method, routeKey, logObject.entry.url);
this.routesIndex[routeKey] = route;
}
route.add(logObject);

// //
// // left as an example of how to use KeyedRoute
// //
// const rk = KeyedRoute.makeKey(logObject.entry.method, logObject.entry.url);
// const keyedRoute = this.keyedRoutes[rk];
// if (!keyedRoute) {
// this.keyedRoutes[rk] = new KeyedRoute(rk, logObject.entry.method, logObject.entry.url);
// }
// this.keyedRoutes[rk].add(logObject);
} else if (logObject.type === 'unknown-config-items') {
continue;
}
Expand All @@ -89,6 +110,118 @@ export default class RouteMetricsResults {
}
type.add(logObject);
}

// // integrity check while debugging
// for (const key in this.keyedRoutes) {
// const k = this.keyedRoutes[key];
// const i = this.routesIndex[key];
// check(k, i);
// }
}

async indexStream(stream) {
const generator = readline.createInterface({
input: stream,
crlfDelay: Infinity,
});

const gen = generator[Symbol.asyncIterator]();

// console.log(await gen.next());

// for await (const line of gen) {
// console.log(line);
// }
return this.indexAsyncGenerator(gen);
}

async indexAsyncGenerator(gen) {
let ix = -1;
let lastLogObject = null;

// verify the first line is a header.
try {
const {done, value: line} = await gen.next();
if (done) {
throw new Error('empty log file');
}
lastLogObject = JSON.parse(line);

if (!lastLogObject.ts || !lastLogObject.type || !lastLogObject.entry) {
throw new Error('invalid log entry');
}
if (lastLogObject.type !== 'header') {
throw new Error('first log entry must be a header');
}
this.header.add(lastLogObject);
this.first = this.last = lastLogObject;
} catch (e) {
throw new Error(`error parsing header: ${e.message}`);
}

for await (const line of gen) {
ix += 1;
try {
const logObject = JSON.parse(line);
if (!logObject.ts || !logObject.type || !logObject.entry) {
throw new Error('invalid log entry');
}

if (logObject.ts < this.first.ts) {
this.first = logObject;
}
if (logObject.ts > this.last.ts) {
this.last = logObject;
}

// routes get indexed by a key that includes elements of the url and
// whether the user specifies fold rules. fold rules are used to merge
// multiple urls into a single name, e.g., /api/users/1, /api/users/2.
// TODO NODE-3712 fold rules are currently implemented in route-metrics
// log-processor but should be moved to this preprocessing step.
//
// routes are also stored like any other type, but no other types are
// handled multiple ways like routes.
if (logObject.type === 'route') {
let url = logObject.entry.url;
// there are no fold rules yet, but here's where i think the logic will
// go when i move them from the csv/json reporters. foldRules will result
// in KeyedRoute objects being created for each foldRule a route matches.
//
for (const foldRule of this.foldRules) {
if (foldRule.pat.test(logObject.entry.url)) {
url = foldRule.fold;
break;
}
}

// //
// // left as an example of how to use KeyedRoute
// //
// const rk = KeyedRoute.makeKey(logObject.entry.method, logObject.entry.url);
// const keyedRoute = this.keyedRoutes[rk];
// if (!keyedRoute) {
// this.keyedRoutes[rk] = new KeyedRoute(rk, logObject.entry.method, logObject.entry.url);
// }
// this.keyedRoutes[rk].add(logObject);
} else if (logObject.type === 'unknown-config-items') {
continue;
}

// now each type is captured in its own index with type-specific
// behavior.
const type = this.typesIndex[logObject.type];
if (!type) {
// it's a bug in the code; fix it.
throw new Error(`no instance for ${logObject.type}`);
}
type.add(logObject);
} catch (e) {
// here when the line couldn't be parsed as JSON (or some unanticipated
// error in the try block).
this.parseErrors.add(e, line);
}
}
}

makeRouteKey(route) {
Expand All @@ -112,11 +245,11 @@ export default class RouteMetricsResults {
}

get earliestTimestamp() {
return this.logObjects[0].ts;
return this.first.ts;
}

get latestTimestamp() {
return this.logObjects.at(-1).ts;
return this.last.ts;
}

*iterateRoutes() {
Expand All @@ -143,6 +276,7 @@ export async function readRouteMetricsLog(filepath) {
//
// class to hold the route metrics logObjects for a single route.
//
/*
class RouteMetricsRoute {
static makeKey(method, url) {
return `${method} ${url}`;
Expand Down Expand Up @@ -225,3 +359,16 @@ class RouteMetricsRoute {
return intervals;
}
}
// */

/*
function check(left, right) {
const things = [
'key', 'method', 'url', 'logObjects', 'min', 'max', 'totalEt', '_successCount', 'histogram',
'earliestTimestamp', 'latestTimestamp', 'earliest', 'latest', 'count', 'successCount', 'mean',
];
for (const thing of things) {
assert.deepStrictEqual(left[thing], right[thing]);
}
}
// */
93 changes: 93 additions & 0 deletions lib/log-processor/utils/index.test.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import assert from 'node:assert';
import fs from 'node:fs';

import RouteMetricsResults from './index.mjs';
import read from './read-log.mjs';

// log-file fixtures and the number of entries of each type that indexing
// them is expected to produce. `lines` is the total line count of the file
// (it equals the sum of the per-type counts).
const files = [
  {
    name: 'minimal-route-metrics.log',
    counts: {
      lines: 16,
      header: 1,
      patch: 3,
      status: 1,
      proc: 4,
      eventloop: 4,
      gc: 0,
      route: 3,
    },
  },
  {
    name: 'nrwb-route-metrics.log',
    counts: {
      lines: 2075,
      header: 1,
      patch: 1,
      status: 1,
      proc: 13,
      eventloop: 0,
      gc: 11,
      route: 2048,
    },
  },
];

//
// for each fixture file: read it as lines, read it as logObjects, index it
// from an array, index it from a stream, and verify that array-indexing and
// stream-indexing produce the same RouteMetricsResults.
//
describe('RouteMetricsResults', function() {
  for (const {name, counts} of files) {
    const {lines, header, patch, status, proc, eventloop, gc, route} = counts;
    const file = `./test/log-files/${name}`;

    // results are scoped to this file so that a failure while indexing one
    // file cannot leave the previous file's result behind to be compared.
    let arrayRmr;
    let streamRmr;

    // verify the number of entries indexed for each type.
    const assertCounts = rmr => {
      assert.equal(rmr.header.count, header, `headers expected: ${header}`);
      assert.equal(rmr.patch.count, patch, `patches expected: ${patch}`);
      assert.equal(rmr.status.count, status, `statuses expected: ${status}`);
      assert.equal(rmr.proc.count, proc, `procs expected: ${proc}`);
      assert.equal(rmr.eventloop.count, eventloop, `eventloops expected: ${eventloop}`);
      assert.equal(rmr.gc.count, gc, `gc expected: ${gc}`);
      assert.equal(rmr.route.count, route, `routes expected: ${route}`);
    };

    it(`${name}: read an array of log lines`, async function() {
      const logLines = await read(file);
      // captured with wc -l
      assert.equal(logLines.length, lines);
      logLines.forEach(line => assert.equal(typeof line, 'string'));
    });

    it(`${name}: should convert lines to logObjects`, async function() {
      const logObjects = await read(file, {convert: true});
      assert.equal(logObjects.length, lines);
      logObjects.forEach(line => assert.equal(typeof line, 'object'));
    });

    it(`${name}: indexes an array of logObjects`, async function() {
      const logObjects = await read(file, {convert: true});
      const rmr = new RouteMetricsResults();
      rmr.indexArray(logObjects);

      assertCounts(rmr);

      arrayRmr = rmr;
    });

    it(`${name}: indexes a log file stream`, async function() {
      const stream = fs.createReadStream(file);
      const rmr = new RouteMetricsResults();
      await rmr.indexStream(stream);

      assertCounts(rmr);

      streamRmr = rmr;
    });

    it(`${name}: creates identical RouteMetricsResults from array and stream`, function() {
      // this test consumes the results of the two indexing tests above; fail
      // with a clear message if either of them didn't produce a result.
      assert.ok(arrayRmr, 'the array indexing test must pass first');
      assert.ok(streamRmr, 'the stream indexing test must pass first');

      // this first loop is redundant to the last deepEqual, but it gives
      // better messages if a test fails.
      const things = ['first', 'last', 'typesIndex', 'foldRules', 'keyedRoutes', 'parseErrors'];
      for (const thing of things) {
        assert.deepEqual(arrayRmr[thing], streamRmr[thing], `expected ${thing} to be the same`);
      }
      // now check everything.
      assert.deepEqual(arrayRmr, streamRmr);
    });
  }
});
Loading

0 comments on commit 02c94ed

Please sign in to comment.