Skip to content

Commit

Permalink
Merge branch 'main' into feat/job-cancellation-orchestrator
Browse files Browse the repository at this point in the history
  • Loading branch information
williams-jack committed Jul 20, 2024
2 parents dd966d6 + 3c7d80e commit e46a647
Show file tree
Hide file tree
Showing 12 changed files with 344 additions and 245 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ const bashGradingScriptCommand = {
on_complete: {
anyOf: [
{
$ref: "https://orca-schemas.com/grading-job-config/grading-script-command",
type: "array",
items: {
$ref: "https://orca-schemas.com/grading-job-config/grading-script-command",
}
},
{ type: "string" },
{ type: "number" },
Expand All @@ -27,7 +30,10 @@ const bashGradingScriptCommand = {
on_fail: {
anyOf: [
{
$ref: "https://orca-schemas.com/grading-job-config/grading-script-command",
type: "array",
items: {
$ref: "https://orca-schemas.com/grading-job-config/grading-script-command",
}
},
{ type: "string" },
{ type: "number" },
Expand Down Expand Up @@ -56,7 +62,10 @@ const conditionalGradingScriptCommand = {
on_true: {
anyOf: [
{
$ref: "https://orca-schemas.com/grading-job-config/grading-script-command",
type: "array",
items: {
$ref: "https://orca-schemas.com/grading-job-config/grading-script-command",
}
},
{ type: "string" },
{ type: "number" },
Expand All @@ -65,7 +74,10 @@ const conditionalGradingScriptCommand = {
on_false: {
anyOf: [
{
$ref: "https://orca-schemas.com/grading-job-config/grading-script-command",
type: "array",
items: {
$ref: "https://orca-schemas.com/grading-job-config/grading-script-command",
}
},
{ type: "string" },
{ type: "number" },
Expand Down
2 changes: 1 addition & 1 deletion orchestrator/packages/db/run-migration
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ if [[ $? -ne 0 ]]; then
exit 1
fi

export POSTGRES_URL=postgresql://postgres:password@localhost
export POSTGRES_URL=postgresql://postgres:password@localhost:5434
npx prisma migrate dev --name $1
docker stop "$container_name" > /dev/null
5 changes: 3 additions & 2 deletions worker/orca_grader/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def process_jobs_from_db(no_container: bool,
if job_retrieval_future.exception():
# TODO: replace with log statement.
print(job_retrieval_future.exception())
time.sleep(1)
continue
grading_job = job_retrieval_future.result()
if grading_job is None:
Expand Down Expand Up @@ -149,9 +150,9 @@ def handle_grading_job(grading_job: GradingJobJSON, container_sha: str | None =
result = executor.execute()
if result and result.stdout:
# TODO: make this a log statement of some sort.
print(result.stderr.decode())
elif result and result.stderr:
print(result.stdout.decode())
elif result and result.stderr:
print(result.stderr.decode())


def can_execute_job(grading_job: GradingJobJSON) -> bool:
Expand Down
13 changes: 9 additions & 4 deletions worker/orca_grader/common/grading_job/grading_job_result.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from typing import List, Optional
from typing import Dict, List, Optional
from orca_grader.container.grading_script.grading_script_command_response import GradingScriptCommandResponse
from orca_grader.common.types.grading_job_json_types import GradingJobResultJSON



class GradingJobResult:

def __init__(self, command_responses: List[GradingScriptCommandResponse],
Expand All @@ -21,10 +22,14 @@ def get_output(self) -> Optional[str]:
def get_execution_errors(self) -> List[Exception]:
return self.__execution_errors

def to_json(self) -> GradingJobResultJSON:
result = {"type": "GradingJobResult"}
def to_json(self, interpolated_dirs: Dict[str, str]) -> GradingJobResultJSON:
result = dict()
json_responses = list(
map(lambda c: c.to_json(), self.__command_responses))
map(
lambda c: c.to_json(interpolated_dirs=interpolated_dirs),
self.__command_responses
)
)
result["shell_responses"] = json_responses
if self.__execution_errors is not None:
result["errors"] = list(
Expand Down
17 changes: 11 additions & 6 deletions worker/orca_grader/common/services/push_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import time
import requests
from requests import HTTPError
from typing import Dict, Optional
from orca_grader.common.grading_job.grading_job_result import GradingJobResult
from orca_grader.common.services.exceptions import PushResultsFailureException
from orca_grader.common.types.grading_job_json_types import GradingJobJSON
Expand All @@ -11,18 +12,22 @@
# TODO: Update if POST data format changes, or simply remove this comment once solidifed.


def push_results_to_response_url(job_result: GradingJobResult, key: str,
response_url: str) -> None:
result_as_json = job_result.to_json()
result_as_json["key"] = key
def push_results_to_response_url(job_result: GradingJobResult,
key: str,
response_url: str,
interpolated_dirs: Dict[str, str]) -> None:
result_as_json = {
**job_result.to_json(interpolated_dirs=interpolated_dirs),
"key": key
}
_send_results_with_exponential_backoff(result_as_json, response_url)


def push_results_with_exception(grading_job: GradingJobJSON,
e: Exception) -> None:
output = GradingJobResult([], [e])
output = GradingJobResult([], [e]).to_json()
key, response_url = grading_job["key"], grading_job["response_url"]
push_results_to_response_url(output, key, response_url)
push_results_to_response_url(output, key, response_url, {})


def _send_results_with_exponential_backoff(payload: dict, response_url: str, n: int = 1):
Expand Down
167 changes: 87 additions & 80 deletions worker/orca_grader/container/build_script/preprocess/preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,91 +11,98 @@
from orca_grader.container.build_script.json_helpers.grading_script_command import RESERVED_KEYWORDS, is_bash_command, is_conditional_command
from orca_grader.common.types.grading_job_json_types import GradingScriptCommandJSON

DEFAULT_COMMAND_TIMEOUT = 60 # 1 minute
DEFAULT_COMMAND_TIMEOUT = 60 # 1 minute


class GradingScriptPreprocessor:

def __init__(self, secret: str, json_cmds: List[GradingScriptCommandJSON],
code_files: Dict[str, CodeFileInfo], code_file_processor: CodeFileProcessor,
cmd_timeout: int = DEFAULT_COMMAND_TIMEOUT) -> None:
flattened_cmds = flatten_grading_script(json_cmds)
if CycleDetector.contains_cycle(flattened_cmds):
raise NotADAGException()
self.__interpolated_dirs = {
"$DOWNLOADED": f"{secret}/downloaded",
"$EXTRACTED": f"{secret}/extracted",
"$BUILD": f"{secret}/build"
}
self.__code_file_processor = code_file_processor
self.__json_cmds = flattened_cmds
self.__code_files = code_files
self.__cmds = [None for _ in range(len(flattened_cmds))]
self.__cmd_timeout = cmd_timeout

def preprocess_job(self) -> GradingScriptCommand:
self.__download_and_process_code_files()
script = self.__generate_grading_script()
return script
def __init__(self, secret: str, json_cmds: List[GradingScriptCommandJSON],
code_files: Dict[str, CodeFileInfo], code_file_processor: CodeFileProcessor,
cmd_timeout: int = DEFAULT_COMMAND_TIMEOUT) -> None:
flattened_cmds = flatten_grading_script(json_cmds)
if CycleDetector.contains_cycle(flattened_cmds):
raise NotADAGException()
self.__interpolated_dirs = {
"$DOWNLOADED": f"{secret}/downloaded",
"$EXTRACTED": f"{secret}/extracted",
"$BUILD": f"{secret}/build"
}
self.__code_file_processor = code_file_processor
self.__json_cmds = flattened_cmds
self.__code_files = code_files
self.__cmds = [None for _ in range(len(flattened_cmds))]
self.__cmd_timeout = cmd_timeout

def preprocess_job(self) -> GradingScriptCommand:
self.__download_and_process_code_files()
script = self.__generate_grading_script()
return script

def __download_and_process_code_files(self) -> None:
"""
Given a list of CodeFileInfo objects, download and extract (if necessary) each one.
"""
self.__create_script_dirs()
download_dir = self.__interpolated_dirs["$DOWNLOADED"]
extract_dir = self.__interpolated_dirs["$EXTRACTED"]
for name, code_file in self.__code_files.items():
file_download_dir = os.path.join(download_dir, name)
file_extract_dir = os.path.join(extract_dir, name)
self.__code_file_processor.process_file(code_file, file_download_dir,
file_extract_dir)

def __generate_grading_script(self) -> GradingScriptCommand:
for i in range(len(self.__cmds)):
if self.__cmds[i] is None:
self.__get_grading_command_by_index(i)
return self.__cmds[0]

def __download_and_process_code_files(self) -> None:
"""
Given a list of CodeFileInfo objects, download and extract (if necessary) each one.
"""
self.__create_script_dirs()
download_dir = self.__interpolated_dirs["$DOWNLOADED"]
extract_dir = self.__interpolated_dirs["$EXTRACTED"]
for name, code_file in self.__code_files.items():
file_download_dir = os.path.join(download_dir, name)
file_extract_dir = os.path.join(extract_dir, name)
self.__code_file_processor.process_file(code_file, file_download_dir,
file_extract_dir)
def __get_grading_command_by_index(self, index: int) -> GradingScriptCommand:
if self.__cmds[index] is not None:
return self.__cmds[index]
if is_bash_command(self.__json_cmds[index]):
return self.__process_bash_command_json(self.__json_cmds[index], index)
elif is_conditional_command(self.__json_cmds[index]):
return self.__process_conditional_command_json(self.__json_cmds[index], index)
else:
raise InvalidGradingScriptCommand()

def __generate_grading_script(self) -> GradingScriptCommand:
for i in range(len(self.__cmds)):
if self.__cmds[i] is None:
self.__get_grading_command_by_index(i)
return self.__cmds[0]
def __process_bash_command_json(self, json_command: GradingScriptCommandJSON, index: int) -> GradingScriptCommand:
shell_cmd: str | List[str] = self.__add_interpolated_paths(
json_command["cmd"])
on_fail, on_complete = json_command["on_fail"], json_command["on_complete"]
working_dir = self.__add_interpolated_paths(
json_command["working_dir"]) if "working_dir" in json_command else None
cmd = BashGradingScriptCommand(shell_cmd,
on_complete=self.__get_grading_command_by_index(
on_complete) if on_complete != "output" else None,
on_fail=self.__get_grading_command_by_index(
on_fail) if on_fail != "abort" else None,
timeout=json_command["timeout"] if "timeout" in json_command else self.__cmd_timeout,
working_dir=working_dir)
self.__cmds[index] = cmd
return cmd

def __get_grading_command_by_index(self, index: int) -> GradingScriptCommand:
if self.__cmds[index] is not None:
return self.__cmds[index]
if is_bash_command(self.__json_cmds[index]):
return self.__process_bash_command_json(self.__json_cmds[index], index)
elif is_conditional_command(self.__json_cmds[index]):
return self.__process_conditional_command_json(self.__json_cmds[index], index)
else:
raise InvalidGradingScriptCommand()
def __process_conditional_command_json(self, json_command: GradingScriptCommandJSON, index: int):
conditional: Dict[str, str] = json_command["condition"]
predicate: GradingScriptPredicate = GradingScriptPredicate(
conditional["predicate"])
fs_path: str = self.__add_interpolated_paths(conditional["path"])
on_false, on_true = json_command["on_false"], json_command["on_true"]
cmd = ConditionalGradingScriptCommand(self.__get_grading_command_by_index(on_true),
self.__get_grading_command_by_index(on_false), fs_path, predicate)
self.__cmds[index] = cmd
return cmd

def __process_bash_command_json(self, json_command: GradingScriptCommandJSON, index: int) -> GradingScriptCommand:
shell_cmd: str | List[str] = self.__add_interpolated_paths(json_command["cmd"])
on_fail, on_complete = json_command["on_fail"], json_command["on_complete"]
working_dir = self.__add_interpolated_paths(json_command["working_dir"]) if "working_dir" in json_command else None
cmd = BashGradingScriptCommand(shell_cmd,
on_complete=self.__get_grading_command_by_index(on_complete) if on_complete != "output" else None,
on_fail=self.__get_grading_command_by_index(on_fail) if on_fail != "abort" else None,
timeout=json_command["timeout"] if "timeout" in json_command else self.__cmd_timeout,
working_dir=working_dir)
self.__cmds[index] = cmd
return cmd

def __process_conditional_command_json(self, json_command: GradingScriptCommandJSON, index: int):
conditional: Dict[str, str] = json_command["condition"]
predicate: GradingScriptPredicate = GradingScriptPredicate(conditional["predicate"])
fs_path: str = self.__add_interpolated_paths(conditional["path"])
on_false, on_true = json_command["on_false"], json_command["on_true"]
cmd = ConditionalGradingScriptCommand(self.__get_grading_command_by_index(on_true),
self.__get_grading_command_by_index(on_false), fs_path, predicate)
self.__cmds[index] = cmd
return cmd
def __add_interpolated_paths(self, cmd: str | List[str]) -> str | List[str]:
formatted_cmd = cmd
for var in self.__interpolated_dirs:
formatted_cmd = formatted_cmd.replace(var, self.__interpolated_dirs[var]) if type(cmd) == str \
else list(map(lambda prog_arg: prog_arg.replace(var, self.__interpolated_dirs[var]), formatted_cmd))
return formatted_cmd

def __add_interpolated_paths(self, cmd: str | List[str]) -> str | List[str]:
formatted_cmd = cmd
for var in self.__interpolated_dirs:
formatted_cmd = formatted_cmd.replace(var, self.__interpolated_dirs[var]) if type(cmd) == str \
else list(map(lambda prog_arg: prog_arg.replace(var, self.__interpolated_dirs[var]), formatted_cmd))
return formatted_cmd

def __create_script_dirs(self) -> None:
for item in self.__interpolated_dirs.items():
path_var, dir = item
os.makedirs(dir, exist_ok=(path_var == "$ASSETS")) # If the download, extract, or build dir already exists, something has gone very wrong...
def __create_script_dirs(self) -> None:
for item in self.__interpolated_dirs.items():
path_var, dir = item
# If the download, extract, or build dir already exists, something has gone very wrong...
os.makedirs(dir, exist_ok=(path_var == "$ASSETS"))
17 changes: 10 additions & 7 deletions worker/orca_grader/container/do_grading.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ def do_grading(secret: str, grading_job_json: GradingJobJSON) -> GradingJobResul
output = GradingJobResult(command_responses, [preprocess_e])
except Exception as e:
output = GradingJobResult(command_responses, [e])
print(output.to_json())
print(output.to_json(interpolated_dirs=interpolated_dirs))
reverse_interpolated_dirs = {v: k for k, v in interpolated_dirs.items()}
push_results_to_response_url(output,
grading_job_json["key"],
grading_job_json["container_response_url"] if
"container_response_url" in grading_job_json else
grading_job_json["response_url"])
grading_job_json["container_response_url"]
if "container_response_url" in grading_job_json else
grading_job_json["response_url"],
interpolated_dirs=reverse_interpolated_dirs)
return output


Expand Down Expand Up @@ -79,7 +81,8 @@ def cleanup(secret: str) -> None:
output = GradingJobResult([], [e.with_traceback(None)])
push_results_to_response_url(output,
grading_job["key"],
grading_job["container_response_url"] if
"container_response_url" in grading_job else
grading_job["response_url"])
grading_job["container_response_url"]
if "container_response_url" in grading_job else
grading_job["response_url"],
interpolated_dirs={})
# cleanup(secret) # useful for execution with no container, but generally optional
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
from orca_grader.common.grading_job.grading_job_result import GradingJobResult
from orca_grader.container.grading_script.grading_script_command_response import GradingScriptCommandResponse


class GradingScriptCommand:
"""
Represents a single command in a grading script. Either executes a predicate/check
or a bash command.
"""
"""
Represents a single command in a grading script. Either executes a predicate/check
or a bash command.
"""

def execute(self, responses: List[GradingScriptCommandResponse]) -> GradingJobResult:
pass
def execute(self, responses: List[GradingScriptCommandResponse]) -> GradingJobResult:
pass
Loading

0 comments on commit e46a647

Please sign in to comment.