Skip to content

Commit

Permalink
[v16] Workload ID: Add WorkloadIdentity local service and cache config (
Browse files Browse the repository at this point in the history
#49942) (#49990)

* Workload ID: Add WorkloadIdentity local service and cache config (#49942)

* Add WorkloadIdentity store and cache

* Update lib/services/local/workload_identity.go

Co-authored-by: Edward Dowling <[email protected]>

* Update lib/services/local/workload_identity.go

Co-authored-by: Edward Dowling <[email protected]>

* Update lib/cache/resource_workload_identity.go

Co-authored-by: Edoardo Spadolini <[email protected]>

---------

Co-authored-by: Edward Dowling <[email protected]>
Co-authored-by: Edoardo Spadolini <[email protected]>

* Fix for v16

* Fix dodgy backport with incorrect Kind

---------

Co-authored-by: Edward Dowling <[email protected]>
Co-authored-by: Edoardo Spadolini <[email protected]>
  • Loading branch information
3 people authored Dec 10, 2024
1 parent 90ab485 commit 6451acb
Show file tree
Hide file tree
Showing 16 changed files with 1,109 additions and 0 deletions.
2 changes: 2 additions & 0 deletions lib/auth/accesspoint/accesspoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type Config struct {
Users services.UsersService
WebSession types.WebSessionInterface
WebToken types.WebTokenInterface
WorkloadIdentity cache.WorkloadIdentityReader
WindowsDesktops services.WindowsDesktops
AutoUpdateService services.AutoUpdateServiceGetter
}
Expand Down Expand Up @@ -198,6 +199,7 @@ func NewCache(cfg Config) (*cache.Cache, error) {
Users: cfg.Users,
WebSession: cfg.WebSession,
WebToken: cfg.WebToken,
WorkloadIdentity: cfg.WorkloadIdentity,
WindowsDesktops: cfg.WindowsDesktops,
}

Expand Down
9 changes: 9 additions & 0 deletions lib/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,13 @@ func NewServer(cfg *InitConfig, opts ...ServerOption) (*Server, error) {
return nil, trace.Wrap(err, "creating SPIFFEFederation service")
}
}
if cfg.WorkloadIdentity == nil {
workloadIdentity, err := local.NewWorkloadIdentityService(cfg.Backend)
if err != nil {
return nil, trace.Wrap(err, "creating WorkloadIdentity service")
}
cfg.WorkloadIdentity = workloadIdentity
}

limiter, err := limiter.NewConnectionsLimiter(limiter.Config{
MaxConnections: defaults.LimiterMaxConcurrentSignatures,
Expand Down Expand Up @@ -455,6 +462,7 @@ func NewServer(cfg *InitConfig, opts ...ServerOption) (*Server, error) {
BotInstance: cfg.BotInstance,
SPIFFEFederations: cfg.SPIFFEFederations,
StaticHostUser: cfg.StaticHostUsers,
WorkloadIdentities: cfg.WorkloadIdentity,
}

as := Server{
Expand Down Expand Up @@ -668,6 +676,7 @@ type Services struct {
services.BotInstance
services.StaticHostUser
services.AutoUpdateService
services.WorkloadIdentities
}

// GetWebSession returns existing web session described by req.
Expand Down
7 changes: 7 additions & 0 deletions lib/auth/authclient/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
userprovisioningpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/userprovisioning/v2"
userspb "github.com/gravitational/teleport/api/gen/proto/go/teleport/users/v1"
usertasksv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/usertasks/v1"
workloadidentityv1pb "github.com/gravitational/teleport/api/gen/proto/go/teleport/workloadidentity/v1"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/accesslist"
"github.com/gravitational/teleport/api/types/discoveryconfig"
Expand Down Expand Up @@ -1211,6 +1212,12 @@ type Cache interface {
// GetAccessGraphSettings returns the access graph settings.
GetAccessGraphSettings(context.Context) (*clusterconfigpb.AccessGraphSettings, error)

// GetWorkloadIdentity gets a WorkloadIdentity by name.
GetWorkloadIdentity(ctx context.Context, name string) (*workloadidentityv1pb.WorkloadIdentity, error)
// ListWorkloadIdentities lists all SPIFFE Federations using Google style
// pagination.
ListWorkloadIdentities(ctx context.Context, pageSize int, lastToken string) ([]*workloadidentityv1pb.WorkloadIdentity, string, error)

// ListStaticHostUsers lists static host users.
ListStaticHostUsers(ctx context.Context, pageSize int, startKey string) ([]*userprovisioningpb.StaticHostUser, string, error)
// GetStaticHostUser returns a static host user by name.
Expand Down
1 change: 1 addition & 0 deletions lib/auth/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ func NewTestAuthServer(cfg TestAuthServerConfig) (*TestAuthServer, error) {
SecReports: svces.SecReports,
SnowflakeSession: svces.Identity,
SPIFFEFederations: svces.SPIFFEFederations,
WorkloadIdentity: svces.WorkloadIdentities,
StaticHostUsers: svces.StaticHostUser,
Trust: svces.TrustInternal,
UserGroups: svces.UserGroups,
Expand Down
4 changes: 4 additions & 0 deletions lib/auth/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,10 @@ type InitConfig struct {
// SPIFFEFederations is a service that manages storing SPIFFE federations.
SPIFFEFederations services.SPIFFEFederations

// WorkloadIdentity is the service for storing and retrieving
// WorkloadIdentity resources.
WorkloadIdentity services.WorkloadIdentities

// StaticHostUsers is a service that manages host users that should be
// created on SSH nodes.
StaticHostUsers services.StaticHostUser
Expand Down
12 changes: 12 additions & 0 deletions lib/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ func ForAuth(cfg Config) Config {
{Kind: types.KindUserTask},
{Kind: types.KindAutoUpdateVersion},
{Kind: types.KindAutoUpdateConfig},
{Kind: types.KindWorkloadIdentity},
}
cfg.QueueSize = defaults.AuthQueueSize
// We don't want to enable partial health for auth cache because auth uses an event stream
Expand Down Expand Up @@ -536,6 +537,7 @@ type Cache struct {
accessMontoringRuleCache services.AccessMonitoringRules
spiffeFederationCache spiffeFederationCacher
staticHostUsersCache *local.StaticHostUserService
workloadIdentityCache workloadIdentityCacher

// closed indicates that the cache has been closed
closed atomic.Bool
Expand Down Expand Up @@ -716,6 +718,9 @@ type Config struct {
SPIFFEFederations SPIFFEFederationReader
// StaticHostUsers is the static host user service.
StaticHostUsers services.StaticHostUser
// WorkloadIdentity is the upstream Workload Identities service that we're
// caching
WorkloadIdentity WorkloadIdentityReader
// Backend is a backend for local cache
Backend backend.Backend
// MaxRetryPeriod is the maximum period between cache retries on failures
Expand Down Expand Up @@ -969,6 +974,12 @@ func New(config Config) (*Cache, error) {
return nil, trace.Wrap(err)
}

workloadIdentityCache, err := local.NewWorkloadIdentityService(config.Backend)
if err != nil {
cancel()
return nil, trace.Wrap(err)
}

staticHostUserCache, err := local.NewStaticHostUserService(config.Backend)
if err != nil {
cancel()
Expand Down Expand Up @@ -1019,6 +1030,7 @@ func New(config Config) (*Cache, error) {
kubeWaitingContsCache: kubeWaitingContsCache,
spiffeFederationCache: spiffeFederationCache,
staticHostUsersCache: staticHostUserCache,
workloadIdentityCache: workloadIdentityCache,
Logger: log.WithFields(log.Fields{
teleport.ComponentKey: config.Component,
}),
Expand Down
13 changes: 13 additions & 0 deletions lib/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ type testPack struct {
spiffeFederations *local.SPIFFEFederationService
staticHostUsers services.StaticHostUser
autoUpdateService services.AutoUpdateService
workloadIdentity *local.WorkloadIdentityService
}

// testFuncs are functions to support testing an object in a cache.
Expand Down Expand Up @@ -350,6 +351,12 @@ func newPackWithoutCache(dir string, opts ...packOption) (*testPack, error) {
}
p.spiffeFederations = spiffeFederationsSvc

workloadIdentitySvc, err := local.NewWorkloadIdentityService(p.backend)
if err != nil {
return nil, trace.Wrap(err)
}
p.workloadIdentity = workloadIdentitySvc

databaseObjectsSvc, err := local.NewDatabaseObjectService(p.backend)
if err != nil {
return nil, trace.Wrap(err)
Expand Down Expand Up @@ -428,6 +435,7 @@ func newPack(dir string, setupConfig func(c Config) Config, opts ...packOption)
DatabaseObjects: p.databaseObjects,
StaticHostUsers: p.staticHostUsers,
AutoUpdateService: p.autoUpdateService,
WorkloadIdentity: p.workloadIdentity,
MaxRetryPeriod: 200 * time.Millisecond,
EventsC: p.eventsC,
}))
Expand Down Expand Up @@ -836,6 +844,7 @@ func TestCompletenessInit(t *testing.T) {
SPIFFEFederations: p.spiffeFederations,
StaticHostUsers: p.staticHostUsers,
AutoUpdateService: p.autoUpdateService,
WorkloadIdentity: p.workloadIdentity,
MaxRetryPeriod: 200 * time.Millisecond,
EventsC: p.eventsC,
}))
Expand Down Expand Up @@ -917,6 +926,7 @@ func TestCompletenessReset(t *testing.T) {
SPIFFEFederations: p.spiffeFederations,
StaticHostUsers: p.staticHostUsers,
AutoUpdateService: p.autoUpdateService,
WorkloadIdentity: p.workloadIdentity,
MaxRetryPeriod: 200 * time.Millisecond,
EventsC: p.eventsC,
}))
Expand Down Expand Up @@ -1124,6 +1134,7 @@ func TestListResources_NodesTTLVariant(t *testing.T) {
SPIFFEFederations: p.spiffeFederations,
StaticHostUsers: p.staticHostUsers,
AutoUpdateService: p.autoUpdateService,
WorkloadIdentity: p.workloadIdentity,
MaxRetryPeriod: 200 * time.Millisecond,
EventsC: p.eventsC,
neverOK: true, // ensure reads are never healthy
Expand Down Expand Up @@ -1216,6 +1227,7 @@ func initStrategy(t *testing.T) {
SPIFFEFederations: p.spiffeFederations,
StaticHostUsers: p.staticHostUsers,
AutoUpdateService: p.autoUpdateService,
WorkloadIdentity: p.workloadIdentity,
MaxRetryPeriod: 200 * time.Millisecond,
EventsC: p.eventsC,
}))
Expand Down Expand Up @@ -3454,6 +3466,7 @@ func TestCacheWatchKindExistsInEvents(t *testing.T) {
types.KindUserTask: types.Resource153ToLegacy(newUserTasks(t)),
types.KindAutoUpdateConfig: types.Resource153ToLegacy(newAutoUpdateConfig(t)),
types.KindAutoUpdateVersion: types.Resource153ToLegacy(newAutoUpdateVersion(t)),
types.KindWorkloadIdentity: types.Resource153ToLegacy(newWorkloadIdentity("some_identifier")),
}

for name, cfg := range cases {
Expand Down
11 changes: 11 additions & 0 deletions lib/cache/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
userprovisioningpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/userprovisioning/v2"
userspb "github.com/gravitational/teleport/api/gen/proto/go/teleport/users/v1"
usertasksv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/usertasks/v1"
workloadidentityv1pb "github.com/gravitational/teleport/api/gen/proto/go/teleport/workloadidentity/v1"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/accesslist"
"github.com/gravitational/teleport/api/types/discoveryconfig"
Expand Down Expand Up @@ -265,6 +266,7 @@ type cacheCollections struct {
spiffeFederations collectionReader[SPIFFEFederationReader]
autoUpdateConfigs collectionReader[autoUpdateConfigGetter]
autoUpdateVersions collectionReader[autoUpdateVersionGetter]
workloadIdentity collectionReader[WorkloadIdentityReader]
}

// setupCollections returns a registry of collections.
Expand Down Expand Up @@ -784,6 +786,15 @@ func setupCollections(c *Cache, watches []types.WatchKind) (*cacheCollections, e
watch: watch,
}
collections.byKind[resourceKind] = collections.accessGraphSettings
case types.KindWorkloadIdentity:
if c.Config.WorkloadIdentity == nil {
return nil, trace.BadParameter("missing parameter WorkloadIdentity")
}
collections.workloadIdentity = &genericCollection[*workloadidentityv1pb.WorkloadIdentity, WorkloadIdentityReader, workloadIdentityExecutor]{
cache: c,
watch: watch,
}
collections.byKind[resourceKind] = collections.workloadIdentity
case types.KindAutoUpdateConfig:
if c.AutoUpdateService == nil {
return nil, trace.BadParameter("missing parameter AutoUpdateService")
Expand Down
119 changes: 119 additions & 0 deletions lib/cache/resource_workload_identity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Teleport
// Copyright (C) 2024 Gravitational, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

//nolint:unused // Because the executors generate a large amount of false positives.
package cache

import (
"context"

"github.com/gravitational/trace"

workloadidentityv1pb "github.com/gravitational/teleport/api/gen/proto/go/teleport/workloadidentity/v1"
"github.com/gravitational/teleport/api/types"
)

// WorkloadIdentityReader is an interface that defines the methods for getting
// WorkloadIdentity. This is returned as the reader for the WorkloadIdentity
// collection but is also used by the executor to read the full list of
// WorkloadIdentity on initialization.
type WorkloadIdentityReader interface {
ListWorkloadIdentities(ctx context.Context, pageSize int, nextToken string) ([]*workloadidentityv1pb.WorkloadIdentity, string, error)
GetWorkloadIdentity(ctx context.Context, name string) (*workloadidentityv1pb.WorkloadIdentity, error)
}

// workloadIdentityCacher is used for storing and retrieving WorkloadIdentity
// from the cache's local backend.
type workloadIdentityCacher interface {
WorkloadIdentityReader
UpsertWorkloadIdentity(ctx context.Context, resource *workloadidentityv1pb.WorkloadIdentity) (*workloadidentityv1pb.WorkloadIdentity, error)
DeleteWorkloadIdentity(ctx context.Context, name string) error
DeleteAllWorkloadIdentities(ctx context.Context) error
}

type workloadIdentityExecutor struct{}

var _ executor[*workloadidentityv1pb.WorkloadIdentity, WorkloadIdentityReader] = workloadIdentityExecutor{}

func (workloadIdentityExecutor) getAll(ctx context.Context, cache *Cache, loadSecrets bool) ([]*workloadidentityv1pb.WorkloadIdentity, error) {
var out []*workloadidentityv1pb.WorkloadIdentity
var nextToken string
for {
var page []*workloadidentityv1pb.WorkloadIdentity
var err error

const defaultPageSize = 0
page, nextToken, err = cache.Config.WorkloadIdentity.ListWorkloadIdentities(ctx, defaultPageSize, nextToken)
if err != nil {
return nil, trace.Wrap(err)
}
out = append(out, page...)
if nextToken == "" {
break
}
}
return out, nil
}

func (workloadIdentityExecutor) upsert(ctx context.Context, cache *Cache, resource *workloadidentityv1pb.WorkloadIdentity) error {
_, err := cache.workloadIdentityCache.UpsertWorkloadIdentity(ctx, resource)
return trace.Wrap(err)
}

func (workloadIdentityExecutor) deleteAll(ctx context.Context, cache *Cache) error {
return trace.Wrap(cache.workloadIdentityCache.DeleteAllWorkloadIdentities(ctx))
}

func (workloadIdentityExecutor) delete(ctx context.Context, cache *Cache, resource types.Resource) error {
return trace.Wrap(cache.workloadIdentityCache.DeleteWorkloadIdentity(ctx, resource.GetName()))
}

func (workloadIdentityExecutor) isSingleton() bool { return false }

func (workloadIdentityExecutor) getReader(cache *Cache, cacheOK bool) WorkloadIdentityReader {
if cacheOK {
return cache.workloadIdentityCache
}
return cache.Config.WorkloadIdentity
}

// ListWorkloadIdentities returns a paginated list of WorkloadIdentity resources.
func (c *Cache) ListWorkloadIdentities(ctx context.Context, pageSize int, nextToken string) ([]*workloadidentityv1pb.WorkloadIdentity, string, error) {
ctx, span := c.Tracer.Start(ctx, "cache/ListWorkloadIdentities")
defer span.End()

rg, err := readCollectionCache(c, c.collections.workloadIdentity)
if err != nil {
return nil, "", trace.Wrap(err)
}
defer rg.Release()
out, nextKey, err := rg.reader.ListWorkloadIdentities(ctx, pageSize, nextToken)
return out, nextKey, trace.Wrap(err)
}

// GetWorkloadIdentity returns a single WorkloadIdentity by name
func (c *Cache) GetWorkloadIdentity(ctx context.Context, name string) (*workloadidentityv1pb.WorkloadIdentity, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetWorkloadIdentity")
defer span.End()

rg, err := readCollectionCache(c, c.collections.workloadIdentity)
if err != nil {
return nil, trace.Wrap(err)
}
defer rg.Release()
out, err := rg.reader.GetWorkloadIdentity(ctx, name)
return out, trace.Wrap(err)
}
Loading

0 comments on commit 6451acb

Please sign in to comment.