From 410ae593e78617144ad475cc2a39a065af924331 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 1 Feb 2016 13:57:35 -0800 Subject: [PATCH 1/5] Fix double pull with introduction of AllocModifyIndex --- api/nodes.go | 12 --- api/nodes_test.go | 18 ----- client/alloc_runner.go | 18 ++++- client/alloc_runner_test.go | 19 ++--- client/client.go | 107 ++++++++++++++++++++++---- client/client_test.go | 10 ++- client/util.go | 31 ++++---- client/util_test.go | 16 ++-- command/agent/node_endpoint.go | 24 ------ command/agent/node_endpoint_test.go | 54 ------------- nomad/alloc_endpoint.go | 37 +++++++++ nomad/alloc_endpoint_test.go | 44 ++++++++++- nomad/fsm_test.go | 1 + nomad/node_endpoint.go | 4 +- nomad/node_endpoint_test.go | 2 +- nomad/state/state_store.go | 2 + nomad/structs/structs.go | 17 +++- website/source/docs/http/node.html.md | 38 --------- 18 files changed, 247 insertions(+), 207 deletions(-) diff --git a/api/nodes.go b/api/nodes.go index 44fa7ecc8f7..7d4e6dba0fb 100644 --- a/api/nodes.go +++ b/api/nodes.go @@ -61,18 +61,6 @@ func (n *Nodes) Allocations(nodeID string, q *QueryOptions) ([]*Allocation, *Que return resp, qm, nil } -// ClientAllocations is used to return a lightweight list of allocations associated with a node. -// It is primarily used by the client in order to determine which allocations actually need -// an update. -func (n *Nodes) ClientAllocations(nodeID string, q *QueryOptions) (map[string]uint64, *QueryMeta, error) { - var resp map[string]uint64 - qm, err := n.client.query("/v1/node/"+nodeID+"/clientallocations", &resp, q) - if err != nil { - return nil, nil, err - } - return resp, qm, nil -} - // ForceEvaluate is used to force-evaluate an existing node. func (n *Nodes) ForceEvaluate(nodeID string, q *WriteOptions) (string, *WriteMeta, error) { var resp nodeEvalResponse diff --git a/api/nodes_test.go b/api/nodes_test.go index 81fb30db7eb..0a57321763a 100644 --- a/api/nodes_test.go +++ b/api/nodes_test.go @@ -207,24 +207,6 @@ func TestNodes_Allocations(t *testing.T) { } } -func TestNodes_ClientAllocations(t *testing.T) { - c, s := makeClient(t, nil, nil) - defer s.Stop() - nodes := c.Nodes() - - // Looking up by a non-existent node returns nothing. We - // don't check the index here because it's possible the node - // has already registered, in which case we will get a non- - // zero result anyways. - allocs, _, err := nodes.ClientAllocations("nope", nil) - if err != nil { - t.Fatalf("err: %s", err) - } - if n := len(allocs); n != 0 { - t.Fatalf("expected 0 allocs, got: %d", n) - } -} - func TestNodes_ForceEvaluate(t *testing.T) { c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) { c.DevMode = true diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 71de749c78d..f17a420b4d3 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -38,7 +38,8 @@ type AllocRunner struct { logger *log.Logger consulService *ConsulService - alloc *structs.Allocation + alloc *structs.Allocation + allocLock sync.Mutex dirtyCh chan struct{} @@ -184,6 +185,8 @@ func (r *AllocRunner) DestroyContext() error { // Alloc returns the associated allocation func (r *AllocRunner) Alloc() *structs.Allocation { + r.allocLock.Lock() + defer r.allocLock.Unlock() return r.alloc } @@ -336,6 +339,11 @@ OUTER: for { select { case update := <-r.updateCh: + // Store the updated allocation. + r.allocLock.Lock() + r.alloc = update + r.allocLock.Unlock() + // Check if we're in a terminal status if update.TerminalStatus() { break OUTER @@ -408,6 +416,14 @@ func (r *AllocRunner) Update(update *structs.Allocation) { } } +// shouldUpdate takes the AllocModifyIndex of an allocation sent from the server and +// checks if the current running allocation is behind and should be updated. +func (r *AllocRunner) shouldUpdate(serverIndex uint64) bool { + r.allocLock.Lock() + defer r.allocLock.Unlock() + return r.alloc.AllocModifyIndex < serverIndex +} + // Destroy is used to indicate that the allocation context should be destroyed func (r *AllocRunner) Destroy() { r.destroyLock.Lock() diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 3d6c203bd1a..80a96685986 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -91,7 +91,7 @@ func TestAllocRunner_Destroy(t *testing.T) { func TestAllocRunner_Update(t *testing.T) { ctestutil.ExecCompatible(t) - upd, ar := testAllocRunner(false) + _, ar := testAllocRunner(false) // Ensure task takes some time task := ar.alloc.Job.TaskGroups[0].Tasks[0] @@ -99,27 +99,20 @@ func TestAllocRunner_Update(t *testing.T) { task.Config["args"] = []string{"10"} go ar.Run() defer ar.Destroy() - start := time.Now() // Update the alloc definition newAlloc := new(structs.Allocation) *newAlloc = *ar.alloc - newAlloc.DesiredStatus = structs.AllocDesiredStatusStop + newAlloc.Name = "FOO" + newAlloc.AllocModifyIndex++ ar.Update(newAlloc) + // Check the alloc runner stores the update allocation. testutil.WaitForResult(func() (bool, error) { - if upd.Count == 0 { - return false, nil - } - last := upd.Allocs[upd.Count-1] - return last.ClientStatus == structs.AllocClientStatusDead, nil + return ar.Alloc().Name == "FOO", nil }, func(err error) { - t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates) + t.Fatalf("err: %v %#v", err, ar.Alloc()) }) - - if time.Since(start) > 15*time.Second { - t.Fatalf("took too long to terminate") - } } func TestAllocRunner_SaveRestoreState(t *testing.T) { diff --git a/client/client.go b/client/client.go index 33f5efde9fc..c0b7cb70338 100644 --- a/client/client.go +++ b/client/client.go @@ -627,7 +627,7 @@ func (c *Client) run() { } // Watch for changes in allocations - allocUpdates := make(chan []*structs.Allocation, 1) + allocUpdates := make(chan *allocUpdates, 1) go c.watchAllocations(allocUpdates) // Create a snapshot timer @@ -642,8 +642,8 @@ func (c *Client) run() { c.logger.Printf("[ERR] client: failed to save state: %v", err) } - case allocs := <-allocUpdates: - c.runAllocs(allocs) + case update := <-allocUpdates: + c.runAllocs(update) case <-heartbeat: if err := c.updateNodeStatus(); err != nil { @@ -722,8 +722,22 @@ func (c *Client) updateAllocStatus(alloc *structs.Allocation) error { return nil } +// allocUpdates holds the results of receiving updated allocations from the +// servers. +type allocUpdates struct { + // pulled is the set of allocations that were downloaded from the servers. + pulled map[string]*structs.Allocation + + // filtered is the set of allocations that were not pulled because their + // AllocModifyIndex didn't change. + filtered map[string]struct{} +} + // watchAllocations is used to scan for updates to allocations -func (c *Client) watchAllocations(allocUpdates chan []*structs.Allocation) { +func (c *Client) watchAllocations(updates chan *allocUpdates) { + // The request and response for getting the map of allocations that should + // be running on the Node to their AllocModifyIndex which is incremented + // when the allocation is updated by the servers. req := structs.NodeSpecificRequest{ NodeID: c.Node().ID, QueryOptions: structs.QueryOptions{ @@ -731,12 +745,24 @@ func (c *Client) watchAllocations(allocUpdates chan []*structs.Allocation) { AllowStale: true, }, } - var resp structs.NodeAllocsResponse + var resp structs.NodeClientAllocsResponse + + // The request and response for pulling down the set of allocations that are + // new, or updated server side. + allocsReq := structs.AllocsGetRequest{ + QueryOptions: structs.QueryOptions{ + Region: c.config.Region, + AllowStale: true, + }, + } + var allocsResp structs.AllocsGetResponse for { - // Get the allocations, blocking for updates - resp = structs.NodeAllocsResponse{} - err := c.RPC("Node.GetAllocs", &req, &resp) + // Get the allocation modify index map, blocking for updates. We will + // use this to determine exactly what allocations need to be downloaded + // in full. + resp = structs.NodeClientAllocsResponse{} + err := c.RPC("Node.GetClientAllocs", &req, &resp) if err != nil { c.logger.Printf("[ERR] client: failed to query for node allocations: %v", err) retry := c.retryIntv(getAllocRetryIntv) @@ -755,16 +781,69 @@ func (c *Client) watchAllocations(allocUpdates chan []*structs.Allocation) { default: } - // Check for updates + // Filter all allocations whose AllocModifyIndex was not incremented. + // These are the allocations who have either not been updated, or whose + // updates are a result of the client sending an update for the alloc. + // This lets us reduce the network traffic to the server as we don't + // need to pull all the allocations. + var pull []string + filtered := make(map[string]struct{}) + c.allocLock.Lock() + for allocID, modifyIndex := range resp.Allocs { + // Pull the allocation if we don't have an alloc runner for the + // allocation or if the alloc runner requires an updated allocation. + runner, ok := c.allocs[allocID] + if !ok || runner.shouldUpdate(modifyIndex) { + pull = append(pull, allocID) + } + filtered[allocID] = struct{}{} + } + c.allocLock.Unlock() + c.logger.Printf("[DEBUG] client: updated allocations at index %d (pulled %d) (filtered %d)", + resp.Index, len(pull), len(filtered)) + + // Pull the allocations that passed filtering. + allocsResp.Allocs = nil + if len(pull) != 0 { + // Pull the allocations that need to be updated. + allocsReq.AllocIDs = pull + allocsResp = structs.AllocsGetResponse{} + if err := c.RPC("Alloc.GetAllocs", &allocsReq, &allocsResp); err != nil { + c.logger.Printf("[ERR] client: failed to query updated allocations: %v", err) + retry := c.retryIntv(getAllocRetryIntv) + select { + case <-time.After(retry): + continue + case <-c.shutdownCh: + return + } + } + + // Check for shutdown + select { + case <-c.shutdownCh: + return + default: + } + } + + // Update the query index. if resp.Index <= req.MinQueryIndex { continue } req.MinQueryIndex = resp.Index - c.logger.Printf("[DEBUG] client: updated allocations at index %d (%d allocs)", resp.Index, len(resp.Allocs)) - // Push the updates + // Push the updates. + pulled := make(map[string]*structs.Allocation, len(allocsResp.Allocs)) + for _, alloc := range allocsResp.Allocs { + pulled[alloc.ID] = alloc + } + update := &allocUpdates{ + filtered: filtered, + pulled: pulled, + } select { - case allocUpdates <- resp.Allocs: + case updates <- update: case <-c.shutdownCh: return } @@ -772,7 +851,7 @@ func (c *Client) watchAllocations(allocUpdates chan []*structs.Allocation) { } // runAllocs is invoked when we get an updated set of allocations -func (c *Client) runAllocs(updated []*structs.Allocation) { +func (c *Client) runAllocs(update *allocUpdates) { // Get the existing allocs c.allocLock.RLock() exist := make([]*structs.Allocation, 0, len(c.allocs)) @@ -782,7 +861,7 @@ func (c *Client) runAllocs(updated []*structs.Allocation) { c.allocLock.RUnlock() // Diff the existing and updated allocations - diff := diffAllocs(exist, updated) + diff := diffAllocs(exist, update) c.logger.Printf("[DEBUG] client: %#v", diff) // Remove the old allocations diff --git a/client/client_test.go b/client/client_test.go index 9e94542c188..d1b7bc87a02 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -355,10 +355,14 @@ func TestClient_WatchAllocs(t *testing.T) { t.Fatalf("err: %v", err) } - // Update the other allocation - alloc2.DesiredStatus = structs.AllocDesiredStatusStop + // Update the other allocation. Have to make a copy because the allocs are + // shared in memory in the test and the modify index would be updated in the + // alloc runner. + alloc2_2 := new(structs.Allocation) + *alloc2_2 = *alloc2 + alloc2_2.DesiredStatus = structs.AllocDesiredStatusStop err = state.UpsertAllocs(102, - []*structs.Allocation{alloc2}) + []*structs.Allocation{alloc2_2}) if err != nil { t.Fatalf("err: %v", err) } diff --git a/client/util.go b/client/util.go index 4efec9e184e..a8afcd17152 100644 --- a/client/util.go +++ b/client/util.go @@ -31,33 +31,28 @@ func (d *diffResult) GoString() string { // diffAllocs is used to diff the existing and updated allocations // to see what has happened. -func diffAllocs(existing, updated []*structs.Allocation) *diffResult { - result := &diffResult{} - - // Index the updated allocations by id - idx := make(map[string]*structs.Allocation) - for _, update := range updated { - idx[update.ID] = update - } - +func diffAllocs(existing []*structs.Allocation, allocs *allocUpdates) *diffResult { // Scan the existing allocations + result := &diffResult{} existIdx := make(map[string]struct{}) for _, exist := range existing { // Mark this as existing existIdx[exist.ID] = struct{}{} - // Check for presence in the new set - update, ok := idx[exist.ID] + // Check if the alloc was updated or filtered because an update wasn't + // needed. + alloc, pulled := allocs.pulled[exist.ID] + _, filtered := allocs.filtered[exist.ID] - // If not present, removed - if !ok { + // If not updated or filtered, removed + if !pulled && !filtered { result.removed = append(result.removed, exist) continue } // Check for an update - if update.ModifyIndex > exist.ModifyIndex { - result.updated = append(result.updated, allocTuple{exist, update}) + if pulled && alloc.AllocModifyIndex > exist.AllocModifyIndex { + result.updated = append(result.updated, allocTuple{exist, alloc}) continue } @@ -66,9 +61,9 @@ func diffAllocs(existing, updated []*structs.Allocation) *diffResult { } // Scan the updated allocations for any that are new - for _, update := range updated { - if _, ok := existIdx[update.ID]; !ok { - result.added = append(result.added, update) + for id, pulled := range allocs.pulled { + if _, ok := existIdx[id]; !ok { + result.added = append(result.added, pulled) } } return result diff --git a/client/util_test.go b/client/util_test.go index d6c165b9b9c..64413df9aff 100644 --- a/client/util_test.go +++ b/client/util_test.go @@ -17,7 +17,7 @@ func TestDiffAllocs(t *testing.T) { alloc2 := mock.Alloc() // Update alloc2u := new(structs.Allocation) *alloc2u = *alloc2 - alloc2u.ModifyIndex += 1 + alloc2u.AllocModifyIndex += 1 alloc3 := mock.Alloc() // Remove alloc4 := mock.Alloc() // Add @@ -26,13 +26,17 @@ func TestDiffAllocs(t *testing.T) { alloc2, alloc3, } - updated := []*structs.Allocation{ - alloc1, - alloc2u, - alloc4, + update := &allocUpdates{ + pulled: map[string]*structs.Allocation{ + alloc2u.ID: alloc2u, + alloc4.ID: alloc4, + }, + filtered: map[string]struct{}{ + alloc1.ID: struct{}{}, + }, } - result := diffAllocs(exist, updated) + result := diffAllocs(exist, update) if len(result.ignore) != 1 || result.ignore[0] != alloc1 { t.Fatalf("Bad: %#v", result.ignore) diff --git a/command/agent/node_endpoint.go b/command/agent/node_endpoint.go index f249212c355..c872912e06e 100644 --- a/command/agent/node_endpoint.go +++ b/command/agent/node_endpoint.go @@ -36,9 +36,6 @@ func (s *HTTPServer) NodeSpecificRequest(resp http.ResponseWriter, req *http.Req case strings.HasSuffix(path, "/evaluate"): nodeName := strings.TrimSuffix(path, "/evaluate") return s.nodeForceEvaluate(resp, req, nodeName) - case strings.HasSuffix(path, "/clientallocations"): - nodeName := strings.TrimSuffix(path, "/clientallocations") - return s.nodeClientAllocations(resp, req, nodeName) case strings.HasSuffix(path, "/allocations"): nodeName := strings.TrimSuffix(path, "/allocations") return s.nodeAllocations(resp, req, nodeName) @@ -92,27 +89,6 @@ func (s *HTTPServer) nodeAllocations(resp http.ResponseWriter, req *http.Request return out.Allocs, nil } -func (s *HTTPServer) nodeClientAllocations(resp http.ResponseWriter, req *http.Request, - nodeID string) (interface{}, error) { - if req.Method != "GET" { - return nil, CodedError(405, ErrInvalidMethod) - } - args := structs.NodeSpecificRequest{ - NodeID: nodeID, - } - if s.parse(resp, req, &args.Region, &args.QueryOptions) { - return nil, nil - } - - var out structs.NodeClientAllocsResponse - if err := s.agent.RPC("Node.GetClientAllocs", &args, &out); err != nil { - return nil, err - } - - setMeta(resp, &out.QueryMeta) - return out.Allocs, nil -} - func (s *HTTPServer) nodeToggleDrain(resp http.ResponseWriter, req *http.Request, nodeID string) (interface{}, error) { if req.Method != "PUT" && req.Method != "POST" { diff --git a/command/agent/node_endpoint_test.go b/command/agent/node_endpoint_test.go index d4ec061b9a2..a63739a18bf 100644 --- a/command/agent/node_endpoint_test.go +++ b/command/agent/node_endpoint_test.go @@ -214,60 +214,6 @@ func TestHTTP_NodeAllocations(t *testing.T) { }) } -func TestHTTP_NodeClientAllocations(t *testing.T) { - httpTest(t, nil, func(s *TestServer) { - // Create the job - node := mock.Node() - args := structs.NodeRegisterRequest{ - Node: node, - WriteRequest: structs.WriteRequest{Region: "global"}, - } - var resp structs.NodeUpdateResponse - if err := s.Agent.RPC("Node.Register", &args, &resp); err != nil { - t.Fatalf("err: %v", err) - } - - // Directly manipulate the state - state := s.Agent.server.State() - alloc1 := mock.Alloc() - alloc1.NodeID = node.ID - err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1}) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Make the HTTP request - req, err := http.NewRequest("GET", "/v1/node/"+node.ID+"/clientallocations", nil) - if err != nil { - t.Fatalf("err: %v", err) - } - respW := httptest.NewRecorder() - - // Make the request - obj, err := s.Server.NodeSpecificRequest(respW, req) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Check for the index - if respW.HeaderMap.Get("X-Nomad-Index") == "" { - t.Fatalf("missing index") - } - if respW.HeaderMap.Get("X-Nomad-KnownLeader") != "true" { - t.Fatalf("missing known leader") - } - if respW.HeaderMap.Get("X-Nomad-LastContact") == "" { - t.Fatalf("missing last contact") - } - - // Check the node - allocs := obj.(map[string]uint64) - if len(allocs) != 1 || allocs[alloc1.ID] != 1000 { - t.Fatalf("bad: %#v", allocs) - } - }) -} - func TestHTTP_NodeDrain(t *testing.T) { httpTest(t, nil, func(s *TestServer) { // Create the node diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go index ca65621b18b..e49e2383083 100644 --- a/nomad/alloc_endpoint.go +++ b/nomad/alloc_endpoint.go @@ -1,6 +1,7 @@ package nomad import ( + "fmt" "time" "github.com/armon/go-metrics" @@ -110,3 +111,39 @@ func (a *Alloc) GetAlloc(args *structs.AllocSpecificRequest, }} return a.srv.blockingRPC(&opts) } + +// GetAllocs is used to lookup a set of allocations +func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest, + reply *structs.AllocsGetResponse) error { + if done, err := a.srv.forward("Alloc.GetAllocs", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "alloc", "get_alloc"}, time.Now()) + + // Lookup the allocations + snap, err := a.srv.fsm.State().Snapshot() + if err != nil { + return err + } + + allocs := make([]*structs.Allocation, len(args.AllocIDs)) + for i, alloc := range args.AllocIDs { + out, err := snap.AllocByID(alloc) + if err != nil { + return err + } + if out == nil { + return fmt.Errorf("unknown alloc id %q", alloc) + } + + allocs[i] = out + if reply.Index < out.ModifyIndex { + reply.Index = out.ModifyIndex + } + } + + // Set the response + a.srv.setQueryMeta(&reply.QueryMeta) + reply.Allocs = allocs + return nil +} diff --git a/nomad/alloc_endpoint_test.go b/nomad/alloc_endpoint_test.go index 2f82ad6de35..b6ea20cd6a3 100644 --- a/nomad/alloc_endpoint_test.go +++ b/nomad/alloc_endpoint_test.go @@ -192,7 +192,7 @@ func TestAllocEndpoint_GetAlloc_Blocking(t *testing.T) { } }) - // Lookup the jobs + // Lookup the allocs get := &structs.AllocSpecificRequest{ AllocID: alloc2.ID, QueryOptions: structs.QueryOptions{ @@ -216,3 +216,45 @@ func TestAllocEndpoint_GetAlloc_Blocking(t *testing.T) { t.Fatalf("bad: %#v", resp.Alloc) } } + +func TestAllocEndpoint_GetAllocs(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + alloc := mock.Alloc() + alloc2 := mock.Alloc() + state := s1.fsm.State() + err := state.UpsertAllocs(1000, []*structs.Allocation{alloc, alloc2}) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Lookup the allocs + get := &structs.AllocsGetRequest{ + AllocIDs: []string{alloc.ID, alloc2.ID}, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + var resp structs.AllocsGetResponse + if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Index != 1000 { + t.Fatalf("Bad index: %d %d", resp.Index, 1000) + } + + if len(resp.Allocs) != 2 { + t.Fatalf("bad: %#v", resp.Allocs) + } + + // Lookup non-existent allocs. + get = &structs.AllocsGetRequest{ + AllocIDs: []string{"foo"}, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err == nil { + t.Fatalf("expect error") + } +} diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index ef76486a842..d9d7de61173 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -421,6 +421,7 @@ func TestFSM_UpsertAllocs(t *testing.T) { } alloc.CreateIndex = out.CreateIndex alloc.ModifyIndex = out.ModifyIndex + alloc.AllocModifyIndex = out.AllocModifyIndex if !reflect.DeepEqual(alloc, out) { t.Fatalf("bad: %#v %#v", alloc, out) } diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 94ff3d4fca4..a56d842f372 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -387,7 +387,7 @@ func (n *Node) GetAllocs(args *structs.NodeSpecificRequest, return n.srv.blockingRPC(&opts) } -// GetClientAllocs is used to request a lightweight list of modify indexes +// GetClientAllocs is used to request a lightweight list of alloc modify indexes // per allocation. func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, reply *structs.NodeClientAllocsResponse) error { @@ -421,7 +421,7 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, // Setup the output if len(allocs) != 0 { for _, alloc := range allocs { - reply.Allocs[alloc.ID] = alloc.ModifyIndex + reply.Allocs[alloc.ID] = alloc.AllocModifyIndex reply.Index = maxUint64(reply.Index, alloc.ModifyIndex) } } else { diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 743e6127ed2..7b2aa720aa9 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -666,7 +666,7 @@ func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) { allocUpdate.NodeID = alloc.NodeID allocUpdate.ID = alloc.ID allocUpdate.ClientStatus = structs.AllocClientStatusRunning - err := state.UpdateAllocFromClient(200, allocUpdate) + err := state.UpsertAllocs(200, []*structs.Allocation{allocUpdate}) if err != nil { t.Fatalf("err: %v", err) } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 553bcbf0777..d097bb0d7cf 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -793,10 +793,12 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er if existing == nil { alloc.CreateIndex = index alloc.ModifyIndex = index + alloc.AllocModifyIndex = index } else { exist := existing.(*structs.Allocation) alloc.CreateIndex = exist.CreateIndex alloc.ModifyIndex = index + alloc.AllocModifyIndex = index alloc.ClientStatus = exist.ClientStatus alloc.ClientDescription = exist.ClientDescription } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index fa7ac8bfc84..1e92c7d0334 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -272,6 +272,12 @@ type AllocSpecificRequest struct { QueryOptions } +// AllocsGetcRequest is used to query a set of allocations +type AllocsGetRequest struct { + AllocIDs []string + QueryOptions +} + // PeriodicForceReqeuest is used to force a specific periodic job. type PeriodicForceRequest struct { JobID string @@ -378,6 +384,12 @@ type SingleAllocResponse struct { QueryMeta } +// AllocsGetResponse is used to return a set of allocations +type AllocsGetResponse struct { + Allocs []*Allocation + QueryMeta +} + // JobAllocationsResponse is used to return the allocations for a job type JobAllocationsResponse struct { Allocations []*AllocListStub @@ -1647,8 +1659,9 @@ type Allocation struct { TaskStates map[string]*TaskState // Raft Indexes - CreateIndex uint64 - ModifyIndex uint64 + CreateIndex uint64 + ModifyIndex uint64 + AllocModifyIndex uint64 } // TerminalStatus returns if the desired or actual status is terminal and diff --git a/website/source/docs/http/node.html.md b/website/source/docs/http/node.html.md index 81c7fa137f3..794e0283abe 100644 --- a/website/source/docs/http/node.html.md +++ b/website/source/docs/http/node.html.md @@ -311,44 +311,6 @@ be specified using the `?region=` query parameter. -
-
Description
-
- Query the allocations belonging to a single node. This endpoint only returns - a map from allocation id to its modify index and is primarily used by the client - to determine which allocations need to be updated. -
- -
Method
-
GET
- -
URL
-
`/v1/node//clientallocations`
- -
Parameters
-
- None -
- -
Blocking Queries
-
- [Supported](/docs/http/index.html#blocking-queries) -
- -
Returns
-
- - ```javascript - { - "d66ea8d7-1d4c-119e-46b3-e23713a4ab72": 9, - "abf34c35-1d4c-119e-46b3-e23713a4ab72": 10 - } - ``` - -
-
- - ## PUT / POST
From 3b90539fe9f2ad576afe691179dafd7162d01a5b Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 1 Feb 2016 15:43:43 -0800 Subject: [PATCH 2/5] Fix locks and use task runners state not alloc state --- client/alloc_runner.go | 17 ++++++++++++----- client/client.go | 3 ++- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index f17a420b4d3..2d52c07f6e0 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -156,6 +156,8 @@ func (r *AllocRunner) SaveState() error { func (r *AllocRunner) saveAllocRunnerState() error { r.taskStatusLock.RLock() defer r.taskStatusLock.RUnlock() + r.allocLock.Lock() + defer r.allocLock.Unlock() snap := allocRunnerState{ Alloc: r.alloc, RestartPolicy: r.RestartPolicy, @@ -224,7 +226,8 @@ func (r *AllocRunner) syncStatus() error { // Scan the task states to determine the status of the alloc var pending, running, dead, failed bool r.taskStatusLock.RLock() - for _, state := range r.alloc.TaskStates { + for _, tr := range r.tasks { + state := tr.state switch state.State { case structs.TaskStateRunning: running = true @@ -239,13 +242,17 @@ func (r *AllocRunner) syncStatus() error { } } } + r.taskStatusLock.RUnlock() + + // Determine the alloc status + r.allocLock.Lock() + defer r.allocLock.Unlock() + if len(r.alloc.TaskStates) > 0 { taskDesc, _ := json.Marshal(r.alloc.TaskStates) r.alloc.ClientDescription = string(taskDesc) } - r.taskStatusLock.RUnlock() - // Determine the alloc status if failed { r.alloc.ClientStatus = structs.AllocClientStatusFailed } else if running { @@ -379,8 +386,8 @@ OUTER: } // Destroy each sub-task - r.taskLock.RLock() - defer r.taskLock.RUnlock() + r.taskLock.Lock() + defer r.taskLock.Unlock() for _, tr := range r.tasks { tr.Destroy() } diff --git a/client/client.go b/client/client.go index c0b7cb70338..8a51fc07336 100644 --- a/client/client.go +++ b/client/client.go @@ -795,8 +795,9 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) { runner, ok := c.allocs[allocID] if !ok || runner.shouldUpdate(modifyIndex) { pull = append(pull, allocID) + } else { + filtered[allocID] = struct{}{} } - filtered[allocID] = struct{}{} } c.allocLock.Unlock() c.logger.Printf("[DEBUG] client: updated allocations at index %d (pulled %d) (filtered %d)", From c0ab75a7f92e967c7f4568d0de5b31c6d4d952c9 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 1 Feb 2016 17:47:53 -0800 Subject: [PATCH 3/5] Don't share task state with the alloc in the task runner --- client/alloc_runner.go | 37 ++++++++++++++++--------------------- client/task_runner.go | 27 ++++++++++++++++++++------- nomad/structs/structs.go | 25 +++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 28 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 2d52c07f6e0..bb971eac6da 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -1,7 +1,6 @@ package client import ( - "encoding/json" "fmt" "log" "os" @@ -113,8 +112,7 @@ func (r *AllocRunner) RestoreState() error { task := &structs.Task{Name: name} restartTracker := newRestartTracker(r.RestartPolicy) tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, - r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker, - r.consulService) + r.alloc, task, restartTracker, r.consulService) r.tasks[name] = tr // Skip tasks in terminal states. @@ -189,7 +187,7 @@ func (r *AllocRunner) DestroyContext() error { func (r *AllocRunner) Alloc() *structs.Allocation { r.allocLock.Lock() defer r.allocLock.Unlock() - return r.alloc + return r.alloc.Copy() } // dirtySyncState is used to watch for state being marked dirty to sync @@ -223,11 +221,17 @@ func (r *AllocRunner) retrySyncState(stopCh chan struct{}) { // syncStatus is used to run and sync the status when it changes func (r *AllocRunner) syncStatus() error { + // Get a copy of our alloc. + alloc := r.Alloc() + // Scan the task states to determine the status of the alloc var pending, running, dead, failed bool r.taskStatusLock.RLock() - for _, tr := range r.tasks { - state := tr.state + for name, tr := range r.tasks { + // Store the state of each task in the copied alloc. + state := tr.getState() + alloc.TaskStates[name] = state + switch state.State { case structs.TaskStateRunning: running = true @@ -245,26 +249,18 @@ func (r *AllocRunner) syncStatus() error { r.taskStatusLock.RUnlock() // Determine the alloc status - r.allocLock.Lock() - defer r.allocLock.Unlock() - - if len(r.alloc.TaskStates) > 0 { - taskDesc, _ := json.Marshal(r.alloc.TaskStates) - r.alloc.ClientDescription = string(taskDesc) - } - if failed { - r.alloc.ClientStatus = structs.AllocClientStatusFailed + alloc.ClientStatus = structs.AllocClientStatusFailed } else if running { - r.alloc.ClientStatus = structs.AllocClientStatusRunning + alloc.ClientStatus = structs.AllocClientStatusRunning } else if dead && !pending { - r.alloc.ClientStatus = structs.AllocClientStatusDead + alloc.ClientStatus = structs.AllocClientStatusDead } // Attempt to update the status - if err := r.updater(r.alloc); err != nil { + if err := r.updater(alloc); err != nil { r.logger.Printf("[ERR] client: failed to update alloc '%s' status to %s: %s", - r.alloc.ID, r.alloc.ClientStatus, err) + alloc.ID, alloc.ClientStatus, err) return err } return nil @@ -334,8 +330,7 @@ func (r *AllocRunner) Run() { task.Resources = alloc.TaskResources[task.Name] restartTracker := newRestartTracker(r.RestartPolicy) tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, - r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker, - r.consulService) + r.alloc, task, restartTracker, r.consulService) r.tasks[task.Name] = tr go tr.Run() } diff --git a/client/task_runner.go b/client/task_runner.go index 1a81eb1e3c6..40a8da25e44 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -27,10 +27,11 @@ type TaskRunner struct { restartTracker *RestartTracker consulService *ConsulService - task *structs.Task - state *structs.TaskState - updateCh chan *structs.Task - handle driver.DriverHandle + task *structs.Task + state *structs.TaskState + stateLock sync.RWMutex + updateCh chan *structs.Task + handle driver.DriverHandle destroy bool destroyCh chan struct{} @@ -43,6 +44,7 @@ type TaskRunner struct { // taskRunnerState is used to snapshot the state of the task runner type taskRunnerState struct { Task *structs.Task + State *structs.TaskState HandleID string } @@ -52,7 +54,7 @@ type TaskStateUpdater func(taskName string) // NewTaskRunner is used to create a new task context func NewTaskRunner(logger *log.Logger, config *config.Config, updater TaskStateUpdater, ctx *driver.ExecContext, - alloc *structs.Allocation, task *structs.Task, state *structs.TaskState, + alloc *structs.Allocation, task *structs.Task, restartTracker *RestartTracker, consulService *ConsulService) *TaskRunner { tc := &TaskRunner{ @@ -64,7 +66,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, ctx: ctx, alloc: alloc, task: task, - state: state, + state: new(structs.TaskState), updateCh: make(chan *structs.Task, 8), destroyCh: make(chan struct{}), waitCh: make(chan struct{}), @@ -100,6 +102,7 @@ func (r *TaskRunner) RestoreState() error { // Restore fields r.task = snap.Task + r.state = snap.State // Restore the driver if snap.HandleID != "" { @@ -126,7 +129,8 @@ func (r *TaskRunner) SaveState() error { r.snapshotLock.Lock() defer r.snapshotLock.Unlock() snap := taskRunnerState{ - Task: r.task, + Task: r.task, + State: r.getState(), } if r.handle != nil { snap.HandleID = r.handle.ID() @@ -158,8 +162,10 @@ func (r *TaskRunner) appendEvent(event *structs.TaskEvent) { // setState is used to update the state of the task runner func (r *TaskRunner) setState(state string, event *structs.TaskEvent) { // Update the task. + r.stateLock.Lock() r.state.State = state r.appendEvent(event) + r.stateLock.Unlock() // Persist our state to disk. if err := r.SaveState(); err != nil { @@ -170,6 +176,13 @@ func (r *TaskRunner) setState(state string, event *structs.TaskEvent) { r.updater(r.task.Name) } +// Get state returns a copy of our state. +func (r *TaskRunner) getState() *structs.TaskState { + r.stateLock.RLock() + r.stateLock.RUnlock() + return r.state.Copy() +} + // createDriver makes a driver for the task func (r *TaskRunner) createDriver() (driver.Driver, error) { taskEnv, err := driver.GetTaskEnv(r.ctx.AllocDir, r.config.Node, r.task) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 1e92c7d0334..c63bae5a422 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1448,6 +1448,16 @@ type TaskState struct { Events []*TaskEvent } +func (ts *TaskState) Copy() *TaskState { + copy := new(TaskState) + copy.State = ts.State + copy.Events = make([]*TaskEvent, len(ts.Events)) + for i, e := range ts.Events { + copy.Events[i] = e.Copy() + } + return copy +} + const ( // A Driver failure indicates that the task could not be started due to a // failure in the driver. @@ -1482,6 +1492,12 @@ type TaskEvent struct { KillError string // Error killing the task. } +func (te *TaskEvent) Copy() *TaskEvent { + copy := new(TaskEvent) + *copy = *te + return copy +} + func NewTaskEvent(event string) *TaskEvent { return &TaskEvent{ Type: event, @@ -1664,6 +1680,15 @@ type Allocation struct { AllocModifyIndex uint64 } +func (a *Allocation) Copy() *Allocation { + i, err := copystructure.Copy(a) + if err != nil { + panic(err) + } + + return i.(*Allocation) +} + // TerminalStatus returns if the desired or actual status is terminal and // will no longer transition. func (a *Allocation) TerminalStatus() bool { From 679a57c70432cda291d7dd805b4ecfc58fbf3497 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 1 Feb 2016 18:15:12 -0800 Subject: [PATCH 4/5] Comment on AllocModifyIndex --- nomad/structs/structs.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index c63bae5a422..d8976792762 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1675,8 +1675,11 @@ type Allocation struct { TaskStates map[string]*TaskState // Raft Indexes - CreateIndex uint64 - ModifyIndex uint64 + CreateIndex uint64 + ModifyIndex uint64 + + // AllocModifyIndex is not updated when the client updates allocations. This + // lets the client pull only the allocs updated by the server. AllocModifyIndex uint64 } From 019883c5b1d3e0249678edb8178609de9d8b4c66 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 2 Feb 2016 11:09:29 -0800 Subject: [PATCH 5/5] Clean interaction between alloc-runner and task-runner --- client/alloc_runner.go | 142 ++++++++++++++++++++++++------------- client/task_runner.go | 47 ++---------- client/task_runner_test.go | 51 +++++++------ 3 files changed, 126 insertions(+), 114 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index bb971eac6da..6b559ce91db 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -21,12 +21,6 @@ const ( allocSyncRetryIntv = 15 * time.Second ) -// taskStatus is used to track the status of a task -type taskStatus struct { - Status string - Description string -} - // AllocStateUpdater is used to update the status of an allocation type AllocStateUpdater func(alloc *structs.Allocation) error @@ -40,10 +34,15 @@ type AllocRunner struct { alloc *structs.Allocation allocLock sync.Mutex + // Explicit status of allocation. Set when there are failures + allocClientStatus string + allocClientDescription string + dirtyCh chan struct{} ctx *driver.ExecContext tasks map[string]*TaskRunner + taskStates map[string]*structs.TaskState restored map[string]struct{} RestartPolicy *structs.RestartPolicy taskLock sync.RWMutex @@ -60,10 +59,12 @@ type AllocRunner struct { // allocRunnerState is used to snapshot the state of the alloc runner type allocRunnerState struct { - Alloc *structs.Allocation - RestartPolicy *structs.RestartPolicy - TaskStatus map[string]taskStatus - Context *driver.ExecContext + Alloc *structs.Allocation + AllocClientStatus string + AllocClientDescription string + RestartPolicy *structs.RestartPolicy + TaskStates map[string]*structs.TaskState + Context *driver.ExecContext } // NewAllocRunner is used to create a new allocation context @@ -77,6 +78,7 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStat consulService: consulService, dirtyCh: make(chan struct{}, 1), tasks: make(map[string]*TaskRunner), + taskStates: alloc.TaskStates, restored: make(map[string]struct{}), updateCh: make(chan *structs.Allocation, 8), destroyCh: make(chan struct{}), @@ -102,10 +104,13 @@ func (r *AllocRunner) RestoreState() error { r.alloc = snap.Alloc r.RestartPolicy = snap.RestartPolicy r.ctx = snap.Context + r.allocClientStatus = snap.AllocClientStatus + r.allocClientDescription = snap.AllocClientDescription + r.taskStates = snap.TaskStates // Restore the task runners var mErr multierror.Error - for name, state := range r.alloc.TaskStates { + for name, state := range r.taskStates { // Mark the task as restored. r.restored[name] = struct{}{} @@ -157,9 +162,12 @@ func (r *AllocRunner) saveAllocRunnerState() error { r.allocLock.Lock() defer r.allocLock.Unlock() snap := allocRunnerState{ - Alloc: r.alloc, - RestartPolicy: r.RestartPolicy, - Context: r.ctx, + Alloc: r.alloc, + RestartPolicy: r.RestartPolicy, + Context: r.ctx, + AllocClientStatus: r.allocClientStatus, + AllocClientDescription: r.allocClientDescription, + TaskStates: r.taskStates, } return persistState(r.stateFilePath(), &snap) } @@ -186,8 +194,46 @@ func (r *AllocRunner) DestroyContext() error { // Alloc returns the associated allocation func (r *AllocRunner) Alloc() *structs.Allocation { r.allocLock.Lock() - defer r.allocLock.Unlock() - return r.alloc.Copy() + alloc := r.alloc.Copy() + r.allocLock.Unlock() + + // Scan the task states to determine the status of the alloc + var pending, running, dead, failed bool + r.taskStatusLock.RLock() + alloc.TaskStates = r.taskStates + for _, state := range r.taskStates { + switch state.State { + case structs.TaskStateRunning: + running = true + case structs.TaskStatePending: + pending = true + case structs.TaskStateDead: + last := len(state.Events) - 1 + if state.Events[last].Type == structs.TaskDriverFailure { + failed = true + } else { + dead = true + } + } + } + r.taskStatusLock.RUnlock() + + // The status has explicitely been set. + if r.allocClientStatus != "" || r.allocClientDescription != "" { + alloc.ClientStatus = r.allocClientStatus + alloc.ClientDescription = r.allocClientDescription + return alloc + } + + // Determine the alloc status + if failed { + alloc.ClientStatus = structs.AllocClientStatusFailed + } else if running { + alloc.ClientStatus = structs.AllocClientStatusRunning + } else if dead && !pending { + alloc.ClientStatus = structs.AllocClientStatusDead + } + return alloc } // dirtySyncState is used to watch for state being marked dirty to sync @@ -224,39 +270,6 @@ func (r *AllocRunner) syncStatus() error { // Get a copy of our alloc. alloc := r.Alloc() - // Scan the task states to determine the status of the alloc - var pending, running, dead, failed bool - r.taskStatusLock.RLock() - for name, tr := range r.tasks { - // Store the state of each task in the copied alloc. - state := tr.getState() - alloc.TaskStates[name] = state - - switch state.State { - case structs.TaskStateRunning: - running = true - case structs.TaskStatePending: - pending = true - case structs.TaskStateDead: - last := len(state.Events) - 1 - if state.Events[last].Type == structs.TaskDriverFailure { - failed = true - } else { - dead = true - } - } - } - r.taskStatusLock.RUnlock() - - // Determine the alloc status - if failed { - alloc.ClientStatus = structs.AllocClientStatusFailed - } else if running { - alloc.ClientStatus = structs.AllocClientStatusRunning - } else if dead && !pending { - alloc.ClientStatus = structs.AllocClientStatusDead - } - // Attempt to update the status if err := r.updater(alloc); err != nil { r.logger.Printf("[ERR] client: failed to update alloc '%s' status to %s: %s", @@ -277,13 +290,42 @@ func (r *AllocRunner) setStatus(status, desc string) { } // setTaskState is used to set the status of a task -func (r *AllocRunner) setTaskState(taskName string) { +func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEvent) { + r.taskStatusLock.Lock() + defer r.taskStatusLock.Unlock() + taskState, ok := r.taskStates[taskName] + if !ok { + r.logger.Printf("[ERR] client: setting task state for unknown task %q", taskName) + return + } + + // Set the tasks state. + taskState.State = state + r.appendTaskEvent(taskState, event) + select { case r.dirtyCh <- struct{}{}: default: } } +// appendTaskEvent updates the task status by appending the new event. +func (r *AllocRunner) appendTaskEvent(state *structs.TaskState, event *structs.TaskEvent) { + capacity := 10 + if state.Events == nil { + state.Events = make([]*structs.TaskEvent, 0, capacity) + } + + // If we hit capacity, then shift it. + if len(state.Events) == capacity { + old := state.Events + state.Events = make([]*structs.TaskEvent, 0, capacity) + state.Events = append(state.Events, old[1:]...) + } + + state.Events = append(state.Events, event) +} + // Run is a long running goroutine used to manage an allocation func (r *AllocRunner) Run() { defer close(r.waitCh) diff --git a/client/task_runner.go b/client/task_runner.go index 40a8da25e44..8eac8e23e13 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -27,11 +27,9 @@ type TaskRunner struct { restartTracker *RestartTracker consulService *ConsulService - task *structs.Task - state *structs.TaskState - stateLock sync.RWMutex - updateCh chan *structs.Task - handle driver.DriverHandle + task *structs.Task + updateCh chan *structs.Task + handle driver.DriverHandle destroy bool destroyCh chan struct{} @@ -44,12 +42,11 @@ type TaskRunner struct { // taskRunnerState is used to snapshot the state of the task runner type taskRunnerState struct { Task *structs.Task - State *structs.TaskState HandleID string } // TaskStateUpdater is used to signal that tasks state has changed. -type TaskStateUpdater func(taskName string) +type TaskStateUpdater func(taskName, state string, event *structs.TaskEvent) // NewTaskRunner is used to create a new task context func NewTaskRunner(logger *log.Logger, config *config.Config, @@ -66,7 +63,6 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, ctx: ctx, alloc: alloc, task: task, - state: new(structs.TaskState), updateCh: make(chan *structs.Task, 8), destroyCh: make(chan struct{}), waitCh: make(chan struct{}), @@ -102,7 +98,6 @@ func (r *TaskRunner) RestoreState() error { // Restore fields r.task = snap.Task - r.state = snap.State // Restore the driver if snap.HandleID != "" { @@ -129,8 +124,7 @@ func (r *TaskRunner) SaveState() error { r.snapshotLock.Lock() defer r.snapshotLock.Unlock() snap := taskRunnerState{ - Task: r.task, - State: r.getState(), + Task: r.task, } if r.handle != nil { snap.HandleID = r.handle.ID() @@ -143,44 +137,15 @@ func (r *TaskRunner) DestroyState() error { return os.RemoveAll(r.stateFilePath()) } -func (r *TaskRunner) appendEvent(event *structs.TaskEvent) { - capacity := 10 - if r.state.Events == nil { - r.state.Events = make([]*structs.TaskEvent, 0, capacity) - } - - // If we hit capacity, then shift it. - if len(r.state.Events) == capacity { - old := r.state.Events - r.state.Events = make([]*structs.TaskEvent, 0, capacity) - r.state.Events = append(r.state.Events, old[1:]...) - } - - r.state.Events = append(r.state.Events, event) -} - // setState is used to update the state of the task runner func (r *TaskRunner) setState(state string, event *structs.TaskEvent) { - // Update the task. - r.stateLock.Lock() - r.state.State = state - r.appendEvent(event) - r.stateLock.Unlock() - // Persist our state to disk. if err := r.SaveState(); err != nil { r.logger.Printf("[ERR] client: failed to save state of Task Runner: %v", r.task.Name) } // Indicate the task has been updated. - r.updater(r.task.Name) -} - -// Get state returns a copy of our state. -func (r *TaskRunner) getState() *structs.TaskState { - r.stateLock.RLock() - r.stateLock.RUnlock() - return r.state.Copy() + r.updater(r.task.Name, state, event) } // createDriver makes a driver for the task diff --git a/client/task_runner_test.go b/client/task_runner_test.go index 6efb368f2a4..52c8c73b3a4 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -21,9 +21,15 @@ func testLogger() *log.Logger { return log.New(os.Stderr, "", log.LstdFlags) } -type MockTaskStateUpdater struct{} +type MockTaskStateUpdater struct { + state string + events []*structs.TaskEvent +} -func (m *MockTaskStateUpdater) Update(name string) {} +func (m *MockTaskStateUpdater) Update(name, state string, event *structs.TaskEvent) { + m.state = state + m.events = append(m.events, event) +} func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) { logger := testLogger() @@ -48,14 +54,13 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) { restartTracker = noRestartsTracker() } - state := alloc.TaskStates[task.Name] - tr := NewTaskRunner(logger, conf, upd.Update, ctx, mock.Alloc(), task, state, restartTracker, consulClient) + tr := NewTaskRunner(logger, conf, upd.Update, ctx, mock.Alloc(), task, restartTracker, consulClient) return upd, tr } func TestTaskRunner_SimpleRun(t *testing.T) { ctestutil.ExecCompatible(t) - _, tr := testTaskRunner(false) + upd, tr := testTaskRunner(false) go tr.Run() defer tr.Destroy() defer tr.ctx.AllocDir.Destroy() @@ -66,26 +71,26 @@ func TestTaskRunner_SimpleRun(t *testing.T) { t.Fatalf("timeout") } - if len(tr.state.Events) != 2 { - t.Fatalf("should have 2 updates: %#v", tr.state.Events) + if len(upd.events) != 2 { + t.Fatalf("should have 2 updates: %#v", upd.events) } - if tr.state.State != structs.TaskStateDead { - t.Fatalf("TaskState %v; want %v", tr.state.State, structs.TaskStateDead) + if upd.state != structs.TaskStateDead { + t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead) } - if tr.state.Events[0].Type != structs.TaskStarted { - t.Fatalf("First Event was %v; want %v", tr.state.Events[0].Type, structs.TaskStarted) + if upd.events[0].Type != structs.TaskStarted { + t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskStarted) } - if tr.state.Events[1].Type != structs.TaskTerminated { - t.Fatalf("First Event was %v; want %v", tr.state.Events[1].Type, structs.TaskTerminated) + if upd.events[1].Type != structs.TaskTerminated { + t.Fatalf("First Event was %v; want %v", upd.events[1].Type, structs.TaskTerminated) } } func TestTaskRunner_Destroy(t *testing.T) { ctestutil.ExecCompatible(t) - _, tr := testTaskRunner(true) + upd, tr := testTaskRunner(true) defer tr.ctx.AllocDir.Destroy() // Change command to ensure we run for a bit @@ -105,20 +110,20 @@ func TestTaskRunner_Destroy(t *testing.T) { t.Fatalf("timeout") } - if len(tr.state.Events) != 2 { - t.Fatalf("should have 2 updates: %#v", tr.state.Events) + if len(upd.events) != 2 { + t.Fatalf("should have 2 updates: %#v", upd.events) } - if tr.state.State != structs.TaskStateDead { - t.Fatalf("TaskState %v; want %v", tr.state.State, structs.TaskStateDead) + if upd.state != structs.TaskStateDead { + t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead) } - if tr.state.Events[0].Type != structs.TaskStarted { - t.Fatalf("First Event was %v; want %v", tr.state.Events[0].Type, structs.TaskStarted) + if upd.events[0].Type != structs.TaskStarted { + t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskStarted) } - if tr.state.Events[1].Type != structs.TaskKilled { - t.Fatalf("First Event was %v; want %v", tr.state.Events[1].Type, structs.TaskKilled) + if upd.events[1].Type != structs.TaskKilled { + t.Fatalf("First Event was %v; want %v", upd.events[1].Type, structs.TaskKilled) } } @@ -167,7 +172,7 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) { // Create a new task runner consulClient, _ := NewConsulService(&consulServiceConfig{tr.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}}) tr2 := NewTaskRunner(tr.logger, tr.config, upd.Update, - tr.ctx, tr.alloc, &structs.Task{Name: tr.task.Name}, tr.state, tr.restartTracker, + tr.ctx, tr.alloc, &structs.Task{Name: tr.task.Name}, tr.restartTracker, consulClient) if err := tr2.RestoreState(); err != nil { t.Fatalf("err: %v", err)