From dad26ebc459976d8177c0771a9d55d6c7d48a77e Mon Sep 17 00:00:00 2001 From: williams-jack Date: Thu, 1 Aug 2024 22:47:38 -0400 Subject: [PATCH] feat: more useful job status info Communicate location and (possibly) db id of job to client. chore: typo in job retriever class --- .../controllers/grading-queue-controller.ts | 8 ++--- orchestrator/packages/common/src/index.ts | 1 + .../packages/common/src/types/index.ts | 1 + .../packages/common/src/types/job-status.ts | 4 +++ .../packages/common/src/utils/index.ts | 1 + .../common/src/utils/push-status-update.ts | 12 +++++++ .../handle-completed-image-build.ts | 20 ++++++----- .../db/src/shared/create-or-update-job.ts | 18 +++++----- .../packages/image-build-service/src/index.ts | 8 +++-- worker/orca_grader/__main__.py | 33 ++++++++++++++----- .../common/services/push_status.py | 13 ++++++++ worker/orca_grader/db/operations.py | 5 +-- .../postgres/grading_job_retriever.py | 2 +- 13 files changed, 91 insertions(+), 35 deletions(-) create mode 100644 orchestrator/packages/common/src/types/job-status.ts create mode 100644 orchestrator/packages/common/src/utils/index.ts create mode 100644 orchestrator/packages/common/src/utils/push-status-update.ts create mode 100644 worker/orca_grader/common/services/push_status.py diff --git a/orchestrator/packages/api/src/controllers/grading-queue-controller.ts b/orchestrator/packages/api/src/controllers/grading-queue-controller.ts index 8a64436c..291dcfcf 100644 --- a/orchestrator/packages/api/src/controllers/grading-queue-controller.ts +++ b/orchestrator/packages/api/src/controllers/grading-queue-controller.ts @@ -115,8 +115,8 @@ export const createOrUpdateImmediateJob = async ( try { // TODO: Return job id from db operation and in response obj - const jobID = await putJobInQueue(req.body, true); - return res.status(200).json({ message: "OK", jobID }); + const status = await putJobInQueue(req.body, true); + return res.status(200).json({ message: "OK", status }); } catch (error) { console.error(error); if (error instanceof GradingQueueOperationException) { @@ -136,8 +136,8 @@ export const createOrUpdateJob = async (req: Request, res: Response) => { try { // TODO: Return job id from db operation and in response obj - const jobID = await putJobInQueue(req.body, false); - return res.status(200).json({ message: "OK", jobID }); + const status = await putJobInQueue(req.body, false); + return res.status(200).json({ message: "OK", status }); } catch (err) { console.error(err); if (err instanceof GradingQueueOperationException) { diff --git a/orchestrator/packages/common/src/index.ts b/orchestrator/packages/common/src/index.ts index 0c28c0ed..2ce785d2 100644 --- a/orchestrator/packages/common/src/index.ts +++ b/orchestrator/packages/common/src/index.ts @@ -6,6 +6,7 @@ import { existsSync } from "fs"; export * from "./grading-jobs"; export * from "./types"; export * from "./config"; +export * from "./utils"; export { validations }; export const toMilliseconds = (seconds: number) => seconds * 1_000; diff --git a/orchestrator/packages/common/src/types/index.ts b/orchestrator/packages/common/src/types/index.ts index 4b97bf8b..eaccd89b 100644 --- a/orchestrator/packages/common/src/types/index.ts +++ b/orchestrator/packages/common/src/types/index.ts @@ -1,3 +1,4 @@ export * from "./grading-queue"; export * from "./image-build-service"; export * from "./job-result"; +export * from "./job-status"; diff --git a/orchestrator/packages/common/src/types/job-status.ts b/orchestrator/packages/common/src/types/job-status.ts new file mode 100644 index 00000000..d8240809 --- /dev/null +++ b/orchestrator/packages/common/src/types/job-status.ts @@ -0,0 +1,4 @@ +export interface JobStatus { + location: "HoldingPen" | "Queue", + id: number, +}; diff --git a/orchestrator/packages/common/src/utils/index.ts b/orchestrator/packages/common/src/utils/index.ts new file mode 100644 index 00000000..cedb287b --- /dev/null +++ b/orchestrator/packages/common/src/utils/index.ts @@ -0,0 +1 @@ +export * from './push-status-update'; diff --git a/orchestrator/packages/common/src/utils/push-status-update.ts b/orchestrator/packages/common/src/utils/push-status-update.ts new file mode 100644 index 00000000..6faaf16a --- /dev/null +++ b/orchestrator/packages/common/src/utils/push-status-update.ts @@ -0,0 +1,12 @@ +import { JobStatus } from "../types"; + +export const pushStatusUpdate = async (status: JobStatus, responseURL: string, key: string): Promise => { + // NOTE: This is meant to be 'fire-and-forget,' -- we don't care about errors + // beyond logging. + try { + const body = JSON.stringify({ key, status }); + await fetch(responseURL, { body, method: 'POST', headers: { 'Content-Type': 'application/json' } }); + } catch (err) { + console.error(`Could not POST status update to ${responseURL}; ${err instanceof Error ? err.message : err}`); + } +}; diff --git a/orchestrator/packages/db/src/image-builder-operations/handle-completed-image-build.ts b/orchestrator/packages/db/src/image-builder-operations/handle-completed-image-build.ts index 72059820..b5fb2578 100644 --- a/orchestrator/packages/db/src/image-builder-operations/handle-completed-image-build.ts +++ b/orchestrator/packages/db/src/image-builder-operations/handle-completed-image-build.ts @@ -1,12 +1,13 @@ -import { GradingJobConfig } from '@codegrade-orca/common'; +import { GradingJobConfig, JobStatus } from '@codegrade-orca/common'; import prismaInstance from '../prisma-instance'; import { createOrUpdateJobWithClient } from '../shared/create-or-update-job'; import { ImageBuildInfo, JobConfigAwaitingImage } from '@prisma/client'; -export type CancelJobInfo = Pick; +export type CancelJobInfo = Pick; +export type EnqueuedJobInfo = JobStatus & Pick; -const handleCompletedImageBuild = (dockerfileSHASum: string, wasSuccessful: boolean): Promise | null> => { +const handleCompletedImageBuild = (dockerfileSHASum: string, wasSuccessful: boolean): Promise> => { return prismaInstance.$transaction(async (tx) => { const imageBuildInfo = await tx.imageBuildInfo.findUnique({ where: { @@ -16,19 +17,20 @@ const handleCompletedImageBuild = (dockerfileSHASum: string, wasSuccessful: bool jobConfigs: true } }) as ImageBuildInfo & { jobConfigs: Array }; - let jobInfoForCancellation: Array | null = null; + let jobInfo: Array = []; if (wasSuccessful) { - await Promise.all( + jobInfo = await Promise.all( imageBuildInfo.jobConfigs.map(async (c) => { - await createOrUpdateJobWithClient( + const status = await createOrUpdateJobWithClient( c.jobConfig as object as GradingJobConfig, c.isImmediate, tx ); + return { ...status, response_url: c.clientURL, key: c.clientKey }; }) ); } else { - jobInfoForCancellation = imageBuildInfo.jobConfigs.map( - ({clientKey, clientURL}) => ({response_url: clientURL, key: clientKey}) + jobInfo = imageBuildInfo.jobConfigs.map( + ({ clientKey, clientURL }) => ({ response_url: clientURL, key: clientKey }) ); } await tx.imageBuildInfo.delete({ @@ -36,7 +38,7 @@ const handleCompletedImageBuild = (dockerfileSHASum: string, wasSuccessful: bool dockerfileSHA: dockerfileSHASum } }); - return jobInfoForCancellation; + return jobInfo; }); }; diff --git a/orchestrator/packages/db/src/shared/create-or-update-job.ts b/orchestrator/packages/db/src/shared/create-or-update-job.ts index 2be509f9..740432be 100644 --- a/orchestrator/packages/db/src/shared/create-or-update-job.ts +++ b/orchestrator/packages/db/src/shared/create-or-update-job.ts @@ -1,5 +1,5 @@ import { CollationType, Job, Prisma } from '@prisma/client'; -import { GradingJobConfig, imageWithSHAExists, toMilliseconds } from '@codegrade-orca/common'; +import { GradingJobConfig, JobStatus, imageWithSHAExists, toMilliseconds } from '@codegrade-orca/common'; import prismaInstance from '../prisma-instance'; import { getAssociatedReservation, immediateJobExists, retireReservationAndJob, submitterJobExists } from '../utils'; import { GradingQueueOperationException } from '../exceptions'; @@ -25,7 +25,7 @@ export const createOrUpdateJob = (jobConfig: GradingJobConfig, isImmediateJob: b }); }; -export const createOrUpdateJobWithClient = async (jobConfig: GradingJobConfig, isImmediateJob: boolean, tx: Prisma.TransactionClient): Promise => { +export const createOrUpdateJobWithClient = async (jobConfig: GradingJobConfig, isImmediateJob: boolean, tx: Prisma.TransactionClient): Promise => { const existingImmediateJob = await immediateJobExists(jobConfig.key, jobConfig.response_url, tx); const existingSubmitterJob = await submitterJobExists(jobConfig.key, jobConfig.response_url, tx); @@ -41,7 +41,7 @@ export const createOrUpdateJobWithClient = async (jobConfig: GradingJobConfig, i config: jobConfig as object } }); - return id; + return { location: 'Queue', id }; } if (existingSubmitterJob && isImmediateJob) { @@ -60,7 +60,7 @@ export const createOrUpdateJobWithClient = async (jobConfig: GradingJobConfig, i return await (isImmediateJob ? createImmediateJob(jobConfig, tx) : createSubmitterJob(jobConfig, tx)); } -const placeJobInHoldingPen = async (jobConfig: GradingJobConfig, tx: Prisma.TransactionClient): Promise => { +const placeJobInHoldingPen = async (jobConfig: GradingJobConfig, tx: Prisma.TransactionClient): Promise => { const { id } = await tx.jobConfigAwaitingImage.create({ data: { jobConfig: jobConfig as object, @@ -69,10 +69,10 @@ const placeJobInHoldingPen = async (jobConfig: GradingJobConfig, tx: Prisma.Tran imageBuildSHA: jobConfig.grader_image_sha } }); - return id; + return { location: 'HoldingPen', id }; } -const createImmediateJob = async (jobConfig: GradingJobConfig, tx: Prisma.TransactionClient) => { +const createImmediateJob = async (jobConfig: GradingJobConfig, tx: Prisma.TransactionClient): Promise => { const { id } = await tx.job.create({ data: { clientKey: jobConfig.key, @@ -85,10 +85,10 @@ const createImmediateJob = async (jobConfig: GradingJobConfig, tx: Prisma.Transa jobID: id } }); - return id; + return { location: 'Queue', id }; }; -const createSubmitterJob = async (jobConfig: GradingJobConfig, tx: Prisma.TransactionClient): Promise => { +const createSubmitterJob = async (jobConfig: GradingJobConfig, tx: Prisma.TransactionClient): Promise => { const submitter = await tx.submitter.upsert({ where: { clientURL_collationType_collationID: { @@ -122,5 +122,5 @@ const createSubmitterJob = async (jobConfig: GradingJobConfig, tx: Prisma.Transa } }); - return id; + return { location: 'Queue', id }; }; diff --git a/orchestrator/packages/image-build-service/src/index.ts b/orchestrator/packages/image-build-service/src/index.ts index f87f00fb..2218c1b3 100644 --- a/orchestrator/packages/image-build-service/src/index.ts +++ b/orchestrator/packages/image-build-service/src/index.ts @@ -1,11 +1,13 @@ import { GraderImageBuildRequest, toMilliseconds, - isImageBuildResult + isImageBuildResult, + pushStatusUpdate } from "@codegrade-orca/common"; import { getNextImageBuild, handleCompletedImageBuild } from "@codegrade-orca/db"; import { createAndStoreGraderImage, removeStaleImageFiles } from "./process-request"; import { cleanUpDockerFiles, sendJobResultForBuildFail, removeImageFromDockerIfExists, notifyClientOfBuildResult } from "./utils"; +import { EnqueuedJobInfo } from "@codegrade-orca/db/dist/image-builder-operations/handle-completed-image-build"; const LOOP_SLEEP_TIME = 5; // Seconds @@ -28,8 +30,10 @@ const main = async () => { response_url: nextBuildReq.responseURL, }; const result = await createAndStoreGraderImage(infoAsBuildReq); - await handleCompletedImageBuild(nextBuildReq.dockerfileSHA, true); + // When success is passed as true, we get EnqueuedJobInfo[]. + const jobInfo = await handleCompletedImageBuild(nextBuildReq.dockerfileSHA, true) as EnqueuedJobInfo[]; await notifyClientOfBuildResult(result, infoAsBuildReq); + await Promise.all(jobInfo.map(({ key, response_url, ...status }) => pushStatusUpdate(status, response_url, key))); console.info(`Successfully built image with SHA ${nextBuildReq.dockerfileSHA}.`); } catch (err) { if (isImageBuildResult(err) && infoAsBuildReq) { diff --git a/worker/orca_grader/__main__.py b/worker/orca_grader/__main__.py index cf38404c..d5e0a86e 100644 --- a/worker/orca_grader/__main__.py +++ b/worker/orca_grader/__main__.py @@ -15,13 +15,14 @@ from orca_grader.executor.builder.docker_grading_job_executor_builder import DockerGradingJobExecutorBuilder from orca_grader.executor.builder.grading_job_executor_builder import GradingJobExecutorBuilder from orca_grader.job_retrieval.local.local_grading_job_retriever import LocalGradingJobRetriever -from orca_grader.job_retrieval.postgres.grading_job_retriever import PostgresGradingJobRetirever +from orca_grader.job_retrieval.postgres.grading_job_retriever import PostgresGradingJobRetriever from orca_grader.docker_utils.images.utils import does_image_exist_locally from orca_grader.docker_utils.images.image_loading import retrieve_image_tgz_from_url, load_image_from_tgz from orca_grader.job_termination.nonblocking_thread_executor import NonBlockingThreadPoolExecutor from orca_grader.job_termination.stop_worker import GracefulKiller from orca_grader.validations.exceptions import InvalidGradingJobJSONException from orca_grader.validations.grading_job import is_valid_grading_job_json +from orca_grader.common.services.push_status import post_job_status_to_client CONTAINER_WORKING_DIR = '/home/orca-grader' @@ -36,7 +37,7 @@ def run_local_job(job_path: str, no_container: bool, def process_jobs_from_db(no_container: bool, container_command: List[str] | None): - retriever = PostgresGradingJobRetirever() + retriever = PostgresGradingJobRetriever() with GracefulKiller() as killer: with NonBlockingThreadPoolExecutor(max_workers=2) as futures_executor: stop_future = futures_executor.submit(killer.wait_for_stop_signal) @@ -52,11 +53,12 @@ def process_jobs_from_db(no_container: bool, if job_retrieval_future.exception() is None: grading_job = job_retrieval_future.result() if grading_job is not None: - reenqueue_job(grading_job) + updated_db_id = reenqueue_job(grading_job) + inform_client_of_reenqueue(grading_job, + updated_db_id) if job_retrieval_future in done: if job_retrieval_future.exception(): - # TODO: replace with log statement. print(job_retrieval_future.exception()) time.sleep(1) continue @@ -68,7 +70,14 @@ def process_jobs_from_db(no_container: bool, if stop_future in done: break - print(f"Pulled job with key {grading_job['key']} and url {grading_job['response_url']}") + post_job_status_to_client( + location="Worker", + response_url=grading_job['response_url'], + key=grading_job['key'] + ) + print( + f"Pulled job with key {grading_job['key']} and url {grading_job['response_url']}" + ) job_execution_future = futures_executor.submit( run_grading_job, grading_job, no_container, container_command) @@ -79,7 +88,8 @@ def process_jobs_from_db(no_container: bool, # 2. Stop future not done, job future done. # 3. Stop future done, job future done. if stop_future in done and job_execution_future in not_done: - reenqueue_job(grading_job) + updated_db_id = reenqueue_job(grading_job) + inform_client_of_reenqueue(grading_job, updated_db_id) if job_execution_future in done: if type(job_execution_future.exception()) == InvalidWorkerStateException: @@ -114,8 +124,8 @@ def run_grading_job(grading_job: GradingJobJSON, no_container: bool, except Exception as e: print(e) if type(e) == CalledProcessError: - print(e.stdout) - print(e.stderr) + print(e.stdout) + print(e.stderr) if "response_url" in grading_job: push_results_with_exception(grading_job, e) else: @@ -164,6 +174,13 @@ def handle_grading_job(grading_job: GradingJobJSON, container_sha: str | None = print(result.stderr.decode()) +def inform_client_of_reenqueue(grading_job: GradingJobJSON, + updated_db_id: int) -> None: + post_job_status_to_client(location="Queue", + response_url=grading_job["response_url"], + key=grading_job["key"]) + + def can_execute_job(grading_job: GradingJobJSON) -> bool: try: return is_valid_grading_job_json(grading_job) diff --git a/worker/orca_grader/common/services/push_status.py b/worker/orca_grader/common/services/push_status.py new file mode 100644 index 00000000..19db4b99 --- /dev/null +++ b/worker/orca_grader/common/services/push_status.py @@ -0,0 +1,13 @@ +import requests +from typing import Optional + + +def post_job_status_to_client(location: str, response_url: str, + key: str, id: Optional[int] = None) -> None: + try: + status = {"location": location} + if id is not None: + status["id"] = id + requests.post(response_url, json=status) + except Exception as e: + print(e) diff --git a/worker/orca_grader/db/operations.py b/worker/orca_grader/db/operations.py index 93896763..12087169 100644 --- a/worker/orca_grader/db/operations.py +++ b/worker/orca_grader/db/operations.py @@ -67,10 +67,10 @@ def reenqueue_job(grading_job: GradingJobJSON): grading_job["client_url"] ) __delete_more_recent_job_queue_info(session, more_recent_job) - __create_immediate_job(session, {**grading_job, + return __create_immediate_job(session, {**grading_job, **more_recent_job.config}) else: - __create_immediate_job(session, grading_job) + return __create_immediate_job(session, grading_job) def __create_immediate_job(session: Session, grading_job: GradingJobJSON): @@ -88,6 +88,7 @@ def __create_immediate_job(session: Session, grading_job: GradingJobJSON): insert(Reservation) .values(job_id=inserted_job_id, release_at=grading_job["release_at"]) ) + return inserted_job_id def __omit(d: Dict[str, Any], keys: Set[str]) -> Dict[str, Any]: diff --git a/worker/orca_grader/job_retrieval/postgres/grading_job_retriever.py b/worker/orca_grader/job_retrieval/postgres/grading_job_retriever.py index e6e82791..e2f0ca76 100644 --- a/worker/orca_grader/job_retrieval/postgres/grading_job_retriever.py +++ b/worker/orca_grader/job_retrieval/postgres/grading_job_retriever.py @@ -4,7 +4,7 @@ from orca_grader.job_retrieval.grading_job_retriever import GradingJobRetriever -class PostgresGradingJobRetirever(GradingJobRetriever): +class PostgresGradingJobRetriever(GradingJobRetriever): def retrieve_grading_job(self) -> Optional[GradingJobJSON]: return get_next_job()