Skip to content

Commit

Permalink
improve logging
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 committed Sep 20, 2023
1 parent 29a97e8 commit 7c9f7eb
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 14 deletions.
37 changes: 26 additions & 11 deletions libbeat/processors/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,26 @@ func New(cfg *conf.C) (beat.Processor, error) {
id := int(instanceID.Inc())
log := logp.NewLogger(name).With("instance_id", id)

p := cache{
p := &cache{
config: config,
store: src,
cancel: cancel,
log: log,
}
return &p, nil
p.log.Infow("initialized cache processor", "details", p)
return p, nil
}

// Store is the interface implemented by metadata providers.
type Store interface {
Put(key string, val any) error
Get(key string) (any, error)
Delete(key string) error

// The string returned from the String method should
// be the backing store ID. Either "file:<id>" or
// "memory:<id>".
fmt.Stringer
}

type CacheEntry struct {
Expand All @@ -112,13 +118,13 @@ func getStoreFor(cfg config) (Store, context.CancelFunc, error) {
defer storeMu.Unlock()
switch {
case cfg.Store.Memory != nil:
s, cancel := getMemStore(memStores, cfg.Store.Memory.ID, cfg)
s, cancel := getMemStore(memStores, cfg.Store.Memory.ID, cfg, "memory")
return s, cancel, nil

case cfg.Store.File != nil:
logp.L().Warn("using memory store when file is configured")
// TODO: Replace place-holder code with a file-store.
s, cancel := getMemStore(fileStores, cfg.Store.File.ID, cfg)
s, cancel := getMemStore(fileStores, cfg.Store.File.ID, cfg, "file")
return s, cancel, nil

default:
Expand All @@ -130,10 +136,12 @@ func getStoreFor(cfg config) (Store, context.CancelFunc, error) {
// noop is a no-op context.CancelFunc.
func noop() {}

func getMemStore(stores map[string]*memStore, id string, cfg config) (*memStore, context.CancelFunc) {
// TODO: Remove the typ parameter when a file-backed store is available
// and each type knows who they are.
func getMemStore(stores map[string]*memStore, id string, cfg config, typ string) (*memStore, context.CancelFunc) {
s, ok := stores[id]
if !ok {
s = newMemStore(cfg, id)
s = newMemStore(cfg, id, typ)
stores[s.id] = s
}

Expand All @@ -155,6 +163,7 @@ func getMemStore(stores map[string]*memStore, id string, cfg config) (*memStore,
func (p *cache) Run(event *beat.Event) (*beat.Event, error) {
switch {
case p.config.Put != nil:
p.log.Debugw("put", "backend_id", p.store, "config", p.config.Put)
err := p.putFrom(event)
if err != nil {
switch {
Expand All @@ -169,6 +178,7 @@ func (p *cache) Run(event *beat.Event) (*beat.Event, error) {
return event, nil

case p.config.Get != nil:
p.log.Debugw("get", "backend_id", p.store, "config", p.config.Get)
result, err := p.getFor(event)
if err != nil {
switch {
Expand All @@ -187,6 +197,7 @@ func (p *cache) Run(event *beat.Event) (*beat.Event, error) {
return event, ErrNoMatch

case p.config.Delete != nil:
p.log.Debugw("delete", "backend_id", p.store, "config", p.config.Delete)
err := p.deleteFor(event)
if err != nil {
return event, fmt.Errorf("error applying %s delete processor: %w", name, err)
Expand All @@ -210,10 +221,13 @@ func (p *cache) putFrom(event *beat.Event) error {
if !ok {
return fmt.Errorf("key field '%s' not a string: %T", p.config.Put.Key, k)
}
p.log.Debugw("put", "backend_id", p.store, "key", key)

val, err := event.GetValue(p.config.Put.Value)
if err != nil {
return err
}

err = p.store.Put(key, val)
if err != nil {
return fmt.Errorf("failed to put '%s' into '%s': %w", key, p.config.Put.Value, err)
Expand Down Expand Up @@ -242,6 +256,7 @@ func (p *cache) getFor(event *beat.Event) (result *beat.Event, err error) {
if !ok {
return nil, fmt.Errorf("key field '%s' not a string: %T", key, v)
}
p.log.Debugw("get", "backend_id", p.store, "key", k)

// Get metadata...
meta, err := p.store.Get(k)
Expand Down Expand Up @@ -289,13 +304,13 @@ func (p *cache) Close() error {
func (p *cache) String() string {
switch {
case p.config.Put != nil:
return fmt.Sprintf("%s=[operation=put, key_field=%s, value_field=%s, ttl=%v, ignore_missing=%t, overwrite_fields=%t]",
name, p.config.Put.Key, p.config.Put.Value, p.config.Put.TTL, p.config.IgnoreMissing, p.config.OverwriteKeys)
return fmt.Sprintf("%s=[operation=put, store_id=%s, key_field=%s, value_field=%s, ttl=%v, ignore_missing=%t, overwrite_fields=%t]",
name, p.store, p.config.Put.Key, p.config.Put.Value, p.config.Put.TTL, p.config.IgnoreMissing, p.config.OverwriteKeys)
case p.config.Get != nil:
return fmt.Sprintf("%s=[operation=get, key_field=%s, target_field=%s, ignore_missing=%t, overwrite_fields=%t]",
name, p.config.Get.Key, p.config.Get.Target, p.config.IgnoreMissing, p.config.OverwriteKeys)
return fmt.Sprintf("%s=[operation=get, store_id=%s, key_field=%s, target_field=%s, ignore_missing=%t, overwrite_fields=%t]",
name, p.store, p.config.Get.Key, p.config.Get.Target, p.config.IgnoreMissing, p.config.OverwriteKeys)
case p.config.Delete != nil:
return fmt.Sprintf("%s=[operation=delete, key_field=%s]", name, p.config.Delete.Key)
return fmt.Sprintf("%s=[operation=delete, store_id=%s, key_field=%s]", name, p.store, p.config.Delete.Key)
default:
return fmt.Sprintf("%s=[operation=invalid]", name)
}
Expand Down
9 changes: 8 additions & 1 deletion libbeat/processors/cache/mem_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ type memStore struct {

// id is the index into global cache store for the cache.
id string
// typ is a temporary tag to differentiate memory stores
// from the mocked file store.
// TODO: Remove when a file-backed store exists.
typ string

// cap is the maximum number of elements the cache
// will hold. If not positive, no limit.
Expand All @@ -45,9 +49,10 @@ type memStore struct {
// newMemStore returns a new memStore configured to apply the give TTL duration.
// The memStore is guaranteed not to grow larger than cap elements. id is the
// look-up into the global cache store the memStore is held in.
func newMemStore(cfg config, id string) *memStore {
func newMemStore(cfg config, id, typ string) *memStore {
return &memStore{
id: id,
typ: typ,
cache: make(map[string]*CacheEntry),

// Mark the ttl as invalid until we have had a put operation
Expand All @@ -58,6 +63,8 @@ func newMemStore(cfg config, id string) *memStore {
}
}

func (c *memStore) String() string { return c.typ + ":" + c.id }

// setPutOptions allows concurrency-safe updating of the put options. While the shared
// backing data store is incomplete, and has no put operation defined, the TTL
// will be invalid, but will never be accessed since all time operations outside
Expand Down
4 changes: 2 additions & 2 deletions libbeat/processors/cache/mem_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,14 +335,14 @@ var memStoreTests = []struct {

func TestMemStore(t *testing.T) {
allow := cmp.AllowUnexported(memStore{}, CacheEntry{})
ignoreInMemStore := cmpopts.IgnoreFields(memStore{}, "mu")
ignoreInMemStore := cmpopts.IgnoreFields(memStore{}, "mu", "typ")
ignoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "expires")

for _, test := range memStoreTests {
t.Run(test.name, func(t *testing.T) {
// Construct the store and put in into the stores map as
// we would if we were calling Run.
store := newMemStore(test.cfg, test.cfg.Store.Memory.ID)
store := newMemStore(test.cfg, test.cfg.Store.Memory.ID, "memory")
store.setPutOptions(test.cfg)
storeMu.Lock()
memStores[store.id] = store
Expand Down

0 comments on commit 7c9f7eb

Please sign in to comment.