Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Job GC endpoint #828

Merged
merged 1 commit into from
Feb 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions api/system.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package api

// Status is used to query the status-related endpoints.
type System struct {
client *Client
}

// System returns a handle on the system endpoints.
func (c *Client) System() *System {
return &System{client: c}
}

func (s *System) GarbageCollect() error {
var req struct{}
_, err := s.client.write("/v1/system/gc", &req, nil, nil)
return err
}
14 changes: 14 additions & 0 deletions api/system_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package api

import (
"testing"
)

func TestSystem_GarbageCollect(t *testing.T) {
c, s := makeClient(t, nil, nil)
defer s.Stop()
e := c.System()
if err := e.GarbageCollect(); err != nil {
t.Fatal(err)
}
}
2 changes: 2 additions & 0 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/status/leader", s.wrap(s.StatusLeaderRequest))
s.mux.HandleFunc("/v1/status/peers", s.wrap(s.StatusPeersRequest))

s.mux.HandleFunc("/v1/system/gc", s.wrap(s.GarbageCollectRequest))

if enableDebug {
s.mux.HandleFunc("/debug/pprof/", pprof.Index)
s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
Expand Down
24 changes: 24 additions & 0 deletions command/agent/system_endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package agent

import (
"net/http"

"github.com/hashicorp/nomad/nomad/structs"
)

func (s *HTTPServer) GarbageCollectRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "PUT" {
return nil, CodedError(405, ErrInvalidMethod)
}

var args structs.GenericRequest
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}

var gResp structs.GenericResponse
if err := s.agent.RPC("System.GarbageCollect", &args, &gResp); err != nil {
return nil, err
}
return nil, nil
}
23 changes: 23 additions & 0 deletions command/agent/system_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package agent

import (
"net/http"
"net/http/httptest"
"testing"
)

func TestHTTP_SystemGarbageCollect(t *testing.T) {
httpTest(t, nil, func(s *TestServer) {
// Make the HTTP request
req, err := http.NewRequest("PUT", "/v1/system/gc", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
respW := httptest.NewRecorder()

// Make the request
if _, err := s.Server.GarbageCollectRequest(respW, req); err != nil {
t.Fatalf("err: %v", err)
}
})
}
37 changes: 27 additions & 10 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nomad

import (
"fmt"
"math"
"time"

"github.com/hashicorp/nomad/nomad/state"
Expand Down Expand Up @@ -48,10 +49,18 @@ func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error {
return err
}

// Get the time table to calculate GC cutoffs.
tt := c.srv.fsm.TimeTable()
cutoff := time.Now().UTC().Add(-1 * c.srv.config.JobGCThreshold)
oldThreshold := tt.NearestIndex(cutoff)
var oldThreshold uint64
if eval.TriggeredBy == structs.EvalTriggerForceGC {
// The GC was forced, so set the threshold to its maximum so everything
// will GC.
oldThreshold = math.MaxUint64
c.srv.logger.Println("[DEBUG] sched.core: forced job GC")
} else {
// Get the time table to calculate GC cutoffs.
tt := c.srv.fsm.TimeTable()
cutoff := time.Now().UTC().Add(-1 * c.srv.config.JobGCThreshold)
oldThreshold = tt.NearestIndex(cutoff)
}
c.srv.logger.Printf("[DEBUG] sched.core: job GC: scanning before index %d (%v)",
oldThreshold, c.srv.config.JobGCThreshold)

Expand Down Expand Up @@ -125,12 +134,20 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
return err
}

// Compute the old threshold limit for GC using the FSM
// time table. This is a rough mapping of a time to the
// Raft index it belongs to.
tt := c.srv.fsm.TimeTable()
cutoff := time.Now().UTC().Add(-1 * c.srv.config.EvalGCThreshold)
oldThreshold := tt.NearestIndex(cutoff)
var oldThreshold uint64
if eval.TriggeredBy == structs.EvalTriggerForceGC {
// The GC was forced, so set the threshold to its maximum so everything
// will GC.
oldThreshold = math.MaxUint64
c.srv.logger.Println("[DEBUG] sched.core: forced eval GC")
} else {
// Compute the old threshold limit for GC using the FSM
// time table. This is a rough mapping of a time to the
// Raft index it belongs to.
tt := c.srv.fsm.TimeTable()
cutoff := time.Now().UTC().Add(-1 * c.srv.config.EvalGCThreshold)
oldThreshold = tt.NearestIndex(cutoff)
}
c.srv.logger.Printf("[DEBUG] sched.core: eval GC: scanning before index %d (%v)",
oldThreshold, c.srv.config.EvalGCThreshold)

Expand Down
195 changes: 195 additions & 0 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,61 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
}
}

func TestCoreScheduler_EvalGC_Force(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

// Insert "dead" eval
state := s1.fsm.State()
eval := mock.Eval()
eval.Status = structs.EvalStatusFailed
err := state.UpsertEvals(1000, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("err: %v", err)
}

// Insert "dead" alloc
alloc := mock.Alloc()
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusFailed
err = state.UpsertAllocs(1001, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}

// Create a core scheduler
snap, err := state.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.forceCoreJobEval(structs.CoreJobEvalGC)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}

// Should be gone
out, err := state.EvalByID(eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %v", out)
}

outA, err := state.AllocByID(alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA != nil {
t.Fatalf("bad: %v", outA)
}
}

func TestCoreScheduler_NodeGC(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
Expand Down Expand Up @@ -112,6 +167,45 @@ func TestCoreScheduler_NodeGC(t *testing.T) {
}
}

func TestCoreScheduler_NodeGC_Force(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

// Insert "dead" node
state := s1.fsm.State()
node := mock.Node()
node.Status = structs.NodeStatusDown
err := state.UpsertNode(1000, node)
if err != nil {
t.Fatalf("err: %v", err)
}

// Create a core scheduler
snap, err := state.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.forceCoreJobEval(structs.CoreJobNodeGC)
gc.ModifyIndex = 2000
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}

// Should be gone
out, err := state.NodeByID(node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %v", out)
}
}

func TestCoreScheduler_JobGC(t *testing.T) {
tests := []struct {
test, evalStatus, allocStatus string
Expand Down Expand Up @@ -215,3 +309,104 @@ func TestCoreScheduler_JobGC(t *testing.T) {
}
}
}

func TestCoreScheduler_JobGC_Force(t *testing.T) {
tests := []struct {
test, evalStatus, allocStatus string
shouldExist bool
}{
{
test: "Terminal",
evalStatus: structs.EvalStatusFailed,
allocStatus: structs.AllocDesiredStatusFailed,
shouldExist: false,
},
{
test: "Has Alloc",
evalStatus: structs.EvalStatusFailed,
allocStatus: structs.AllocDesiredStatusRun,
shouldExist: true,
},
{
test: "Has Eval",
evalStatus: structs.EvalStatusPending,
allocStatus: structs.AllocDesiredStatusFailed,
shouldExist: true,
},
}

for _, test := range tests {
s1 := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

// Insert job.
state := s1.fsm.State()
job := mock.Job()
job.GC = true
err := state.UpsertJob(1000, job)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}

// Insert eval
eval := mock.Eval()
eval.JobID = job.ID
eval.Status = test.evalStatus
err = state.UpsertEvals(1001, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}

// Insert alloc
alloc := mock.Alloc()
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = test.allocStatus
err = state.UpsertAllocs(1002, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}

// Create a core scheduler
snap, err := state.Snapshot()
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.forceCoreJobEval(structs.CoreJobJobGC)
gc.ModifyIndex = 2000
err = core.Process(gc)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}

// Should still exist
out, err := state.JobByID(job.ID)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}
if (test.shouldExist && out == nil) || (!test.shouldExist && out != nil) {
t.Fatalf("test(%s) bad: %v", test.test, out)
}

outE, err := state.EvalByID(eval.ID)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}
if (test.shouldExist && outE == nil) || (!test.shouldExist && outE != nil) {
t.Fatalf("test(%s) bad: %v", test.test, out)
}

outA, err := state.AllocByID(alloc.ID)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}
if (test.shouldExist && outA == nil) || (!test.shouldExist && outA != nil) {
t.Fatalf("test(%s) bad: %v", test.test, outA)
}
}
}

8 changes: 8 additions & 0 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,14 @@ func (s *Server) coreJobEval(job string) *structs.Evaluation {
}
}

// forceCoreJobEval returns an evaluation for a core job that will ignore GC
// cutoffs.
func (s *Server) forceCoreJobEval(job string) *structs.Evaluation {
eval := s.coreJobEval(job)
eval.TriggeredBy = structs.EvalTriggerForceGC
return eval
}

// reapFailedEvaluations is used to reap evaluations that
// have reached their delivery limit and should be failed
func (s *Server) reapFailedEvaluations(stopCh chan struct{}) {
Expand Down
Loading