Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dns processor - Add A, AAAA, and TXT query support #36394

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- When running under Elastic-Agent the status is now reported per Unit instead of the whole Beat {issue}35874[35874] {pull}36183[36183]
- Add warning message to SysV init scripts for RPM-based systems that lack `/etc/rc.d/init.d/functions`. {issue}35708[35708] {pull}36188[36188]
- Mark `translate_sid` processor is GA. {issue}36279[36279] {pull}36280[36280]
- dns processor: Add support for forward lookups (`A`, `AAAA`, and `TXT`). {issue}11416[11416] {pull}36394[36394]

*Auditbeat*

Expand Down
68 changes: 34 additions & 34 deletions libbeat/processors/dns/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,38 +24,38 @@ import (
"github.com/elastic/elastic-agent-libs/monitoring"
)

type ptrRecord struct {
host string
type successRecord struct {
data []string
expires time.Time
}

func (r ptrRecord) IsExpired(now time.Time) bool {
func (r successRecord) IsExpired(now time.Time) bool {
return now.After(r.expires)
}

type ptrCache struct {
type successCache struct {
sync.RWMutex
data map[string]ptrRecord
data map[string]successRecord
maxSize int
minSuccessTTL time.Duration
}

func (c *ptrCache) set(now time.Time, key string, ptr *PTR) {
func (c *successCache) set(now time.Time, key string, result *result) {
c.Lock()
defer c.Unlock()

if len(c.data) >= c.maxSize {
c.evict()
}

c.data[key] = ptrRecord{
host: ptr.Host,
expires: now.Add(time.Duration(ptr.TTL) * time.Second),
c.data[key] = successRecord{
data: result.Data,
expires: now.Add(time.Duration(result.TTL) * time.Second),
}
}

// evict removes a single random key from the cache.
func (c *ptrCache) evict() {
func (c *successCache) evict() {
var key string
for k := range c.data {
key = k
Expand All @@ -64,13 +64,13 @@ func (c *ptrCache) evict() {
delete(c.data, key)
}

func (c *ptrCache) get(now time.Time, key string) *PTR {
func (c *successCache) get(now time.Time, key string) *result {
c.RLock()
defer c.RUnlock()

r, found := c.data[key]
if found && !r.IsExpired(now) {
return &PTR{r.host, uint32(r.expires.Sub(now) / time.Second)}
return &result{r.data, uint32(r.expires.Sub(now) / time.Second)}
}
return nil
}
Expand Down Expand Up @@ -132,13 +132,13 @@ type cachedError struct {
func (ce *cachedError) Error() string { return ce.err.Error() + " (from failure cache)" }
func (ce *cachedError) Cause() error { return ce.err }

// PTRLookupCache is a cache for storing and retrieving the results of
// reverse DNS queries. It caches the results of queries regardless of their
// lookupCache is a cache for storing and retrieving the results of
// DNS queries. It caches the results of queries regardless of their
// outcome (success or failure).
type PTRLookupCache struct {
success *ptrCache
type lookupCache struct {
success *successCache
failure *failureCache
resolver PTRResolver
resolver resolver
stats cacheStats
}

Expand All @@ -147,15 +147,15 @@ type cacheStats struct {
Miss *monitoring.Int
}

// NewPTRLookupCache returns a new cache.
func NewPTRLookupCache(reg *monitoring.Registry, conf CacheConfig, resolver PTRResolver) (*PTRLookupCache, error) {
// newLookupCache returns a new cache.
func newLookupCache(reg *monitoring.Registry, conf cacheConfig, resolver resolver) (*lookupCache, error) {
if err := conf.Validate(); err != nil {
return nil, err
}

c := &PTRLookupCache{
success: &ptrCache{
data: make(map[string]ptrRecord, conf.SuccessCache.InitialCapacity),
c := &lookupCache{
success: &successCache{
data: make(map[string]successRecord, conf.SuccessCache.InitialCapacity),
maxSize: conf.SuccessCache.MaxCapacity,
minSuccessTTL: conf.SuccessCache.MinTTL,
},
Expand All @@ -174,36 +174,36 @@ func NewPTRLookupCache(reg *monitoring.Registry, conf CacheConfig, resolver PTRR
return c, nil
}

// LookupPTR performs a reverse lookup on the given IP address. A cached result
// Lookup performs a lookup on the given query string. A cached result
// will be returned if it is contained in the cache, otherwise a lookup is
// performed.
func (c PTRLookupCache) LookupPTR(ip string) (*PTR, error) {
func (c lookupCache) Lookup(q string, qt queryType) (*result, error) {
now := time.Now()

ptr := c.success.get(now, ip)
if ptr != nil {
r := c.success.get(now, q)
if r != nil {
c.stats.Hit.Inc()
return ptr, nil
return r, nil
}

err := c.failure.get(now, ip)
err := c.failure.get(now, q)
if err != nil {
c.stats.Hit.Inc()
return nil, err
}
c.stats.Miss.Inc()

ptr, err = c.resolver.LookupPTR(ip)
r, err = c.resolver.Lookup(q, qt)
if err != nil {
c.failure.set(now, ip, &cachedError{err})
c.failure.set(now, q, &cachedError{err})
return nil, err
}

// We set the ptr.TTL to the minimum TTL in case it is less than that.
ptr.TTL = max(ptr.TTL, uint32(c.success.minSuccessTTL/time.Second))
// We set the result TTL to the minimum TTL in case it is less than that.
r.TTL = max(r.TTL, uint32(c.success.minSuccessTTL/time.Second))

c.success.set(now, ip, ptr)
return ptr, nil
c.success.set(now, q, r)
return r, nil
}

func max(a, b uint32) uint32 {
Expand Down
52 changes: 26 additions & 26 deletions libbeat/processors/dns/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,85 +29,85 @@ import (

type stubResolver struct{}

func (r *stubResolver) LookupPTR(ip string) (*PTR, error) {
func (r *stubResolver) Lookup(ip string, _ queryType) (*result, error) {
switch ip {
case gatewayIP:
return &PTR{Host: gatewayName, TTL: gatewayTTL}, nil
return &result{Data: []string{gatewayName}, TTL: gatewayTTL}, nil
case gatewayIP + "1":
return nil, io.ErrUnexpectedEOF
case gatewayIP + "2":
return &PTR{Host: gatewayName, TTL: 0}, nil
return &result{Data: []string{gatewayName}, TTL: 0}, nil
}
return nil, &dnsError{"fake lookup returned NXDOMAIN"}
}

func TestCache(t *testing.T) {
c, err := NewPTRLookupCache(
c, err := newLookupCache(
monitoring.NewRegistry(),
defaultConfig.CacheConfig,
defaultConfig().cacheConfig,
&stubResolver{})
if err != nil {
t.Fatal(err)
}

// Initial success query.
ptr, err := c.LookupPTR(gatewayIP)
r, err := c.Lookup(gatewayIP, typePTR)
if assert.NoError(t, err) {
assert.EqualValues(t, gatewayName, ptr.Host)
assert.EqualValues(t, gatewayTTL, ptr.TTL)
assert.EqualValues(t, []string{gatewayName}, r.Data)
assert.EqualValues(t, gatewayTTL, r.TTL)
assert.EqualValues(t, 0, c.stats.Hit.Get())
assert.EqualValues(t, 1, c.stats.Miss.Get())
}

// Cached success query.
ptr, err = c.LookupPTR(gatewayIP)
r, err = c.Lookup(gatewayIP, typePTR)
if assert.NoError(t, err) {
assert.EqualValues(t, gatewayName, ptr.Host)
assert.EqualValues(t, []string{gatewayName}, r.Data)
// TTL counts down while in cache.
assert.InDelta(t, gatewayTTL, ptr.TTL, 1)
assert.InDelta(t, gatewayTTL, r.TTL, 1)
assert.EqualValues(t, 1, c.stats.Hit.Get())
assert.EqualValues(t, 1, c.stats.Miss.Get())
}

// Initial failure query (like a dns error response code).
ptr, err = c.LookupPTR(gatewayIP + "0")
r, err = c.Lookup(gatewayIP+"0", typePTR)
if assert.Error(t, err) {
assert.Nil(t, ptr)
assert.Nil(t, r)
assert.EqualValues(t, 1, c.stats.Hit.Get())
assert.EqualValues(t, 2, c.stats.Miss.Get())
}

// Cached failure query.
ptr, err = c.LookupPTR(gatewayIP + "0")
r, err = c.Lookup(gatewayIP+"0", typePTR)
if assert.Error(t, err) {
assert.Nil(t, ptr)
assert.Nil(t, r)
assert.EqualValues(t, 2, c.stats.Hit.Get())
assert.EqualValues(t, 2, c.stats.Miss.Get())
}

// Initial network failure (like I/O timeout).
ptr, err = c.LookupPTR(gatewayIP + "1")
r, err = c.Lookup(gatewayIP+"1", typePTR)
if assert.Error(t, err) {
assert.Nil(t, ptr)
assert.Nil(t, r)
assert.EqualValues(t, 2, c.stats.Hit.Get())
assert.EqualValues(t, 3, c.stats.Miss.Get())
}

// Check for a cache hit for the network failure.
ptr, err = c.LookupPTR(gatewayIP + "1")
r, err = c.Lookup(gatewayIP+"1", typePTR)
if assert.Error(t, err) {
assert.Nil(t, ptr)
assert.Nil(t, r)
assert.EqualValues(t, 3, c.stats.Hit.Get())
assert.EqualValues(t, 3, c.stats.Miss.Get()) // Cache miss.
}

minTTL := defaultConfig.CacheConfig.SuccessCache.MinTTL
minTTL := defaultConfig().cacheConfig.SuccessCache.MinTTL
// Initial success returned TTL=0 with MinTTL.
ptr, err = c.LookupPTR(gatewayIP + "2")
r, err = c.Lookup(gatewayIP+"2", typePTR)
if assert.NoError(t, err) {
assert.EqualValues(t, gatewayName, ptr.Host)
assert.EqualValues(t, []string{gatewayName}, r.Data)

assert.EqualValues(t, minTTL/time.Second, ptr.TTL)
assert.EqualValues(t, minTTL/time.Second, r.TTL)
assert.EqualValues(t, 3, c.stats.Hit.Get())
assert.EqualValues(t, 4, c.stats.Miss.Get())

Expand All @@ -117,11 +117,11 @@ func TestCache(t *testing.T) {
}

// Cached success from a previous TTL=0 response.
ptr, err = c.LookupPTR(gatewayIP + "2")
r, err = c.Lookup(gatewayIP+"2", typePTR)
if assert.NoError(t, err) {
assert.EqualValues(t, gatewayName, ptr.Host)
assert.EqualValues(t, []string{gatewayName}, r.Data)
// TTL counts down while in cache.
assert.InDelta(t, minTTL/time.Second, ptr.TTL, 1)
assert.InDelta(t, minTTL/time.Second, r.TTL, 1)
assert.EqualValues(t, 4, c.stats.Hit.Get())
assert.EqualValues(t, 4, c.stats.Miss.Get())
}
Expand Down
Loading