Skip to content

Commit

Permalink
address pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 committed Sep 20, 2023
1 parent 7c9f7eb commit 73bfeda
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 77 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ CHANGELOG*
/libbeat/ @elastic/elastic-agent-data-plane
/libbeat/docs/processors-list.asciidoc @elastic/ingest-docs
/libbeat/management @elastic/elastic-agent-control-plane
/libbeat/processors/cache/ @elastic/security-external-integrations
/libbeat/processors/community_id/ @elastic/security-external-integrations
/libbeat/processors/decode_xml/ @elastic/security-external-integrations
/libbeat/processors/decode_xml_wineventlog/ @elastic/security-external-integrations
Expand Down
73 changes: 23 additions & 50 deletions libbeat/processors/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -84,47 +83,20 @@ func New(cfg *conf.C) (beat.Processor, error) {
return p, nil
}

// Store is the interface implemented by metadata providers.
type Store interface {
Put(key string, val any) error
Get(key string) (any, error)
Delete(key string) error

// The string returned from the String method should
// be the backing store ID. Either "file:<id>" or
// "memory:<id>".
fmt.Stringer
}

type CacheEntry struct {
key string
value any
expires time.Time
index int
}

var (
storeMu sync.Mutex
memStores = map[string]*memStore{}
fileStores = map[string]*memStore{}
)

// getStoreFor returns a backing store for the provided configuration,
// and a context cancellation that releases the cache resource when it
// is no longer required. The cancellation should be called when the
// processor is closed.
func getStoreFor(cfg config) (Store, context.CancelFunc, error) {
storeMu.Lock()
defer storeMu.Unlock()
switch {
case cfg.Store.Memory != nil:
s, cancel := getMemStore(memStores, cfg.Store.Memory.ID, cfg, "memory")
s, cancel := memStores.get(cfg.Store.Memory.ID, cfg)
return s, cancel, nil

case cfg.Store.File != nil:
logp.L().Warn("using memory store when file is configured")
// TODO: Replace place-holder code with a file-store.
s, cancel := getMemStore(fileStores, cfg.Store.File.ID, cfg, "file")
s, cancel := fileStores.get(cfg.Store.File.ID, cfg)
return s, cancel, nil

default:
Expand All @@ -133,30 +105,31 @@ func getStoreFor(cfg config) (Store, context.CancelFunc, error) {
}
}

var (
memStores = memStoreSet{stores: map[string]*memStore{}, typ: "memory"}
fileStores = memStoreSet{stores: map[string]*memStore{}, typ: "file"} // This is a temporary mock.
)

// noop is a no-op context.CancelFunc.
func noop() {}

// TODO: Remove the typ parameter when a file-backed store is available
// and each type knows who they are.
func getMemStore(stores map[string]*memStore, id string, cfg config, typ string) (*memStore, context.CancelFunc) {
s, ok := stores[id]
if !ok {
s = newMemStore(cfg, id, typ)
stores[s.id] = s
}
// Store is the interface implemented by metadata providers.
type Store interface {
Put(key string, val any) error
Get(key string) (any, error)
Delete(key string) error

// We may have already constructed the store with
// a get or a delete config, so set the TTL, cap
// and effort if we have a put config. If another
// put config has already been included, we ignore
// the put options now.
s.setPutOptions(cfg)

return s, func() {
storeMu.Lock()
s.close(stores)
storeMu.Unlock()
}
// The string returned from the String method should
// be the backing store ID. Either "file:<id>" or
// "memory:<id>".
fmt.Stringer
}

type CacheEntry struct {
key string
value any
expires time.Time
index int
}

// Run enriches the given event with the host metadata.
Expand Down
74 changes: 61 additions & 13 deletions libbeat/processors/cache/mem_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,46 @@ package cache

import (
"container/heap"
"context"
"sync"
"time"
)

// memStoreSet is a collection of shared memStore caches.
type memStoreSet struct {
mu sync.Mutex
stores map[string]*memStore
typ string // TODO: Remove when a file-backed store exists.
}

// get returns a memStore cache with the provided ID based on the config.
// If a memStore with the ID already exist, its configuration is adjusted
// and its reference count is increased. The returned context.CancelFunc
// reduces the reference count and deletes the memStore from set if the
// count reaches zero.
func (s *memStoreSet) get(id string, cfg config) (*memStore, context.CancelFunc) {
s.mu.Lock()
defer s.mu.Unlock()
store, ok := s.stores[id]
if !ok {
store = newMemStore(cfg, id, s.typ)
s.stores[store.id] = store
}
store.add(cfg)

return store, func() {
store.dropFrom(s)
}
}

// free removes the memStore with the given ID from the set. free is safe
// for concurrent use.
func (s *memStoreSet) free(id string) {
s.mu.Lock()
delete(s.stores, id)
s.mu.Unlock()
}

// memStore is a memory-backed cache store.
type memStore struct {
mu sync.Mutex
Expand Down Expand Up @@ -55,8 +91,12 @@ func newMemStore(cfg config, id, typ string) *memStore {
typ: typ,
cache: make(map[string]*CacheEntry),

// Mark the ttl as invalid until we have had a put operation
// configured.
// Mark the ttl as invalid until we have had a put
// operation configured. While the shared backing
// data store is incomplete, and has no put operation
// defined, the TTL will be invalid, but will never
// be accessed since all time operations outside put
// refer to absolute times, held by the CacheEntry.
ttl: -1,
cap: -1,
effort: -1,
Expand All @@ -65,15 +105,20 @@ func newMemStore(cfg config, id, typ string) *memStore {

func (c *memStore) String() string { return c.typ + ":" + c.id }

// setPutOptions allows concurrency-safe updating of the put options. While the shared
// backing data store is incomplete, and has no put operation defined, the TTL
// will be invalid, but will never be accessed since all time operations outside
// put refer to absolute times. setPutOptions also increases the reference count
// for the memStore for all operation types.
func (c *memStore) setPutOptions(cfg config) {
// add updates a the receiver for a new operation. It increases the reference
// count for the receiver, and if the config is a put operation and has no
// previous put operation defined, the TTL, cap and effort will be set from
// cfg. add is safe for concurrent use.
func (c *memStore) add(cfg config) {
c.mu.Lock()
defer c.mu.Unlock()
c.refs++

// We may have already constructed the store with
// a get or a delete config, so set the TTL, cap
// and effort if we have a put config. If another
// put config has already been included, we ignore
// the put options now.
if cfg.Put == nil {
return
}
Expand All @@ -86,16 +131,16 @@ func (c *memStore) setPutOptions(cfg config) {
}
}

// close decreases the reference count for the memStore and removes it from the
// stores map if the count is zero.
func (c *memStore) close(stores map[string]*memStore) {
// dropFrom decreases the reference count for the memStore and removes it from
// the stores map if the count is zero. dropFrom is safe for concurrent use.
func (c *memStore) dropFrom(stores *memStoreSet) {
c.mu.Lock()
c.refs--
if c.refs < 0 {
panic("invalid reference count")
}
if c.refs == 0 {
delete(stores, c.id)
stores.free(c.id)
// GC assists.
c.cache = nil
c.expiries = nil
Expand All @@ -104,7 +149,8 @@ func (c *memStore) close(stores map[string]*memStore) {
}

// Get return the cached value associated with the provided key. If there is
// no value for the key, or the value has expired Get returns ErrNoData.
// no value for the key, or the value has expired Get returns ErrNoData. Get
// is safe for concurrent use.
func (c *memStore) Get(key string) (any, error) {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -121,6 +167,7 @@ func (c *memStore) Get(key string) (any, error) {

// Put stores the provided value in the cache associated with the given key.
// The value is given an expiry time based on the configured TTL of the cache.
// Put is safe for concurrent use.
func (c *memStore) Put(key string, val any) error {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down Expand Up @@ -159,6 +206,7 @@ func (c *memStore) evictExpired(now time.Time) {
}

// Delete removes the value associated with the provided key from the cache.
// Delete is safe for concurrent use.
func (c *memStore) Delete(key string) error {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down
37 changes: 23 additions & 14 deletions libbeat/processors/cache/mem_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ var memStoreTests = []struct {
TTL: ptrTo(time.Second),
},
}
s.setPutOptions(putCfg)
s.add(putCfg)
return nil
},
want: &memStore{
Expand Down Expand Up @@ -176,7 +176,7 @@ var memStoreTests = []struct {
TTL: ptrTo(time.Second),
},
}
s.setPutOptions(putCfg)
s.add(putCfg)
return nil
},
want: &memStore{
Expand Down Expand Up @@ -285,10 +285,8 @@ var memStoreTests = []struct {
},
5: {
doTo: func(s *memStore) error {
storeMu.Lock()
s.close(memStores)
defer storeMu.Unlock()
if _, ok := memStores[s.id]; !ok {
s.dropFrom(&memStores)
if !memStores.has(s.id) {
return fmt.Errorf("%q memStore not found after single close", s.id)
}
return nil
Expand All @@ -311,10 +309,8 @@ var memStoreTests = []struct {
},
6: {
doTo: func(s *memStore) error {
storeMu.Lock()
s.close(memStores)
defer storeMu.Unlock()
if _, ok := memStores[s.id]; ok {
s.dropFrom(&memStores)
if memStores.has(s.id) {
return fmt.Errorf("%q memStore still found after double close", s.id)
}
return nil
Expand Down Expand Up @@ -343,10 +339,8 @@ func TestMemStore(t *testing.T) {
// Construct the store and put in into the stores map as
// we would if we were calling Run.
store := newMemStore(test.cfg, test.cfg.Store.Memory.ID, "memory")
store.setPutOptions(test.cfg)
storeMu.Lock()
memStores[store.id] = store
storeMu.Unlock()
store.add(test.cfg)
memStores.add(store)

if !cmp.Equal(test.want, store, allow, ignoreInMemStore) {
t.Errorf("unexpected new memStore result:\n--- want\n+++ got\n%s",
Expand All @@ -366,4 +360,19 @@ func TestMemStore(t *testing.T) {
}
}

// add adds the store to the set, it is used only for testing.
func (s *memStoreSet) add(store *memStore) {
s.mu.Lock()
s.stores[store.id] = store
s.mu.Unlock()
}

// has returns whether the store exists in the set, it is used only for testing.
func (s *memStoreSet) has(id string) bool {
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.stores[id]
return ok
}

func ptrTo[T any](v T) *T { return &v }

0 comments on commit 73bfeda

Please sign in to comment.