client: improve group service stanza interpolation and check_restart support #6586

Merged · 6 commits · Nov 18, 2019
2 changes: 1 addition & 1 deletion client/allochealth/tracker.go
@@ -419,7 +419,7 @@ OUTER:
type taskHealthState struct {
task *structs.Task
state *structs.TaskState
taskRegistrations *consul.TaskRegistration
taskRegistrations *consul.ServiceRegistrations
}

// event takes the deadline time for the allocation to be healthy and the update
34 changes: 34 additions & 0 deletions client/allocrunner/alloc_runner.go
@@ -22,6 +22,7 @@ import (
cstate "github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/vaultclient"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/device"
@@ -1001,6 +1002,39 @@ func (ar *allocRunner) RestartTask(taskName string, taskEvent *structs.TaskEvent
return tr.Restart(context.TODO(), taskEvent, false)
}

// Restart satisfies the WorkloadRestarter interface and restarts all task
// runners concurrently.
func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
waitCh := make(chan struct{})
var err *multierror.Error
var errMutex sync.Mutex

go func() {
var wg sync.WaitGroup
defer close(waitCh)
for tn, tr := range ar.tasks {
wg.Add(1)
go func(taskName string, r agentconsul.WorkloadRestarter) {
Member Author:

Note to reviewers: This might be overkill, but I wanted restarts to be as quick as possible, especially when there are numerous tasks in the group. I'd like to hear your thoughts.

Member:

Should this replace the RestartAll method below?

Also, what happens when we're in the middle of this Restart but we get a call to update or kill? The alloc runner hooks get their actions serialized by a lock, but allocRunner has a lot of "make sure you call this after/before this other method". (Although this is probably out of scope for this PR because that's the existing condition of things here.)

defer wg.Done()
e := r.Restart(ctx, event, failure)
if e != nil {
errMutex.Lock()
defer errMutex.Unlock()
err = multierror.Append(err, fmt.Errorf("failed to restart task %s: %v", taskName, e))
Member:

We do this pattern elsewhere. It might be nice to create an errgroup-like package on top of multierror that runs all goroutines and returns all errors, unlike errgroup, which only returns the first error and cancels the other goroutines.

Not a blocker, and it would make a nice follow-up PR if you want to chase it.

Member Author:

I made a PR to multierror for this pattern. Will open a follow-up Nomad PR to make use of it once I get it merged. hashicorp/go-multierror#28
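For context, a minimal sketch of what such a multierror-backed errgroup could look like, assuming only sync and hashicorp/go-multierror; the Group type and its Go/Wait methods are illustrative names, not necessarily the API proposed in that PR:

```go
package multierrgroup

import (
	"sync"

	multierror "github.com/hashicorp/go-multierror"
)

// Group runs a set of functions concurrently, waits for all of them to
// finish, and aggregates every returned error. Unlike x/sync/errgroup, it
// does not cancel the remaining goroutines on the first failure.
type Group struct {
	mu  sync.Mutex
	wg  sync.WaitGroup
	err *multierror.Error
}

// Go starts f in its own goroutine and records any error it returns.
func (g *Group) Go(f func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if err := f(); err != nil {
			g.mu.Lock()
			g.err = multierror.Append(g.err, err)
			g.mu.Unlock()
		}
	}()
}

// Wait blocks until every function started with Go has returned and
// reports the accumulated errors, or nil if all of them succeeded.
func (g *Group) Wait() error {
	g.wg.Wait()
	return g.err.ErrorOrNil()
}
```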

}
}(tn, tr)
}
wg.Wait()
}()

select {
case <-waitCh:
case <-ctx.Done():
}

return err.ErrorOrNil()
}

// RestartAll signals all task runners in the allocation to restart and passes
// a copy of the task event to each restart event.
// Returns any errors in a concatenated form.
9 changes: 8 additions & 1 deletion client/allocrunner/alloc_runner_hooks.go
@@ -7,6 +7,7 @@ import (
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
clientconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)
@@ -125,7 +126,13 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir),
newAllocHealthWatcherHook(hookLogger, alloc, hs, ar.Listener(), ar.consulClient),
newNetworkHook(hookLogger, ns, alloc, nm, nc),
newGroupServiceHook(hookLogger, alloc, ar.consulClient),
newGroupServiceHook(groupServiceHookConfig{
alloc: alloc,
consul: ar.consulClient,
restarter: ar,
taskEnvBuilder: taskenv.NewBuilder(config.Node, ar.Alloc(), nil, config.Region).SetAllocDir(ar.allocDir.AllocDir),
logger: hookLogger,
}),
newConsulSockHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
}

4 changes: 2 additions & 2 deletions client/allocrunner/alloc_runner_test.go
@@ -528,7 +528,7 @@ func TestAllocRunner_DeploymentHealth_Unhealthy_Checks(t *testing.T) {
consulClient := conf.Consul.(*cconsul.MockConsulServiceClient)
consulClient.AllocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) {
return &consul.AllocRegistration{
Tasks: map[string]*consul.TaskRegistration{
Tasks: map[string]*consul.ServiceRegistrations{
task.Name: {
Services: map[string]*consul.ServiceRegistration{
"123": {
@@ -847,7 +847,7 @@ func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
consulClient := conf.Consul.(*cconsul.MockConsulServiceClient)
consulClient.AllocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) {
return &consul.AllocRegistration{
Tasks: map[string]*consul.TaskRegistration{
Tasks: map[string]*consul.ServiceRegistrations{
task.Name: {
Services: map[string]*consul.ServiceRegistration{
"123": {
13 changes: 9 additions & 4 deletions client/allocrunner/alloc_runner_unix_test.go
@@ -33,6 +33,12 @@ func TestAllocRunner_Restore_RunningTerminal(t *testing.T) {
// 5. Assert task and logmon are cleaned up

alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Services = []*structs.Service{
{
Name: "foo",
PortLabel: "8888",
},
}
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
@@ -117,13 +123,12 @@ func TestAllocRunner_Restore_RunningTerminal(t *testing.T) {
// 2 removals (canary+noncanary) during prekill
// 2 removals (canary+noncanary) during exited
// 2 removals (canary+noncanary) during stop
// 1 remove group during stop
// 2 removals (canary+noncanary) for the group during stop
consulOps := conf2.Consul.(*consul.MockConsulServiceClient).GetOps()
require.Len(t, consulOps, 7)
for _, op := range consulOps[:6] {
require.Len(t, consulOps, 8)
for _, op := range consulOps {
require.Equal(t, "remove", op.Op)
}
require.Equal(t, "remove_group", consulOps[6].Op)

// Assert terminated task event was emitted
events := ar2.AllocState().TaskStates[task.Name].Events
128 changes: 116 additions & 12 deletions client/allocrunner/groupservice_hook.go
@@ -3,30 +3,63 @@ package allocrunner
import (
"sync"

hclog "github.com/hashicorp/go-hclog"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/taskenv"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)

// groupServiceHook manages task group Consul service registration and
// deregistration.
type groupServiceHook struct {
alloc *structs.Allocation
allocID string
group string
restarter agentconsul.WorkloadRestarter
consulClient consul.ConsulServiceAPI
prerun bool
mu sync.Mutex

logger log.Logger

// The following fields may be updated
canary bool
services []*structs.Service
networks structs.Networks
taskEnvBuilder *taskenv.Builder

// Since Update() may be called concurrently with any other hook all
// hook methods must be fully serialized
mu sync.Mutex
}

type groupServiceHookConfig struct {
alloc *structs.Allocation
consul consul.ConsulServiceAPI
restarter agentconsul.WorkloadRestarter
taskEnvBuilder *taskenv.Builder
logger log.Logger
}

func newGroupServiceHook(logger hclog.Logger, alloc *structs.Allocation, consulClient consul.ConsulServiceAPI) *groupServiceHook {
func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {
h := &groupServiceHook{
alloc: alloc,
consulClient: consulClient,
allocID: cfg.alloc.ID,
group: cfg.alloc.TaskGroup,
restarter: cfg.restarter,
consulClient: cfg.consul,
taskEnvBuilder: cfg.taskEnvBuilder,
}
h.logger = cfg.logger.Named(h.Name())
h.services = cfg.alloc.Job.LookupTaskGroup(h.group).Services

if cfg.alloc.AllocatedResources != nil {
h.networks = cfg.alloc.AllocatedResources.Shared.Networks
}

if cfg.alloc.DeploymentStatus != nil {
h.canary = cfg.alloc.DeploymentStatus.Canary
}
h.logger = logger.Named(h.Name())
return h
}

@@ -41,26 +74,97 @@ func (h *groupServiceHook) Prerun() error {
h.prerun = true
h.mu.Unlock()
}()
return h.consulClient.RegisterGroup(h.alloc)

if len(h.services) == 0 {
return nil
}

services := h.getWorkloadServices()
return h.consulClient.RegisterWorkload(services)
}

func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
h.mu.Lock()
defer h.mu.Unlock()
oldAlloc := h.alloc
h.alloc = req.Alloc
oldWorkloadServices := h.getWorkloadServices()

// Store new updated values out of request
canary := false
if req.Alloc.DeploymentStatus != nil {
canary = req.Alloc.DeploymentStatus.Canary
}

var networks structs.Networks
if req.Alloc.AllocatedResources != nil {
networks = req.Alloc.AllocatedResources.Shared.Networks
}

// Update group service hook fields
h.networks = networks
h.services = req.Alloc.Job.LookupTaskGroup(h.group).Services
h.canary = canary
h.taskEnvBuilder.UpdateTask(req.Alloc, nil)

// Create new task services struct with those new values
newWorkloadServices := h.getWorkloadServices()

if !h.prerun {
// Update called before Prerun. Update alloc and exit to allow
// Prerun to do initial registration.
return nil
}

return h.consulClient.UpdateGroup(oldAlloc, h.alloc)
return h.consulClient.UpdateWorkload(oldWorkloadServices, newWorkloadServices)
}

func (h *groupServiceHook) Postrun() error {
h.mu.Lock()
defer h.mu.Unlock()
return h.consulClient.RemoveGroup(h.alloc)
h.deregister()
return nil
}

func (h *groupServiceHook) driverNet() *drivers.DriverNetwork {
if len(h.networks) == 0 {
return nil
}

//TODO(schmichael) only support one network for now
net := h.networks[0]
//TODO(schmichael) there's probably a better way than hacking driver network
Member:

I eagerly await the day these comments are in red. 😁

return &drivers.DriverNetwork{
AutoAdvertise: true,
IP: net.IP,
// Copy PortLabels from group network
PortMap: net.PortLabels(),
}
}

// deregister services from Consul.
func (h *groupServiceHook) deregister() {
if len(h.services) > 0 {
workloadServices := h.getWorkloadServices()
h.consulClient.RemoveWorkload(workloadServices)

// Canary flag may be getting flipped when the alloc is being
// destroyed, so remove both variations of the service
workloadServices.Canary = !workloadServices.Canary
h.consulClient.RemoveWorkload(workloadServices)
}
}

func (h *groupServiceHook) getWorkloadServices() *agentconsul.WorkloadServices {
nickethier marked this conversation as resolved.
// Interpolate with the task's environment
interpolatedServices := taskenv.InterpolateServices(h.taskEnvBuilder.Build(), h.services)

// Create task services struct with request's driver metadata
return &agentconsul.WorkloadServices{
AllocID: h.allocID,
Group: h.group,
Restarter: h.restarter,
Services: interpolatedServices,
DriverNetwork: h.driverNet(),
Networks: h.networks,
Canary: h.canary,
}
}