diff --git a/nomad/fsm.go b/nomad/fsm.go index 72757431211..90dba323158 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -1801,7 +1801,7 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error { // Ignore eval event creation during snapshot restore snap.UpsertEvals(structs.IgnoreUnknownTypeFlag, 100, []*structs.Evaluation{eval}) // Create the scheduler and run it - sched, err := scheduler.NewScheduler(eval.Type, n.logger, snap, planner) + sched, err := scheduler.NewScheduler(eval.Type, n.logger, nil, snap, planner) if err != nil { return err } diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 8d9119800e7..097c4830960 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -1785,7 +1785,7 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse) } // Create the scheduler and run it - sched, err := scheduler.NewScheduler(eval.Type, j.logger, snap, planner) + sched, err := scheduler.NewScheduler(eval.Type, j.logger, j.srv.workersEventCh, snap, planner) if err != nil { return err } diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 8f4c00cc362..95886654624 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -2,6 +2,7 @@ package mock import ( "fmt" + "math/rand" "time" "github.com/hashicorp/nomad/helper" @@ -1349,6 +1350,58 @@ func Alloc() *structs.Allocation { return alloc } +func AllocWithoutReservedPort() *structs.Allocation { + alloc := Alloc() + alloc.Resources.Networks[0].ReservedPorts = nil + alloc.TaskResources["web"].Networks[0].ReservedPorts = nil + alloc.AllocatedResources.Tasks["web"].Networks[0].ReservedPorts = nil + + return alloc +} + +func AllocForNode(n *structs.Node) *structs.Allocation { + nodeIP := n.NodeResources.NodeNetworks[0].Addresses[0].Address + + dynamicPortRange := structs.DefaultMaxDynamicPort - structs.DefaultMinDynamicPort + randomDynamicPort := rand.Intn(dynamicPortRange) + structs.DefaultMinDynamicPort + + alloc := Alloc() + alloc.NodeID = n.ID + + // Set node IP address. + alloc.Resources.Networks[0].IP = nodeIP + alloc.TaskResources["web"].Networks[0].IP = nodeIP + alloc.AllocatedResources.Tasks["web"].Networks[0].IP = nodeIP + + // Set dynamic port to a random value. + alloc.TaskResources["web"].Networks[0].DynamicPorts = []structs.Port{{Label: "http", Value: randomDynamicPort}} + alloc.AllocatedResources.Tasks["web"].Networks[0].DynamicPorts = []structs.Port{{Label: "http", Value: randomDynamicPort}} + + return alloc + +} + +func AllocForNodeWithoutReservedPort(n *structs.Node) *structs.Allocation { + nodeIP := n.NodeResources.NodeNetworks[0].Addresses[0].Address + + dynamicPortRange := structs.DefaultMaxDynamicPort - structs.DefaultMinDynamicPort + randomDynamicPort := rand.Intn(dynamicPortRange) + structs.DefaultMinDynamicPort + + alloc := AllocWithoutReservedPort() + alloc.NodeID = n.ID + + // Set node IP address. + alloc.Resources.Networks[0].IP = nodeIP + alloc.TaskResources["web"].Networks[0].IP = nodeIP + alloc.AllocatedResources.Tasks["web"].Networks[0].IP = nodeIP + + // Set dynamic port to a random value. + alloc.TaskResources["web"].Networks[0].DynamicPorts = []structs.Port{{Label: "http", Value: randomDynamicPort}} + alloc.AllocatedResources.Tasks["web"].Networks[0].DynamicPorts = []structs.Port{{Label: "http", Value: randomDynamicPort}} + + return alloc +} + // ConnectAlloc adds a Connect proxy sidecar group service to mock.Alloc. func ConnectAlloc() *structs.Allocation { alloc := Alloc() diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index 7e97bd603fd..73761d86068 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -475,6 +475,8 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan return true } if !fit { + metrics.IncrCounterWithLabels([]string{"nomad", "plan", "node_rejected"}, 1, []metrics.Label{{Name: "node_id", Value: nodeID}}) + // Log the reason why the node's allocations could not be made if reason != "" { //TODO This was debug level and should return diff --git a/nomad/server.go b/nomad/server.go index 533b2ca01f9..1ae341af8bf 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "crypto/x509" + "encoding/json" "errors" "fmt" "io/ioutil" @@ -229,6 +230,7 @@ type Server struct { workers []*Worker workerLock sync.RWMutex workerConfigLock sync.RWMutex + workersEventCh chan interface{} // aclCache is used to maintain the parsed ACL objects aclCache *lru.TwoQueueCache @@ -344,6 +346,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr blockedEvals: NewBlockedEvals(evalBroker, logger), rpcTLS: incomingTLS, aclCache: aclCache, + workersEventCh: make(chan interface{}, 1), } s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background()) @@ -1581,6 +1584,8 @@ func reloadSchedulers(s *Server, newArgs *SchedulerWorkerPoolArgs) { func (s *Server) setupWorkers(ctx context.Context) error { poolArgs := s.GetSchedulerWorkerConfig() + go s.listenWorkerEvents() + // we will be writing to the worker slice s.workerLock.Lock() defer s.workerLock.Unlock() @@ -1665,6 +1670,49 @@ func (s *Server) stopOldWorkers(oldWorkers []*Worker) { } } +// listenWorkerEvents listens for events emitted by scheduler workers and log +// them if necessary. Some events may be skipped to avoid polluting logs with +// duplicates. +func (s *Server) listenWorkerEvents() { + loggedAt := make(map[string]time.Time) + + gcDeadline := 4 * time.Hour + gcTicker := time.NewTicker(10 * time.Second) + defer gcTicker.Stop() + + for { + select { + case <-gcTicker.C: + for k, v := range loggedAt { + if time.Since(v) >= gcDeadline { + delete(loggedAt, k) + } + } + case e := <-s.workersEventCh: + switch event := e.(type) { + case *scheduler.PortCollisionEvent: + if event == nil || event.Node == nil { + continue + } + + if _, ok := loggedAt[event.Node.ID]; ok { + continue + } + + eventJson, err := json.Marshal(event.Sanitize()) + if err != nil { + s.logger.Debug("failed to encode event to JSON", "error", err) + } + s.logger.Warn("unexpected node port collision, refer to https://www.nomadproject.io/s/port-plan-failure for more information", + "node_id", event.Node.ID, "reason", event.Reason, "event", string(eventJson)) + loggedAt[event.Node.ID] = time.Now() + } + case <-s.shutdownCh: + return + } + } +} + // numPeers is used to check on the number of known peers, including the local // node. func (s *Server) numPeers() (int, error) { diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index b77e3cb2616..5fe4d4c6ef5 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -187,8 +187,12 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi if netIdx == nil { netIdx = NewNetworkIndex() defer netIdx.Release() - if netIdx.SetNode(node) || netIdx.AddAllocs(allocs) { - return false, "reserved port collision", used, nil + + if collision, reason := netIdx.SetNode(node); collision { + return false, fmt.Sprintf("reserved node port collision: %v", reason), used, nil + } + if collision, reason := netIdx.AddAllocs(allocs); collision { + return false, fmt.Sprintf("reserved alloc port collision: %v", reason), used, nil } } diff --git a/nomad/structs/network.go b/nomad/structs/network.go index 76de50bfc61..9a07fb83ffe 100644 --- a/nomad/structs/network.go +++ b/nomad/structs/network.go @@ -5,6 +5,8 @@ import ( "math/rand" "net" "sync" + + "github.com/hashicorp/nomad/helper" ) const ( @@ -74,6 +76,83 @@ func (idx *NetworkIndex) getUsedPortsFor(ip string) Bitmap { return used } +func (idx *NetworkIndex) Copy() *NetworkIndex { + if idx == nil { + return nil + } + + c := new(NetworkIndex) + *c = *idx + + c.AvailNetworks = copyNetworkResources(idx.AvailNetworks) + c.NodeNetworks = copyNodeNetworks(idx.NodeNetworks) + c.AvailAddresses = copyAvailAddresses(idx.AvailAddresses) + if idx.AvailBandwidth != nil && len(idx.AvailBandwidth) == 0 { + c.AvailBandwidth = make(map[string]int) + } else { + c.AvailBandwidth = helper.CopyMapStringInt(idx.AvailBandwidth) + } + if len(idx.UsedPorts) > 0 { + c.UsedPorts = make(map[string]Bitmap, len(idx.UsedPorts)) + for k, v := range idx.UsedPorts { + c.UsedPorts[k], _ = v.Copy() + } + } + if idx.UsedBandwidth != nil && len(idx.UsedBandwidth) == 0 { + c.UsedBandwidth = make(map[string]int) + } else { + c.UsedBandwidth = helper.CopyMapStringInt(idx.UsedBandwidth) + } + + return c +} + +func copyNetworkResources(resources []*NetworkResource) []*NetworkResource { + l := len(resources) + if l == 0 { + return nil + } + + c := make([]*NetworkResource, l) + for i, resource := range resources { + c[i] = resource.Copy() + } + return c +} + +func copyNodeNetworks(resources []*NodeNetworkResource) []*NodeNetworkResource { + l := len(resources) + if l == 0 { + return nil + } + + c := make([]*NodeNetworkResource, l) + for i, resource := range resources { + c[i] = resource.Copy() + } + return c +} + +func copyAvailAddresses(a map[string][]NodeNetworkAddress) map[string][]NodeNetworkAddress { + l := len(a) + if l == 0 { + return nil + } + + c := make(map[string][]NodeNetworkAddress, l) + for k, v := range a { + if len(v) == 0 { + continue + } + c[k] = make([]NodeNetworkAddress, len(v)) + for i, a := range v { + c[k][i] = a + } + } + + return c +} + // Release is called when the network index is no longer needed // to attempt to re-use some of the memory it has allocated func (idx *NetworkIndex) Release() { @@ -96,7 +175,7 @@ func (idx *NetworkIndex) Overcommitted() bool { // SetNode is used to setup the available network resources. Returns // true if there is a collision -func (idx *NetworkIndex) SetNode(node *Node) (collide bool) { +func (idx *NetworkIndex) SetNode(node *Node) (collide bool, reason string) { // COMPAT(0.11): Remove in 0.11 // Grab the network resources, handling both new and old @@ -125,8 +204,9 @@ func (idx *NetworkIndex) SetNode(node *Node) (collide bool) { for _, n := range nodeNetworks { for _, a := range n.Addresses { idx.AvailAddresses[a.Alias] = append(idx.AvailAddresses[a.Alias], a) - if idx.AddReservedPortsForIP(a.ReservedPorts, a.Address) { + if c, r := idx.AddReservedPortsForIP(a.ReservedPorts, a.Address); c { collide = true + reason = fmt.Sprintf("collision when reserving ports for node network %s in node %s: %v", a.Alias, node.ID, r) } } } @@ -134,11 +214,16 @@ func (idx *NetworkIndex) SetNode(node *Node) (collide bool) { // COMPAT(0.11): Remove in 0.11 // Handle reserving ports, handling both new and old if node.ReservedResources != nil && node.ReservedResources.Networks.ReservedHostPorts != "" { - collide = idx.AddReservedPortRange(node.ReservedResources.Networks.ReservedHostPorts) + c, r := idx.AddReservedPortRange(node.ReservedResources.Networks.ReservedHostPorts) + collide = c + if collide { + reason = fmt.Sprintf("collision when reserving port range for node %s: %v", node.ID, r) + } } else if node.Reserved != nil { for _, n := range node.Reserved.Networks { - if idx.AddReserved(n) { + if c, r := idx.AddReserved(n); c { collide = true + reason = fmt.Sprintf("collision when reserving network %s for node %s: %v", n.IP, node.ID, r) } } } @@ -156,7 +241,7 @@ func (idx *NetworkIndex) SetNode(node *Node) (collide bool) { // AddAllocs is used to add the used network resources. Returns // true if there is a collision -func (idx *NetworkIndex) AddAllocs(allocs []*Allocation) (collide bool) { +func (idx *NetworkIndex) AddAllocs(allocs []*Allocation) (collide bool, reason string) { for _, alloc := range allocs { // Do not consider the resource impact of terminal allocations if alloc.TerminalStatus() { @@ -167,38 +252,42 @@ func (idx *NetworkIndex) AddAllocs(allocs []*Allocation) (collide bool) { // Only look at AllocatedPorts if populated, otherwise use pre 0.12 logic // COMPAT(1.0): Remove when network resources struct is removed. if len(alloc.AllocatedResources.Shared.Ports) > 0 { - if idx.AddReservedPorts(alloc.AllocatedResources.Shared.Ports) { + if c, r := idx.AddReservedPorts(alloc.AllocatedResources.Shared.Ports); c { collide = true + reason = fmt.Sprintf("collision when reserving port for alloc %s: %v", alloc.ID, r) } } else { // Add network resources that are at the task group level if len(alloc.AllocatedResources.Shared.Networks) > 0 { for _, network := range alloc.AllocatedResources.Shared.Networks { - if idx.AddReserved(network) { + if c, r := idx.AddReserved(network); c { collide = true + reason = fmt.Sprintf("collision when reserving port for network %s in alloc %s: %v", network.IP, alloc.ID, r) } } } - for _, task := range alloc.AllocatedResources.Tasks { - if len(task.Networks) == 0 { + for task, resources := range alloc.AllocatedResources.Tasks { + if len(resources.Networks) == 0 { continue } - n := task.Networks[0] - if idx.AddReserved(n) { + n := resources.Networks[0] + if c, r := idx.AddReserved(n); c { collide = true + reason = fmt.Sprintf("collision when reserving port for network %s in task %s of alloc %s: %v", n.IP, task, alloc.ID, r) } } } } else { // COMPAT(0.11): Remove in 0.11 - for _, task := range alloc.TaskResources { - if len(task.Networks) == 0 { + for task, resources := range alloc.TaskResources { + if len(resources.Networks) == 0 { continue } - n := task.Networks[0] - if idx.AddReserved(n) { + n := resources.Networks[0] + if c, r := idx.AddReserved(n); c { collide = true + reason = fmt.Sprintf("(deprecated) collision when reserving port for network %s in task %s of alloc %s: %v", n.IP, task, alloc.ID, r) } } } @@ -208,7 +297,7 @@ func (idx *NetworkIndex) AddAllocs(allocs []*Allocation) (collide bool) { // AddReserved is used to add a reserved network usage, returns true // if there is a port collision -func (idx *NetworkIndex) AddReserved(n *NetworkResource) (collide bool) { +func (idx *NetworkIndex) AddReserved(n *NetworkResource) (collide bool, reasons []string) { // Add the port usage used := idx.getUsedPortsFor(n.IP) @@ -216,10 +305,12 @@ func (idx *NetworkIndex) AddReserved(n *NetworkResource) (collide bool) { for _, port := range ports { // Guard against invalid port if port.Value < 0 || port.Value >= MaxValidPort { - return true + return true, []string{fmt.Sprintf("invalid port %d", port.Value)} } if used.Check(uint(port.Value)) { collide = true + reason := fmt.Sprintf("port %d already in use", port.Value) + reasons = append(reasons, reason) } else { used.Set(uint(port.Value)) } @@ -231,14 +322,16 @@ func (idx *NetworkIndex) AddReserved(n *NetworkResource) (collide bool) { return } -func (idx *NetworkIndex) AddReservedPorts(ports AllocatedPorts) (collide bool) { +func (idx *NetworkIndex) AddReservedPorts(ports AllocatedPorts) (collide bool, reasons []string) { for _, port := range ports { used := idx.getUsedPortsFor(port.HostIP) if port.Value < 0 || port.Value >= MaxValidPort { - return true + return true, []string{fmt.Sprintf("invalid port %d", port.Value)} } if used.Check(uint(port.Value)) { collide = true + reason := fmt.Sprintf("port %d already in use", port.Value) + reasons = append(reasons, reason) } else { used.Set(uint(port.Value)) } @@ -250,7 +343,7 @@ func (idx *NetworkIndex) AddReservedPorts(ports AllocatedPorts) (collide bool) { // AddReservedPortRange marks the ports given as reserved on all network // interfaces. The port format is comma delimited, with spans given as n1-n2 // (80,100-200,205) -func (idx *NetworkIndex) AddReservedPortRange(ports string) (collide bool) { +func (idx *NetworkIndex) AddReservedPortRange(ports string) (collide bool, reasons []string) { // Convert the ports into a slice of ints resPorts, err := ParsePortRanges(ports) if err != nil { @@ -266,10 +359,12 @@ func (idx *NetworkIndex) AddReservedPortRange(ports string) (collide bool) { for _, port := range resPorts { // Guard against invalid port if port >= MaxValidPort { - return true + return true, []string{fmt.Sprintf("invalid port %d", port)} } if used.Check(uint(port)) { collide = true + reason := fmt.Sprintf("port %d already in use", port) + reasons = append(reasons, reason) } else { used.Set(uint(port)) } @@ -281,7 +376,7 @@ func (idx *NetworkIndex) AddReservedPortRange(ports string) (collide bool) { // AddReservedPortsForIP checks whether any reserved ports collide with those // in use for the IP address. -func (idx *NetworkIndex) AddReservedPortsForIP(ports string, ip string) (collide bool) { +func (idx *NetworkIndex) AddReservedPortsForIP(ports string, ip string) (collide bool, reasons []string) { // Convert the ports into a slice of ints resPorts, err := ParsePortRanges(ports) if err != nil { @@ -292,10 +387,12 @@ func (idx *NetworkIndex) AddReservedPortsForIP(ports string, ip string) (collide for _, port := range resPorts { // Guard against invalid port if port >= MaxValidPort { - return true + return true, []string{fmt.Sprintf("invalid port %d", port)} } if used.Check(uint(port)) { collide = true + reason := fmt.Sprintf("port %d already in use", port) + reasons = append(reasons, reason) } else { used.Set(uint(port)) } @@ -368,7 +465,7 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro return nil, addrErr } - return nil, fmt.Errorf("no addresses available for %q network", port.HostNetwork) + return nil, fmt.Errorf("no addresses available for %s network", port.HostNetwork) } offer = append(offer, *allocPort) @@ -409,7 +506,7 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro return nil, addrErr } - return nil, fmt.Errorf("no addresses available for %q network", port.HostNetwork) + return nil, fmt.Errorf("no addresses available for %s network", port.HostNetwork) } offer = append(offer, *allocPort) } diff --git a/nomad/structs/network_test.go b/nomad/structs/network_test.go index e7b0a52bedc..277f36f1acb 100644 --- a/nomad/structs/network_test.go +++ b/nomad/structs/network_test.go @@ -9,6 +9,118 @@ import ( "github.com/stretchr/testify/require" ) +func TestNetworkIndex_Copy(t *testing.T) { + n := &Node{ + NodeResources: &NodeResources{ + Networks: []*NetworkResource{ + { + Device: "eth0", + CIDR: "192.168.0.100/32", + IP: "192.168.0.100", + MBits: 1000, + }, + }, + NodeNetworks: []*NodeNetworkResource{ + { + Mode: "host", + Device: "eth0", + Speed: 1000, + Addresses: []NodeNetworkAddress{ + { + Alias: "default", + Address: "192.168.0.100", + Family: NodeNetworkAF_IPv4, + }, + }, + }, + }, + }, + Reserved: &Resources{ + Networks: []*NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []Port{{Label: "ssh", Value: 22}}, + MBits: 1, + }, + }, + }, + ReservedResources: &NodeReservedResources{ + Networks: NodeReservedNetworkResources{ + ReservedHostPorts: "22", + }, + }, + } + + allocs := []*Allocation{ + { + AllocatedResources: &AllocatedResources{ + Tasks: map[string]*AllocatedTaskResources{ + "web": { + Networks: []*NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 20, + ReservedPorts: []Port{{"one", 8000, 0, ""}, {"two", 9000, 0, ""}}, + }, + }, + }, + }, + }, + }, + { + AllocatedResources: &AllocatedResources{ + Tasks: map[string]*AllocatedTaskResources{ + "api": { + Networks: []*NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 50, + ReservedPorts: []Port{{"one", 10000, 0, ""}}, + }, + }, + }, + }, + }, + }, + } + + netIdx := NewNetworkIndex() + netIdx.SetNode(n) + netIdx.AddAllocs(allocs) + + // Copy must be equal. + netIdxCopy := netIdx.Copy() + require.Equal(t, netIdx, netIdxCopy) + + // Modifying copy should not affect original value. + n.NodeResources.Networks[0].Device = "eth1" + n.ReservedResources.Networks.ReservedHostPorts = "22,80" + allocs = append(allocs, &Allocation{ + AllocatedResources: &AllocatedResources{ + Tasks: map[string]*AllocatedTaskResources{ + "db": { + Networks: []*NetworkResource{ + { + Device: "eth1", + IP: "192.168.0.104", + MBits: 50, + ReservedPorts: []Port{{"one", 4567, 0, ""}}, + }, + }, + }, + }, + }, + }) + netIdxCopy.SetNode(n) + netIdxCopy.AddAllocs(allocs) + netIdxCopy.MinDynamicPort = 1000 + netIdxCopy.MaxDynamicPort = 2000 + require.NotEqual(t, netIdx, netIdxCopy) +} + func TestNetworkIndex_Overcommitted(t *testing.T) { t.Skip() idx := NewNetworkIndex() @@ -20,8 +132,8 @@ func TestNetworkIndex_Overcommitted(t *testing.T) { MBits: 505, ReservedPorts: []Port{{"one", 8000, 0, ""}, {"two", 9000, 0, ""}}, } - collide := idx.AddReserved(reserved) - if collide { + collide, reasons := idx.AddReserved(reserved) + if collide || len(reasons) != 0 { t.Fatalf("bad") } if !idx.Overcommitted() { @@ -71,8 +183,8 @@ func TestNetworkIndex_SetNode(t *testing.T) { }, }, } - collide := idx.SetNode(n) - if collide { + collide, reason := idx.SetNode(n) + if collide || reason != "" { t.Fatalf("bad") } @@ -123,8 +235,8 @@ func TestNetworkIndex_AddAllocs(t *testing.T) { }, }, } - collide := idx.AddAllocs(allocs) - if collide { + collide, reason := idx.AddAllocs(allocs) + if collide || reason != "" { t.Fatalf("bad") } @@ -151,8 +263,8 @@ func TestNetworkIndex_AddReserved(t *testing.T) { MBits: 20, ReservedPorts: []Port{{"one", 8000, 0, ""}, {"two", 9000, 0, ""}}, } - collide := idx.AddReserved(reserved) - if collide { + collide, reasons := idx.AddReserved(reserved) + if collide || len(reasons) > 0 { t.Fatalf("bad") } @@ -167,8 +279,8 @@ func TestNetworkIndex_AddReserved(t *testing.T) { } // Try to reserve the same network - collide = idx.AddReserved(reserved) - if !collide { + collide, reasons = idx.AddReserved(reserved) + if !collide || len(reasons) == 0 { t.Fatalf("bad") } } @@ -375,8 +487,8 @@ func TestNetworkIndex_SetNode_Old(t *testing.T) { }, }, } - collide := idx.SetNode(n) - if collide { + collide, reason := idx.SetNode(n) + if collide || reason != "" { t.Fatalf("bad") } @@ -427,8 +539,8 @@ func TestNetworkIndex_AddAllocs_Old(t *testing.T) { }, }, } - collide := idx.AddAllocs(allocs) - if collide { + collide, reason := idx.AddAllocs(allocs) + if collide || reason != "" { t.Fatalf("bad") } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index b1fbdb0425b..2c20932a24f 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2514,6 +2514,22 @@ func (n *NodeNetworkResource) Equals(o *NodeNetworkResource) bool { return reflect.DeepEqual(n, o) } +func (n *NodeNetworkResource) Copy() *NodeNetworkResource { + if n == nil { + return nil + } + + c := new(NodeNetworkResource) + *c = *n + + if n.Addresses != nil { + c.Addresses = make([]NodeNetworkAddress, len(n.Addresses)) + copy(c.Addresses, n.Addresses) + } + + return c +} + func (n *NodeNetworkResource) HasAlias(alias string) bool { for _, addr := range n.Addresses { if addr.Alias == alias { @@ -2879,6 +2895,13 @@ func (n *NodeResources) Copy() *NodeResources { newN.Cpu = n.Cpu.Copy() newN.Networks = n.Networks.Copy() + if n.NodeNetworks != nil { + newN.NodeNetworks = make([]*NodeNetworkResource, len(n.NodeNetworks)) + for i, nn := range n.NodeNetworks { + newN.NodeNetworks[i] = nn.Copy() + } + } + // Copy the devices if n.Devices != nil { devices := len(n.Devices) diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index aa3e735dd66..40b343910b8 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -3042,6 +3042,36 @@ func TestMemoryResources_Add(t *testing.T) { }, r) } +func TestNodeNetworkResource_Copy(t *testing.T) { + netResource := &NodeNetworkResource{ + Mode: "host", + Device: "eth0", + MacAddress: "00:00:00:00:00:00", + Speed: 1000, + Addresses: []NodeNetworkAddress{ + { + Family: NodeNetworkAF_IPv4, + Alias: "default", + Address: "192.168.0.2", + ReservedPorts: "22", + Gateway: "192.168.0.1", + }, + }, + } + + // Copy must be equal. + netResourceCopy := netResource.Copy() + require.Equal(t, netResource, netResourceCopy) + + // Modifying copy should not modify original value. + netResourceCopy.Mode = "alloc" + netResourceCopy.Device = "eth1" + netResourceCopy.MacAddress = "11:11:11:11:11:11" + netResourceCopy.Speed = 500 + netResourceCopy.Addresses[0].Alias = "copy" + require.NotEqual(t, netResource, netResourceCopy) +} + func TestEncodeDecode(t *testing.T) { type FooRequest struct { Foo string @@ -6084,6 +6114,23 @@ func TestNodeResources_Copy(t *testing.T) { Device: "foo", }, }, + NodeNetworks: []*NodeNetworkResource{ + { + Mode: "host", + Device: "eth0", + MacAddress: "00:00:00:00:00:00", + Speed: 1000, + Addresses: []NodeNetworkAddress{ + { + Family: NodeNetworkAF_IPv4, + Alias: "private", + Address: "192.168.0.100", + ReservedPorts: "22,80", + Gateway: "192.168.0.1", + }, + }, + }, + }, } kopy := orig.Copy() @@ -6091,7 +6138,11 @@ func TestNodeResources_Copy(t *testing.T) { // Make sure slices aren't shared kopy.Cpu.ReservableCpuCores[1] = 9000 - assert.NotEqual(t, orig, kopy) + assert.NotEqual(t, orig.Cpu.ReservableCpuCores, kopy.Cpu.ReservableCpuCores) + + kopy.NodeNetworks[0].MacAddress = "11:11:11:11:11:11" + kopy.NodeNetworks[0].Addresses[0].Alias = "public" + assert.NotEqual(t, orig.NodeNetworks[0], kopy.NodeNetworks[0]) } func TestNodeResources_Merge(t *testing.T) { diff --git a/nomad/worker.go b/nomad/worker.go index 10e8cae0614..5c116e724c2 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -566,7 +566,7 @@ func (w *Worker) invokeScheduler(snap *state.StateSnapshot, eval *structs.Evalua if eval.Type == structs.JobTypeCore { sched = NewCoreScheduler(w.srv, snap) } else { - sched, err = scheduler.NewScheduler(eval.Type, w.logger, snap, w) + sched, err = scheduler.NewScheduler(eval.Type, w.logger, w.srv.workersEventCh, snap, w) if err != nil { return fmt.Errorf("failed to instantiate scheduler: %v", err) } diff --git a/nomad/worker_test.go b/nomad/worker_test.go index 334790e5f9d..0e872dedd35 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -22,10 +22,11 @@ import ( ) type NoopScheduler struct { - state scheduler.State - planner scheduler.Planner - eval *structs.Evaluation - err error + state scheduler.State + planner scheduler.Planner + eval *structs.Evaluation + eventsCh chan<- interface{} + err error } func (n *NoopScheduler) Process(eval *structs.Evaluation) error { @@ -40,7 +41,7 @@ func (n *NoopScheduler) Process(eval *structs.Evaluation) error { } func init() { - scheduler.BuiltinSchedulers["noop"] = func(logger log.Logger, s scheduler.State, p scheduler.Planner) scheduler.Scheduler { + scheduler.BuiltinSchedulers["noop"] = func(logger log.Logger, eventsCh chan<- interface{}, s scheduler.State, p scheduler.Planner) scheduler.Scheduler { n := &NoopScheduler{ state: s, planner: p, diff --git a/scheduler/context.go b/scheduler/context.go index fa7b5164870..bd201abf858 100644 --- a/scheduler/context.go +++ b/scheduler/context.go @@ -42,6 +42,10 @@ type Context interface { // Eligibility returns a tracker for node eligibility in the context of the // eval. Eligibility() *EvalEligibility + + // SendEvent provides best-effort delivery of scheduling and placement + // events. + SendEvent(event interface{}) } // EvalCache is used to cache certain things during an evaluation @@ -72,9 +76,57 @@ func (e *EvalCache) SemverConstraintCache() map[string]VerConstraints { return e.semverCache } +// PortCollisionEvent is an event that can happen during scheduling when +// an unexpected port collision is detected. +type PortCollisionEvent struct { + Reason string + Node *structs.Node + Allocations []*structs.Allocation + + // TODO: this is a large struct, but may be required to debug unexpected + // port collisions. Re-evaluate its need in the future if the bug is fixed + // or not caused by this field. + NetIndex *structs.NetworkIndex +} + +func (ev *PortCollisionEvent) Copy() *PortCollisionEvent { + if ev == nil { + return nil + } + c := new(PortCollisionEvent) + *c = *ev + c.Node = ev.Node.Copy() + if len(ev.Allocations) > 0 { + for i, a := range ev.Allocations { + c.Allocations[i] = a.Copy() + } + + } + c.NetIndex = ev.NetIndex.Copy() + return c +} + +func (ev *PortCollisionEvent) Sanitize() *PortCollisionEvent { + if ev == nil { + return nil + } + clean := ev.Copy() + + clean.Node = ev.Node.Sanitize() + clean.Node.Meta = make(map[string]string) + + for i, alloc := range ev.Allocations { + clean.Allocations[i] = alloc.CopySkipJob() + clean.Allocations[i].Job = nil + } + + return clean +} + // EvalContext is a Context used during an Evaluation type EvalContext struct { EvalCache + eventsCh chan<- interface{} state State plan *structs.Plan logger log.Logger @@ -83,12 +135,13 @@ type EvalContext struct { } // NewEvalContext constructs a new EvalContext -func NewEvalContext(s State, p *structs.Plan, log log.Logger) *EvalContext { +func NewEvalContext(eventsCh chan<- interface{}, s State, p *structs.Plan, log log.Logger) *EvalContext { ctx := &EvalContext{ - state: s, - plan: p, - logger: log, - metrics: new(structs.AllocMetric), + eventsCh: eventsCh, + state: s, + plan: p, + logger: log, + metrics: new(structs.AllocMetric), } return ctx } @@ -164,6 +217,17 @@ func (e *EvalContext) Eligibility() *EvalEligibility { return e.eligibility } +func (e *EvalContext) SendEvent(event interface{}) { + if e == nil || e.eventsCh == nil { + return + } + + select { + case e.eventsCh <- event: + default: + } +} + type ComputedClassFeasibility byte const ( diff --git a/scheduler/context_test.go b/scheduler/context_test.go index e14290ee36e..75bc7ed3686 100644 --- a/scheduler/context_test.go +++ b/scheduler/context_test.go @@ -21,7 +21,7 @@ func testContext(t testing.TB) (*state.StateStore, *EvalContext) { logger := testlog.HCLogger(t) - ctx := NewEvalContext(state, plan, logger) + ctx := NewEvalContext(nil, state, plan, logger) return state, ctx } @@ -392,3 +392,48 @@ func TestEvalEligibility_GetClasses_JobEligible_TaskGroupIneligible(t *testing.T actClasses := e.GetClasses() require.Equal(t, expClasses, actClasses) } + +func TestPortCollisionEvent_Copy(t *testing.T) { + ev := &PortCollisionEvent{ + Reason: "original", + Node: mock.Node(), + Allocations: []*structs.Allocation{ + mock.Alloc(), + mock.Alloc(), + }, + NetIndex: structs.NewNetworkIndex(), + } + ev.NetIndex.SetNode(ev.Node) + + // Copy must be equal + evCopy := ev.Copy() + require.Equal(t, ev, evCopy) + + // Modifying the copy should not affect the original value + evCopy.Reason = "copy" + require.NotEqual(t, ev.Reason, evCopy.Reason) + + evCopy.Node.Attributes["test"] = "true" + require.NotEqual(t, ev.Node, evCopy.Node) + + evCopy.Allocations = append(evCopy.Allocations, mock.Alloc()) + require.NotEqual(t, ev.Allocations, evCopy.Allocations) + + evCopy.NetIndex.AddReservedPortRange("1000-2000") + require.NotEqual(t, ev.NetIndex, evCopy.NetIndex) +} + +func TestPortCollisionEvent_Sanitize(t *testing.T) { + ev := &PortCollisionEvent{ + Reason: "original", + Node: mock.Node(), + Allocations: []*structs.Allocation{ + mock.Alloc(), + }, + NetIndex: structs.NewNetworkIndex(), + } + + cleanEv := ev.Sanitize() + require.Empty(t, cleanEv.Node.SecretID) + require.Nil(t, cleanEv.Allocations[0].Job) +} diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index fd19e259bd6..f263d864b24 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -76,10 +76,11 @@ func (s *SetStatusError) Error() string { // most workloads. It also supports a 'batch' mode to optimize for fast decision // making at the cost of quality. type GenericScheduler struct { - logger log.Logger - state State - planner Planner - batch bool + logger log.Logger + eventsCh chan<- interface{} + state State + planner Planner + batch bool eval *structs.Evaluation job *structs.Job @@ -100,23 +101,25 @@ type GenericScheduler struct { } // NewServiceScheduler is a factory function to instantiate a new service scheduler -func NewServiceScheduler(logger log.Logger, state State, planner Planner) Scheduler { +func NewServiceScheduler(logger log.Logger, eventsCh chan<- interface{}, state State, planner Planner) Scheduler { s := &GenericScheduler{ - logger: logger.Named("service_sched"), - state: state, - planner: planner, - batch: false, + logger: logger.Named("service_sched"), + eventsCh: eventsCh, + state: state, + planner: planner, + batch: false, } return s } // NewBatchScheduler is a factory function to instantiate a new batch scheduler -func NewBatchScheduler(logger log.Logger, state State, planner Planner) Scheduler { +func NewBatchScheduler(logger log.Logger, eventsCh chan<- interface{}, state State, planner Planner) Scheduler { s := &GenericScheduler{ - logger: logger.Named("batch_sched"), - state: state, - planner: planner, - batch: true, + logger: logger.Named("batch_sched"), + eventsCh: eventsCh, + state: state, + planner: planner, + batch: true, } return s } @@ -245,7 +248,7 @@ func (s *GenericScheduler) process() (bool, error) { s.failedTGAllocs = nil // Create an evaluation context - s.ctx = NewEvalContext(s.state, s.plan, s.logger) + s.ctx = NewEvalContext(s.eventsCh, s.state, s.plan, s.logger) // Construct the placement stack s.stack = NewGenericStack(s.batch, s.ctx) diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 1b38d9c0005..ecb2784c6bf 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -2277,10 +2277,9 @@ func TestServiceSched_JobModify_InPlace(t *testing.T) { // Create allocs that are part of the old deployment var allocs []*structs.Allocation for i := 0; i < 10; i++ { - alloc := mock.Alloc() + alloc := mock.AllocForNode(nodes[i]) alloc.Job = job alloc.JobID = job.ID - alloc.NodeID = nodes[i].ID alloc.Name = fmt.Sprintf("my-job.web[%d]", i) alloc.DeploymentID = d.ID alloc.DeploymentStatus = &structs.AllocDeploymentStatus{Healthy: helper.BoolToPtr(true)} @@ -4273,10 +4272,9 @@ func TestBatchSched_Run_LostAlloc(t *testing.T) { // Create two running allocations var allocs []*structs.Allocation for i := 0; i <= 1; i++ { - alloc := mock.Alloc() + alloc := mock.AllocForNodeWithoutReservedPort(node) alloc.Job = job alloc.JobID = job.ID - alloc.NodeID = node.ID alloc.Name = fmt.Sprintf("my-job.web[%d]", i) alloc.ClientStatus = structs.AllocClientStatusRunning allocs = append(allocs, alloc) @@ -4764,10 +4762,9 @@ func TestBatchSched_ScaleDown_SameName(t *testing.T) { // Create a few running alloc var allocs []*structs.Allocation for i := 0; i < 5; i++ { - alloc := mock.Alloc() + alloc := mock.AllocForNodeWithoutReservedPort(node) alloc.Job = job alloc.JobID = job.ID - alloc.NodeID = node.ID alloc.Name = "my-job.web[0]" alloc.ClientStatus = structs.AllocClientStatusRunning alloc.Metrics = scoreMetric @@ -5777,10 +5774,9 @@ func TestServiceSched_Migrate_CanaryStatus(t *testing.T) { var allocs []*structs.Allocation for i := 0; i < 3; i++ { - alloc := mock.Alloc() + alloc := mock.AllocForNodeWithoutReservedPort(node1) alloc.Job = job alloc.JobID = job.ID - alloc.NodeID = node1.ID alloc.DeploymentID = deployment.ID alloc.Name = fmt.Sprintf("my-job.web[%d]", i) allocs = append(allocs, alloc) diff --git a/scheduler/rank.go b/scheduler/rank.go index 9bd19b9bd42..fa3223d114f 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -206,10 +206,34 @@ OUTER: continue } - // Index the existing network usage + // Index the existing network usage. + // This should never collide, since it represents the current state of + // the node. If it does collide though, it means we found a bug! So + // collect as much information as possible. netIdx := structs.NewNetworkIndex() - netIdx.SetNode(option.Node) - netIdx.AddAllocs(proposed) + if collide, reason := netIdx.SetNode(option.Node); collide { + iter.ctx.SendEvent(&PortCollisionEvent{ + Reason: reason, + NetIndex: netIdx.Copy(), + Node: option.Node, + }) + iter.ctx.Metrics().ExhaustedNode(option.Node, "network: port collision") + continue + } + if collide, reason := netIdx.AddAllocs(proposed); collide { + event := &PortCollisionEvent{ + Reason: reason, + NetIndex: netIdx.Copy(), + Node: option.Node, + Allocations: make([]*structs.Allocation, len(proposed)), + } + for i, alloc := range proposed { + event.Allocations[i] = alloc.Copy() + } + iter.ctx.SendEvent(event) + iter.ctx.Metrics().ExhaustedNode(option.Node, "network: port collision") + continue + } // Create a device allocator devAllocator := newDeviceAllocator(iter.ctx, option.Node) diff --git a/scheduler/rank_test.go b/scheduler/rank_test.go index 3bc979cf447..c953fdfd5ec 100644 --- a/scheduler/rank_test.go +++ b/scheduler/rank_test.go @@ -492,6 +492,221 @@ func TestBinPackIterator_Network_Failure(t *testing.T) { require.Equal(1, ctx.metrics.DimensionExhausted["network: bandwidth exceeded"]) } +func TestBinPackIterator_Network_PortCollision_Node(t *testing.T) { + _, ctx := testContext(t) + eventsCh := make(chan interface{}) + ctx.eventsCh = eventsCh + + // Collide on host with duplicate IPs. + nodes := []*RankedNode{ + { + Node: &structs.Node{ + ID: uuid.Generate(), + Resources: &structs.Resources{ + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + CIDR: "192.168.0.100/32", + IP: "192.158.0.100", + }, + }, + }, + NodeResources: &structs.NodeResources{ + Cpu: structs.NodeCpuResources{ + CpuShares: 4096, + }, + Memory: structs.NodeMemoryResources{ + MemoryMB: 4096, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + CIDR: "192.168.0.100/32", + IP: "192.158.0.100", + }, + }, + NodeNetworks: []*structs.NodeNetworkResource{ + { + Mode: "host", + Device: "eth0", + Addresses: []structs.NodeNetworkAddress{ + { + Alias: "default", + Address: "192.168.0.100", + ReservedPorts: "22,80", + }, + { + Alias: "private", + Address: "192.168.0.100", + ReservedPorts: "22", + }, + }, + }, + }, + }, + }, + }, + } + static := NewStaticRankIterator(ctx, nodes) + + taskGroup := &structs.TaskGroup{ + EphemeralDisk: &structs.EphemeralDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + }, + }, + }, + }, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + }, + }, + } + binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig) + binp.SetTaskGroup(taskGroup) + + scoreNorm := NewScoreNormalizationIterator(ctx, binp) + out := collectRanked(scoreNorm) + + // We expect a placement failure due to port collision. + require.Len(t, out, 0) + require.Equal(t, 1, ctx.metrics.DimensionExhausted["network: port collision"]) +} + +func TestBinPackIterator_Network_PortCollision_Alloc(t *testing.T) { + state, ctx := testContext(t) + eventsCh := make(chan interface{}) + ctx.eventsCh = eventsCh + + nodes := []*RankedNode{ + { + Node: &structs.Node{ + ID: uuid.Generate(), + NodeResources: &structs.NodeResources{ + Cpu: structs.NodeCpuResources{ + CpuShares: 2048, + }, + Memory: structs.NodeMemoryResources{ + MemoryMB: 2048, + }, + }, + }, + }, + } + static := NewStaticRankIterator(ctx, nodes) + + // Add allocations with port collision. + j := mock.Job() + alloc1 := &structs.Allocation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + EvalID: uuid.Generate(), + NodeID: nodes[0].Node.ID, + JobID: j.ID, + Job: j, + AllocatedResources: &structs.AllocatedResources{ + Tasks: map[string]*structs.AllocatedTaskResources{ + "web": { + Cpu: structs.AllocatedCpuResources{ + CpuShares: 1024, + }, + Memory: structs.AllocatedMemoryResources{ + MemoryMB: 1024, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []structs.Port{{Label: "admin", Value: 5000}}, + MBits: 50, + DynamicPorts: []structs.Port{{Label: "http", Value: 9876}}, + }, + }, + }, + }, + }, + DesiredStatus: structs.AllocDesiredStatusRun, + ClientStatus: structs.AllocClientStatusPending, + TaskGroup: "web", + } + alloc2 := &structs.Allocation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + EvalID: uuid.Generate(), + NodeID: nodes[0].Node.ID, + JobID: j.ID, + Job: j, + AllocatedResources: &structs.AllocatedResources{ + Tasks: map[string]*structs.AllocatedTaskResources{ + "web": { + Cpu: structs.AllocatedCpuResources{ + CpuShares: 1024, + }, + Memory: structs.AllocatedMemoryResources{ + MemoryMB: 1024, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []structs.Port{{Label: "admin", Value: 5000}}, + MBits: 50, + DynamicPorts: []structs.Port{{Label: "http", Value: 9876}}, + }, + }, + }, + }, + }, + DesiredStatus: structs.AllocDesiredStatusRun, + ClientStatus: structs.AllocClientStatusPending, + TaskGroup: "web", + } + require.NoError(t, state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID))) + require.NoError(t, state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))) + require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1, alloc2})) + + taskGroup := &structs.TaskGroup{ + EphemeralDisk: &structs.EphemeralDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + CPU: 1024, + MemoryMB: 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + }, + }, + }, + }, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + }, + }, + } + binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig) + binp.SetTaskGroup(taskGroup) + + scoreNorm := NewScoreNormalizationIterator(ctx, binp) + out := collectRanked(scoreNorm) + + // We expect a placement failure due to port collision. + require.Len(t, out, 0) + require.Equal(t, 1, ctx.metrics.DimensionExhausted["network: port collision"]) +} + // Tests bin packing iterator with host network interpolation of task group level ports configuration func TestBinPackIterator_Network_Interpolation_Success(t *testing.T) { _, ctx := testContext(t) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index face624137f..b472041e059 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -29,7 +29,7 @@ var BuiltinSchedulers = map[string]Factory{ // NewScheduler is used to instantiate and return a new scheduler // given the scheduler name, initial state, and planner. -func NewScheduler(name string, logger log.Logger, state State, planner Planner) (Scheduler, error) { +func NewScheduler(name string, logger log.Logger, eventsCh chan<- interface{}, state State, planner Planner) (Scheduler, error) { // Lookup the factory function factory, ok := BuiltinSchedulers[name] if !ok { @@ -37,12 +37,12 @@ func NewScheduler(name string, logger log.Logger, state State, planner Planner) } // Instantiate the scheduler - sched := factory(logger, state, planner) + sched := factory(logger, eventsCh, state, planner) return sched, nil } // Factory is used to instantiate a new Scheduler -type Factory func(log.Logger, State, Planner) Scheduler +type Factory func(log.Logger, chan<- interface{}, State, Planner) Scheduler // Scheduler is the top level instance for a scheduler. A scheduler is // meant to only encapsulate business logic, pushing the various plumbing diff --git a/scheduler/scheduler_sysbatch_test.go b/scheduler/scheduler_sysbatch_test.go index 302c5c0bd79..acee0affdb8 100644 --- a/scheduler/scheduler_sysbatch_test.go +++ b/scheduler/scheduler_sysbatch_test.go @@ -1447,10 +1447,9 @@ func TestSysBatch_Preemption(t *testing.T) { Cpu: structs.AllocatedCpuResources{CpuShares: 512}, Memory: structs.AllocatedMemoryResources{MemoryMB: 1024}, Networks: []*structs.NetworkResource{{ - Device: "eth0", - IP: "192.168.0.100", - ReservedPorts: []structs.Port{{Label: "web", Value: 80}}, - MBits: 200, + Device: "eth0", + IP: "192.168.0.100", + MBits: 200, }}, }, }, @@ -1513,10 +1512,9 @@ func TestSysBatch_Preemption(t *testing.T) { Cpu: structs.AllocatedCpuResources{CpuShares: 1024}, Memory: structs.AllocatedMemoryResources{MemoryMB: 25}, Networks: []*structs.NetworkResource{{ - Device: "eth0", - IP: "192.168.0.100", - ReservedPorts: []structs.Port{{Label: "web", Value: 80}}, - MBits: 400, + Device: "eth0", + IP: "192.168.0.100", + MBits: 400, }}, }, }, diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go index 325e7ae00a5..dff79aa8bc2 100644 --- a/scheduler/scheduler_system.go +++ b/scheduler/scheduler_system.go @@ -26,6 +26,7 @@ const ( // whereas 'sysbatch' considers the task complete on success. type SystemScheduler struct { logger log.Logger + eventsCh chan<- interface{} state State planner Planner sysbatch bool @@ -50,18 +51,20 @@ type SystemScheduler struct { // NewSystemScheduler is a factory function to instantiate a new system // scheduler. -func NewSystemScheduler(logger log.Logger, state State, planner Planner) Scheduler { +func NewSystemScheduler(logger log.Logger, eventsCh chan<- interface{}, state State, planner Planner) Scheduler { return &SystemScheduler{ logger: logger.Named("system_sched"), + eventsCh: eventsCh, state: state, planner: planner, sysbatch: false, } } -func NewSysBatchScheduler(logger log.Logger, state State, planner Planner) Scheduler { +func NewSysBatchScheduler(logger log.Logger, eventsCh chan<- interface{}, state State, planner Planner) Scheduler { return &SystemScheduler{ logger: logger.Named("sysbatch_sched"), + eventsCh: eventsCh, state: state, planner: planner, sysbatch: true, @@ -136,7 +139,7 @@ func (s *SystemScheduler) process() (bool, error) { s.failedTGAllocs = nil // Create an evaluation context - s.ctx = NewEvalContext(s.state, s.plan, s.logger) + s.ctx = NewEvalContext(s.eventsCh, s.state, s.plan, s.logger) // Construct the placement stack s.stack = NewSystemStack(s.sysbatch, s.ctx) diff --git a/scheduler/scheduler_system_test.go b/scheduler/scheduler_system_test.go index ac9c3725e72..fc94a618059 100644 --- a/scheduler/scheduler_system_test.go +++ b/scheduler/scheduler_system_test.go @@ -697,10 +697,9 @@ func TestSystemSched_JobModify_InPlace(t *testing.T) { var allocs []*structs.Allocation for _, node := range nodes { - alloc := mock.Alloc() + alloc := mock.AllocForNode(node) alloc.Job = job alloc.JobID = job.ID - alloc.NodeID = node.ID alloc.Name = "my-job.web[0]" allocs = append(allocs, alloc) } @@ -1861,10 +1860,9 @@ func TestSystemSched_Preemption(t *testing.T) { Cpu: structs.AllocatedCpuResources{CpuShares: 512}, Memory: structs.AllocatedMemoryResources{MemoryMB: 1024}, Networks: []*structs.NetworkResource{{ - Device: "eth0", - IP: "192.168.0.100", - ReservedPorts: []structs.Port{{Label: "web", Value: 80}}, - MBits: 200, + Device: "eth0", + IP: "192.168.0.100", + MBits: 200, }}, }, }, @@ -1927,10 +1925,9 @@ func TestSystemSched_Preemption(t *testing.T) { Cpu: structs.AllocatedCpuResources{CpuShares: 1024}, Memory: structs.AllocatedMemoryResources{MemoryMB: 25}, Networks: []*structs.NetworkResource{{ - Device: "eth0", - IP: "192.168.0.100", - ReservedPorts: []structs.Port{{Label: "web", Value: 80}}, - MBits: 400, + Device: "eth0", + IP: "192.168.0.100", + MBits: 400, }}, }, }, diff --git a/scheduler/testing.go b/scheduler/testing.go index ba3c46273ca..c3037f07538 100644 --- a/scheduler/testing.go +++ b/scheduler/testing.go @@ -262,7 +262,19 @@ func (h *Harness) Snapshot() State { // a snapshot of current state using the harness for planning. func (h *Harness) Scheduler(factory Factory) Scheduler { logger := testlog.HCLogger(h.t) - return factory(logger, h.Snapshot(), h) + eventsCh := make(chan interface{}) + + // Listen for and log events from the scheduler. + go func() { + for e := range eventsCh { + switch event := e.(type) { + case *PortCollisionEvent: + h.t.Errorf("unexpected worker eval event: %v", event.Reason) + } + } + }() + + return factory(logger, eventsCh, h.Snapshot(), h) } // Process is used to process an evaluation given a factory