Skip to content

Commit

Permalink
scheduler: detect and log unexpected scheduling collisions (#11793)
Browse files Browse the repository at this point in the history
  • Loading branch information
lgfa29 committed Jan 17, 2022
1 parent 6cf2c52 commit 21cdd93
Show file tree
Hide file tree
Showing 22 changed files with 871 additions and 104 deletions.
2 changes: 1 addition & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1729,7 +1729,7 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
// Ignore eval event creation during snapshot restore
snap.UpsertEvals(structs.IgnoreUnknownTypeFlag, 100, []*structs.Evaluation{eval})
// Create the scheduler and run it
sched, err := scheduler.NewScheduler(eval.Type, n.logger, snap, planner)
sched, err := scheduler.NewScheduler(eval.Type, n.logger, nil, snap, planner)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1770,7 +1770,7 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)
}

// Create the scheduler and run it
sched, err := scheduler.NewScheduler(eval.Type, j.logger, snap, planner)
sched, err := scheduler.NewScheduler(eval.Type, j.logger, j.srv.workersEventCh, snap, planner)
if err != nil {
return err
}
Expand Down
53 changes: 53 additions & 0 deletions nomad/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mock

import (
"fmt"
"math/rand"
"time"

"github.com/hashicorp/nomad/helper"
Expand Down Expand Up @@ -1171,6 +1172,58 @@ func Alloc() *structs.Allocation {
return alloc
}

func AllocWithoutReservedPort() *structs.Allocation {
alloc := Alloc()
alloc.Resources.Networks[0].ReservedPorts = nil
alloc.TaskResources["web"].Networks[0].ReservedPorts = nil
alloc.AllocatedResources.Tasks["web"].Networks[0].ReservedPorts = nil

return alloc
}

func AllocForNode(n *structs.Node) *structs.Allocation {
nodeIP := n.NodeResources.NodeNetworks[0].Addresses[0].Address

dynamicPortRange := structs.MaxDynamicPort - structs.MinDynamicPort
randomDynamicPort := rand.Intn(dynamicPortRange) + structs.MinDynamicPort

alloc := Alloc()
alloc.NodeID = n.ID

// Set node IP address.
alloc.Resources.Networks[0].IP = nodeIP
alloc.TaskResources["web"].Networks[0].IP = nodeIP
alloc.AllocatedResources.Tasks["web"].Networks[0].IP = nodeIP

// Set dynamic port to a random value.
alloc.TaskResources["web"].Networks[0].DynamicPorts = []structs.Port{{Label: "http", Value: randomDynamicPort}}
alloc.AllocatedResources.Tasks["web"].Networks[0].DynamicPorts = []structs.Port{{Label: "http", Value: randomDynamicPort}}

return alloc

}

func AllocForNodeWithoutReservedPort(n *structs.Node) *structs.Allocation {
nodeIP := n.NodeResources.NodeNetworks[0].Addresses[0].Address

dynamicPortRange := structs.MaxDynamicPort - structs.MinDynamicPort
randomDynamicPort := rand.Intn(dynamicPortRange) + structs.MinDynamicPort

alloc := AllocWithoutReservedPort()
alloc.NodeID = n.ID

// Set node IP address.
alloc.Resources.Networks[0].IP = nodeIP
alloc.TaskResources["web"].Networks[0].IP = nodeIP
alloc.AllocatedResources.Tasks["web"].Networks[0].IP = nodeIP

// Set dynamic port to a random value.
alloc.TaskResources["web"].Networks[0].DynamicPorts = []structs.Port{{Label: "http", Value: randomDynamicPort}}
alloc.AllocatedResources.Tasks["web"].Networks[0].DynamicPorts = []structs.Port{{Label: "http", Value: randomDynamicPort}}

return alloc
}

// ConnectJob adds a Connect proxy sidecar group service to mock.Alloc.
func ConnectAlloc() *structs.Allocation {
alloc := Alloc()
Expand Down
2 changes: 2 additions & 0 deletions nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,8 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan
return true
}
if !fit {
metrics.IncrCounterWithLabels([]string{"nomad", "plan", "node_rejected"}, 1, []metrics.Label{{Name: "node_id", Value: nodeID}})

// Log the reason why the node's allocations could not be made
if reason != "" {
//TODO This was debug level and should return
Expand Down
50 changes: 49 additions & 1 deletion nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -226,7 +227,8 @@ type Server struct {
vault VaultClient

// Worker used for processing
workers []*Worker
workers []*Worker
workersEventCh chan interface{}

// aclCache is used to maintain the parsed ACL objects
aclCache *lru.TwoQueueCache
Expand Down Expand Up @@ -342,6 +344,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr
blockedEvals: NewBlockedEvals(evalBroker, logger),
rpcTLS: incomingTLS,
aclCache: aclCache,
workersEventCh: make(chan interface{}, 1),
}

s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -1450,6 +1453,8 @@ func (s *Server) setupWorkers() error {
return fmt.Errorf("invalid configuration: %q scheduler not enabled", structs.JobTypeCore)
}

go s.listenWorkerEvents()

// Start the workers
for i := 0; i < s.config.NumSchedulers; i++ {
if w, err := NewWorker(s); err != nil {
Expand All @@ -1462,6 +1467,49 @@ func (s *Server) setupWorkers() error {
return nil
}

// listenWorkerEvents listens for events emitted by scheduler workers and log
// them if necessary. Some events may be skipped to avoid polluting logs with
// duplicates.
func (s *Server) listenWorkerEvents() {
loggedAt := make(map[string]time.Time)

gcDeadline := 4 * time.Hour
gcTicker := time.NewTicker(10 * time.Second)
defer gcTicker.Stop()

for {
select {
case <-gcTicker.C:
for k, v := range loggedAt {
if time.Since(v) >= gcDeadline {
delete(loggedAt, k)
}
}
case e := <-s.workersEventCh:
switch event := e.(type) {
case *scheduler.PortCollisionEvent:
if event == nil || event.Node == nil {
continue
}

if _, ok := loggedAt[event.Node.ID]; ok {
continue
}

eventJson, err := json.Marshal(event.Sanitize())
if err != nil {
s.logger.Debug("failed to encode event to JSON", "error", err)
}
s.logger.Warn("unexpected node port collision, refer to https://www.nomadproject.io/s/port-plan-failure for more information",
"node_id", event.Node.ID, "reason", event.Reason, "event", string(eventJson))
loggedAt[event.Node.ID] = time.Now()
}
case <-s.shutdownCh:
return
}
}
}

// numPeers is used to check on the number of known peers, including the local
// node.
func (s *Server) numPeers() (int, error) {
Expand Down
8 changes: 6 additions & 2 deletions nomad/structs/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,12 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi
if netIdx == nil {
netIdx = NewNetworkIndex()
defer netIdx.Release()
if netIdx.SetNode(node) || netIdx.AddAllocs(allocs) {
return false, "reserved port collision", used, nil

if collision, reason := netIdx.SetNode(node); collision {
return false, fmt.Sprintf("reserved node port collision: %v", reason), used, nil
}
if collision, reason := netIdx.AddAllocs(allocs); collision {
return false, fmt.Sprintf("reserved alloc port collision: %v", reason), used, nil
}
}

Expand Down
Loading

0 comments on commit 21cdd93

Please sign in to comment.