Skip to content

Commit

Permalink
prevent double job status update (#9768)
Browse files Browse the repository at this point in the history
* Prevent Job Statuses from being calculated twice

#8435 introduced atomic eval
insertion iwth job (de-)registration. This change removes a now obsolete
guard which checked if the index was equal to the job.CreateIndex, which
would empty the status. Now that the job regisration eval insetion is
atomic with the registration this check is no longer necessary to set
the job statuses correctly.

* test to ensure only single job event for job register

* periodic e2e

* separate job update summary step

* fix updatejobstability to use copy instead of modified reference of job

* update envoygatewaybindaddresses copy to prevent job diff on null vs empty

* set ConsulGatewayBindAddress to empty map instead of nil

fix nil assertions for empty map

rm unnecessary guard
  • Loading branch information
drewbailey authored Jan 22, 2021
1 parent 9069631 commit 3cb1132
Show file tree
Hide file tree
Showing 13 changed files with 293 additions and 85 deletions.
3 changes: 1 addition & 2 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1345,9 +1345,8 @@ func apiConnectGatewayProxyToStructs(in *api.ConsulGatewayProxy) *structs.Consul
return nil
}

var bindAddresses map[string]*structs.ConsulGatewayBindAddress
bindAddresses := make(map[string]*structs.ConsulGatewayBindAddress)
if in.EnvoyGatewayBindAddresses != nil {
bindAddresses = make(map[string]*structs.ConsulGatewayBindAddress)
for k, v := range in.EnvoyGatewayBindAddresses {
bindAddresses[k] = &structs.ConsulGatewayBindAddress{
Address: v.Address,
Expand Down
1 change: 1 addition & 0 deletions e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
_ "github.com/hashicorp/nomad/e2e/nodedrain"
_ "github.com/hashicorp/nomad/e2e/nomad09upgrade"
_ "github.com/hashicorp/nomad/e2e/nomadexec"
_ "github.com/hashicorp/nomad/e2e/periodic"
_ "github.com/hashicorp/nomad/e2e/podman"
_ "github.com/hashicorp/nomad/e2e/quotas"
_ "github.com/hashicorp/nomad/e2e/rescheduling"
Expand Down
29 changes: 28 additions & 1 deletion e2e/e2eutil/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"io/ioutil"
"os/exec"
"regexp"
"strings"
)

// Register registers a jobspec from a file but with a unique ID.
// The caller is responsible for recording that ID for later cleanup.
func Register(jobID, jobFilePath string) error {

cmd := exec.Command("nomad", "job", "run", "-")
stdin, err := cmd.StdinPipe()
if err != nil {
Expand Down Expand Up @@ -40,6 +40,33 @@ func Register(jobID, jobFilePath string) error {
return nil
}

// PeriodicForce forces a periodic job to dispatch, returning the child job ID
// or an error
func PeriodicForce(jobID string) error {
// nomad job periodic force
cmd := exec.Command("nomad", "job", "periodic", "force", jobID)

out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("could not register job: %w\n%v", err, string(out))
}

return nil
}

// JobInspectTemplate runs nomad job inspect and formats the output
// using the specified go template
func JobInspectTemplate(jobID, template string) (string, error) {
cmd := exec.Command("nomad", "job", "inspect", "-t", template, jobID)
out, err := cmd.CombinedOutput()
if err != nil {
return "", fmt.Errorf("could not inspect job: %w\n%v", err, string(out))
}
outStr := string(out)
outStr = strings.TrimSuffix(outStr, "\n")
return outStr, nil
}

// Register registers a jobspec from a string, also with a unique ID.
// The caller is responsible for recording that ID for later cleanup.
func RegisterFromJobspec(jobID, jobspec string) error {
Expand Down
28 changes: 28 additions & 0 deletions e2e/periodic/input/simple.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
job "periodic" {
datacenters = ["dc1"]
type = "batch"

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}


periodic {
cron = "* * * * *"
prohibit_overlap = true
}

group "group" {
task "task" {
driver = "docker"

config {
image = "busybox:1"
command = "/bin/sh"
args = ["-c", "sleep 5"]
}
}
}
}

82 changes: 82 additions & 0 deletions e2e/periodic/periodic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package periodic

import (
"fmt"

"github.com/hashicorp/nomad/e2e/e2eutil"
"github.com/hashicorp/nomad/e2e/framework"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)

type PeriodicTest struct {
framework.TC
jobIDs []string
}

func init() {
framework.AddSuites(&framework.TestSuite{
Component: "Periodic",
CanRunLocal: true,
Cases: []framework.TestCase{
new(PeriodicTest),
},
})
}

func (tc *PeriodicTest) BeforeAll(f *framework.F) {
e2eutil.WaitForLeader(f.T(), tc.Nomad())
}

func (tc *PeriodicTest) AfterEach(f *framework.F) {
nomadClient := tc.Nomad()
j := nomadClient.Jobs()

for _, id := range tc.jobIDs {
j.Deregister(id, true, nil)
}
_, err := e2eutil.Command("nomad", "system", "gc")
f.NoError(err)
}

func (tc *PeriodicTest) TestPeriodicDispatch_Basic(f *framework.F) {
t := f.T()

uuid := uuid.Generate()
jobID := fmt.Sprintf("periodicjob-%s", uuid[0:8])
tc.jobIDs = append(tc.jobIDs, jobID)

// register job
e2eutil.Register(jobID, "periodic/input/simple.nomad")

// force dispatch
require.NoError(t, e2eutil.PeriodicForce(jobID))

// Get the child job ID
childID, err := e2eutil.JobInspectTemplate(jobID, `{{with index . 1}}{{printf "%s" .ID}}{{end}}`)
require.NoError(t, err)
require.NotEmpty(t, childID)

testutil.WaitForResult(func() (bool, error) {
status, err := e2eutil.JobInspectTemplate(jobID, `{{with index . 1}}{{printf "%s" .Status}}{{end}}`)
require.NoError(t, err)
require.NotEmpty(t, status)
if status == "dead" {
return true, nil
}
return false, fmt.Errorf("expected periodic job to be dead, got %s", status)
}, func(err error) {
require.NoError(t, err)
})

// Assert there are no pending children
pending, err := e2eutil.JobInspectTemplate(jobID, `{{with index . 0}}{{printf "%d" .JobSummary.Children.Pending}}{{end}}`)
require.NoError(t, err)
require.Equal(t, "0", pending)

// Assert there are no pending children
dead, err := e2eutil.JobInspectTemplate(jobID, `{{with index . 0}}{{printf "%d" .JobSummary.Children.Dead}}{{end}}`)
require.NoError(t, err)
require.Equal(t, "1", dead)
}
3 changes: 3 additions & 0 deletions e2e/terraform/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
NOMAD_SHA ?= $(shell git rev-parse HEAD)
PKG_PATH = $(shell pwd)/../../pkg/linux_amd64/nomad

# The version of nomad that gets deployed depends on an order of precedence
# linked below
# https://github.com/hashicorp/nomad/blob/master/e2e/terraform/README.md#nomad-version
dev-cluster:
terraform apply -auto-approve \
-var="nomad_sha=$(NOMAD_SHA)"
Expand Down
57 changes: 57 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nomad

import (
"bytes"
"context"
"fmt"
"reflect"
"strings"
Expand Down Expand Up @@ -3422,3 +3423,59 @@ func TestFSM_ACLEvents(t *testing.T) {
})
}
}

// TestFSM_EventBroker_JobRegisterFSMEvents asserts that only a single job
// register event is emitted when registering a job
func TestFSM_EventBroker_JobRegisterFSMEvents(t *testing.T) {
t.Parallel()
fsm := testFSM(t)

job := mock.Job()
eval := mock.Eval()
eval.JobID = job.ID

req := structs.JobRegisterRequest{
Job: job,
Eval: eval,
}
buf, err := structs.Encode(structs.JobRegisterRequestType, req)
require.NoError(t, err)

resp := fsm.Apply(makeLog(buf))
require.Nil(t, resp)

broker, err := fsm.State().EventBroker()
require.NoError(t, err)

subReq := &stream.SubscribeRequest{
Topics: map[structs.Topic][]string{
structs.TopicJob: {"*"},
},
}

sub, err := broker.Subscribe(subReq)
require.NoError(t, err)

ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(500*time.Millisecond))
defer cancel()

// consume the queue
var events []structs.Event
for {
out, err := sub.Next(ctx)
if len(out.Events) == 0 {
break
}

// consume the queue until the deadline has exceeded or until we've
// received more events than expected
if err == context.DeadlineExceeded || len(events) > 1 {
break
}

events = append(events, out.Events...)
}

require.Len(t, events, 1)
require.Equal(t, structs.TypeJobRegistered, events[0].Type)
}
2 changes: 1 addition & 1 deletion nomad/job_endpoint_hook_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func gatewayProxyForBridge(gateway *structs.ConsulGateway) *structs.ConsulGatewa

func gatewayBindAddresses(ingress *structs.ConsulIngressConfigEntry) map[string]*structs.ConsulGatewayBindAddress {
if ingress == nil || len(ingress.Listeners) == 0 {
return nil
return make(map[string]*structs.ConsulGatewayBindAddress)
}

addresses := make(map[string]*structs.ConsulGatewayBindAddress)
Expand Down
4 changes: 2 additions & 2 deletions nomad/job_endpoint_hook_connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,12 +412,12 @@ func TestJobEndpointConnect_gatewayProxyIsDefault(t *testing.T) {
func TestJobEndpointConnect_gatewayBindAddresses(t *testing.T) {
t.Run("nil", func(t *testing.T) {
result := gatewayBindAddresses(nil)
require.Nil(t, result)
require.Empty(t, result)
})

t.Run("no listeners", func(t *testing.T) {
result := gatewayBindAddresses(&structs.ConsulIngressConfigEntry{Listeners: nil})
require.Nil(t, result)
require.Empty(t, result)
})

t.Run("simple", func(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion nomad/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,8 @@ func ConnectIngressGatewayJob(mode string, inject bool) *structs.Job {
Connect: &structs.ConsulConnect{
Gateway: &structs.ConsulGateway{
Proxy: &structs.ConsulGatewayProxy{
ConnectTimeout: helper.TimeToPtr(3 * time.Second),
ConnectTimeout: helper.TimeToPtr(3 * time.Second),
EnvoyGatewayBindAddresses: make(map[string]*structs.ConsulGatewayBindAddress),
},
Ingress: &structs.ConsulIngressConfigEntry{
Listeners: []*structs.ConsulIngressListener{{
Expand Down
Loading

0 comments on commit 3cb1132

Please sign in to comment.