Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Events/eval alloc events #9012

Merged
merged 2 commits into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions api/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package api

import (
"context"
"encoding/json"
"fmt"
)

type Events struct {
Index uint64
Events []Event
}

type Topic string

type Event struct {
Topic Topic
Type string
Key string
FilterKeys []string
Index uint64
Payload interface{}
}

func (e *Events) IsHeartBeat() bool {
return e.Index == 0 && len(e.Events) == 0
}

type EventStream struct {
client *Client
}

func (c *Client) EventStream() *EventStream {
return &EventStream{client: c}
}

func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, index uint64, q *QueryOptions) (<-chan *Events, <-chan error) {

errCh := make(chan error, 1)

r, err := e.client.newRequest("GET", "/v1/event/stream")
if err != nil {
errCh <- err
return nil, errCh
}
r.setQueryOptions(q)

// Build topic query params
for topic, keys := range topics {
for _, k := range keys {
r.params.Add("topic", fmt.Sprintf("%s:%s", topic, k))
}
}

_, resp, err := requireOK(e.client.doRequest(r))

if err != nil {
errCh <- err
return nil, errCh
}

eventsCh := make(chan *Events, 10)
go func() {
defer resp.Body.Close()

dec := json.NewDecoder(resp.Body)

for {
select {
case <-ctx.Done():
close(eventsCh)
return
default:
}

// Decode next newline delimited json of events
var events Events
if err := dec.Decode(&events); err != nil {
close(eventsCh)
errCh <- err
return
}
if events.IsHeartBeat() {
continue
}

eventsCh <- &events

}
}()

return eventsCh, errCh
}
50 changes: 50 additions & 0 deletions api/event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package api

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestEvent_Stream(t *testing.T) {
t.Parallel()

c, s := makeClient(t, nil, nil)
defer s.Stop()

// register job to generate events
jobs := c.Jobs()
job := testJob()
resp2, _, err := jobs.Register(job, nil)
require.Nil(t, err)
require.NotNil(t, resp2)

// build event stream request
events := c.EventStream()
q := &QueryOptions{}
topics := map[Topic][]string{
"Eval": {"*"},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

streamCh, errCh := events.Stream(ctx, topics, 0, q)

OUTER:
for {
select {
case event := <-streamCh:
require.Equal(t, len(event.Events), 1)
require.Equal(t, "Eval", string(event.Events[0].Topic))

break OUTER
case err := <-errCh:
require.Fail(t, err.Error())
case <-time.After(5 * time.Second):
require.Fail(t, "failed waiting for event stream event")
}
}
}
3 changes: 2 additions & 1 deletion nomad/alloc_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nomad

import (
"context"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -193,7 +194,7 @@ func TestAllocEndpoint_List_Blocking(t *testing.T) {
alloc2.ClientStatus = structs.AllocClientStatusRunning
time.AfterFunc(100*time.Millisecond, func() {
state.UpsertJobSummary(3, mock.JobSummary(alloc2.JobID))
if err := state.UpdateAllocsFromClient(4, []*structs.Allocation{alloc2}); err != nil {
if err := state.UpdateAllocsFromClient(context.Background(), 4, []*structs.Allocation{alloc2}); err != nil {
t.Fatalf("err: %v", err)
}
})
Expand Down
3 changes: 2 additions & 1 deletion nomad/deployment_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nomad

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -1300,7 +1301,7 @@ func TestDeploymentEndpoint_Allocations_Blocking(t *testing.T) {
a2.ClientStatus = structs.AllocClientStatusRunning
time.AfterFunc(100*time.Millisecond, func() {
assert.Nil(state.UpsertJobSummary(5, mock.JobSummary(a2.JobID)), "UpsertJobSummary")
assert.Nil(state.UpdateAllocsFromClient(6, []*structs.Allocation{a2}), "updateAllocsFromClient")
assert.Nil(state.UpdateAllocsFromClient(context.Background(), 6, []*structs.Allocation{a2}), "updateAllocsFromClient")
})

req.MinQueryIndex = 4
Expand Down
6 changes: 3 additions & 3 deletions nomad/deploymentwatcher/deployments_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1025,7 +1025,7 @@ func TestDeploymentWatcher_Watch_ProgressDeadline(t *testing.T) {
Healthy: helper.BoolToPtr(false),
Timestamp: now,
}
require.Nil(m.state.UpdateAllocsFromClient(100, []*structs.Allocation{a2}))
require.Nil(m.state.UpdateAllocsFromClient(context.Background(), 100, []*structs.Allocation{a2}))

// Wait for the deployment to be failed
testutil.WaitForResult(func() (bool, error) {
Expand Down Expand Up @@ -1209,7 +1209,7 @@ func TestDeploymentWatcher_Watch_ProgressDeadline_Canaries(t *testing.T) {
Healthy: helper.BoolToPtr(true),
Timestamp: now,
}
require.Nil(m.state.UpdateAllocsFromClient(m.nextIndex(), []*structs.Allocation{a2}))
require.Nil(m.state.UpdateAllocsFromClient(context.Background(), m.nextIndex(), []*structs.Allocation{a2}))

// Wait for the deployment to cross the deadline
dout, err := m.state.DeploymentByID(nil, d.ID)
Expand Down Expand Up @@ -1382,7 +1382,7 @@ func TestDeploymentWatcher_Watch_StartWithoutProgressDeadline(t *testing.T) {
Healthy: helper.BoolToPtr(false),
Timestamp: time.Now(),
}
require.Nil(m.state.UpdateAllocsFromClient(m.nextIndex(), []*structs.Allocation{a2}))
require.Nil(m.state.UpdateAllocsFromClient(context.Background(), m.nextIndex(), []*structs.Allocation{a2}))

// Wait for the alloc's DesiredState to set reschedule
testutil.WaitForResult(func() (bool, error) {
Expand Down
2 changes: 1 addition & 1 deletion nomad/drainer_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) {
new.ClientStatus = structs.AllocClientStatusComplete
updates = append(updates, new)
}
require.Nil(state.UpdateAllocsFromClient(1000, updates))
require.Nil(state.UpdateAllocsFromClient(context.Background(), 1000, updates))

// Check that the node drain is removed
testutil.WaitForResult(func() (bool, error) {
Expand Down
26 changes: 15 additions & 11 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,13 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
case structs.JobDeregisterRequestType:
return n.applyDeregisterJob(buf[1:], log.Index)
case structs.EvalUpdateRequestType:
return n.applyUpdateEval(buf[1:], log.Index)
return n.applyUpdateEval(msgType, buf[1:], log.Index)
case structs.EvalDeleteRequestType:
return n.applyDeleteEval(buf[1:], log.Index)
case structs.AllocUpdateRequestType:
return n.applyAllocUpdate(buf[1:], log.Index)
case structs.AllocClientUpdateRequestType:
return n.applyAllocClientUpdate(buf[1:], log.Index)
return n.applyAllocClientUpdate(msgType, buf[1:], log.Index)
case structs.ReconcileJobSummariesRequestType:
return n.applyReconcileSummaries(buf[1:], log.Index)
case structs.VaultAccessorRegisterRequestType:
Expand Down Expand Up @@ -570,7 +570,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
// so this may be nil during server upgrades.
if req.Eval != nil {
req.Eval.JobModifyIndex = index
if err := n.upsertEvals(index, []*structs.Evaluation{req.Eval}); err != nil {
if err := n.upsertEvals(context.Background(), index, []*structs.Evaluation{req.Eval}); err != nil {
return err
}
}
Expand Down Expand Up @@ -602,7 +602,7 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
// always attempt upsert eval even if job deregister fail
if req.Eval != nil {
req.Eval.JobModifyIndex = index
if err := n.upsertEvals(index, []*structs.Evaluation{req.Eval}); err != nil {
if err := n.upsertEvals(context.Background(), index, []*structs.Evaluation{req.Eval}); err != nil {
return err
}
}
Expand Down Expand Up @@ -689,17 +689,20 @@ func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, pu
return nil
}

func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyUpdateEval(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "update_eval"}, time.Now())
var req structs.EvalUpdateRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
return n.upsertEvals(index, req.Evals)

ctx := context.WithValue(context.Background(), state.CtxMsgType, msgType)

return n.upsertEvals(ctx, index, req.Evals)
}

func (n *nomadFSM) upsertEvals(index uint64, evals []*structs.Evaluation) error {
if err := n.state.UpsertEvals(index, evals); err != nil {
func (n *nomadFSM) upsertEvals(ctx context.Context, index uint64, evals []*structs.Evaluation) error {
if err := n.state.UpsertEvalsCtx(ctx, index, evals); err != nil {
n.logger.Error("UpsertEvals failed", "error", err)
return err
}
Expand Down Expand Up @@ -786,7 +789,7 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} {
return nil
}

func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{} {
func (n *nomadFSM) applyAllocClientUpdate(msgType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "alloc_client_update"}, time.Now())
var req structs.AllocUpdateRequest
if err := structs.Decode(buf, &req); err != nil {
Expand All @@ -807,15 +810,16 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{}
}
}

ctx := context.WithValue(context.Background(), state.CtxMsgType, msgType)
// Update all the client allocations
if err := n.state.UpdateAllocsFromClient(index, req.Alloc); err != nil {
if err := n.state.UpdateAllocsFromClient(ctx, index, req.Alloc); err != nil {
n.logger.Error("UpdateAllocFromClient failed", "error", err)
return err
}

// Update any evals
if len(req.Evals) > 0 {
if err := n.upsertEvals(index, req.Evals); err != nil {
if err := n.upsertEvals(ctx, index, req.Evals); err != nil {
n.logger.Error("applyAllocClientUpdate failed to update evaluations", "error", err)
return err
}
Expand Down
3 changes: 2 additions & 1 deletion nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nomad

import (
"context"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -2048,7 +2049,7 @@ func TestClientEndpoint_GetAllocs_Blocking(t *testing.T) {
allocUpdate.ID = alloc.ID
allocUpdate.ClientStatus = structs.AllocClientStatusRunning
state.UpsertJobSummary(199, mock.JobSummary(allocUpdate.JobID))
err := state.UpdateAllocsFromClient(200, []*structs.Allocation{allocUpdate})
err := state.UpdateAllocsFromClient(context.Background(), 200, []*structs.Allocation{allocUpdate})
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
57 changes: 57 additions & 0 deletions nomad/state/events.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package state

import (
"fmt"

"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
)
Expand All @@ -27,6 +29,8 @@ const (

TypeAllocCreated = "AllocCreated"
TypeAllocUpdated = "AllocUpdated"

TypeEvalUpdated = "EvalUpdated"
)

type JobEvent struct {
Expand Down Expand Up @@ -66,3 +70,56 @@ type JobDrainDetails struct {
Type string
AllocDetails map[string]NodeDrainAllocDetails
}

func GenericEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
var eventType string
switch changes.MsgType {
case structs.EvalUpdateRequestType:
eventType = TypeEvalUpdated
case structs.AllocClientUpdateRequestType:
eventType = TypeAllocUpdated
}

var events []stream.Event
for _, change := range changes.Changes {
switch change.Table {
case "evals":
after, ok := change.After.(*structs.Evaluation)
if !ok {
return nil, fmt.Errorf("transaction change was not an Evaluation")
}

event := stream.Event{
Topic: TopicEval,
Type: eventType,
Index: changes.Index,
Key: after.ID,
Payload: &EvalEvent{
Eval: after,
},
}

events = append(events, event)

case "allocs":
after, ok := change.After.(*structs.Allocation)
if !ok {
return nil, fmt.Errorf("transaction change was not an Allocation")
}

event := stream.Event{
Topic: TopicAlloc,
Type: eventType,
Index: changes.Index,
Key: after.ID,
Payload: &AllocEvent{
Alloc: after,
},
}

events = append(events, event)
}
}

return events, nil
}
Loading