Skip to content

Commit

Permalink
consul: set partition for gateway config entries
Browse files Browse the repository at this point in the history
When we write Connect gateway configuation entries from the server, we're not
passing in the intended partition. This means we're using the server's own
partition to submit the configuration entries and this may not match. Note this
requires the Nomad server's token has permission to that partition.

Also, move the config entry write after we check Sentinel policies. This allows
us to return early if we hit a Sentinel error without making Consul RPCs first.
  • Loading branch information
tgross committed May 23, 2024
1 parent 5bfb500 commit c03dd9b
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 52 deletions.
7 changes: 7 additions & 0 deletions .changelog/22228.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
```release-note:bug
consul: Fixed a bug where gateway config entries were written to the Nomad server agent's Consul partition and not the client's partition
```

```release-note:bug
consul: (Enterprise) Fixed a bug where gateway config entries were written before Sentinel policies were enforced
```
17 changes: 13 additions & 4 deletions command/agent/consul/config_entries_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/lib/lang"
)

var _ ConfigAPI = (*MockConfigsAPI)(nil)
Expand All @@ -18,7 +19,7 @@ type MockConfigsAPI struct {
lock sync.Mutex
state struct {
error error
entries map[string]api.ConfigEntry
entries map[string]lang.Pair[api.ConfigEntry, *api.WriteOptions]
}
}

Expand All @@ -27,8 +28,8 @@ func NewMockConfigsAPI(l hclog.Logger) *MockConfigsAPI {
logger: l.Named("mock_consul"),
state: struct {
error error
entries map[string]api.ConfigEntry
}{entries: make(map[string]api.ConfigEntry)},
entries map[string]lang.Pair[api.ConfigEntry, *api.WriteOptions]
}{entries: make(map[string]lang.Pair[api.ConfigEntry, *api.WriteOptions])},
}
}

Expand All @@ -41,7 +42,7 @@ func (m *MockConfigsAPI) Set(entry api.ConfigEntry, w *api.WriteOptions) (bool,
return false, nil, m.state.error
}

m.state.entries[entry.GetName()] = entry
m.state.entries[entry.GetName()] = lang.Pair[api.ConfigEntry, *api.WriteOptions]{First: entry, Second: w}

return true, &api.WriteMeta{
RequestTime: 1,
Expand All @@ -56,3 +57,11 @@ func (m *MockConfigsAPI) SetError(err error) {

m.state.error = err
}

// GetEntry is a helper method so that test can verify what's been written
func (m *MockConfigsAPI) GetEntry(kind string) (api.ConfigEntry, *api.WriteOptions) {
m.lock.Lock()
defer m.lock.Unlock()
entry := m.state.entries[kind]
return entry.First, entry.Second
}
19 changes: 11 additions & 8 deletions nomad/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,11 +510,11 @@ func (s *Server) purgeSITokenAccessors(accessors []*structs.SITokenAccessor) err
type ConsulConfigsAPI interface {
// SetIngressCE adds the given ConfigEntry to Consul, overwriting
// the previous entry if set.
SetIngressCE(ctx context.Context, namespace, service, cluster string, entry *structs.ConsulIngressConfigEntry) error
SetIngressCE(ctx context.Context, namespace, service, cluster, partition string, entry *structs.ConsulIngressConfigEntry) error

// SetTerminatingCE adds the given ConfigEntry to Consul, overwriting
// the previous entry if set.
SetTerminatingCE(ctx context.Context, namespace, service, cluster string, entry *structs.ConsulTerminatingConfigEntry) error
SetTerminatingCE(ctx context.Context, namespace, service, cluster, partition string, entry *structs.ConsulTerminatingConfigEntry) error

// Stop is used to stop additional creations of Configuration Entries. Intended to
// be used on Nomad Server shutdown.
Expand Down Expand Up @@ -552,16 +552,16 @@ func (c *consulConfigsAPI) Stop() {
c.stopped = true
}

func (c *consulConfigsAPI) SetIngressCE(ctx context.Context, namespace, service, cluster string, entry *structs.ConsulIngressConfigEntry) error {
return c.setCE(ctx, convertIngressCE(namespace, service, entry), cluster)
func (c *consulConfigsAPI) SetIngressCE(ctx context.Context, namespace, service, cluster, partition string, entry *structs.ConsulIngressConfigEntry) error {
return c.setCE(ctx, convertIngressCE(namespace, service, entry), cluster, partition)
}

func (c *consulConfigsAPI) SetTerminatingCE(ctx context.Context, namespace, service, cluster string, entry *structs.ConsulTerminatingConfigEntry) error {
return c.setCE(ctx, convertTerminatingCE(namespace, service, entry), cluster)
func (c *consulConfigsAPI) SetTerminatingCE(ctx context.Context, namespace, service, cluster, partition string, entry *structs.ConsulTerminatingConfigEntry) error {
return c.setCE(ctx, convertTerminatingCE(namespace, service, entry), cluster, partition)
}

// setCE will set the Configuration Entry of any type Consul supports.
func (c *consulConfigsAPI) setCE(ctx context.Context, entry api.ConfigEntry, cluster string) error {
func (c *consulConfigsAPI) setCE(ctx context.Context, entry api.ConfigEntry, cluster, partition string) error {
defer metrics.MeasureSince([]string{"nomad", "consul", "create_config_entry"}, time.Now())

// make sure the background deletion goroutine has not been stopped
Expand All @@ -579,7 +579,10 @@ func (c *consulConfigsAPI) setCE(ctx context.Context, entry api.ConfigEntry, clu
}

client := c.configsClientFunc(cluster)
_, _, err := client.Set(entry, &api.WriteOptions{Namespace: entry.GetNamespace()})
_, _, err := client.Set(entry, &api.WriteOptions{
Namespace: entry.GetNamespace(),
Partition: partition,
})
return err
}

Expand Down
65 changes: 44 additions & 21 deletions nomad/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import (
"testing"
"time"

"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
)
Expand All @@ -27,56 +29,77 @@ var _ ConsulConfigsAPI = (*consulConfigsAPI)(nil)
func TestConsulConfigsAPI_SetCE(t *testing.T) {
ci.Parallel(t)

try := func(t *testing.T, expect error, f func(ConsulConfigsAPI) error) {
try := func(t *testing.T,
expectErr error,
expectKey string,
expectConfig api.ConfigEntry,
expectWriteOpts *api.WriteOptions,
f func(ConsulConfigsAPI) error) {

logger := testlog.HCLogger(t)
configsAPI := consul.NewMockConfigsAPI(logger)
configsAPI.SetError(expect)
configsAPI.SetError(expectErr)
configsAPIFunc := func(_ string) consul.ConfigAPI { return configsAPI }

c := NewConsulConfigsAPI(configsAPIFunc, logger)
err := f(c) // set the config entry

switch expect {
entry, wo := configsAPI.GetEntry(expectKey)
must.Eq(t, expectConfig, entry)
must.Eq(t, expectWriteOpts, wo)

switch expectErr {
case nil:
require.NoError(t, err)
must.NoError(t, err)
default:
require.Equal(t, expect, err)
must.EqError(t, err, expectErr.Error())
}
}

ctx := context.Background()

// existing behavior is no set namespace
consulNamespace := ""
partition := "foo"

ingressCE := new(structs.ConsulIngressConfigEntry)
t.Run("ingress ok", func(t *testing.T) {
try(t, nil, func(c ConsulConfigsAPI) error {
return c.SetIngressCE(
ctx, consulNamespace, "ig", structs.ConsulDefaultCluster, ingressCE)
})
try(t, nil, "ig",
&api.IngressGatewayConfigEntry{Kind: "ingress-gateway", Name: "ig"},
&api.WriteOptions{Partition: partition},
func(c ConsulConfigsAPI) error {
return c.SetIngressCE(
ctx, consulNamespace, "ig", structs.ConsulDefaultCluster, partition, ingressCE)
})
})

t.Run("ingress fail", func(t *testing.T) {
try(t, errors.New("consul broke"), func(c ConsulConfigsAPI) error {
return c.SetIngressCE(
ctx, consulNamespace, "ig", structs.ConsulDefaultCluster, ingressCE)
})
try(t, errors.New("consul broke"),
"ig", nil, nil,
func(c ConsulConfigsAPI) error {
return c.SetIngressCE(
ctx, consulNamespace, "ig", structs.ConsulDefaultCluster, partition, ingressCE)
})
})

terminatingCE := new(structs.ConsulTerminatingConfigEntry)
t.Run("terminating ok", func(t *testing.T) {
try(t, nil, func(c ConsulConfigsAPI) error {
return c.SetTerminatingCE(
ctx, consulNamespace, "tg", structs.ConsulDefaultCluster, terminatingCE)
})
try(t, nil, "tg",
&api.TerminatingGatewayConfigEntry{Kind: "terminating-gateway", Name: "tg"},
&api.WriteOptions{Partition: partition},
func(c ConsulConfigsAPI) error {
return c.SetTerminatingCE(
ctx, consulNamespace, "tg", structs.ConsulDefaultCluster, partition, terminatingCE)
})
})

t.Run("terminating fail", func(t *testing.T) {
try(t, errors.New("consul broke"), func(c ConsulConfigsAPI) error {
return c.SetTerminatingCE(
ctx, consulNamespace, "tg", structs.ConsulDefaultCluster, terminatingCE)
})
try(t, errors.New("consul broke"),
"tg", nil, nil,
func(c ConsulConfigsAPI) error {
return c.SetTerminatingCE(
ctx, consulNamespace, "tg", structs.ConsulDefaultCluster, partition, terminatingCE)
})
})

// also mesh
Expand Down
40 changes: 21 additions & 19 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,23 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return err
}

// Enforce Sentinel policies. Pass a copy of the job to prevent
// sentinel from altering it.
ns, err := snap.NamespaceByName(nil, args.RequestNamespace())
if err != nil {
return err
}

policyWarnings, err := j.enforceSubmitJob(args.PolicyOverride, args.Job.Copy(),
existingJob, args.GetIdentity().GetACLToken(), ns)
if err != nil {
return err
}
if policyWarnings != nil {
warnings = append(warnings, policyWarnings)
reply.Warnings = helper.MergeMultierrorWarnings(warnings...)
}

// Create or Update Consul Configuration Entries defined in the job. For now
// Nomad only supports Configuration Entries types
// - "ingress-gateway" for managing Ingress Gateways
Expand All @@ -281,34 +298,19 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis

for ns, entries := range args.Job.ConfigEntries() {
for service, entry := range entries.Ingress {
if errCE := j.srv.consulConfigEntries.SetIngressCE(ctx, ns, service, entries.Cluster, entry); errCE != nil {
if errCE := j.srv.consulConfigEntries.SetIngressCE(
ctx, ns, service, entries.Cluster, entries.Partition, entry); errCE != nil {
return errCE
}
}
for service, entry := range entries.Terminating {
if errCE := j.srv.consulConfigEntries.SetTerminatingCE(ctx, ns, service, entries.Cluster, entry); errCE != nil {
if errCE := j.srv.consulConfigEntries.SetTerminatingCE(
ctx, ns, service, entries.Cluster, entries.Partition, entry); errCE != nil {
return errCE
}
}
}

// Enforce Sentinel policies. Pass a copy of the job to prevent
// sentinel from altering it.
ns, err := snap.NamespaceByName(nil, args.RequestNamespace())
if err != nil {
return err
}

policyWarnings, err := j.enforceSubmitJob(args.PolicyOverride, args.Job.Copy(),
existingJob, args.GetIdentity().GetACLToken(), ns)
if err != nil {
return err
}
if policyWarnings != nil {
warnings = append(warnings, policyWarnings)
reply.Warnings = helper.MergeMultierrorWarnings(warnings...)
}

// Clear the Vault token
args.Job.VaultToken = ""

Expand Down
7 changes: 7 additions & 0 deletions nomad/structs/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
// a single Consul namespace.
type ConsulConfigEntries struct {
Cluster string
Partition string
Ingress map[string]*ConsulIngressConfigEntry
Terminating map[string]*ConsulTerminatingConfigEntry
}
Expand All @@ -43,9 +44,15 @@ func (j *Job) ConfigEntries() map[string]*ConsulConfigEntries {
if ig := gateway.Ingress; ig != nil {
collection[ns].Ingress[service.Name] = ig
collection[ns].Cluster = service.Cluster
if tg.Consul != nil {
collection[ns].Partition = tg.Consul.Partition
}
} else if term := gateway.Terminating; term != nil {
collection[ns].Terminating[service.Name] = term
collection[ns].Cluster = service.Cluster
if tg.Consul != nil {
collection[ns].Partition = tg.Consul.Partition
}
}
}
}
Expand Down

0 comments on commit c03dd9b

Please sign in to comment.