Skip to content

Commit

Permalink
Events/eval alloc events (#9012)
Browse files Browse the repository at this point in the history
* generic eval update event

first pass at alloc client update events

* api/event client
  • Loading branch information
drewbailey committed Oct 13, 2020
1 parent 0c6118c commit a0b10c6
Show file tree
Hide file tree
Showing 14 changed files with 265 additions and 40 deletions.
93 changes: 93 additions & 0 deletions api/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package api

import (
"context"
"encoding/json"
"fmt"
)

type Events struct {
Index uint64
Events []Event
}

type Topic string

type Event struct {
Topic Topic
Type string
Key string
FilterKeys []string
Index uint64
Payload interface{}
}

func (e *Events) IsHeartBeat() bool {
return e.Index == 0 && len(e.Events) == 0
}

type EventStream struct {
client *Client
}

func (c *Client) EventStream() *EventStream {
return &EventStream{client: c}
}

func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, index uint64, q *QueryOptions) (<-chan *Events, <-chan error) {

errCh := make(chan error, 1)

r, err := e.client.newRequest("GET", "/v1/event/stream")
if err != nil {
errCh <- err
return nil, errCh
}
r.setQueryOptions(q)

// Build topic query params
for topic, keys := range topics {
for _, k := range keys {
r.params.Add("topic", fmt.Sprintf("%s:%s", topic, k))
}
}

_, resp, err := requireOK(e.client.doRequest(r))

if err != nil {
errCh <- err
return nil, errCh
}

eventsCh := make(chan *Events, 10)
go func() {
defer resp.Body.Close()

dec := json.NewDecoder(resp.Body)

for {
select {
case <-ctx.Done():
close(eventsCh)
return
default:
}

// Decode next newline delimited json of events
var events Events
if err := dec.Decode(&events); err != nil {
close(eventsCh)
errCh <- err
return
}
if events.IsHeartBeat() {
continue
}

eventsCh <- &events

}
}()

return eventsCh, errCh
}
50 changes: 50 additions & 0 deletions api/event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package api

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestEvent_Stream(t *testing.T) {
t.Parallel()

c, s := makeClient(t, nil, nil)
defer s.Stop()

// register job to generate events
jobs := c.Jobs()
job := testJob()
resp2, _, err := jobs.Register(job, nil)
require.Nil(t, err)
require.NotNil(t, resp2)

// build event stream request
events := c.EventStream()
q := &QueryOptions{}
topics := map[Topic][]string{
"Eval": {"*"},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

streamCh, errCh := events.Stream(ctx, topics, 0, q)

OUTER:
for {
select {
case event := <-streamCh:
require.Equal(t, len(event.Events), 1)
require.Equal(t, "Eval", string(event.Events[0].Topic))

break OUTER
case err := <-errCh:
require.Fail(t, err.Error())
case <-time.After(5 * time.Second):
require.Fail(t, "failed waiting for event stream event")
}
}
}
3 changes: 2 additions & 1 deletion nomad/alloc_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nomad

import (
"context"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -193,7 +194,7 @@ func TestAllocEndpoint_List_Blocking(t *testing.T) {
alloc2.ClientStatus = structs.AllocClientStatusRunning
time.AfterFunc(100*time.Millisecond, func() {
state.UpsertJobSummary(3, mock.JobSummary(alloc2.JobID))
if err := state.UpdateAllocsFromClient(4, []*structs.Allocation{alloc2}); err != nil {
if err := state.UpdateAllocsFromClient(context.Background(), 4, []*structs.Allocation{alloc2}); err != nil {
t.Fatalf("err: %v", err)
}
})
Expand Down
3 changes: 2 additions & 1 deletion nomad/deployment_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nomad

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -1300,7 +1301,7 @@ func TestDeploymentEndpoint_Allocations_Blocking(t *testing.T) {
a2.ClientStatus = structs.AllocClientStatusRunning
time.AfterFunc(100*time.Millisecond, func() {
assert.Nil(state.UpsertJobSummary(5, mock.JobSummary(a2.JobID)), "UpsertJobSummary")
assert.Nil(state.UpdateAllocsFromClient(6, []*structs.Allocation{a2}), "updateAllocsFromClient")
assert.Nil(state.UpdateAllocsFromClient(context.Background(), 6, []*structs.Allocation{a2}), "updateAllocsFromClient")
})

req.MinQueryIndex = 4
Expand Down
6 changes: 3 additions & 3 deletions nomad/deploymentwatcher/deployments_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1025,7 +1025,7 @@ func TestDeploymentWatcher_Watch_ProgressDeadline(t *testing.T) {
Healthy: helper.BoolToPtr(false),
Timestamp: now,
}
require.Nil(m.state.UpdateAllocsFromClient(100, []*structs.Allocation{a2}))
require.Nil(m.state.UpdateAllocsFromClient(context.Background(), 100, []*structs.Allocation{a2}))

// Wait for the deployment to be failed
testutil.WaitForResult(func() (bool, error) {
Expand Down Expand Up @@ -1209,7 +1209,7 @@ func TestDeploymentWatcher_Watch_ProgressDeadline_Canaries(t *testing.T) {
Healthy: helper.BoolToPtr(true),
Timestamp: now,
}
require.Nil(m.state.UpdateAllocsFromClient(m.nextIndex(), []*structs.Allocation{a2}))
require.Nil(m.state.UpdateAllocsFromClient(context.Background(), m.nextIndex(), []*structs.Allocation{a2}))

// Wait for the deployment to cross the deadline
dout, err := m.state.DeploymentByID(nil, d.ID)
Expand Down Expand Up @@ -1382,7 +1382,7 @@ func TestDeploymentWatcher_Watch_StartWithoutProgressDeadline(t *testing.T) {
Healthy: helper.BoolToPtr(false),
Timestamp: time.Now(),
}
require.Nil(m.state.UpdateAllocsFromClient(m.nextIndex(), []*structs.Allocation{a2}))
require.Nil(m.state.UpdateAllocsFromClient(context.Background(), m.nextIndex(), []*structs.Allocation{a2}))

// Wait for the alloc's DesiredState to set reschedule
testutil.WaitForResult(func() (bool, error) {
Expand Down
2 changes: 1 addition & 1 deletion nomad/drainer_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) {
new.ClientStatus = structs.AllocClientStatusComplete
updates = append(updates, new)
}
require.Nil(state.UpdateAllocsFromClient(1000, updates))
require.Nil(state.UpdateAllocsFromClient(context.Background(), 1000, updates))

// Check that the node drain is removed
testutil.WaitForResult(func() (bool, error) {
Expand Down
26 changes: 15 additions & 11 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,13 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
case structs.JobDeregisterRequestType:
return n.applyDeregisterJob(buf[1:], log.Index)
case structs.EvalUpdateRequestType:
return n.applyUpdateEval(buf[1:], log.Index)
return n.applyUpdateEval(msgType, buf[1:], log.Index)
case structs.EvalDeleteRequestType:
return n.applyDeleteEval(buf[1:], log.Index)
case structs.AllocUpdateRequestType:
return n.applyAllocUpdate(buf[1:], log.Index)
case structs.AllocClientUpdateRequestType:
return n.applyAllocClientUpdate(buf[1:], log.Index)
return n.applyAllocClientUpdate(msgType, buf[1:], log.Index)
case structs.ReconcileJobSummariesRequestType:
return n.applyReconcileSummaries(buf[1:], log.Index)
case structs.VaultAccessorRegisterRequestType:
Expand Down Expand Up @@ -570,7 +570,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
// so this may be nil during server upgrades.
if req.Eval != nil {
req.Eval.JobModifyIndex = index
if err := n.upsertEvals(index, []*structs.Evaluation{req.Eval}); err != nil {
if err := n.upsertEvals(context.Background(), index, []*structs.Evaluation{req.Eval}); err != nil {
return err
}
}
Expand Down Expand Up @@ -602,7 +602,7 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
// always attempt upsert eval even if job deregister fail
if req.Eval != nil {
req.Eval.JobModifyIndex = index
if err := n.upsertEvals(index, []*structs.Evaluation{req.Eval}); err != nil {
if err := n.upsertEvals(context.Background(), index, []*structs.Evaluation{req.Eval}); err != nil {
return err
}
}
Expand Down Expand Up @@ -689,17 +689,20 @@ func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, pu
return nil
}

func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyUpdateEval(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "update_eval"}, time.Now())
var req structs.EvalUpdateRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
return n.upsertEvals(index, req.Evals)

ctx := context.WithValue(context.Background(), state.CtxMsgType, msgType)

return n.upsertEvals(ctx, index, req.Evals)
}

func (n *nomadFSM) upsertEvals(index uint64, evals []*structs.Evaluation) error {
if err := n.state.UpsertEvals(index, evals); err != nil {
func (n *nomadFSM) upsertEvals(ctx context.Context, index uint64, evals []*structs.Evaluation) error {
if err := n.state.UpsertEvalsCtx(ctx, index, evals); err != nil {
n.logger.Error("UpsertEvals failed", "error", err)
return err
}
Expand Down Expand Up @@ -786,7 +789,7 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} {
return nil
}

func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyAllocClientUpdate(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "alloc_client_update"}, time.Now())
var req structs.AllocUpdateRequest
if err := structs.Decode(buf, &req); err != nil {
Expand All @@ -807,15 +810,16 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{}
}
}

ctx := context.WithValue(context.Background(), state.CtxMsgType, msgType)
// Update all the client allocations
if err := n.state.UpdateAllocsFromClient(index, req.Alloc); err != nil {
if err := n.state.UpdateAllocsFromClient(ctx, index, req.Alloc); err != nil {
n.logger.Error("UpdateAllocFromClient failed", "error", err)
return err
}

// Update any evals
if len(req.Evals) > 0 {
if err := n.upsertEvals(index, req.Evals); err != nil {
if err := n.upsertEvals(ctx, index, req.Evals); err != nil {
n.logger.Error("applyAllocClientUpdate failed to update evaluations", "error", err)
return err
}
Expand Down
3 changes: 2 additions & 1 deletion nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nomad

import (
"context"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -2048,7 +2049,7 @@ func TestClientEndpoint_GetAllocs_Blocking(t *testing.T) {
allocUpdate.ID = alloc.ID
allocUpdate.ClientStatus = structs.AllocClientStatusRunning
state.UpsertJobSummary(199, mock.JobSummary(allocUpdate.JobID))
err := state.UpdateAllocsFromClient(200, []*structs.Allocation{allocUpdate})
err := state.UpdateAllocsFromClient(context.Background(), 200, []*structs.Allocation{allocUpdate})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
57 changes: 57 additions & 0 deletions nomad/state/events.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package state

import (
"fmt"

"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
)
Expand All @@ -27,6 +29,8 @@ const (

TypeAllocCreated = "AllocCreated"
TypeAllocUpdated = "AllocUpdated"

TypeEvalUpdated = "EvalUpdated"
)

type JobEvent struct {
Expand Down Expand Up @@ -66,3 +70,56 @@ type JobDrainDetails struct {
Type string
AllocDetails map[string]NodeDrainAllocDetails
}

func GenericEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
var eventType string
switch changes.MsgType {
case structs.EvalUpdateRequestType:
eventType = TypeEvalUpdated
case structs.AllocClientUpdateRequestType:
eventType = TypeAllocUpdated
}

var events []stream.Event
for _, change := range changes.Changes {
switch change.Table {
case "evals":
after, ok := change.After.(*structs.Evaluation)
if !ok {
return nil, fmt.Errorf("transaction change was not an Evaluation")
}

event := stream.Event{
Topic: TopicEval,
Type: eventType,
Index: changes.Index,
Key: after.ID,
Payload: &EvalEvent{
Eval: after,
},
}

events = append(events, event)

case "allocs":
after, ok := change.After.(*structs.Allocation)
if !ok {
return nil, fmt.Errorf("transaction change was not an Allocation")
}

event := stream.Event{
Topic: TopicAlloc,
Type: eventType,
Index: changes.Index,
Key: after.ID,
Payload: &AllocEvent{
Alloc: after,
},
}

events = append(events, event)
}
}

return events, nil
}
Loading

0 comments on commit a0b10c6

Please sign in to comment.