diff --git a/docs/model-dev-guide/manage-job-queue.rst b/docs/model-dev-guide/manage-job-queue.rst index ab48ab92ef7..70c207b23af 100644 --- a/docs/model-dev-guide/manage-job-queue.rst +++ b/docs/model-dev-guide/manage-job-queue.rst @@ -58,15 +58,12 @@ Available operations include: - Changing priorities for resource pools (priority scheduler) - Changing weights for resource pools (fair share scheduler) -- Changing the order of queued jobs - Changing resource pools Constraints: - The priority and fair share fields are mutually exclusive. The priority field is active only for the priority scheduler, and the fair share field is active only for the fair share scheduler. -- The ``ahead-of``, ``behind-of``, and WebUI **Move to Top** operations are available only for the - priority scheduler and are not supported for the Kubernetes priority scheduler. - The change resource pool operation can only be performed on experiments. For other tasks, cancel and resubmit the task to change the resource pool. @@ -79,6 +76,8 @@ Modify the Job Queue using the WebUI #. Click the **Manage Job** option. #. Make your changes on the pop-up page, and click **OK**. +.. _modify-job-queue-cli: + Modify the Job Queue using the CLI ================================== @@ -89,26 +88,17 @@ To modify the job queue using the CLI, use the ``det job update`` command. Run ` $ det job update jobID --priority 10 $ det job update jobID --resource-pool a100 - $ det job update jobID --ahead-of jobID-2 To update multiple jobs in a batch, provide updates as shown: .. code:: - $ det job update-batch job1.priority=1 job2.resource-pool="compute" job3.ahead-of=job1 + $ det job update-batch job1.priority=1 job2.resource-pool="compute" Example workflow: .. code:: - $ det job list - # | ID | Type | Job Name | Priority | Submitted | Slots (acquired/needed) | Status | User - -----+--------------------------------------+-----------------+--------------------------+------------+---------------------------+--------- - 0 | 0d714127 | TYPE_EXPERIMENT | first_job | 42 | 2022-01-01 00:01:00 | 1/1 | STATE_SCHEDULED | user1 - 1 | 73853c5c | TYPE_EXPERIMENT | second_job | 42 | 2022-01-01 00:01:01 | 0/1 | STATE_QUEUED | user1 - - $ det job update 73853c5c --ahead-of 0d714127 - $ det job list # | ID | Type | Job Name | Priority | Submitted | Slots (acquired/needed) | Status | User -----+--------------------------------------+-----------------+--------------------------+------------+---------------------------+--------- diff --git a/docs/release-notes/deprecate-job-move.rst b/docs/release-notes/deprecate-job-move.rst new file mode 100644 index 00000000000..da8168dc2ce --- /dev/null +++ b/docs/release-notes/deprecate-job-move.rst @@ -0,0 +1,7 @@ +:orphan: + +**Deprecations** + +- Agent and Kubernetes Resource Manager: Jobs can no longer be moved within the same priority + group. To reposition a job, update its priority using the CLI or WebUI. For detailed + instructions, visit :ref:`modify-job-queue-cli`. This change was announced in version 0.33.0. diff --git a/harness/determined/cli/job.py b/harness/determined/cli/job.py index 779955b9095..a71f1768046 100644 --- a/harness/determined/cli/job.py +++ b/harness/determined/cli/job.py @@ -92,8 +92,6 @@ def update(args: argparse.Namespace) -> None: priority=args.priority, weight=args.weight, resourcePool=args.resource_pool, - behindOf=args.behind_of, - aheadOf=args.ahead_of, ) bindings.post_UpdateJobQueue(sess, body=bindings.v1UpdateJobQueueRequest(updates=[update])) @@ -111,16 +109,12 @@ def _single_update( priority: str = "", weight: str = "", resource_pool: str = "", - behind_of: str = "", - ahead_of: str = "", ) -> None: update = bindings.v1QueueControl( jobId=job_id, priority=int(priority) if priority != "" else None, weight=int(weight) if weight != "" else None, resourcePool=resource_pool if resource_pool != "" else None, - behindOf=behind_of if behind_of != "" else None, - aheadOf=ahead_of if ahead_of != "" else None, ) bindings.post_UpdateJobQueue(session, body=bindings.v1UpdateJobQueueRequest(updates=[update])) @@ -136,11 +130,9 @@ def check_is_priority(pools: bindings.v1GetResourcePoolsResponse, resource_pool: def validate_operation_args(operation: str) -> dict: - valid_cmds = ("priority", "weight", "resource_pool", "ahead_of", "behind_of") + valid_cmds = ("priority", "weight", "resource_pool") replacements = { "resource-pool": "resource_pool", - "ahead-of": "ahead_of", - "behind-of": "behind_of", } args = {} values = operation.split(".") @@ -219,16 +211,6 @@ def validate_operation_args(operation: str) -> dict: type=str, help="The target resource pool to move the job to.", ), - cli.Arg( - "--ahead-of", - type=str, - help="The job ID of the job to be put ahead of in the queue.", - ), - cli.Arg( - "--behind-of", - type=str, - help="The job ID of the job to be put behind in the queue.", - ), ), ], ), @@ -242,8 +224,8 @@ def validate_operation_args(operation: str) -> dict: nargs=argparse.ONE_OR_MORE, type=str, help="The target job ID(s) and target operation(s), formatted as " - ".=. Operations include priority, weight, " - "resource-pool, ahead-of, and behind-of.", + ".=. Operations include priority, weight, and " + "resource-pool.", ) ], ), diff --git a/master/internal/db/postgres_jobs.go b/master/internal/db/postgres_jobs.go index 081a463965b..51f2ef39a5a 100644 --- a/master/internal/db/postgres_jobs.go +++ b/master/internal/db/postgres_jobs.go @@ -4,8 +4,6 @@ import ( "context" "fmt" - "github.com/pkg/errors" - "github.com/shopspring/decimal" "github.com/uptrace/bun" "github.com/determined-ai/determined/master/pkg/model" @@ -40,20 +38,3 @@ func JobByID(ctx context.Context, jobID model.JobID) (*model.Job, error) { } return &j, nil } - -// UpdateJobPosition propagates the new queue position to the job. -func UpdateJobPosition(ctx context.Context, jobID model.JobID, position decimal.Decimal) error { - if jobID.String() == "" { - return errors.Errorf("error modifying job with empty id") - } - - j := model.Job{JobID: jobID, QPos: position} - _, err := Bun().NewUpdate().Model(&j). - Column("q_position"). - Where("job_id = ?", jobID). - Exec(ctx) - if err != nil { - return fmt.Errorf("updating job position: %w", err) - } - return nil -} diff --git a/master/internal/db/postgres_jobs_intg_test.go b/master/internal/db/postgres_jobs_intg_test.go index 028d980f168..80e03fb7d78 100644 --- a/master/internal/db/postgres_jobs_intg_test.go +++ b/master/internal/db/postgres_jobs_intg_test.go @@ -66,77 +66,6 @@ func TestJobByID(t *testing.T) { }) } -func TestUpdateJobPosition(t *testing.T) { - closeDB := setupDBForTest(t) - defer closeDB() - - t.Run("update position", func(t *testing.T) { - // create and send job - sendJob, err := createAndAddJob(10) - require.NoError(t, err) - - // update job position - newPos := decimal.NewFromInt(5) - err = UpdateJobPosition(context.Background(), sendJob.JobID, newPos) - require.NoError(t, err) - - // retrieve job and confirm pos update - recvJob, err := JobByID(context.Background(), sendJob.JobID) - require.NoError(t, err) - assert.Equal(t, newPos.Equal(recvJob.QPos), true) - // expect other fields to remain the same - assert.Equal(t, sendJob.JobID, recvJob.JobID) - assert.Equal(t, sendJob.JobType, recvJob.JobType) - assert.Equal(t, sendJob.OwnerID, recvJob.OwnerID) - }) - - t.Run("update position - negative value", func(t *testing.T) { - // create and send job - sendJob, err := createAndAddJob(10) - require.NoError(t, err) - - // update job position - newPos := decimal.NewFromInt(-5) - err = UpdateJobPosition(context.Background(), sendJob.JobID, newPos) - require.NoError(t, err) - - // retrieve job and confirm pos update - recvJob, err := JobByID(context.Background(), sendJob.JobID) - require.NoError(t, err) - assert.Equal(t, newPos.Equal(recvJob.QPos), true) - // expect other fields to remain the same - assert.Equal(t, sendJob.JobID, recvJob.JobID) - assert.Equal(t, sendJob.JobType, recvJob.JobType) - assert.Equal(t, sendJob.OwnerID, recvJob.OwnerID) - }) - - t.Run("update position - empty ID", func(t *testing.T) { - sendJob, err := createAndAddJob(10) - require.NoError(t, err) - - // update job position - newPos := decimal.NewFromInt(5) - err = UpdateJobPosition(context.Background(), model.JobID(""), newPos) - require.Error(t, err) - - // retrieve job and ensure queue pos not updated - recvJob, err := JobByID(context.Background(), sendJob.JobID) - require.NoError(t, err) - assert.Equal(t, sendJob.QPos.Equal(recvJob.QPos), true) - }) - - t.Run("update position - ID does not exist", func(t *testing.T) { - // create and send job - _, err := createAndAddJob(10) - require.NoError(t, err) - - // update job position for a job that doesn't exist - newPos := decimal.NewFromInt(5) - err = UpdateJobPosition(context.Background(), model.NewJobID(), newPos) - require.NoError(t, err) - }) -} - // TODO [RM-27] initialize db in a TestMain(...) when there's enough package isolation. func setupDBForTest(t *testing.T) func() { require.NoError(t, etc.SetRootPath(RootFromDB)) diff --git a/master/internal/job/jobservice/jobservice.go b/master/internal/job/jobservice/jobservice.go index 797f5a9ca1f..e0f7864a214 100644 --- a/master/internal/job/jobservice/jobservice.go +++ b/master/internal/job/jobservice/jobservice.go @@ -205,20 +205,6 @@ func (s *Service) applyUpdate(update *jobv1.QueueControl) error { s.syslog.Error("resource pool must be set") } return j.SetResourcePool(action.ResourcePool) - case *jobv1.QueueControl_AheadOf: - return s.rm.MoveJob(sproto.MoveJob{ - ID: jobID, - Anchor: model.JobID(action.AheadOf), - Ahead: true, - ResourcePool: j.ResourcePool(), - }) - case *jobv1.QueueControl_BehindOf: - return s.rm.MoveJob(sproto.MoveJob{ - ID: jobID, - Anchor: model.JobID(action.BehindOf), - Ahead: false, - ResourcePool: j.ResourcePool(), - }) default: return fmt.Errorf("unexpected action: %v", action) } diff --git a/master/internal/rm/agentrm/agent_resource_manager.go b/master/internal/rm/agentrm/agent_resource_manager.go index a9b6c2c4e39..191f4ba1e7a 100644 --- a/master/internal/rm/agentrm/agent_resource_manager.go +++ b/master/internal/rm/agentrm/agent_resource_manager.go @@ -374,15 +374,6 @@ func (*ResourceManager) IsReattachableOnlyAfterStarted() bool { return true } -// MoveJob implements rm.ResourceManager. -func (a *ResourceManager) MoveJob(msg sproto.MoveJob) error { - pool, err := a.poolByName(msg.ResourcePool) - if err != nil { - return fmt.Errorf("move job found no resource pool with name %s: %w", msg.ResourcePool, err) - } - return pool.MoveJob(msg) -} - // NotifyContainerRunning implements rm.ResourceManager. func (*ResourceManager) NotifyContainerRunning(sproto.NotifyContainerRunning) error { // Agent Resource Manager does not implement a handler for the diff --git a/master/internal/rm/agentrm/resource_pool.go b/master/internal/rm/agentrm/resource_pool.go index f39e335aeb5..2e7ee248ab0 100644 --- a/master/internal/rm/agentrm/resource_pool.go +++ b/master/internal/rm/agentrm/resource_pool.go @@ -660,111 +660,6 @@ func (rp *resourcePool) CapacityCheck(msg sproto.CapacityCheck) (sproto.Capacity }, nil } -func (rp *resourcePool) MoveJob(msg sproto.MoveJob) error { - rp.mu.Lock() - defer rp.mu.Unlock() - rp.reschedule = true - - return rp.moveJob(msg.ID, msg.Anchor, msg.Ahead) -} - -func (rp *resourcePool) moveJob( - jobID model.JobID, - anchorID model.JobID, - aheadOf bool, -) error { - if anchorID == "" || jobID == "" || anchorID == jobID { - return nil - } - - // check whether the msg belongs to this resource pool or not. - // job messages to agent rm are forwarded to all resource pools. - if _, ok := rp.queuePositions[jobID]; !ok { - return nil - } - - if rp.config.Scheduler.GetType() != config.PriorityScheduling { - return fmt.Errorf("unable to perform operation on resource pool with %s", - rp.config.Scheduler.GetType()) - } - - if _, ok := rp.groups[jobID]; !ok { - return sproto.ErrJobNotFound(jobID) - } - if _, ok := rp.queuePositions[anchorID]; !ok { - return sproto.ErrJobNotFound(anchorID) - } - - prioChange, secondAnchor, anchorPriority := tasklist.FindAnchor( - jobID, - anchorID, - aheadOf, - rp.taskList, - rp.groups, - rp.queuePositions, - false, - ) - - if secondAnchor == "" { - return fmt.Errorf("unable to move job with ID %s", jobID) - } - - if secondAnchor == jobID { - return nil - } - - if prioChange { - group := rp.groups[jobID] - if group == nil { - return fmt.Errorf("moveJob cannot find group for job %s", jobID) - } - oldPriority := *group.Priority - err := rp.setGroupPriority(sproto.SetGroupPriority{ - Priority: anchorPriority, - ResourcePool: rp.config.PoolName, - JobID: jobID, - }) - if err != nil { - return err - } - - priorityChanger, ok := tasklist.GroupPriorityChangeRegistry.Load(jobID) - if !ok { - return fmt.Errorf("unable to move job with ID %s", jobID) - } - - if priorityChanger != nil { - if err := priorityChanger(anchorPriority); err != nil { - _ = rp.setGroupPriority(sproto.SetGroupPriority{ - Priority: oldPriority, - ResourcePool: rp.config.PoolName, - JobID: jobID, - }) - return err - } - } - - if !tasklist.NeedMove( - rp.queuePositions[jobID], - rp.queuePositions[anchorID], - rp.queuePositions[secondAnchor], - aheadOf, - ) { - return nil - } - } - - jobPosition, err := rp.queuePositions.SetJobPosition(jobID, anchorID, secondAnchor, aheadOf, false) - if err != nil { - return err - } - if err := internaldb.UpdateJobPosition(context.TODO(), jobID, jobPosition); err != nil { - return err - } - - return nil -} - func (rp *resourcePool) RecoverJobPosition(msg sproto.RecoverJobPosition) { rp.mu.Lock() defer rp.mu.Unlock() diff --git a/master/internal/rm/agentrm/resource_pool_test.go b/master/internal/rm/agentrm/resource_pool_test.go index a2efef8fb4c..154a38f5667 100644 --- a/master/internal/rm/agentrm/resource_pool_test.go +++ b/master/internal/rm/agentrm/resource_pool_test.go @@ -5,7 +5,6 @@ import ( "time" "github.com/google/uuid" - "github.com/shopspring/decimal" "github.com/stretchr/testify/require" "gotest.tools/assert" @@ -230,234 +229,3 @@ func TestSettingGroupPriority(t *testing.T) { assert.Check(t, rp.groups[jobID] == nil) rp.mu.Unlock() } - -func setupRPSamePriority(t *testing.T) *resourcePool { - defaultPriority := 50 - config := config.ResourcePoolConfig{ - Scheduler: &config.SchedulerConfig{ - Priority: &config.PrioritySchedulerConfig{ - DefaultPriority: &defaultPriority, - }, - FittingPolicy: best, - }, - } - - rp := setupResourcePool(t, nil, &config, nil, nil, nil) - - rp.queuePositions = map[model.JobID]decimal.Decimal{ - "job1": decimal.New(100, 1000), - "job2": decimal.New(200, 1000), - "job3": decimal.New(300, 1000), - } - - rp.groups = map[model.JobID]*tasklist.Group{ - "job1": {Priority: &defaultPriority}, - "job2": {Priority: &defaultPriority}, - "job3": {Priority: &defaultPriority}, - } - - rp.taskList.AddTask(&sproto.AllocateRequest{ - AllocationID: "allocation1", - JobID: "job1", - }) - rp.taskList.AddTask(&sproto.AllocateRequest{ - AllocationID: "allocation2", - JobID: "job2", - }) - rp.taskList.AddTask(&sproto.AllocateRequest{ - AllocationID: "allocation3", - JobID: "job3", - }) - - return rp -} - -func TestMoveMessagesPromote(t *testing.T) { - rp := setupRPSamePriority(t) - - // move job3 above job2 - prioChange, secondAnchor, anchorPriority := tasklist.FindAnchor( - "job3", - "job2", - true, - rp.taskList, - rp.groups, - rp.queuePositions, - false, - ) - - assert.Assert(t, !prioChange) - assert.Equal(t, secondAnchor, model.JobID("job1")) - assert.Equal(t, anchorPriority, 50) -} - -func TestMoveMessagesPromoteHead(t *testing.T) { - rp := setupRPSamePriority(t) - - // move job3 ahead of job1, the first job - prioChange, secondAnchor, anchorPriority := tasklist.FindAnchor( - "job3", - "job1", - true, - rp.taskList, - rp.groups, - rp.queuePositions, - false, - ) - - assert.Assert(t, !prioChange) - assert.Equal(t, secondAnchor, sproto.HeadAnchor) - assert.Equal(t, anchorPriority, 50) -} - -func TestMoveMessagesDemote(t *testing.T) { - rp := setupRPSamePriority(t) - - // move job1 behind job2 - prioChange, secondAnchor, anchorPriority := tasklist.FindAnchor( - "job1", - "job2", - false, - rp.taskList, - rp.groups, - rp.queuePositions, - false, - ) - - assert.Assert(t, !prioChange) - assert.Equal(t, secondAnchor, model.JobID("job3")) - assert.Equal(t, anchorPriority, 50) -} - -func TestMoveMessagesDemoteTail(t *testing.T) { - rp := setupRPSamePriority(t) - - // move job1 behind job3, the last job - prioChange, secondAnchor, anchorPriority := tasklist.FindAnchor( - "job1", - "job3", - false, - rp.taskList, - rp.groups, - rp.queuePositions, - false, - ) - - assert.Assert(t, !prioChange) - assert.Equal(t, secondAnchor, sproto.TailAnchor) - assert.Equal(t, anchorPriority, 50) -} - -func TestMoveMessagesAcrossPrioLanes(t *testing.T) { - defaultPriority := 50 - config := config.ResourcePoolConfig{ - Scheduler: &config.SchedulerConfig{ - Priority: &config.PrioritySchedulerConfig{ - DefaultPriority: &defaultPriority, - }, - FittingPolicy: best, - }, - } - - rp := setupResourcePool(t, nil, &config, nil, nil, nil) - - rp.queuePositions = map[model.JobID]decimal.Decimal{ - "job1": decimal.New(100, 1000), - "job2": decimal.New(100, 1000), - "job3": decimal.New(100, 1000), - } - - lowPriority := 60 - highPriority := 40 - - rp.groups = map[model.JobID]*tasklist.Group{ - "job1": {Priority: &highPriority}, - "job2": {Priority: &defaultPriority}, - "job3": {Priority: &lowPriority}, - } - - rp.taskList.AddTask(&sproto.AllocateRequest{ - AllocationID: "allocation1", - JobID: "job1", - }) - rp.taskList.AddTask(&sproto.AllocateRequest{ - AllocationID: "allocation2", - JobID: "job2", - }) - rp.taskList.AddTask(&sproto.AllocateRequest{ - AllocationID: "allocation3", - JobID: "job3", - }) - - // move job2 ahead of job1 - prioChange, secondAnchor, anchorPriority := tasklist.FindAnchor( - "job2", - "job1", - true, - rp.taskList, - rp.groups, - rp.queuePositions, - false, - ) - - assert.Assert(t, prioChange) - assert.Equal(t, secondAnchor, sproto.HeadAnchor) - assert.Equal(t, anchorPriority, 40) -} - -func TestMoveMessagesAcrossPrioLanesBehind(t *testing.T) { - defaultPriority := 50 - config := config.ResourcePoolConfig{ - Scheduler: &config.SchedulerConfig{ - Priority: &config.PrioritySchedulerConfig{ - DefaultPriority: &defaultPriority, - }, - FittingPolicy: best, - }, - } - - rp := setupResourcePool(t, nil, &config, nil, nil, nil) - - rp.queuePositions = map[model.JobID]decimal.Decimal{ - "job1": decimal.New(100, 1000), - "job2": decimal.New(100, 1000), - "job3": decimal.New(100, 1000), - } - - lowPriority := 60 - highPriority := 40 - - rp.groups = map[model.JobID]*tasklist.Group{ - "job1": {Priority: &highPriority}, - "job2": {Priority: &defaultPriority}, - "job3": {Priority: &lowPriority}, - } - - rp.taskList.AddTask(&sproto.AllocateRequest{ - AllocationID: "allocation1", - JobID: "job1", - }) - rp.taskList.AddTask(&sproto.AllocateRequest{ - AllocationID: "allocation2", - JobID: "job2", - }) - rp.taskList.AddTask(&sproto.AllocateRequest{ - AllocationID: "allocation3", - JobID: "job3", - }) - - // move job1 behind job2 - prioChange, secondAnchor, anchorPriority := tasklist.FindAnchor( - "job1", - "job2", - false, - rp.taskList, - rp.groups, - rp.queuePositions, - false, - ) - - assert.Assert(t, prioChange) - assert.Equal(t, secondAnchor, model.JobID("job3")) - assert.Equal(t, anchorPriority, 50) -} diff --git a/master/internal/rm/dispatcherrm/dispatcher_resource_manager.go b/master/internal/rm/dispatcherrm/dispatcher_resource_manager.go index e08d4cf485b..29e3d9230f6 100644 --- a/master/internal/rm/dispatcherrm/dispatcher_resource_manager.go +++ b/master/internal/rm/dispatcherrm/dispatcher_resource_manager.go @@ -513,14 +513,6 @@ func (m *DispatcherResourceManager) getLauncherProvidedPools( return result } -// MoveJob implements rm.ResourceManager. -func (*DispatcherResourceManager) MoveJob(req sproto.MoveJob) error { - // TODO(HAL-2863): We may not be able to support these specific actions, but how we - // let people interact with the job queue in dispatcher/slurm world. - // ctx.Respond(fmt.Errorf("modifying job positions is not yet supported in slurm")) - return rmerrors.UnsupportedError("move job unsupported in the dispatcher RM") -} - // RecoverJobPosition implements rm.ResourceManager. func (m *DispatcherResourceManager) RecoverJobPosition(sproto.RecoverJobPosition) { m.syslog.Warn("move job unsupported in the dispatcher RM") diff --git a/master/internal/rm/kubernetesrm/job.go b/master/internal/rm/kubernetesrm/job.go index fcaa5814385..3426cd23a62 100644 --- a/master/internal/rm/kubernetesrm/job.go +++ b/master/internal/rm/kubernetesrm/job.go @@ -470,11 +470,6 @@ func (j *job) changePriority() { rmevents.Publish(j.allocationID, &sproto.ReleaseResources{Reason: "priority changed"}) } -func (j *job) changePosition() { - j.syslog.Info("interrupting job to change positions") - rmevents.Publish(j.allocationID, &sproto.ReleaseResources{Reason: "queue position changed"}) -} - func (j *job) Kill() { j.mu.Lock() defer j.mu.Unlock() diff --git a/master/internal/rm/kubernetesrm/jobs.go b/master/internal/rm/kubernetesrm/jobs.go index 1c7e3c94c67..691f9baee7d 100644 --- a/master/internal/rm/kubernetesrm/jobs.go +++ b/master/internal/rm/kubernetesrm/jobs.go @@ -598,12 +598,6 @@ func (j *jobsService) ChangePriority(id model.AllocationID) { j.changePriority(id) } -func (j *jobsService) ChangePosition(id model.AllocationID) { - j.mu.Lock() - defer j.mu.Unlock() - j.changePosition(id) -} - func (j *jobsService) KillJob(id model.AllocationID) { j.mu.Lock() defer j.mu.Unlock() @@ -706,7 +700,7 @@ func (j *jobsService) reattachJob(msg reattachJobRequest) (reattachJobResponse, } gatewayResources, err := j.recreateGatewayProxyResources( - msg.allocationID, services, tcpRoutes, gatewayPorts, + services, tcpRoutes, gatewayPorts, ) if err != nil { cleanup() @@ -733,7 +727,6 @@ func (j *jobsService) reattachJob(msg reattachJobRequest) (reattachJobResponse, } func (j *jobsService) recreateGatewayProxyResources( - allocationID model.AllocationID, services []k8sV1.Service, tcpRoutes []alphaGatewayTyped.TCPRoute, gatewayPorts []int, @@ -1512,15 +1505,6 @@ func (j *jobsService) changePriority(id model.AllocationID) { ref.changePriority() } -func (j *jobsService) changePosition(id model.AllocationID) { - ref, err := j.verifyJobAndGetRef(id) - if err != nil { - j.syslog.WithError(err).Debug("changing allocation position") - return - } - ref.changePosition() -} - func (j *jobsService) killJob(id model.AllocationID) { ref, err := j.verifyJobAndGetRef(id) if err != nil { diff --git a/master/internal/rm/kubernetesrm/kubernetes_resource_manager.go b/master/internal/rm/kubernetesrm/kubernetes_resource_manager.go index 5ec3d21e418..dddf6053e21 100644 --- a/master/internal/rm/kubernetesrm/kubernetes_resource_manager.go +++ b/master/internal/rm/kubernetesrm/kubernetes_resource_manager.go @@ -302,15 +302,6 @@ func (k *ResourceManager) GetSlots(msg *apiv1.GetSlotsRequest) (*apiv1.GetSlotsR return k.jobsService.GetSlots(msg), nil } -// MoveJob implements rm.ResourceManager. -func (k *ResourceManager) MoveJob(msg sproto.MoveJob) error { - rp, err := k.poolByName(msg.ResourcePool) - if err != nil { - return fmt.Errorf("move job found no resource pool with name %s: %w", msg.ResourcePool, err) - } - return rp.MoveJob(msg) -} - // RecoverJobPosition implements rm.ResourceManager. func (k *ResourceManager) RecoverJobPosition(msg sproto.RecoverJobPosition) { rp, err := k.poolByName(msg.ResourcePool) diff --git a/master/internal/rm/kubernetesrm/resource_pool.go b/master/internal/rm/kubernetesrm/resource_pool.go index f825e60bbea..a9b0d3dbc3e 100644 --- a/master/internal/rm/kubernetesrm/resource_pool.go +++ b/master/internal/rm/kubernetesrm/resource_pool.go @@ -1,7 +1,6 @@ package kubernetesrm import ( - "context" "fmt" "sync" @@ -188,14 +187,6 @@ func (k *kubernetesResourcePool) SetGroupPriority(msg sproto.SetGroupPriority) e return nil } -func (k *kubernetesResourcePool) MoveJob(msg sproto.MoveJob) error { - k.mu.Lock() - defer k.mu.Unlock() - k.tryAdmitPendingTasks = true - - return k.moveJob(msg.ID, msg.Anchor, msg.Ahead) -} - func (k *kubernetesResourcePool) DeleteJob(msg sproto.DeleteJob) sproto.DeleteJobResponse { k.mu.Lock() defer k.mu.Unlock() @@ -337,89 +328,6 @@ func (k *kubernetesResourcePool) addTask(msg sproto.AllocateRequest) { k.reqList.AddTask(&msg) } -func (k *kubernetesResourcePool) moveJob( - jobID model.JobID, - anchorID model.JobID, - aheadOf bool, -) error { - for it := k.reqList.Iterator(); it.Next(); { - if it.Value().JobID == jobID { - if req := it.Value(); !req.Preemptible { - return fmt.Errorf( - "move job for %s unsupported in k8s because it may be destructive", - req.Name, - ) - } - } - } - - if anchorID == "" || jobID == "" || anchorID == jobID { - return nil - } - - if _, ok := k.queuePositions[jobID]; !ok { - return nil - } - - if _, ok := k.groups[jobID]; !ok { - return sproto.ErrJobNotFound(jobID) - } - if _, ok := k.queuePositions[anchorID]; !ok { - return sproto.ErrJobNotFound(anchorID) - } - - prioChange, secondAnchor, anchorPriority := tasklist.FindAnchor( - jobID, - anchorID, - aheadOf, - k.reqList, - k.groups, - k.queuePositions, - true, - ) - - if secondAnchor == "" { - return fmt.Errorf("unable to move job with ID %s", jobID) - } - - if secondAnchor == jobID { - return nil - } - - if prioChange { - g := k.getOrCreateGroup(jobID) - oldPriority := g.Priority - g.Priority = &anchorPriority - - priorityChanger, ok := tasklist.GroupPriorityChangeRegistry.Load(jobID) - if !ok { - return fmt.Errorf("unable to move job with ID %s", jobID) - } - if priorityChanger != nil { - if err := priorityChanger(anchorPriority); err != nil { - g.Priority = oldPriority - return err - } - } - } - - jobPosition, err := k.queuePositions.SetJobPosition(jobID, anchorID, secondAnchor, aheadOf, true) - if err != nil { - return err - } - if err := db.UpdateJobPosition(context.TODO(), jobID, jobPosition); err != nil { - return err - } - - allocationID, ok := k.jobIDToAllocationID[jobID] - if !ok { - return fmt.Errorf("job with ID %s has no valid task address", jobID) - } - - k.jobsService.ChangePosition(allocationID) - return nil -} - func (k *kubernetesResourcePool) correctJobQInfo( reqs []*sproto.AllocateRequest, q map[model.JobID]*sproto.RMJobInfo, diff --git a/master/internal/rm/multirm/multirm.go b/master/internal/rm/multirm/multirm.go index a93b4f5cd3c..ec29bf35a26 100644 --- a/master/internal/rm/multirm/multirm.go +++ b/master/internal/rm/multirm/multirm.go @@ -248,16 +248,6 @@ func (m *MultiRMRouter) GetJobQueueStatsRequest(req *apiv1.GetJobQueueStatsReque return all, nil } -// MoveJob routes a MoveJob call to a specified resource manager/pool. -func (m *MultiRMRouter) MoveJob(req sproto.MoveJob) error { - resolvedRMName, err := m.getRMName(rm.ResourcePoolName(req.ResourcePool)) - if err != nil { - return err - } - - return m.rms[resolvedRMName].MoveJob(req) -} - // RecoverJobPosition routes a RecoverJobPosition call to a specified resource manager/pool. func (m *MultiRMRouter) RecoverJobPosition(req sproto.RecoverJobPosition) { resolvedRMName, err := m.getRMName(rm.ResourcePoolName(req.ResourcePool)) diff --git a/master/internal/rm/multirm/multirm_intg_test.go b/master/internal/rm/multirm/multirm_intg_test.go index f27864b0ab4..2722f6d0a2c 100644 --- a/master/internal/rm/multirm/multirm_intg_test.go +++ b/master/internal/rm/multirm/multirm_intg_test.go @@ -391,25 +391,6 @@ func TestGetJobQueueStatsRequest(t *testing.T) { } } -func TestMoveJob(t *testing.T) { - cases := []struct { - name string - req sproto.MoveJob - err error - }{ - {"empty RP name will default", sproto.MoveJob{ResourcePool: ""}, nil}, - {"defined RP in default", sproto.MoveJob{ResourcePool: defaultRMName}, nil}, - {"defined RP in additional RM", sproto.MoveJob{ResourcePool: additionalRMName}, nil}, - {"undefined RP", sproto.MoveJob{ResourcePool: "bogus"}, ErrRPNotDefined("bogus")}, - } - for _, tt := range cases { - t.Run(tt.name, func(t *testing.T) { - err := testMultiRM.MoveJob(tt.req) - require.Equal(t, tt.err, err) - }) - } -} - func TestGetExternalJobs(t *testing.T) { cases := []struct { name string @@ -708,7 +689,6 @@ func mockRM(poolName rm.ResourcePoolName) *mocks.ResourceManager { mockRM.On("GetJobQueueStatsRequest", mock.Anything).Return(&apiv1.GetJobQueueStatsResponse{ Results: []*apiv1.RPQueueStat{{ResourcePool: poolName.String()}}, }, nil) - mockRM.On("MoveJob", mock.Anything).Return(nil) mockRM.On("GetExternalJobs", mock.Anything).Return([]*jobv1.Job{}, nil) mockRM.On("GetAgent", mock.Anything).Return(&apiv1.GetAgentResponse{}, nil) mockRM.On("EnableAgent", mock.Anything).Return(&apiv1.EnableAgentResponse{}, nil) diff --git a/master/internal/rm/resource_manager_iface.go b/master/internal/rm/resource_manager_iface.go index a30f9aaad84..f2aeb72cce5 100644 --- a/master/internal/rm/resource_manager_iface.go +++ b/master/internal/rm/resource_manager_iface.go @@ -40,7 +40,6 @@ type ResourceManager interface { // Job queue GetJobQ(ResourcePoolName) (map[model.JobID]*sproto.RMJobInfo, error) GetJobQueueStatsRequest(*apiv1.GetJobQueueStatsRequest) (*apiv1.GetJobQueueStatsResponse, error) - MoveJob(sproto.MoveJob) error RecoverJobPosition(sproto.RecoverJobPosition) GetExternalJobs(ResourcePoolName) ([]*jobv1.Job, error) diff --git a/master/internal/rm/tasklist/job.go b/master/internal/rm/tasklist/job.go index de50bc2890f..7d51996b89c 100644 --- a/master/internal/rm/tasklist/job.go +++ b/master/internal/rm/tasklist/job.go @@ -13,8 +13,6 @@ import ( "github.com/determined-ai/determined/proto/pkg/jobv1" ) -var invalidJobQPos = decimal.NewFromInt(0) - // ReduceToJobQInfo takes a list of AllocateRequest and reduces it to a summary of the Job Queue. func ReduceToJobQInfo(reqs AllocReqs) map[model.JobID]*sproto.RMJobInfo { isAdded := make(map[model.JobID]*sproto.RMJobInfo) @@ -93,37 +91,6 @@ func AssignmentIsScheduled(allocatedResources *sproto.ResourcesAllocated) bool { // JobSortState models a job queue, and the positions of all jobs within it. type JobSortState map[model.JobID]decimal.Decimal -// SetJobPosition sets the job position in the queue, relative to the anchors. -func (j JobSortState) SetJobPosition( - jobID model.JobID, - anchor1 model.JobID, - anchor2 model.JobID, - aheadOf bool, - isK8s bool, -) (decimal.Decimal, error) { - newPos, err := computeNewJobPos(jobID, anchor1, anchor2, j) - if err != nil { - return decimal.Decimal{}, err - } - // if the calculated position results in the wrong order - // we subtract a minimal decimal amount instead. - minDecimal := decimal.New(1, sproto.DecimalExp) - - if isK8s { - minDecimal = decimal.New(1, sproto.K8sExp) - } - if aheadOf && newPos.GreaterThanOrEqual(j[anchor1]) { - newPos = j[anchor1].Sub(minDecimal) - } else if !aheadOf && newPos.LessThanOrEqual(j[anchor1]) { - newPos = j[anchor1].Add(minDecimal) - } - - j[sproto.TailAnchor] = InitializeQueuePosition(time.Now(), isK8s) - j[jobID] = newPos - - return newPos, nil -} - // RecoverJobPosition explicitly sets the position of a job. func (j JobSortState) RecoverJobPosition(jobID model.JobID, position decimal.Decimal) { j[jobID] = position @@ -141,48 +108,6 @@ func InitializeJobSortState(isK8s bool) JobSortState { return state } -func computeNewJobPos( - jobID model.JobID, - anchor1 model.JobID, - anchor2 model.JobID, - qPositions JobSortState, -) (decimal.Decimal, error) { - if anchor1 == jobID || anchor2 == jobID { - return invalidJobQPos, fmt.Errorf("cannot move job relative to itself") - } - - qPos1, ok := qPositions[anchor1] - if !ok { - return invalidJobQPos, fmt.Errorf("could not find anchor job %s", anchor1) - } - - qPos2, ok := qPositions[anchor2] - if !ok { - return invalidJobQPos, fmt.Errorf("could not find anchor job %s", anchor2) - } - - qPos3, ok := qPositions[jobID] - if !ok { - return invalidJobQPos, fmt.Errorf("could not find job %s", jobID) - } - - // check if qPos3 is between qPos1 and qPos2 - smallPos := decimal.Min(qPos1, qPos2) - bigPos := decimal.Max(qPos1, qPos2) - if qPos3.GreaterThan(smallPos) && qPos3.LessThan(bigPos) { - return qPos3, nil // no op. Job is already in the correct position. - } - - newPos := decimal.Avg(qPos1, qPos2) - - if newPos.Equal(qPos1) || newPos.Equal(qPos2) { - return invalidJobQPos, fmt.Errorf("unable to compute a new job position for job %s", - jobID) - } - - return newPos, nil -} - // InitializeQueuePosition constructs a new queue position from time and RM type. func InitializeQueuePosition(aTime time.Time, isK8s bool) decimal.Decimal { // we could add exponent to give us more insertions if needed. @@ -204,76 +129,6 @@ func GetJobSubmissionTime(taskList *TaskList, jobID model.JobID) (time.Time, err return time.Time{}, fmt.Errorf("could not find an active job with id %s", jobID) } -// FindAnchor finds a second anchor and its priority and determines if the moving job needs a -// priority change to move ahead or behind the anchor. -func FindAnchor( - jobID model.JobID, - anchorID model.JobID, - aheadOf bool, - taskList *TaskList, - groups map[model.JobID]*Group, - queuePositions JobSortState, - k8s bool, -) (bool, model.JobID, int) { - var secondAnchor model.JobID - targetPriority := 0 - anchorPriority := 0 - anchorIdx := 0 - prioChange := false - - sortedReqs := SortTasksWithPosition(taskList, groups, queuePositions, k8s) - - for i, req := range sortedReqs { - if req.JobID == jobID { - targetPriority = *groups[req.JobID].Priority - } else if req.JobID == anchorID { - anchorPriority = *groups[req.JobID].Priority - anchorIdx = i - } - } - - if aheadOf { - if anchorIdx == 0 { - secondAnchor = sproto.HeadAnchor - } else { - secondAnchor = sortedReqs[anchorIdx-1].JobID - } - } else { - if anchorIdx >= len(sortedReqs)-1 { - secondAnchor = sproto.TailAnchor - } else { - secondAnchor = sortedReqs[anchorIdx+1].JobID - } - } - - if targetPriority != anchorPriority { - prioChange = true - } - - return prioChange, secondAnchor, anchorPriority -} - -// NeedMove returns true if the jobPos indicates a job needs a move to be ahead of or behind -// the anchorPos. -func NeedMove( - jobPos decimal.Decimal, - anchorPos decimal.Decimal, - secondPos decimal.Decimal, - aheadOf bool, -) bool { - if aheadOf { - if jobPos.LessThan(anchorPos) && jobPos.GreaterThan(secondPos) { - return false - } - return true - } - if jobPos.GreaterThan(anchorPos) && jobPos.LessThan(secondPos) { - return false - } - - return true -} - // SortTasksWithPosition returns a sorted view of the sproto.AllocateRequest's that make up // the TaskList, sorted in priority order. func SortTasksWithPosition( diff --git a/master/internal/sproto/jobs.go b/master/internal/sproto/jobs.go index 7f5ee39404a..311c3b7f150 100644 --- a/master/internal/sproto/jobs.go +++ b/master/internal/sproto/jobs.go @@ -70,13 +70,6 @@ type ( ResourcePool string JobID model.JobID } - // MoveJob requests the job to be moved within a priority queue relative to another job. - MoveJob struct { - ID model.JobID - Anchor model.JobID - Ahead bool - ResourcePool string - } ) // RecoverJobPosition gets sent from the experiment or command actor to the resource pool. diff --git a/proto/pkg/jobv1/job.pb.go b/proto/pkg/jobv1/job.pb.go index 2df938f9908..c5adb231f03 100644 --- a/proto/pkg/jobv1/job.pb.go +++ b/proto/pkg/jobv1/job.pb.go @@ -689,6 +689,7 @@ func (m *QueueControl) GetAction() isQueueControl_Action { return nil } +// Deprecated: Do not use. func (x *QueueControl) GetAheadOf() string { if x, ok := x.GetAction().(*QueueControl_AheadOf); ok { return x.AheadOf @@ -696,6 +697,7 @@ func (x *QueueControl) GetAheadOf() string { return "" } +// Deprecated: Do not use. func (x *QueueControl) GetBehindOf() string { if x, ok := x.GetAction().(*QueueControl_BehindOf); ok { return x.BehindOf @@ -729,12 +731,16 @@ type isQueueControl_Action interface { } type QueueControl_AheadOf struct { - // The desired job position in the queue in terms of another job. + // Deprecated; do not use. + // + // Deprecated: Do not use. AheadOf string `protobuf:"bytes,2,opt,name=ahead_of,json=aheadOf,proto3,oneof"` } type QueueControl_BehindOf struct { - // The desired job position in the queue in terms of another job. + // Deprecated; do not use. + // + // Deprecated: Do not use. BehindOf string `protobuf:"bytes,4,opt,name=behind_of,json=behindOf,proto3,oneof"` } @@ -983,59 +989,60 @@ var file_determined_job_v1_job_proto_rawDesc = []byte{ 0x74, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x64, 0x65, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x65, 0x64, 0x2e, 0x6a, 0x6f, 0x62, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x65, 0x64, 0x4a, 0x6f, 0x62, 0x48, 0x00, 0x52, 0x07, 0x6c, 0x69, 0x6d, 0x69, - 0x74, 0x65, 0x64, 0x42, 0x05, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x22, 0xe3, 0x01, 0x0a, 0x0c, 0x51, + 0x74, 0x65, 0x64, 0x42, 0x05, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x22, 0xeb, 0x01, 0x0a, 0x0c, 0x51, 0x75, 0x65, 0x75, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x12, 0x15, 0x0a, 0x06, 0x6a, 0x6f, 0x62, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6a, 0x6f, 0x62, - 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x08, 0x61, 0x68, 0x65, 0x61, 0x64, 0x5f, 0x6f, 0x66, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x07, 0x61, 0x68, 0x65, 0x61, 0x64, 0x4f, 0x66, 0x12, - 0x1d, 0x0a, 0x09, 0x62, 0x65, 0x68, 0x69, 0x6e, 0x64, 0x5f, 0x6f, 0x66, 0x18, 0x04, 0x20, 0x01, - 0x28, 0x09, 0x48, 0x00, 0x52, 0x08, 0x62, 0x65, 0x68, 0x69, 0x6e, 0x64, 0x4f, 0x66, 0x12, 0x25, - 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x70, 0x6f, 0x6f, 0x6c, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, - 0x65, 0x50, 0x6f, 0x6f, 0x6c, 0x12, 0x1c, 0x0a, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, - 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x48, 0x00, 0x52, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, - 0x69, 0x74, 0x79, 0x12, 0x18, 0x0a, 0x06, 0x77, 0x65, 0x69, 0x67, 0x68, 0x74, 0x18, 0x06, 0x20, - 0x01, 0x28, 0x02, 0x48, 0x00, 0x52, 0x06, 0x77, 0x65, 0x69, 0x67, 0x68, 0x74, 0x3a, 0x17, 0x92, - 0x41, 0x14, 0x0a, 0x12, 0xd2, 0x01, 0x06, 0x6a, 0x6f, 0x62, 0x5f, 0x69, 0x64, 0xd2, 0x01, 0x06, - 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x08, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, - 0x22, 0x80, 0x01, 0x0a, 0x0a, 0x51, 0x75, 0x65, 0x75, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, - 0x21, 0x0a, 0x0c, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x43, 0x6f, 0x75, - 0x6e, 0x74, 0x12, 0x27, 0x0a, 0x0f, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x64, 0x5f, - 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0e, 0x73, 0x63, 0x68, - 0x65, 0x64, 0x75, 0x6c, 0x65, 0x64, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x3a, 0x26, 0x92, 0x41, 0x23, - 0x0a, 0x21, 0xd2, 0x01, 0x0c, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x5f, 0x63, 0x6f, 0x75, 0x6e, - 0x74, 0xd2, 0x01, 0x0f, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x64, 0x5f, 0x63, 0x6f, - 0x75, 0x6e, 0x74, 0x22, 0x72, 0x0a, 0x13, 0x41, 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x65, - 0x51, 0x75, 0x65, 0x75, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x65, - 0x72, 0x69, 0x6f, 0x64, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x0b, 0x70, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x53, 0x74, 0x61, 0x72, 0x74, 0x12, 0x18, 0x0a, - 0x07, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x02, 0x52, 0x07, - 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x3a, 0x1e, 0x92, 0x41, 0x1b, 0x0a, 0x19, 0xd2, 0x01, - 0x0c, 0x70, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0xd2, 0x01, 0x07, - 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x2a, 0xb9, 0x01, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, - 0x12, 0x14, 0x0a, 0x10, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, - 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x13, 0x0a, 0x0f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, - 0x58, 0x50, 0x45, 0x52, 0x49, 0x4d, 0x45, 0x4e, 0x54, 0x10, 0x01, 0x12, 0x11, 0x0a, 0x0d, 0x54, - 0x59, 0x50, 0x45, 0x5f, 0x4e, 0x4f, 0x54, 0x45, 0x42, 0x4f, 0x4f, 0x4b, 0x10, 0x02, 0x12, 0x14, - 0x0a, 0x10, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x54, 0x45, 0x4e, 0x53, 0x4f, 0x52, 0x42, 0x4f, 0x41, - 0x52, 0x44, 0x10, 0x03, 0x12, 0x0e, 0x0a, 0x0a, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x48, 0x45, - 0x4c, 0x4c, 0x10, 0x04, 0x12, 0x10, 0x0a, 0x0c, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, 0x4f, 0x4d, - 0x4d, 0x41, 0x4e, 0x44, 0x10, 0x05, 0x12, 0x16, 0x0a, 0x12, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, - 0x48, 0x45, 0x43, 0x4b, 0x50, 0x4f, 0x49, 0x4e, 0x54, 0x5f, 0x47, 0x43, 0x10, 0x06, 0x12, 0x11, - 0x0a, 0x0d, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x58, 0x54, 0x45, 0x52, 0x4e, 0x41, 0x4c, 0x10, - 0x07, 0x12, 0x10, 0x0a, 0x0c, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x47, 0x45, 0x4e, 0x45, 0x52, 0x49, - 0x43, 0x10, 0x08, 0x2a, 0x65, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x15, 0x0a, 0x11, - 0x53, 0x54, 0x41, 0x54, 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, - 0x44, 0x10, 0x00, 0x12, 0x10, 0x0a, 0x0c, 0x53, 0x54, 0x41, 0x54, 0x45, 0x5f, 0x51, 0x55, 0x45, - 0x55, 0x45, 0x44, 0x10, 0x01, 0x12, 0x13, 0x0a, 0x0f, 0x53, 0x54, 0x41, 0x54, 0x45, 0x5f, 0x53, - 0x43, 0x48, 0x45, 0x44, 0x55, 0x4c, 0x45, 0x44, 0x10, 0x02, 0x12, 0x1e, 0x0a, 0x1a, 0x53, 0x54, - 0x41, 0x54, 0x45, 0x5f, 0x53, 0x43, 0x48, 0x45, 0x44, 0x55, 0x4c, 0x45, 0x44, 0x5f, 0x42, 0x41, - 0x43, 0x4b, 0x46, 0x49, 0x4c, 0x4c, 0x45, 0x44, 0x10, 0x03, 0x42, 0x35, 0x5a, 0x33, 0x67, 0x69, - 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x64, 0x65, 0x74, 0x65, 0x72, 0x6d, 0x69, - 0x6e, 0x65, 0x64, 0x2d, 0x61, 0x69, 0x2f, 0x64, 0x65, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x65, - 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x6a, 0x6f, 0x62, 0x76, - 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x49, 0x64, 0x12, 0x1f, 0x0a, 0x08, 0x61, 0x68, 0x65, 0x61, 0x64, 0x5f, 0x6f, 0x66, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x42, 0x02, 0x18, 0x01, 0x48, 0x00, 0x52, 0x07, 0x61, 0x68, 0x65, 0x61, + 0x64, 0x4f, 0x66, 0x12, 0x21, 0x0a, 0x09, 0x62, 0x65, 0x68, 0x69, 0x6e, 0x64, 0x5f, 0x6f, 0x66, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x42, 0x02, 0x18, 0x01, 0x48, 0x00, 0x52, 0x08, 0x62, 0x65, + 0x68, 0x69, 0x6e, 0x64, 0x4f, 0x66, 0x12, 0x25, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x5f, 0x70, 0x6f, 0x6f, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, + 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x50, 0x6f, 0x6f, 0x6c, 0x12, 0x1c, 0x0a, + 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x48, + 0x00, 0x52, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x18, 0x0a, 0x06, 0x77, + 0x65, 0x69, 0x67, 0x68, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x02, 0x48, 0x00, 0x52, 0x06, 0x77, + 0x65, 0x69, 0x67, 0x68, 0x74, 0x3a, 0x17, 0x92, 0x41, 0x14, 0x0a, 0x12, 0xd2, 0x01, 0x06, 0x6a, + 0x6f, 0x62, 0x5f, 0x69, 0x64, 0xd2, 0x01, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x42, 0x08, + 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x80, 0x01, 0x0a, 0x0a, 0x51, 0x75, 0x65, + 0x75, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x21, 0x0a, 0x0c, 0x71, 0x75, 0x65, 0x75, 0x65, + 0x64, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x71, + 0x75, 0x65, 0x75, 0x65, 0x64, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x27, 0x0a, 0x0f, 0x73, 0x63, + 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x64, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x05, 0x52, 0x0e, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x64, 0x43, 0x6f, + 0x75, 0x6e, 0x74, 0x3a, 0x26, 0x92, 0x41, 0x23, 0x0a, 0x21, 0xd2, 0x01, 0x0c, 0x71, 0x75, 0x65, + 0x75, 0x65, 0x64, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0xd2, 0x01, 0x0f, 0x73, 0x63, 0x68, 0x65, + 0x64, 0x75, 0x6c, 0x65, 0x64, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x72, 0x0a, 0x13, 0x41, + 0x67, 0x67, 0x72, 0x65, 0x67, 0x61, 0x74, 0x65, 0x51, 0x75, 0x65, 0x75, 0x65, 0x53, 0x74, 0x61, + 0x74, 0x73, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x5f, 0x73, 0x74, 0x61, + 0x72, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x70, 0x65, 0x72, 0x69, 0x6f, 0x64, + 0x53, 0x74, 0x61, 0x72, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x02, 0x52, 0x07, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x3a, + 0x1e, 0x92, 0x41, 0x1b, 0x0a, 0x19, 0xd2, 0x01, 0x0c, 0x70, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x5f, + 0x73, 0x74, 0x61, 0x72, 0x74, 0xd2, 0x01, 0x07, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x2a, + 0xb9, 0x01, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x14, 0x0a, 0x10, 0x54, 0x59, 0x50, 0x45, + 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x13, + 0x0a, 0x0f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x58, 0x50, 0x45, 0x52, 0x49, 0x4d, 0x45, 0x4e, + 0x54, 0x10, 0x01, 0x12, 0x11, 0x0a, 0x0d, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4e, 0x4f, 0x54, 0x45, + 0x42, 0x4f, 0x4f, 0x4b, 0x10, 0x02, 0x12, 0x14, 0x0a, 0x10, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x54, + 0x45, 0x4e, 0x53, 0x4f, 0x52, 0x42, 0x4f, 0x41, 0x52, 0x44, 0x10, 0x03, 0x12, 0x0e, 0x0a, 0x0a, + 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x48, 0x45, 0x4c, 0x4c, 0x10, 0x04, 0x12, 0x10, 0x0a, 0x0c, + 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x10, 0x05, 0x12, 0x16, + 0x0a, 0x12, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, 0x48, 0x45, 0x43, 0x4b, 0x50, 0x4f, 0x49, 0x4e, + 0x54, 0x5f, 0x47, 0x43, 0x10, 0x06, 0x12, 0x11, 0x0a, 0x0d, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, + 0x58, 0x54, 0x45, 0x52, 0x4e, 0x41, 0x4c, 0x10, 0x07, 0x12, 0x10, 0x0a, 0x0c, 0x54, 0x59, 0x50, + 0x45, 0x5f, 0x47, 0x45, 0x4e, 0x45, 0x52, 0x49, 0x43, 0x10, 0x08, 0x2a, 0x65, 0x0a, 0x05, 0x53, + 0x74, 0x61, 0x74, 0x65, 0x12, 0x15, 0x0a, 0x11, 0x53, 0x54, 0x41, 0x54, 0x45, 0x5f, 0x55, 0x4e, + 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x10, 0x0a, 0x0c, 0x53, + 0x54, 0x41, 0x54, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x55, 0x45, 0x44, 0x10, 0x01, 0x12, 0x13, 0x0a, + 0x0f, 0x53, 0x54, 0x41, 0x54, 0x45, 0x5f, 0x53, 0x43, 0x48, 0x45, 0x44, 0x55, 0x4c, 0x45, 0x44, + 0x10, 0x02, 0x12, 0x1e, 0x0a, 0x1a, 0x53, 0x54, 0x41, 0x54, 0x45, 0x5f, 0x53, 0x43, 0x48, 0x45, + 0x44, 0x55, 0x4c, 0x45, 0x44, 0x5f, 0x42, 0x41, 0x43, 0x4b, 0x46, 0x49, 0x4c, 0x4c, 0x45, 0x44, + 0x10, 0x03, 0x42, 0x35, 0x5a, 0x33, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x64, 0x65, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x65, 0x64, 0x2d, 0x61, 0x69, 0x2f, 0x64, + 0x65, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, + 0x70, 0x6b, 0x67, 0x2f, 0x6a, 0x6f, 0x62, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var ( diff --git a/proto/src/determined/job/v1/job.proto b/proto/src/determined/job/v1/job.proto index 5f038937d66..b45ebe70396 100644 --- a/proto/src/determined/job/v1/job.proto +++ b/proto/src/determined/job/v1/job.proto @@ -165,10 +165,10 @@ message QueueControl { string job_id = 1; // The action to perform. oneof action { - // The desired job position in the queue in terms of another job. - string ahead_of = 2; - // The desired job position in the queue in terms of another job. - string behind_of = 4; + // Deprecated; do not use. + string ahead_of = 2 [deprecated = true]; + // Deprecated; do not use. + string behind_of = 4 [deprecated = true]; // Name of the target resource_pool to move the job to. string resource_pool = 3; // The desired job priority in priority scheduler. diff --git a/webui/react/src/pages/JobQueue/JobQueue.tsx b/webui/react/src/pages/JobQueue/JobQueue.tsx index b7ae754a931..5c86aec5c10 100644 --- a/webui/react/src/pages/JobQueue/JobQueue.tsx +++ b/webui/react/src/pages/JobQueue/JobQueue.tsx @@ -26,13 +26,7 @@ import * as Api from 'services/api-ts-sdk'; import userStore from 'stores/users'; import { DetailedUser, FullJob, Job, JobAction, JobState, JobType, ResourcePool } from 'types'; import handleError, { ErrorLevel, ErrorType } from 'utils/error'; -import { - canManageJob, - jobTypeToCommandType, - moveJobToTop, - orderedSchedulers, - unsupportedQPosSchedulers, -} from 'utils/job'; +import { canManageJob, jobTypeToCommandType, orderedSchedulers } from 'utils/job'; import { useObservable } from 'utils/observable'; import { routeToReactUrl } from 'utils/routes'; import { numericSorter } from 'utils/sort'; @@ -119,14 +113,6 @@ const JobQueue: React.FC = ({ rpStats, selectedRp, jobState }) => { usePolling(fetchJobsTable, { rerunOnNewFn: true }); - const rpTotalJobCount = useCallback( - (rpName: string) => { - const stats = rpStats.find((rp) => rp.resourcePool === rpName)?.stats; - return stats ? stats.queuedCount + stats.scheduledCount : 0; - }, - [rpStats], - ); - const dropDownOnTrigger = useCallback( (job: Job) => { if (!('entityId' in job)) return {}; @@ -142,17 +128,6 @@ const JobQueue: React.FC = ({ rpStats, selectedRp, jobState }) => { }; } - if ( - selectedRp && - isJobOrderAvailable && - !!topJob && - job.summary.jobsAhead > 0 && - canManageJob(job, selectedRp) && - !unsupportedQPosSchedulers.has(selectedRp.schedulerType) - ) { - triggers[JobAction.MoveToTop] = () => moveJobToTop(topJob, job); - } - // if job is an experiment type add action to kill it if (job.type === JobType.EXPERIMENT) { triggers[JobAction.Cancel] = async () => { @@ -178,7 +153,7 @@ const JobQueue: React.FC = ({ rpStats, selectedRp, jobState }) => { }); return triggers; }, - [selectedRp, isJobOrderAvailable, topJob, fetchJobsTable], + [selectedRp, fetchJobsTable], ); const onModalClose = useCallback(() => { @@ -223,7 +198,6 @@ const JobQueue: React.FC = ({ rpStats, selectedRp, jobState }) => { actionOrder={[ JobAction.ManageJob, - JobAction.MoveToTop, JobAction.ViewLog, JobAction.Cancel, JobAction.Kill, @@ -231,7 +205,6 @@ const JobQueue: React.FC = ({ rpStats, selectedRp, jobState }) => { confirmations={{ [JobAction.Cancel]: { cancelText: 'Abort', onError: handleError }, [JobAction.Kill]: { danger: true, onError: handleError }, - [JobAction.MoveToTop]: { onError: handleError }, }} id={record.name} kind="job" @@ -387,7 +360,6 @@ const JobQueue: React.FC = ({ rpStats, selectedRp, jobState }) => { void; rpStats: api.V1RPQueueStat[]; schedulerType: api.V1SchedulerType; @@ -41,12 +39,8 @@ interface FormValues { weight?: string; } -const formValuesToUpdate = async ( - values: FormValues, - job: Job, -): Promise => { - const { position, resourcePool } = { - position: parseInt(values.position, 10), +const formValuesToUpdate = (values: FormValues, job: Job): api.V1QueueControl | undefined => { + const { resourcePool } = { resourcePool: values.resourcePool, }; const update: api.V1QueueControl = { jobId: job.jobId }; @@ -54,10 +48,6 @@ const formValuesToUpdate = async ( if (resourcePool !== job.resourcePool) { return { ...update, resourcePool }; } - if (position !== job.summary.jobsAhead + 1) { - const allJobs = await getJobQ({ resourcePool }, {}); - return moveJobToPositionUpdate(allJobs.jobs, job.jobId, position); - } if (values.priority !== undefined) { const priority = parseInt(values.priority, 10); if (priority !== job.priority) { @@ -79,7 +69,6 @@ const ManageJobModalComponent: React.FC = ({ job, schedulerType, initialPool, - jobCount, }) => { const [form] = Form.useForm(); const isOrderedQ = orderedSchedulers.has(schedulerType); @@ -214,12 +203,6 @@ const ManageJobModalComponent: React.FC = ({ name="priority"> -