Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of consul: set partition for gateway config entries into release/1.8.x #22408

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changelog/22228.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
```release-note:bug
consul: Fixed a bug where gateway config entries were written to the Nomad server agent's Consul partition and not the client's partition
```

```release-note:bug
consul: (Enterprise) Fixed a bug where gateway config entries were written before Sentinel policies were enforced
```
17 changes: 13 additions & 4 deletions command/agent/consul/config_entries_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/lib/lang"
)

var _ ConfigAPI = (*MockConfigsAPI)(nil)
Expand All @@ -18,7 +19,7 @@ type MockConfigsAPI struct {
lock sync.Mutex
state struct {
error error
entries map[string]api.ConfigEntry
entries map[string]lang.Pair[api.ConfigEntry, *api.WriteOptions]
}
}

Expand All @@ -27,8 +28,8 @@ func NewMockConfigsAPI(l hclog.Logger) *MockConfigsAPI {
logger: l.Named("mock_consul"),
state: struct {
error error
entries map[string]api.ConfigEntry
}{entries: make(map[string]api.ConfigEntry)},
entries map[string]lang.Pair[api.ConfigEntry, *api.WriteOptions]
}{entries: make(map[string]lang.Pair[api.ConfigEntry, *api.WriteOptions])},
}
}

Expand All @@ -41,7 +42,7 @@ func (m *MockConfigsAPI) Set(entry api.ConfigEntry, w *api.WriteOptions) (bool,
return false, nil, m.state.error
}

m.state.entries[entry.GetName()] = entry
m.state.entries[entry.GetName()] = lang.Pair[api.ConfigEntry, *api.WriteOptions]{First: entry, Second: w}

return true, &api.WriteMeta{
RequestTime: 1,
Expand All @@ -56,3 +57,11 @@ func (m *MockConfigsAPI) SetError(err error) {

m.state.error = err
}

// GetEntry is a helper method so that test can verify what's been written
func (m *MockConfigsAPI) GetEntry(kind string) (api.ConfigEntry, *api.WriteOptions) {
m.lock.Lock()
defer m.lock.Unlock()
entry := m.state.entries[kind]
return entry.First, entry.Second
}
19 changes: 11 additions & 8 deletions nomad/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,11 +510,11 @@ func (s *Server) purgeSITokenAccessors(accessors []*structs.SITokenAccessor) err
type ConsulConfigsAPI interface {
// SetIngressCE adds the given ConfigEntry to Consul, overwriting
// the previous entry if set.
SetIngressCE(ctx context.Context, namespace, service, cluster string, entry *structs.ConsulIngressConfigEntry) error
SetIngressCE(ctx context.Context, namespace, service, cluster, partition string, entry *structs.ConsulIngressConfigEntry) error

// SetTerminatingCE adds the given ConfigEntry to Consul, overwriting
// the previous entry if set.
SetTerminatingCE(ctx context.Context, namespace, service, cluster string, entry *structs.ConsulTerminatingConfigEntry) error
SetTerminatingCE(ctx context.Context, namespace, service, cluster, partition string, entry *structs.ConsulTerminatingConfigEntry) error

// Stop is used to stop additional creations of Configuration Entries. Intended to
// be used on Nomad Server shutdown.
Expand Down Expand Up @@ -552,16 +552,16 @@ func (c *consulConfigsAPI) Stop() {
c.stopped = true
}

func (c *consulConfigsAPI) SetIngressCE(ctx context.Context, namespace, service, cluster string, entry *structs.ConsulIngressConfigEntry) error {
return c.setCE(ctx, convertIngressCE(namespace, service, entry), cluster)
func (c *consulConfigsAPI) SetIngressCE(ctx context.Context, namespace, service, cluster, partition string, entry *structs.ConsulIngressConfigEntry) error {
return c.setCE(ctx, convertIngressCE(namespace, service, entry), cluster, partition)
}

func (c *consulConfigsAPI) SetTerminatingCE(ctx context.Context, namespace, service, cluster string, entry *structs.ConsulTerminatingConfigEntry) error {
return c.setCE(ctx, convertTerminatingCE(namespace, service, entry), cluster)
func (c *consulConfigsAPI) SetTerminatingCE(ctx context.Context, namespace, service, cluster, partition string, entry *structs.ConsulTerminatingConfigEntry) error {
return c.setCE(ctx, convertTerminatingCE(namespace, service, entry), cluster, partition)
}

// setCE will set the Configuration Entry of any type Consul supports.
func (c *consulConfigsAPI) setCE(ctx context.Context, entry api.ConfigEntry, cluster string) error {
func (c *consulConfigsAPI) setCE(ctx context.Context, entry api.ConfigEntry, cluster, partition string) error {
defer metrics.MeasureSince([]string{"nomad", "consul", "create_config_entry"}, time.Now())

// make sure the background deletion goroutine has not been stopped
Expand All @@ -579,7 +579,10 @@ func (c *consulConfigsAPI) setCE(ctx context.Context, entry api.ConfigEntry, clu
}

client := c.configsClientFunc(cluster)
_, _, err := client.Set(entry, &api.WriteOptions{Namespace: entry.GetNamespace()})
_, _, err := client.Set(entry, &api.WriteOptions{
Namespace: entry.GetNamespace(),
Partition: partition,
})
return err
}

Expand Down
65 changes: 44 additions & 21 deletions nomad/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import (
"testing"
"time"

"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
)
Expand All @@ -27,56 +29,77 @@ var _ ConsulConfigsAPI = (*consulConfigsAPI)(nil)
func TestConsulConfigsAPI_SetCE(t *testing.T) {
ci.Parallel(t)

try := func(t *testing.T, expect error, f func(ConsulConfigsAPI) error) {
try := func(t *testing.T,
expectErr error,
expectKey string,
expectConfig api.ConfigEntry,
expectWriteOpts *api.WriteOptions,
f func(ConsulConfigsAPI) error) {

logger := testlog.HCLogger(t)
configsAPI := consul.NewMockConfigsAPI(logger)
configsAPI.SetError(expect)
configsAPI.SetError(expectErr)
configsAPIFunc := func(_ string) consul.ConfigAPI { return configsAPI }

c := NewConsulConfigsAPI(configsAPIFunc, logger)
err := f(c) // set the config entry

switch expect {
entry, wo := configsAPI.GetEntry(expectKey)
must.Eq(t, expectConfig, entry)
must.Eq(t, expectWriteOpts, wo)

switch expectErr {
case nil:
require.NoError(t, err)
must.NoError(t, err)
default:
require.Equal(t, expect, err)
must.EqError(t, err, expectErr.Error())
}
}

ctx := context.Background()

// existing behavior is no set namespace
consulNamespace := ""
partition := "foo"

ingressCE := new(structs.ConsulIngressConfigEntry)
t.Run("ingress ok", func(t *testing.T) {
try(t, nil, func(c ConsulConfigsAPI) error {
return c.SetIngressCE(
ctx, consulNamespace, "ig", structs.ConsulDefaultCluster, ingressCE)
})
try(t, nil, "ig",
&api.IngressGatewayConfigEntry{Kind: "ingress-gateway", Name: "ig"},
&api.WriteOptions{Partition: partition},
func(c ConsulConfigsAPI) error {
return c.SetIngressCE(
ctx, consulNamespace, "ig", structs.ConsulDefaultCluster, partition, ingressCE)
})
})

t.Run("ingress fail", func(t *testing.T) {
try(t, errors.New("consul broke"), func(c ConsulConfigsAPI) error {
return c.SetIngressCE(
ctx, consulNamespace, "ig", structs.ConsulDefaultCluster, ingressCE)
})
try(t, errors.New("consul broke"),
"ig", nil, nil,
func(c ConsulConfigsAPI) error {
return c.SetIngressCE(
ctx, consulNamespace, "ig", structs.ConsulDefaultCluster, partition, ingressCE)
})
})

terminatingCE := new(structs.ConsulTerminatingConfigEntry)
t.Run("terminating ok", func(t *testing.T) {
try(t, nil, func(c ConsulConfigsAPI) error {
return c.SetTerminatingCE(
ctx, consulNamespace, "tg", structs.ConsulDefaultCluster, terminatingCE)
})
try(t, nil, "tg",
&api.TerminatingGatewayConfigEntry{Kind: "terminating-gateway", Name: "tg"},
&api.WriteOptions{Partition: partition},
func(c ConsulConfigsAPI) error {
return c.SetTerminatingCE(
ctx, consulNamespace, "tg", structs.ConsulDefaultCluster, partition, terminatingCE)
})
})

t.Run("terminating fail", func(t *testing.T) {
try(t, errors.New("consul broke"), func(c ConsulConfigsAPI) error {
return c.SetTerminatingCE(
ctx, consulNamespace, "tg", structs.ConsulDefaultCluster, terminatingCE)
})
try(t, errors.New("consul broke"),
"tg", nil, nil,
func(c ConsulConfigsAPI) error {
return c.SetTerminatingCE(
ctx, consulNamespace, "tg", structs.ConsulDefaultCluster, partition, terminatingCE)
})
})

// also mesh
Expand Down
40 changes: 21 additions & 19 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,23 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return err
}

// Enforce Sentinel policies. Pass a copy of the job to prevent
// sentinel from altering it.
ns, err := snap.NamespaceByName(nil, args.RequestNamespace())
if err != nil {
return err
}

policyWarnings, err := j.enforceSubmitJob(args.PolicyOverride, args.Job.Copy(),
existingJob, args.GetIdentity().GetACLToken(), ns)
if err != nil {
return err
}
if policyWarnings != nil {
warnings = append(warnings, policyWarnings)
reply.Warnings = helper.MergeMultierrorWarnings(warnings...)
}

// Create or Update Consul Configuration Entries defined in the job. For now
// Nomad only supports Configuration Entries types
// - "ingress-gateway" for managing Ingress Gateways
Expand All @@ -282,34 +299,19 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis

for ns, entries := range args.Job.ConfigEntries() {
for service, entry := range entries.Ingress {
if errCE := j.srv.consulConfigEntries.SetIngressCE(ctx, ns, service, entries.Cluster, entry); errCE != nil {
if errCE := j.srv.consulConfigEntries.SetIngressCE(
ctx, ns, service, entries.Cluster, entries.Partition, entry); errCE != nil {
return errCE
}
}
for service, entry := range entries.Terminating {
if errCE := j.srv.consulConfigEntries.SetTerminatingCE(ctx, ns, service, entries.Cluster, entry); errCE != nil {
if errCE := j.srv.consulConfigEntries.SetTerminatingCE(
ctx, ns, service, entries.Cluster, entries.Partition, entry); errCE != nil {
return errCE
}
}
}

// Enforce Sentinel policies. Pass a copy of the job to prevent
// sentinel from altering it.
ns, err := snap.NamespaceByName(nil, args.RequestNamespace())
if err != nil {
return err
}

policyWarnings, err := j.enforceSubmitJob(args.PolicyOverride, args.Job.Copy(),
existingJob, args.GetIdentity().GetACLToken(), ns)
if err != nil {
return err
}
if policyWarnings != nil {
warnings = append(warnings, policyWarnings)
reply.Warnings = helper.MergeMultierrorWarnings(warnings...)
}

// Clear the Vault token
args.Job.VaultToken = ""

Expand Down
7 changes: 7 additions & 0 deletions nomad/structs/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
// a single Consul namespace.
type ConsulConfigEntries struct {
Cluster string
Partition string
Ingress map[string]*ConsulIngressConfigEntry
Terminating map[string]*ConsulTerminatingConfigEntry
}
Expand All @@ -43,9 +44,15 @@ func (j *Job) ConfigEntries() map[string]*ConsulConfigEntries {
if ig := gateway.Ingress; ig != nil {
collection[ns].Ingress[service.Name] = ig
collection[ns].Cluster = service.Cluster
if tg.Consul != nil {
collection[ns].Partition = tg.Consul.Partition
}
} else if term := gateway.Terminating; term != nil {
collection[ns].Terminating[service.Name] = term
collection[ns].Cluster = service.Cluster
if tg.Consul != nil {
collection[ns].Partition = tg.Consul.Partition
}
}
}
}
Expand Down
Loading