Skip to content

Commit

Permalink
fix: historical allocations not showing task allocation workspace (#9496)
Browse files Browse the repository at this point in the history
  • Loading branch information
eecsliu authored Jun 13, 2024
1 parent 8e9067b commit 382995c
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 12 deletions.
53 changes: 46 additions & 7 deletions e2e_tests/tests/experiment/test_allocation_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@

from determined.common.api import bindings
from tests import api_utils
from tests import cluster as clu
from tests import command as cmd
from tests import config as conf
from tests import experiment as exp
from tests.cluster import utils as cluster_utils

API_URL = "/resources/allocation/allocations-csv?"

Expand All @@ -33,7 +33,7 @@ def test_experiment_capture() -> None:

# Check if an entry exists for experiment that just ran
reader = csv.DictReader(io.StringIO(r.text))
matches = [row for row in reader if int(row["experiment_id"]) == experiment_id]
matches = [row for row in reader if row["experiment_id"] == str(experiment_id)]
assert len(matches) >= 1, f"could not find any rows for experiment {experiment_id}"


Expand All @@ -48,15 +48,20 @@ def test_notebook_capture() -> None:

for line in notebook.stdout:
if re.search("Jupyter Notebook .*is running at", line) is not None:
return
break

assert task_id is not None

end_time = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
sess = api_utils.user_session()
r = sess.get(f"{API_URL}timestamp_after={start_time}&timestamp_before={end_time}")
assert r.status_code == requests.codes.ok, r.text

assert re.search(f"{task_id},NOTEBOOK", r.text) is not None
assert re.search(f"{task_id}.*,NOTEBOOK", r.text) is not None

workspace = cluster_utils.get_task_info(sess, "notebook", task_id).get("workspaceName", None)
assert workspace is not None
assert re.search(f"{workspace},,", r.text) is not None


# Create a No_Op Experiment/Tensorboard & Confirm Tensorboard task is captured
Expand All @@ -76,8 +81,8 @@ def test_tensorboard_experiment_capture() -> None:
["tensorboard", "start", "--detach", str(experiment_id)],
) as tb:
assert tb.task_id
clu.utils.wait_for_task_state(sess, "tensorboard", tb.task_id, "RUNNING")
clu.utils.wait_for_task_state(sess, "tensorboard", tb.task_id, "TERMINATED")
cluster_utils.wait_for_task_state(sess, "tensorboard", tb.task_id, "RUNNING")
cluster_utils.wait_for_task_state(sess, "tensorboard", tb.task_id, "TERMINATED")

# Ensure that end_time captures tensorboard
end_time = (
Expand All @@ -88,8 +93,42 @@ def test_tensorboard_experiment_capture() -> None:

# Confirm Experiment is captured and valid
reader = csv.DictReader(io.StringIO(r.text))
matches = [row for row in reader if int(row["experiment_id"]) == experiment_id]
matches = [row for row in reader if row["experiment_id"] == str(experiment_id)]
assert len(matches) >= 1

# Confirm Tensorboard task is captured
assert re.search(f"{tb.task_id}.*,TENSORBOARD", r.text) is not None

workspace = cluster_utils.get_task_info(sess, "tensorboard", tb.task_id).get(
"workspaceName", None
)
assert workspace is not None
assert re.search(f"{workspace},,", r.text) is not None


# Create a command and confirm that the task is captured.
@pytest.mark.e2e_cpu
def test_cmd_capture() -> None:
    """Run a short command and confirm it is reported in the allocations CSV.

    The test starts ``sleep 10s`` as an interactive command, waits for the
    "Resources for Command ... have started" line on its output, then fetches
    the allocations CSV for the time window around the run and checks that:

    * a line contains the command's task id followed later by ``,COMMAND``;
    * the task's workspace name (looked up via ``get_task_info``) appears
      immediately followed by an empty field (``,,``).
    """
    sess = api_utils.user_session()
    start_time = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    task_id = None
    with cmd.interactive_command(sess, ["cmd", "run", "sleep 10s"]) as sleep_cmd:
        task_id = sleep_cmd.task_id

        # Stop reading as soon as the command reports that its resources started.
        for line in sleep_cmd.stdout:
            if re.search("Resources for Command .*have started", line) is not None:
                break

    assert task_id is not None

    end_time = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    r = sess.get(f"{API_URL}timestamp_after={start_time}&timestamp_before={end_time}")
    assert r.status_code == requests.codes.ok, r.text

    # Escape interpolated values so they are matched literally: ids and
    # workspace names are data, not regex syntax.
    assert re.search(f"{re.escape(str(task_id))}.*,COMMAND", r.text) is not None

    workspace = cluster_utils.get_task_info(sess, "command", task_id).get("workspaceName", None)
    assert workspace is not None
    assert re.search(f"{re.escape(str(workspace))},,", r.text) is not None
32 changes: 27 additions & 5 deletions master/internal/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,15 +459,30 @@ func (m *Master) getResourceAllocations(c echo.Context) error {
Where("ts.event_type = 'IMAGEPULL'").
Group("a.allocation_id")

taskWorkspaceIDs := db.Bun().NewSelect().
ColumnExpr("t.task_id").
ColumnExpr("c.generic_command_spec->'Metadata'->>'workspace_id' AS workspace_id").
TableExpr("tasks_in_range t").
Join("INNER JOIN command_state c ON t.task_id = c.task_id")

taskWorkspaceNames := db.Bun().NewSelect().
ColumnExpr("t.task_id").
ColumnExpr("w.name as workspace_name").
With("task_wids", taskWorkspaceIDs).
TableExpr("task_wids t").
Join("INNER JOIN workspaces w ON t.workspace_id::int=w.id")

// Get experiment info for tasks within time range
taskExperimentInfo := db.Bun().NewSelect().
ColumnExpr("t.task_id").
ColumnExpr("e.id as experiment_id").
ColumnExpr("w.name as workspace_name").
ColumnExpr("COALESCE(w.name, task_wnames.workspace_name) as workspace_name").
With("task_wnames", taskWorkspaceNames).
TableExpr("tasks_in_range t").
Join("INNER JOIN experiments e ON t.job_id = e.job_id").
Join("INNER JOIN projects p ON e.project_id = p.id").
Join("INNER JOIN workspaces w ON p.workspace_id = w.id")
Join("LEFT JOIN experiments e ON t.job_id = e.job_id").
Join("LEFT JOIN projects p ON e.project_id = p.id").
Join("LEFT JOIN workspaces w ON p.workspace_id = w.id").
Join("LEFT JOIN task_wnames ON task_wnames.task_id=t.task_id")

// Get task information row-by-row for all tasks in time range
rows, err := db.Bun().NewSelect().
Expand Down Expand Up @@ -533,6 +548,13 @@ func (m *Master) getResourceAllocations(c echo.Context) error {
return err
}

// nullIfZero formats val as a decimal string, except that 0 is rendered as
// the empty string. Despite the name, it returns "" rather than a null value.
// It is used for the experiment ID column of the CSV so that rows whose
// ExperimentID is 0 get an empty field instead of "0".
// NOTE(review): presumably 0 means the task has no associated experiment — confirm.
nullIfZero := func(val int) string {
if val == 0 {
return ""
}
return strconv.Itoa(val)
}

// Write each entry to the output CSV
for rows.Next() {
allocationMetadata := new(AllocationMetadata)
Expand All @@ -544,7 +566,7 @@ func (m *Master) getResourceAllocations(c echo.Context) error {
string(allocationMetadata.TaskType),
allocationMetadata.Username,
allocationMetadata.WorkspaceName,
strconv.Itoa(allocationMetadata.ExperimentID),
nullIfZero(allocationMetadata.ExperimentID),
strconv.Itoa(allocationMetadata.Slots),
formatTimestamp(allocationMetadata.StartTime),
formatTimestamp(allocationMetadata.EndTime),
Expand Down

0 comments on commit 382995c

Please sign in to comment.