Skip to content

Commit

Permalink
Merge pull request #61 from CodeGrade/refactor/more-informative-job-s…
Browse files Browse the repository at this point in the history
…tatus

Refactor: More Informative Job Status
  • Loading branch information
williams-jack authored Aug 2, 2024
2 parents 677c6da + 76a25e0 commit 5b6daca
Show file tree
Hide file tree
Showing 20 changed files with 137 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ export const createOrUpdateImmediateJob = async (

try {
// TODO: Return job id from db operation and in response obj
const jobID = await putJobInQueue(req.body, true);
return res.status(200).json({ message: "OK", jobID });
const status = await putJobInQueue(req.body, true);
return res.status(200).json({ message: "OK", status });
} catch (error) {
console.error(error);
if (error instanceof GradingQueueOperationException) {
Expand All @@ -136,8 +136,8 @@ export const createOrUpdateJob = async (req: Request, res: Response) => {

try {
// TODO: Return job id from db operation and in response obj
const jobID = await putJobInQueue(req.body, false);
return res.status(200).json({ message: "OK", jobID });
const status = await putJobInQueue(req.body, false);
return res.status(200).json({ message: "OK", status });
} catch (err) {
console.error(err);
if (err instanceof GradingQueueOperationException) {
Expand Down Expand Up @@ -191,8 +191,6 @@ export const jobStatus = async (req: Request, res: Response) => {
const jobQueueStatus = await getJobStatus(jobID);
if (!jobQueueStatus) {
return res.json("We could not find the job you're looking for. Please contact a professor or admin.");
} else if (typeof jobQueueStatus === "string") {
return res.json(jobQueueStatus);
}
const { reservation, queuePosition } = jobQueueStatus;
if (reservationWaitingOnRelease(reservation.releaseAt)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { Request, Response } from "express";
import { errorResponse } from "./utils";
import { jobConfigInHoldingPen } from "@codegrade-orca/db";

export const holdingPenStatus = async (req: Request, res: Response) => {
const jobConfigID = parseInt(req.params.jobConfigID);
if (isNaN(jobConfigID)) {
return errorResponse(res, 400, [`Given job config ID ${jobConfigID} is not a number.`]);
}
if (await jobConfigInHoldingPen(jobConfigID)) {
res.json("This job is waiting of a grader image to build.");
} else {
res.json("This job could not be found in a holding pen. Please contact an admin or professor.");
}
}
3 changes: 2 additions & 1 deletion orchestrator/packages/api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { getConfig } from "@codegrade-orca/common";
import express = require("express");
import cors = require("cors");
import dockerImagesRouter from "./routes/docker-images";
import holdingPenRouter from "./routes/holding-pen";

const CONFIG = getConfig();

Expand All @@ -14,7 +15,7 @@ const PORT = process.env.PORT || 4000;
app.use(cors());
app.use(express.json());

app.use("/api/v1", gradingQueueRouter, dockerImagesRouter);
app.use("/api/v1", gradingQueueRouter, dockerImagesRouter, holdingPenRouter);
app.use("/status", (_req, res) => res.json({"message": "ok"}));
app.use("/images", express.static(CONFIG.dockerImageFolder));

Expand Down
8 changes: 8 additions & 0 deletions orchestrator/packages/api/src/routes/holding-pen.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { Router } from "express";
import { holdingPenStatus } from "../controllers/holding-pen-controller";

const holdingPenRouter = Router();

holdingPenRouter.get("/holding_pen/:jobConfigID/status", holdingPenStatus);

export default holdingPenRouter;
1 change: 1 addition & 0 deletions orchestrator/packages/common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { existsSync } from "fs";
export * from "./grading-jobs";
export * from "./types";
export * from "./config";
export * from "./utils";
export { validations };

export const toMilliseconds = (seconds: number) => seconds * 1_000;
Expand Down
1 change: 1 addition & 0 deletions orchestrator/packages/common/src/types/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from "./grading-queue";
export * from "./image-build-service";
export * from "./job-result";
export * from "./job-status";
4 changes: 4 additions & 0 deletions orchestrator/packages/common/src/types/job-status.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export interface JobStatus {
location: "HoldingPen" | "Queue",
id: number,
};
1 change: 1 addition & 0 deletions orchestrator/packages/common/src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './push-status-update';
12 changes: 12 additions & 0 deletions orchestrator/packages/common/src/utils/push-status-update.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { JobStatus } from "../types";

export const pushStatusUpdate = async (status: JobStatus, responseURL: string, key: string): Promise<void> => {
// NOTE: This is meant to be 'fire-and-forget,' -- we don't care about errors
// beyond logging.
try {
const body = JSON.stringify({ key, status });
await fetch(responseURL, { body, method: 'POST', headers: { 'Content-Type': 'application/json' } });
} catch (err) {
console.error(`Could not POST status update to ${responseURL}; ${err instanceof Error ? err.message : err}`);
}
};
4 changes: 4 additions & 0 deletions orchestrator/packages/db/src/holding-pen-operations/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import prismaInstance from '../prisma-instance';

export const jobConfigInHoldingPen = async (jobConfigID: number): Promise<boolean> =>
Boolean(await prismaInstance.jobConfigAwaitingImage.count({ where: { id: jobConfigID } }));
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { GradingJobConfig } from '@codegrade-orca/common';
import { GradingJobConfig, JobStatus } from '@codegrade-orca/common';
import prismaInstance from '../prisma-instance';
import { createOrUpdateJobWithClient } from '../shared/create-or-update-job';
import { ImageBuildInfo, JobConfigAwaitingImage } from '@prisma/client';


export type CancelJobInfo = Pick<GradingJobConfig, 'response_url'|'key'>;
export type CancelJobInfo = Pick<GradingJobConfig, 'response_url' | 'key'>;
export type EnqueuedJobInfo = JobStatus & Pick<GradingJobConfig, 'response_url' | 'key'>;

const handleCompletedImageBuild = (dockerfileSHASum: string, wasSuccessful: boolean): Promise<Array<CancelJobInfo> | null> => {
const handleCompletedImageBuild = (dockerfileSHASum: string, wasSuccessful: boolean): Promise<Array<CancelJobInfo | EnqueuedJobInfo>> => {
return prismaInstance.$transaction(async (tx) => {
const imageBuildInfo = await tx.imageBuildInfo.findUnique({
where: {
Expand All @@ -16,27 +17,28 @@ const handleCompletedImageBuild = (dockerfileSHASum: string, wasSuccessful: bool
jobConfigs: true
}
}) as ImageBuildInfo & { jobConfigs: Array<JobConfigAwaitingImage> };
let jobInfoForCancellation: Array<CancelJobInfo> | null = null;
let jobInfo: Array<CancelJobInfo | EnqueuedJobInfo> = [];
if (wasSuccessful) {
await Promise.all(
jobInfo = await Promise.all(
imageBuildInfo.jobConfigs.map(async (c) => {
await createOrUpdateJobWithClient(
const status = await createOrUpdateJobWithClient(
c.jobConfig as object as GradingJobConfig,
c.isImmediate, tx
);
return { ...status, response_url: c.clientURL, key: c.clientKey };
})
);
} else {
jobInfoForCancellation = imageBuildInfo.jobConfigs.map(
({clientKey, clientURL}) => ({response_url: clientURL, key: clientKey})
jobInfo = imageBuildInfo.jobConfigs.map(
({ clientKey, clientURL }) => ({ response_url: clientURL, key: clientKey })
);
}
await tx.imageBuildInfo.delete({
where: {
dockerfileSHA: dockerfileSHASum
}
});
return jobInfoForCancellation;
return jobInfo;
});
};

Expand Down
1 change: 1 addition & 0 deletions orchestrator/packages/db/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export * from "./exceptions";
export * from "./server-operations";
export * from "./image-builder-operations";
export * from "./holding-pen-operations";
export * from "./shared";
export * from "./api-key-operations";
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@ export interface JobQueueStatus {
queuePosition: number
}

const getJobStatus = async (jobID: number): Promise<JobQueueStatus | string | null> =>
const getJobStatus = async (jobID: number): Promise<JobQueueStatus | null> =>
prismaInstance.$transaction(async (tx) => {
const job = await tx.job.findFirst({
where: { id: jobID },
include: { reservation: true }
});
if (!job) {
const jobInHoldingPen = Boolean(await tx.jobConfigAwaitingImage.count(
{ where: { id: jobID } }
));
return jobInHoldingPen ? "This job is waiting on its grader image to be built." : null;
return null;
}
const reservation: Reservation = job.reservation || await getAssociatedReservation(job, tx);
return {
Expand Down
18 changes: 9 additions & 9 deletions orchestrator/packages/db/src/shared/create-or-update-job.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { CollationType, Job, Prisma } from '@prisma/client';
import { GradingJobConfig, imageWithSHAExists, toMilliseconds } from '@codegrade-orca/common';
import { GradingJobConfig, JobStatus, imageWithSHAExists, toMilliseconds } from '@codegrade-orca/common';
import prismaInstance from '../prisma-instance';
import { getAssociatedReservation, immediateJobExists, retireReservationAndJob, submitterJobExists } from '../utils';
import { GradingQueueOperationException } from '../exceptions';
Expand All @@ -25,7 +25,7 @@ export const createOrUpdateJob = (jobConfig: GradingJobConfig, isImmediateJob: b
});
};

export const createOrUpdateJobWithClient = async (jobConfig: GradingJobConfig, isImmediateJob: boolean, tx: Prisma.TransactionClient): Promise<number> => {
export const createOrUpdateJobWithClient = async (jobConfig: GradingJobConfig, isImmediateJob: boolean, tx: Prisma.TransactionClient): Promise<JobStatus> => {
const existingImmediateJob = await immediateJobExists(jobConfig.key, jobConfig.response_url, tx);
const existingSubmitterJob =
await submitterJobExists(jobConfig.key, jobConfig.response_url, tx);
Expand All @@ -41,7 +41,7 @@ export const createOrUpdateJobWithClient = async (jobConfig: GradingJobConfig, i
config: jobConfig as object
}
});
return id;
return { location: 'Queue', id };
}

if (existingSubmitterJob && isImmediateJob) {
Expand All @@ -60,7 +60,7 @@ export const createOrUpdateJobWithClient = async (jobConfig: GradingJobConfig, i
return await (isImmediateJob ? createImmediateJob(jobConfig, tx) : createSubmitterJob(jobConfig, tx));
}

const placeJobInHoldingPen = async (jobConfig: GradingJobConfig, tx: Prisma.TransactionClient): Promise<number> => {
const placeJobInHoldingPen = async (jobConfig: GradingJobConfig, tx: Prisma.TransactionClient): Promise<JobStatus> => {
const { id } = await tx.jobConfigAwaitingImage.create({
data: {
jobConfig: jobConfig as object,
Expand All @@ -69,10 +69,10 @@ const placeJobInHoldingPen = async (jobConfig: GradingJobConfig, tx: Prisma.Tran
imageBuildSHA: jobConfig.grader_image_sha
}
});
return id;
return { location: 'HoldingPen', id };
}

const createImmediateJob = async (jobConfig: GradingJobConfig, tx: Prisma.TransactionClient) => {
const createImmediateJob = async (jobConfig: GradingJobConfig, tx: Prisma.TransactionClient): Promise<JobStatus> => {
const { id } = await tx.job.create({
data: {
clientKey: jobConfig.key,
Expand All @@ -85,10 +85,10 @@ const createImmediateJob = async (jobConfig: GradingJobConfig, tx: Prisma.Transa
jobID: id
}
});
return id;
return { location: 'Queue', id };
};

const createSubmitterJob = async (jobConfig: GradingJobConfig, tx: Prisma.TransactionClient): Promise<number> => {
const createSubmitterJob = async (jobConfig: GradingJobConfig, tx: Prisma.TransactionClient): Promise<JobStatus> => {
const submitter = await tx.submitter.upsert({
where: {
clientURL_collationType_collationID: {
Expand Down Expand Up @@ -122,5 +122,5 @@ const createSubmitterJob = async (jobConfig: GradingJobConfig, tx: Prisma.Transa
}
});

return id;
return { location: 'Queue', id };
};
8 changes: 6 additions & 2 deletions orchestrator/packages/image-build-service/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import {
GraderImageBuildRequest,
toMilliseconds,
isImageBuildResult
isImageBuildResult,
pushStatusUpdate
} from "@codegrade-orca/common";
import { getNextImageBuild, handleCompletedImageBuild } from "@codegrade-orca/db";
import { createAndStoreGraderImage, removeStaleImageFiles } from "./process-request";
import { cleanUpDockerFiles, sendJobResultForBuildFail, removeImageFromDockerIfExists, notifyClientOfBuildResult } from "./utils";
import { EnqueuedJobInfo } from "@codegrade-orca/db/dist/image-builder-operations/handle-completed-image-build";

const LOOP_SLEEP_TIME = 5; // Seconds

Expand All @@ -28,8 +30,10 @@ const main = async () => {
response_url: nextBuildReq.responseURL,
};
const result = await createAndStoreGraderImage(infoAsBuildReq);
await handleCompletedImageBuild(nextBuildReq.dockerfileSHA, true);
// When success is passed as true, we get EnqueuedJobInfo[].
const jobInfo = await handleCompletedImageBuild(nextBuildReq.dockerfileSHA, true) as EnqueuedJobInfo[];
await notifyClientOfBuildResult(result, infoAsBuildReq);
await Promise.all(jobInfo.map(({ key, response_url, ...status }) => pushStatusUpdate(status, response_url, key)));
console.info(`Successfully built image with SHA ${nextBuildReq.dockerfileSHA}.`);
} catch (err) {
if (isImageBuildResult(err) && infoAsBuildReq) {
Expand Down
33 changes: 25 additions & 8 deletions worker/orca_grader/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
from orca_grader.executor.builder.docker_grading_job_executor_builder import DockerGradingJobExecutorBuilder
from orca_grader.executor.builder.grading_job_executor_builder import GradingJobExecutorBuilder
from orca_grader.job_retrieval.local.local_grading_job_retriever import LocalGradingJobRetriever
from orca_grader.job_retrieval.postgres.grading_job_retriever import PostgresGradingJobRetirever
from orca_grader.job_retrieval.postgres.grading_job_retriever import PostgresGradingJobRetriever
from orca_grader.docker_utils.images.utils import does_image_exist_locally
from orca_grader.docker_utils.images.image_loading import retrieve_image_tgz_from_url, load_image_from_tgz
from orca_grader.job_termination.nonblocking_thread_executor import NonBlockingThreadPoolExecutor
from orca_grader.job_termination.stop_worker import GracefulKiller
from orca_grader.validations.exceptions import InvalidGradingJobJSONException
from orca_grader.validations.grading_job import is_valid_grading_job_json
from orca_grader.common.services.push_status import post_job_status_to_client

CONTAINER_WORKING_DIR = '/home/orca-grader'

Expand All @@ -36,7 +37,7 @@ def run_local_job(job_path: str, no_container: bool,

def process_jobs_from_db(no_container: bool,
container_command: List[str] | None):
retriever = PostgresGradingJobRetirever()
retriever = PostgresGradingJobRetriever()
with GracefulKiller() as killer:
with NonBlockingThreadPoolExecutor(max_workers=2) as futures_executor:
stop_future = futures_executor.submit(killer.wait_for_stop_signal)
Expand All @@ -52,11 +53,12 @@ def process_jobs_from_db(no_container: bool,
if job_retrieval_future.exception() is None:
grading_job = job_retrieval_future.result()
if grading_job is not None:
reenqueue_job(grading_job)
updated_db_id = reenqueue_job(grading_job)
inform_client_of_reenqueue(grading_job,
updated_db_id)

if job_retrieval_future in done:
if job_retrieval_future.exception():
# TODO: replace with log statement.
print(job_retrieval_future.exception())
time.sleep(1)
continue
Expand All @@ -68,7 +70,14 @@ def process_jobs_from_db(no_container: bool,
if stop_future in done:
break

print(f"Pulled job with key {grading_job['key']} and url {grading_job['response_url']}")
post_job_status_to_client(
location="Worker",
response_url=grading_job['response_url'],
key=grading_job['key']
)
print(
f"Pulled job with key {grading_job['key']} and url {grading_job['response_url']}"
)

job_execution_future = futures_executor.submit(
run_grading_job, grading_job, no_container, container_command)
Expand All @@ -79,7 +88,8 @@ def process_jobs_from_db(no_container: bool,
# 2. Stop future not done, job future done.
# 3. Stop future done, job future done.
if stop_future in done and job_execution_future in not_done:
reenqueue_job(grading_job)
updated_db_id = reenqueue_job(grading_job)
inform_client_of_reenqueue(grading_job, updated_db_id)

if job_execution_future in done:
if type(job_execution_future.exception()) == InvalidWorkerStateException:
Expand Down Expand Up @@ -114,8 +124,8 @@ def run_grading_job(grading_job: GradingJobJSON, no_container: bool,
except Exception as e:
print(e)
if type(e) == CalledProcessError:
print(e.stdout)
print(e.stderr)
print(e.stdout)
print(e.stderr)
if "response_url" in grading_job:
push_results_with_exception(grading_job, e)
else:
Expand Down Expand Up @@ -164,6 +174,13 @@ def handle_grading_job(grading_job: GradingJobJSON, container_sha: str | None =
print(result.stderr.decode())


def inform_client_of_reenqueue(grading_job: GradingJobJSON,
updated_db_id: int) -> None:
post_job_status_to_client(location="Queue",
response_url=grading_job["response_url"],
key=grading_job["key"])


def can_execute_job(grading_job: GradingJobJSON) -> bool:
try:
return is_valid_grading_job_json(grading_job)
Expand Down
Loading

0 comments on commit 5b6daca

Please sign in to comment.