Skip to content

Commit

Permalink
auditbeat: Add a cached file hasher for auditbeat (#41952)
Browse files Browse the repository at this point in the history
This implements a LRU cache on top of the FileHasher from hasher.go, it will be
used in the new backend for the system process module on linux.

The cache is indexed by file path and stores the metadata (what we get from
stat(2)/statx(2)) along with the hashes of each file.

When we want to hash a file: we stat() the file, then do cache lookup and
compare against the stored metadata, if it differs, we rehash, if not we use the
cached values.

The cache ignores access time (atime), it's only interested in write
modifications, if the machine doesn't support statx(2) it falls back to stat(2)
but uses the same Unix.Statx_t.

With this we end up with a stat() + lookup on the hotpath, and a stat() + stat()
+ insert on the cold path.

The motivation for this is that the new backend ends up fetching "all
processes", which in turn causes it to try to hash at every event, the
current/old hasher just can't cope with it:
 1. Hashing for each event is simply to expensive, in the 100us-50ms range on the
    default configuration, which puts us below 1000/s.
 2. It has a scan rate throttling that on the default configuration ends easily
    at 40ms per event (25/s).

With the cache things improve considerably, we stay below 5us (200k/s) in all
cases:

```
MISSES
"miss (/usr/sbin/sshd) took 2.571359ms"
"miss (/usr/bin/containerd) took 52.099386ms"
"miss (/usr/sbin/gssproxy) took 160us"
"miss (/usr/sbin/atd) took 50.032us"
HITS
"hit (/usr/sbin/sshd) took 2.163us"
"hit (/usr/lib/systemd/systemd) took 3.024us"
"hit (/usr/lib/systemd/systemd) took 859ns"
"hit (/usr/sbin/sshd) took 805ns"
```
  • Loading branch information
haesbaert authored Dec 11, 2024
1 parent fb93eee commit 8ec2e31
Show file tree
Hide file tree
Showing 3 changed files with 373 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- auditd: Use ECS `event.type: end` instead of `stop` for SERVICE_STOP, DAEMON_ABORT, and DAEMON_END messages. {pull}41558[41558]
- auditd: Update syscall names for Linux 6.11. {pull}41558[41558]
- hasher: Geneneral improvements and fixes. {pull}41863[41863]
- hasher: Add a cached hasher for upcoming backend. {pull}41952[41952]

*Filebeat*

Expand Down
221 changes: 221 additions & 0 deletions auditbeat/helper/hasher/cached_hasher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build linux

package hasher

import (
"errors"
"time"

lru "github.com/hashicorp/golang-lru/v2"
"golang.org/x/sys/unix"

"github.com/elastic/elastic-agent-libs/logp"
)

// CachedHasher is a metadata aware FileHasher with a LRU cache on top of it.
type CachedHasher struct {
hasher *FileHasher
hashLRU *lru.Cache[string, hashEntry]
hasStatx bool
stats CachedHasherStats
log *logp.Logger
}

// CachedHasherStats are basics statistics for debugging and testing.
type CachedHasherStats struct {
Hits uint64
Misses uint64
Invalidations uint64
Evictions uint64
}

// hashEntry is an entry in the LRU cache.
type hashEntry struct {
statx unix.Statx_t
hashes map[HashType]Digest
}

// NewFileHasherWithCache creates a CachedHasher with space up to size elements.
func NewFileHasherWithCache(c Config, size int) (*CachedHasher, error) {
// We don't rate limit our hashes, we cache
c.ScanRateBytesPerSec = 0
hasher, err := NewFileHasher(c, nil)
if err != nil {
return nil, err
}
hashLRU, err := lru.New[string, hashEntry](size)
if err != nil {
return nil, err
}
var nada unix.Statx_t
hasStatx := unix.Statx(-1, "/", 0, unix.STATX_ALL|unix.STATX_MNT_ID, &nada) != unix.ENOSYS

return &CachedHasher{
hasher: hasher,
hashLRU: hashLRU,
hasStatx: hasStatx,
log: logp.NewLogger("cached_hasher"),
}, nil
}

// HashFile looks up a hashEntry in the cache, if the lookup fails,
// the hash is computed, inserted in the cache, and returned. If the
// lookup succeeds but the file metadata changed, the entry is evicted
// and refreshed.
func (ch *CachedHasher) HashFile(path string) (map[HashType]Digest, error) {
var x time.Time
if logp.IsDebug("cached_hasher") {
x = time.Now()
}

// See if we have it stored
if entry, ok := ch.hashLRU.Get(path); ok {
statx, err := ch.statxFromPath(path)
if err != nil {
// No point in keeping an entry if we can't compare
if !ch.hashLRU.Remove(path) {
err := errors.New("can't remove existing entry, this is a bug")
ch.log.Error(err)
}
return nil, err
}
// If metadata didn't change, this is a good entry, if not fall and rehash
if statx == entry.statx {
ch.log.Debugf("hit (%s) took %v", path, time.Since(x))
ch.stats.Hits++
return entry.hashes, nil
}
// Zap from lru
if !ch.hashLRU.Remove(path) {
err := errors.New("can't remove existing entry, this is a bug")
ch.log.Error(err)
return nil, err
} else {
ch.stats.Invalidations++
ch.log.Debugf("invalidate (%s)", path)
}
}
// Nah, so do the hard work
hashes, err := ch.hasher.HashFile(path)
if err != nil {
return nil, err
}
// Fetch metadata
statx, err := ch.statxFromPath(path)
if err != nil {
return nil, err
}
// Insert
entry := hashEntry{hashes: hashes, statx: statx}
if ch.hashLRU.Add(path, entry) {
ch.stats.Evictions++
ch.log.Debugf("evict (%s)")
}

ch.log.Debugf("miss (%s) took %v", path, time.Since(x))
ch.stats.Misses++

return entry.hashes, nil
}

// Close releases all resources
func (ch *CachedHasher) Close() {
ch.hashLRU.Purge()
}

// Stats returns basic stats suitable for debugging and testing
func (ch *CachedHasher) Stats() CachedHasherStats {
return ch.stats
}

// statxFromPath returns the metadata (unix.Statx_t) of path. In case
// the system doesn't support statx(2), it uses stat(2) and fills the
// corresponding members of unix.Statx_t, leaving the remaining members
// with a zero value.
func (ch *CachedHasher) statxFromPath(path string) (unix.Statx_t, error) {
if ch.hasStatx {
var tmpstx unix.Statx_t
err := unix.Statx(-1, path, 0, unix.STATX_ALL|unix.STATX_MNT_ID, &tmpstx)
if err != nil {
return unix.Statx_t{}, err
}

// This might look stupid, but it guarantees we only compare
// the members we are really interested, unix.Statx_t grows
// with time, so if they ever add a member that changes all
// the time, we don't introduce a bug where we compare things
// we don't want to.
return unix.Statx_t{
Mask: tmpstx.Mask,
Blksize: tmpstx.Blksize,
Attributes: tmpstx.Attributes,
Nlink: tmpstx.Nlink,
Uid: tmpstx.Uid,
Gid: tmpstx.Gid,
Mode: tmpstx.Mode,
Ino: tmpstx.Ino,
Size: tmpstx.Size,
Blocks: tmpstx.Blocks,
Attributes_mask: tmpstx.Attributes_mask,
Btime: tmpstx.Btime,
Ctime: tmpstx.Ctime,
Mtime: tmpstx.Mtime,
Rdev_minor: tmpstx.Rdev_minor,
Rdev_major: tmpstx.Rdev_major,
// no Atime
// no Dio_mem_align
// no Dio_offset_align
// no Subvol
// no Atomic_write_unit_min
// no Atomic_write_unit_max
// no Atomic_write_segments_max
}, nil
}

// No statx(2), fallback to stat(2)
var st unix.Stat_t
if err := unix.Stat(path, &st); err != nil {
return unix.Statx_t{}, err
}

return unix.Statx_t{
Dev_major: unix.Major(st.Dev),
Dev_minor: unix.Minor(st.Dev),
Ino: st.Ino,
Nlink: uint32(st.Nlink),
Mode: uint16(st.Mode),
Uid: st.Uid,
Gid: st.Gid,
Rdev_major: unix.Major(st.Rdev),
Rdev_minor: unix.Minor(st.Rdev),
Size: uint64(st.Size),
Blksize: uint32(st.Blksize),
Blocks: uint64(st.Blocks),
Mtime: unix.StatxTimestamp{
Nsec: uint32(st.Mtim.Nsec),
Sec: st.Mtim.Sec,
},
Ctime: unix.StatxTimestamp{
Nsec: uint32(st.Ctim.Nsec),
Sec: st.Ctim.Sec,
},
// no Atime
}, nil
}
151 changes: 151 additions & 0 deletions auditbeat/helper/hasher/cached_hasher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build linux

package hasher

import (
"io"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"
)

type pattern struct {
text []byte
md5 string
sha1 string
sha256 string
sha512 string
sha3_384 string
}

var patternA = pattern{
text: []byte("Rather than love, than money, than fame, give me truth.\n"),
md5: "572698a28f439d3c2647c67df75ed22f",
sha1: "511c4040962d493ba9cb2c0748137c11e42eb46b",
sha256: "19c76b22dd0bf97b0bf064e6587961938ba9f4ab73d034b0edac6c2c2829c0cd",
sha512: "e339322ed81208f930047e8b94db504f40a3e8bb2af75511925e3469488104edcd8eb8c613ea7fd0b08199a4d7061690512a05f66b50b4427470d6c8cf2d74a3",
sha3_384: "9961640983a079920f74f2503feb5ce63325d6a6cd0138905e9419c4307043fa324217587062ac8648cbf43138a33034",
}

var patternB = pattern{
text: []byte("From womb to tomb, in kindness and crime.\n"),
md5: "e3d72a80f13b9c1e4b07a7182b934502",
sha1: "90da69d7b93ef792e8e4506543506975018df980",
sha256: "67606f88f25357b2b101e94bd02fc5da8dd2993391b88596c15bea77780a6a77",
sha512: "23c3779d7c6a8d4be2ca7a0bf412a2c99ea2f8a95ac21f56e3b9cb1bd0c0427bf2db91bbb484128f53ef48fbbfc97e525b328e1c4c0f8d24dd8a3f438c449736",
sha3_384: "2034d02ad7b46831b9f2bf09b2eaa77bfcf70ebd136f29b95e6723cc6bf94d0fb7aae972dd2297b5507bb568cb65563b",
}

var config = Config{
HashTypes: []HashType{MD5, SHA1, SHA256, SHA512, SHA3_384},
MaxFileSize: "1 KiB",
MaxFileSizeBytes: 1024,
}

func TestCachedHasher(t *testing.T) {
ch, err := NewFileHasherWithCache(config, 1)
require.NoError(t, err)
doTestCachedHasher(t, ch)
}

func TestCachedHasherWithStat(t *testing.T) {
ch, err := NewFileHasherWithCache(config, 1)
require.NoError(t, err)
ch.hasStatx = false
doTestCachedHasher(t, ch)
}

func doTestCachedHasher(t *testing.T, ch *CachedHasher) {
// Create a file
file := mkTemp(t)
defer file.Close()

// Write patternA and confirm first hash is a miss
writePattern(t, file, patternA)
ch.checkState(t, file.Name(), patternA, CachedHasherStats{Misses: 1})

// Prove a subsequent hash hits the cache
ch.checkState(t, file.Name(), patternA, CachedHasherStats{Misses: 1, Hits: 1})

// Prove changing access time still causes a hit.
// Note: we can't use os.Chtimes() to change _only_ atime, it
// might end up modifying mtime since it can round/truncate
// value we would get from file.Stat().ModTime()
time.Sleep(time.Millisecond * 2)
_, err := os.ReadFile(file.Name())
require.NoError(t, err)
ch.checkState(t, file.Name(), patternA, CachedHasherStats{Misses: 1, Hits: 2})

// Prove changing mtime invalides the entry, and causes a miss
ostat, err := file.Stat()
require.NoError(t, err)
mtime := ostat.ModTime().Add(time.Hour)
require.NoError(t, os.Chtimes(file.Name(), mtime, mtime))
ch.checkState(t, file.Name(), patternA, CachedHasherStats{Misses: 2, Hits: 2, Invalidations: 1})

// Write the second pattern, prove it's a miss
writePattern(t, file, patternB)
ch.checkState(t, file.Name(), patternB, CachedHasherStats{Misses: 3, Hits: 2, Invalidations: 2})

// Hash something else, prove first one is evicted
file2 := mkTemp(t)
defer file2.Close()
writePattern(t, file2, patternA)
ch.checkState(t, file2.Name(), patternA, CachedHasherStats{Misses: 4, Hits: 2, Invalidations: 2, Evictions: 1})

// If we go back and lookup the original path, prove we should evict again and it's a miss
ch.checkState(t, file.Name(), patternB, CachedHasherStats{Misses: 5, Hits: 2, Invalidations: 2, Evictions: 2})

// If we close, prove we purge
require.Equal(t, ch.hashLRU.Len(), 1)
ch.Close()
require.Equal(t, ch.hashLRU.Len(), 0)
}

func mkTemp(t *testing.T) *os.File {
file, err := os.CreateTemp(t.TempDir(), "cached_hasher_test_*")
require.NoError(t, err)

return file
}

func writePattern(t *testing.T, file *os.File, p pattern) {
err := file.Truncate(0)
require.NoError(t, err)
_, err = file.Seek(0, io.SeekStart)
require.NoError(t, err)
n, err := file.Write(p.text)
require.NoError(t, err)
require.Equal(t, n, len(p.text))
}

func (ch *CachedHasher) checkState(t *testing.T, path string, p pattern, stats CachedHasherStats) {
hashes, err := ch.HashFile(path)
require.NoError(t, err)
require.Len(t, hashes, 5)
require.Equal(t, p.md5, hashes["md5"].String())
require.Equal(t, p.sha1, hashes["sha1"].String())
require.Equal(t, p.sha256, hashes["sha256"].String())
require.Equal(t, p.sha512, hashes["sha512"].String())
require.Equal(t, p.sha3_384, hashes["sha3_384"].String())
require.Equal(t, stats, ch.Stats())
}

0 comments on commit 8ec2e31

Please sign in to comment.