Skip to content

Commit

Permalink
Merge pull request #1479 from NASA-AMMOS/feature/seq-expansion-logging
Browse files Browse the repository at this point in the history
Better logging for sequencing server command expansion
  • Loading branch information
dandelany authored Jun 13, 2024
2 parents 0eee01d + bfb1237 commit e548fbc
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 29 deletions.
2 changes: 2 additions & 0 deletions sequencing-server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const logger = getLogger('app');

const PORT: number = parseInt(getEnv().PORT, 10) ?? 27184;

logger.info(`Starting sequencing-server app on Node v${process.versions.node}...`);

const app: Application = express();
// WARNING: bodyParser.json() is vulnerable to a string too long issue. Iff that occurs,
// we should switch to a stream parser like https://www.npmjs.com/package/stream-json
Expand Down
43 changes: 19 additions & 24 deletions sequencing-server/src/backgroundTranspiler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import type { typecheckExpansion } from './worker';
import { Result } from '@nasa-jpl/aerie-ts-user-code-runner/build/utils/monads.js';
import { getLatestParcel, getLatestMissionModel, getExpansionRule } from './utils/hasura.js';
import type { Parcel } from './lib/batchLoaders/parcelBatchLoader.js';
import getLogger from './utils/logger.js';

const logger = getLogger('[ Background Transpiler ]');

export async function backgroundTranspiler(numberOfThreads: number = 2) {
if (graphqlClient === null) {
Expand All @@ -25,9 +28,7 @@ export async function backgroundTranspiler(numberOfThreads: number = 2) {
},
} = await getLatestMissionModel(graphqlClient);
if (!missionModelId) {
console.log(
'[ Background Transpiler ] Unable to fetch the latest mission model. Aborting background transpiling...',
);
logger.warn('Unable to fetch the latest mission model. Aborting background transpiling...');
return;
}

Expand All @@ -40,16 +41,14 @@ export async function backgroundTranspiler(numberOfThreads: number = 2) {
},
} = await getLatestParcel(graphqlClient);
if (!parcelID) {
console.log(
'[ Background Transpiler ] Unable to fetch the latest command dictionary. Aborting background transpiling...',
);
logger.warn('Unable to fetch the latest command dictionary. Aborting background transpiling...');
return;
}

const { expansion_rule } = await getExpansionRule(graphqlClient, missionModelId, parcelID);

if (expansion_rule === null || expansion_rule.length === 0) {
console.log(`[ Background Transpiler ] No expansion rules to transpile.`);
logger.info(`No expansion rules to transpile.`);
return;
}

Expand All @@ -71,8 +70,7 @@ export async function backgroundTranspiler(numberOfThreads: number = 2) {
});

if (parcel === null) {
console.log(`[ Background Transpiler ] Unable to fetch parcel.
Aborting transpiling...`);
logger.error(`Unable to fetch parcel.\nAborting transpiling...`);
return;
}

Expand All @@ -81,20 +79,19 @@ export async function backgroundTranspiler(numberOfThreads: number = 2) {
});

if (commandTypes === null) {
console.log(`[ Background Transpiler ] Unable to fetch command ts lib.
Aborting transpiling...`);
logger.error(`Unable to fetch command ts lib.\nAborting transpiling...`);
return;
}

// only process 'numberOfThreads' worth at a time ex. transpile 2 logics at a time
// This allows for expansion set and sequence expansion to utilize the remaining workers
for (let i = 0; i < expansion_rule.length; i += numberOfThreads) {
await Promise.all(
expansion_rule.slice(i, i + numberOfThreads).map(async expansion => {
expansion_rule.slice(i, i + numberOfThreads).map(async (expansion, j) => {
await promiseThrottler.run(async () => {
// Assuming expansion_rule elements have the same type
if (expansion instanceof Error) {
console.log(`[ Background Transpiler ] Expansion: ${expansion.name} could not be loaded`, expansion);
logger.error(`Expansion: ${expansion.name} could not be loaded`, expansion);
return Promise.reject(`Expansion: ${expansion.name} could not be loaded`);
}

Expand Down Expand Up @@ -126,24 +123,22 @@ export async function backgroundTranspiler(numberOfThreads: number = 2) {

// log error
if (!activitySchema) {
console.log(
`[ Background Transpiler ] Activity schema for ${expansion.activity_type} could not be loaded`,
activitySchema,
);
return Promise.reject('Activity schema for ${expansion.activity_type} could not be loaded');
const msg = `Activity schema for ${expansion.activity_type} could not be loaded`;
logger.error(msg, activitySchema);
return Promise.reject(msg);
}

const activityTypescript = generateTypescriptForGraphQLActivitySchema(activitySchema);

// log error
if (!activityTypescript) {
console.log(
`[ Background Transpiler ] Unable to generate typescript for activity ${expansion.activity_type}`,
activityTypescript,
);
return Promise.reject(`Unable to generate typescript for activity ${expansion.activity_type}`);
const msg = `Unable to generate typescript for activity ${expansion.activity_type}`;
logger.error(msg, activityTypescript);
return Promise.reject(msg);
}

const progress = `(${i + j + 1} of ${expansion_rule.length})`;
logger.info(`Assigning worker to typecheck ${expansion.activity_type} ${progress}`);
const typecheckingResult = (
piscina.run(
{
Expand All @@ -159,7 +154,7 @@ export async function backgroundTranspiler(numberOfThreads: number = 2) {
//Display any errors
typecheckingResult.then(result => {
if (result.isErr()) {
console.log(`Error transpiling ${expansion.activity_type}:\n ${result.unwrapErr().map(e => e.message)}`);
logger.error(`Error transpiling ${expansion.activity_type}:\n ${result.unwrapErr().map(e => e.message)}`);
}
});

Expand Down
4 changes: 4 additions & 0 deletions sequencing-server/src/routes/command-expansion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,10 @@ commandExpansionRouter.post('/expand-all-activity-instances', async (req, res, n
);

const rejectedExpansionResults = settledExpansionResults.filter(isRejected).map(p => p.reason);
if (rejectedExpansionResults.length) {
logger.error(`${rejectedExpansionResults.length} rejected expansion results`);
console.log(rejectedExpansionResults);
}

for (const expansionResult of rejectedExpansionResults) {
logger.error(expansionResult.reason);
Expand Down
20 changes: 15 additions & 5 deletions sequencing-server/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import './polyfills.js';

import vm from 'node:vm';
import * as fs from 'node:fs';
import { threadId } from 'worker_threads';

import ts from 'typescript';
import { CacheItem, UserCodeError, UserCodeRunner } from '@nasa-jpl/aerie-ts-user-code-runner';
Expand All @@ -10,8 +11,12 @@ import type { SimulatedActivity } from './lib/batchLoaders/simulatedActivityBatc
import type { CommandStem } from './lib/codegen/CommandEDSLPreface.js';
import type { SeqJson } from '@nasa-jpl/seq-json-schema/types';
import { deserializeWithTemporal } from './utils/temporalSerializers.js';
import getLogger from './utils/logger.js';
import { Result, SerializedResult } from '@nasa-jpl/aerie-ts-user-code-runner/build/utils/monads.js';

const logger = getLogger(`[ Worker ${threadId} ]`);
logger.info('Starting worker thread...');

const temporalPolyfillTypes = fs.readFileSync(
new URL('../src/types/TemporalPolyfillTypes.ts', import.meta.url).pathname,
'utf8',
Expand Down Expand Up @@ -40,9 +45,7 @@ export async function typecheckExpansion(opts: {
activityTypeName?: string;
}): Promise<SerializedResult<CacheItem, ReturnType<UserCodeError['toJSON']>[]>> {
const startTime = Date.now();
console.log(
`[ Worker ] started transpiling authoring logic ${opts.activityTypeName ? `- ${opts.activityTypeName}` : ''}`,
);
logger.info(`started transpiling authoring logic - ${opts.activityTypeName || 'unknown'}`);

const result = await codeRunner.preProcess(
opts.expansionLogic,
Expand All @@ -60,8 +63,8 @@ export async function typecheckExpansion(opts: {
);

const endTime = Date.now();
console.log(
`[ Worker ] finished transpiling ${opts.activityTypeName ? `- ${opts.activityTypeName}` : ''}, (${
logger.info(
`finished transpiling ${opts.activityTypeName ? `- ${opts.activityTypeName}` : ''}, (${
(endTime - startTime) / 1000
} s)`,
);
Expand Down Expand Up @@ -132,6 +135,9 @@ export async function executeExpansionFromBuildArtifacts(opts: {
}),
);

logger.info(`processing ${activityInstance.activityTypeName} at ${activityInstance.startTime.toLocaleString()}`);
logger.info(`Memory RSS: ${formatMemoryNumber(process.memoryUsage().rss)}`);

if (result.isOk()) {
let commands = result.unwrap();
if (!Array.isArray(commands)) {
Expand All @@ -150,3 +156,7 @@ export async function executeExpansionFromBuildArtifacts(opts: {
return Result.Err(result.unwrapErr().map(err => err.toJSON())).toJSON();
}
}

function formatMemoryNumber(mem: number) {
return `${Math.round((mem / 1024 / 1024) * 100) / 100} MB`;
}

0 comments on commit e548fbc

Please sign in to comment.