libbeat/processors/cache: add file-backed cache #36686

Merged: 5 commits, Sep 28, 2023

Changes from 2 commits
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
@@ -173,6 +173,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Add device handling to Okta API package for entity analytics. {pull}35980[35980]
- Make Filebeat HTTPJSON input process responses sequentially. {pull}36493[36493]
- Add initial infrastructure for a caching enrichment processor. {pull}36619[36619]
- Add file-backed cache for cache enrichment processor. {pull}36686[36686]

==== Deprecated

33 changes: 16 additions & 17 deletions libbeat/processors/cache/cache.go
@@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
"os"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
@@ -29,6 +30,7 @@ import (
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/paths"
)

const name = "cache"
@@ -64,15 +66,15 @@ func New(cfg *conf.C) (beat.Processor, error) {
if err != nil {
return nil, fmt.Errorf("failed to unpack the %s configuration: %w", name, err)
}
src, cancel, err := getStoreFor(config)
if err != nil {
return nil, fmt.Errorf("failed to get the store for %s: %w", name, err)
}

// Logging (each processor instance has a unique ID).
id := int(instanceID.Inc())
log := logp.NewLogger(name).With("instance_id", id)

src, cancel, err := getStoreFor(config, log)
if err != nil {
return nil, fmt.Errorf("failed to get the store for %s: %w", name, err)
}

p := &cache{
config: config,
store: src,
@@ -87,16 +89,18 @@ func New(cfg *conf.C) (beat.Processor, error) {
// and a context cancellation that releases the cache resource when it
// is no longer required. The cancellation should be called when the
// processor is closed.
func getStoreFor(cfg config) (Store, context.CancelFunc, error) {
func getStoreFor(cfg config, log *logp.Logger) (Store, context.CancelFunc, error) {
switch {
case cfg.Store.Memory != nil:
s, cancel := memStores.get(cfg.Store.Memory.ID, cfg)
return s, cancel, nil

case cfg.Store.File != nil:
logp.L().Warn("using memory store when file is configured")
// TODO: Replace place-holder code with a file-store.
s, cancel := fileStores.get(cfg.Store.File.ID, cfg)
err := os.MkdirAll(paths.Resolve(paths.Data, "cache_processor"), 0o700)
if err != nil {
return nil, noop, fmt.Errorf("cache processor could not create store directory: %w", err)
}
s, cancel := fileStores.get(cfg.Store.File.ID, cfg, log)
return s, cancel, nil

default:
@@ -105,11 +109,6 @@ func getStoreFor(cfg config) (Store, context.CancelFunc, error) {
}
}

var (
memStores = memStoreSet{stores: map[string]*memStore{}, typ: "memory"}
fileStores = memStoreSet{stores: map[string]*memStore{}, typ: "file"} // This is a temporary mock.
)

// noop is a no-op context.CancelFunc.
func noop() {}

@@ -126,9 +125,9 @@ type Store interface {
}

type CacheEntry struct {
key string
value any
expires time.Time
Key string `json:"key"`
Value any `json:"val"`
Expires time.Time `json:"expires"`
index int
}
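
The CacheEntry fields above were exported and given JSON tags so that a file-backed store can serialize entries; encoding/json only sees exported fields, so the previous lowercase fields could not have been persisted. The following is a small standalone sketch, not code from this PR, illustrating the round trip (field names and tags mirror the struct above; values are made up):

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// entry mirrors the exported CacheEntry fields shown above; the unexported
// index field is omitted because encoding/json ignores unexported fields.
type entry struct {
	Key     string    `json:"key"`
	Value   any       `json:"val"`
	Expires time.Time `json:"expires"`
}

func main() {
	e := entry{Key: "one", Value: "metadata_value", Expires: time.Now().Add(168 * time.Hour)}

	// Marshal to the form a backing file would hold.
	b, err := json.Marshal(e)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"key":"one","val":"metadata_value","expires":"..."}

	// Unmarshal back into a usable entry.
	var got entry
	if err := json.Unmarshal(b, &got); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", got)
}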

24 changes: 12 additions & 12 deletions libbeat/processors/cache/cache_test.go
@@ -130,7 +130,7 @@
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
@@ -191,7 +191,7 @@
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
@@ -210,7 +210,7 @@
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
@@ -271,7 +271,7 @@
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
@@ -290,7 +290,7 @@
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
@@ -351,7 +351,7 @@
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
@@ -379,7 +379,7 @@
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: errors.New("error applying cache get processor: target field 'crowdstrike.metadata_new' already exists and overwrite_keys is false"),
},
@@ -441,7 +441,7 @@
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
@@ -465,7 +465,7 @@
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
@@ -527,7 +527,7 @@
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
@@ -547,7 +547,7 @@
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: errors.New("error applying cache get processor: expected map but type is string"),
},
@@ -561,7 +561,7 @@
}

func TestCache(t *testing.T) {
logp.TestingSetup(logp.WithSelectors(name))

Check failure on line 564 in libbeat/processors/cache/cache_test.go, from GitHub Actions / lint (windows): Error return value of `logp.TestingSetup` is not checked (errcheck)
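
A hedged sketch of one way the errcheck finding could be satisfied; the fix actually applied in a later commit is not shown in this two-commit view:

// Check the TestingSetup error instead of discarding it.
if err := logp.TestingSetup(logp.WithSelectors(name)); err != nil {
	t.Fatal(err)
}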
for _, test := range cacheTests {
t.Run(test.name, func(t *testing.T) {
var processors []beat.Processor
@@ -613,7 +613,7 @@
switch got := p.(*cache).store.(type) {
case *memStore:
allow := cmp.AllowUnexported(CacheEntry{})
ignore := cmpopts.IgnoreFields(CacheEntry{}, "expires", "index")
ignore := cmpopts.IgnoreFields(CacheEntry{}, "Expires", "index")
if !cmp.Equal(step.wantCacheVal, got.cache, allow, ignore) {
t.Errorf("unexpected cache state result %d:\n--- want\n+++ got\n%s", i, cmp.Diff(step.wantCacheVal, got.cache, allow, ignore))
}
11 changes: 8 additions & 3 deletions libbeat/processors/cache/config.go
@@ -88,19 +88,24 @@ func defaultConfig() config {
}

type storeConfig struct {
Memory *id `config:"memory"`
File *id `config:"file"`
Memory *memConfig `config:"memory"`
File *fileConfig `config:"file"`

// Capacity and Effort are currently experimental
// and not in public-facing documentation.
Capacity int `config:"capacity"`
Effort int `config:"eviction_effort"`
}

type id struct {
type memConfig struct {
ID string `config:"id"`
}

type fileConfig struct {
ID string `config:"id"`
WriteOutEvery time.Duration `config:"write_frequency"`
Reviewer comment (Member): I need to look over the rest of the code base to see if there is any consistency for naming this type of setting. My initial thought is that this is an "interval" rather than a "frequency", because an interval is specified as a duration between events.

}

func (cfg *storeConfig) Validate() error {
switch {
case cfg.Memory != nil && cfg.File != nil:
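The new WriteOutEvery field maps to the `write_frequency` setting documented below. The file store itself is outside this part of the diff, so the following is only an illustrative sketch of the documented semantics (periodic writes only when the duration is greater than zero, with the final write handled on close); the helper name, signature, and logging are hypothetical:

package cache

import (
	"context"
	"time"

	"github.com/elastic/elastic-agent-libs/logp"
)

// runPeriodicWriteOut is a hypothetical helper, not part of this PR. It
// illustrates gating periodic write-out on a positive duration; a zero
// value disables the loop entirely.
func runPeriodicWriteOut(ctx context.Context, every time.Duration, writeOut func() error, log *logp.Logger) {
	if every <= 0 {
		return // zero disables periodic writes
	}
	tick := time.NewTicker(every)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			if err := writeOut(); err != nil {
				log.Warnw("periodic cache write-out failed", "error", err)
			}
		}
	}
}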
14 changes: 14 additions & 0 deletions libbeat/processors/cache/config_test.go
@@ -39,6 +39,20 @@ put:
ttl: 168h
key_field: crowdstrike.aid
value_field: crowdstrike.metadata
`,
want: nil,
},
{
name: "put_file_with_periodic_write_out",
cfg: `
backend:
file:
id: aidmaster
write_frequency: 15m
put:
ttl: 168h
key_field: crowdstrike.aid
value_field: crowdstrike.metadata
`,
want: nil,
},
5 changes: 3 additions & 2 deletions libbeat/processors/cache/docs/cache.asciidoc
@@ -51,8 +51,9 @@ It has the following settings:

One of `backend.memory.id` or `backend.file.id` must be provided.

`backend.memory.id`:: The ID of a memory based cache. Use the same ID across instance to reference the same cache.
`backend.file.id`:: The ID of a file based cache. Use the same ID across instance to reference the same cache.
`backend.memory.id`:: The ID of a memory-based cache. Use the same ID across instance to reference the same cache.
`backend.file.id`:: The ID of a file-based cache. Use the same ID across instance to reference the same cache.
`backend.file.write_frequency`:: The frequency the cache is periodically written to the backing file. Valid time units are h, m, s, ms, us/µs and ns. Periodic writes are only made if `backend.file.write_frequency` is greater than zero. The contents are always written out to the backing file when the processor is closed. Default is zero, no periodic writes.

One of `put`, `get` or `delete` must be provided.
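
To make the new setting concrete, a minimal illustrative configuration using the file backend could look like the following. It mirrors the `put_file_with_periodic_write_out` case added to config_test.go above; the `processors:` wrapper is assumed for context rather than taken from this diff:

processors:
  - cache:
      backend:
        file:
          id: aidmaster
          write_frequency: 15m
      put:
        ttl: 168h
        key_field: crowdstrike.aid
        value_field: crowdstrike.metadata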
