Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: hookup service wrapper for use in clients #12329

Merged
merged 2 commits into from
Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"sync"
"time"

"github.com/hashicorp/nomad/client/lib/cgutil"

log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
Expand All @@ -20,9 +18,11 @@ import (
"github.com/hashicorp/nomad/client/devicemanager"
"github.com/hashicorp/nomad/client/dynamicplugins"
cinterfaces "github.com/hashicorp/nomad/client/interfaces"
"github.com/hashicorp/nomad/client/lib/cgutil"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/client/serviceregistration/wrapper"
cstate "github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/vaultclient"
Expand Down Expand Up @@ -178,6 +178,10 @@ type allocRunner struct {
// rpcClient is the RPC Client that should be used by the allocrunner and its
// hooks to communicate with Nomad Servers.
rpcClient RPCer

// serviceRegWrapper is the handler wrapper that is used by service hooks
// to perform service and check registration and deregistration.
serviceRegWrapper *wrapper.HandlerWrapper
}

// RPCer is the interface needed by hooks to make RPC calls.
Expand Down Expand Up @@ -221,6 +225,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
driverManager: config.DriverManager,
serversContactedCh: config.ServersContactedCh,
rpcClient: config.RPCClient,
serviceRegWrapper: config.ServiceRegWrapper,
}

// Create the logger based on the allocation ID
Expand Down Expand Up @@ -274,6 +279,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
ServersContactedCh: ar.serversContactedCh,
StartConditionMetCtx: ar.taskHookCoordinator.startConditionForTask(task),
ShutdownDelayCtx: ar.shutdownDelayCtx,
ServiceRegWrapper: ar.serviceRegWrapper,
}

if ar.cpusetManager != nil {
Expand Down
4 changes: 2 additions & 2 deletions client/allocrunner/alloc_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
newNetworkHook(hookLogger, ns, alloc, nm, nc, ar, builtTaskEnv),
newGroupServiceHook(groupServiceHookConfig{
alloc: alloc,
consul: ar.consulClient,
consulNamespace: alloc.ConsulNamespace(),
namespace: alloc.ServiceProviderNamespace(),
serviceRegWrapper: ar.serviceRegWrapper,
restarter: ar,
taskEnvBuilder: envBuilder,
networkStatusGetter: ar,
Expand Down
4 changes: 3 additions & 1 deletion client/allocrunner/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,8 @@ func TestAllocRunner_TaskGroup_ShutdownDelay(t *testing.T) {
tg := alloc.Job.TaskGroups[0]
tg.Services = []*structs.Service{
{
Name: "shutdown_service",
Name: "shutdown_service",
Provider: structs.ServiceProviderConsul,
},
}

Expand Down Expand Up @@ -1314,6 +1315,7 @@ func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
{
Name: "fakservice",
PortLabel: "http",
Provider: structs.ServiceProviderConsul,
Checks: []*structs.ServiceCheck{
{
Name: "fakecheck",
Expand Down
1 change: 1 addition & 0 deletions client/allocrunner/alloc_runner_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func TestAllocRunner_Restore_RunningTerminal(t *testing.T) {
{
Name: "foo",
PortLabel: "8888",
Provider: structs.ServiceProviderConsul,
},
}
task := alloc.Job.TaskGroups[0].Tasks[0]
Expand Down
5 changes: 5 additions & 0 deletions client/allocrunner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/client/serviceregistration/wrapper"
cstate "github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -81,4 +82,8 @@ type Config struct {
// RPCClient is the RPC Client that should be used by the allocrunner and its
// hooks to communicate with Nomad Servers.
RPCClient RPCer

// ServiceRegWrapper is the handler wrapper that is used by service hooks
// to perform service and check registration and deregistration.
ServiceRegWrapper *wrapper.HandlerWrapper
}
36 changes: 26 additions & 10 deletions client/allocrunner/groupservice_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/client/serviceregistration/wrapper"
"github.com/hashicorp/nomad/client/taskenv"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
Expand All @@ -25,15 +26,22 @@ type networkStatusGetter interface {
// deregistration.
type groupServiceHook struct {
allocID string
jobID string
group string
restarter agentconsul.WorkloadRestarter
consulClient serviceregistration.Handler
consulNamespace string
prerun bool
deregistered bool
networkStatusGetter networkStatusGetter
shutdownDelayCtx context.Context

// namespace is the Nomad or Consul namespace in which service
// registrations will be made.
namespace string

// serviceRegWrapper is the handler wrapper that is used to perform service
// and check registration and deregistration.
serviceRegWrapper *wrapper.HandlerWrapper

logger log.Logger

// The following fields may be updated
Expand All @@ -51,13 +59,19 @@ type groupServiceHook struct {

type groupServiceHookConfig struct {
alloc *structs.Allocation
consul serviceregistration.Handler
consulNamespace string
restarter agentconsul.WorkloadRestarter
taskEnvBuilder *taskenv.Builder
networkStatusGetter networkStatusGetter
shutdownDelayCtx context.Context
logger log.Logger

// namespace is the Nomad or Consul namespace in which service
// registrations will be made.
namespace string

// serviceRegWrapper is the handler wrapper that is used to perform service
// and check registration and deregistration.
serviceRegWrapper *wrapper.HandlerWrapper
}

func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {
Expand All @@ -70,15 +84,16 @@ func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {

h := &groupServiceHook{
allocID: cfg.alloc.ID,
jobID: cfg.alloc.JobID,
group: cfg.alloc.TaskGroup,
restarter: cfg.restarter,
consulClient: cfg.consul,
consulNamespace: cfg.consulNamespace,
namespace: cfg.namespace,
taskEnvBuilder: cfg.taskEnvBuilder,
delay: shutdownDelay,
networkStatusGetter: cfg.networkStatusGetter,
logger: cfg.logger.Named(groupServiceHookName),
services: cfg.alloc.Job.LookupTaskGroup(cfg.alloc.TaskGroup).Services,
serviceRegWrapper: cfg.serviceRegWrapper,
shutdownDelayCtx: cfg.shutdownDelayCtx,
}

Expand Down Expand Up @@ -114,7 +129,7 @@ func (h *groupServiceHook) prerunLocked() error {
}

services := h.getWorkloadServices()
return h.consulClient.RegisterWorkload(services)
return h.serviceRegWrapper.RegisterWorkload(services)
}

func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
Expand Down Expand Up @@ -157,7 +172,7 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
return nil
}

return h.consulClient.UpdateWorkload(oldWorkloadServices, newWorkloadServices)
return h.serviceRegWrapper.UpdateWorkload(oldWorkloadServices, newWorkloadServices)
}

func (h *groupServiceHook) PreTaskRestart() error {
Expand Down Expand Up @@ -213,7 +228,7 @@ func (h *groupServiceHook) Postrun() error {
func (h *groupServiceHook) deregister() {
if len(h.services) > 0 {
workloadServices := h.getWorkloadServices()
h.consulClient.RemoveWorkload(workloadServices)
h.serviceRegWrapper.RemoveWorkload(workloadServices)
}
}

Expand All @@ -229,8 +244,9 @@ func (h *groupServiceHook) getWorkloadServices() *serviceregistration.WorkloadSe
// Create task services struct with request's driver metadata
return &serviceregistration.WorkloadServices{
AllocID: h.allocID,
JobID: h.jobID,
Group: h.group,
Namespace: h.consulNamespace,
Namespace: h.namespace,
Restarter: h.restarter,
Services: interpolatedServices,
Networks: h.networks,
Expand Down
Loading