diff --git a/client/client.go b/client/client.go index 545b6986e34..6a960a83298 100644 --- a/client/client.go +++ b/client/client.go @@ -1274,14 +1274,13 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) { } } - c.logger.Printf("[DEBUG] client: updated allocations at index %d (pulled %d) (filtered %d)", - resp.Index, len(pull), len(filtered)) - // Pull the allocations that passed filtering. allocsResp.Allocs = nil + var pulledAllocs map[string]*structs.Allocation if len(pull) != 0 { // Pull the allocations that need to be updated. allocsReq.AllocIDs = pull + allocsReq.MinQueryIndex = resp.Index - 1 allocsResp = structs.AllocsGetResponse{} if err := c.RPC("Alloc.GetAllocs", &allocsReq, &allocsResp); err != nil { c.logger.Printf("[ERR] client: failed to query updated allocations: %v", err) @@ -1296,6 +1295,28 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) { } } + // Ensure that we received all the allocations we wanted + pulledAllocs = make(map[string]*structs.Allocation, len(allocsResp.Allocs)) + for _, alloc := range allocsResp.Allocs { + pulledAllocs[alloc.ID] = alloc + } + + for _, desiredID := range pull { + if _, ok := pulledAllocs[desiredID]; !ok { + // We didn't get everything we wanted. Do not update the + // MinQueryIndex, sleep and then retry. + wait := c.retryIntv(2 * time.Second) + select { + case <-time.After(wait): + // Wait for the server we contact to receive the + // allocations + continue + case <-c.shutdownCh: + return + } + } + } + // Check for shutdown select { case <-c.shutdownCh: @@ -1304,19 +1325,18 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) { } } + c.logger.Printf("[DEBUG] client: updated allocations at index %d (total %d) (pulled %d) (filtered %d)", + resp.Index, len(resp.Allocs), len(allocsResp.Allocs), len(filtered)) + // Update the query index. if resp.Index > req.MinQueryIndex { req.MinQueryIndex = resp.Index } // Push the updates. - pulled := make(map[string]*structs.Allocation, len(allocsResp.Allocs)) - for _, alloc := range allocsResp.Allocs { - pulled[alloc.ID] = alloc - } update := &allocUpdates{ filtered: filtered, - pulled: pulled, + pulled: pulledAllocs, } select { case updates <- update: diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go index e49e2383083..9826a15475d 100644 --- a/nomad/alloc_endpoint.go +++ b/nomad/alloc_endpoint.go @@ -1,7 +1,6 @@ package nomad import ( - "fmt" "time" "github.com/armon/go-metrics" @@ -97,7 +96,7 @@ func (a *Alloc) GetAlloc(args *structs.AllocSpecificRequest, if out != nil { reply.Index = out.ModifyIndex } else { - // Use the last index that affected the nodes table + // Use the last index that affected the allocs table index, err := snap.Index("allocs") if err != nil { return err @@ -118,32 +117,73 @@ func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest, if done, err := a.srv.forward("Alloc.GetAllocs", args, args, reply); done { return err } - defer metrics.MeasureSince([]string{"nomad", "alloc", "get_alloc"}, time.Now()) + defer metrics.MeasureSince([]string{"nomad", "alloc", "get_allocs"}, time.Now()) - // Lookup the allocations - snap, err := a.srv.fsm.State().Snapshot() - if err != nil { - return err + // Build the watch + items := make([]watch.Item, 0, len(args.AllocIDs)) + for _, allocID := range args.AllocIDs { + items = append(items, watch.Item{Alloc: allocID}) } allocs := make([]*structs.Allocation, len(args.AllocIDs)) - for i, alloc := range args.AllocIDs { - out, err := snap.AllocByID(alloc) - if err != nil { - return err - } - if out == nil { - return fmt.Errorf("unknown alloc id %q", alloc) - } - - allocs[i] = out - if reply.Index < out.ModifyIndex { - reply.Index = out.ModifyIndex - } - } - // Set the response - a.srv.setQueryMeta(&reply.QueryMeta) - reply.Allocs = allocs - return nil + // Setup the blocking query. We wait for at least one of the requested + // allocations to be above the min query index. This guarantees that the + // server has received that index. + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + watch: watch.NewItems(items...), + run: func() error { + // Lookup the allocation + snap, err := a.srv.fsm.State().Snapshot() + if err != nil { + return err + } + + thresholdMet := false + maxIndex := uint64(0) + for i, alloc := range args.AllocIDs { + out, err := snap.AllocByID(alloc) + if err != nil { + return err + } + if out == nil { + // We don't have the alloc yet + thresholdMet = false + break + } + + // Store the pointer + allocs[i] = out + + // Check if we have passed the minimum index + if out.ModifyIndex > args.QueryOptions.MinQueryIndex { + thresholdMet = true + } + + if maxIndex < out.ModifyIndex { + maxIndex = out.ModifyIndex + } + } + + // Setup the output + if thresholdMet { + reply.Allocs = allocs + reply.Index = maxIndex + } else { + // Use the last index that affected the nodes table + index, err := snap.Index("allocs") + if err != nil { + return err + } + reply.Index = index + } + + // Set the query response + a.srv.setQueryMeta(&reply.QueryMeta) + return nil + }, + } + return a.srv.blockingRPC(&opts) } diff --git a/nomad/alloc_endpoint_test.go b/nomad/alloc_endpoint_test.go index 3af23f322e3..854a66a6e62 100644 --- a/nomad/alloc_endpoint_test.go +++ b/nomad/alloc_endpoint_test.go @@ -248,8 +248,10 @@ func TestAllocEndpoint_GetAllocs(t *testing.T) { // Lookup the allocs get := &structs.AllocsGetRequest{ - AllocIDs: []string{alloc.ID, alloc2.ID}, - QueryOptions: structs.QueryOptions{Region: "global"}, + AllocIDs: []string{alloc.ID, alloc2.ID}, + QueryOptions: structs.QueryOptions{ + Region: "global", + }, } var resp structs.AllocsGetResponse if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err != nil { @@ -272,3 +274,57 @@ func TestAllocEndpoint_GetAllocs(t *testing.T) { t.Fatalf("expect error") } } + +func TestAllocEndpoint_GetAllocs_Blocking(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + state := s1.fsm.State() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the allocs + alloc1 := mock.Alloc() + alloc2 := mock.Alloc() + + // First create an unrelated alloc + time.AfterFunc(100*time.Millisecond, func() { + state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID)) + err := state.UpsertAllocs(100, []*structs.Allocation{alloc1}) + if err != nil { + t.Fatalf("err: %v", err) + } + }) + + // Create the alloc we are watching later + time.AfterFunc(200*time.Millisecond, func() { + state.UpsertJobSummary(199, mock.JobSummary(alloc2.JobID)) + err := state.UpsertAllocs(200, []*structs.Allocation{alloc2}) + if err != nil { + t.Fatalf("err: %v", err) + } + }) + + // Lookup the allocs + get := &structs.AllocsGetRequest{ + AllocIDs: []string{alloc1.ID, alloc2.ID}, + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 150, + }, + } + var resp structs.AllocsGetResponse + start := time.Now() + if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + if elapsed := time.Since(start); elapsed < 200*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + if resp.Index != 200 { + t.Fatalf("Bad index: %d %d", resp.Index, 200) + } + if len(resp.Allocs) != 2 { + t.Fatalf("bad: %#v", resp.Allocs) + } +}