Skip to content

Commit

Permalink
as: Add attributes registry config and initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
vlasebian committed Jan 7, 2025
1 parent dc0b1cf commit 6890128
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 10 deletions.
9 changes: 9 additions & 0 deletions cmd/internal/shared/applicationserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ var DefaultApplicationServerConfig = applicationserver.Config{
TTL: 14 * 24 * time.Hour,
},
},
Attributes: applicationserver.EndDeviceAttributesStorageConfig{
Timeout: 5 * time.Second,
Cache: applicationserver.EndDeviceAttributesStorageCacheConfig{
Enable: true,
MinRefreshInterval: 15 * time.Minute,
MaxRefreshInterval: 4 * time.Hour,
TTL: 14 * 24 * time.Hour,
},
},
},
Distribution: applicationserver.DistributionConfig{
Timeout: time.Minute,
Expand Down
15 changes: 15 additions & 0 deletions cmd/ttn-lw-stack/commands/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,22 @@ var startCommand = &cobra.Command{
if err != nil {
return shared.ErrInitializeApplicationServer.WithCause(err)
}
if cache := &config.AS.EndDeviceMetadataStorage.Attributes.Cache; cache.Enable {
switch config.Cache.Service {
case "redis":
cache.Cache = &asmetaredis.EndDeviceAttributesCache{
Redis: redis.New(config.Cache.Redis.WithNamespace("as", "metadata", "attributes")),
}
default:
cache.Enable = false
}
}
attributesRegistry, err := config.AS.EndDeviceMetadataStorage.Attributes.NewRegistry(ctx, c)
if err != nil {
return shared.ErrInitializeApplicationServer.WithCause(err)
}
config.AS.EndDeviceMetadataStorage.Location.Registry = locationRegistry
config.AS.EndDeviceMetadataStorage.Attributes.Registry = attributesRegistry
as, err := applicationserver.New(c, &config.AS)
if err != nil {
return shared.ErrInitializeApplicationServer.WithCause(err)
Expand Down
18 changes: 10 additions & 8 deletions pkg/applicationserver/applicationserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type ApplicationServer struct {
linkRegistry LinkRegistry
deviceRegistry DeviceRegistry
locationRegistry metadata.EndDeviceLocationRegistry
attributesRegistry metadata.EndDeviceAttributesRegistry
formatters messageprocessors.MapPayloadProcessor
webhooks ioweb.Webhooks
webhookTemplates ioweb.TemplateStore
Expand Down Expand Up @@ -156,14 +157,15 @@ func New(c *component.Component, conf *Config) (as *ApplicationServer, err error
}

as = &ApplicationServer{
Component: c,
ctx: ctx,
config: conf,
linkRegistry: conf.Links,
deviceRegistry: wrapEndDeviceRegistryWithReplacedFields(conf.Devices, replacedEndDeviceFields...),
appPkgRegistry: conf.Packages.Registry,
locationRegistry: conf.EndDeviceMetadataStorage.Location.Registry,
formatters: make(messageprocessors.MapPayloadProcessor),
Component: c,
ctx: ctx,
config: conf,
linkRegistry: conf.Links,
deviceRegistry: wrapEndDeviceRegistryWithReplacedFields(conf.Devices, replacedEndDeviceFields...),
appPkgRegistry: conf.Packages.Registry,
locationRegistry: conf.EndDeviceMetadataStorage.Location.Registry,
attributesRegistry: conf.EndDeviceMetadataStorage.Attributes.Registry,
formatters: make(messageprocessors.MapPayloadProcessor),
clusterDistributor: distribution.NewPubSubDistributor(
ctx,
c,
Expand Down
40 changes: 38 additions & 2 deletions pkg/applicationserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,14 @@ type EndDeviceFetcherCircuitBreakerConfig struct {

// EndDeviceMetadataStorageConfig represents the configuration of end device metadata operations.
type EndDeviceMetadataStorageConfig struct {
Location EndDeviceLocationStorageConfig `name:"location"`
Location EndDeviceLocationStorageConfig `name:"location"`
Attributes EndDeviceAttributesStorageConfig `name:"attributes"`
}

// EndDeviceLocationStorageConfig represents the configuration of end device locations storage.
type EndDeviceLocationStorageConfig struct {
Registry metadata.EndDeviceLocationRegistry `name:"-"`
Timeout time.Duration `name:"timeout" description:"Timeout of the end device retrival operation"`
Timeout time.Duration `name:"timeout" description:"Timeout of the end device retrieval operation"`
Cache EndDeviceLocationStorageCacheConfig `name:"cache"`
}

Expand All @@ -89,6 +90,22 @@ type EndDeviceLocationStorageCacheConfig struct {
TTL time.Duration `name:"eviction-ttl" description:"Time to live of cached locations"`
}

// EndDeviceAttributesStorageConfig represents the configuration of end device attributes storage.
type EndDeviceAttributesStorageConfig struct {
Registry metadata.EndDeviceAttributesRegistry `name:"-"`
Timeout time.Duration `name:"timeout" description:"Timeout of the end device retrieval operation"`
Cache EndDeviceAttributesStorageCacheConfig `name:"cache"`
}

// EndDeviceAttributesStorageCacheConfig represents the configuration of end device attributes registry caching.
type EndDeviceAttributesStorageCacheConfig struct {
Cache metadata.EndDeviceAttributesCache `name:"-"`
Enable bool `name:"enable" description:"Enable caching of end device attributes"`
MinRefreshInterval time.Duration `name:"min-refresh-interval" description:"Minimum time interval between two asynchronous refreshes"`
MaxRefreshInterval time.Duration `name:"max-refresh-interval" description:"Maximum time interval between two asynchronous refreshes"`
TTL time.Duration `name:"eviction-ttl" description:"Time to live of cached attributes"`
}

// FormattersConfig represents the configuration for payload formatters.
type FormattersConfig struct {
MaxParameterLength int `name:"max-parameter-length" description:"Maximum allowed size for length of formatter parameters (payload formatter scripts)"`
Expand Down Expand Up @@ -334,6 +351,25 @@ func (c EndDeviceLocationStorageConfig) NewRegistry(ctx context.Context, comp *c
return registry, nil
}

// NewRegistry returns a new end device attributes registry based on the configuration.
func (c EndDeviceAttributesStorageConfig) NewRegistry(ctx context.Context, comp *component.Component) (metadata.EndDeviceAttributesRegistry, error) {
if c.Timeout <= 0 {
return nil, errInvalidTimeout.WithAttributes("timeout", c.Timeout)
}
registry := metadata.NewClusterEndDeviceAttributesRegistry(comp, c.Timeout)
registry = metadata.NewMetricsEndDeviceAttributesRegistry(registry)
if c.Cache.Enable {
for _, ttl := range []time.Duration{c.Cache.MinRefreshInterval, c.Cache.MaxRefreshInterval, c.Cache.TTL} {
if ttl <= 0 {
return nil, errInvalidTTL.WithAttributes("ttl", ttl)
}
}
cache := metadata.NewMetricsEndDeviceAttributesCache(c.Cache.Cache)
registry = metadata.NewCachedEndDeviceAttributesRegistry(ctx, comp, registry, cache, c.Cache.MinRefreshInterval, c.Cache.MaxRefreshInterval, c.Cache.TTL)
}
return registry, nil
}

// LastSeenConfig defines configuration for the device last seen map which stores timestamps for batch updates.
type LastSeenConfig struct {
BatchSize int `name:"batch-size" description:"Maximum number of end device last seen timestamps to store for batch update"`
Expand Down

0 comments on commit 6890128

Please sign in to comment.