Skip to content

Commit

Permalink
feat: more useful job status info
Browse files Browse the repository at this point in the history
Communicate location and (possibly) db id of job to client.

chore: typo in job retriever class
  • Loading branch information
williams-jack committed Aug 2, 2024
1 parent 6c09aa3 commit dad26eb
Show file tree
Hide file tree
Showing 13 changed files with 91 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ export const createOrUpdateImmediateJob = async (

try {
// TODO: Return job id from db operation and in response obj
const jobID = await putJobInQueue(req.body, true);
return res.status(200).json({ message: "OK", jobID });
const status = await putJobInQueue(req.body, true);
return res.status(200).json({ message: "OK", status });
} catch (error) {
console.error(error);
if (error instanceof GradingQueueOperationException) {
Expand All @@ -136,8 +136,8 @@ export const createOrUpdateJob = async (req: Request, res: Response) => {

try {
// TODO: Return job id from db operation and in response obj
const jobID = await putJobInQueue(req.body, false);
return res.status(200).json({ message: "OK", jobID });
const status = await putJobInQueue(req.body, false);
return res.status(200).json({ message: "OK", status });
} catch (err) {
console.error(err);
if (err instanceof GradingQueueOperationException) {
Expand Down
1 change: 1 addition & 0 deletions orchestrator/packages/common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { existsSync } from "fs";
export * from "./grading-jobs";
export * from "./types";
export * from "./config";
export * from "./utils";
export { validations };

export const toMilliseconds = (seconds: number) => seconds * 1_000;
Expand Down
1 change: 1 addition & 0 deletions orchestrator/packages/common/src/types/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from "./grading-queue";
export * from "./image-build-service";
export * from "./job-result";
export * from "./job-status";
4 changes: 4 additions & 0 deletions orchestrator/packages/common/src/types/job-status.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export interface JobStatus {
location: "HoldingPen" | "Queue",
id: number,
};
1 change: 1 addition & 0 deletions orchestrator/packages/common/src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './push-status-update';
12 changes: 12 additions & 0 deletions orchestrator/packages/common/src/utils/push-status-update.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { JobStatus } from "../types";

export const pushStatusUpdate = async (status: JobStatus, responseURL: string, key: string): Promise<void> => {
// NOTE: This is meant to be 'fire-and-forget,' -- we don't care about errors
// beyond logging.
try {
const body = JSON.stringify({ key, status });
await fetch(responseURL, { body, method: 'POST', headers: { 'Content-Type': 'application/json' } });
} catch (err) {
console.error(`Could not POST status update to ${responseURL}; ${err instanceof Error ? err.message : err}`);
}
};
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { GradingJobConfig } from '@codegrade-orca/common';
import { GradingJobConfig, JobStatus } from '@codegrade-orca/common';
import prismaInstance from '../prisma-instance';
import { createOrUpdateJobWithClient } from '../shared/create-or-update-job';
import { ImageBuildInfo, JobConfigAwaitingImage } from '@prisma/client';


export type CancelJobInfo = Pick<GradingJobConfig, 'response_url'|'key'>;
export type CancelJobInfo = Pick<GradingJobConfig, 'response_url' | 'key'>;
export type EnqueuedJobInfo = JobStatus & Pick<GradingJobConfig, 'response_url' | 'key'>;

const handleCompletedImageBuild = (dockerfileSHASum: string, wasSuccessful: boolean): Promise<Array<CancelJobInfo> | null> => {
const handleCompletedImageBuild = (dockerfileSHASum: string, wasSuccessful: boolean): Promise<Array<CancelJobInfo | EnqueuedJobInfo>> => {
return prismaInstance.$transaction(async (tx) => {
const imageBuildInfo = await tx.imageBuildInfo.findUnique({
where: {
Expand All @@ -16,27 +17,28 @@ const handleCompletedImageBuild = (dockerfileSHASum: string, wasSuccessful: bool
jobConfigs: true
}
}) as ImageBuildInfo & { jobConfigs: Array<JobConfigAwaitingImage> };
let jobInfoForCancellation: Array<CancelJobInfo> | null = null;
let jobInfo: Array<CancelJobInfo | EnqueuedJobInfo> = [];
if (wasSuccessful) {
await Promise.all(
jobInfo = await Promise.all(
imageBuildInfo.jobConfigs.map(async (c) => {
await createOrUpdateJobWithClient(
const status = await createOrUpdateJobWithClient(
c.jobConfig as object as GradingJobConfig,
c.isImmediate, tx
);
return { ...status, response_url: c.clientURL, key: c.clientKey };
})
);
} else {
jobInfoForCancellation = imageBuildInfo.jobConfigs.map(
({clientKey, clientURL}) => ({response_url: clientURL, key: clientKey})
jobInfo = imageBuildInfo.jobConfigs.map(
({ clientKey, clientURL }) => ({ response_url: clientURL, key: clientKey })
);
}
await tx.imageBuildInfo.delete({
where: {
dockerfileSHA: dockerfileSHASum
}
});
return jobInfoForCancellation;
return jobInfo;
});
};

Expand Down
18 changes: 9 additions & 9 deletions orchestrator/packages/db/src/shared/create-or-update-job.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { CollationType, Job, Prisma } from '@prisma/client';
import { GradingJobConfig, imageWithSHAExists, toMilliseconds } from '@codegrade-orca/common';
import { GradingJobConfig, JobStatus, imageWithSHAExists, toMilliseconds } from '@codegrade-orca/common';
import prismaInstance from '../prisma-instance';
import { getAssociatedReservation, immediateJobExists, retireReservationAndJob, submitterJobExists } from '../utils';
import { GradingQueueOperationException } from '../exceptions';
Expand All @@ -25,7 +25,7 @@ export const createOrUpdateJob = (jobConfig: GradingJobConfig, isImmediateJob: b
});
};

export const createOrUpdateJobWithClient = async (jobConfig: GradingJobConfig, isImmediateJob: boolean, tx: Prisma.TransactionClient): Promise<number> => {
export const createOrUpdateJobWithClient = async (jobConfig: GradingJobConfig, isImmediateJob: boolean, tx: Prisma.TransactionClient): Promise<JobStatus> => {
const existingImmediateJob = await immediateJobExists(jobConfig.key, jobConfig.response_url, tx);
const existingSubmitterJob =
await submitterJobExists(jobConfig.key, jobConfig.response_url, tx);
Expand All @@ -41,7 +41,7 @@ export const createOrUpdateJobWithClient = async (jobConfig: GradingJobConfig, i
config: jobConfig as object
}
});
return id;
return { location: 'Queue', id };
}

if (existingSubmitterJob && isImmediateJob) {
Expand All @@ -60,7 +60,7 @@ export const createOrUpdateJobWithClient = async (jobConfig: GradingJobConfig, i
return await (isImmediateJob ? createImmediateJob(jobConfig, tx) : createSubmitterJob(jobConfig, tx));
}

const placeJobInHoldingPen = async (jobConfig: GradingJobConfig, tx: Prisma.TransactionClient): Promise<number> => {
const placeJobInHoldingPen = async (jobConfig: GradingJobConfig, tx: Prisma.TransactionClient): Promise<JobStatus> => {
const { id } = await tx.jobConfigAwaitingImage.create({
data: {
jobConfig: jobConfig as object,
Expand All @@ -69,10 +69,10 @@ const placeJobInHoldingPen = async (jobConfig: GradingJobConfig, tx: Prisma.Tran
imageBuildSHA: jobConfig.grader_image_sha
}
});
return id;
return { location: 'HoldingPen', id };
}

const createImmediateJob = async (jobConfig: GradingJobConfig, tx: Prisma.TransactionClient) => {
const createImmediateJob = async (jobConfig: GradingJobConfig, tx: Prisma.TransactionClient): Promise<JobStatus> => {
const { id } = await tx.job.create({
data: {
clientKey: jobConfig.key,
Expand All @@ -85,10 +85,10 @@ const createImmediateJob = async (jobConfig: GradingJobConfig, tx: Prisma.Transa
jobID: id
}
});
return id;
return { location: 'Queue', id };
};

const createSubmitterJob = async (jobConfig: GradingJobConfig, tx: Prisma.TransactionClient): Promise<number> => {
const createSubmitterJob = async (jobConfig: GradingJobConfig, tx: Prisma.TransactionClient): Promise<JobStatus> => {
const submitter = await tx.submitter.upsert({
where: {
clientURL_collationType_collationID: {
Expand Down Expand Up @@ -122,5 +122,5 @@ const createSubmitterJob = async (jobConfig: GradingJobConfig, tx: Prisma.Transa
}
});

return id;
return { location: 'Queue', id };
};
8 changes: 6 additions & 2 deletions orchestrator/packages/image-build-service/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import {
GraderImageBuildRequest,
toMilliseconds,
isImageBuildResult
isImageBuildResult,
pushStatusUpdate
} from "@codegrade-orca/common";
import { getNextImageBuild, handleCompletedImageBuild } from "@codegrade-orca/db";
import { createAndStoreGraderImage, removeStaleImageFiles } from "./process-request";
import { cleanUpDockerFiles, sendJobResultForBuildFail, removeImageFromDockerIfExists, notifyClientOfBuildResult } from "./utils";
import { EnqueuedJobInfo } from "@codegrade-orca/db/dist/image-builder-operations/handle-completed-image-build";

const LOOP_SLEEP_TIME = 5; // Seconds

Expand All @@ -28,8 +30,10 @@ const main = async () => {
response_url: nextBuildReq.responseURL,
};
const result = await createAndStoreGraderImage(infoAsBuildReq);
await handleCompletedImageBuild(nextBuildReq.dockerfileSHA, true);
// When success is passed as true, we get EnqueuedJobInfo[].
const jobInfo = await handleCompletedImageBuild(nextBuildReq.dockerfileSHA, true) as EnqueuedJobInfo[];
await notifyClientOfBuildResult(result, infoAsBuildReq);
await Promise.all(jobInfo.map(({ key, response_url, ...status }) => pushStatusUpdate(status, response_url, key)));
console.info(`Successfully built image with SHA ${nextBuildReq.dockerfileSHA}.`);
} catch (err) {
if (isImageBuildResult(err) && infoAsBuildReq) {
Expand Down
33 changes: 25 additions & 8 deletions worker/orca_grader/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
from orca_grader.executor.builder.docker_grading_job_executor_builder import DockerGradingJobExecutorBuilder
from orca_grader.executor.builder.grading_job_executor_builder import GradingJobExecutorBuilder
from orca_grader.job_retrieval.local.local_grading_job_retriever import LocalGradingJobRetriever
from orca_grader.job_retrieval.postgres.grading_job_retriever import PostgresGradingJobRetirever
from orca_grader.job_retrieval.postgres.grading_job_retriever import PostgresGradingJobRetriever
from orca_grader.docker_utils.images.utils import does_image_exist_locally
from orca_grader.docker_utils.images.image_loading import retrieve_image_tgz_from_url, load_image_from_tgz
from orca_grader.job_termination.nonblocking_thread_executor import NonBlockingThreadPoolExecutor
from orca_grader.job_termination.stop_worker import GracefulKiller
from orca_grader.validations.exceptions import InvalidGradingJobJSONException
from orca_grader.validations.grading_job import is_valid_grading_job_json
from orca_grader.common.services.push_status import post_job_status_to_client

CONTAINER_WORKING_DIR = '/home/orca-grader'

Expand All @@ -36,7 +37,7 @@ def run_local_job(job_path: str, no_container: bool,

def process_jobs_from_db(no_container: bool,
container_command: List[str] | None):
retriever = PostgresGradingJobRetirever()
retriever = PostgresGradingJobRetriever()
with GracefulKiller() as killer:
with NonBlockingThreadPoolExecutor(max_workers=2) as futures_executor:
stop_future = futures_executor.submit(killer.wait_for_stop_signal)
Expand All @@ -52,11 +53,12 @@ def process_jobs_from_db(no_container: bool,
if job_retrieval_future.exception() is None:
grading_job = job_retrieval_future.result()
if grading_job is not None:
reenqueue_job(grading_job)
updated_db_id = reenqueue_job(grading_job)
inform_client_of_reenqueue(grading_job,
updated_db_id)

if job_retrieval_future in done:
if job_retrieval_future.exception():
# TODO: replace with log statement.
print(job_retrieval_future.exception())
time.sleep(1)
continue
Expand All @@ -68,7 +70,14 @@ def process_jobs_from_db(no_container: bool,
if stop_future in done:
break

print(f"Pulled job with key {grading_job['key']} and url {grading_job['response_url']}")
post_job_status_to_client(
location="Worker",
response_url=grading_job['response_url'],
key=grading_job['key']
)
print(
f"Pulled job with key {grading_job['key']} and url {grading_job['response_url']}"
)

job_execution_future = futures_executor.submit(
run_grading_job, grading_job, no_container, container_command)
Expand All @@ -79,7 +88,8 @@ def process_jobs_from_db(no_container: bool,
# 2. Stop future not done, job future done.
# 3. Stop future done, job future done.
if stop_future in done and job_execution_future in not_done:
reenqueue_job(grading_job)
updated_db_id = reenqueue_job(grading_job)
inform_client_of_reenqueue(grading_job, updated_db_id)

if job_execution_future in done:
if type(job_execution_future.exception()) == InvalidWorkerStateException:
Expand Down Expand Up @@ -114,8 +124,8 @@ def run_grading_job(grading_job: GradingJobJSON, no_container: bool,
except Exception as e:
print(e)
if type(e) == CalledProcessError:
print(e.stdout)
print(e.stderr)
print(e.stdout)
print(e.stderr)
if "response_url" in grading_job:
push_results_with_exception(grading_job, e)
else:
Expand Down Expand Up @@ -164,6 +174,13 @@ def handle_grading_job(grading_job: GradingJobJSON, container_sha: str | None =
print(result.stderr.decode())


def inform_client_of_reenqueue(grading_job: GradingJobJSON,
updated_db_id: int) -> None:
post_job_status_to_client(location="Queue",
response_url=grading_job["response_url"],
key=grading_job["key"])


def can_execute_job(grading_job: GradingJobJSON) -> bool:
try:
return is_valid_grading_job_json(grading_job)
Expand Down
13 changes: 13 additions & 0 deletions worker/orca_grader/common/services/push_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import requests
from typing import Optional


def post_job_status_to_client(location: str, response_url: str,
key: str, id: Optional[int] = None) -> None:
try:
status = {"location": location}
if id is not None:
status["id"] = id
requests.post(response_url, json=status)
except Exception as e:
print(e)
5 changes: 3 additions & 2 deletions worker/orca_grader/db/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ def reenqueue_job(grading_job: GradingJobJSON):
grading_job["client_url"]
)
__delete_more_recent_job_queue_info(session, more_recent_job)
__create_immediate_job(session, {**grading_job,
return __create_immediate_job(session, {**grading_job,
**more_recent_job.config})
else:
__create_immediate_job(session, grading_job)
return __create_immediate_job(session, grading_job)


def __create_immediate_job(session: Session, grading_job: GradingJobJSON):
Expand All @@ -88,6 +88,7 @@ def __create_immediate_job(session: Session, grading_job: GradingJobJSON):
insert(Reservation)
.values(job_id=inserted_job_id, release_at=grading_job["release_at"])
)
return inserted_job_id


def __omit(d: Dict[str, Any], keys: Set[str]) -> Dict[str, Any]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from orca_grader.job_retrieval.grading_job_retriever import GradingJobRetriever


class PostgresGradingJobRetirever(GradingJobRetriever):
class PostgresGradingJobRetriever(GradingJobRetriever):

def retrieve_grading_job(self) -> Optional[GradingJobJSON]:
return get_next_job()

0 comments on commit dad26eb

Please sign in to comment.