Skip to content

Commit

Permalink
chore: deprecate job move within priority group (#9624)
Browse files Browse the repository at this point in the history
  • Loading branch information
kkunapuli authored Jul 10, 2024
1 parent d257b89 commit 1ec5d01
Show file tree
Hide file tree
Showing 26 changed files with 87 additions and 1,035 deletions.
16 changes: 3 additions & 13 deletions docs/model-dev-guide/manage-job-queue.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,12 @@ Available operations include:

- Changing priorities for resource pools (priority scheduler)
- Changing weights for resource pools (fair share scheduler)
- Changing the order of queued jobs
- Changing resource pools

Constraints:

- The priority and fair share fields are mutually exclusive. The priority field is active only for
the priority scheduler, and the fair share field is active only for the fair share scheduler.
- The ``ahead-of``, ``behind-of``, and WebUI **Move to Top** operations are available only for the
priority scheduler and are not supported for the Kubernetes priority scheduler.
- The change resource pool operation can only be performed on experiments. For other tasks, cancel
and resubmit the task to change the resource pool.

Expand All @@ -79,6 +76,8 @@ Modify the Job Queue using the WebUI
#. Click the **Manage Job** option.
#. Make your changes on the pop-up page, and click **OK**.

.. _modify-job-queue-cli:

Modify the Job Queue using the CLI
==================================

Expand All @@ -89,26 +88,17 @@ To modify the job queue using the CLI, use the ``det job update`` command. Run `
$ det job update jobID --priority 10
$ det job update jobID --resource-pool a100
$ det job update jobID --ahead-of jobID-2
To update multiple jobs in a batch, provide updates as shown:

.. code::
$ det job update-batch job1.priority=1 job2.resource-pool="compute" job3.ahead-of=job1
$ det job update-batch job1.priority=1 job2.resource-pool="compute"
Example workflow:

.. code::
$ det job list
# | ID | Type | Job Name | Priority | Submitted | Slots (acquired/needed) | Status | User
-----+--------------------------------------+-----------------+--------------------------+------------+---------------------------+---------
0 | 0d714127 | TYPE_EXPERIMENT | first_job | 42 | 2022-01-01 00:01:00 | 1/1 | STATE_SCHEDULED | user1
1 | 73853c5c | TYPE_EXPERIMENT | second_job | 42 | 2022-01-01 00:01:01 | 0/1 | STATE_QUEUED | user1
$ det job update 73853c5c --ahead-of 0d714127
$ det job list
# | ID | Type | Job Name | Priority | Submitted | Slots (acquired/needed) | Status | User
-----+--------------------------------------+-----------------+--------------------------+------------+---------------------------+---------
Expand Down
7 changes: 7 additions & 0 deletions docs/release-notes/deprecate-job-move.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
:orphan:

**Deprecations**

- Agent and Kubernetes Resource Manager: Jobs can no longer be moved within the same priority
group. To reposition a job, update its priority using the CLI or WebUI. For detailed
instructions, visit :ref:`modify-job-queue-cli`. This change was announced in version 0.33.0.
24 changes: 3 additions & 21 deletions harness/determined/cli/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ def update(args: argparse.Namespace) -> None:
priority=args.priority,
weight=args.weight,
resourcePool=args.resource_pool,
behindOf=args.behind_of,
aheadOf=args.ahead_of,
)
bindings.post_UpdateJobQueue(sess, body=bindings.v1UpdateJobQueueRequest(updates=[update]))

Expand All @@ -111,16 +109,12 @@ def _single_update(
priority: str = "",
weight: str = "",
resource_pool: str = "",
behind_of: str = "",
ahead_of: str = "",
) -> None:
update = bindings.v1QueueControl(
jobId=job_id,
priority=int(priority) if priority != "" else None,
weight=int(weight) if weight != "" else None,
resourcePool=resource_pool if resource_pool != "" else None,
behindOf=behind_of if behind_of != "" else None,
aheadOf=ahead_of if ahead_of != "" else None,
)
bindings.post_UpdateJobQueue(session, body=bindings.v1UpdateJobQueueRequest(updates=[update]))

Expand All @@ -136,11 +130,9 @@ def check_is_priority(pools: bindings.v1GetResourcePoolsResponse, resource_pool:


def validate_operation_args(operation: str) -> dict:
valid_cmds = ("priority", "weight", "resource_pool", "ahead_of", "behind_of")
valid_cmds = ("priority", "weight", "resource_pool")
replacements = {
"resource-pool": "resource_pool",
"ahead-of": "ahead_of",
"behind-of": "behind_of",
}
args = {}
values = operation.split(".")
Expand Down Expand Up @@ -219,16 +211,6 @@ def validate_operation_args(operation: str) -> dict:
type=str,
help="The target resource pool to move the job to.",
),
cli.Arg(
"--ahead-of",
type=str,
help="The job ID of the job to be put ahead of in the queue.",
),
cli.Arg(
"--behind-of",
type=str,
help="The job ID of the job to be put behind in the queue.",
),
),
],
),
Expand All @@ -242,8 +224,8 @@ def validate_operation_args(operation: str) -> dict:
nargs=argparse.ONE_OR_MORE,
type=str,
help="The target job ID(s) and target operation(s), formatted as "
"<jobID>.<operation>=<value>. Operations include priority, weight, "
"resource-pool, ahead-of, and behind-of.",
"<jobID>.<operation>=<value>. Operations include priority, weight, and "
"resource-pool.",
)
],
),
Expand Down
19 changes: 0 additions & 19 deletions master/internal/db/postgres_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"fmt"

"github.com/pkg/errors"
"github.com/shopspring/decimal"
"github.com/uptrace/bun"

"github.com/determined-ai/determined/master/pkg/model"
Expand Down Expand Up @@ -40,20 +38,3 @@ func JobByID(ctx context.Context, jobID model.JobID) (*model.Job, error) {
}
return &j, nil
}

// UpdateJobPosition propagates the new queue position to the job.
func UpdateJobPosition(ctx context.Context, jobID model.JobID, position decimal.Decimal) error {
if jobID.String() == "" {
return errors.Errorf("error modifying job with empty id")
}

j := model.Job{JobID: jobID, QPos: position}
_, err := Bun().NewUpdate().Model(&j).
Column("q_position").
Where("job_id = ?", jobID).
Exec(ctx)
if err != nil {
return fmt.Errorf("updating job position: %w", err)
}
return nil
}
71 changes: 0 additions & 71 deletions master/internal/db/postgres_jobs_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,77 +66,6 @@ func TestJobByID(t *testing.T) {
})
}

func TestUpdateJobPosition(t *testing.T) {
closeDB := setupDBForTest(t)
defer closeDB()

t.Run("update position", func(t *testing.T) {
// create and send job
sendJob, err := createAndAddJob(10)
require.NoError(t, err)

// update job position
newPos := decimal.NewFromInt(5)
err = UpdateJobPosition(context.Background(), sendJob.JobID, newPos)
require.NoError(t, err)

// retrieve job and confirm pos update
recvJob, err := JobByID(context.Background(), sendJob.JobID)
require.NoError(t, err)
assert.Equal(t, newPos.Equal(recvJob.QPos), true)
// expect other fields to remain the same
assert.Equal(t, sendJob.JobID, recvJob.JobID)
assert.Equal(t, sendJob.JobType, recvJob.JobType)
assert.Equal(t, sendJob.OwnerID, recvJob.OwnerID)
})

t.Run("update position - negative value", func(t *testing.T) {
// create and send job
sendJob, err := createAndAddJob(10)
require.NoError(t, err)

// update job position
newPos := decimal.NewFromInt(-5)
err = UpdateJobPosition(context.Background(), sendJob.JobID, newPos)
require.NoError(t, err)

// retrieve job and confirm pos update
recvJob, err := JobByID(context.Background(), sendJob.JobID)
require.NoError(t, err)
assert.Equal(t, newPos.Equal(recvJob.QPos), true)
// expect other fields to remain the same
assert.Equal(t, sendJob.JobID, recvJob.JobID)
assert.Equal(t, sendJob.JobType, recvJob.JobType)
assert.Equal(t, sendJob.OwnerID, recvJob.OwnerID)
})

t.Run("update position - empty ID", func(t *testing.T) {
sendJob, err := createAndAddJob(10)
require.NoError(t, err)

// update job position
newPos := decimal.NewFromInt(5)
err = UpdateJobPosition(context.Background(), model.JobID(""), newPos)
require.Error(t, err)

// retrieve job and ensure queue pos not updated
recvJob, err := JobByID(context.Background(), sendJob.JobID)
require.NoError(t, err)
assert.Equal(t, sendJob.QPos.Equal(recvJob.QPos), true)
})

t.Run("update position - ID does not exist", func(t *testing.T) {
// create and send job
_, err := createAndAddJob(10)
require.NoError(t, err)

// update job position for a job that doesn't exist
newPos := decimal.NewFromInt(5)
err = UpdateJobPosition(context.Background(), model.NewJobID(), newPos)
require.NoError(t, err)
})
}

// TODO [RM-27] initialize db in a TestMain(...) when there's enough package isolation.
func setupDBForTest(t *testing.T) func() {
require.NoError(t, etc.SetRootPath(RootFromDB))
Expand Down
14 changes: 0 additions & 14 deletions master/internal/job/jobservice/jobservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,20 +205,6 @@ func (s *Service) applyUpdate(update *jobv1.QueueControl) error {
s.syslog.Error("resource pool must be set")
}
return j.SetResourcePool(action.ResourcePool)
case *jobv1.QueueControl_AheadOf:
return s.rm.MoveJob(sproto.MoveJob{
ID: jobID,
Anchor: model.JobID(action.AheadOf),
Ahead: true,
ResourcePool: j.ResourcePool(),
})
case *jobv1.QueueControl_BehindOf:
return s.rm.MoveJob(sproto.MoveJob{
ID: jobID,
Anchor: model.JobID(action.BehindOf),
Ahead: false,
ResourcePool: j.ResourcePool(),
})
default:
return fmt.Errorf("unexpected action: %v", action)
}
Expand Down
9 changes: 0 additions & 9 deletions master/internal/rm/agentrm/agent_resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,15 +374,6 @@ func (*ResourceManager) IsReattachableOnlyAfterStarted() bool {
return true
}

// MoveJob implements rm.ResourceManager.
func (a *ResourceManager) MoveJob(msg sproto.MoveJob) error {
pool, err := a.poolByName(msg.ResourcePool)
if err != nil {
return fmt.Errorf("move job found no resource pool with name %s: %w", msg.ResourcePool, err)
}
return pool.MoveJob(msg)
}

// NotifyContainerRunning implements rm.ResourceManager.
func (*ResourceManager) NotifyContainerRunning(sproto.NotifyContainerRunning) error {
// Agent Resource Manager does not implement a handler for the
Expand Down
105 changes: 0 additions & 105 deletions master/internal/rm/agentrm/resource_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,111 +660,6 @@ func (rp *resourcePool) CapacityCheck(msg sproto.CapacityCheck) (sproto.Capacity
}, nil
}

func (rp *resourcePool) MoveJob(msg sproto.MoveJob) error {
rp.mu.Lock()
defer rp.mu.Unlock()
rp.reschedule = true

return rp.moveJob(msg.ID, msg.Anchor, msg.Ahead)
}

func (rp *resourcePool) moveJob(
jobID model.JobID,
anchorID model.JobID,
aheadOf bool,
) error {
if anchorID == "" || jobID == "" || anchorID == jobID {
return nil
}

// check whether the msg belongs to this resource pool or not.
// job messages to agent rm are forwarded to all resource pools.
if _, ok := rp.queuePositions[jobID]; !ok {
return nil
}

if rp.config.Scheduler.GetType() != config.PriorityScheduling {
return fmt.Errorf("unable to perform operation on resource pool with %s",
rp.config.Scheduler.GetType())
}

if _, ok := rp.groups[jobID]; !ok {
return sproto.ErrJobNotFound(jobID)
}
if _, ok := rp.queuePositions[anchorID]; !ok {
return sproto.ErrJobNotFound(anchorID)
}

prioChange, secondAnchor, anchorPriority := tasklist.FindAnchor(
jobID,
anchorID,
aheadOf,
rp.taskList,
rp.groups,
rp.queuePositions,
false,
)

if secondAnchor == "" {
return fmt.Errorf("unable to move job with ID %s", jobID)
}

if secondAnchor == jobID {
return nil
}

if prioChange {
group := rp.groups[jobID]
if group == nil {
return fmt.Errorf("moveJob cannot find group for job %s", jobID)
}
oldPriority := *group.Priority
err := rp.setGroupPriority(sproto.SetGroupPriority{
Priority: anchorPriority,
ResourcePool: rp.config.PoolName,
JobID: jobID,
})
if err != nil {
return err
}

priorityChanger, ok := tasklist.GroupPriorityChangeRegistry.Load(jobID)
if !ok {
return fmt.Errorf("unable to move job with ID %s", jobID)
}

if priorityChanger != nil {
if err := priorityChanger(anchorPriority); err != nil {
_ = rp.setGroupPriority(sproto.SetGroupPriority{
Priority: oldPriority,
ResourcePool: rp.config.PoolName,
JobID: jobID,
})
return err
}
}

if !tasklist.NeedMove(
rp.queuePositions[jobID],
rp.queuePositions[anchorID],
rp.queuePositions[secondAnchor],
aheadOf,
) {
return nil
}
}

jobPosition, err := rp.queuePositions.SetJobPosition(jobID, anchorID, secondAnchor, aheadOf, false)
if err != nil {
return err
}
if err := internaldb.UpdateJobPosition(context.TODO(), jobID, jobPosition); err != nil {
return err
}

return nil
}

func (rp *resourcePool) RecoverJobPosition(msg sproto.RecoverJobPosition) {
rp.mu.Lock()
defer rp.mu.Unlock()
Expand Down
Loading

0 comments on commit 1ec5d01

Please sign in to comment.