Skip to content

Commit

Permalink
Fix Filestream store GC, entries are now removed when they TTL expires (
Browse files Browse the repository at this point in the history
elastic#38488)

The resources from Filestream registry have a counter to indicate how many 'owners' have got a hold of that resource, this counter was not correctly decremented. Because it never reached zero, no entry was ever removed from the in-memory store and even though the store GC would run periodically, no resource could be removed. That caused the in-memory store to be ever growing and the `op: remove` never to be seen in the registry log file.

This commit fixes this bug by correctly calling Released in every resource that is retained.
  • Loading branch information
belimawr authored Apr 9, 2024
1 parent 5b0b682 commit 8efb366
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Rename `activity_guid` to `activity_id` in ETW input events to suit other Windows inputs. {pull}38530[38530]
- Add missing provider registration and fix published entity for Active Directory entityanalytics provider. {pull}38645[38645]
- Fix handling of un-parsed JSON in O365 module. {issue}37800[37800] {pull}38709[38709]
- Fix filestream's registry GC: registry entries are now removed from the in-memory and disk store when they're older than the set TTL {issue}36761[36761] {pull}38488[38488]

*Heartbeat*

Expand Down
4 changes: 4 additions & 0 deletions filebeat/input/filestream/internal/input-logfile/clean.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func gcStore(log *logp.Logger, started time.Time, store *store) {
if err := gcClean(store, keys); err != nil {
log.Errorf("Failed to remove all entries from the registry: %+v", err)
}

// The main reason for this log entry is to enable tests that want to observe
// if the resources are correctly removed from the store.
log.Debugf("%d entries removed", len(keys))
}

// gcFind searches the store of resources that can be removed. A set of keys to delete is returned.
Expand Down
2 changes: 2 additions & 0 deletions filebeat/input/filestream/internal/input-logfile/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ func (s *store) updateMetadata(key string, meta interface{}) error {
resource.cursorMeta = meta

s.writeState(resource)
resource.Release()
return nil
}

Expand Down Expand Up @@ -384,6 +385,7 @@ func (s *store) remove(key string) error {
return fmt.Errorf("resource '%s' not found", key)
}
s.UpdateTTL(resource, 0)
resource.Release()
return nil
}

Expand Down
145 changes: 145 additions & 0 deletions filebeat/tests/integration/store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build integration

package integration

import (
"bufio"
"encoding/json"
"fmt"
"os"
"path"
"path/filepath"
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/tests/integration"
)

var testStoreCfg = `
filebeat.inputs:
- type: filestream
id: test-clean-removed
enabled: true
clean_removed: true
close.on_state_change.inactive: 8s
ignore_older: 9s
prospector.scanner.check_interval: 1s
paths:
- %s
filebeat.registry:
cleanup_interval: 5s
flush: 1s
queue.mem:
flush.min_events: 8
flush.timeout: 0.1s
path.home: %s
output.file:
path: ${path.home}
filename: "output-file"
rotate_every_kb: 10000
logging:
level: debug
selectors:
- input
- input.filestream
`

func TestStore(t *testing.T) {
numLogFiles := 10
filebeat := integration.NewBeat(
t,
"filebeat",
"../../filebeat.test",
)
tempDir := filebeat.TempDir()

// 1. Create some log files and write data to them
logsFolder := filepath.Join(tempDir, "logs")
if err := os.MkdirAll(logsFolder, 0755); err != nil {
t.Fatalf("could not create logs folder '%s': %s", logsFolder, err)
}

for i := 0; i < numLogFiles; i++ {
logFile := path.Join(logsFolder, fmt.Sprintf("log-%d.log", i))
integration.GenerateLogFile(t, logFile, 10, false)
}
logsFolderGlob := filepath.Join(logsFolder, "*")
filebeat.WriteConfigFile(fmt.Sprintf(testStoreCfg, logsFolderGlob, tempDir))

// 2. Ingest the file and stop Filebeat
filebeat.Start()

for i := 0; i < numLogFiles; i++ {
// Files can be ingested out of order, so we cannot specify their path.
// There will be more than one log line per file, but that at least gives us
// some assurance the files were read
filebeat.WaitForLogs("Closing reader of filestream", 30*time.Second, "Filebeat did not finish reading the log file")
}

// 3. Remove files so their state can be cleaned
if err := os.RemoveAll(logsFolder); err != nil {
t.Fatalf("could not remove logs folder '%s': %s", logsFolder, err)
}
filebeat.WaitForLogs(fmt.Sprintf("%d entries removed", numLogFiles), 30*time.Second, "store entries not removed")
filebeat.Stop()

registryLogFile := filepath.Join(tempDir, "data/registry/filebeat/log.json")
readFilestreamRegistryLog(t, registryLogFile, "remove", 10)
}

func readFilestreamRegistryLog(t *testing.T, path, op string, expectedCount int) {
file, err := os.Open(path)
if err != nil {
t.Fatalf("could not open file '%s': %s", path, err)
}

s := bufio.NewScanner(file)
count := 0
for s.Scan() {
line := s.Bytes()

registryOp := struct {
Op string `json:"op"`
ID int `json:"id"`
}{}

if err := json.Unmarshal(line, &registryOp); err != nil {
t.Fatalf("could not read line '%s': %s", string(line), err)
}

// Skips registry log entries that are not operation count
if registryOp.Op == "" {
continue
}

if registryOp.Op == op {
count++
}
}

if count != expectedCount {
t.Errorf("expecting %d '%s' operations, got %d instead", expectedCount, op, count)
}
}
39 changes: 37 additions & 2 deletions libbeat/tests/integration/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -115,6 +114,7 @@ func NewBeat(t *testing.T, beatName, binary string, args ...string) *BeatProc {
"--path.logs", tempDir,
"-E", "logging.to_files=true",
"-E", "logging.files.rotateeverybytes=104857600", // About 100MB
"-E", "logging.files.rotateonstartup=false",
}, args...),
tempDir: tempDir,
beatName: beatName,
Expand Down Expand Up @@ -524,7 +524,7 @@ func (b *BeatProc) LoadMeta() (Meta, error) {
}
defer metaFile.Close()

metaBytes, err := ioutil.ReadAll(metaFile)
metaBytes, err := io.ReadAll(metaFile)
require.NoError(b.t, err, "error reading meta file")
err = json.Unmarshal(metaBytes, &m)
require.NoError(b.t, err, "error unmarshalling meta data")
Expand Down Expand Up @@ -685,3 +685,38 @@ func readLastNBytes(filename string, numBytes int64) ([]byte, error) {
}
return io.ReadAll(f)
}

// GenerateLogFile writes count lines to path, each line is 50 bytes.
// Each line contains the current time (RFC3339) and a counter
func GenerateLogFile(t *testing.T, path string, count int, append bool) {
var file *os.File
var err error
if !append {
file, err = os.Create(path)
if err != nil {
t.Fatalf("could not create file '%s': %s", path, err)
}
} else {
file, err = os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
if err != nil {
t.Fatalf("could not open or create file: '%s': %s", path, err)
}
}

defer func() {
if err := file.Close(); err != nil {
t.Fatalf("could not close file: %s", err)
}
}()
defer func() {
if err := file.Sync(); err != nil {
t.Fatalf("could not sync file: %s", err)
}
}()
now := time.Now().Format(time.RFC3339)
for i := 0; i < count; i++ {
if _, err := fmt.Fprintf(file, "%s %13d\n", now, i); err != nil {
t.Fatalf("could not write line %d to file: %s", count+1, err)
}
}
}

0 comments on commit 8efb366

Please sign in to comment.