diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go
index a18eeabad58..2e6c6db2ce8 100644
--- a/e2e/e2e_test.go
+++ b/e2e/e2e_test.go
@@ -15,6 +15,7 @@ import (
 	_ "github.com/hashicorp/nomad/e2e/consultemplate"
 	_ "github.com/hashicorp/nomad/e2e/csi"
 	_ "github.com/hashicorp/nomad/e2e/deployment"
+	_ "github.com/hashicorp/nomad/e2e/events"
 	_ "github.com/hashicorp/nomad/e2e/example"
 	_ "github.com/hashicorp/nomad/e2e/lifecycle"
 	_ "github.com/hashicorp/nomad/e2e/metrics"
diff --git a/e2e/events/events.go b/e2e/events/events.go
new file mode 100644
index 00000000000..7f8015e1de0
--- /dev/null
+++ b/e2e/events/events.go
@@ -0,0 +1,228 @@
+package events
+
+import (
+	"context"
+	"fmt"
+	"sync"
+
+	"github.com/hashicorp/nomad/api"
+	"github.com/hashicorp/nomad/e2e/e2eutil"
+	"github.com/hashicorp/nomad/e2e/framework"
+	"github.com/hashicorp/nomad/helper/uuid"
+	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/nomad/testutil"
+	"github.com/stretchr/testify/require"
+)
+
+// EventsTest is the e2e test case for the event stream. It records the IDs of
+// the jobs each test registers so that AfterEach can deregister them.
+type EventsTest struct {
+	framework.TC
+	jobIDs []string
+}
+
+// init adds the Events suite to the e2e framework.
+func init() {
+	framework.AddSuites(&framework.TestSuite{
+		Component:   "Events",
+		CanRunLocal: true,
+		Cases: []framework.TestCase{
+			new(EventsTest),
+		},
+	})
+}
+
+// BeforeAll waits for a cluster leader before any test in the suite runs.
+func (tc *EventsTest) BeforeAll(f *framework.F) {
+	e2eutil.WaitForLeader(f.T(), tc.Nomad())
+}
+
+// AfterEach deregisters every job recorded in tc.jobIDs, clears the list, and
+// then runs "nomad system gc".
+func (tc *EventsTest) AfterEach(f *framework.F) {
+	nomadClient := tc.Nomad()
+	j := nomadClient.Jobs()
+
+	for _, id := range tc.jobIDs {
+		// cleanup is best-effort: the deregister result is not checked
+		j.Deregister(id, true, nil)
+	}
+	// forget the IDs so the next test does not deregister them again
+	tc.jobIDs = nil
+
+	_, err := e2eutil.Command("nomad", "system", "gc")
+	f.NoError(err)
+}
+
+// TestDeploymentEvents registers a job, then applies a change to it. An event
+// stream subscribed to the Deployment topic for that job asserts that a
+// DeploymentPromotion event is emitted.
+func (tc *EventsTest) TestDeploymentEvents(f *framework.F) {
+	t := f.T()
+
+	nomadClient := tc.Nomad()
+	events := nomadClient.EventStream()
+
+	id := uuid.Generate()
+	jobID := fmt.Sprintf("deployment-%s", id[0:8])
+	tc.jobIDs = append(tc.jobIDs, jobID)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	topics := map[api.Topic][]string{
+		"Deployment": {jobID},
+	}
+
+	// deployEvents is appended to by the collector goroutine and read by the
+	// polling function below, so every access is guarded by mu.
+	var (
+		mu           sync.Mutex
+		deployEvents []api.Event
+	)
+	streamCh, err := events.Stream(ctx, topics, 0, nil)
+	require.NoError(t, err)
+
+	// gather deployment events
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case event, ok := <-streamCh:
+				if !ok {
+					// channel closed: nothing more will be received
+					return
+				}
+				if event.IsHeartbeat() {
+					continue
+				}
+
+				mu.Lock()
+				deployEvents = append(deployEvents, event.Events...)
+				mu.Unlock()
+			}
+		}
+	}()
+
+	// register job
+	e2eutil.RegisterAndWaitForAllocs(t, nomadClient, "events/input/initial.nomad", jobID, "")
+
+	// update job
+	e2eutil.RegisterAllocs(t, nomadClient, "events/input/deploy.nomad", jobID, "")
+
+	ds := e2eutil.DeploymentsForJob(t, nomadClient, jobID)
+	require.Equal(t, 2, len(ds))
+	deploy := ds[0]
+
+	// wait for deployment to be running and ready for auto promote
+	e2eutil.WaitForDeployment(t, nomadClient, deploy.ID, structs.DeploymentStatusRunning, structs.DeploymentStatusDescriptionRunningAutoPromotion)
+
+	// ensure there is a deployment promotion event
+	testutil.WaitForResult(func() (bool, error) {
+		mu.Lock()
+		defer mu.Unlock()
+
+		var got []string
+		for _, e := range deployEvents {
+			if e.Type == "DeploymentPromotion" {
+				return true, nil
+			}
+			got = append(got, e.Type)
+		}
+		return false, fmt.Errorf("expected to receive deployment promotion event, got: %#v", got)
+	}, func(e error) {
+		f.NoError(e)
+	})
+}
+
+// TestBlockedEvalEvents applies a job with a large memory requirement. An
+// event stream subscribed to all Eval events is checked for a failed
+// allocation of task group "one" whose exhausted dimensions include memory.
+func (tc *EventsTest) TestBlockedEvalEvents(f *framework.F) {
+	t := f.T()
+
+	nomadClient := tc.Nomad()
+	events := nomadClient.EventStream()
+
+	id := uuid.Generate()
+	jobID := fmt.Sprintf("blocked-deploy-%s", id[0:8])
+	tc.jobIDs = append(tc.jobIDs, jobID)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	topics := map[api.Topic][]string{
+		"Eval": {"*"},
+	}
+
+	// evalEvents is appended to by the collector goroutine and read by the
+	// polling function below, so every access is guarded by mu.
+	var (
+		mu         sync.Mutex
+		evalEvents []api.Event
+	)
+	streamCh, err := events.Stream(ctx, topics, 0, nil)
+	require.NoError(t, err)
+
+	// gather eval events
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case event, ok := <-streamCh:
+				if !ok {
+					// channel closed: nothing more will be received
+					return
+				}
+				if event.IsHeartbeat() {
+					continue
+				}
+
+				mu.Lock()
+				evalEvents = append(evalEvents, event.Events...)
+				mu.Unlock()
+			}
+		}
+	}()
+
+	// register job
+	require.NoError(t, e2eutil.Register(jobID, "events/input/large-job.nomad"))
+
+	// ensure there is a blocked eval event reporting exhausted memory
+	testutil.WaitForResult(func() (bool, error) {
+		mu.Lock()
+		defer mu.Unlock()
+
+		for _, e := range evalEvents {
+			evalRaw, ok := e.Payload["Eval"].(map[string]interface{})
+			if !ok {
+				return false, fmt.Errorf("type assertion on eval")
+			}
+
+			ftg, ok := evalRaw["FailedTGAllocs"].(map[string]interface{})
+			if !ok {
+				continue
+			}
+
+			tg, ok := ftg["one"].(map[string]interface{})
+			if !ok {
+				continue
+			}
+
+			// Checked assertions: an eval without a positive memory entry is
+			// skipped and polling continues, instead of panicking here.
+			dims, ok := tg["DimensionExhausted"].(map[string]interface{})
+			if !ok {
+				continue
+			}
+			mem, ok := dims["memory"].(float64)
+			if !ok || int(mem) <= 0 {
+				continue
+			}
+			return true, nil
+		}
+		return false, fmt.Errorf("expected blocked eval with memory exhausted, got: %#v", evalEvents)
+	}, func(e error) {
+		require.NoError(t, e)
+	})
+}
diff --git a/e2e/events/input/deploy.nomad b/e2e/events/input/deploy.nomad
new file mode 100644
index 00000000000..f889872d444
--- /dev/null
+++ b/e2e/events/input/deploy.nomad
@@ -0,0 +1,39 @@
+job "deployment_auto.nomad" {
+  datacenters = ["dc1", "dc2"]
+
+  constraint {
+    attribute = "${attr.kernel.name}"
+    value     = "linux"
+  }
+
+  group "one" {
+    count = 3
+
+    update {
+      max_parallel     = 3
+      auto_promote     = true
+      canary           = 2
+      min_healthy_time = "1s"
+    }
+
+    task "one" {
+      driver = "raw_exec"
+
+      env {
+        version = "1"
+      }
+
+      config {
+        command = "/bin/sleep"
+
+        # change args to update the job, the only changes
+        args = ["1000000"]
+      }
+
+      resources {
+        cpu    = 20
+        memory = 20
+      }
+    }
+  }
+}
diff --git a/e2e/events/input/initial.nomad b/e2e/events/input/initial.nomad
new file mode 100644
index 00000000000..3384c64f9d5
--- /dev/null
+++ b/e2e/events/input/initial.nomad
@@ -0,0 +1,37 @@
+job "deployment_auto.nomad" {
+  datacenters = ["dc1"]
+
+  constraint {
+    attribute = "${attr.kernel.name}"
+    value     = "linux"
+  }
+
+  group "one" {
+    count = 3
+
+    update {
+      max_parallel = 3
+      auto_promote = true
+      canary       = 2
+    }
+
+    task "one" {
+      driver = "raw_exec"
+
+      env {
+        version = "0"
+      }
+      config {
+        command = "/bin/sleep"
+
+        # change args to update the job, the only changes
+        args = ["1000000"]
+      }
+
+      resources {
+        cpu    = 20
+        memory = 20
+      }
+    }
+  }
+}
diff --git a/e2e/events/input/large-job.nomad b/e2e/events/input/large-job.nomad
new file mode 100644
index 00000000000..2b15e6059a1
--- /dev/null
+++ b/e2e/events/input/large-job.nomad
@@ -0,0 +1,40 @@
+job "events" {
+  datacenters = ["dc1"]
+
+  constraint {
+    attribute = "${attr.kernel.name}"
+    value     = "linux"
+  }
+
+  group "one" {
+    count = 3
+
+    update {
+      max_parallel     = 3
+      auto_promote     = true
+      canary           = 2
+      min_healthy_time = "1s"
+    }
+
+    task "one" {
+      driver = "raw_exec"
+
+      env {
+        version = "1"
+      }
+
+      config {
+        command = "/bin/sleep"
+
+        # change args to update the job, the only changes
+        args = ["1000000"]
+      }
+
+      resources {
+        cpu    = 20
+        memory = 2000000
+      }
+    }
+  }
+}
+
diff --git a/nomad/state/events.go b/nomad/state/events.go
index b9884a085b1..58c0b5d190d 100644
--- a/nomad/state/events.go
+++ b/nomad/state/events.go
@@ -102,10 +102,14 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) (*structs.Events, err
 			}
 
 			event := structs.Event{
-				Topic:     structs.TopicEval,
-				Type:      eventType,
-				Index:     changes.Index,
-				Key:       after.ID,
+				Topic: structs.TopicEval,
+				Type:  eventType,
+				Index: changes.Index,
+				Key:   after.ID,
+				FilterKeys: []string{
+					after.JobID,
+					after.DeploymentID,
+				},
 				Namespace: after.Namespace,
 				Payload: &EvalEvent{
 					Eval: after,
diff --git a/nomad/state/events_test.go b/nomad/state/events_test.go
index 34ee35944f7..86d84d397b9 100644
--- a/nomad/state/events_test.go
+++ b/nomad/state/events_test.go
@@ -335,6 +335,8 @@ func TestGenericEventsFromChanges_EvalUpdateRequestType(t *testing.T) {
 	e := events[0]
 	require.Equal(t, structs.TopicEval, e.Topic)
 	require.Equal(t, TypeEvalUpdated, e.Type)
+	require.Contains(t, e.FilterKeys, e2.JobID)
+	require.Contains(t, e.FilterKeys, e2.DeploymentID)
 	event := e.Payload.(*EvalEvent)
 	require.Equal(t, "blocked", event.Eval.Status)
 }