From 925622a85161797e6ed813683f18df790d12a348 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 10 Jan 2017 13:25:52 -0800 Subject: [PATCH 1/2] GetAllocs uses a blocking query This PR makes GetAllocs use a blocking query as well as adding a sanity check to the clients watchAllocation code to ensure it gets the correct allocations. This PR fixes https://github.com/hashicorp/nomad/issues/2119 and https://github.com/hashicorp/nomad/issues/2153. The issue was that the client was talking to two different servers, one to check which allocations to pull and the other to pull those allocations. However the latter call was not with a blocking query and thus the client would not retreive the allocations it requested. The logging has been improved to make the problem more clear as well. --- client/client.go | 35 ++++++++++---- nomad/alloc_endpoint.go | 90 ++++++++++++++++++++++++++---------- nomad/alloc_endpoint_test.go | 60 +++++++++++++++++++++++- 3 files changed, 150 insertions(+), 35 deletions(-) diff --git a/client/client.go b/client/client.go index 545b6986e34..0732875c72e 100644 --- a/client/client.go +++ b/client/client.go @@ -1274,14 +1274,13 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) { } } - c.logger.Printf("[DEBUG] client: updated allocations at index %d (pulled %d) (filtered %d)", - resp.Index, len(pull), len(filtered)) - // Pull the allocations that passed filtering. allocsResp.Allocs = nil + var pulledAllocs map[string]*structs.Allocation if len(pull) != 0 { // Pull the allocations that need to be updated. allocsReq.AllocIDs = pull + allocsReq.MinQueryIndex = resp.Index - 1 allocsResp = structs.AllocsGetResponse{} if err := c.RPC("Alloc.GetAllocs", &allocsReq, &allocsResp); err != nil { c.logger.Printf("[ERR] client: failed to query updated allocations: %v", err) @@ -1296,6 +1295,27 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) { } } + // Ensure that we received all the allocations we wanted + pulledAllocs = make(map[string]*structs.Allocation, len(allocsResp.Allocs)) + for _, alloc := range allocsResp.Allocs { + pulledAllocs[alloc.ID] = alloc + } + + for _, desiredID := range pull { + if _, ok := pulledAllocs[desiredID]; !ok { + // We didn't get everything we wanted. Do not update the + // MinQueryIndex, sleep and then retry. + select { + case <-time.After(2 * time.Second): + // Wait for the server we contact to receive the + // allocations + continue + case <-c.shutdownCh: + return + } + } + } + // Check for shutdown select { case <-c.shutdownCh: @@ -1304,19 +1324,18 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) { } } + c.logger.Printf("[DEBUG] client: updated allocations at index %d (total %d) (pulled %d) (filtered %d)", + resp.Index, len(resp.Allocs), len(allocsResp.Allocs), len(filtered)) + // Update the query index. if resp.Index > req.MinQueryIndex { req.MinQueryIndex = resp.Index } // Push the updates. - pulled := make(map[string]*structs.Allocation, len(allocsResp.Allocs)) - for _, alloc := range allocsResp.Allocs { - pulled[alloc.ID] = alloc - } update := &allocUpdates{ filtered: filtered, - pulled: pulled, + pulled: pulledAllocs, } select { case updates <- update: diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go index e49e2383083..9826a15475d 100644 --- a/nomad/alloc_endpoint.go +++ b/nomad/alloc_endpoint.go @@ -1,7 +1,6 @@ package nomad import ( - "fmt" "time" "github.com/armon/go-metrics" @@ -97,7 +96,7 @@ func (a *Alloc) GetAlloc(args *structs.AllocSpecificRequest, if out != nil { reply.Index = out.ModifyIndex } else { - // Use the last index that affected the nodes table + // Use the last index that affected the allocs table index, err := snap.Index("allocs") if err != nil { return err @@ -118,32 +117,73 @@ func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest, if done, err := a.srv.forward("Alloc.GetAllocs", args, args, reply); done { return err } - defer metrics.MeasureSince([]string{"nomad", "alloc", "get_alloc"}, time.Now()) + defer metrics.MeasureSince([]string{"nomad", "alloc", "get_allocs"}, time.Now()) - // Lookup the allocations - snap, err := a.srv.fsm.State().Snapshot() - if err != nil { - return err + // Build the watch + items := make([]watch.Item, 0, len(args.AllocIDs)) + for _, allocID := range args.AllocIDs { + items = append(items, watch.Item{Alloc: allocID}) } allocs := make([]*structs.Allocation, len(args.AllocIDs)) - for i, alloc := range args.AllocIDs { - out, err := snap.AllocByID(alloc) - if err != nil { - return err - } - if out == nil { - return fmt.Errorf("unknown alloc id %q", alloc) - } - - allocs[i] = out - if reply.Index < out.ModifyIndex { - reply.Index = out.ModifyIndex - } - } - // Set the response - a.srv.setQueryMeta(&reply.QueryMeta) - reply.Allocs = allocs - return nil + // Setup the blocking query. We wait for at least one of the requested + // allocations to be above the min query index. This guarantees that the + // server has received that index. + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + watch: watch.NewItems(items...), + run: func() error { + // Lookup the allocation + snap, err := a.srv.fsm.State().Snapshot() + if err != nil { + return err + } + + thresholdMet := false + maxIndex := uint64(0) + for i, alloc := range args.AllocIDs { + out, err := snap.AllocByID(alloc) + if err != nil { + return err + } + if out == nil { + // We don't have the alloc yet + thresholdMet = false + break + } + + // Store the pointer + allocs[i] = out + + // Check if we have passed the minimum index + if out.ModifyIndex > args.QueryOptions.MinQueryIndex { + thresholdMet = true + } + + if maxIndex < out.ModifyIndex { + maxIndex = out.ModifyIndex + } + } + + // Setup the output + if thresholdMet { + reply.Allocs = allocs + reply.Index = maxIndex + } else { + // Use the last index that affected the nodes table + index, err := snap.Index("allocs") + if err != nil { + return err + } + reply.Index = index + } + + // Set the query response + a.srv.setQueryMeta(&reply.QueryMeta) + return nil + }, + } + return a.srv.blockingRPC(&opts) } diff --git a/nomad/alloc_endpoint_test.go b/nomad/alloc_endpoint_test.go index 3af23f322e3..854a66a6e62 100644 --- a/nomad/alloc_endpoint_test.go +++ b/nomad/alloc_endpoint_test.go @@ -248,8 +248,10 @@ func TestAllocEndpoint_GetAllocs(t *testing.T) { // Lookup the allocs get := &structs.AllocsGetRequest{ - AllocIDs: []string{alloc.ID, alloc2.ID}, - QueryOptions: structs.QueryOptions{Region: "global"}, + AllocIDs: []string{alloc.ID, alloc2.ID}, + QueryOptions: structs.QueryOptions{ + Region: "global", + }, } var resp structs.AllocsGetResponse if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err != nil { @@ -272,3 +274,57 @@ func TestAllocEndpoint_GetAllocs(t *testing.T) { t.Fatalf("expect error") } } + +func TestAllocEndpoint_GetAllocs_Blocking(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + state := s1.fsm.State() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the allocs + alloc1 := mock.Alloc() + alloc2 := mock.Alloc() + + // First create an unrelated alloc + time.AfterFunc(100*time.Millisecond, func() { + state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID)) + err := state.UpsertAllocs(100, []*structs.Allocation{alloc1}) + if err != nil { + t.Fatalf("err: %v", err) + } + }) + + // Create the alloc we are watching later + time.AfterFunc(200*time.Millisecond, func() { + state.UpsertJobSummary(199, mock.JobSummary(alloc2.JobID)) + err := state.UpsertAllocs(200, []*structs.Allocation{alloc2}) + if err != nil { + t.Fatalf("err: %v", err) + } + }) + + // Lookup the allocs + get := &structs.AllocsGetRequest{ + AllocIDs: []string{alloc1.ID, alloc2.ID}, + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 150, + }, + } + var resp structs.AllocsGetResponse + start := time.Now() + if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + if elapsed := time.Since(start); elapsed < 200*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + if resp.Index != 200 { + t.Fatalf("Bad index: %d %d", resp.Index, 200) + } + if len(resp.Allocs) != 2 { + t.Fatalf("bad: %#v", resp.Allocs) + } +} From 23e84ecc12e85f5f4e7347e12783192a2beaaead Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 11 Jan 2017 13:24:23 -0800 Subject: [PATCH 2/2] Random wait --- client/client.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/client/client.go b/client/client.go index 0732875c72e..6a960a83298 100644 --- a/client/client.go +++ b/client/client.go @@ -1305,8 +1305,9 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) { if _, ok := pulledAllocs[desiredID]; !ok { // We didn't get everything we wanted. Do not update the // MinQueryIndex, sleep and then retry. + wait := c.retryIntv(2 * time.Second) select { - case <-time.After(2 * time.Second): + case <-time.After(wait): // Wait for the server we contact to receive the // allocations continue