Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better logging for sequencing server command expansion #1479

Merged
merged 2 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sequencing-server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const logger = getLogger('app');

const PORT: number = parseInt(getEnv().PORT, 10) ?? 27184;

logger.info(`Starting sequencing-server app on Node v${process.versions.node}...`);

const app: Application = express();
// WARNING: bodyParser.json() is vulnerable to a string too long issue. Iff that occurs,
// we should switch to a stream parser like https://www.npmjs.com/package/stream-json
Expand Down
43 changes: 19 additions & 24 deletions sequencing-server/src/backgroundTranspiler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import type { typecheckExpansion } from './worker';
import { Result } from '@nasa-jpl/aerie-ts-user-code-runner/build/utils/monads.js';
import { getLatestParcel, getLatestMissionModel, getExpansionRule } from './utils/hasura.js';
import type { Parcel } from './lib/batchLoaders/parcelBatchLoader.js';
import getLogger from './utils/logger.js';

const logger = getLogger('[ Background Transpiler ]');

export async function backgroundTranspiler(numberOfThreads: number = 2) {
if (graphqlClient === null) {
Expand All @@ -25,9 +28,7 @@ export async function backgroundTranspiler(numberOfThreads: number = 2) {
},
} = await getLatestMissionModel(graphqlClient);
if (!missionModelId) {
console.log(
'[ Background Transpiler ] Unable to fetch the latest mission model. Aborting background transpiling...',
);
logger.warn('Unable to fetch the latest mission model. Aborting background transpiling...');
return;
}

Expand All @@ -40,16 +41,14 @@ export async function backgroundTranspiler(numberOfThreads: number = 2) {
},
} = await getLatestParcel(graphqlClient);
if (!parcelID) {
console.log(
'[ Background Transpiler ] Unable to fetch the latest command dictionary. Aborting background transpiling...',
);
logger.warn('Unable to fetch the latest command dictionary. Aborting background transpiling...');
return;
}

const { expansion_rule } = await getExpansionRule(graphqlClient, missionModelId, parcelID);

if (expansion_rule === null || expansion_rule.length === 0) {
console.log(`[ Background Transpiler ] No expansion rules to transpile.`);
logger.info(`No expansion rules to transpile.`);
return;
}

Expand All @@ -71,8 +70,7 @@ export async function backgroundTranspiler(numberOfThreads: number = 2) {
});

if (parcel === null) {
console.log(`[ Background Transpiler ] Unable to fetch parcel.
Aborting transpiling...`);
logger.error(`Unable to fetch parcel.\nAborting transpiling...`);
return;
}

Expand All @@ -81,20 +79,19 @@ export async function backgroundTranspiler(numberOfThreads: number = 2) {
});

if (commandTypes === null) {
console.log(`[ Background Transpiler ] Unable to fetch command ts lib.
Aborting transpiling...`);
logger.error(`Unable to fetch command ts lib.\nAborting transpiling...`);
return;
}

// only process 'numberOfThreads' worth at a time ex. transpile 2 logics at a time
// This allows for expansion set and sequence expansion to utilize the remaining workers
for (let i = 0; i < expansion_rule.length; i += numberOfThreads) {
await Promise.all(
expansion_rule.slice(i, i + numberOfThreads).map(async expansion => {
expansion_rule.slice(i, i + numberOfThreads).map(async (expansion, j) => {
await promiseThrottler.run(async () => {
// Assuming expansion_rule elements have the same type
if (expansion instanceof Error) {
console.log(`[ Background Transpiler ] Expansion: ${expansion.name} could not be loaded`, expansion);
logger.error(`Expansion: ${expansion.name} could not be loaded`, expansion);
return Promise.reject(`Expansion: ${expansion.name} could not be loaded`);
}

Expand Down Expand Up @@ -126,24 +123,22 @@ export async function backgroundTranspiler(numberOfThreads: number = 2) {

// log error
if (!activitySchema) {
console.log(
`[ Background Transpiler ] Activity schema for ${expansion.activity_type} could not be loaded`,
activitySchema,
);
return Promise.reject('Activity schema for ${expansion.activity_type} could not be loaded');
const msg = `Activity schema for ${expansion.activity_type} could not be loaded`;
logger.error(msg, activitySchema);
return Promise.reject(msg);
}

const activityTypescript = generateTypescriptForGraphQLActivitySchema(activitySchema);

// log error
if (!activityTypescript) {
console.log(
`[ Background Transpiler ] Unable to generate typescript for activity ${expansion.activity_type}`,
activityTypescript,
);
return Promise.reject(`Unable to generate typescript for activity ${expansion.activity_type}`);
const msg = `Unable to generate typescript for activity ${expansion.activity_type}`;
logger.error(msg, activityTypescript);
return Promise.reject(msg);
}

const progress = `(${i + j + 1} of ${expansion_rule.length})`;
logger.info(`Assigning worker to typecheck ${expansion.activity_type} ${progress}`);
const typecheckingResult = (
piscina.run(
{
Expand All @@ -159,7 +154,7 @@ export async function backgroundTranspiler(numberOfThreads: number = 2) {
//Display any errors
typecheckingResult.then(result => {
if (result.isErr()) {
console.log(`Error transpiling ${expansion.activity_type}:\n ${result.unwrapErr().map(e => e.message)}`);
logger.error(`Error transpiling ${expansion.activity_type}:\n ${result.unwrapErr().map(e => e.message)}`);
}
});

Expand Down
4 changes: 4 additions & 0 deletions sequencing-server/src/routes/command-expansion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,10 @@ commandExpansionRouter.post('/expand-all-activity-instances', async (req, res, n
);

const rejectedExpansionResults = settledExpansionResults.filter(isRejected).map(p => p.reason);
if (rejectedExpansionResults.length) {
logger.error(`${rejectedExpansionResults.length} rejected expansion results`);
console.log(rejectedExpansionResults);
}

for (const expansionResult of rejectedExpansionResults) {
logger.error(expansionResult.reason);
Expand Down
20 changes: 15 additions & 5 deletions sequencing-server/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import './polyfills.js';

import vm from 'node:vm';
import * as fs from 'node:fs';
import { threadId } from 'worker_threads';

import ts from 'typescript';
import { CacheItem, UserCodeError, UserCodeRunner } from '@nasa-jpl/aerie-ts-user-code-runner';
Expand All @@ -10,8 +11,12 @@ import type { SimulatedActivity } from './lib/batchLoaders/simulatedActivityBatc
import type { CommandStem } from './lib/codegen/CommandEDSLPreface.js';
import type { SeqJson } from '@nasa-jpl/seq-json-schema/types';
import { deserializeWithTemporal } from './utils/temporalSerializers.js';
import getLogger from './utils/logger.js';
import { Result, SerializedResult } from '@nasa-jpl/aerie-ts-user-code-runner/build/utils/monads.js';

const logger = getLogger(`[ Worker ${threadId} ]`);
logger.info('Starting worker thread...');

const temporalPolyfillTypes = fs.readFileSync(
new URL('../src/types/TemporalPolyfillTypes.ts', import.meta.url).pathname,
'utf8',
Expand Down Expand Up @@ -40,9 +45,7 @@ export async function typecheckExpansion(opts: {
activityTypeName?: string;
}): Promise<SerializedResult<CacheItem, ReturnType<UserCodeError['toJSON']>[]>> {
const startTime = Date.now();
console.log(
`[ Worker ] started transpiling authoring logic ${opts.activityTypeName ? `- ${opts.activityTypeName}` : ''}`,
);
logger.info(`started transpiling authoring logic - ${opts.activityTypeName || 'unknown'}`);

const result = await codeRunner.preProcess(
opts.expansionLogic,
Expand All @@ -60,8 +63,8 @@ export async function typecheckExpansion(opts: {
);

const endTime = Date.now();
console.log(
`[ Worker ] finished transpiling ${opts.activityTypeName ? `- ${opts.activityTypeName}` : ''}, (${
logger.info(
`finished transpiling ${opts.activityTypeName ? `- ${opts.activityTypeName}` : ''}, (${
(endTime - startTime) / 1000
} s)`,
);
Expand Down Expand Up @@ -132,6 +135,9 @@ export async function executeExpansionFromBuildArtifacts(opts: {
}),
);

logger.info(`processing ${activityInstance.activityTypeName} at ${activityInstance.startTime.toLocaleString()}`);
logger.info(`Memory RSS: ${formatMemoryNumber(process.memoryUsage().rss)}`);

if (result.isOk()) {
let commands = result.unwrap();
if (!Array.isArray(commands)) {
Expand All @@ -150,3 +156,7 @@ export async function executeExpansionFromBuildArtifacts(opts: {
return Result.Err(result.unwrapErr().map(err => err.toJSON())).toJSON();
}
}

function formatMemoryNumber(mem: number) {
return `${Math.round((mem / 1024 / 1024) * 100) / 100} MB`;
}
Loading