Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[libbeat] Cache processor docs and memory fixes. (backport #38561) #39001

Merged
merged 2 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]

*Affecting all Beats*

- Change cache processor documentation from `write_period` to `write_interval`. {pull}38561[38561]
- Fix cache processor expiries heap cleanup on partial file writes. {pull}38561[38561]
- Fix cache processor expiries infinite growth when a large TTL is used and recurring keys are cached. {pull}38561[38561]

*Auditbeat*

Expand Down
2 changes: 1 addition & 1 deletion libbeat/processors/cache/docs/cache.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ One of `backend.memory.id` or `backend.file.id` must be provided.
`backend.capacity`:: The number of elements that can be stored in the cache. `put` operations that would cause the capacity to be exceeded will result in evictions of the oldest elements. Values at or below zero indicate no limit. The capacity should not be lower than the number of elements that are expected to be referenced when processing the input as evicted elements are lost. The default is `0`, no limit.
`backend.memory.id`:: The ID of a memory-based cache. Use the same ID across instances to reference the same cache.
`backend.file.id`:: The ID of a file-based cache. Use the same ID across instances to reference the same cache.
`backend.file.write_period`:: The interval between periodic cache writes to the backing file. Valid time units are h, m, s, ms, us/µs and ns. Periodic writes are only made if `backend.file.write_period` is greater than zero. The contents are always written out to the backing file when the processor is closed. Default is zero, no periodic writes.
`backend.file.write_interval`:: The interval between periodic cache writes to the backing file. Valid time units are h, m, s, ms, us/µs and ns. Periodic writes are only made if `backend.file.write_interval` is greater than zero. The contents are always written out to the backing file when the processor is closed. Default is zero, no periodic writes.

One of `put`, `get` or `delete` must be provided.

Expand Down
4 changes: 2 additions & 2 deletions libbeat/processors/cache/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@
var e CacheEntry
err = dec.Decode(&e)
if err != nil {
if err != io.EOF {

Check failure on line 193 in libbeat/processors/cache/file_store.go

View workflow job for this annotation

GitHub Actions / lint (windows)

comparing with != will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
switch err := err.(type) {

Check failure on line 194 in libbeat/processors/cache/file_store.go

View workflow job for this annotation

GitHub Actions / lint (windows)

type switch on error will fail on wrapped errors. Use errors.As to check for specific errors (errorlint)
case *json.SyntaxError:
c.log.Errorw("failed to read state element", "error", err, "path", c.path, "offset", err.Offset)
default:
Expand Down Expand Up @@ -283,8 +283,8 @@
enc := json.NewEncoder(f)
enc.SetEscapeHTML(false)
now := time.Now()
for c.expiries.Len() != 0 {
e := c.expiries.pop()
for i := 0; i < c.expiries.Len(); i++ {
e := c.expiries[i]
if e.Expires.Before(now) {
// Don't write expired elements.
continue
Expand Down
103 changes: 103 additions & 0 deletions libbeat/processors/cache/file_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,109 @@
{Key: "three", Value: 3.0},
},
},
{
name: "periodic_write",
cfg: config{
Store: &storeConfig{
File: &fileConfig{ID: "test"},
Capacity: 1000,
Effort: 10,
},
Get: &getConfig{},
},
want: &fileStore{path: "testdata/periodic_write", memStore: memStore{
id: "test",
cache: map[string]*CacheEntry{},
refs: 1,
// TTL, capacity and effort are set only by put.
ttl: -1,
cap: -1,
effort: -1,
}},
steps: []fileStoreTestSteps{
0: {
doTo: func(s *fileStore) error {
putCfg := config{
Store: &storeConfig{
File: &fileConfig{ID: "test"},
Capacity: 1000,
Effort: 10,
},
Put: &putConfig{
TTL: ptrTo(time.Second),
},
}
s.add(putCfg)
return nil
},
want: &fileStore{path: "testdata/periodic_write", memStore: memStore{
id: "test",
cache: map[string]*CacheEntry{},
refs: 2,
dirty: false,
ttl: time.Second,
cap: 1000,
effort: 10,
}},
},
1: {
doTo: func(s *fileStore) error {
s.Put("one", 1)
s.Put("two", 2)
s.Put("three", 3)
return nil
},
want: &fileStore{path: "testdata/periodic_write", memStore: memStore{
id: "test",
cache: map[string]*CacheEntry{
"one": {Key: "one", Value: int(1), index: 0},
"two": {Key: "two", Value: int(2), index: 1},
"three": {Key: "three", Value: int(3), index: 2},
},
expiries: expiryHeap{
{Key: "one", Value: int(1), index: 0},
{Key: "two", Value: int(2), index: 1},
{Key: "three", Value: int(3), index: 2},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
}},
},
2: {
doTo: func(s *fileStore) error {
s.writeState(false)
return nil
},
want: &fileStore{path: "testdata/periodic_write", memStore: memStore{
id: "test",
cache: map[string]*CacheEntry{
"one": {Key: "one", Value: int(1), index: 0},
"two": {Key: "two", Value: int(2), index: 1},
"three": {Key: "three", Value: int(3), index: 2},
},
expiries: expiryHeap{
{Key: "one", Value: int(1), index: 0},
{Key: "two", Value: int(2), index: 1},
{Key: "three", Value: int(3), index: 2},
},
refs: 2,
dirty: false,
ttl: time.Second,
cap: 1000,
effort: 10,
}},
},
},
wantPersisted: []*CacheEntry{
// Numeric values are float due to JSON round-trip.
{Key: "one", Value: 1.0},
{Key: "two", Value: 2.0},
{Key: "three", Value: 3.0},
},
},
}

func TestFileStore(t *testing.T) {
Expand Down Expand Up @@ -410,7 +513,7 @@
var e CacheEntry
err = dec.Decode(&e)
if err != nil {
if err != io.EOF {

Check failure on line 516 in libbeat/processors/cache/file_store_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

comparing with != will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
t.Fatalf("unexpected error reading persisted cache data: %v", err)
}
break
Expand Down
20 changes: 14 additions & 6 deletions libbeat/processors/cache/mem_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,21 @@ func (c *memStore) Put(key string, val any) error {
defer c.mu.Unlock()
now := time.Now()
c.evictExpired(now)
e := &CacheEntry{
Key: key,
Value: val,
Expires: now.Add(c.ttl),
// If the key is being overwritten we update its existing expiry entry in
// place; this prevents the expiries heap from growing with large TTLs and recurring keys.
if prev, found := c.cache[key]; found {
prev.Value = val
prev.Expires = now.Add(c.ttl)
heap.Fix(&c.expiries, prev.index)
} else {
e := &CacheEntry{
Key: key,
Value: val,
Expires: now.Add(c.ttl),
}
c.cache[key] = e
heap.Push(&c.expiries, e)
}
c.cache[key] = e
heap.Push(&c.expiries, e)
c.dirty = true
return nil
}
Expand Down
97 changes: 97 additions & 0 deletions libbeat/processors/cache/mem_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,103 @@ var memStoreTests = []struct {
},
},
},
{
name: "re-hit",
cfg: config{
Store: &storeConfig{
Memory: &memConfig{"test"},
Capacity: 1000,
Effort: 10,
},
Get: &getConfig{},
},
want: &memStore{
id: "test",
cache: map[string]*CacheEntry{},
refs: 1,
// TTL, capacity and effort are set only by put.
ttl: -1,
cap: -1,
effort: -1,
},
steps: []memStoreTestSteps{
0: {
doTo: func(s *memStore) error {
putCfg := config{
Store: &storeConfig{
Memory: &memConfig{"test"},
Capacity: 1000,
Effort: 10,
},
Put: &putConfig{
TTL: ptrTo(10 * time.Minute),
},
}
s.add(putCfg)
return nil
},
want: &memStore{
id: "test",
cache: map[string]*CacheEntry{},
refs: 2,
dirty: false,
ttl: 10 * time.Minute,
cap: 1000,
effort: 10,
},
},
1: {
doTo: func(s *memStore) error {
s.Put("one", 1)
s.Put("two", 2)
s.Put("three", 3)
return nil
},
want: &memStore{
id: "test",
cache: map[string]*CacheEntry{
"one": {Key: "one", Value: int(1), index: 0},
"two": {Key: "two", Value: int(2), index: 1},
"three": {Key: "three", Value: int(3), index: 2},
},
expiries: expiryHeap{
{Key: "one", Value: int(1), index: 0},
{Key: "two", Value: int(2), index: 1},
{Key: "three", Value: int(3), index: 2},
},
refs: 2,
dirty: true,
ttl: 10 * time.Minute,
cap: 1000,
effort: 10,
},
},
2: {
doTo: func(s *memStore) error {
s.Put("one", 1)
return nil
},
want: &memStore{
id: "test",
cache: map[string]*CacheEntry{
"one": {Key: "one", Value: int(1), index: 1},
"two": {Key: "two", Value: int(2), index: 0},
"three": {Key: "three", Value: int(3), index: 2},
},
expiries: expiryHeap{
{Key: "two", Value: int(2), index: 0},
{Key: "one", Value: int(1), index: 1},
{Key: "three", Value: int(3), index: 2},
},
refs: 2,
dirty: true,
ttl: 10 * time.Minute,
cap: 1000,
effort: 10,
},
},
},
},
}

func TestMemStore(t *testing.T) {
Expand Down
Loading