Skip to content

Commit

Permalink
chore: add multirm module to ResourceManager (#8857)
Browse files Browse the repository at this point in the history
  • Loading branch information
carolinaecalderon authored Mar 4, 2024
1 parent d507edd commit c3012ff
Show file tree
Hide file tree
Showing 11 changed files with 1,353 additions and 86 deletions.
2 changes: 1 addition & 1 deletion master/internal/checkpoint_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func runCheckpointGCTask(
return nil
}

resolvedRM, rp, err := rm.ResolveResourcePool("", // todo (multirm)
resolvedRM, rp, err := rm.ResolveResourcePool("",
sproto.ResolveResourcesRequest{
ResourcePool: "",
Workspace: -1,
Expand Down
2 changes: 1 addition & 1 deletion master/internal/command/command_job_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (c *Command) ToV1Job() (*jobv1.Job, error) {
}

j.IsPreemptible = false
j.Priority = int32(config.ReadPriority(j.ResourcePool, &c.Config))
j.Priority = int32(config.ReadPriority(c.Config.Resources.ResourceManager, j.ResourcePool, &c.Config))
j.Weight = config.ReadWeight(j.ResourcePool, &c.Config)

j.ResourcePool = c.Config.Resources.ResourcePool
Expand Down
89 changes: 50 additions & 39 deletions master/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,17 +474,24 @@ func readPriorityFromScheduler(conf *SchedulerConfig) *int {

// ReadRMPreemptionStatus resolves the preemption status for a resource manager.
// TODO(Brad): Move these to a resource pool level API.
func ReadRMPreemptionStatus(rpName string) bool {
func ReadRMPreemptionStatus(rmName, rpName string) bool {
config := GetMasterConfig()
return readRMPreemptionStatus(config, rpName)
}

func readRMPreemptionStatus(config *Config, rpName string) bool {
// TODO(RM-38) make this be correct for len(resourceManagers) > 0 by taking
// a resource manager name too.
r := config.ResourceManagers()[0]
if rmName == "" && len(config.ResourceManagers()) == 1 {
return readRMPreemptionStatus(config.ResourceManagers()[0], rpName)
}

for _, r := range config.ResourceManagers() {
if r.ResourceManager.Name() == rmName {
return readRMPreemptionStatus(r, rpName)
}
}

panic("unexpected resource configuration")
}

for _, rpConfig := range r.ResourcePools {
func readRMPreemptionStatus(config *ResourceManagerWithPoolsConfig, rpName string) bool {
for _, rpConfig := range config.ResourcePools {
if rpConfig.PoolName != rpName {
continue
}
Expand All @@ -496,20 +503,20 @@ func readRMPreemptionStatus(config *Config, rpName string) bool {

// if not found, fall back to resource manager config
switch {
case r.ResourceManager.AgentRM != nil:
if r.ResourceManager.AgentRM.Scheduler == nil {
case config.ResourceManager.AgentRM != nil:
if config.ResourceManager.AgentRM.Scheduler == nil {
panic("scheduler not configured")
}
return r.ResourceManager.AgentRM.Scheduler.GetPreemption()
case r.ResourceManager.KubernetesRM != nil:
return r.ResourceManager.KubernetesRM.GetPreemption()
return config.ResourceManager.AgentRM.Scheduler.GetPreemption()
case config.ResourceManager.KubernetesRM != nil:
return config.ResourceManager.KubernetesRM.GetPreemption()
default:
panic("unexpected resource configuration")
}
}

// ReadPriority resolves the priority value for a job.
func ReadPriority(rpName string, jobConf interface{}) int {
func ReadPriority(rmName string, rpName string, jobConf interface{}) int {
config := GetMasterConfig()
var prio *int
// look at the individual job config
Expand All @@ -523,35 +530,39 @@ func ReadPriority(rpName string, jobConf interface{}) int {
return *prio
}

var schedulerConf *SchedulerConfig

// if not found, fall back to the resource pools config
// TODO(RM-38) make this be correct for len(resourceManagers) > 0 by taking
// a resource manager name too.
r := config.ResourceManagers()[0]
for _, rm := range config.ResourceManagers() {
if rm.ResourceManager.Name() == rmName {
var schedulerConf *SchedulerConfig

for _, rpConfig := range rm.ResourcePools {
if rpConfig.PoolName != rpName {
continue
}
schedulerConf = rpConfig.Scheduler
break
}

for _, rpConfig := range r.ResourcePools {
if rpConfig.PoolName != rpName {
continue
}
schedulerConf = rpConfig.Scheduler
}
prio = readPriorityFromScheduler(schedulerConf)
if prio != nil {
return *prio
}
prio = readPriorityFromScheduler(schedulerConf)
if prio != nil {
return *prio
}

// if not found, fall back to resource manager config
if r.ResourceManager.AgentRM != nil {
schedulerConf = r.ResourceManager.AgentRM.Scheduler
prio = readPriorityFromScheduler(schedulerConf)
if prio != nil {
return *prio
}
}
// if not found, fall back to resource manager config
if rm.ResourceManager.AgentRM != nil {
schedulerConf = rm.ResourceManager.AgentRM.Scheduler
prio = readPriorityFromScheduler(schedulerConf)
if prio != nil {
return *prio
}
}

if r.ResourceManager.KubernetesRM != nil {
return KubernetesDefaultPriority
if rm.ResourceManager.KubernetesRM != nil {
return KubernetesDefaultPriority
}

break
}
}

return DefaultSchedulingPriority
Expand Down
72 changes: 71 additions & 1 deletion master/internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ func TestRMPreemptionStatus(t *testing.T) {
err := yaml.Unmarshal([]byte(configRaw), unmarshaled, yaml.DisallowUnknownFields)
assert.NilError(t, unmarshaled.Resolve())
assert.NilError(t, err)
assert.DeepEqual(t, readRMPreemptionStatus(unmarshaled, rpName), expected)
assert.DeepEqual(t, readRMPreemptionStatus(unmarshaled.ResourceManagers()[0], rpName), expected)
}

testCases := []struct {
Expand Down Expand Up @@ -845,3 +845,73 @@ resource_manager:
})
}
}

func TestMultiRMPreemptionAndPriority(t *testing.T) {
prio1 := 3
prio2 := 30

cfg := DefaultConfig()
cfg.ResourceConfig = ResourceConfig{
RootManagerInternal: &ResourceManagerConfig{AgentRM: &AgentResourceManagerConfig{
Name: DefaultRMName, Scheduler: &SchedulerConfig{
Priority: &PrioritySchedulerConfig{
Preemption: false,
DefaultPriority: &prio1,
},
},
}},
RootPoolsInternal: []ResourcePoolConfig{{
PoolName: "default",
Scheduler: &SchedulerConfig{Priority: &PrioritySchedulerConfig{
Preemption: true,
DefaultPriority: &prio2,
}},
}},
AdditionalResourceManagersInternal: []*ResourceManagerWithPoolsConfig{
{ResourceManager: &ResourceManagerConfig{KubernetesRM: &KubernetesResourceManagerConfig{
Name: "test",
DefaultScheduler: "not-preemption-scheduler",
}}, ResourcePools: []ResourcePoolConfig{{
PoolName: "default",
Scheduler: &SchedulerConfig{Priority: &PrioritySchedulerConfig{
Preemption: true,
DefaultPriority: &prio1,
}},
}}},
},
}

SetMasterConfig(cfg)

// 'default' RP exists under 'default' RM, so the preemption will
// be 'True' & priority to prio2, like the RP.
status := ReadRMPreemptionStatus("default", "default")
require.True(t, status)

priority := ReadPriority("default", "default", model.CommandConfig{})
require.Equal(t, prio2, priority)

// 'test1' RP doesn't exist under 'default' RM, so the preemption and
// priority will default to the RM's.
status = ReadRMPreemptionStatus("default", "test1")
require.False(t, status)

priority = ReadPriority("default", "test1", model.CommandConfig{})
require.Equal(t, prio1, priority)

// 'default' RP exists under 'test' RM, so the preemption
// & priority will default to the RP's.
status = ReadRMPreemptionStatus("test", "default")
require.True(t, status)

priority = ReadPriority("test", "default", model.CommandConfig{})
require.Equal(t, prio1, priority)

// 'test1' RP doesn't exist under 'default' RM, so the preemption
// & priority will default to the RM's.
status = ReadRMPreemptionStatus("test", "test1")
require.False(t, status)

priority = ReadPriority("test", "test1", model.CommandConfig{})
require.Equal(t, KubernetesDefaultPriority, priority)
}
3 changes: 3 additions & 0 deletions master/internal/config/resource_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
// DefaultRMName is the default resource manager name when a user does not provide one.
const DefaultRMName = "default"

// DefaultRMIndex is the index of the default resource manager within the list
// of resource manager configs returned by ResourceManagers().
const DefaultRMIndex = 0

// DefaultResourceConfig returns the default resource configuration.
func DefaultResourceConfig() *ResourceConfig {
return &ResourceConfig{
Expand Down
2 changes: 1 addition & 1 deletion master/internal/config/resource_manager_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (r ResourceManagerConfig) Name() string {
if k8RM := r.KubernetesRM; k8RM != nil {
return k8RM.Name
}
// TODO dispatcher.
// TODO (multirm) dispatcher.

panic(fmt.Sprintf("unknown rm type %+v", r))
}
Expand Down
51 changes: 45 additions & 6 deletions master/internal/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
"sync"
"time"

"github.com/determined-ai/determined/master/internal/rm/agentrm"
"github.com/determined-ai/determined/master/internal/rm/kubernetesrm"

"github.com/coreos/go-systemd/activation"
"github.com/google/uuid"
"github.com/labstack/echo-contrib/prometheus"
Expand Down Expand Up @@ -53,6 +56,7 @@ import (
"github.com/determined-ai/determined/master/internal/prom"
"github.com/determined-ai/determined/master/internal/proxy"
"github.com/determined-ai/determined/master/internal/rm"
"github.com/determined-ai/determined/master/internal/rm/multirm"
"github.com/determined-ai/determined/master/internal/rm/tasklist"
"github.com/determined-ai/determined/master/internal/sproto"
"github.com/determined-ai/determined/master/internal/stream"
Expand Down Expand Up @@ -952,6 +956,45 @@ func (m *Master) postTaskLogs(c echo.Context) (interface{}, error) {
return "", nil
}

// buildRM constructs the master's resource manager from the configured resource managers.
//
// With exactly one entry in rmConfigs, the agent or Kubernetes resource manager for
// that entry is returned directly. With more than one, every entry is built, keyed by
// its configured name, and the set is wrapped with multirm.New; the entry at
// config.DefaultRMIndex supplies the default resource manager name.
//
// It panics if rmConfigs is empty, or if an entry defines neither an agent nor a
// Kubernetes resource manager.
func buildRM(
	db *db.PgDB,
	echo *echo.Echo,
	rmConfigs []*config.ResourceManagerWithPoolsConfig,
	tcd *model.TaskContainerDefaultsConfig,
	opts *aproto.MasterSetAgentOptions,
	cert *tls.Certificate,
) rm.ResourceManager {
	// Fail with a descriptive message instead of an index-out-of-range panic
	// when no resource manager is configured at all.
	if len(rmConfigs) == 0 {
		panic("no resource manager config is defined")
	}

	// newRM builds the concrete resource manager for a single config entry.
	newRM := func(cfg *config.ResourceManagerWithPoolsConfig) rm.ResourceManager {
		switch {
		case cfg.ResourceManager.AgentRM != nil:
			return agentrm.New(db, echo, cfg, opts, cert)
		case cfg.ResourceManager.KubernetesRM != nil:
			return kubernetesrm.New(db, cfg, tcd, opts, cert)
		default:
			panic("no expected resource manager config is defined")
		}
	}

	if len(rmConfigs) == 1 {
		return newRM(rmConfigs[0])
	}

	// Set the default RM name for the multi-rm, from the default RM index.
	defaultRMName := rmConfigs[config.DefaultRMIndex].ResourceManager.Name()
	rms := make(map[string]rm.ResourceManager, len(rmConfigs))

	for _, cfg := range rmConfigs {
		// Build before resolving the name so an entry with no known RM type
		// panics with the same message as in the single-RM path.
		built := newRM(cfg)
		rms[cfg.ResourceManager.Name()] = built
	}

	return multirm.New(defaultRMName, rms)
}

// Run causes the Determined master to connect the database and begin listening for HTTP requests.
//
// gRPCLogInitDone is closed when the grpclog package's logger singletons are set. This is just
Expand Down Expand Up @@ -1140,19 +1183,15 @@ func (m *Master) Run(ctx context.Context, gRPCLogInitDone chan struct{}) error {
}

// Resource Manager.
// TODO(multirm) do multiple resource managers.
r := m.config.ResourceManagers()[0]
m.rm = rm.New(
m.db,
m.echo,
r,
m.rm = buildRM(m.db, m.echo, m.config.ResourceManagers(),
&m.config.TaskContainerDefaults,
&aproto.MasterSetAgentOptions{
MasterInfo: m.Info(),
LoggingOptions: m.config.Logging,
},
cert,
)

jobservice.SetDefaultService(m.rm)

tasksGroup := m.echo.Group("/tasks")
Expand Down
9 changes: 5 additions & 4 deletions master/internal/experiment_job_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ func (e *internalExperiment) ToV1Job() (*jobv1.Job, error) {
WorkspaceId: int32(workspace.ID),
}

j.IsPreemptible = config.ReadRMPreemptionStatus(j.ResourcePool)
j.Priority = int32(config.ReadPriority(j.ResourcePool, &e.activeConfig))
j.Weight = config.ReadWeight(j.ResourcePool, &e.activeConfig)

rmName := e.activeConfig.Resources().ResourceManager()
j.ResourcePool = e.activeConfig.Resources().ResourcePool()

j.IsPreemptible = config.ReadRMPreemptionStatus(rmName, j.ResourcePool)
j.Priority = int32(config.ReadPriority(rmName, j.ResourcePool, &e.activeConfig))
j.Weight = config.ReadWeight(j.ResourcePool, &e.activeConfig)

return &j, nil
}

Expand Down
Loading

0 comments on commit c3012ff

Please sign in to comment.