Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: detect and log unexpected scheduling collisions #11793

Merged
merged 29 commits into from
Jan 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
67eaae6
initial work on port collision detection
lgfa29 Jan 6, 2022
86f90e9
pass copy on scheduler worker events and gc old entries
lgfa29 Jan 11, 2022
520febd
fix test for new network index methods signature
lgfa29 Jan 11, 2022
8de0caf
add tests for new Copy() methods
lgfa29 Jan 11, 2022
1cfa5d8
godoc for scheduler.Context EventsCh
lgfa29 Jan 11, 2022
7383f08
log collision reason on AllocsFit
lgfa29 Jan 11, 2022
edc529d
fix SetNode collision check logic to match current implementation
lgfa29 Jan 11, 2022
318f93e
exhaust node on port collision
lgfa29 Jan 11, 2022
676b549
fix NoopScheduler and NewEvalContext test
lgfa29 Jan 11, 2022
547eb10
fix scheduler tests that had network port collisions
lgfa29 Jan 12, 2022
2d0d7ea
fix test for agent command
lgfa29 Jan 12, 2022
84bfecf
revert mock.Alloc to have reserved ports
lgfa29 Jan 12, 2022
ce2e15c
fix linter and broken test
lgfa29 Jan 12, 2022
ec08415
rename EvalEvent -> PortCollisionEvent and add tests port collision d…
lgfa29 Jan 13, 2022
a089c7a
use testutil.TestMultiplier for binpack iterator tests
lgfa29 Jan 13, 2022
e1a57a6
add logic to copy NodeNetworks when copying NodeResources
lgfa29 Jan 13, 2022
dbebd4f
address comments from code review
lgfa29 Jan 13, 2022
e9c72f0
improve log message for unexpected node port collisions and dedent th…
lgfa29 Jan 13, 2022
3f76756
re-order some Copy method to match the struct order and add nil checks
lgfa29 Jan 13, 2022
60b8a1d
small comment updates
lgfa29 Jan 13, 2022
574102c
remove failing tests
lgfa29 Jan 13, 2022
56946d4
can't Fatalf from goroutine
lgfa29 Jan 13, 2022
3c32065
add nomad.plan.node_rejected metric
lgfa29 Jan 14, 2022
60ccce4
set planner scheduler events channel
lgfa29 Jan 14, 2022
b08a423
refactor scheduler event interface
lgfa29 Jan 14, 2022
cbc1528
fix NetworkIndex Copy
lgfa29 Jan 14, 2022
835b26b
sanitize PortCollisionEvent at log time
lgfa29 Jan 14, 2022
fc0e144
use CopySkipJob when sanitizing PortCollisionEvent
lgfa29 Jan 15, 2022
d2fdd01
fix linter
lgfa29 Jan 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1801,7 +1801,7 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
// Ignore eval event creation during snapshot restore
snap.UpsertEvals(structs.IgnoreUnknownTypeFlag, 100, []*structs.Evaluation{eval})
// Create the scheduler and run it
sched, err := scheduler.NewScheduler(eval.Type, n.logger, snap, planner)
sched, err := scheduler.NewScheduler(eval.Type, n.logger, nil, snap, planner)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1785,7 +1785,7 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)
}

// Create the scheduler and run it
sched, err := scheduler.NewScheduler(eval.Type, j.logger, snap, planner)
sched, err := scheduler.NewScheduler(eval.Type, j.logger, j.srv.workersEventCh, snap, planner)
if err != nil {
return err
}
Expand Down
53 changes: 53 additions & 0 deletions nomad/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mock

import (
"fmt"
"math/rand"
"time"

"github.com/hashicorp/nomad/helper"
Expand Down Expand Up @@ -1349,6 +1350,58 @@ func Alloc() *structs.Allocation {
return alloc
}

func AllocWithoutReservedPort() *structs.Allocation {
alloc := Alloc()
alloc.Resources.Networks[0].ReservedPorts = nil
alloc.TaskResources["web"].Networks[0].ReservedPorts = nil
alloc.AllocatedResources.Tasks["web"].Networks[0].ReservedPorts = nil

return alloc
}

func AllocForNode(n *structs.Node) *structs.Allocation {
nodeIP := n.NodeResources.NodeNetworks[0].Addresses[0].Address

dynamicPortRange := structs.DefaultMaxDynamicPort - structs.DefaultMinDynamicPort
randomDynamicPort := rand.Intn(dynamicPortRange) + structs.DefaultMinDynamicPort

alloc := Alloc()
alloc.NodeID = n.ID

// Set node IP address.
alloc.Resources.Networks[0].IP = nodeIP
alloc.TaskResources["web"].Networks[0].IP = nodeIP
alloc.AllocatedResources.Tasks["web"].Networks[0].IP = nodeIP

// Set dynamic port to a random value.
alloc.TaskResources["web"].Networks[0].DynamicPorts = []structs.Port{{Label: "http", Value: randomDynamicPort}}
alloc.AllocatedResources.Tasks["web"].Networks[0].DynamicPorts = []structs.Port{{Label: "http", Value: randomDynamicPort}}

return alloc

}

func AllocForNodeWithoutReservedPort(n *structs.Node) *structs.Allocation {
nodeIP := n.NodeResources.NodeNetworks[0].Addresses[0].Address

dynamicPortRange := structs.DefaultMaxDynamicPort - structs.DefaultMinDynamicPort
randomDynamicPort := rand.Intn(dynamicPortRange) + structs.DefaultMinDynamicPort

alloc := AllocWithoutReservedPort()
alloc.NodeID = n.ID

// Set node IP address.
alloc.Resources.Networks[0].IP = nodeIP
alloc.TaskResources["web"].Networks[0].IP = nodeIP
alloc.AllocatedResources.Tasks["web"].Networks[0].IP = nodeIP

// Set dynamic port to a random value.
alloc.TaskResources["web"].Networks[0].DynamicPorts = []structs.Port{{Label: "http", Value: randomDynamicPort}}
alloc.AllocatedResources.Tasks["web"].Networks[0].DynamicPorts = []structs.Port{{Label: "http", Value: randomDynamicPort}}

return alloc
}

// ConnectAlloc adds a Connect proxy sidecar group service to mock.Alloc.
func ConnectAlloc() *structs.Allocation {
alloc := Alloc()
Expand Down
2 changes: 2 additions & 0 deletions nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,8 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan
return true
}
if !fit {
metrics.IncrCounterWithLabels([]string{"nomad", "plan", "node_rejected"}, 1, []metrics.Label{{Name: "node_id", Value: nodeID}})

// Log the reason why the node's allocations could not be made
if reason != "" {
//TODO This was debug level and should return
Expand Down
48 changes: 48 additions & 0 deletions nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -229,6 +230,7 @@ type Server struct {
workers []*Worker
workerLock sync.RWMutex
workerConfigLock sync.RWMutex
workersEventCh chan interface{}

// aclCache is used to maintain the parsed ACL objects
aclCache *lru.TwoQueueCache
Expand Down Expand Up @@ -344,6 +346,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr
blockedEvals: NewBlockedEvals(evalBroker, logger),
rpcTLS: incomingTLS,
aclCache: aclCache,
workersEventCh: make(chan interface{}, 1),
}

s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -1581,6 +1584,8 @@ func reloadSchedulers(s *Server, newArgs *SchedulerWorkerPoolArgs) {
func (s *Server) setupWorkers(ctx context.Context) error {
poolArgs := s.GetSchedulerWorkerConfig()

go s.listenWorkerEvents()

// we will be writing to the worker slice
s.workerLock.Lock()
defer s.workerLock.Unlock()
Expand Down Expand Up @@ -1665,6 +1670,49 @@ func (s *Server) stopOldWorkers(oldWorkers []*Worker) {
}
}

// listenWorkerEvents listens for events emitted by scheduler workers and log
// them if necessary. Some events may be skipped to avoid polluting logs with
// duplicates.
func (s *Server) listenWorkerEvents() {
loggedAt := make(map[string]time.Time)

gcDeadline := 4 * time.Hour
gcTicker := time.NewTicker(10 * time.Second)
defer gcTicker.Stop()

for {
select {
case <-gcTicker.C:
for k, v := range loggedAt {
if time.Since(v) >= gcDeadline {
delete(loggedAt, k)
}
}
case e := <-s.workersEventCh:
switch event := e.(type) {
case *scheduler.PortCollisionEvent:
if event == nil || event.Node == nil {
continue
}

if _, ok := loggedAt[event.Node.ID]; ok {
continue
}

eventJson, err := json.Marshal(event.Sanitize())
if err != nil {
s.logger.Debug("failed to encode event to JSON", "error", err)
}
s.logger.Warn("unexpected node port collision, refer to https://www.nomadproject.io/s/port-plan-failure for more information",
"node_id", event.Node.ID, "reason", event.Reason, "event", string(eventJson))
loggedAt[event.Node.ID] = time.Now()
}
case <-s.shutdownCh:
return
}
}
}

// numPeers is used to check on the number of known peers, including the local
// node.
func (s *Server) numPeers() (int, error) {
Expand Down
8 changes: 6 additions & 2 deletions nomad/structs/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,12 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi
if netIdx == nil {
netIdx = NewNetworkIndex()
defer netIdx.Release()
if netIdx.SetNode(node) || netIdx.AddAllocs(allocs) {
return false, "reserved port collision", used, nil

if collision, reason := netIdx.SetNode(node); collision {
return false, fmt.Sprintf("reserved node port collision: %v", reason), used, nil
}
if collision, reason := netIdx.AddAllocs(allocs); collision {
return false, fmt.Sprintf("reserved alloc port collision: %v", reason), used, nil
}
}

Expand Down
Loading