Skip to content

Commit

Permalink
adds two base event stream e2e tests (hashicorp#9126)
Browse files Browse the repository at this point in the history
* adds two base event stream e2e tests

test evaluation filter keys are included

* Apply suggestions from code review

Co-authored-by: Tim Gross <[email protected]>

* gc aftereach

Co-authored-by: Tim Gross <[email protected]>
  • Loading branch information
drewbailey and tgross authored Oct 20, 2020
1 parent cf9a86f commit 659943a
Show file tree
Hide file tree
Showing 7 changed files with 311 additions and 4 deletions.
1 change: 1 addition & 0 deletions e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
_ "github.com/hashicorp/nomad/e2e/consultemplate"
_ "github.com/hashicorp/nomad/e2e/csi"
_ "github.com/hashicorp/nomad/e2e/deployment"
_ "github.com/hashicorp/nomad/e2e/events"
_ "github.com/hashicorp/nomad/e2e/example"
_ "github.com/hashicorp/nomad/e2e/lifecycle"
_ "github.com/hashicorp/nomad/e2e/metrics"
Expand Down
184 changes: 184 additions & 0 deletions e2e/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package events

import (
"context"
"fmt"

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/e2e/e2eutil"
"github.com/hashicorp/nomad/e2e/framework"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)

type EventsTest struct {
framework.TC
jobIDs []string
}

func init() {
framework.AddSuites(&framework.TestSuite{
Component: "Events",
CanRunLocal: true,
Cases: []framework.TestCase{
new(EventsTest),
},
})
}

func (tc *EventsTest) BeforeAll(f *framework.F) {
e2eutil.WaitForLeader(f.T(), tc.Nomad())
}

func (tc *EventsTest) AfterEach(f *framework.F) {
nomadClient := tc.Nomad()
j := nomadClient.Jobs()

for _, id := range tc.jobIDs {
j.Deregister(id, true, nil)
}
_, err := e2eutil.Command("nomad", "system", "gc")
f.NoError(err)
}

// TestDeploymentEvents registers a job then applies a change
// An event stream listening to Deployment Events asserts that
// a DeploymentPromotion event is emitted
func (tc *EventsTest) TestDeploymentEvents(f *framework.F) {
t := f.T()

nomadClient := tc.Nomad()
events := nomadClient.EventStream()

uuid := uuid.Generate()
jobID := fmt.Sprintf("deployment-%s", uuid[0:8])
tc.jobIDs = append(tc.jobIDs, jobID)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

topics := map[api.Topic][]string{
"Deployment": {jobID},
}

var deployEvents []api.Event
streamCh, err := events.Stream(ctx, topics, 0, nil)
require.NoError(t, err)

// gather deployment events
go func() {
for {
select {
case <-ctx.Done():
return
case event := <-streamCh:
if event.IsHeartbeat() {
continue
}

deployEvents = append(deployEvents, event.Events...)
}
}
}()

// register job
e2eutil.RegisterAndWaitForAllocs(t, nomadClient, "events/input/initial.nomad", jobID, "")

// update job
e2eutil.RegisterAllocs(t, nomadClient, "events/input/deploy.nomad", jobID, "")

ds := e2eutil.DeploymentsForJob(t, nomadClient, jobID)
require.Equal(t, 2, len(ds))
deploy := ds[0]

// wait for deployment to be running and ready for auto promote
e2eutil.WaitForDeployment(t, nomadClient, deploy.ID, structs.DeploymentStatusRunning, structs.DeploymentStatusDescriptionRunningAutoPromotion)

// ensure there is a deployment promotion event
testutil.WaitForResult(func() (bool, error) {
for _, e := range deployEvents {
if e.Type == "DeploymentPromotion" {
return true, nil
}
}
var got []string
for _, e := range deployEvents {
got = append(got, e.Type)
}
return false, fmt.Errorf("expected to receive deployment promotion event, got: %#v", got)
}, func(e error) {
f.NoError(e)
})
}

// TestBlockedEvalEvents applies a job with a large memory requirement. The
// event stream checks for a failed task group alloc
func (tc *EventsTest) TestBlockedEvalEvents(f *framework.F) {
t := f.T()

nomadClient := tc.Nomad()
events := nomadClient.EventStream()

uuid := uuid.Generate()
jobID := fmt.Sprintf("blocked-deploy-%s", uuid[0:8])
tc.jobIDs = append(tc.jobIDs, jobID)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

topics := map[api.Topic][]string{
"Eval": {"*"},
}

var evalEvents []api.Event
streamCh, err := events.Stream(ctx, topics, 0, nil)
require.NoError(t, err)

// gather deployment events
go func() {
for {
select {
case <-ctx.Done():
return
case event := <-streamCh:
if event.IsHeartbeat() {
continue
}

evalEvents = append(evalEvents, event.Events...)
}
}
}()

// register job
e2eutil.Register(jobID, "events/input/large-job.nomad")

// ensure there is a deployment promotion event
testutil.WaitForResult(func() (bool, error) {
for _, e := range evalEvents {
evalRaw, ok := e.Payload["Eval"].(map[string]interface{})
if !ok {
return false, fmt.Errorf("type assertion on eval")
}

ftg, ok := evalRaw["FailedTGAllocs"].(map[string]interface{})
if !ok {
continue
}

tg, ok := ftg["one"].(map[string]interface{})
if !ok {
continue
}
mem := tg["DimensionExhausted"].(map[string]interface{})["memory"]
require.NotNil(t, mem, "memory dimension was nil")
memInt := int(mem.(float64))
require.Greater(t, memInt, 0, "memory dimension was zero")
return true, nil

}
return false, fmt.Errorf("expected blocked eval with memory exhausted, got: %#v", evalEvents)
}, func(e error) {
require.NoError(t, e)
})
}
39 changes: 39 additions & 0 deletions e2e/events/input/deploy.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
job "deployment_auto.nomad" {
datacenters = ["dc1", "dc2"]

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

group "one" {
count = 3

update {
max_parallel = 3
auto_promote = true
canary = 2
min_healthy_time = "1s"
}

task "one" {
driver = "raw_exec"

env {
version = "1"
}

config {
command = "/bin/sleep"

# change args to update the job, the only changes
args = ["1000000"]
}

resources {
cpu = 20
memory = 20
}
}
}
}
37 changes: 37 additions & 0 deletions e2e/events/input/initial.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
job "deployment_auto.nomad" {
datacenters = ["dc1"]

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

group "one" {
count = 3

update {
max_parallel = 3
auto_promote = true
canary = 2
}

task "one" {
driver = "raw_exec"

env {
version = "0"
}
config {
command = "/bin/sleep"

# change args to update the job, the only changes
args = ["1000000"]
}

resources {
cpu = 20
memory = 20
}
}
}
}
40 changes: 40 additions & 0 deletions e2e/events/input/large-job.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
job "events" {
datacenters = ["dc1"]

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

group "one" {
count = 3

update {
max_parallel = 3
auto_promote = true
canary = 2
min_healthy_time = "1s"
}

task "one" {
driver = "raw_exec"

env {
version = "1"
}

config {
command = "/bin/sleep"

# change args to update the job, the only changes
args = ["1000000"]
}

resources {
cpu = 20
memory = 2000000
}
}
}
}

12 changes: 8 additions & 4 deletions nomad/state/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,14 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) (*structs.Events, err
}

event := structs.Event{
Topic: structs.TopicEval,
Type: eventType,
Index: changes.Index,
Key: after.ID,
Topic: structs.TopicEval,
Type: eventType,
Index: changes.Index,
Key: after.ID,
FilterKeys: []string{
after.JobID,
after.DeploymentID,
},
Namespace: after.Namespace,
Payload: &EvalEvent{
Eval: after,
Expand Down
2 changes: 2 additions & 0 deletions nomad/state/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ func TestGenericEventsFromChanges_EvalUpdateRequestType(t *testing.T) {
e := events[0]
require.Equal(t, structs.TopicEval, e.Topic)
require.Equal(t, TypeEvalUpdated, e.Type)
require.Contains(t, e.FilterKeys, e2.JobID)
require.Contains(t, e.FilterKeys, e2.DeploymentID)
event := e.Payload.(*EvalEvent)
require.Equal(t, "blocked", event.Eval.Status)
}
Expand Down

0 comments on commit 659943a

Please sign in to comment.