Skip to content

Commit

Permalink
scheduler: detect and log unexpected scheduling collisions (#11793)
Browse files Browse the repository at this point in the history
  • Loading branch information
lgfa29 authored Jan 15, 2022
1 parent fcd86e4 commit 8a427a4
Show file tree
Hide file tree
Showing 23 changed files with 855 additions and 107 deletions.
2 changes: 1 addition & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1801,7 +1801,7 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
// Ignore eval event creation during snapshot restore
snap.UpsertEvals(structs.IgnoreUnknownTypeFlag, 100, []*structs.Evaluation{eval})
// Create the scheduler and run it
sched, err := scheduler.NewScheduler(eval.Type, n.logger, snap, planner)
sched, err := scheduler.NewScheduler(eval.Type, n.logger, nil, snap, planner)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1785,7 +1785,7 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)
}

// Create the scheduler and run it
sched, err := scheduler.NewScheduler(eval.Type, j.logger, snap, planner)
sched, err := scheduler.NewScheduler(eval.Type, j.logger, j.srv.workersEventCh, snap, planner)
if err != nil {
return err
}
Expand Down
53 changes: 53 additions & 0 deletions nomad/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mock

import (
"fmt"
"math/rand"
"time"

"github.com/hashicorp/nomad/helper"
Expand Down Expand Up @@ -1349,6 +1350,58 @@ func Alloc() *structs.Allocation {
return alloc
}

func AllocWithoutReservedPort() *structs.Allocation {
alloc := Alloc()
alloc.Resources.Networks[0].ReservedPorts = nil
alloc.TaskResources["web"].Networks[0].ReservedPorts = nil
alloc.AllocatedResources.Tasks["web"].Networks[0].ReservedPorts = nil

return alloc
}

func AllocForNode(n *structs.Node) *structs.Allocation {
nodeIP := n.NodeResources.NodeNetworks[0].Addresses[0].Address

dynamicPortRange := structs.DefaultMaxDynamicPort - structs.DefaultMinDynamicPort
randomDynamicPort := rand.Intn(dynamicPortRange) + structs.DefaultMinDynamicPort

alloc := Alloc()
alloc.NodeID = n.ID

// Set node IP address.
alloc.Resources.Networks[0].IP = nodeIP
alloc.TaskResources["web"].Networks[0].IP = nodeIP
alloc.AllocatedResources.Tasks["web"].Networks[0].IP = nodeIP

// Set dynamic port to a random value.
alloc.TaskResources["web"].Networks[0].DynamicPorts = []structs.Port{{Label: "http", Value: randomDynamicPort}}
alloc.AllocatedResources.Tasks["web"].Networks[0].DynamicPorts = []structs.Port{{Label: "http", Value: randomDynamicPort}}

return alloc

}

func AllocForNodeWithoutReservedPort(n *structs.Node) *structs.Allocation {
nodeIP := n.NodeResources.NodeNetworks[0].Addresses[0].Address

dynamicPortRange := structs.DefaultMaxDynamicPort - structs.DefaultMinDynamicPort
randomDynamicPort := rand.Intn(dynamicPortRange) + structs.DefaultMinDynamicPort

alloc := AllocWithoutReservedPort()
alloc.NodeID = n.ID

// Set node IP address.
alloc.Resources.Networks[0].IP = nodeIP
alloc.TaskResources["web"].Networks[0].IP = nodeIP
alloc.AllocatedResources.Tasks["web"].Networks[0].IP = nodeIP

// Set dynamic port to a random value.
alloc.TaskResources["web"].Networks[0].DynamicPorts = []structs.Port{{Label: "http", Value: randomDynamicPort}}
alloc.AllocatedResources.Tasks["web"].Networks[0].DynamicPorts = []structs.Port{{Label: "http", Value: randomDynamicPort}}

return alloc
}

// ConnectAlloc adds a Connect proxy sidecar group service to mock.Alloc.
func ConnectAlloc() *structs.Allocation {
alloc := Alloc()
Expand Down
2 changes: 2 additions & 0 deletions nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,8 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan
return true
}
if !fit {
metrics.IncrCounterWithLabels([]string{"nomad", "plan", "node_rejected"}, 1, []metrics.Label{{Name: "node_id", Value: nodeID}})

// Log the reason why the node's allocations could not be made
if reason != "" {
//TODO This was debug level and should return
Expand Down
48 changes: 48 additions & 0 deletions nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -229,6 +230,7 @@ type Server struct {
workers []*Worker
workerLock sync.RWMutex
workerConfigLock sync.RWMutex
workersEventCh chan interface{}

// aclCache is used to maintain the parsed ACL objects
aclCache *lru.TwoQueueCache
Expand Down Expand Up @@ -344,6 +346,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr
blockedEvals: NewBlockedEvals(evalBroker, logger),
rpcTLS: incomingTLS,
aclCache: aclCache,
workersEventCh: make(chan interface{}, 1),
}

s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -1581,6 +1584,8 @@ func reloadSchedulers(s *Server, newArgs *SchedulerWorkerPoolArgs) {
func (s *Server) setupWorkers(ctx context.Context) error {
poolArgs := s.GetSchedulerWorkerConfig()

go s.listenWorkerEvents()

// we will be writing to the worker slice
s.workerLock.Lock()
defer s.workerLock.Unlock()
Expand Down Expand Up @@ -1665,6 +1670,49 @@ func (s *Server) stopOldWorkers(oldWorkers []*Worker) {
}
}

// listenWorkerEvents listens for events emitted by scheduler workers and log
// them if necessary. Some events may be skipped to avoid polluting logs with
// duplicates.
func (s *Server) listenWorkerEvents() {
loggedAt := make(map[string]time.Time)

gcDeadline := 4 * time.Hour
gcTicker := time.NewTicker(10 * time.Second)
defer gcTicker.Stop()

for {
select {
case <-gcTicker.C:
for k, v := range loggedAt {
if time.Since(v) >= gcDeadline {
delete(loggedAt, k)
}
}
case e := <-s.workersEventCh:
switch event := e.(type) {
case *scheduler.PortCollisionEvent:
if event == nil || event.Node == nil {
continue
}

if _, ok := loggedAt[event.Node.ID]; ok {
continue
}

eventJson, err := json.Marshal(event.Sanitize())
if err != nil {
s.logger.Debug("failed to encode event to JSON", "error", err)
}
s.logger.Warn("unexpected node port collision, refer to https://www.nomadproject.io/s/port-plan-failure for more information",
"node_id", event.Node.ID, "reason", event.Reason, "event", string(eventJson))
loggedAt[event.Node.ID] = time.Now()
}
case <-s.shutdownCh:
return
}
}
}

// numPeers is used to check on the number of known peers, including the local
// node.
func (s *Server) numPeers() (int, error) {
Expand Down
8 changes: 6 additions & 2 deletions nomad/structs/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,12 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi
if netIdx == nil {
netIdx = NewNetworkIndex()
defer netIdx.Release()
if netIdx.SetNode(node) || netIdx.AddAllocs(allocs) {
return false, "reserved port collision", used, nil

if collision, reason := netIdx.SetNode(node); collision {
return false, fmt.Sprintf("reserved node port collision: %v", reason), used, nil
}
if collision, reason := netIdx.AddAllocs(allocs); collision {
return false, fmt.Sprintf("reserved alloc port collision: %v", reason), used, nil
}
}

Expand Down
Loading

0 comments on commit 8a427a4

Please sign in to comment.