diff --git a/.changelog/22228.txt b/.changelog/22228.txt new file mode 100644 index 00000000000..ce357c09f47 --- /dev/null +++ b/.changelog/22228.txt @@ -0,0 +1,7 @@ +```release-note:bug +consul: Fixed a bug where gateway config entries were written to the Nomad server agent's Consul partition and not the client's partition +``` + +```release-note:bug +consul: (Enterprise) Fixed a bug where gateway config entries were written before Sentinel policies were enforced +``` diff --git a/command/agent/consul/config_entries_testing.go b/command/agent/consul/config_entries_testing.go index 9bbd496abb9..e846566fa02 100644 --- a/command/agent/consul/config_entries_testing.go +++ b/command/agent/consul/config_entries_testing.go @@ -8,6 +8,7 @@ import ( "github.com/hashicorp/consul/api" "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/lib/lang" ) var _ ConfigAPI = (*MockConfigsAPI)(nil) @@ -18,7 +19,7 @@ type MockConfigsAPI struct { lock sync.Mutex state struct { error error - entries map[string]api.ConfigEntry + entries map[string]lang.Pair[api.ConfigEntry, *api.WriteOptions] } } @@ -27,8 +28,8 @@ func NewMockConfigsAPI(l hclog.Logger) *MockConfigsAPI { logger: l.Named("mock_consul"), state: struct { error error - entries map[string]api.ConfigEntry - }{entries: make(map[string]api.ConfigEntry)}, + entries map[string]lang.Pair[api.ConfigEntry, *api.WriteOptions] + }{entries: make(map[string]lang.Pair[api.ConfigEntry, *api.WriteOptions])}, } } @@ -41,7 +42,7 @@ func (m *MockConfigsAPI) Set(entry api.ConfigEntry, w *api.WriteOptions) (bool, return false, nil, m.state.error } - m.state.entries[entry.GetName()] = entry + m.state.entries[entry.GetName()] = lang.Pair[api.ConfigEntry, *api.WriteOptions]{First: entry, Second: w} return true, &api.WriteMeta{ RequestTime: 1, @@ -56,3 +57,11 @@ func (m *MockConfigsAPI) SetError(err error) { m.state.error = err } + +// GetEntry is a helper method so that test can verify what's been written +func (m *MockConfigsAPI) GetEntry(kind string) (api.ConfigEntry, *api.WriteOptions) { + m.lock.Lock() + defer m.lock.Unlock() + entry := m.state.entries[kind] + return entry.First, entry.Second +} diff --git a/nomad/consul.go b/nomad/consul.go index 2a56a509933..19a8eeed0da 100644 --- a/nomad/consul.go +++ b/nomad/consul.go @@ -510,11 +510,11 @@ func (s *Server) purgeSITokenAccessors(accessors []*structs.SITokenAccessor) err type ConsulConfigsAPI interface { // SetIngressCE adds the given ConfigEntry to Consul, overwriting // the previous entry if set. - SetIngressCE(ctx context.Context, namespace, service, cluster string, entry *structs.ConsulIngressConfigEntry) error + SetIngressCE(ctx context.Context, namespace, service, cluster, partition string, entry *structs.ConsulIngressConfigEntry) error // SetTerminatingCE adds the given ConfigEntry to Consul, overwriting // the previous entry if set. - SetTerminatingCE(ctx context.Context, namespace, service, cluster string, entry *structs.ConsulTerminatingConfigEntry) error + SetTerminatingCE(ctx context.Context, namespace, service, cluster, partition string, entry *structs.ConsulTerminatingConfigEntry) error // Stop is used to stop additional creations of Configuration Entries. Intended to // be used on Nomad Server shutdown. @@ -552,16 +552,16 @@ func (c *consulConfigsAPI) Stop() { c.stopped = true } -func (c *consulConfigsAPI) SetIngressCE(ctx context.Context, namespace, service, cluster string, entry *structs.ConsulIngressConfigEntry) error { - return c.setCE(ctx, convertIngressCE(namespace, service, entry), cluster) +func (c *consulConfigsAPI) SetIngressCE(ctx context.Context, namespace, service, cluster, partition string, entry *structs.ConsulIngressConfigEntry) error { + return c.setCE(ctx, convertIngressCE(namespace, service, entry), cluster, partition) } -func (c *consulConfigsAPI) SetTerminatingCE(ctx context.Context, namespace, service, cluster string, entry *structs.ConsulTerminatingConfigEntry) error { - return c.setCE(ctx, convertTerminatingCE(namespace, service, entry), cluster) +func (c *consulConfigsAPI) SetTerminatingCE(ctx context.Context, namespace, service, cluster, partition string, entry *structs.ConsulTerminatingConfigEntry) error { + return c.setCE(ctx, convertTerminatingCE(namespace, service, entry), cluster, partition) } // setCE will set the Configuration Entry of any type Consul supports. -func (c *consulConfigsAPI) setCE(ctx context.Context, entry api.ConfigEntry, cluster string) error { +func (c *consulConfigsAPI) setCE(ctx context.Context, entry api.ConfigEntry, cluster, partition string) error { defer metrics.MeasureSince([]string{"nomad", "consul", "create_config_entry"}, time.Now()) // make sure the background deletion goroutine has not been stopped @@ -579,7 +579,10 @@ func (c *consulConfigsAPI) setCE(ctx context.Context, entry api.ConfigEntry, clu } client := c.configsClientFunc(cluster) - _, _, err := client.Set(entry, &api.WriteOptions{Namespace: entry.GetNamespace()}) + _, _, err := client.Set(entry, &api.WriteOptions{ + Namespace: entry.GetNamespace(), + Partition: partition, + }) return err } diff --git a/nomad/consul_test.go b/nomad/consul_test.go index 293fb68e463..5ce2e04f654 100644 --- a/nomad/consul_test.go +++ b/nomad/consul_test.go @@ -10,12 +10,14 @@ import ( "testing" "time" + "github.com/hashicorp/consul/api" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" "golang.org/x/time/rate" ) @@ -27,20 +29,30 @@ var _ ConsulConfigsAPI = (*consulConfigsAPI)(nil) func TestConsulConfigsAPI_SetCE(t *testing.T) { ci.Parallel(t) - try := func(t *testing.T, expect error, f func(ConsulConfigsAPI) error) { + try := func(t *testing.T, + expectErr error, + expectKey string, + expectConfig api.ConfigEntry, + expectWriteOpts *api.WriteOptions, + f func(ConsulConfigsAPI) error) { + logger := testlog.HCLogger(t) configsAPI := consul.NewMockConfigsAPI(logger) - configsAPI.SetError(expect) + configsAPI.SetError(expectErr) configsAPIFunc := func(_ string) consul.ConfigAPI { return configsAPI } c := NewConsulConfigsAPI(configsAPIFunc, logger) err := f(c) // set the config entry - switch expect { + entry, wo := configsAPI.GetEntry(expectKey) + must.Eq(t, expectConfig, entry) + must.Eq(t, expectWriteOpts, wo) + + switch expectErr { case nil: - require.NoError(t, err) + must.NoError(t, err) default: - require.Equal(t, expect, err) + must.EqError(t, err, expectErr.Error()) } } @@ -48,35 +60,46 @@ func TestConsulConfigsAPI_SetCE(t *testing.T) { // existing behavior is no set namespace consulNamespace := "" + partition := "foo" ingressCE := new(structs.ConsulIngressConfigEntry) t.Run("ingress ok", func(t *testing.T) { - try(t, nil, func(c ConsulConfigsAPI) error { - return c.SetIngressCE( - ctx, consulNamespace, "ig", structs.ConsulDefaultCluster, ingressCE) - }) + try(t, nil, "ig", + &api.IngressGatewayConfigEntry{Kind: "ingress-gateway", Name: "ig"}, + &api.WriteOptions{Partition: partition}, + func(c ConsulConfigsAPI) error { + return c.SetIngressCE( + ctx, consulNamespace, "ig", structs.ConsulDefaultCluster, partition, ingressCE) + }) }) t.Run("ingress fail", func(t *testing.T) { - try(t, errors.New("consul broke"), func(c ConsulConfigsAPI) error { - return c.SetIngressCE( - ctx, consulNamespace, "ig", structs.ConsulDefaultCluster, ingressCE) - }) + try(t, errors.New("consul broke"), + "ig", nil, nil, + func(c ConsulConfigsAPI) error { + return c.SetIngressCE( + ctx, consulNamespace, "ig", structs.ConsulDefaultCluster, partition, ingressCE) + }) }) terminatingCE := new(structs.ConsulTerminatingConfigEntry) t.Run("terminating ok", func(t *testing.T) { - try(t, nil, func(c ConsulConfigsAPI) error { - return c.SetTerminatingCE( - ctx, consulNamespace, "tg", structs.ConsulDefaultCluster, terminatingCE) - }) + try(t, nil, "tg", + &api.TerminatingGatewayConfigEntry{Kind: "terminating-gateway", Name: "tg"}, + &api.WriteOptions{Partition: partition}, + func(c ConsulConfigsAPI) error { + return c.SetTerminatingCE( + ctx, consulNamespace, "tg", structs.ConsulDefaultCluster, partition, terminatingCE) + }) }) t.Run("terminating fail", func(t *testing.T) { - try(t, errors.New("consul broke"), func(c ConsulConfigsAPI) error { - return c.SetTerminatingCE( - ctx, consulNamespace, "tg", structs.ConsulDefaultCluster, terminatingCE) - }) + try(t, errors.New("consul broke"), + "tg", nil, nil, + func(c ConsulConfigsAPI) error { + return c.SetTerminatingCE( + ctx, consulNamespace, "tg", structs.ConsulDefaultCluster, partition, terminatingCE) + }) }) // also mesh diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index c876dfaeb3e..c3c1dc63947 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -267,6 +267,23 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis return err } + // Enforce Sentinel policies. Pass a copy of the job to prevent + // sentinel from altering it. + ns, err := snap.NamespaceByName(nil, args.RequestNamespace()) + if err != nil { + return err + } + + policyWarnings, err := j.enforceSubmitJob(args.PolicyOverride, args.Job.Copy(), + existingJob, args.GetIdentity().GetACLToken(), ns) + if err != nil { + return err + } + if policyWarnings != nil { + warnings = append(warnings, policyWarnings) + reply.Warnings = helper.MergeMultierrorWarnings(warnings...) + } + // Create or Update Consul Configuration Entries defined in the job. For now // Nomad only supports Configuration Entries types // - "ingress-gateway" for managing Ingress Gateways @@ -281,34 +298,19 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis for ns, entries := range args.Job.ConfigEntries() { for service, entry := range entries.Ingress { - if errCE := j.srv.consulConfigEntries.SetIngressCE(ctx, ns, service, entries.Cluster, entry); errCE != nil { + if errCE := j.srv.consulConfigEntries.SetIngressCE( + ctx, ns, service, entries.Cluster, entries.Partition, entry); errCE != nil { return errCE } } for service, entry := range entries.Terminating { - if errCE := j.srv.consulConfigEntries.SetTerminatingCE(ctx, ns, service, entries.Cluster, entry); errCE != nil { + if errCE := j.srv.consulConfigEntries.SetTerminatingCE( + ctx, ns, service, entries.Cluster, entries.Partition, entry); errCE != nil { return errCE } } } - // Enforce Sentinel policies. Pass a copy of the job to prevent - // sentinel from altering it. - ns, err := snap.NamespaceByName(nil, args.RequestNamespace()) - if err != nil { - return err - } - - policyWarnings, err := j.enforceSubmitJob(args.PolicyOverride, args.Job.Copy(), - existingJob, args.GetIdentity().GetACLToken(), ns) - if err != nil { - return err - } - if policyWarnings != nil { - warnings = append(warnings, policyWarnings) - reply.Warnings = helper.MergeMultierrorWarnings(warnings...) - } - // Clear the Vault token args.Job.VaultToken = "" diff --git a/nomad/structs/connect.go b/nomad/structs/connect.go index 34e76875e15..21bf3b7955c 100644 --- a/nomad/structs/connect.go +++ b/nomad/structs/connect.go @@ -17,6 +17,7 @@ import ( // a single Consul namespace. type ConsulConfigEntries struct { Cluster string + Partition string Ingress map[string]*ConsulIngressConfigEntry Terminating map[string]*ConsulTerminatingConfigEntry } @@ -43,9 +44,15 @@ func (j *Job) ConfigEntries() map[string]*ConsulConfigEntries { if ig := gateway.Ingress; ig != nil { collection[ns].Ingress[service.Name] = ig collection[ns].Cluster = service.Cluster + if tg.Consul != nil { + collection[ns].Partition = tg.Consul.Partition + } } else if term := gateway.Terminating; term != nil { collection[ns].Terminating[service.Name] = term collection[ns].Cluster = service.Cluster + if tg.Consul != nil { + collection[ns].Partition = tg.Consul.Partition + } } } }