Skip to content

Commit

Permalink
Merge pull request #828 from hashicorp/f-gc-endpoint
Browse files Browse the repository at this point in the history
Job GC endpoint
  • Loading branch information
dadgar committed Feb 21, 2016
2 parents d9059c5 + 902c14b commit 9d8d8fe
Show file tree
Hide file tree
Showing 12 changed files with 391 additions and 10 deletions.
17 changes: 17 additions & 0 deletions api/system.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package api

// Status is used to query the status-related endpoints.
type System struct {
client *Client
}

// System returns a handle on the system endpoints.
func (c *Client) System() *System {
return &System{client: c}
}

func (s *System) GarbageCollect() error {
var req struct{}
_, err := s.client.write("/v1/system/gc", &req, nil, nil)
return err
}
14 changes: 14 additions & 0 deletions api/system_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package api

import (
"testing"
)

func TestSystem_GarbageCollect(t *testing.T) {
c, s := makeClient(t, nil, nil)
defer s.Stop()
e := c.System()
if err := e.GarbageCollect(); err != nil {
t.Fatal(err)
}
}
2 changes: 2 additions & 0 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/status/leader", s.wrap(s.StatusLeaderRequest))
s.mux.HandleFunc("/v1/status/peers", s.wrap(s.StatusPeersRequest))

s.mux.HandleFunc("/v1/system/gc", s.wrap(s.GarbageCollectRequest))

if enableDebug {
s.mux.HandleFunc("/debug/pprof/", pprof.Index)
s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
Expand Down
24 changes: 24 additions & 0 deletions command/agent/system_endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package agent

import (
"net/http"

"github.com/hashicorp/nomad/nomad/structs"
)

func (s *HTTPServer) GarbageCollectRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "PUT" {
return nil, CodedError(405, ErrInvalidMethod)
}

var args structs.GenericRequest
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}

var gResp structs.GenericResponse
if err := s.agent.RPC("System.GarbageCollect", &args, &gResp); err != nil {
return nil, err
}
return nil, nil
}
23 changes: 23 additions & 0 deletions command/agent/system_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package agent

import (
"net/http"
"net/http/httptest"
"testing"
)

func TestHTTP_SystemGarbageCollect(t *testing.T) {
httpTest(t, nil, func(s *TestServer) {
// Make the HTTP request
req, err := http.NewRequest("PUT", "/v1/system/gc", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
respW := httptest.NewRecorder()

// Make the request
if _, err := s.Server.GarbageCollectRequest(respW, req); err != nil {
t.Fatalf("err: %v", err)
}
})
}
37 changes: 27 additions & 10 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nomad

import (
"fmt"
"math"
"time"

"github.com/hashicorp/nomad/nomad/state"
Expand Down Expand Up @@ -48,10 +49,18 @@ func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error {
return err
}

// Get the time table to calculate GC cutoffs.
tt := c.srv.fsm.TimeTable()
cutoff := time.Now().UTC().Add(-1 * c.srv.config.JobGCThreshold)
oldThreshold := tt.NearestIndex(cutoff)
var oldThreshold uint64
if eval.TriggeredBy == structs.EvalTriggerForceGC {
// The GC was forced, so set the threshold to its maximum so everything
// will GC.
oldThreshold = math.MaxUint64
c.srv.logger.Println("[DEBUG] sched.core: forced job GC")
} else {
// Get the time table to calculate GC cutoffs.
tt := c.srv.fsm.TimeTable()
cutoff := time.Now().UTC().Add(-1 * c.srv.config.JobGCThreshold)
oldThreshold = tt.NearestIndex(cutoff)
}
c.srv.logger.Printf("[DEBUG] sched.core: job GC: scanning before index %d (%v)",
oldThreshold, c.srv.config.JobGCThreshold)

Expand Down Expand Up @@ -125,12 +134,20 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
return err
}

// Compute the old threshold limit for GC using the FSM
// time table. This is a rough mapping of a time to the
// Raft index it belongs to.
tt := c.srv.fsm.TimeTable()
cutoff := time.Now().UTC().Add(-1 * c.srv.config.EvalGCThreshold)
oldThreshold := tt.NearestIndex(cutoff)
var oldThreshold uint64
if eval.TriggeredBy == structs.EvalTriggerForceGC {
// The GC was forced, so set the threshold to its maximum so everything
// will GC.
oldThreshold = math.MaxUint64
c.srv.logger.Println("[DEBUG] sched.core: forced eval GC")
} else {
// Compute the old threshold limit for GC using the FSM
// time table. This is a rough mapping of a time to the
// Raft index it belongs to.
tt := c.srv.fsm.TimeTable()
cutoff := time.Now().UTC().Add(-1 * c.srv.config.EvalGCThreshold)
oldThreshold = tt.NearestIndex(cutoff)
}
c.srv.logger.Printf("[DEBUG] sched.core: eval GC: scanning before index %d (%v)",
oldThreshold, c.srv.config.EvalGCThreshold)

Expand Down
195 changes: 195 additions & 0 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,61 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
}
}

func TestCoreScheduler_EvalGC_Force(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

// Insert "dead" eval
state := s1.fsm.State()
eval := mock.Eval()
eval.Status = structs.EvalStatusFailed
err := state.UpsertEvals(1000, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("err: %v", err)
}

// Insert "dead" alloc
alloc := mock.Alloc()
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusFailed
err = state.UpsertAllocs(1001, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}

// Create a core scheduler
snap, err := state.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.forceCoreJobEval(structs.CoreJobEvalGC)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}

// Should be gone
out, err := state.EvalByID(eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %v", out)
}

outA, err := state.AllocByID(alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA != nil {
t.Fatalf("bad: %v", outA)
}
}

func TestCoreScheduler_NodeGC(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
Expand Down Expand Up @@ -112,6 +167,45 @@ func TestCoreScheduler_NodeGC(t *testing.T) {
}
}

func TestCoreScheduler_NodeGC_Force(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

// Insert "dead" node
state := s1.fsm.State()
node := mock.Node()
node.Status = structs.NodeStatusDown
err := state.UpsertNode(1000, node)
if err != nil {
t.Fatalf("err: %v", err)
}

// Create a core scheduler
snap, err := state.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.forceCoreJobEval(structs.CoreJobNodeGC)
gc.ModifyIndex = 2000
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}

// Should be gone
out, err := state.NodeByID(node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %v", out)
}
}

func TestCoreScheduler_JobGC(t *testing.T) {
tests := []struct {
test, evalStatus, allocStatus string
Expand Down Expand Up @@ -215,3 +309,104 @@ func TestCoreScheduler_JobGC(t *testing.T) {
}
}
}

func TestCoreScheduler_JobGC_Force(t *testing.T) {
tests := []struct {
test, evalStatus, allocStatus string
shouldExist bool
}{
{
test: "Terminal",
evalStatus: structs.EvalStatusFailed,
allocStatus: structs.AllocDesiredStatusFailed,
shouldExist: false,
},
{
test: "Has Alloc",
evalStatus: structs.EvalStatusFailed,
allocStatus: structs.AllocDesiredStatusRun,
shouldExist: true,
},
{
test: "Has Eval",
evalStatus: structs.EvalStatusPending,
allocStatus: structs.AllocDesiredStatusFailed,
shouldExist: true,
},
}

for _, test := range tests {
s1 := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

// Insert job.
state := s1.fsm.State()
job := mock.Job()
job.GC = true
err := state.UpsertJob(1000, job)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}

// Insert eval
eval := mock.Eval()
eval.JobID = job.ID
eval.Status = test.evalStatus
err = state.UpsertEvals(1001, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}

// Insert alloc
alloc := mock.Alloc()
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = test.allocStatus
err = state.UpsertAllocs(1002, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}

// Create a core scheduler
snap, err := state.Snapshot()
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.forceCoreJobEval(structs.CoreJobJobGC)
gc.ModifyIndex = 2000
err = core.Process(gc)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}

// Should still exist
out, err := state.JobByID(job.ID)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}
if (test.shouldExist && out == nil) || (!test.shouldExist && out != nil) {
t.Fatalf("test(%s) bad: %v", test.test, out)
}

outE, err := state.EvalByID(eval.ID)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}
if (test.shouldExist && outE == nil) || (!test.shouldExist && outE != nil) {
t.Fatalf("test(%s) bad: %v", test.test, out)
}

outA, err := state.AllocByID(alloc.ID)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
}
if (test.shouldExist && outA == nil) || (!test.shouldExist && outA != nil) {
t.Fatalf("test(%s) bad: %v", test.test, outA)
}
}
}

8 changes: 8 additions & 0 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,14 @@ func (s *Server) coreJobEval(job string) *structs.Evaluation {
}
}

// forceCoreJobEval returns an evaluation for a core job that will ignore GC
// cutoffs.
func (s *Server) forceCoreJobEval(job string) *structs.Evaluation {
eval := s.coreJobEval(job)
eval.TriggeredBy = structs.EvalTriggerForceGC
return eval
}

// reapFailedEvaluations is used to reap evaluations that
// have reached their delivery limit and should be failed
func (s *Server) reapFailedEvaluations(stopCh chan struct{}) {
Expand Down
Loading

0 comments on commit 9d8d8fe

Please sign in to comment.