Skip to content

Commit

Permalink
Merge pull request #8003 from hashicorp/c-use-taskkind
Browse files Browse the repository at this point in the history
consul/connect: use task kind to get service name
  • Loading branch information
shoenig authored May 19, 2020
2 parents 7d71b5e + 0dd1596 commit 36684bd
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 53 deletions.
33 changes: 18 additions & 15 deletions nomad/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,27 +52,30 @@ const (
ConsulPolicyWrite = "write"
)

type ServiceIdentityIndex struct {
type ServiceIdentityRequest struct {
TaskKind structs.TaskKind
TaskName string
ClusterID string
AllocID string
TaskName string
}

func (sii ServiceIdentityIndex) Validate() error {
func (sir ServiceIdentityRequest) Validate() error {
switch {
case sii.ClusterID == "":
case sir.ClusterID == "":
return errors.New("cluster id not set")
case sii.AllocID == "":
case sir.AllocID == "":
return errors.New("alloc id not set")
case sii.TaskName == "":
case sir.TaskName == "":
return errors.New("task name not set")
case sir.TaskKind == "":
return errors.New("task kind not set")
default:
return nil
}
}

func (sii ServiceIdentityIndex) Description() string {
return fmt.Sprintf(siTokenDescriptionFmt, sii.ClusterID, sii.AllocID, sii.TaskName)
func (sir ServiceIdentityRequest) Description() string {
return fmt.Sprintf(siTokenDescriptionFmt, sir.ClusterID, sir.AllocID, sir.TaskName)
}

// ConsulACLsAPI is an abstraction over the consul/api.ACL API used by Nomad
Expand All @@ -87,7 +90,7 @@ type ConsulACLsAPI interface {
CheckSIPolicy(ctx context.Context, task, secretID string) error

// Create instructs Consul to create a Service Identity token.
CreateToken(context.Context, ServiceIdentityIndex) (*structs.SIToken, error)
CreateToken(context.Context, ServiceIdentityRequest) (*structs.SIToken, error)

// RevokeTokens instructs Consul to revoke the given token accessors.
RevokeTokens(context.Context, []*structs.SITokenAccessor, bool) bool
Expand Down Expand Up @@ -194,7 +197,7 @@ func (c *consulACLsAPI) CheckSIPolicy(ctx context.Context, task, secretID string
return nil
}

func (c *consulACLsAPI) CreateToken(ctx context.Context, sii ServiceIdentityIndex) (*structs.SIToken, error) {
func (c *consulACLsAPI) CreateToken(ctx context.Context, sir ServiceIdentityRequest) (*structs.SIToken, error) {
defer metrics.MeasureSince([]string{"nomad", "consul", "create_token"}, time.Now())

// make sure the background token revocations have not been stopped
Expand All @@ -207,16 +210,16 @@ func (c *consulACLsAPI) CreateToken(ctx context.Context, sii ServiceIdentityInde
}

// sanity check the metadata for the token we want
if err := sii.Validate(); err != nil {
if err := sir.Validate(); err != nil {
return nil, err
}

// the SI token created must be for the service, not the sidecar of the service
// https://www.consul.io/docs/acl/acl-system.html#acl-service-identities
serviceName := strings.TrimPrefix(sii.TaskName, structs.ConnectProxyPrefix+"-")
service := sir.TaskKind.Value()
partial := &api.ACLToken{
Description: sii.Description(),
ServiceIdentities: []*api.ACLServiceIdentity{{ServiceName: serviceName}},
Description: sir.Description(),
ServiceIdentities: []*api.ACLServiceIdentity{{ServiceName: service}},
}

// Ensure we are under our rate limit.
Expand All @@ -230,7 +233,7 @@ func (c *consulACLsAPI) CreateToken(ctx context.Context, sii ServiceIdentityInde
}

return &structs.SIToken{
TaskName: sii.TaskName,
TaskName: sir.TaskName,
AccessorID: token.AccessorID,
SecretID: token.SecretID,
}, nil
Expand Down
16 changes: 9 additions & 7 deletions nomad/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (m *mockConsulACLsAPI) CheckSIPolicy(_ context.Context, _, _ string) error
panic("not implemented yet")
}

func (m *mockConsulACLsAPI) CreateToken(_ context.Context, _ ServiceIdentityIndex) (*structs.SIToken, error) {
func (m *mockConsulACLsAPI) CreateToken(_ context.Context, _ ServiceIdentityRequest) (*structs.SIToken, error) {
panic("not implemented yet")
}

Expand Down Expand Up @@ -88,10 +88,11 @@ func TestConsulACLsAPI_CreateToken(t *testing.T) {
c := NewConsulACLsAPI(aclAPI, logger, nil)

ctx := context.Background()
sii := ServiceIdentityIndex{
sii := ServiceIdentityRequest{
AllocID: uuid.Generate(),
ClusterID: uuid.Generate(),
TaskName: "my-task1",
TaskName: "my-task1-sidecar-proxy",
TaskKind: structs.NewTaskKind(structs.ConnectProxyPrefix, "my-service"),
}

token, err := c.CreateToken(ctx, sii)
Expand All @@ -101,7 +102,7 @@ func TestConsulACLsAPI_CreateToken(t *testing.T) {
require.Nil(t, token)
} else {
require.NoError(t, err)
require.Equal(t, "my-task1", token.TaskName)
require.Equal(t, "my-task1-sidecar-proxy", token.TaskName)
require.True(t, helper.IsUUID(token.AccessorID))
require.True(t, helper.IsUUID(token.SecretID))
}
Expand All @@ -126,10 +127,11 @@ func TestConsulACLsAPI_RevokeTokens(t *testing.T) {
c := NewConsulACLsAPI(aclAPI, logger, nil)

ctx := context.Background()
generated, err := c.CreateToken(ctx, ServiceIdentityIndex{
generated, err := c.CreateToken(ctx, ServiceIdentityRequest{
ClusterID: uuid.Generate(),
AllocID: uuid.Generate(),
TaskName: "task1",
TaskName: "task1-sidecar-proxy",
TaskKind: structs.NewTaskKind(structs.ConnectProxyPrefix, "service1"),
})
require.NoError(t, err)

Expand Down Expand Up @@ -235,7 +237,7 @@ func TestConsulACLsAPI_Stop(t *testing.T) {

c := setup(t)
c.Stop()
_, err := c.CreateToken(context.Background(), ServiceIdentityIndex{
_, err := c.CreateToken(context.Background(), ServiceIdentityRequest{
ClusterID: "",
AllocID: "",
TaskName: "",
Expand Down
2 changes: 1 addition & 1 deletion nomad/job_endpoint_hook_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func newConnectTask(serviceName string) *structs.Task {
task := &structs.Task{
// Name is used in container name so must start with '[A-Za-z0-9]'
Name: fmt.Sprintf("%s-%s", structs.ConnectProxyPrefix, serviceName),
Kind: structs.TaskKind(fmt.Sprintf("%s:%s", structs.ConnectProxyPrefix, serviceName)),
Kind: structs.NewTaskKind(structs.ConnectProxyPrefix, serviceName),
Driver: "docker",
Config: connectDriverConfig,
ShutdownDelay: 5 * time.Second,
Expand Down
50 changes: 29 additions & 21 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1666,6 +1666,11 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest, reply *st
return nil
}

type connectTask struct {
TaskKind structs.TaskKind
TaskName string
}

func (n *Node) DeriveSIToken(args *structs.DeriveSITokenRequest, reply *structs.DeriveSITokenResponse) error {
setError := func(e error, recoverable bool) {
if e != nil {
Expand Down Expand Up @@ -1752,13 +1757,11 @@ func (n *Node) DeriveSIToken(args *structs.DeriveSITokenRequest, reply *structs.
}

// make sure each task in args.Tasks is a connect-enabled task
// note: the tasks at this point should be the "connect-sidecar-<id>" name
//
unneeded := tasksNotUsingConnect(tg, args.Tasks)
if len(unneeded) > 0 {
notConnect, tasks := connectTasks(tg, args.Tasks)
if len(notConnect) > 0 {
setError(fmt.Errorf(
"Requested Consul Service Identity tokens for tasks that are not Connect enabled: %v",
strings.Join(unneeded, ", "),
strings.Join(notConnect, ", "),
), false)
}

Expand All @@ -1780,8 +1783,8 @@ func (n *Node) DeriveSIToken(args *structs.DeriveSITokenRequest, reply *structs.

// would like to pull some of this out...

// Create the SI tokens
input := make(chan string, numWorkers)
// Create the SI tokens from a slice of task name + connect service
input := make(chan connectTask, numWorkers)
results := make(map[string]*structs.SIToken, numWorkers)
for i := 0; i < numWorkers; i++ {
g.Go(func() error {
Expand All @@ -1791,17 +1794,16 @@ func (n *Node) DeriveSIToken(args *structs.DeriveSITokenRequest, reply *structs.
if !ok {
return nil
}

sii := ServiceIdentityIndex{
secret, err := n.srv.consulACLs.CreateToken(ctx, ServiceIdentityRequest{
TaskKind: task.TaskKind,
TaskName: task.TaskName,
ClusterID: clusterID,
AllocID: alloc.ID,
TaskName: task,
}
secret, err := n.srv.consulACLs.CreateToken(ctx, sii)
})
if err != nil {
return err
}
results[task] = secret
results[task.TaskName] = secret
case <-ctx.Done():
return nil
}
Expand All @@ -1812,11 +1814,11 @@ func (n *Node) DeriveSIToken(args *structs.DeriveSITokenRequest, reply *structs.
// Send the input
go func() {
defer close(input)
for _, task := range args.Tasks {
for _, connectTask := range tasks {
select {
case <-ctx.Done():
return
case input <- task:
case input <- connectTask:
}
}
}()
Expand Down Expand Up @@ -1875,24 +1877,30 @@ func (n *Node) DeriveSIToken(args *structs.DeriveSITokenRequest, reply *structs.
return nil
}

func tasksNotUsingConnect(tg *structs.TaskGroup, tasks []string) []string {
var unneeded []string
func connectTasks(tg *structs.TaskGroup, tasks []string) ([]string, []connectTask) {
var notConnect []string
var usesConnect []connectTask
for _, task := range tasks {
tgTask := tg.LookupTask(task)
if !taskUsesConnect(tgTask) {
unneeded = append(unneeded, task)
notConnect = append(notConnect, task)
} else {
usesConnect = append(usesConnect, connectTask{
TaskName: task,
TaskKind: tgTask.Kind,
})
}
}
return unneeded
return notConnect, usesConnect
}

func taskUsesConnect(task *structs.Task) bool {
if task == nil {
// not even in the task group
return false
}
// todo(shoenig): TBD what Kind does a native task have?
return task.Kind.IsConnectProxy()

return task.Kind.IsConnectProxy() || task.Kind.IsConnectNative()
}

func (n *Node) EmitEvents(args *structs.EmitNodeEventsRequest, reply *structs.EmitNodeEventsResponse) error {
Expand Down
22 changes: 17 additions & 5 deletions nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3168,16 +3168,19 @@ func TestClientEndpoint_tasksNotUsingConnect(t *testing.T) {
Name: "testgroup",
Tasks: []*structs.Task{{
Name: "connect-proxy-service1",
Kind: "connect-proxy:service1",
Kind: structs.NewTaskKind(structs.ConnectProxyPrefix, "service1"),
}, {
Name: "incorrect-task3",
Kind: "incorrect:task3",
}, {
Name: "connect-proxy-service4",
Kind: "connect-proxy:service4",
Kind: structs.NewTaskKind(structs.ConnectProxyPrefix, "service4"),
}, {
Name: "incorrect-task5",
Kind: "incorrect:task5",
}, {
Name: "task6",
Kind: structs.NewTaskKind(structs.ConnectNativePrefix, "service6"),
}},
}

Expand All @@ -3187,11 +3190,20 @@ func TestClientEndpoint_tasksNotUsingConnect(t *testing.T) {
"task3", // no
"connect-proxy-service4", // yes
"task5", // no
"task6", // yes, native
}

notConnect, usingConnect := connectTasks(taskGroup, requestingTasks)

notConnectExp := []string{"task2", "task3", "task5"}
usingConnectExp := []connectTask{
{TaskName: "connect-proxy-service1", TaskKind: "connect-proxy:service1"},
{TaskName: "connect-proxy-service4", TaskKind: "connect-proxy:service4"},
{TaskName: "task6", TaskKind: "connect-native:service6"},
}

unneeded := tasksNotUsingConnect(taskGroup, requestingTasks)
exp := []string{"task2", "task3", "task5"}
require.Equal(t, exp, unneeded)
require.Equal(t, notConnectExp, notConnect)
require.Equal(t, usingConnectExp, usingConnect)
}

func mutateConnectJob(t *testing.T, job *structs.Job) {
Expand Down
2 changes: 1 addition & 1 deletion nomad/structs/service_identities.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "errors"

// An SIToken is the important bits of a Service Identity token generated by Consul.
type SIToken struct {
TaskName string
TaskName string // the nomad task backing the consul service (native or sidecar)
AccessorID string
SecretID string
}
Expand Down
20 changes: 17 additions & 3 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6313,6 +6313,10 @@ func (t *Task) Warnings() error {
// task to the service name of which it is a connect proxy for.
type TaskKind string

func NewTaskKind(name, identifier string) TaskKind {
return TaskKind(fmt.Sprintf("%s:%s", name, identifier))
}

// Name returns the kind name portion of the TaskKind
func (k TaskKind) Name() string {
return strings.Split(string(k), ":")[0]
Expand All @@ -6332,9 +6336,19 @@ func (k TaskKind) IsConnectProxy() bool {
return strings.HasPrefix(string(k), ConnectProxyPrefix+":") && len(k) > len(ConnectProxyPrefix)+1
}

// ConnectProxyPrefix is the prefix used for fields referencing a Consul Connect
// Proxy
const ConnectProxyPrefix = "connect-proxy"
func (k TaskKind) IsConnectNative() bool {
return strings.HasPrefix(string(k), ConnectNativePrefix+":") && len(k) > len(ConnectNativePrefix)+1
}

const (
// ConnectProxyPrefix is the prefix used for fields referencing a Consul Connect
// Proxy
ConnectProxyPrefix = "connect-proxy"

// ConnectNativePrefix is the prefix used for fields referencing a Connect
// Native Task
ConnectNativePrefix = "connect-native"
)

// ValidateConnectProxyService checks that the service that is being
// proxied by this task exists in the task group and contains
Expand Down

0 comments on commit 36684bd

Please sign in to comment.