Merge pull request #5631 from hashicorp/b-system-sched-constraint-errors
fix system sched constraint errors
langmartin authored May 6, 2019
2 parents d686fdd + b122853 commit c75357c
Showing 5 changed files with 188 additions and 57 deletions.
9 changes: 9 additions & 0 deletions dev/cluster/client1.hcl
@@ -13,6 +13,15 @@ client {
server_join {
retry_join = ["127.0.0.1:4647", "127.0.0.1:5647", "127.0.0.1:6647"]
}
meta {
tag = "foo"
}
}

plugin "raw_exec" {
config {
enabled = true
}
}

ports {
10 changes: 9 additions & 1 deletion dev/cluster/client2.hcl
@@ -10,10 +10,18 @@ name = "client2"
# Enable the client
client {
enabled = true

server_join {
retry_join = ["127.0.0.1:4647", "127.0.0.1:5647", "127.0.0.1:6647"]
}
meta {
tag = "foo"
}
}

plugin "raw_exec" {
config {
enabled = true
}
}

ports {
29 changes: 29 additions & 0 deletions dev/cluster/client3.hcl
@@ -0,0 +1,29 @@
# Increase log verbosity
log_level = "DEBUG"

# Setup data dir
data_dir = "/tmp/client3"

# Give the agent a unique name. Defaults to hostname
name = "client3"

# Enable the client
client {
enabled = true
server_join {
retry_join = ["127.0.0.1:4647", "127.0.0.1:5647", "127.0.0.1:6647"]
}
meta {
tag = "bar"
}
}

plugin "raw_exec" {
config {
enabled = true
}
}

ports {
http = 9646
}
124 changes: 68 additions & 56 deletions scheduler/system_sched.go
@@ -284,7 +284,10 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
option := s.stack.Select(missing.TaskGroup, nil)

if option == nil {
// If nodes were filtered because of constraint mismatches and we
// If the task can't be placed on this node, update reporting data
// and continue to short-circuit the loop

// If this node was filtered because of constraint mismatches and we
// couldn't create an allocation, then decrement queued for that
// task group
if s.ctx.metrics.NodesFiltered > 0 {
@@ -297,13 +300,31 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
desired := s.plan.Annotations.DesiredTGUpdates[missing.TaskGroup.Name]
desired.Place -= 1
}

// Filtered nodes are not reported to users, just omitted from the job status
continue
}

// Check if this task group has already failed
// Check if this task group has already failed, reported to the user as a count
if metric, ok := s.failedTGAllocs[missing.TaskGroup.Name]; ok {
metric.CoalescedFailures += 1
continue
}

// Store the available nodes by datacenter
s.ctx.Metrics().NodesAvailable = s.nodesByDC

// Compute top K scoring node metadata
s.ctx.Metrics().PopulateScoreMetaData()

// Lazy initialize the failed map
if s.failedTGAllocs == nil {
s.failedTGAllocs = make(map[string]*structs.AllocMetric)
}

// Actual failure to start this task on this candidate node, report it individually
s.failedTGAllocs[missing.TaskGroup.Name] = s.ctx.Metrics()
continue
}

// Store the available nodes by datacenter
@@ -313,67 +334,58 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
s.ctx.Metrics().PopulateScoreMetaData()

// Set fields based on if we found an allocation option
if option != nil {
resources := &structs.AllocatedResources{
Tasks: option.TaskResources,
Shared: structs.AllocatedSharedResources{
DiskMB: int64(missing.TaskGroup.EphemeralDisk.SizeMB),
},
}
resources := &structs.AllocatedResources{
Tasks: option.TaskResources,
Shared: structs.AllocatedSharedResources{
DiskMB: int64(missing.TaskGroup.EphemeralDisk.SizeMB),
},
}

// Create an allocation for this
alloc := &structs.Allocation{
ID: uuid.Generate(),
Namespace: s.job.Namespace,
EvalID: s.eval.ID,
Name: missing.Name,
JobID: s.job.ID,
TaskGroup: missing.TaskGroup.Name,
Metrics: s.ctx.Metrics(),
NodeID: option.Node.ID,
NodeName: option.Node.Name,
TaskResources: resources.OldTaskResources(),
AllocatedResources: resources,
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
SharedResources: &structs.Resources{
DiskMB: missing.TaskGroup.EphemeralDisk.SizeMB,
},
}
// Create an allocation for this
alloc := &structs.Allocation{
ID: uuid.Generate(),
Namespace: s.job.Namespace,
EvalID: s.eval.ID,
Name: missing.Name,
JobID: s.job.ID,
TaskGroup: missing.TaskGroup.Name,
Metrics: s.ctx.Metrics(),
NodeID: option.Node.ID,
NodeName: option.Node.Name,
TaskResources: resources.OldTaskResources(),
AllocatedResources: resources,
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
SharedResources: &structs.Resources{
DiskMB: missing.TaskGroup.EphemeralDisk.SizeMB,
},
}

// If the new allocation is replacing an older allocation then we
// set the record the older allocation id so that they are chained
if missing.Alloc != nil {
alloc.PreviousAllocation = missing.Alloc.ID
}
// If the new allocation is replacing an older allocation then we record the
// older allocation id so that they are chained
if missing.Alloc != nil {
alloc.PreviousAllocation = missing.Alloc.ID
}

// If this placement involves preemption, set DesiredState to evict for those allocations
if option.PreemptedAllocs != nil {
var preemptedAllocIDs []string
for _, stop := range option.PreemptedAllocs {
s.plan.AppendPreemptedAlloc(stop, alloc.ID)

preemptedAllocIDs = append(preemptedAllocIDs, stop.ID)
if s.eval.AnnotatePlan && s.plan.Annotations != nil {
s.plan.Annotations.PreemptedAllocs = append(s.plan.Annotations.PreemptedAllocs, stop.Stub())
if s.plan.Annotations.DesiredTGUpdates != nil {
desired := s.plan.Annotations.DesiredTGUpdates[missing.TaskGroup.Name]
desired.Preemptions += 1
}
// If this placement involves preemption, set DesiredState to evict for those allocations
if option.PreemptedAllocs != nil {
var preemptedAllocIDs []string
for _, stop := range option.PreemptedAllocs {
s.plan.AppendPreemptedAlloc(stop, alloc.ID)

preemptedAllocIDs = append(preemptedAllocIDs, stop.ID)
if s.eval.AnnotatePlan && s.plan.Annotations != nil {
s.plan.Annotations.PreemptedAllocs = append(s.plan.Annotations.PreemptedAllocs, stop.Stub())
if s.plan.Annotations.DesiredTGUpdates != nil {
desired := s.plan.Annotations.DesiredTGUpdates[missing.TaskGroup.Name]
desired.Preemptions += 1
}
}
alloc.PreemptedAllocations = preemptedAllocIDs
}

s.plan.AppendAlloc(alloc)
} else {
// Lazy initialize the failed map
if s.failedTGAllocs == nil {
s.failedTGAllocs = make(map[string]*structs.AllocMetric)
}

s.failedTGAllocs[missing.TaskGroup.Name] = s.ctx.Metrics()
alloc.PreemptedAllocations = preemptedAllocIDs
}

s.plan.AppendAlloc(alloc)
}

return nil
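Because the rendered diff interleaves removed and added lines, the resulting shape of computePlacements is easier to see in a condensed paraphrase. This is a sketch of the post-change control flow, not the verbatim source; the queued-allocations bookkeeping from the surrounding hunk context is elided:

    for _, missing := range place {
        option := s.stack.Select(missing.TaskGroup, nil)

        if option == nil {
            if s.ctx.metrics.NodesFiltered > 0 {
                // The node was excluded by a constraint mismatch: adjust
                // the queued/desired counters (elided) and skip it
                // silently; filtered nodes are not user-visible failures.
                continue
            }
            if metric, ok := s.failedTGAllocs[missing.TaskGroup.Name]; ok {
                // The task group already failed on another node; just
                // bump the coalesced count.
                metric.CoalescedFailures += 1
                continue
            }
            // First real failure on an eligible node: record full metrics
            // so it surfaces to the user via FailedTGAllocs.
            s.ctx.Metrics().NodesAvailable = s.nodesByDC
            s.ctx.Metrics().PopulateScoreMetaData()
            if s.failedTGAllocs == nil {
                s.failedTGAllocs = make(map[string]*structs.AllocMetric)
            }
            s.failedTGAllocs[missing.TaskGroup.Name] = s.ctx.Metrics()
            continue
        }

        // option is non-nil from here on, so the old `if option != nil`
        // wrapper is gone (hence the re-indented allocation-building code
        // above): build the alloc, chain it to any replaced alloc, handle
        // preemptions, and append it to the plan.
    }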
73 changes: 73 additions & 0 deletions scheduler/system_sched_test.go
@@ -1307,6 +1307,79 @@ func TestSystemSched_Queued_With_Constraints(t *testing.T) {
if val, ok := h.Evals[0].QueuedAllocations["web"]; !ok || val != 0 {
t.Fatalf("bad queued allocations: %#v", h.Evals[0].QueuedAllocations)
}

}

// No errors reported when constraints prevent placement
func TestSystemSched_ConstraintErrors(t *testing.T) {
h := NewHarness(t)

var node *structs.Node
// Register some nodes
// the tag "aaaaaa" is hashed so that the nodes are processed
// in an order other than good, good, bad
for _, tag := range []string{"aaaaaa", "foo", "foo", "foo"} {
node = mock.Node()
node.Meta["tag"] = tag
node.ComputeClass()
require.Nil(t, h.State.UpsertNode(h.NextIndex(), node))
}

// Mark the last node as ineligible
node.SchedulingEligibility = structs.NodeSchedulingIneligible

// Make a job with a constraint that matches a subset of the nodes
job := mock.SystemJob()
job.Constraints = append(job.Constraints,
&structs.Constraint{
LTarget: "${meta.tag}",
RTarget: "foo",
Operand: "=",
})

require.Nil(t, h.State.UpsertJob(h.NextIndex(), job))

// Evaluate the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}

require.Nil(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
require.Nil(t, h.Process(NewSystemScheduler, eval))
require.Equal(t, "complete", h.Evals[0].Status)

// QueuedAllocations is drained
val, ok := h.Evals[0].QueuedAllocations["web"]
require.True(t, ok)
require.Equal(t, 0, val)

// The plan has two NodeAllocations
require.Equal(t, 1, len(h.Plans))
require.Nil(t, h.Plans[0].Annotations)
require.Equal(t, 2, len(h.Plans[0].NodeAllocation))

// Two nodes were allocated and are running
ws := memdb.NewWatchSet()
as, err := h.State.AllocsByJob(ws, structs.DefaultNamespace, job.ID, false)
require.Nil(t, err)

running := 0
for _, a := range as {
if "running" == a.Job.Status {
running++
}
}

require.Equal(t, 2, len(as))
require.Equal(t, 2, running)

// Failed allocations is empty
require.Equal(t, 0, len(h.Evals[0].FailedTGAllocs))
}
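To run just the new test locally, the standard Go tooling suffices (from the repository root, since the file lives in the scheduler package):

    go test ./scheduler -run TestSystemSched_ConstraintErrors -v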

func TestSystemSched_ChainedAlloc(t *testing.T) {