From f7045c04d0da110d5084fd4ef90a604eafa83b28 Mon Sep 17 00:00:00 2001 From: Drew Bailey Date: Fri, 22 Jan 2021 09:18:17 -0500 Subject: [PATCH] prevent double job status update (#9768) * Prevent Job Statuses from being calculated twice https://github.com/hashicorp/nomad/pull/8435 introduced atomic eval insertion iwth job (de-)registration. This change removes a now obsolete guard which checked if the index was equal to the job.CreateIndex, which would empty the status. Now that the job regisration eval insetion is atomic with the registration this check is no longer necessary to set the job statuses correctly. * test to ensure only single job event for job register * periodic e2e * separate job update summary step * fix updatejobstability to use copy instead of modified reference of job * update envoygatewaybindaddresses copy to prevent job diff on null vs empty * set ConsulGatewayBindAddress to empty map instead of nil fix nil assertions for empty map rm unnecessary guard --- command/agent/job_endpoint.go | 3 +- e2e/e2e_test.go | 1 + e2e/e2eutil/job.go | 29 ++++++- e2e/periodic/input/simple.nomad | 28 ++++++ e2e/periodic/periodic.go | 82 ++++++++++++++++++ e2e/terraform/Makefile | 3 + nomad/fsm_test.go | 57 +++++++++++++ nomad/job_endpoint_hook_connect.go | 2 +- nomad/job_endpoint_hook_connect_test.go | 4 +- nomad/mock/mock.go | 3 +- nomad/state/state_store.go | 109 +++++++++++++----------- nomad/state/state_store_test.go | 40 +++------ nomad/structs/services_test.go | 17 ++++ 13 files changed, 293 insertions(+), 85 deletions(-) create mode 100644 e2e/periodic/input/simple.nomad create mode 100644 e2e/periodic/periodic.go diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index a1015b058c6..7f73453ca82 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -1345,9 +1345,8 @@ func apiConnectGatewayProxyToStructs(in *api.ConsulGatewayProxy) *structs.Consul return nil } - var bindAddresses map[string]*structs.ConsulGatewayBindAddress + bindAddresses := make(map[string]*structs.ConsulGatewayBindAddress) if in.EnvoyGatewayBindAddresses != nil { - bindAddresses = make(map[string]*structs.ConsulGatewayBindAddress) for k, v := range in.EnvoyGatewayBindAddresses { bindAddresses[k] = &structs.ConsulGatewayBindAddress{ Address: v.Address, diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index fdde940a566..87c0cb9bec3 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -24,6 +24,7 @@ import ( _ "github.com/hashicorp/nomad/e2e/nodedrain" _ "github.com/hashicorp/nomad/e2e/nomad09upgrade" _ "github.com/hashicorp/nomad/e2e/nomadexec" + _ "github.com/hashicorp/nomad/e2e/periodic" _ "github.com/hashicorp/nomad/e2e/podman" _ "github.com/hashicorp/nomad/e2e/quotas" _ "github.com/hashicorp/nomad/e2e/rescheduling" diff --git a/e2e/e2eutil/job.go b/e2e/e2eutil/job.go index 072f7e7843d..b954c762c22 100644 --- a/e2e/e2eutil/job.go +++ b/e2e/e2eutil/job.go @@ -6,12 +6,12 @@ import ( "io/ioutil" "os/exec" "regexp" + "strings" ) // Register registers a jobspec from a file but with a unique ID. // The caller is responsible for recording that ID for later cleanup. func Register(jobID, jobFilePath string) error { - cmd := exec.Command("nomad", "job", "run", "-") stdin, err := cmd.StdinPipe() if err != nil { @@ -40,6 +40,33 @@ func Register(jobID, jobFilePath string) error { return nil } +// PeriodicForce forces a periodic job to dispatch, returning the child job ID +// or an error +func PeriodicForce(jobID string) error { + // nomad job periodic force + cmd := exec.Command("nomad", "job", "periodic", "force", jobID) + + out, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("could not register job: %w\n%v", err, string(out)) + } + + return nil +} + +// JobInspectTemplate runs nomad job inspect and formats the output +// using the specified go template +func JobInspectTemplate(jobID, template string) (string, error) { + cmd := exec.Command("nomad", "job", "inspect", "-t", template, jobID) + out, err := cmd.CombinedOutput() + if err != nil { + return "", fmt.Errorf("could not inspect job: %w\n%v", err, string(out)) + } + outStr := string(out) + outStr = strings.TrimSuffix(outStr, "\n") + return outStr, nil +} + // Register registers a jobspec from a string, also with a unique ID. // The caller is responsible for recording that ID for later cleanup. func RegisterFromJobspec(jobID, jobspec string) error { diff --git a/e2e/periodic/input/simple.nomad b/e2e/periodic/input/simple.nomad new file mode 100644 index 00000000000..ea9920aa915 --- /dev/null +++ b/e2e/periodic/input/simple.nomad @@ -0,0 +1,28 @@ +job "periodic" { + datacenters = ["dc1"] + type = "batch" + + constraint { + attribute = "${attr.kernel.name}" + value = "linux" + } + + + periodic { + cron = "* * * * *" + prohibit_overlap = true + } + + group "group" { + task "task" { + driver = "docker" + + config { + image = "busybox:1" + command = "/bin/sh" + args = ["-c", "sleep 5"] + } + } + } +} + diff --git a/e2e/periodic/periodic.go b/e2e/periodic/periodic.go new file mode 100644 index 00000000000..8cfcd36a642 --- /dev/null +++ b/e2e/periodic/periodic.go @@ -0,0 +1,82 @@ +package periodic + +import ( + "fmt" + + "github.com/hashicorp/nomad/e2e/e2eutil" + "github.com/hashicorp/nomad/e2e/framework" + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/require" +) + +type PeriodicTest struct { + framework.TC + jobIDs []string +} + +func init() { + framework.AddSuites(&framework.TestSuite{ + Component: "Periodic", + CanRunLocal: true, + Cases: []framework.TestCase{ + new(PeriodicTest), + }, + }) +} + +func (tc *PeriodicTest) BeforeAll(f *framework.F) { + e2eutil.WaitForLeader(f.T(), tc.Nomad()) +} + +func (tc *PeriodicTest) AfterEach(f *framework.F) { + nomadClient := tc.Nomad() + j := nomadClient.Jobs() + + for _, id := range tc.jobIDs { + j.Deregister(id, true, nil) + } + _, err := e2eutil.Command("nomad", "system", "gc") + f.NoError(err) +} + +func (tc *PeriodicTest) TestPeriodicDispatch_Basic(f *framework.F) { + t := f.T() + + uuid := uuid.Generate() + jobID := fmt.Sprintf("periodicjob-%s", uuid[0:8]) + tc.jobIDs = append(tc.jobIDs, jobID) + + // register job + e2eutil.Register(jobID, "periodic/input/simple.nomad") + + // force dispatch + require.NoError(t, e2eutil.PeriodicForce(jobID)) + + // Get the child job ID + childID, err := e2eutil.JobInspectTemplate(jobID, `{{with index . 1}}{{printf "%s" .ID}}{{end}}`) + require.NoError(t, err) + require.NotEmpty(t, childID) + + testutil.WaitForResult(func() (bool, error) { + status, err := e2eutil.JobInspectTemplate(jobID, `{{with index . 1}}{{printf "%s" .Status}}{{end}}`) + require.NoError(t, err) + require.NotEmpty(t, status) + if status == "dead" { + return true, nil + } + return false, fmt.Errorf("expected periodic job to be dead, got %s", status) + }, func(err error) { + require.NoError(t, err) + }) + + // Assert there are no pending children + pending, err := e2eutil.JobInspectTemplate(jobID, `{{with index . 0}}{{printf "%d" .JobSummary.Children.Pending}}{{end}}`) + require.NoError(t, err) + require.Equal(t, "0", pending) + + // Assert there are no pending children + dead, err := e2eutil.JobInspectTemplate(jobID, `{{with index . 0}}{{printf "%d" .JobSummary.Children.Dead}}{{end}}`) + require.NoError(t, err) + require.Equal(t, "1", dead) +} diff --git a/e2e/terraform/Makefile b/e2e/terraform/Makefile index 9a473a26d7a..863d4f50c10 100644 --- a/e2e/terraform/Makefile +++ b/e2e/terraform/Makefile @@ -1,6 +1,9 @@ NOMAD_SHA ?= $(shell git rev-parse HEAD) PKG_PATH = $(shell pwd)/../../pkg/linux_amd64/nomad +# The version of nomad that gets deployed depends on an order of precedence +# linked below +# https://github.com/hashicorp/nomad/blob/master/e2e/terraform/README.md#nomad-version dev-cluster: terraform apply -auto-approve \ -var="nomad_sha=$(NOMAD_SHA)" diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 20ecf18f32d..83949dc0e22 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -2,6 +2,7 @@ package nomad import ( "bytes" + "context" "fmt" "reflect" "strings" @@ -3422,3 +3423,59 @@ func TestFSM_ACLEvents(t *testing.T) { }) } } + +// TestFSM_EventBroker_JobRegisterFSMEvents asserts that only a single job +// register event is emitted when registering a job +func TestFSM_EventBroker_JobRegisterFSMEvents(t *testing.T) { + t.Parallel() + fsm := testFSM(t) + + job := mock.Job() + eval := mock.Eval() + eval.JobID = job.ID + + req := structs.JobRegisterRequest{ + Job: job, + Eval: eval, + } + buf, err := structs.Encode(structs.JobRegisterRequestType, req) + require.NoError(t, err) + + resp := fsm.Apply(makeLog(buf)) + require.Nil(t, resp) + + broker, err := fsm.State().EventBroker() + require.NoError(t, err) + + subReq := &stream.SubscribeRequest{ + Topics: map[structs.Topic][]string{ + structs.TopicJob: {"*"}, + }, + } + + sub, err := broker.Subscribe(subReq) + require.NoError(t, err) + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(500*time.Millisecond)) + defer cancel() + + // consume the queue + var events []structs.Event + for { + out, err := sub.Next(ctx) + if len(out.Events) == 0 { + break + } + + // consume the queue until the deadline has exceeded or until we've + // received more events than expected + if err == context.DeadlineExceeded || len(events) > 1 { + break + } + + events = append(events, out.Events...) + } + + require.Len(t, events, 1) + require.Equal(t, structs.TypeJobRegistered, events[0].Type) +} diff --git a/nomad/job_endpoint_hook_connect.go b/nomad/job_endpoint_hook_connect.go index 40f7eeeb0a1..e7249097f71 100644 --- a/nomad/job_endpoint_hook_connect.go +++ b/nomad/job_endpoint_hook_connect.go @@ -345,7 +345,7 @@ func gatewayProxyForBridge(gateway *structs.ConsulGateway) *structs.ConsulGatewa func gatewayBindAddresses(ingress *structs.ConsulIngressConfigEntry) map[string]*structs.ConsulGatewayBindAddress { if ingress == nil || len(ingress.Listeners) == 0 { - return nil + return make(map[string]*structs.ConsulGatewayBindAddress) } addresses := make(map[string]*structs.ConsulGatewayBindAddress) diff --git a/nomad/job_endpoint_hook_connect_test.go b/nomad/job_endpoint_hook_connect_test.go index 53252c2acf4..494dca56c17 100644 --- a/nomad/job_endpoint_hook_connect_test.go +++ b/nomad/job_endpoint_hook_connect_test.go @@ -412,12 +412,12 @@ func TestJobEndpointConnect_gatewayProxyIsDefault(t *testing.T) { func TestJobEndpointConnect_gatewayBindAddresses(t *testing.T) { t.Run("nil", func(t *testing.T) { result := gatewayBindAddresses(nil) - require.Nil(t, result) + require.Empty(t, result) }) t.Run("no listeners", func(t *testing.T) { result := gatewayBindAddresses(&structs.ConsulIngressConfigEntry{Listeners: nil}) - require.Nil(t, result) + require.Empty(t, result) }) t.Run("simple", func(t *testing.T) { diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index ccb8a4c1910..a3fe090542c 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -875,7 +875,8 @@ func ConnectIngressGatewayJob(mode string, inject bool) *structs.Job { Connect: &structs.ConsulConnect{ Gateway: &structs.ConsulGateway{ Proxy: &structs.ConsulGatewayProxy{ - ConnectTimeout: helper.TimeToPtr(3 * time.Second), + ConnectTimeout: helper.TimeToPtr(3 * time.Second), + EnvoyGatewayBindAddresses: make(map[string]*structs.ConsulGatewayBindAddress), }, Ingress: &structs.ConsulIngressConfigEntry{ Listeners: []*structs.ConsulIngressListener{{ diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 1c749827327..bcbc6c16787 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -4412,6 +4412,7 @@ func (s *StateStore) setJobStatuses(index uint64, txn *txn, if err := s.setJobStatus(index, txn, existing.(*structs.Job), evalDelete, forceStatus); err != nil { return err } + } return nil @@ -4427,9 +4428,7 @@ func (s *StateStore) setJobStatus(index uint64, txn *txn, // Capture the current status so we can check if there is a change oldStatus := job.Status - if index == job.CreateIndex { - oldStatus = "" - } + firstPass := index == job.CreateIndex newStatus := forceStatus // If forceStatus is not set, compute the jobs status. @@ -4441,8 +4440,12 @@ func (s *StateStore) setJobStatus(index uint64, txn *txn, } } - // Fast-path if nothing has changed. + // Fast-path if the job has changed. + // Still update the job summary if necessary. if oldStatus == newStatus { + if err := s.setJobSummary(txn, job, index, oldStatus, newStatus, firstPass); err != nil { + return err + } return nil } @@ -4460,64 +4463,72 @@ func (s *StateStore) setJobStatus(index uint64, txn *txn, } // Update the children summary - if updated.ParentID != "" { - // Try to update the summary of the parent job summary - summaryRaw, err := txn.First("job_summary", "id", updated.Namespace, updated.ParentID) - if err != nil { - return fmt.Errorf("unable to retrieve summary for parent job: %v", err) - } + if err := s.setJobSummary(txn, updated, index, oldStatus, newStatus, firstPass); err != nil { + return fmt.Errorf("job summary update failed %w", err) + } + return nil +} - // Only continue if the summary exists. It could not exist if the parent - // job was removed - if summaryRaw != nil { - existing := summaryRaw.(*structs.JobSummary) - pSummary := existing.Copy() - if pSummary.Children == nil { - pSummary.Children = new(structs.JobChildrenSummary) - } +func (s *StateStore) setJobSummary(txn *txn, updated *structs.Job, index uint64, oldStatus, newStatus string, firstPass bool) error { + if updated.ParentID == "" { + return nil + } - // Determine the transition and update the correct fields - children := pSummary.Children + // Try to update the summary of the parent job summary + summaryRaw, err := txn.First("job_summary", "id", updated.Namespace, updated.ParentID) + if err != nil { + return fmt.Errorf("unable to retrieve summary for parent job: %v", err) + } - // Decrement old status - if oldStatus != "" { - switch oldStatus { - case structs.JobStatusPending: - children.Pending-- - case structs.JobStatusRunning: - children.Running-- - case structs.JobStatusDead: - children.Dead-- - default: - return fmt.Errorf("unknown old job status %q", oldStatus) - } - } + // Only continue if the summary exists. It could not exist if the parent + // job was removed + if summaryRaw != nil { + existing := summaryRaw.(*structs.JobSummary) + pSummary := existing.Copy() + if pSummary.Children == nil { + pSummary.Children = new(structs.JobChildrenSummary) + } + + // Determine the transition and update the correct fields + children := pSummary.Children - // Increment new status - switch newStatus { + // Decrement old status + if !firstPass { + switch oldStatus { case structs.JobStatusPending: - children.Pending++ + children.Pending-- case structs.JobStatusRunning: - children.Running++ + children.Running-- case structs.JobStatusDead: - children.Dead++ + children.Dead-- default: - return fmt.Errorf("unknown new job status %q", newStatus) + return fmt.Errorf("unknown old job status %q", oldStatus) } + } - // Update the index - pSummary.ModifyIndex = index + // Increment new status + switch newStatus { + case structs.JobStatusPending: + children.Pending++ + case structs.JobStatusRunning: + children.Running++ + case structs.JobStatusDead: + children.Dead++ + default: + return fmt.Errorf("unknown new job status %q", newStatus) + } - // Insert the summary - if err := txn.Insert("job_summary", pSummary); err != nil { - return fmt.Errorf("job summary insert failed: %v", err) - } - if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { - return fmt.Errorf("index update failed: %v", err) - } + // Update the index + pSummary.ModifyIndex = index + + // Insert the summary + if err := txn.Insert("job_summary", pSummary); err != nil { + return fmt.Errorf("job summary insert failed: %v", err) + } + if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) } } - return nil } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 7bff0f1ca71..1c42e677f1c 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -6839,32 +6839,20 @@ func TestStateStore_UpdateJobStability(t *testing.T) { // Insert a job twice to get two versions job := mock.Job() - if err := state.UpsertJob(structs.MsgTypeTestSetup, 1, job); err != nil { - t.Fatalf("bad: %v", err) - } + require.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 1, job)) - if err := state.UpsertJob(structs.MsgTypeTestSetup, 2, job); err != nil { - t.Fatalf("bad: %v", err) - } + require.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 2, job.Copy())) // Update the stability to true err := state.UpdateJobStability(3, job.Namespace, job.ID, 0, true) - if err != nil { - t.Fatalf("bad: %v", err) - } + require.NoError(t, err) // Check that the job was updated properly ws := memdb.NewWatchSet() - jout, _ := state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0) - if err != nil { - t.Fatalf("bad: %v", err) - } - if jout == nil { - t.Fatalf("bad: %#v", jout) - } - if !jout.Stable { - t.Fatalf("job not marked stable %#v", jout) - } + jout, err := state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0) + require.NoError(t, err) + require.NotNil(t, jout) + require.True(t, jout.Stable, "job not marked as stable") // Update the stability to false err = state.UpdateJobStability(3, job.Namespace, job.ID, 0, false) @@ -6873,16 +6861,10 @@ func TestStateStore_UpdateJobStability(t *testing.T) { } // Check that the job was updated properly - jout, _ = state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0) - if err != nil { - t.Fatalf("bad: %v", err) - } - if jout == nil { - t.Fatalf("bad: %#v", jout) - } - if jout.Stable { - t.Fatalf("job marked stable %#v", jout) - } + jout, err = state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0) + require.NoError(t, err) + require.NotNil(t, jout) + require.False(t, jout.Stable) } // Test that nonexistent deployment can't be promoted diff --git a/nomad/structs/services_test.go b/nomad/structs/services_test.go index 12b3b5b55c4..493c6fefcc2 100644 --- a/nomad/structs/services_test.go +++ b/nomad/structs/services_test.go @@ -293,6 +293,23 @@ func TestConsulConnect_CopyEquals(t *testing.T) { require.False(t, c.Equals(o)) } +func TestConsulConnect_GatewayProxy_CopyEquals(t *testing.T) { + t.Parallel() + + c := &ConsulGatewayProxy{ + ConnectTimeout: helper.TimeToPtr(1 * time.Second), + EnvoyGatewayBindTaggedAddresses: false, + EnvoyGatewayBindAddresses: make(map[string]*ConsulGatewayBindAddress), + } + + require.NoError(t, c.Validate()) + + // Copies should be equivalent + o := c.Copy() + require.Equal(t, c, o) + require.True(t, c.Equals(o)) +} + func TestSidecarTask_MergeIntoTask(t *testing.T) { t.Parallel()