Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: add Nomad template service functionality to runner. #12458

Merged
merged 4 commits into from
Apr 6, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client/allocrunner/taskrunner/task_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (tr *TaskRunner) initHooks() {
clientConfig: tr.clientConfig,
envBuilder: tr.envBuilder,
consulNamespace: consulNamespace,
nomadNamespace: tr.alloc.Job.Namespace,
}))
}

Expand Down
8 changes: 8 additions & 0 deletions client/allocrunner/taskrunner/template/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ type TaskTemplateManagerConfig struct {

// MaxTemplateEventRate is the maximum rate at which we should emit events.
MaxTemplateEventRate time.Duration

// NomadNamespace is the Nomad namespace for the task
NomadNamespace string
}

// Validate validates the configuration.
Expand Down Expand Up @@ -807,6 +810,11 @@ func newRunnerConfig(config *TaskTemplateManagerConfig,
}
}

// Set up Nomad
conf.Nomad.Namespace = &config.NomadNamespace
conf.Nomad.Transport.CustomDialer = cc.TemplateDialer
conf.Nomad.Token = &cc.Node.SecretID
jrasell marked this conversation as resolved.
Show resolved Hide resolved

conf.Finalize()
return conf, nil
}
Expand Down
30 changes: 18 additions & 12 deletions client/allocrunner/taskrunner/template/template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,28 +124,32 @@ func (m *MockTaskHooks) SetState(state string, event *structs.TaskEvent) {}
// testHarness is used to test the TaskTemplateManager by spinning up
// Consul/Vault as needed
type testHarness struct {
manager *TaskTemplateManager
mockHooks *MockTaskHooks
templates []*structs.Template
envBuilder *taskenv.Builder
node *structs.Node
config *config.Config
vaultToken string
taskDir string
vault *testutil.TestVault
consul *ctestutil.TestServer
emitRate time.Duration
manager *TaskTemplateManager
mockHooks *MockTaskHooks
templates []*structs.Template
envBuilder *taskenv.Builder
node *structs.Node
config *config.Config
vaultToken string
taskDir string
vault *testutil.TestVault
consul *ctestutil.TestServer
emitRate time.Duration
nomadNamespace string
}

// newTestHarness returns a harness starting a dev consul and vault server,
// building the appropriate config and creating a TaskTemplateManager
func newTestHarness(t *testing.T, templates []*structs.Template, consul, vault bool) *testHarness {
region := "global"
mockNode := mock.Node()

harness := &testHarness{
mockHooks: NewMockTaskHooks(),
templates: templates,
node: mock.Node(),
node: mockNode,
config: &config.Config{
Node: mockNode,
Region: region,
TemplateConfig: &config.ClientTemplateConfig{
FunctionDenylist: config.DefaultTemplateFunctionDenylist,
Expand All @@ -160,6 +164,7 @@ func newTestHarness(t *testing.T, templates []*structs.Template, consul, vault b
task := a.Job.TaskGroups[0].Tasks[0]
task.Name = TestTaskName
harness.envBuilder = taskenv.NewBuilder(harness.node, a, task, region)
harness.nomadNamespace = a.Namespace

// Make a tempdir
d, err := ioutil.TempDir("", "ct_test")
Expand Down Expand Up @@ -1486,6 +1491,7 @@ OUTER:
func TestTaskTemplateManager_Config_ServerName(t *testing.T) {
ci.Parallel(t)
c := config.DefaultConfig()
c.Node = mock.Node()
c.VaultConfig = &sconfig.VaultConfig{
Enabled: helper.BoolToPtr(true),
Addr: "https://localhost/",
Expand Down
4 changes: 4 additions & 0 deletions client/allocrunner/taskrunner/template_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type templateHookConfig struct {

// consulNamespace is the current Consul namespace
consulNamespace string

// nomadNamespace is the job's Nomad namespace
nomadNamespace string
}

type templateHook struct {
Expand Down Expand Up @@ -122,6 +125,7 @@ func (h *templateHook) newManager() (unblock chan struct{}, err error) {
TaskDir: h.taskDir,
EnvBuilder: h.config.envBuilder,
MaxTemplateEventRate: template.DefaultMaxTemplateEventRate,
NomadNamespace: h.config.nomadNamespace,
})
if err != nil {
h.logger.Error("failed to create template manager", "error", err)
Expand Down
5 changes: 5 additions & 0 deletions client/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/bufconndialer"
"github.com/hashicorp/nomad/helper/pluginutils/loader"
"github.com/hashicorp/nomad/nomad/structs"
structsc "github.com/hashicorp/nomad/nomad/structs/config"
Expand Down Expand Up @@ -285,6 +286,10 @@ type Config struct {
// NomadServiceDiscovery determines whether the Nomad native service
// discovery client functionality is enabled.
NomadServiceDiscovery bool

// TemplateDialer is our custom HTTP dialer for consul-template. This is
// used for template functions which require access to the Nomad API.
TemplateDialer *bufconndialer.BufConnWrapper
}

// ClientTemplateConfig is configuration on the client specific to template
Expand Down
14 changes: 14 additions & 0 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/command/agent/event"
"github.com/hashicorp/nomad/helper/bufconndialer"
"github.com/hashicorp/nomad/helper/pluginutils/loader"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/lib/cpuset"
Expand Down Expand Up @@ -107,6 +108,12 @@ type Agent struct {
shutdownCh chan struct{}
shutdownLock sync.Mutex

// builtinDialer dials the builtinListener. It is used for connecting
// consul-template to the HTTP API in process. In the event this agent is
// not running in client mode, these two fields will be nil.
builtinListener net.Listener
builtinDialer *bufconndialer.BufConnWrapper

InmemSink *metrics.InmemSink
}

Expand Down Expand Up @@ -906,6 +913,13 @@ func (a *Agent) setupClient() error {
conf.StateDBFactory = state.GetStateDBFactory(conf.DevMode)
}

// Set up a custom listener and dialer. This is used by Nomad clients when
// running consul-template functions that utilise the Nomad API. We lazy
// load this into the client config, therefore this needs to happen before
// we call NewClient.
a.builtinListener, a.builtinDialer = bufconndialer.New()
conf.TemplateDialer = a.builtinDialer
Comment on lines +916 to +921
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great placement 👍


nomadClient, err := client.NewClient(
conf, a.consulCatalog, a.consulProxies, a.consulService, nil)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ func SetupLoggers(ui cli.Ui, config *Config) (*logutils.LevelFilter, *gatedwrite
// setupAgent is used to start the agent and various interfaces
func (c *Command) setupAgent(config *Config, logger hclog.InterceptLogger, logOutput io.Writer, inmem *metrics.InmemSink) error {
c.Ui.Output("Starting Nomad agent...")

agent, err := NewAgent(config, logger, logOutput, inmem)
if err != nil {
// log the error as well, so it appears at the end
Expand Down
38 changes: 33 additions & 5 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,6 @@ func NewHTTPServers(agent *Agent, config *Config) ([]*HTTPServer, error) {

// Start the listener
for _, addr := range config.normalizedAddrs.HTTP {
// Create the mux
mux := http.NewServeMux()

lnAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
serverInitializationErrors = multierror.Append(serverInitializationErrors, err)
Expand All @@ -143,7 +140,7 @@ func NewHTTPServers(agent *Agent, config *Config) ([]*HTTPServer, error) {
// Create the server
srv := &HTTPServer{
agent: agent,
mux: mux,
mux: http.NewServeMux(),
listener: ln,
listenerCh: make(chan struct{}),
logger: agent.httpLogger,
Expand All @@ -155,7 +152,7 @@ func NewHTTPServers(agent *Agent, config *Config) ([]*HTTPServer, error) {
// Create HTTP server with timeouts
httpServer := http.Server{
Addr: srv.Addr,
Handler: handlers.CompressHandler(mux),
Handler: handlers.CompressHandler(srv.mux),
ConnState: makeConnState(config.TLSConfig.EnableHTTP, handshakeTimeout, maxConns),
ErrorLog: newHTTPServerLogger(srv.logger),
}
Expand All @@ -168,6 +165,37 @@ func NewHTTPServers(agent *Agent, config *Config) ([]*HTTPServer, error) {
srvs = append(srvs, srv)
}

// This HTTP server is only create when running in client mode, otherwise
// the builtinDialer and builtinListener will be nil.
if agent.builtinDialer != nil && agent.builtinListener != nil {
srv := &HTTPServer{
agent: agent,
mux: http.NewServeMux(),
listener: agent.builtinListener,
listenerCh: make(chan struct{}),
logger: agent.httpLogger,
Addr: "builtin",
wsUpgrader: wsUpgrader,
}

// TODO(schmichael) only register services and inject auth token (or
// inject auth token into template config?)
jrasell marked this conversation as resolved.
Show resolved Hide resolved
srv.registerHandlers(config.EnableDebug)

httpServer := http.Server{
Addr: srv.Addr,
Handler: srv.mux,
ErrorLog: newHTTPServerLogger(srv.logger),
}

go func() {
defer close(srv.listenerCh)
httpServer.Serve(agent.builtinListener)
}()

srvs = append(srvs, srv)
}

if serverInitializationErrors != nil {
for _, srv := range srvs {
srv.Shutdown()
Expand Down
Loading