Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prevent double job status update; periodic job count miscalculation #9768

Merged
merged 7 commits into from
Jan 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1345,9 +1345,8 @@ func apiConnectGatewayProxyToStructs(in *api.ConsulGatewayProxy) *structs.Consul
return nil
}

var bindAddresses map[string]*structs.ConsulGatewayBindAddress
bindAddresses := make(map[string]*structs.ConsulGatewayBindAddress)
if in.EnvoyGatewayBindAddresses != nil {
bindAddresses = make(map[string]*structs.ConsulGatewayBindAddress)
for k, v := range in.EnvoyGatewayBindAddresses {
bindAddresses[k] = &structs.ConsulGatewayBindAddress{
Address: v.Address,
Expand Down
1 change: 1 addition & 0 deletions e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
_ "github.com/hashicorp/nomad/e2e/nodedrain"
_ "github.com/hashicorp/nomad/e2e/nomad09upgrade"
_ "github.com/hashicorp/nomad/e2e/nomadexec"
_ "github.com/hashicorp/nomad/e2e/periodic"
_ "github.com/hashicorp/nomad/e2e/podman"
_ "github.com/hashicorp/nomad/e2e/quotas"
_ "github.com/hashicorp/nomad/e2e/rescheduling"
Expand Down
29 changes: 28 additions & 1 deletion e2e/e2eutil/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"io/ioutil"
"os/exec"
"regexp"
"strings"
)

// Register registers a jobspec from a file but with a unique ID.
// The caller is responsible for recording that ID for later cleanup.
func Register(jobID, jobFilePath string) error {

cmd := exec.Command("nomad", "job", "run", "-")
stdin, err := cmd.StdinPipe()
if err != nil {
Expand Down Expand Up @@ -40,6 +40,33 @@ func Register(jobID, jobFilePath string) error {
return nil
}

// PeriodicForce forces a periodic job to dispatch, returning the child job ID
// or an error
func PeriodicForce(jobID string) error {
// nomad job periodic force
cmd := exec.Command("nomad", "job", "periodic", "force", jobID)

out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("could not register job: %w\n%v", err, string(out))
}

return nil
}

// JobInspectTemplate runs nomad job inspect and formats the output
// using the specified go template
func JobInspectTemplate(jobID, template string) (string, error) {
cmd := exec.Command("nomad", "job", "inspect", "-t", template, jobID)
out, err := cmd.CombinedOutput()
if err != nil {
return "", fmt.Errorf("could not inspect job: %w\n%v", err, string(out))
}
outStr := string(out)
outStr = strings.TrimSuffix(outStr, "\n")
return outStr, nil
}

// Register registers a jobspec from a string, also with a unique ID.
// The caller is responsible for recording that ID for later cleanup.
func RegisterFromJobspec(jobID, jobspec string) error {
Expand Down
28 changes: 28 additions & 0 deletions e2e/periodic/input/simple.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
job "periodic" {
datacenters = ["dc1"]
type = "batch"

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}


periodic {
cron = "* * * * *"
prohibit_overlap = true
}
drewbailey marked this conversation as resolved.
Show resolved Hide resolved

group "group" {
task "task" {
driver = "docker"

config {
image = "busybox:1"
command = "/bin/sh"
args = ["-c", "sleep 5"]
}
}
}
}

82 changes: 82 additions & 0 deletions e2e/periodic/periodic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package periodic

import (
"fmt"

"github.com/hashicorp/nomad/e2e/e2eutil"
"github.com/hashicorp/nomad/e2e/framework"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)

type PeriodicTest struct {
framework.TC
jobIDs []string
}

func init() {
framework.AddSuites(&framework.TestSuite{
Component: "Periodic",
CanRunLocal: true,
Cases: []framework.TestCase{
new(PeriodicTest),
},
})
}

func (tc *PeriodicTest) BeforeAll(f *framework.F) {
e2eutil.WaitForLeader(f.T(), tc.Nomad())
}

func (tc *PeriodicTest) AfterEach(f *framework.F) {
nomadClient := tc.Nomad()
j := nomadClient.Jobs()

for _, id := range tc.jobIDs {
j.Deregister(id, true, nil)
}
_, err := e2eutil.Command("nomad", "system", "gc")
f.NoError(err)
}

func (tc *PeriodicTest) TestPeriodicDispatch_Basic(f *framework.F) {
t := f.T()

uuid := uuid.Generate()
jobID := fmt.Sprintf("periodicjob-%s", uuid[0:8])
tc.jobIDs = append(tc.jobIDs, jobID)

// register job
e2eutil.Register(jobID, "periodic/input/simple.nomad")

// force dispatch
require.NoError(t, e2eutil.PeriodicForce(jobID))

// Get the child job ID
childID, err := e2eutil.JobInspectTemplate(jobID, `{{with index . 1}}{{printf "%s" .ID}}{{end}}`)
require.NoError(t, err)
require.NotEmpty(t, childID)

testutil.WaitForResult(func() (bool, error) {
status, err := e2eutil.JobInspectTemplate(jobID, `{{with index . 1}}{{printf "%s" .Status}}{{end}}`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really slick 👍

require.NoError(t, err)
require.NotEmpty(t, status)
if status == "dead" {
return true, nil
}
return false, fmt.Errorf("expected periodic job to be dead, got %s", status)
}, func(err error) {
require.NoError(t, err)
})

// Assert there are no pending children
pending, err := e2eutil.JobInspectTemplate(jobID, `{{with index . 0}}{{printf "%d" .JobSummary.Children.Pending}}{{end}}`)
require.NoError(t, err)
require.Equal(t, "0", pending)

// Assert there are no pending children
dead, err := e2eutil.JobInspectTemplate(jobID, `{{with index . 0}}{{printf "%d" .JobSummary.Children.Dead}}{{end}}`)
require.NoError(t, err)
require.Equal(t, "1", dead)
}
3 changes: 3 additions & 0 deletions e2e/terraform/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
NOMAD_SHA ?= $(shell git rev-parse HEAD)
PKG_PATH = $(shell pwd)/../../pkg/linux_amd64/nomad

# The version of nomad that gets deployed depends on an order of precedence
# linked below
# https://github.com/hashicorp/nomad/blob/master/e2e/terraform/README.md#nomad-version
dev-cluster:
terraform apply -auto-approve \
-var="nomad_sha=$(NOMAD_SHA)"
Expand Down
57 changes: 57 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nomad

import (
"bytes"
"context"
"fmt"
"reflect"
"strings"
Expand Down Expand Up @@ -3422,3 +3423,59 @@ func TestFSM_ACLEvents(t *testing.T) {
})
}
}

// TestFSM_EventBroker_JobRegisterFSMEvents asserts that only a single job
// register event is emitted when registering a job
func TestFSM_EventBroker_JobRegisterFSMEvents(t *testing.T) {
t.Parallel()
fsm := testFSM(t)

job := mock.Job()
eval := mock.Eval()
eval.JobID = job.ID

req := structs.JobRegisterRequest{
Job: job,
Eval: eval,
}
buf, err := structs.Encode(structs.JobRegisterRequestType, req)
require.NoError(t, err)

resp := fsm.Apply(makeLog(buf))
require.Nil(t, resp)

broker, err := fsm.State().EventBroker()
require.NoError(t, err)

subReq := &stream.SubscribeRequest{
Topics: map[structs.Topic][]string{
structs.TopicJob: {"*"},
},
}

sub, err := broker.Subscribe(subReq)
require.NoError(t, err)

ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(500*time.Millisecond))
defer cancel()

// consume the queue
var events []structs.Event
for {
out, err := sub.Next(ctx)
if len(out.Events) == 0 {
break
}

// consume the queue until the deadline has exceeded or until we've
// received more events than expected
if err == context.DeadlineExceeded || len(events) > 1 {
break
}

events = append(events, out.Events...)
}

require.Len(t, events, 1)
require.Equal(t, structs.TypeJobRegistered, events[0].Type)
}
2 changes: 1 addition & 1 deletion nomad/job_endpoint_hook_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func gatewayProxyForBridge(gateway *structs.ConsulGateway) *structs.ConsulGatewa

func gatewayBindAddresses(ingress *structs.ConsulIngressConfigEntry) map[string]*structs.ConsulGatewayBindAddress {
if ingress == nil || len(ingress.Listeners) == 0 {
return nil
return make(map[string]*structs.ConsulGatewayBindAddress)
}

addresses := make(map[string]*structs.ConsulGatewayBindAddress)
Expand Down
4 changes: 2 additions & 2 deletions nomad/job_endpoint_hook_connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,12 +412,12 @@ func TestJobEndpointConnect_gatewayProxyIsDefault(t *testing.T) {
func TestJobEndpointConnect_gatewayBindAddresses(t *testing.T) {
t.Run("nil", func(t *testing.T) {
result := gatewayBindAddresses(nil)
require.Nil(t, result)
require.Empty(t, result)
})

t.Run("no listeners", func(t *testing.T) {
result := gatewayBindAddresses(&structs.ConsulIngressConfigEntry{Listeners: nil})
require.Nil(t, result)
require.Empty(t, result)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shoenig requested re-review, this commit seems to deviate from how a bit of the consul gateway empty values are handled

})

t.Run("simple", func(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion nomad/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,8 @@ func ConnectIngressGatewayJob(mode string, inject bool) *structs.Job {
Connect: &structs.ConsulConnect{
Gateway: &structs.ConsulGateway{
Proxy: &structs.ConsulGatewayProxy{
ConnectTimeout: helper.TimeToPtr(3 * time.Second),
ConnectTimeout: helper.TimeToPtr(3 * time.Second),
EnvoyGatewayBindAddresses: make(map[string]*structs.ConsulGatewayBindAddress),
},
Ingress: &structs.ConsulIngressConfigEntry{
Listeners: []*structs.ConsulIngressListener{{
Expand Down
Loading