Skip to content

Commit

Permalink
fix: job queue panic for multirm [RM-123] (#9079)
Browse files Browse the repository at this point in the history
  • Loading branch information
carolinaecalderon authored Apr 3, 2024
1 parent f78b9aa commit 3688c3f
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 31 deletions.
12 changes: 9 additions & 3 deletions master/internal/rm/multirm/multirm.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,18 @@ func (m *MultiRMRouter) GetJobQ(rpName rm.ResourcePoolName) (map[model.JobID]*sp
// GetJobQueueStatsRequest forwards the request, unchanged, to the resource
// managers reached through fanOutRMCall and merges every returned Results
// slice into a single response.
//
// The router never indexes into req.ResourcePools, so a request that lists no
// resource pools is handled without panicking. If the fan-out reports an
// error, that error is returned and the response is nil.
func (m *MultiRMRouter) GetJobQueueStatsRequest(req *apiv1.GetJobQueueStatsRequest) (
	*apiv1.GetJobQueueStatsResponse, error,
) {
	responses, err := fanOutRMCall(
		m,
		func(manager rm.ResourceManager) (*apiv1.GetJobQueueStatsResponse, error) {
			return manager.GetJobQueueStatsRequest(req)
		},
	)
	if err != nil {
		return nil, err
	}

	all := &apiv1.GetJobQueueStatsResponse{}
	for _, resp := range responses {
		// A resource manager may hand back a nil response; skip it rather than
		// dereferencing it.
		if resp == nil {
			continue
		}
		all.Results = append(all.Results, resp.Results...)
	}
	return all, nil
}

// MoveJob routes a MoveJob call to a specified resource manager/pool.
Expand Down Expand Up @@ -389,7 +395,7 @@ func (m *MultiRMRouter) getRM(rpName rm.ResourcePoolName) (string, error) {
}
for _, p := range rps.ResourcePools {
if p.Name == rpName.String() {
m.syslog.Debugf("RM defined as %s, %s", name, p.Name)
m.syslog.Infof("RM defined as %s, %s", name, p.Name)
return name, nil
}
}
Expand Down
75 changes: 47 additions & 28 deletions master/internal/rm/multirm/multirm_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package multirm

import (
"fmt"
"os"
"testing"

"github.com/google/uuid"
Expand All @@ -16,13 +17,14 @@ import (
"github.com/determined-ai/determined/master/pkg/model"
"github.com/determined-ai/determined/proto/pkg/agentv1"
"github.com/determined-ai/determined/proto/pkg/apiv1"
"github.com/determined-ai/determined/proto/pkg/jobv1"
"github.com/determined-ai/determined/proto/pkg/resourcepoolv1"
)

// Names shared by the multi-RM router tests in this file.
const (
// additionalRMName is used in the test tables as the pool name for the
// "defined RP in additional RM" cases.
additionalRMName = "additional"
// defaultRMName is used in the test tables as the pool name for the
// "defined RP in default" cases.
defaultRMName = "default"
// rp is a generic resource pool name; its usage is not visible in this excerpt.
rp = "resource-pool"
// emptyRPName is the empty resource pool name, used by the
// "empty RP name will default" cases and by the mock's ResolveResourcePool setup.
emptyRPName = rm.ResourcePoolName("")
)

var testMultiRM *MultiRMRouter
Expand All @@ -36,6 +38,8 @@ func TestMain(m *testing.M) {
},
syslog: logrus.WithField("component", "resource-router"),
}

os.Exit(m.Run())
}

func TestGetAllocationSummaries(t *testing.T) {
Expand Down Expand Up @@ -264,10 +268,10 @@ func TestGetResourcePools(t *testing.T) {
func TestGetDefaultResourcePools(t *testing.T) {
cases := []struct {
name string
res string
res rm.ResourcePoolName
err error
}{
{"route to default pool", "default", nil},
{"route to default pool", rm.ResourcePoolName("default"), nil},
}
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -307,7 +311,7 @@ func TestResolveResourcePool(t *testing.T) {
rpName rm.ResourcePoolName
err error
}{
{"empty RP name will default", "", nil},
{"empty RP name will default", emptyRPName, nil},
{"defined RP in default", defaultRMName, nil},
{"defined RP in additional RM", additionalRMName, nil},
{"undefined RP", "bogus", ErrRPNotDefined("bogus")},
Expand All @@ -327,7 +331,7 @@ func TestTaskContainerDefaults(t *testing.T) {
rpName rm.ResourcePoolName
err error
}{
{"empty RP name will default", "", nil},
{"empty RP name will default", emptyRPName, nil},
{"defined RP in default", defaultRMName, nil},
{"defined RP in additional RM", additionalRMName, nil},
{"undefined RP", "bogus", ErrRPNotDefined("bogus")},
Expand All @@ -346,7 +350,7 @@ func TestGetJobQ(t *testing.T) {
rpName rm.ResourcePoolName
err error
}{
{"empty RP name will default", "", nil},
{"empty RP name will default", emptyRPName, nil},
{"defined RP in default", defaultRMName, nil},
{"defined RP in additional RM", additionalRMName, nil},
{"undefined RP", "bogus", ErrRPNotDefined("bogus")},
Expand All @@ -361,19 +365,29 @@ func TestGetJobQ(t *testing.T) {

// TestGetJobQueueStatsRequest verifies that the router answers job queue stats
// requests without error for every request shape, including a request that
// names no resource pools and one that names an undefined pool, and that the
// merged response carries the expected number of per-pool results.
func TestGetJobQueueStatsRequest(t *testing.T) {
	cases := []struct {
		name        string
		req         *apiv1.GetJobQueueStatsRequest
		err         error
		expectedLen int
	}{
		// TODO RM-136: revisit this.
		// Per the mocks set-up, no matter the pools in the request, return the max # of responses.
		{"empty request", &apiv1.GetJobQueueStatsRequest{}, nil, 2},
		{"empty RP name will default", &apiv1.GetJobQueueStatsRequest{ResourcePools: []string{""}}, nil, 2},
		{"defined RP in default", &apiv1.GetJobQueueStatsRequest{ResourcePools: []string{defaultRMName}}, nil, 2},
		{"defined RP in additional RM", &apiv1.GetJobQueueStatsRequest{ResourcePools: []string{additionalRMName}}, nil, 2},
		{"undefined RP", &apiv1.GetJobQueueStatsRequest{ResourcePools: []string{"bogus"}}, nil, 2},
		{
			"undefined RP + defined RP",
			&apiv1.GetJobQueueStatsRequest{ResourcePools: []string{"bogus", defaultRMName}},
			nil, 2,
		},
	}
	for _, tt := range cases {
		t.Run(tt.name, func(t *testing.T) {
			res, err := testMultiRM.GetJobQueueStatsRequest(tt.req)
			require.Equal(t, tt.err, err)
			// Guard the dereference below so a nil response fails the test
			// with a clear message instead of panicking.
			require.NotNil(t, res)
			require.Len(t, res.Results, tt.expectedLen)
		})
	}
}
Expand Down Expand Up @@ -683,22 +697,27 @@ func mockRM(poolName rm.ResourcePoolName) *mocks.ResourceManager {
mockRM.On("SetGroupWeight", mock.Anything).Return(nil)
mockRM.On("SetGroupPriority", mock.Anything).Return(nil)
mockRM.On("IsReattachableOnlyAfterStarted", mock.Anything).Return(true)
mockRM.On("GetDefaultComputeResourcePool").Return("default", nil)
mockRM.On("GetDefaultAuxResourcePool").Return("default", nil)
mockRM.On("GetDefaultComputeResourcePool").Return(poolName, nil)
mockRM.On("GetDefaultAuxResourcePool").Return(poolName, nil)
mockRM.On("ValidateResourcePool", mock.Anything).Return(nil)
mockRM.On("ResolveResourcePool", poolName, 0, 0).Return(poolName, nil)
mockRM.On("TaskContainerDefaults", mock.Anything, mock.Anything).Return(mock.Anything, nil)
mockRM.On("GetJobQ", mock.Anything).Return(mock.Anything, nil)
mockRM.On("GetJobQueueStatsRequest", mock.Anything).Return(mock.Anything, nil)

mockRM.On("ResolveResourcePool", poolName, mock.Anything, mock.Anything).Return(poolName, nil)
mockRM.On("ResolveResourcePool", emptyRPName, mock.Anything, mock.Anything).Return(emptyRPName, nil)

mockRM.On("TaskContainerDefaults", mock.Anything, mock.Anything).Return(model.TaskContainerDefaultsConfig{}, nil)
mockRM.On("GetJobQ", mock.Anything).Return(map[model.JobID]*sproto.RMJobInfo{}, nil)
mockRM.On("GetJobQueueStatsRequest", mock.Anything).Return(&apiv1.GetJobQueueStatsResponse{
Results: []*apiv1.RPQueueStat{{ResourcePool: poolName.String()}},
}, nil)
mockRM.On("MoveJob", mock.Anything).Return(nil)
mockRM.On("GetExternalJobs", mock.Anything).Return(mock.Anything, nil)
mockRM.On("GetAgent", mock.Anything).Return(mock.Anything, nil)
mockRM.On("EnableAgent", mock.Anything).Return(mock.Anything, nil)
mockRM.On("DisableAgent", mock.Anything).Return(mock.Anything, nil)
mockRM.On("GetSlots", mock.Anything).Return(mock.Anything, nil)
mockRM.On("GetSlot", mock.Anything).Return(mock.Anything, nil)
mockRM.On("EnableSlot", mock.Anything).Return(mock.Anything, nil)
mockRM.On("DisableSlot", mock.Anything).Return(mock.Anything, nil)
mockRM.On("GetExternalJobs", mock.Anything).Return([]*jobv1.Job{}, nil)
mockRM.On("GetAgent", mock.Anything).Return(&apiv1.GetAgentResponse{}, nil)
mockRM.On("EnableAgent", mock.Anything).Return(&apiv1.EnableAgentResponse{}, nil)
mockRM.On("DisableAgent", mock.Anything).Return(&apiv1.DisableAgentResponse{}, nil)
mockRM.On("GetSlots", mock.Anything).Return(&apiv1.GetSlotsResponse{}, nil)
mockRM.On("GetSlot", mock.Anything).Return(&apiv1.GetSlotResponse{}, nil)
mockRM.On("EnableSlot", mock.Anything).Return(&apiv1.EnableSlotResponse{}, nil)
mockRM.On("DisableSlot", mock.Anything).Return(&apiv1.DisableSlotResponse{}, nil)

return &mockRM
}

0 comments on commit 3688c3f

Please sign in to comment.