diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 86c8ff37ad6..63b1c0071b2 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -113,6 +113,7 @@ func (tr *TaskRunner) initHooks() { clientConfig: tr.clientConfig, envBuilder: tr.envBuilder, consulNamespace: consulNamespace, + nomadNamespace: tr.alloc.Job.Namespace, })) } diff --git a/client/allocrunner/taskrunner/template/template.go b/client/allocrunner/taskrunner/template/template.go index f37d76f7be8..d6c71819662 100644 --- a/client/allocrunner/taskrunner/template/template.go +++ b/client/allocrunner/taskrunner/template/template.go @@ -102,6 +102,9 @@ type TaskTemplateManagerConfig struct { // MaxTemplateEventRate is the maximum rate at which we should emit events. MaxTemplateEventRate time.Duration + + // NomadNamespace is the Nomad namespace for the task + NomadNamespace string } // Validate validates the configuration. @@ -807,6 +810,13 @@ func newRunnerConfig(config *TaskTemplateManagerConfig, } } + // Set up Nomad + conf.Nomad.Namespace = &config.NomadNamespace + conf.Nomad.Transport.CustomDialer = cc.TemplateDialer + + // Use the Node's SecretID to authenticate Nomad template function calls. + conf.Nomad.Token = &cc.Node.SecretID + conf.Finalize() return conf, nil } diff --git a/client/allocrunner/taskrunner/template/template_test.go b/client/allocrunner/taskrunner/template/template_test.go index 71ccbd5da97..2bc0d5d98ec 100644 --- a/client/allocrunner/taskrunner/template/template_test.go +++ b/client/allocrunner/taskrunner/template/template_test.go @@ -124,28 +124,32 @@ func (m *MockTaskHooks) SetState(state string, event *structs.TaskEvent) {} // testHarness is used to test the TaskTemplateManager by spinning up // Consul/Vault as needed type testHarness struct { - manager *TaskTemplateManager - mockHooks *MockTaskHooks - templates []*structs.Template - envBuilder *taskenv.Builder - node *structs.Node - config *config.Config - vaultToken string - taskDir string - vault *testutil.TestVault - consul *ctestutil.TestServer - emitRate time.Duration + manager *TaskTemplateManager + mockHooks *MockTaskHooks + templates []*structs.Template + envBuilder *taskenv.Builder + node *structs.Node + config *config.Config + vaultToken string + taskDir string + vault *testutil.TestVault + consul *ctestutil.TestServer + emitRate time.Duration + nomadNamespace string } // newTestHarness returns a harness starting a dev consul and vault server, // building the appropriate config and creating a TaskTemplateManager func newTestHarness(t *testing.T, templates []*structs.Template, consul, vault bool) *testHarness { region := "global" + mockNode := mock.Node() + harness := &testHarness{ mockHooks: NewMockTaskHooks(), templates: templates, - node: mock.Node(), + node: mockNode, config: &config.Config{ + Node: mockNode, Region: region, TemplateConfig: &config.ClientTemplateConfig{ FunctionDenylist: config.DefaultTemplateFunctionDenylist, @@ -160,6 +164,7 @@ func newTestHarness(t *testing.T, templates []*structs.Template, consul, vault b task := a.Job.TaskGroups[0].Tasks[0] task.Name = TestTaskName harness.envBuilder = taskenv.NewBuilder(harness.node, a, task, region) + harness.nomadNamespace = a.Namespace // Make a tempdir d, err := ioutil.TempDir("", "ct_test") @@ -1486,6 +1491,7 @@ OUTER: func TestTaskTemplateManager_Config_ServerName(t *testing.T) { ci.Parallel(t) c := config.DefaultConfig() + c.Node = mock.Node() c.VaultConfig = &sconfig.VaultConfig{ Enabled: helper.BoolToPtr(true), Addr: "https://localhost/", diff --git a/client/allocrunner/taskrunner/template_hook.go b/client/allocrunner/taskrunner/template_hook.go index 8b4fb1222a9..a5ad9f8fd88 100644 --- a/client/allocrunner/taskrunner/template_hook.go +++ b/client/allocrunner/taskrunner/template_hook.go @@ -39,6 +39,9 @@ type templateHookConfig struct { // consulNamespace is the current Consul namespace consulNamespace string + + // nomadNamespace is the job's Nomad namespace + nomadNamespace string } type templateHook struct { @@ -122,6 +125,7 @@ func (h *templateHook) newManager() (unblock chan struct{}, err error) { TaskDir: h.taskDir, EnvBuilder: h.config.envBuilder, MaxTemplateEventRate: template.DefaultMaxTemplateEventRate, + NomadNamespace: h.config.nomadNamespace, }) if err != nil { h.logger.Error("failed to create template manager", "error", err) diff --git a/client/config/config.go b/client/config/config.go index 25b42071632..c69ddfa1dd1 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -17,6 +17,7 @@ import ( log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/bufconndialer" "github.com/hashicorp/nomad/helper/pluginutils/loader" "github.com/hashicorp/nomad/nomad/structs" structsc "github.com/hashicorp/nomad/nomad/structs/config" @@ -285,6 +286,10 @@ type Config struct { // NomadServiceDiscovery determines whether the Nomad native service // discovery client functionality is enabled. NomadServiceDiscovery bool + + // TemplateDialer is our custom HTTP dialer for consul-template. This is + // used for template functions which require access to the Nomad API. + TemplateDialer *bufconndialer.BufConnWrapper } // ClientTemplateConfig is configuration on the client specific to template diff --git a/command/agent/agent.go b/command/agent/agent.go index 9d2668d8842..89958e32a6b 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -24,6 +24,7 @@ import ( "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/command/agent/event" + "github.com/hashicorp/nomad/helper/bufconndialer" "github.com/hashicorp/nomad/helper/pluginutils/loader" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/lib/cpuset" @@ -107,6 +108,12 @@ type Agent struct { shutdownCh chan struct{} shutdownLock sync.Mutex + // builtinDialer dials the builtinListener. It is used for connecting + // consul-template to the HTTP API in process. In the event this agent is + // not running in client mode, these two fields will be nil. + builtinListener net.Listener + builtinDialer *bufconndialer.BufConnWrapper + InmemSink *metrics.InmemSink } @@ -906,6 +913,13 @@ func (a *Agent) setupClient() error { conf.StateDBFactory = state.GetStateDBFactory(conf.DevMode) } + // Set up a custom listener and dialer. This is used by Nomad clients when + // running consul-template functions that utilise the Nomad API. We lazy + // load this into the client config, therefore this needs to happen before + // we call NewClient. + a.builtinListener, a.builtinDialer = bufconndialer.New() + conf.TemplateDialer = a.builtinDialer + nomadClient, err := client.NewClient( conf, a.consulCatalog, a.consulProxies, a.consulService, nil) if err != nil { diff --git a/command/agent/command.go b/command/agent/command.go index ebbd9645314..8821bc815ef 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -517,6 +517,7 @@ func SetupLoggers(ui cli.Ui, config *Config) (*logutils.LevelFilter, *gatedwrite // setupAgent is used to start the agent and various interfaces func (c *Command) setupAgent(config *Config, logger hclog.InterceptLogger, logOutput io.Writer, inmem *metrics.InmemSink) error { c.Ui.Output("Starting Nomad agent...") + agent, err := NewAgent(config, logger, logOutput, inmem) if err != nil { // log the error as well, so it appears at the end diff --git a/command/agent/http.go b/command/agent/http.go index d3ca40c3e90..c3284764bbf 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -116,9 +116,6 @@ func NewHTTPServers(agent *Agent, config *Config) ([]*HTTPServer, error) { // Start the listener for _, addr := range config.normalizedAddrs.HTTP { - // Create the mux - mux := http.NewServeMux() - lnAddr, err := net.ResolveTCPAddr("tcp", addr) if err != nil { serverInitializationErrors = multierror.Append(serverInitializationErrors, err) @@ -143,7 +140,7 @@ func NewHTTPServers(agent *Agent, config *Config) ([]*HTTPServer, error) { // Create the server srv := &HTTPServer{ agent: agent, - mux: mux, + mux: http.NewServeMux(), listener: ln, listenerCh: make(chan struct{}), logger: agent.httpLogger, @@ -155,7 +152,7 @@ func NewHTTPServers(agent *Agent, config *Config) ([]*HTTPServer, error) { // Create HTTP server with timeouts httpServer := http.Server{ Addr: srv.Addr, - Handler: handlers.CompressHandler(mux), + Handler: handlers.CompressHandler(srv.mux), ConnState: makeConnState(config.TLSConfig.EnableHTTP, handshakeTimeout, maxConns), ErrorLog: newHTTPServerLogger(srv.logger), } @@ -168,6 +165,35 @@ func NewHTTPServers(agent *Agent, config *Config) ([]*HTTPServer, error) { srvs = append(srvs, srv) } + // This HTTP server is only create when running in client mode, otherwise + // the builtinDialer and builtinListener will be nil. + if agent.builtinDialer != nil && agent.builtinListener != nil { + srv := &HTTPServer{ + agent: agent, + mux: http.NewServeMux(), + listener: agent.builtinListener, + listenerCh: make(chan struct{}), + logger: agent.httpLogger, + Addr: "builtin", + wsUpgrader: wsUpgrader, + } + + srv.registerHandlers(config.EnableDebug) + + httpServer := http.Server{ + Addr: srv.Addr, + Handler: srv.mux, + ErrorLog: newHTTPServerLogger(srv.logger), + } + + go func() { + defer close(srv.listenerCh) + httpServer.Serve(agent.builtinListener) + }() + + srvs = append(srvs, srv) + } + if serverInitializationErrors != nil { for _, srv := range srvs { srv.Shutdown() diff --git a/e2e/consultemplate/consultemplate.go b/e2e/consultemplate/consultemplate.go index a700268fccf..7b1473c3258 100644 --- a/e2e/consultemplate/consultemplate.go +++ b/e2e/consultemplate/consultemplate.go @@ -22,6 +22,14 @@ type ConsulTemplateTest struct { framework.TC jobIDs []string consulKeys []string + + // namespaceIDs tracks the created namespace for removal after test + // completion. + namespaceIDs []string + + // namespacedJobIDs tracks any non-default namespaced jobs for removal + // after test completion. + namespacedJobIDs map[string][]string } func init() { @@ -30,7 +38,9 @@ func init() { CanRunLocal: true, Consul: true, Cases: []framework.TestCase{ - new(ConsulTemplateTest), + &ConsulTemplateTest{ + namespacedJobIDs: make(map[string][]string), + }, }, }) } @@ -57,6 +67,20 @@ func (tc *ConsulTemplateTest) AfterEach(f *framework.F) { } tc.consulKeys = []string{} + for namespace, jobIDs := range tc.namespacedJobIDs { + for _, jobID := range jobIDs { + err := e2eutil.StopJob(jobID, "-purge", "-namespace", namespace) + f.Assert().NoError(err) + } + } + tc.namespacedJobIDs = make(map[string][]string) + + for _, ns := range tc.namespaceIDs { + _, err := e2eutil.Command("nomad", "namespace", "delete", ns) + f.Assert().NoError(err) + } + tc.namespaceIDs = []string{} + _, err := e2eutil.Command("nomad", "system", "gc") f.NoError(err) } @@ -339,6 +363,116 @@ func (tc *ConsulTemplateTest) TestTemplatePathInterpolation_SharedAllocDir(f *fr } } +// TestConsulTemplate_NomadServiceLookups tests consul-templates Nomad service +// lookup functionality. It runs a job which registers two services, then +// another which performs both a list and read template function lookup against +// registered services. +func (tc *ConsulTemplateTest) TestConsulTemplate_NomadServiceLookups(f *framework.F) { + + // Set up our base job that will be used in various manners. + serviceJob, err := jobspec.ParseFile("consultemplate/input/nomad_provider_service.nomad") + f.NoError(err) + serviceJobID := "test-consul-template-nomad-lookups" + uuid.Generate()[0:8] + serviceJob.ID = &serviceJobID + + _, _, err = tc.Nomad().Jobs().Register(serviceJob, nil) + f.NoError(err) + tc.jobIDs = append(tc.jobIDs, serviceJobID) + f.NoError(e2eutil.WaitForAllocStatusExpected(serviceJobID, "default", []string{"running"}), "job should be running") + + // Pull the allocation ID for the job, we use this to ensure this is found + // in the rendered template later on. + serviceJobAllocs, err := e2eutil.AllocsForJob(serviceJobID, "default") + f.NoError(err) + f.Len(serviceJobAllocs, 1) + serviceAllocID := serviceJobAllocs[0]["ID"] + + // Create at non-default namespace. + _, err = e2eutil.Command("nomad", "namespace", "apply", "platform") + f.NoError(err) + tc.namespaceIDs = append(tc.namespaceIDs, "NamespaceA") + + // Register a job which includes services destined for the Nomad provider + // into the platform namespace. This is used to ensure consul-template + // lookups stay bound to the allocation namespace. + diffNamespaceServiceJobID := "test-consul-template-nomad-lookups" + uuid.Generate()[0:8] + f.NoError(e2eutil.Register(diffNamespaceServiceJobID, "consultemplate/input/nomad_provider_service_ns.nomad")) + tc.namespacedJobIDs["platform"] = append(tc.namespacedJobIDs["platform"], diffNamespaceServiceJobID) + f.NoError(e2eutil.WaitForAllocStatusExpected(diffNamespaceServiceJobID, "platform", []string{"running"}), "job should be running") + + // Register a job which includes consul-template function performing Nomad + // service listing and reads. + serviceLookupJobID := "test-consul-template-nomad-lookups" + uuid.Generate()[0:8] + f.NoError(e2eutil.Register(serviceLookupJobID, "consultemplate/input/nomad_provider_service_lookup.nomad")) + tc.jobIDs = append(tc.jobIDs, serviceLookupJobID) + f.NoError(e2eutil.WaitForAllocStatusExpected(serviceLookupJobID, "default", []string{"running"}), "job should be running") + + // Find the allocation ID for the job which contains templates, so we can + // perform filesystem actions. + serviceLookupJobAllocs, err := e2eutil.AllocsForJob(serviceLookupJobID, "default") + f.NoError(err) + f.Len(serviceLookupJobAllocs, 1) + serviceLookupAllocID := serviceLookupJobAllocs[0]["ID"] + + // Ensure the listing (nomadServices) template function has found all + // services within the default namespace. + err = waitForTaskFile(serviceLookupAllocID, "test", "${NOMAD_TASK_DIR}/services.conf", + func(out string) bool { + if !f.Assert().Contains(out, "service default-nomad-provider-service-primary [bar foo]") { + return false + } + if !f.Assert().Contains(out, "service default-nomad-provider-service-secondary [baz buz]") { + return false + } + if strings.Contains(out, "service platform-nomad-provider-service-secondary [baz buz]") { + return false + } + return true + }, nil) + f.NoError(err) + + // Ensure the direct service lookup has found the entry we expect. + err = waitForTaskFile(serviceLookupAllocID, "test", "${NOMAD_TASK_DIR}/service.conf", + func(out string) bool { + expected := fmt.Sprintf("service default-nomad-provider-service-primary [bar foo] dc1 %s", serviceAllocID) + return f.Assert().Contains(out, expected) + }, nil) + f.NoError(err) + + // Scale the default namespaced service job in order to change the expected + // number of entries. + count := 3 + serviceJob.TaskGroups[0].Count = &count + _, _, err = tc.Nomad().Jobs().Register(serviceJob, nil) + f.NoError(err) + + // Pull the allocation ID for the job, we use this to ensure this is found + // in the rendered template later on. + serviceJobAllocs, err = e2eutil.AllocsForJob(serviceJobID, "default") + f.NoError(err) + f.Len(serviceJobAllocs, 3) + + // Track the expected entries, including the allocID to make this test + // actually valuable. + var expectedEntries []string + for _, allocs := range serviceJobAllocs { + e := fmt.Sprintf("service default-nomad-provider-service-primary [bar foo] dc1 %s", allocs["ID"]) + expectedEntries = append(expectedEntries, e) + } + + // Ensure the direct service lookup has the new entries we expect. + err = waitForTaskFile(serviceLookupAllocID, "test", "${NOMAD_TASK_DIR}/service.conf", + func(out string) bool { + for _, entry := range expectedEntries { + if !f.Assert().Contains(out, entry) { + return false + } + } + return true + }, nil) + f.NoError(err) +} + func waitForTaskFile(allocID, task, path string, test func(out string) bool, wc *e2eutil.WaitConfig) error { var err error var out string diff --git a/e2e/consultemplate/input/nomad_provider_service.nomad b/e2e/consultemplate/input/nomad_provider_service.nomad new file mode 100644 index 00000000000..24be3a41fc4 --- /dev/null +++ b/e2e/consultemplate/input/nomad_provider_service.nomad @@ -0,0 +1,33 @@ +job "nomad_provider_service" { + datacenters = ["dc1"] + type = "service" + + constraint { + attribute = "${attr.kernel.name}" + value = "linux" + } + + group "nomad_provider_service" { + + service { + name = "${NOMAD_NAMESPACE}-nomad-provider-service-primary" + provider = "nomad" + tags = ["foo", "bar"] + } + + service { + name = "${NOMAD_NAMESPACE}-nomad-provider-service-secondary" + provider = "nomad" + tags = ["baz", "buz"] + } + + task "test" { + driver = "raw_exec" + + config { + command = "bash" + args = ["-c", "sleep 15000"] + } + } + } +} diff --git a/e2e/consultemplate/input/nomad_provider_service_lookup.nomad b/e2e/consultemplate/input/nomad_provider_service_lookup.nomad new file mode 100644 index 00000000000..72e874fe321 --- /dev/null +++ b/e2e/consultemplate/input/nomad_provider_service_lookup.nomad @@ -0,0 +1,41 @@ +job "nomad_provider_service_lookup" { + datacenters = ["dc1"] + type = "service" + + constraint { + attribute = "${attr.kernel.name}" + value = "linux" + } + + group "nomad_provider_service_lookup" { + + task "test" { + driver = "raw_exec" + + config { + command = "bash" + args = ["-c", "sleep 15000"] + } + + template { + data = <