diff --git a/api/system.go b/api/system.go
new file mode 100644
index 00000000000..3b1bf9ae9dc
--- /dev/null
+++ b/api/system.go
@@ -0,0 +1,18 @@
+package api
+
+// System is used to query the system-related endpoints.
+type System struct {
+	client *Client
+}
+
+// System returns a handle on the system endpoints.
+func (c *Client) System() *System {
+	return &System{client: c}
+}
+
+// GarbageCollect forces the servers to immediately garbage collect jobs, evals and nodes.
+func (s *System) GarbageCollect() error {
+	var req struct{}
+	_, err := s.client.write("/v1/system/gc", &req, nil, nil)
+	return err
+}
diff --git a/api/system_test.go b/api/system_test.go
new file mode 100644
index 00000000000..430332417e1
--- /dev/null
+++ b/api/system_test.go
@@ -0,0 +1,14 @@
+package api
+
+import (
+	"testing"
+)
+
+func TestSystem_GarbageCollect(t *testing.T) {
+	c, s := makeClient(t, nil, nil)
+	defer s.Stop()
+	e := c.System()
+	if err := e.GarbageCollect(); err != nil {
+		t.Fatal(err)
+	}
+}
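Together, the two api/ files above give clients a one-call way to force a cluster-wide GC. A minimal caller would look like the sketch below; the default agent address and the log.Fatal error handling are illustrative assumptions, not part of this change:

```go
package main

import (
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// Connect to the local agent using the default configuration
	// (http://127.0.0.1:4646 unless overridden by the environment).
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Force an immediate garbage collection of jobs, evals and nodes.
	if err := client.System().GarbageCollect(); err != nil {
		log.Fatal(err)
	}
}
```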
diff --git a/command/agent/http.go b/command/agent/http.go
index 6b36971faf2..b81d26e7172 100644
--- a/command/agent/http.go
+++ b/command/agent/http.go
@@ -118,6 +118,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
 	s.mux.HandleFunc("/v1/status/leader", s.wrap(s.StatusLeaderRequest))
 	s.mux.HandleFunc("/v1/status/peers", s.wrap(s.StatusPeersRequest))
 
+	s.mux.HandleFunc("/v1/system/gc", s.wrap(s.GarbageCollectRequest))
+
 	if enableDebug {
 		s.mux.HandleFunc("/debug/pprof/", pprof.Index)
 		s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
diff --git a/command/agent/system_endpoint.go b/command/agent/system_endpoint.go
new file mode 100644
index 00000000000..2ba23db61e1
--- /dev/null
+++ b/command/agent/system_endpoint.go
@@ -0,0 +1,24 @@
+package agent
+
+import (
+	"net/http"
+
+	"github.com/hashicorp/nomad/nomad/structs"
+)
+
+func (s *HTTPServer) GarbageCollectRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
+	if req.Method != "PUT" {
+		return nil, CodedError(405, ErrInvalidMethod)
+	}
+
+	var args structs.GenericRequest
+	if s.parse(resp, req, &args.Region, &args.QueryOptions) {
+		return nil, nil
+	}
+
+	var gResp structs.GenericResponse
+	if err := s.agent.RPC("System.GarbageCollect", &args, &gResp); err != nil {
+		return nil, err
+	}
+	return nil, nil
+}
diff --git a/command/agent/system_endpoint_test.go b/command/agent/system_endpoint_test.go
new file mode 100644
index 00000000000..bd3ca741894
--- /dev/null
+++ b/command/agent/system_endpoint_test.go
@@ -0,0 +1,23 @@
+package agent
+
+import (
+	"net/http"
+	"net/http/httptest"
+	"testing"
+)
+
+func TestHTTP_SystemGarbageCollect(t *testing.T) {
+	httpTest(t, nil, func(s *TestServer) {
+		// Make the HTTP request
+		req, err := http.NewRequest("PUT", "/v1/system/gc", nil)
+		if err != nil {
+			t.Fatalf("err: %v", err)
+		}
+		respW := httptest.NewRecorder()
+
+		// Make the request
+		if _, err := s.Server.GarbageCollectRequest(respW, req); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+	})
+}
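Because GarbageCollectRequest rejects everything but PUT with a 405, callers that bypass the api package must build the request explicitly. A minimal sketch against a local agent (the address and timeout are assumptions):

```go
package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

func main() {
	// The handler requires PUT; any other method yields 405 Method Not Allowed.
	req, err := http.NewRequest("PUT", "http://127.0.0.1:4646/v1/system/gc", nil)
	if err != nil {
		log.Fatal(err)
	}

	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// A 200 status means the forced GC evaluations were enqueued server-side.
	fmt.Println(resp.Status)
}
```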
Insert "dead" node + state := s1.fsm.State() + node := mock.Node() + node.Status = structs.NodeStatusDown + err := state.UpsertNode(1000, node) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Create a core scheduler + snap, err := state.Snapshot() + if err != nil { + t.Fatalf("err: %v", err) + } + core := NewCoreScheduler(s1, snap) + + // Attempt the GC + gc := s1.forceCoreJobEval(structs.CoreJobNodeGC) + gc.ModifyIndex = 2000 + err = core.Process(gc) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should be gone + out, err := state.NodeByID(node.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out != nil { + t.Fatalf("bad: %v", out) + } +} + func TestCoreScheduler_JobGC(t *testing.T) { tests := []struct { test, evalStatus, allocStatus string @@ -215,3 +309,104 @@ func TestCoreScheduler_JobGC(t *testing.T) { } } } + +func TestCoreScheduler_JobGC_Force(t *testing.T) { + tests := []struct { + test, evalStatus, allocStatus string + shouldExist bool + }{ + { + test: "Terminal", + evalStatus: structs.EvalStatusFailed, + allocStatus: structs.AllocDesiredStatusFailed, + shouldExist: false, + }, + { + test: "Has Alloc", + evalStatus: structs.EvalStatusFailed, + allocStatus: structs.AllocDesiredStatusRun, + shouldExist: true, + }, + { + test: "Has Eval", + evalStatus: structs.EvalStatusPending, + allocStatus: structs.AllocDesiredStatusFailed, + shouldExist: true, + }, + } + + for _, test := range tests { + s1 := testServer(t, nil) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Insert job. + state := s1.fsm.State() + job := mock.Job() + job.GC = true + err := state.UpsertJob(1000, job) + if err != nil { + t.Fatalf("test(%s) err: %v", test.test, err) + } + + // Insert eval + eval := mock.Eval() + eval.JobID = job.ID + eval.Status = test.evalStatus + err = state.UpsertEvals(1001, []*structs.Evaluation{eval}) + if err != nil { + t.Fatalf("test(%s) err: %v", test.test, err) + } + + // Insert alloc + alloc := mock.Alloc() + alloc.JobID = job.ID + alloc.EvalID = eval.ID + alloc.DesiredStatus = test.allocStatus + err = state.UpsertAllocs(1002, []*structs.Allocation{alloc}) + if err != nil { + t.Fatalf("test(%s) err: %v", test.test, err) + } + + // Create a core scheduler + snap, err := state.Snapshot() + if err != nil { + t.Fatalf("test(%s) err: %v", test.test, err) + } + core := NewCoreScheduler(s1, snap) + + // Attempt the GC + gc := s1.forceCoreJobEval(structs.CoreJobJobGC) + gc.ModifyIndex = 2000 + err = core.Process(gc) + if err != nil { + t.Fatalf("test(%s) err: %v", test.test, err) + } + + // Should still exist + out, err := state.JobByID(job.ID) + if err != nil { + t.Fatalf("test(%s) err: %v", test.test, err) + } + if (test.shouldExist && out == nil) || (!test.shouldExist && out != nil) { + t.Fatalf("test(%s) bad: %v", test.test, out) + } + + outE, err := state.EvalByID(eval.ID) + if err != nil { + t.Fatalf("test(%s) err: %v", test.test, err) + } + if (test.shouldExist && outE == nil) || (!test.shouldExist && outE != nil) { + t.Fatalf("test(%s) bad: %v", test.test, out) + } + + outA, err := state.AllocByID(alloc.ID) + if err != nil { + t.Fatalf("test(%s) err: %v", test.test, err) + } + if (test.shouldExist && outA == nil) || (!test.shouldExist && outA != nil) { + t.Fatalf("test(%s) bad: %v", test.test, outA) + } + } +} + diff --git a/nomad/leader.go b/nomad/leader.go index 88a62802f32..f1fdb8ef9eb 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -269,6 +269,14 @@ func (s *Server) coreJobEval(job string) *structs.Evaluation { } } +// forceCoreJobEval 
diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go
index 4d71de5bc73..f5eefafd368 100644
--- a/nomad/core_sched_test.go
+++ b/nomad/core_sched_test.go
@@ -69,6 +69,61 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
 	}
 }
 
+func TestCoreScheduler_EvalGC_Force(t *testing.T) {
+	s1 := testServer(t, nil)
+	defer s1.Shutdown()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Insert "dead" eval
+	state := s1.fsm.State()
+	eval := mock.Eval()
+	eval.Status = structs.EvalStatusFailed
+	err := state.UpsertEvals(1000, []*structs.Evaluation{eval})
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Insert "dead" alloc
+	alloc := mock.Alloc()
+	alloc.EvalID = eval.ID
+	alloc.DesiredStatus = structs.AllocDesiredStatusFailed
+	err = state.UpsertAllocs(1001, []*structs.Allocation{alloc})
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Create a core scheduler
+	snap, err := state.Snapshot()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	core := NewCoreScheduler(s1, snap)
+
+	// Attempt the GC
+	gc := s1.forceCoreJobEval(structs.CoreJobEvalGC)
+	err = core.Process(gc)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Should be gone
+	out, err := state.EvalByID(eval.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if out != nil {
+		t.Fatalf("bad: %v", out)
+	}
+
+	outA, err := state.AllocByID(alloc.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if outA != nil {
+		t.Fatalf("bad: %v", outA)
+	}
+}
+
 func TestCoreScheduler_NodeGC(t *testing.T) {
 	s1 := testServer(t, nil)
 	defer s1.Shutdown()
@@ -112,6 +167,45 @@ func TestCoreScheduler_NodeGC(t *testing.T) {
 	}
 }
 
+func TestCoreScheduler_NodeGC_Force(t *testing.T) {
+	s1 := testServer(t, nil)
+	defer s1.Shutdown()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Insert "dead" node
+	state := s1.fsm.State()
+	node := mock.Node()
+	node.Status = structs.NodeStatusDown
+	err := state.UpsertNode(1000, node)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Create a core scheduler
+	snap, err := state.Snapshot()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	core := NewCoreScheduler(s1, snap)
+
+	// Attempt the GC
+	gc := s1.forceCoreJobEval(structs.CoreJobNodeGC)
+	gc.ModifyIndex = 2000
+	err = core.Process(gc)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Should be gone
+	out, err := state.NodeByID(node.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if out != nil {
+		t.Fatalf("bad: %v", out)
+	}
+}
+
 func TestCoreScheduler_JobGC(t *testing.T) {
 	tests := []struct {
 		test, evalStatus, allocStatus string
@@ -215,3 +309,104 @@ func TestCoreScheduler_JobGC(t *testing.T) {
 		}
 	}
 }
+
+func TestCoreScheduler_JobGC_Force(t *testing.T) {
+	tests := []struct {
+		test, evalStatus, allocStatus string
+		shouldExist                   bool
+	}{
+		{
+			test:        "Terminal",
+			evalStatus:  structs.EvalStatusFailed,
+			allocStatus: structs.AllocDesiredStatusFailed,
+			shouldExist: false,
+		},
+		{
+			test:        "Has Alloc",
+			evalStatus:  structs.EvalStatusFailed,
+			allocStatus: structs.AllocDesiredStatusRun,
+			shouldExist: true,
+		},
+		{
+			test:        "Has Eval",
+			evalStatus:  structs.EvalStatusPending,
+			allocStatus: structs.AllocDesiredStatusFailed,
+			shouldExist: true,
+		},
+	}
+
+	for _, test := range tests {
+		s1 := testServer(t, nil)
+		defer s1.Shutdown()
+		testutil.WaitForLeader(t, s1.RPC)
+
+		// Insert job.
+		state := s1.fsm.State()
+		job := mock.Job()
+		job.GC = true
+		err := state.UpsertJob(1000, job)
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+
+		// Insert eval
+		eval := mock.Eval()
+		eval.JobID = job.ID
+		eval.Status = test.evalStatus
+		err = state.UpsertEvals(1001, []*structs.Evaluation{eval})
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+
+		// Insert alloc
+		alloc := mock.Alloc()
+		alloc.JobID = job.ID
+		alloc.EvalID = eval.ID
+		alloc.DesiredStatus = test.allocStatus
+		err = state.UpsertAllocs(1002, []*structs.Allocation{alloc})
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+
+		// Create a core scheduler
+		snap, err := state.Snapshot()
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+		core := NewCoreScheduler(s1, snap)
+
+		// Attempt the GC
+		gc := s1.forceCoreJobEval(structs.CoreJobJobGC)
+		gc.ModifyIndex = 2000
+		err = core.Process(gc)
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+
+		// Check that the job's existence matches shouldExist
+		out, err := state.JobByID(job.ID)
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+		if (test.shouldExist && out == nil) || (!test.shouldExist && out != nil) {
+			t.Fatalf("test(%s) bad: %v", test.test, out)
+		}
+
+		outE, err := state.EvalByID(eval.ID)
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+		if (test.shouldExist && outE == nil) || (!test.shouldExist && outE != nil) {
+			t.Fatalf("test(%s) bad: %v", test.test, outE)
+		}
+
+		outA, err := state.AllocByID(alloc.ID)
+		if err != nil {
+			t.Fatalf("test(%s) err: %v", test.test, err)
+		}
+		if (test.shouldExist && outA == nil) || (!test.shouldExist && outA != nil) {
+			t.Fatalf("test(%s) bad: %v", test.test, outA)
+		}
+	}
+}
+
diff --git a/nomad/leader.go b/nomad/leader.go
index 88a62802f32..f1fdb8ef9eb 100644
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -269,6 +269,14 @@ func (s *Server) coreJobEval(job string) *structs.Evaluation {
 	}
 }
 
+// forceCoreJobEval returns an evaluation for a core job that will ignore GC
+// cutoffs.
+func (s *Server) forceCoreJobEval(job string) *structs.Evaluation {
+	eval := s.coreJobEval(job)
+	eval.TriggeredBy = structs.EvalTriggerForceGC
+	return eval
+}
+
 // reapFailedEvaluations is used to reap evaluations that
 // have reached their delivery limit and should be failed
 func (s *Server) reapFailedEvaluations(stopCh chan struct{}) {
diff --git a/nomad/server.go b/nomad/server.go
index edc90f2ebe1..e377ef30482 100644
--- a/nomad/server.go
+++ b/nomad/server.go
@@ -144,6 +144,7 @@ type endpoints struct {
 	Alloc    *Alloc
 	Region   *Region
 	Periodic *Periodic
+	System   *System
 }
 
 // NewServer is used to construct a new Nomad server from the
@@ -380,6 +381,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
 	s.endpoints.Alloc = &Alloc{s}
 	s.endpoints.Region = &Region{s}
 	s.endpoints.Periodic = &Periodic{s}
+	s.endpoints.System = &System{s}
 
 	// Register the handlers
 	s.rpcServer.Register(s.endpoints.Status)
@@ -390,6 +392,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
 	s.rpcServer.Register(s.endpoints.Alloc)
 	s.rpcServer.Register(s.endpoints.Region)
 	s.rpcServer.Register(s.endpoints.Periodic)
+	s.rpcServer.Register(s.endpoints.System)
 
 	list, err := net.ListenTCP("tcp", s.config.RPCAddr)
 	if err != nil {
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 93f43f57a4a..c798783830b 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -2183,6 +2183,7 @@ const (
 	EvalTriggerPeriodicJob   = "periodic-job"
 	EvalTriggerNodeUpdate    = "node-update"
 	EvalTriggerScheduled     = "scheduled"
+	EvalTriggerForceGC       = "force-gc"
 	EvalTriggerRollingUpdate = "rolling-update"
 )
diff --git a/nomad/system_endpoint.go b/nomad/system_endpoint.go
new file mode 100644
index 00000000000..0486516777f
--- /dev/null
+++ b/nomad/system_endpoint.go
@@ -0,0 +1,23 @@
+package nomad
+
+import (
+	"github.com/hashicorp/nomad/nomad/structs"
+)
+
+// System endpoint is used to invoke system tasks.
+type System struct {
+	srv *Server
+}
+
+// GarbageCollect is used to trigger the system to immediately garbage collect nodes, evals
+// and jobs.
+func (s *System) GarbageCollect(args *structs.GenericRequest, reply *structs.GenericResponse) error {
+	if done, err := s.srv.forward("System.GarbageCollect", args, args, reply); done {
+		return err
+	}
+
+	s.srv.evalBroker.Enqueue(s.srv.forceCoreJobEval(structs.CoreJobEvalGC))
+	s.srv.evalBroker.Enqueue(s.srv.forceCoreJobEval(structs.CoreJobNodeGC))
+	s.srv.evalBroker.Enqueue(s.srv.forceCoreJobEval(structs.CoreJobJobGC))
+	return nil
+}
diff --git a/nomad/system_endpoint_test.go b/nomad/system_endpoint_test.go
new file mode 100644
index 00000000000..746ed27e50f
--- /dev/null
+++ b/nomad/system_endpoint_test.go
@@ -0,0 +1,51 @@
+package nomad
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/hashicorp/net-rpc-msgpackrpc"
+	"github.com/hashicorp/nomad/nomad/mock"
+	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/nomad/testutil"
+)
+
+func TestSystemEndpoint_GarbageCollect(t *testing.T) {
+	s1 := testServer(t, nil)
+	defer s1.Shutdown()
+	codec := rpcClient(t, s1)
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Insert a job that can be GC'd
+	state := s1.fsm.State()
+	job := mock.Job()
+	job.GC = true
+	if err := state.UpsertJob(0, job); err != nil {
+		t.Fatalf("UpsertJob() failed: %v", err)
+	}
+
+	// Make the GC request
+	req := &structs.GenericRequest{
+		QueryOptions: structs.QueryOptions{
+			Region: "global",
+		},
+	}
+	var resp structs.GenericResponse
+	if err := msgpackrpc.CallWithCodec(codec, "System.GarbageCollect", req, &resp); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	testutil.WaitForResult(func() (bool, error) {
+		// Check if the job has been GC'd
+		exist, err := state.JobByID(job.ID)
+		if err != nil {
+			return false, err
+		}
+		if exist != nil {
+			return false, fmt.Errorf("job %q wasn't garbage collected", job.ID)
+		}
+		return true, nil
+	}, func(err error) {
+		t.Fatalf("err: %s", err)
+	})
+}
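One behavioral note the last test makes explicit: System.GarbageCollect only enqueues the three forced evaluations and returns, so collection completes asynchronously once the core scheduler dequeues them, which is why the test polls with testutil.WaitForResult instead of asserting immediately. An external caller that needs confirmation has to poll as well. The helper below is a hypothetical sketch using the public api package; the job ID, timeout, and poll interval are placeholders:

```go
package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/api"
)

// waitForJobGC polls the job list until jobID disappears or the timeout expires.
func waitForJobGC(client *api.Client, jobID string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		jobs, _, err := client.Jobs().List(nil)
		if err != nil {
			return err
		}

		gone := true
		for _, job := range jobs {
			if job.ID == jobID {
				gone = false
				break
			}
		}
		if gone {
			return nil // the job has been garbage collected
		}

		time.Sleep(100 * time.Millisecond)
	}
	return fmt.Errorf("job %q was not garbage collected before the timeout", jobID)
}

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		fmt.Println(err)
		return
	}

	// "example" is a placeholder job ID used only for illustration.
	if err := client.System().GarbageCollect(); err != nil {
		fmt.Println(err)
		return
	}
	if err := waitForJobGC(client, "example", 10*time.Second); err != nil {
		fmt.Println(err)
	}
}
```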