From ed0f82234c5449809b6986cd2a6a497faa5eae7f Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Mon, 21 Aug 2023 10:54:04 -0400 Subject: [PATCH 1/7] dns processor - Add A, AAAA, and TXT query support The dns processor previously supported only reverse DNS lookups. This adds support for performing A, AAAA, and TXT record queries. The response.ptr.histogram metric was renamed to request_duration.histogram. This naming allows the metric to represent the duration of the DNS request for all query types. Closes #11416 --- libbeat/processors/dns/cache.go | 68 +++++++------- libbeat/processors/dns/cache_test.go | 52 +++++------ libbeat/processors/dns/config.go | 80 +++++++++++------ libbeat/processors/dns/dns.go | 45 ++++++++-- libbeat/processors/dns/dns_test.go | 14 +-- libbeat/processors/dns/doc.go | 2 +- libbeat/processors/dns/docs/dns.asciidoc | 38 +++++--- libbeat/processors/dns/resolver.go | 107 +++++++++++++++-------- libbeat/processors/dns/resolver_test.go | 16 ++-- 9 files changed, 265 insertions(+), 157 deletions(-) diff --git a/libbeat/processors/dns/cache.go b/libbeat/processors/dns/cache.go index 6e77af028880..b1b9c35cfac4 100644 --- a/libbeat/processors/dns/cache.go +++ b/libbeat/processors/dns/cache.go @@ -24,23 +24,23 @@ import ( "github.com/elastic/elastic-agent-libs/monitoring" ) -type ptrRecord struct { - host string +type successRecord struct { + data []string expires time.Time } -func (r ptrRecord) IsExpired(now time.Time) bool { +func (r successRecord) IsExpired(now time.Time) bool { return now.After(r.expires) } -type ptrCache struct { +type successCache struct { sync.RWMutex - data map[string]ptrRecord + data map[string]successRecord maxSize int minSuccessTTL time.Duration } -func (c *ptrCache) set(now time.Time, key string, ptr *PTR) { +func (c *successCache) set(now time.Time, key string, result *result) { c.Lock() defer c.Unlock() @@ -48,14 +48,14 @@ func (c *ptrCache) set(now time.Time, key string, ptr *PTR) { c.evict() } - c.data[key] = ptrRecord{ - host: ptr.Host, - expires: now.Add(time.Duration(ptr.TTL) * time.Second), + c.data[key] = successRecord{ + data: result.Data, + expires: now.Add(time.Duration(result.TTL) * time.Second), } } // evict removes a single random key from the cache. -func (c *ptrCache) evict() { +func (c *successCache) evict() { var key string for k := range c.data { key = k @@ -64,13 +64,13 @@ func (c *ptrCache) evict() { delete(c.data, key) } -func (c *ptrCache) get(now time.Time, key string) *PTR { +func (c *successCache) get(now time.Time, key string) *result { c.RLock() defer c.RUnlock() r, found := c.data[key] if found && !r.IsExpired(now) { - return &PTR{r.host, uint32(r.expires.Sub(now) / time.Second)} + return &result{r.data, uint32(r.expires.Sub(now) / time.Second)} } return nil } @@ -132,13 +132,13 @@ type cachedError struct { func (ce *cachedError) Error() string { return ce.err.Error() + " (from failure cache)" } func (ce *cachedError) Cause() error { return ce.err } -// PTRLookupCache is a cache for storing and retrieving the results of -// reverse DNS queries. It caches the results of queries regardless of their +// LookupCache is a cache for storing and retrieving the results of +// DNS queries. It caches the results of queries regardless of their // outcome (success or failure). -type PTRLookupCache struct { - success *ptrCache +type LookupCache struct { + success *successCache failure *failureCache - resolver PTRResolver + resolver resolver stats cacheStats } @@ -147,15 +147,15 @@ type cacheStats struct { Miss *monitoring.Int } -// NewPTRLookupCache returns a new cache. -func NewPTRLookupCache(reg *monitoring.Registry, conf CacheConfig, resolver PTRResolver) (*PTRLookupCache, error) { +// NewLookupCache returns a new cache. +func NewLookupCache(reg *monitoring.Registry, conf CacheConfig, resolver resolver) (*LookupCache, error) { if err := conf.Validate(); err != nil { return nil, err } - c := &PTRLookupCache{ - success: &ptrCache{ - data: make(map[string]ptrRecord, conf.SuccessCache.InitialCapacity), + c := &LookupCache{ + success: &successCache{ + data: make(map[string]successRecord, conf.SuccessCache.InitialCapacity), maxSize: conf.SuccessCache.MaxCapacity, minSuccessTTL: conf.SuccessCache.MinTTL, }, @@ -174,36 +174,36 @@ func NewPTRLookupCache(reg *monitoring.Registry, conf CacheConfig, resolver PTRR return c, nil } -// LookupPTR performs a reverse lookup on the given IP address. A cached result +// Lookup performs a lookup on the given query string. A cached result // will be returned if it is contained in the cache, otherwise a lookup is // performed. -func (c PTRLookupCache) LookupPTR(ip string) (*PTR, error) { +func (c LookupCache) Lookup(q string, qt queryType) (*result, error) { now := time.Now() - ptr := c.success.get(now, ip) - if ptr != nil { + r := c.success.get(now, q) + if r != nil { c.stats.Hit.Inc() - return ptr, nil + return r, nil } - err := c.failure.get(now, ip) + err := c.failure.get(now, q) if err != nil { c.stats.Hit.Inc() return nil, err } c.stats.Miss.Inc() - ptr, err = c.resolver.LookupPTR(ip) + r, err = c.resolver.Lookup(q, qt) if err != nil { - c.failure.set(now, ip, &cachedError{err}) + c.failure.set(now, q, &cachedError{err}) return nil, err } - // We set the ptr.TTL to the minimum TTL in case it is less than that. - ptr.TTL = max(ptr.TTL, uint32(c.success.minSuccessTTL/time.Second)) + // We set the result TTL to the minimum TTL in case it is less than that. + r.TTL = max(r.TTL, uint32(c.success.minSuccessTTL/time.Second)) - c.success.set(now, ip, ptr) - return ptr, nil + c.success.set(now, q, r) + return r, nil } func max(a, b uint32) uint32 { diff --git a/libbeat/processors/dns/cache_test.go b/libbeat/processors/dns/cache_test.go index fdc531c54fb7..16e151ed0d7c 100644 --- a/libbeat/processors/dns/cache_test.go +++ b/libbeat/processors/dns/cache_test.go @@ -29,85 +29,85 @@ import ( type stubResolver struct{} -func (r *stubResolver) LookupPTR(ip string) (*PTR, error) { +func (r *stubResolver) Lookup(ip string, _ queryType) (*result, error) { switch ip { case gatewayIP: - return &PTR{Host: gatewayName, TTL: gatewayTTL}, nil + return &result{Data: []string{gatewayName}, TTL: gatewayTTL}, nil case gatewayIP + "1": return nil, io.ErrUnexpectedEOF case gatewayIP + "2": - return &PTR{Host: gatewayName, TTL: 0}, nil + return &result{Data: []string{gatewayName}, TTL: 0}, nil } return nil, &dnsError{"fake lookup returned NXDOMAIN"} } func TestCache(t *testing.T) { - c, err := NewPTRLookupCache( + c, err := NewLookupCache( monitoring.NewRegistry(), - defaultConfig.CacheConfig, + defaultConfig().CacheConfig, &stubResolver{}) if err != nil { t.Fatal(err) } // Initial success query. - ptr, err := c.LookupPTR(gatewayIP) + r, err := c.Lookup(gatewayIP, typePTR) if assert.NoError(t, err) { - assert.EqualValues(t, gatewayName, ptr.Host) - assert.EqualValues(t, gatewayTTL, ptr.TTL) + assert.EqualValues(t, []string{gatewayName}, r.Data) + assert.EqualValues(t, gatewayTTL, r.TTL) assert.EqualValues(t, 0, c.stats.Hit.Get()) assert.EqualValues(t, 1, c.stats.Miss.Get()) } // Cached success query. - ptr, err = c.LookupPTR(gatewayIP) + r, err = c.Lookup(gatewayIP, typePTR) if assert.NoError(t, err) { - assert.EqualValues(t, gatewayName, ptr.Host) + assert.EqualValues(t, []string{gatewayName}, r.Data) // TTL counts down while in cache. - assert.InDelta(t, gatewayTTL, ptr.TTL, 1) + assert.InDelta(t, gatewayTTL, r.TTL, 1) assert.EqualValues(t, 1, c.stats.Hit.Get()) assert.EqualValues(t, 1, c.stats.Miss.Get()) } // Initial failure query (like a dns error response code). - ptr, err = c.LookupPTR(gatewayIP + "0") + r, err = c.Lookup(gatewayIP+"0", typePTR) if assert.Error(t, err) { - assert.Nil(t, ptr) + assert.Nil(t, r) assert.EqualValues(t, 1, c.stats.Hit.Get()) assert.EqualValues(t, 2, c.stats.Miss.Get()) } // Cached failure query. - ptr, err = c.LookupPTR(gatewayIP + "0") + r, err = c.Lookup(gatewayIP+"0", typePTR) if assert.Error(t, err) { - assert.Nil(t, ptr) + assert.Nil(t, r) assert.EqualValues(t, 2, c.stats.Hit.Get()) assert.EqualValues(t, 2, c.stats.Miss.Get()) } // Initial network failure (like I/O timeout). - ptr, err = c.LookupPTR(gatewayIP + "1") + r, err = c.Lookup(gatewayIP+"1", typePTR) if assert.Error(t, err) { - assert.Nil(t, ptr) + assert.Nil(t, r) assert.EqualValues(t, 2, c.stats.Hit.Get()) assert.EqualValues(t, 3, c.stats.Miss.Get()) } // Check for a cache hit for the network failure. - ptr, err = c.LookupPTR(gatewayIP + "1") + r, err = c.Lookup(gatewayIP+"1", typePTR) if assert.Error(t, err) { - assert.Nil(t, ptr) + assert.Nil(t, r) assert.EqualValues(t, 3, c.stats.Hit.Get()) assert.EqualValues(t, 3, c.stats.Miss.Get()) // Cache miss. } - minTTL := defaultConfig.CacheConfig.SuccessCache.MinTTL + minTTL := defaultConfig().CacheConfig.SuccessCache.MinTTL // Initial success returned TTL=0 with MinTTL. - ptr, err = c.LookupPTR(gatewayIP + "2") + r, err = c.Lookup(gatewayIP+"2", typePTR) if assert.NoError(t, err) { - assert.EqualValues(t, gatewayName, ptr.Host) + assert.EqualValues(t, []string{gatewayName}, r.Data) - assert.EqualValues(t, minTTL/time.Second, ptr.TTL) + assert.EqualValues(t, minTTL/time.Second, r.TTL) assert.EqualValues(t, 3, c.stats.Hit.Get()) assert.EqualValues(t, 4, c.stats.Miss.Get()) @@ -117,11 +117,11 @@ func TestCache(t *testing.T) { } // Cached success from a previous TTL=0 response. - ptr, err = c.LookupPTR(gatewayIP + "2") + r, err = c.Lookup(gatewayIP+"2", typePTR) if assert.NoError(t, err) { - assert.EqualValues(t, gatewayName, ptr.Host) + assert.EqualValues(t, []string{gatewayName}, r.Data) // TTL counts down while in cache. - assert.InDelta(t, minTTL/time.Second, ptr.TTL, 1) + assert.InDelta(t, minTTL/time.Second, r.TTL, 1) assert.EqualValues(t, 4, c.stats.Hit.Get()) assert.EqualValues(t, 4, c.stats.Miss.Get()) } diff --git a/libbeat/processors/dns/config.go b/libbeat/processors/dns/config.go index fb8a13eaf216..928cd46cee0a 100644 --- a/libbeat/processors/dns/config.go +++ b/libbeat/processors/dns/config.go @@ -23,6 +23,8 @@ import ( "strings" "time" + "github.com/miekg/dns" + "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -31,7 +33,7 @@ type Config struct { CacheConfig `config:",inline"` Nameservers []string `config:"nameservers"` // Required on Windows. /etc/resolv.conf is used if none are given. Timeout time.Duration `config:"timeout"` // Per request timeout (with 2 nameservers the total timeout would be 2x). - Type string `config:"type" validate:"required"` // Reverse is the only supported type currently. + Type queryType `config:"type" validate:"required"` // Reverse is the only supported type currently. Action FieldAction `config:"action"` // Append or replace (defaults to append) when target exists. TagOnFailure []string `config:"tag_on_failure"` // Tags to append when a failure occurs. Fields mapstr.M `config:"fields"` // Mapping of source fields to target fields. @@ -75,6 +77,41 @@ func (fa *FieldAction) Unpack(v string) error { return nil } +// queryType represents a DNS query type. +type queryType uint16 + +const ( + typePTR = queryType(dns.TypePTR) + typeA = queryType(dns.TypeA) + typeAAAA = queryType(dns.TypeAAAA) + typeTXT = queryType(dns.TypeTXT) +) + +func (qt queryType) String() string { + if name := dns.TypeToString[uint16(qt)]; name != "" { + return name + } + return strconv.FormatUint(uint64(qt), 10) +} + +// Unpack unpacks a string to a queryType. +func (qt *queryType) Unpack(v string) error { + switch strings.ToLower(v) { + case "a": + *qt = typeA + case "aaaa": + *qt = typeAAAA + case "reverse", "ptr": + *qt = typePTR + case "txt": + *qt = typeTXT + default: + return fmt.Errorf("invalid dns lookup type '%v' specified in "+ + "config (valid values are: A, AAAA, PTR, reverse, TXT)", v) + } + return nil +} + // CacheConfig defines the success and failure caching parameters. type CacheConfig struct { SuccessCache CacheSettings `config:"success_cache"` @@ -100,15 +137,6 @@ type CacheSettings struct { // Validate validates the data contained in the config. func (c *Config) Validate() error { - // Validate lookup type. - c.Type = strings.ToLower(c.Type) - switch c.Type { - case "reverse": - default: - return fmt.Errorf("invalid dns lookup type '%v' specified in "+ - "config (valid values are: reverse)", c.Type) - } - // Flatten the mapping of source fields to target fields. c.reverseFlat = map[string]string{} for k, v := range c.Fields.Flatten() { @@ -157,20 +185,22 @@ func (c *CacheConfig) Validate() error { return nil } -var defaultConfig = Config{ - CacheConfig: CacheConfig{ - SuccessCache: CacheSettings{ - MinTTL: time.Minute, - InitialCapacity: 1000, - MaxCapacity: 10000, +func defaultConfig() Config { + return Config{ + CacheConfig: CacheConfig{ + SuccessCache: CacheSettings{ + MinTTL: time.Minute, + InitialCapacity: 1000, + MaxCapacity: 10000, + }, + FailureCache: CacheSettings{ + MinTTL: time.Minute, + TTL: time.Minute, + InitialCapacity: 1000, + MaxCapacity: 10000, + }, }, - FailureCache: CacheSettings{ - MinTTL: time.Minute, - TTL: time.Minute, - InitialCapacity: 1000, - MaxCapacity: 10000, - }, - }, - Transport: "udp", - Timeout: 500 * time.Millisecond, + Transport: "udp", + Timeout: 500 * time.Millisecond, + } } diff --git a/libbeat/processors/dns/dns.go b/libbeat/processors/dns/dns.go index ee8dd918ebc3..154ab3349a49 100644 --- a/libbeat/processors/dns/dns.go +++ b/libbeat/processors/dns/dns.go @@ -45,13 +45,13 @@ func init() { type processor struct { Config - resolver PTRResolver + resolver resolver log *logp.Logger } // New constructs a new DNS processor. func New(cfg *config.C) (beat.Processor, error) { - c := defaultConfig + c := defaultConfig() if err := cfg.Unpack(&c); err != nil { return nil, fmt.Errorf("fail to unpack the dns configuration: %w", err) } @@ -69,7 +69,7 @@ func New(cfg *config.C) (beat.Processor, error) { return nil, err } - cache, err := NewPTRLookupCache(metrics.NewRegistry("cache"), c.CacheConfig, resolver) + cache, err := NewLookupCache(metrics.NewRegistry("cache"), c.CacheConfig, resolver) if err != nil { return nil, err } @@ -95,17 +95,21 @@ func (p *processor) processField(source, target string, action FieldAction, even return nil } - maybeIP, ok := v.(string) + strVal, ok := v.(string) if !ok { return nil } - ptrRecord, err := p.resolver.LookupPTR(maybeIP) + result, err := p.resolver.Lookup(strVal, p.Type) if err != nil { - return fmt.Errorf("reverse lookup of %v value '%v' failed: %w", source, maybeIP, err) + return fmt.Errorf("dns lookup (%v) of %v value '%v' failed: %w", p.Type, source, strVal, err) } - return setFieldValue(action, event, target, ptrRecord.Host) + // PTR lookups return a scalar. All other lookup types return a string slice. + if p.Type == typePTR { + return setFieldValue(action, event, target, result.Data[0]) + } + return setFieldSliceValue(action, event, target, result.Data) } func setFieldValue(action FieldAction, event *beat.Event, key string, value string) error { @@ -129,7 +133,32 @@ func setFieldValue(action FieldAction, event *beat.Event, key string, value stri } return err default: - panic(fmt.Errorf("Unexpected dns field action value encountered: %v", action)) + panic(fmt.Errorf("unexpected dns field action value encountered: %v", action)) + } +} + +func setFieldSliceValue(action FieldAction, event *beat.Event, key string, value []string) error { + switch action { + case ActionReplace: + _, err := event.PutValue(key, value) + return err + case ActionAppend: + old, err := event.PutValue(key, value) + if err != nil { + return err + } + + if old != nil { + switch v := old.(type) { + case string: + _, err = event.PutValue(key, append([]string{v}, value...)) + case []string: + _, err = event.PutValue(key, append(v, value...)) + } + } + return err + default: + panic(fmt.Errorf("unexpected dns field action value encountered: %v", action)) } } diff --git a/libbeat/processors/dns/dns_test.go b/libbeat/processors/dns/dns_test.go index fa3b2d67f41b..f0b0dcf72346 100644 --- a/libbeat/processors/dns/dns_test.go +++ b/libbeat/processors/dns/dns_test.go @@ -32,8 +32,10 @@ import ( ) func TestDNSProcessorRun(t *testing.T) { + c := defaultConfig() + c.Type = typePTR p := &processor{ - Config: defaultConfig, + Config: c, resolver: &stubResolver{}, log: logp.NewLogger(logName), } @@ -94,7 +96,8 @@ func TestDNSProcessorRun(t *testing.T) { }) t.Run("metadata target", func(t *testing.T) { - config := defaultConfig + config := defaultConfig() + config.Type = typePTR config.reverseFlat = map[string]string{ "@metadata.ip": "@metadata.domain", } @@ -121,12 +124,11 @@ func TestDNSProcessorRun(t *testing.T) { assert.Equal(t, expMeta, newEvent.Meta) assert.Equal(t, event.Fields, newEvent.Fields) }) - } func TestDNSProcessorTagOnFailure(t *testing.T) { p := &processor{ - Config: defaultConfig, + Config: defaultConfig(), resolver: &stubResolver{}, log: logp.NewLogger(logName), } @@ -157,9 +159,9 @@ func TestDNSProcessorRunInParallel(t *testing.T) { // This is a simple smoke test to make sure that there are no concurrency // issues. It is most effective when run with the race detector. - conf := defaultConfig + conf := defaultConfig() reg := monitoring.NewRegistry() - cache, err := NewPTRLookupCache(reg, conf.CacheConfig, &stubResolver{}) + cache, err := NewLookupCache(reg, conf.CacheConfig, &stubResolver{}) if err != nil { t.Fatal(err) } diff --git a/libbeat/processors/dns/doc.go b/libbeat/processors/dns/doc.go index 8c895b268000..781d7e5284dd 100644 --- a/libbeat/processors/dns/doc.go +++ b/libbeat/processors/dns/doc.go @@ -16,7 +16,7 @@ // under the License. // Package dns implements a processor that can perform DNS lookups by sending -// a DNS request over UDP to a recursive nameserver. Each instance of the +// a DNS request over UDP or TLS to a recursive nameserver. Each instance of the // processor is independent (no shared cache) so it's best to only define one // instance of the processor. // diff --git a/libbeat/processors/dns/docs/dns.asciidoc b/libbeat/processors/dns/docs/dns.asciidoc index 8d03e8b4c0a4..9350b109a858 100644 --- a/libbeat/processors/dns/docs/dns.asciidoc +++ b/libbeat/processors/dns/docs/dns.asciidoc @@ -5,10 +5,10 @@ dns ++++ -The `dns` processor performs reverse DNS lookups of IP addresses. It caches the -responses that it receives in accordance to the time-to-live (TTL) value -contained in the response. It also caches failures that occur during lookups. -Each instance of this processor maintains its own independent cache. +The `dns` processor performs DNS queries. It caches the responses that it +receives in accordance to the time-to-live (TTL) value contained in the +response. It also caches failures that occur during lookups. Each instance +of this processor maintains its own independent cache. The processor uses its own DNS resolver to send requests to nameservers and does not use the operating system's resolver. It does not read any values contained @@ -24,6 +24,16 @@ throughput you can achieve is 500 events per second (1000 milliseconds / 2 milliseconds). If you have a high cache hit ratio then your throughput can be higher. +The processor can send the following query types: + +- `A` - IPv4 addresses +- `AAAA` - IPv6 addresses +- `TXT` - arbitrary human-readable text data +- `PTR` - reverse IP address lookups + +The output value is a list of strings for all query types except `PTR`. For +`PTR` queries the output value is a string. + This is a minimal configuration example that resolves the IP addresses contained in two fields. @@ -33,8 +43,8 @@ processors: - dns: type: reverse fields: - source.ip: source.hostname - destination.ip: destination.hostname + source.ip: source.domain + destination.ip: destination.domain ---- Next is a configuration example showing all options. @@ -47,8 +57,8 @@ processors: action: append transport: tls fields: - server.ip: server.hostname - client.ip: client.hostname + server.ip: server.domain + client.ip: client.domain success_cache: capacity.initial: 1000 capacity.max: 10000 @@ -64,8 +74,8 @@ processors: The `dns` processor has the following configuration settings: -`type`:: The type of DNS lookup to perform. The only supported type is -`reverse` which queries for a PTR record. +`type`:: The type of DNS query to perform. The supported types are `A`, `AAAA`, +`PTR` (or `reverse`), and `TXT`. `action`:: This defines the behavior of the processor when the target field already exists in the event. The options are `append` (default) and `replace`. @@ -82,8 +92,10 @@ the memory for this number of items. Default value is `1000`. cache can hold. When the maximum capacity is reached a random item is evicted. Default value is `10000`. -`success_cache.min_ttl`:: The duration of the minimum alternative cache TTL for successful DNS responses. Ensures that `TTL=0` successful reverse DNS responses can be cached. -Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". Default value is `1m`. +`success_cache.min_ttl`:: The duration of the minimum alternative cache TTL for +successful DNS responses. Ensures that `TTL=0` successful reverse DNS responses +can be cached. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". +Default value is `1m`. `failure_cache.capacity.initial`:: The initial number of items that the failure cache will be allocated to hold. When initialized the processor will allocate @@ -107,7 +119,7 @@ for each DNS request so if you have 2 nameservers then the total timeout will be "h". Default value is `500ms`. `tag_on_failure`:: A list of tags to add to the event when any lookup fails. The -tags are only added once even if multiple lookups fail. By default no tags are +tags are only added once even if multiple lookups fail. By default, no tags are added upon failure. `transport`:: The type of transport connection that should be used can either be diff --git a/libbeat/processors/dns/resolver.go b/libbeat/processors/dns/resolver.go index 7e0f160315cb..afa5f8395e00 100644 --- a/libbeat/processors/dns/resolver.go +++ b/libbeat/processors/dns/resolver.go @@ -19,6 +19,8 @@ package dns import ( "errors" + "golang.org/x/exp/constraints" + "math" "net" "strconv" "strings" @@ -34,18 +36,18 @@ import ( const etcResolvConf = "/etc/resolv.conf" -// PTR represents a DNS pointer record (IP to hostname). -type PTR struct { - Host string // Hostname. - TTL uint32 // Time to live in seconds. +// result represents a DNS lookup result. +type result struct { + Data []string // Hostname. + TTL uint32 // Time to live in seconds. } -// PTRResolver performs PTR record lookups. -type PTRResolver interface { - LookupPTR(ip string) (*PTR, error) +// resolver performs result record lookups. +type resolver interface { + Lookup(q string, qt queryType) (*result, error) } -// MiekgResolver is a PTRResolver that is implemented using github.com/miekg/dns +// MiekgResolver is a resolver that is implemented using github.com/miekg/dns // to send requests to DNS servers. It does not use the Go resolver. type MiekgResolver struct { client *dns.Client @@ -57,9 +59,9 @@ type MiekgResolver struct { } type nameserverStats struct { - success *monitoring.Int // Number of responses from server. - failure *monitoring.Int // Number of failures (e.g. I/O timeout) (not NXDOMAIN). - ptrResponse metrics.Sample // Histogram of response times. + success *monitoring.Int // Number of responses from server. + failure *monitoring.Int // Number of failures (e.g. I/O timeout) (not NXDOMAIN). + requestDuration metrics.Sample // Histogram of response times. } // NewMiekgResolver returns a new MiekgResolver. It returns an error if no @@ -94,7 +96,7 @@ func NewMiekgResolver(reg *monitoring.Registry, timeout time.Duration, transport } if timeout == 0 { - timeout = defaultConfig.Timeout + timeout = defaultConfig().Timeout } var clientTransferType string @@ -129,37 +131,42 @@ func (e *dnsError) Error() string { return "dns: " + e.err } -// LookupPTR performs a reverse lookup on the given IP address. -func (res *MiekgResolver) LookupPTR(ip string) (*PTR, error) { +// Lookup performs a DNS query. +func (res *MiekgResolver) Lookup(q string, qt queryType) (*result, error) { if len(res.servers) == 0 { return nil, errors.New("no dns servers configured") } - // Create PTR (reverse) DNS request. + // Create DNS request. m := new(dns.Msg) - arpa, err := dns.ReverseAddr(ip) - if err != nil { - return nil, err + switch qt { + case typePTR: + arpa, err := dns.ReverseAddr(q) + if err != nil { + return nil, err + } + m.SetQuestion(arpa, dns.TypePTR) + case typeA, typeAAAA, typeTXT: + m.SetQuestion(dns.Fqdn(q), uint16(qt)) } - m.SetQuestion(arpa, dns.TypePTR) m.RecursionDesired = true // Try the nameservers until we get a response. - var rtnErr error + var nameserverErr error for _, server := range res.servers { stats := res.getOrCreateNameserverStats(server) r, rtt, err := res.client.Exchange(m, server) if err != nil { - // Try next server if any. Otherwise return retErr. - rtnErr = err + // Try next server if any. Otherwise, return retErr. + nameserverErr = err stats.failure.Inc() continue } // We got a response. stats.success.Inc() - stats.ptrResponse.Update(int64(rtt)) + stats.requestDuration.Update(int64(rtt)) if r.Rcode != dns.RcodeSuccess { name, found := dns.RcodeToString[r.Rcode] if !found { @@ -168,24 +175,45 @@ func (res *MiekgResolver) LookupPTR(ip string) (*PTR, error) { return nil, &dnsError{"nameserver " + server + " returned " + name} } + var rtn result + rtn.TTL = math.MaxUint32 for _, a := range r.Answer { - if ptr, ok := a.(*dns.PTR); ok { - return &PTR{ - Host: strings.TrimSuffix(ptr.Ptr, "."), - TTL: ptr.Hdr.Ttl, + // Ignore records that don't match the query type. + if a.Header().Rrtype != uint16(qt) { + continue + } + + switch rr := a.(type) { + case *dns.PTR: + return &result{ + Data: []string{strings.TrimSuffix(rr.Ptr, ".")}, + TTL: rr.Hdr.Ttl, }, nil + case *dns.A: + rtn.Data = append(rtn.Data, rr.A.String()) + rtn.TTL = min(rtn.TTL, rr.Hdr.Ttl) + case *dns.AAAA: + rtn.Data = append(rtn.Data, rr.AAAA.String()) + rtn.TTL = min(rtn.TTL, rr.Hdr.Ttl) + case *dns.TXT: + rtn.Data = append(rtn.Data, rr.Txt...) + rtn.TTL = min(rtn.TTL, rr.Hdr.Ttl) } } - return nil, &dnsError{"no PTR record was found in the response"} + if len(rtn.Data) == 0 { + return nil, &dnsError{"no " + qt.String() + " resource records were found in the response"} + } + + return &rtn, nil } - if rtnErr != nil { - return nil, rtnErr + if nameserverErr != nil { + return nil, nameserverErr } // This should never get here. - panic("LookupPTR should have returned a response.") + panic("dns resolver Lookup() should have returned a response.") } func (res *MiekgResolver) getOrCreateNameserverStats(ns string) *nameserverStats { @@ -212,13 +240,20 @@ func (res *MiekgResolver) getOrCreateNameserverStats(ns string) *nameserverStats // Create stats for the nameserver. reg := res.registry.NewRegistry(strings.Replace(ns, ".", "_", -1)) stats = &nameserverStats{ - success: monitoring.NewInt(reg, "success"), - failure: monitoring.NewInt(reg, "failure"), - ptrResponse: metrics.NewUniformSample(1028), + success: monitoring.NewInt(reg, "success"), + failure: monitoring.NewInt(reg, "failure"), + requestDuration: metrics.NewUniformSample(1028), } - adapter.NewGoMetrics(reg, "response.ptr", adapter.Accept). - Register("histogram", metrics.NewHistogram(stats.ptrResponse)) + adapter.NewGoMetrics(reg, "request_duration", adapter.Accept). + Register("histogram", metrics.NewHistogram(stats.requestDuration)) res.nsStats[ns] = stats return stats } + +func min[T uint32](a, b T) T { + if a < b { + return a + } + return b +} diff --git a/libbeat/processors/dns/resolver_test.go b/libbeat/processors/dns/resolver_test.go index 1e2e56b86282..21c597a0ae82 100644 --- a/libbeat/processors/dns/resolver_test.go +++ b/libbeat/processors/dns/resolver_test.go @@ -29,7 +29,7 @@ import ( "github.com/elastic/elastic-agent-libs/monitoring" ) -var _ PTRResolver = (*MiekgResolver)(nil) +var _ resolver = (*MiekgResolver)(nil) func TestMiekgResolverLookupPTR(t *testing.T) { stop, addr, err := ServeDNS(FakeDNSHandler) @@ -45,15 +45,15 @@ func TestMiekgResolverLookupPTR(t *testing.T) { } // Success - ptr, err := res.LookupPTR("8.8.8.8") + ptr, err := res.Lookup("8.8.8.8", typePTR) if err != nil { t.Fatal(err) } - assert.EqualValues(t, "google-public-dns-a.google.com", ptr.Host) + assert.EqualValues(t, "google-public-dns-a.google.com", ptr.Data[0]) assert.EqualValues(t, 19273, ptr.TTL) // NXDOMAIN - _, err = res.LookupPTR("1.1.1.1") + _, err = res.Lookup("1.1.1.1", typePTR) if assert.Error(t, err) { assert.Contains(t, err.Error(), "NXDOMAIN") } @@ -91,21 +91,21 @@ func TestMiekgResolverLookupPTRTLS(t *testing.T) { if err != nil { t.Fatal(err) } - // we use a self signed certificate for localhost + // we use a self-signed certificate for localhost // we have to pass InsecureSSL to the DNS resolver res.client.TLSConfig = &tls.Config{ InsecureSkipVerify: true, } // Success - ptr, err := res.LookupPTR("8.8.8.8") + ptr, err := res.Lookup("8.8.8.8", typePTR) if err != nil { t.Fatal(err) } - assert.EqualValues(t, "google-public-dns-a.google.com", ptr.Host) + assert.EqualValues(t, "google-public-dns-a.google.com", ptr.Data[0]) assert.EqualValues(t, 19273, ptr.TTL) // NXDOMAIN - _, err = res.LookupPTR("1.1.1.1") + _, err = res.Lookup("1.1.1.1", typePTR) if assert.Error(t, err) { assert.Contains(t, err.Error(), "NXDOMAIN") } From 2b7f43e258ecf7617c8a02b02dfb79be1af9a874 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Tue, 22 Aug 2023 18:03:48 -0400 Subject: [PATCH 2/7] gofumpt -w --extra --- libbeat/processors/dns/dns.go | 2 +- libbeat/processors/dns/resolver.go | 1 - libbeat/processors/dns/resolver_test.go | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/libbeat/processors/dns/dns.go b/libbeat/processors/dns/dns.go index 154ab3349a49..aa8b1c5cc18f 100644 --- a/libbeat/processors/dns/dns.go +++ b/libbeat/processors/dns/dns.go @@ -112,7 +112,7 @@ func (p *processor) processField(source, target string, action FieldAction, even return setFieldSliceValue(action, event, target, result.Data) } -func setFieldValue(action FieldAction, event *beat.Event, key string, value string) error { +func setFieldValue(action FieldAction, event *beat.Event, key, value string) error { switch action { case ActionReplace: _, err := event.PutValue(key, value) diff --git a/libbeat/processors/dns/resolver.go b/libbeat/processors/dns/resolver.go index afa5f8395e00..d1a820455799 100644 --- a/libbeat/processors/dns/resolver.go +++ b/libbeat/processors/dns/resolver.go @@ -19,7 +19,6 @@ package dns import ( "errors" - "golang.org/x/exp/constraints" "math" "net" "strconv" diff --git a/libbeat/processors/dns/resolver_test.go b/libbeat/processors/dns/resolver_test.go index 21c597a0ae82..2fff26ed2f93 100644 --- a/libbeat/processors/dns/resolver_test.go +++ b/libbeat/processors/dns/resolver_test.go @@ -70,7 +70,7 @@ func TestMiekgResolverLookupPTR(t *testing.T) { } func TestMiekgResolverLookupPTRTLS(t *testing.T) { - //Build Cert + // Build Cert cert, err := tls.X509KeyPair(CertPEMBlock, KeyPEMBlock) if err != nil { t.Fatalf("unable to build certificate: %v", err) From 2625e3af2184464972d3733c8de7a608a9c2bc21 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Tue, 22 Aug 2023 15:59:32 -0400 Subject: [PATCH 3/7] fix golangci-lint issues --- libbeat/processors/dns/resolver.go | 2 + libbeat/processors/dns/resolver_test.go | 65 ++++++++++++++++++------- 2 files changed, 50 insertions(+), 17 deletions(-) diff --git a/libbeat/processors/dns/resolver.go b/libbeat/processors/dns/resolver.go index d1a820455799..a0f47cbce1c4 100644 --- a/libbeat/processors/dns/resolver.go +++ b/libbeat/processors/dns/resolver.go @@ -243,6 +243,8 @@ func (res *MiekgResolver) getOrCreateNameserverStats(ns string) *nameserverStats failure: monitoring.NewInt(reg, "failure"), requestDuration: metrics.NewUniformSample(1028), } + + //nolint:errcheck // Register should never fail because this is a new empty registry. adapter.NewGoMetrics(reg, "request_duration", adapter.Accept). Register("histogram", metrics.NewHistogram(stats.requestDuration)) res.nsStats[ns] = stats diff --git a/libbeat/processors/dns/resolver_test.go b/libbeat/processors/dns/resolver_test.go index 2fff26ed2f93..3e618ea9e75c 100644 --- a/libbeat/processors/dns/resolver_test.go +++ b/libbeat/processors/dns/resolver_test.go @@ -19,12 +19,14 @@ package dns import ( "crypto/tls" + "errors" "net" "strings" "testing" "github.com/miekg/dns" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/elastic-agent-libs/monitoring" ) @@ -32,11 +34,13 @@ import ( var _ resolver = (*MiekgResolver)(nil) func TestMiekgResolverLookupPTR(t *testing.T) { - stop, addr, err := ServeDNS(FakeDNSHandler) + stop, addr, err := serveDNS(fakeDNSHandler) if err != nil { t.Fatal(err) } - defer stop() + defer func() { + require.NoError(t, stop()) + }() reg := monitoring.NewRegistry() res, err := NewMiekgResolver(reg.NewRegistry(logName), 0, "udp", addr) @@ -71,19 +75,22 @@ func TestMiekgResolverLookupPTR(t *testing.T) { func TestMiekgResolverLookupPTRTLS(t *testing.T) { // Build Cert - cert, err := tls.X509KeyPair(CertPEMBlock, KeyPEMBlock) + cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock) if err != nil { t.Fatalf("unable to build certificate: %v", err) } config := tls.Config{ Certificates: []tls.Certificate{cert}, + MinVersion: tls.VersionTLS13, } // serve TLS with cert - stop, addr, err := ServeDNSTLS(FakeDNSHandler, &config) + stop, addr, err := serveDNSTLS(fakeDNSHandler, &config) if err != nil { t.Fatal(err) } - defer stop() + defer func() { + require.NoError(t, stop()) + }() reg := monitoring.NewRegistry() @@ -91,11 +98,11 @@ func TestMiekgResolverLookupPTRTLS(t *testing.T) { if err != nil { t.Fatal(err) } - // we use a self-signed certificate for localhost - // we have to pass InsecureSSL to the DNS resolver + //nolint:gosec // Don't verify the self-signed cert. This is only for testing purposes. res.client.TLSConfig = &tls.Config{ InsecureSkipVerify: true, } + // Success ptr, err := res.Lookup("8.8.8.8", typePTR) if err != nil { @@ -121,7 +128,7 @@ func TestMiekgResolverLookupPTRTLS(t *testing.T) { assert.Equal(t, 12, metricCount) } -func ServeDNS(h dns.HandlerFunc) (cancel func() error, addr string, err error) { +func serveDNS(h dns.HandlerFunc) (cancel func() error, addr string, err error) { // Setup listener on ephemeral port. a, err := net.ResolveUDPAddr("udp4", "localhost:0") @@ -136,11 +143,23 @@ func ServeDNS(h dns.HandlerFunc) (cancel func() error, addr string, err error) { var s dns.Server s.PacketConn = l s.Handler = h - go s.ActivateAndServe() - return s.Shutdown, s.PacketConn.LocalAddr().String(), err + + serveErr := make(chan error, 1) + go func() { + defer close(serveErr) + serveErr <- s.ActivateAndServe() + }() + + cancel = func() error { + return errors.Join( + s.Shutdown(), + <-serveErr, + ) + } + return cancel, s.PacketConn.LocalAddr().String(), err } -func ServeDNSTLS(h dns.HandlerFunc, config *tls.Config) (cancel func() error, addr string, err error) { +func serveDNSTLS(h dns.HandlerFunc, config *tls.Config) (cancel func() error, addr string, err error) { // Setup listener on ephemeral port. l, err := tls.Listen("tcp", "localhost:0", config) if err != nil { @@ -150,11 +169,23 @@ func ServeDNSTLS(h dns.HandlerFunc, config *tls.Config) (cancel func() error, ad var s dns.Server s.Handler = h s.Listener = l - go s.ActivateAndServe() - return s.Shutdown, l.Addr().String(), err + + serveErr := make(chan error, 1) + go func() { + defer close(serveErr) + serveErr <- s.ActivateAndServe() + }() + + cancel = func() error { + return errors.Join( + s.Shutdown(), + <-serveErr, + ) + } + return cancel, l.Addr().String(), err } -func FakeDNSHandler(w dns.ResponseWriter, msg *dns.Msg) { +func fakeDNSHandler(w dns.ResponseWriter, msg *dns.Msg) { m := new(dns.Msg) m.SetReply(msg) switch { @@ -164,11 +195,11 @@ func FakeDNSHandler(w dns.ResponseWriter, msg *dns.Msg) { default: m.SetRcode(msg, dns.RcodeNameError) } - w.WriteMsg(m) + _ = w.WriteMsg(m) } var ( - KeyPEMBlock = []byte(`-----BEGIN RSA PRIVATE KEY----- + keyPEMBlock = []byte(`-----BEGIN RSA PRIVATE KEY----- MIIEowIBAAKCAQEA2g2zpEtWaIUx5o6MEnWnGsf0Ba1SDc3AwgOmxeNIPBJYVCrk sWe8Qt/5nymReVFcum76995ncr/zT+e4e8l+hXuGzTKZJpOj27Igb0/wa3j2hIcu rnbzfwkJ+KMag2UUKdSo31ChMU+64bwziEXunF347Ot7dBLtw3PJKbabNCP+/oil @@ -196,7 +227,7 @@ LatVl7h6ud25ZJYnP7DelGxHsZnDXNirLFlSB0CL4F6I5xNoBvCoH0Q8ckDSh4C7 tlAyD5m9gwvgdkNFWq6/lcUPxGksTtTk8dGnhJz8pGlZvp6+dZCM -----END RSA PRIVATE KEY-----`) - CertPEMBlock = []byte(`-----BEGIN CERTIFICATE----- + certPEMBlock = []byte(`-----BEGIN CERTIFICATE----- MIIDaTCCAlGgAwIBAgIQGqg47wLgbjwwrZASuakmwjANBgkqhkiG9w0BAQsFADAy MRQwEgYDVQQKEwtMb2cgQ291cmllcjEaMBgGA1UEAxMRYmVhdHMuZWxhc3RpYy5j b20wHhcNMjAwNjIzMDY0NDEwWhcNMjEwNjIzMDY0NDEwWjAyMRQwEgYDVQQKEwtM From 2c8505e0f799aa87068ae90b370df17c5c841ed7 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Tue, 22 Aug 2023 17:40:28 -0400 Subject: [PATCH 4/7] unexport internal-only types/functions --- libbeat/processors/dns/cache.go | 12 ++--- libbeat/processors/dns/cache_test.go | 6 +-- libbeat/processors/dns/config.go | 62 ++++++++++++------------- libbeat/processors/dns/dns.go | 26 +++++------ libbeat/processors/dns/dns_test.go | 24 +++++----- libbeat/processors/dns/resolver.go | 14 +++--- libbeat/processors/dns/resolver_test.go | 6 +-- 7 files changed, 75 insertions(+), 75 deletions(-) diff --git a/libbeat/processors/dns/cache.go b/libbeat/processors/dns/cache.go index b1b9c35cfac4..6427fb9f98ed 100644 --- a/libbeat/processors/dns/cache.go +++ b/libbeat/processors/dns/cache.go @@ -132,10 +132,10 @@ type cachedError struct { func (ce *cachedError) Error() string { return ce.err.Error() + " (from failure cache)" } func (ce *cachedError) Cause() error { return ce.err } -// LookupCache is a cache for storing and retrieving the results of +// lookupCache is a cache for storing and retrieving the results of // DNS queries. It caches the results of queries regardless of their // outcome (success or failure). -type LookupCache struct { +type lookupCache struct { success *successCache failure *failureCache resolver resolver @@ -147,13 +147,13 @@ type cacheStats struct { Miss *monitoring.Int } -// NewLookupCache returns a new cache. -func NewLookupCache(reg *monitoring.Registry, conf CacheConfig, resolver resolver) (*LookupCache, error) { +// newLookupCache returns a new cache. +func newLookupCache(reg *monitoring.Registry, conf cacheConfig, resolver resolver) (*lookupCache, error) { if err := conf.Validate(); err != nil { return nil, err } - c := &LookupCache{ + c := &lookupCache{ success: &successCache{ data: make(map[string]successRecord, conf.SuccessCache.InitialCapacity), maxSize: conf.SuccessCache.MaxCapacity, @@ -177,7 +177,7 @@ func NewLookupCache(reg *monitoring.Registry, conf CacheConfig, resolver resolve // Lookup performs a lookup on the given query string. A cached result // will be returned if it is contained in the cache, otherwise a lookup is // performed. -func (c LookupCache) Lookup(q string, qt queryType) (*result, error) { +func (c lookupCache) Lookup(q string, qt queryType) (*result, error) { now := time.Now() r := c.success.get(now, q) diff --git a/libbeat/processors/dns/cache_test.go b/libbeat/processors/dns/cache_test.go index 16e151ed0d7c..ffb081d9fcc9 100644 --- a/libbeat/processors/dns/cache_test.go +++ b/libbeat/processors/dns/cache_test.go @@ -42,9 +42,9 @@ func (r *stubResolver) Lookup(ip string, _ queryType) (*result, error) { } func TestCache(t *testing.T) { - c, err := NewLookupCache( + c, err := newLookupCache( monitoring.NewRegistry(), - defaultConfig().CacheConfig, + defaultConfig().cacheConfig, &stubResolver{}) if err != nil { t.Fatal(err) @@ -101,7 +101,7 @@ func TestCache(t *testing.T) { assert.EqualValues(t, 3, c.stats.Miss.Get()) // Cache miss. } - minTTL := defaultConfig().CacheConfig.SuccessCache.MinTTL + minTTL := defaultConfig().cacheConfig.SuccessCache.MinTTL // Initial success returned TTL=0 with MinTTL. r, err = c.Lookup(gatewayIP+"2", typePTR) if assert.NoError(t, err) { diff --git a/libbeat/processors/dns/config.go b/libbeat/processors/dns/config.go index 928cd46cee0a..b83380f92600 100644 --- a/libbeat/processors/dns/config.go +++ b/libbeat/processors/dns/config.go @@ -28,35 +28,35 @@ import ( "github.com/elastic/elastic-agent-libs/mapstr" ) -// Config defines the configuration options for the DNS processor. -type Config struct { - CacheConfig `config:",inline"` +// config defines the configuration options for the DNS processor. +type config struct { + cacheConfig `config:",inline"` Nameservers []string `config:"nameservers"` // Required on Windows. /etc/resolv.conf is used if none are given. Timeout time.Duration `config:"timeout"` // Per request timeout (with 2 nameservers the total timeout would be 2x). Type queryType `config:"type" validate:"required"` // Reverse is the only supported type currently. - Action FieldAction `config:"action"` // Append or replace (defaults to append) when target exists. + Action fieldAction `config:"action"` // Append or replace (defaults to append) when target exists. TagOnFailure []string `config:"tag_on_failure"` // Tags to append when a failure occurs. Fields mapstr.M `config:"fields"` // Mapping of source fields to target fields. Transport string `config:"transport"` // Can be tls or udp. reverseFlat map[string]string } -// FieldAction defines the behavior when the target field exists. -type FieldAction uint8 +// fieldAction defines the behavior when the target field exists. +type fieldAction uint8 -// List of FieldAction types. +// List of fieldAction types. const ( - ActionAppend FieldAction = iota - ActionReplace + actionAppend fieldAction = iota + actionReplace ) -var fieldActionNames = map[FieldAction]string{ - ActionAppend: "append", - ActionReplace: "replace", +var fieldActionNames = map[fieldAction]string{ + actionAppend: "append", + actionReplace: "replace", } // String returns a field action name. -func (fa FieldAction) String() string { +func (fa fieldAction) String() string { name, found := fieldActionNames[fa] if found { return name @@ -64,13 +64,13 @@ func (fa FieldAction) String() string { return "unknown (" + strconv.Itoa(int(fa)) + ")" } -// Unpack unpacks a string to a FieldAction. -func (fa *FieldAction) Unpack(v string) error { +// Unpack unpacks a string to a fieldAction. +func (fa *fieldAction) Unpack(v string) error { switch strings.ToLower(v) { case "", "append": - *fa = ActionAppend + *fa = actionAppend case "replace": - *fa = ActionReplace + *fa = actionReplace default: return fmt.Errorf("invalid dns field action value '%v'", v) } @@ -112,14 +112,14 @@ func (qt *queryType) Unpack(v string) error { return nil } -// CacheConfig defines the success and failure caching parameters. -type CacheConfig struct { - SuccessCache CacheSettings `config:"success_cache"` - FailureCache CacheSettings `config:"failure_cache"` +// cacheConfig defines the success and failure caching parameters. +type cacheConfig struct { + SuccessCache cacheSettings `config:"success_cache"` + FailureCache cacheSettings `config:"failure_cache"` } -// CacheSettings define the caching behavior for an individual cache. -type CacheSettings struct { +// cacheSettings define the caching behavior for an individual cache. +type cacheSettings struct { // TTL value for items in cache. Not used for success because we use TTL // from the DNS record. TTL time.Duration `config:"ttl"` @@ -136,7 +136,7 @@ type CacheSettings struct { } // Validate validates the data contained in the config. -func (c *Config) Validate() error { +func (c *config) Validate() error { // Flatten the mapping of source fields to target fields. c.reverseFlat = map[string]string{} for k, v := range c.Fields.Flatten() { @@ -159,8 +159,8 @@ func (c *Config) Validate() error { return nil } -// Validate validates the data contained in the CacheConfig. -func (c *CacheConfig) Validate() error { +// Validate validates the data contained in the cacheConfig. +func (c *cacheConfig) Validate() error { if c.SuccessCache.MinTTL <= 0 { return fmt.Errorf("success_cache.min_ttl must be > 0") } @@ -185,15 +185,15 @@ func (c *CacheConfig) Validate() error { return nil } -func defaultConfig() Config { - return Config{ - CacheConfig: CacheConfig{ - SuccessCache: CacheSettings{ +func defaultConfig() config { + return config{ + cacheConfig: cacheConfig{ + SuccessCache: cacheSettings{ MinTTL: time.Minute, InitialCapacity: 1000, MaxCapacity: 10000, }, - FailureCache: CacheSettings{ + FailureCache: cacheSettings{ MinTTL: time.Minute, TTL: time.Minute, InitialCapacity: 1000, diff --git a/libbeat/processors/dns/dns.go b/libbeat/processors/dns/dns.go index aa8b1c5cc18f..5083aea381d6 100644 --- a/libbeat/processors/dns/dns.go +++ b/libbeat/processors/dns/dns.go @@ -27,7 +27,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/processors" jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor" - "github.com/elastic/elastic-agent-libs/config" + conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/monitoring" @@ -44,13 +44,13 @@ func init() { } type processor struct { - Config + config resolver resolver log *logp.Logger } // New constructs a new DNS processor. -func New(cfg *config.C) (beat.Processor, error) { +func New(cfg *conf.C) (beat.Processor, error) { c := defaultConfig() if err := cfg.Unpack(&c); err != nil { return nil, fmt.Errorf("fail to unpack the dns configuration: %w", err) @@ -64,17 +64,17 @@ func New(cfg *config.C) (beat.Processor, error) { ) log.Debugf("DNS processor config: %+v", c) - resolver, err := NewMiekgResolver(metrics, c.Timeout, c.Transport, c.Nameservers...) + resolver, err := newMiekgResolver(metrics, c.Timeout, c.Transport, c.Nameservers...) if err != nil { return nil, err } - cache, err := NewLookupCache(metrics.NewRegistry("cache"), c.CacheConfig, resolver) + cache, err := newLookupCache(metrics.NewRegistry("cache"), c.cacheConfig, resolver) if err != nil { return nil, err } - return &processor{Config: c, resolver: cache, log: log}, nil + return &processor{config: c, resolver: cache, log: log}, nil } func (p *processor) Run(event *beat.Event) (*beat.Event, error) { @@ -88,7 +88,7 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) { return event, nil } -func (p *processor) processField(source, target string, action FieldAction, event *beat.Event) error { +func (p *processor) processField(source, target string, action fieldAction, event *beat.Event) error { v, err := event.GetValue(source) if err != nil { //nolint:nilerr // an empty source field isn't considered an error for this processor @@ -112,12 +112,12 @@ func (p *processor) processField(source, target string, action FieldAction, even return setFieldSliceValue(action, event, target, result.Data) } -func setFieldValue(action FieldAction, event *beat.Event, key, value string) error { +func setFieldValue(action fieldAction, event *beat.Event, key, value string) error { switch action { - case ActionReplace: + case actionReplace: _, err := event.PutValue(key, value) return err - case ActionAppend: + case actionAppend: old, err := event.PutValue(key, value) if err != nil { return err @@ -137,12 +137,12 @@ func setFieldValue(action FieldAction, event *beat.Event, key, value string) err } } -func setFieldSliceValue(action FieldAction, event *beat.Event, key string, value []string) error { +func setFieldSliceValue(action fieldAction, event *beat.Event, key string, value []string) error { switch action { - case ActionReplace: + case actionReplace: _, err := event.PutValue(key, value) return err - case ActionAppend: + case actionAppend: old, err := event.PutValue(key, value) if err != nil { return err diff --git a/libbeat/processors/dns/dns_test.go b/libbeat/processors/dns/dns_test.go index f0b0dcf72346..e4571cb4240c 100644 --- a/libbeat/processors/dns/dns_test.go +++ b/libbeat/processors/dns/dns_test.go @@ -35,11 +35,11 @@ func TestDNSProcessorRun(t *testing.T) { c := defaultConfig() c.Type = typePTR p := &processor{ - Config: c, + config: c, resolver: &stubResolver{}, log: logp.NewLogger(logName), } - p.Config.reverseFlat = map[string]string{ + p.config.reverseFlat = map[string]string{ "source.ip": "source.domain", } t.Log(p.String()) @@ -60,7 +60,7 @@ func TestDNSProcessorRun(t *testing.T) { const forwardDomain = "www." + gatewayName t.Run("append", func(t *testing.T) { - p.Config.Action = ActionAppend + p.config.Action = actionAppend event, err := p.Run(&beat.Event{ Fields: mapstr.M{ @@ -79,7 +79,7 @@ func TestDNSProcessorRun(t *testing.T) { }) t.Run("replace", func(t *testing.T) { - p.Config.Action = ActionReplace + p.config.Action = actionReplace event, err := p.Run(&beat.Event{ Fields: mapstr.M{ @@ -103,7 +103,7 @@ func TestDNSProcessorRun(t *testing.T) { } p := &processor{ - Config: config, + config: config, resolver: &stubResolver{}, log: logp.NewLogger(logName), } @@ -128,12 +128,12 @@ func TestDNSProcessorRun(t *testing.T) { func TestDNSProcessorTagOnFailure(t *testing.T) { p := &processor{ - Config: defaultConfig(), + config: defaultConfig(), resolver: &stubResolver{}, log: logp.NewLogger(logName), } - p.Config.TagOnFailure = []string{"_lookup_failed"} - p.Config.reverseFlat = map[string]string{ + p.config.TagOnFailure = []string{"_lookup_failed"} + p.config.reverseFlat = map[string]string{ "source.ip": "source.domain", "destination.ip": "destination.domain", } @@ -151,7 +151,7 @@ func TestDNSProcessorTagOnFailure(t *testing.T) { v, _ := event.GetValue("tags") if assert.Len(t, v, 1) { - assert.ElementsMatch(t, v, p.Config.TagOnFailure) + assert.ElementsMatch(t, v, p.config.TagOnFailure) } } @@ -161,12 +161,12 @@ func TestDNSProcessorRunInParallel(t *testing.T) { conf := defaultConfig() reg := monitoring.NewRegistry() - cache, err := NewLookupCache(reg, conf.CacheConfig, &stubResolver{}) + cache, err := newLookupCache(reg, conf.cacheConfig, &stubResolver{}) if err != nil { t.Fatal(err) } - p := &processor{Config: conf, resolver: cache, log: logp.NewLogger(logName)} - p.Config.reverseFlat = map[string]string{"source.ip": "source.domain"} + p := &processor{config: conf, resolver: cache, log: logp.NewLogger(logName)} + p.config.reverseFlat = map[string]string{"source.ip": "source.domain"} const numGoroutines = 10 const numEvents = 500 diff --git a/libbeat/processors/dns/resolver.go b/libbeat/processors/dns/resolver.go index a0f47cbce1c4..bbbe20bce5a8 100644 --- a/libbeat/processors/dns/resolver.go +++ b/libbeat/processors/dns/resolver.go @@ -46,9 +46,9 @@ type resolver interface { Lookup(q string, qt queryType) (*result, error) } -// MiekgResolver is a resolver that is implemented using github.com/miekg/dns +// miekgResolver is a resolver that is implemented using github.com/miekg/dns // to send requests to DNS servers. It does not use the Go resolver. -type MiekgResolver struct { +type miekgResolver struct { client *dns.Client servers []string @@ -63,9 +63,9 @@ type nameserverStats struct { requestDuration metrics.Sample // Histogram of response times. } -// NewMiekgResolver returns a new MiekgResolver. It returns an error if no +// newMiekgResolver returns a new miekgResolver. It returns an error if no // nameserver are given and none can be read from /etc/resolv.conf. -func NewMiekgResolver(reg *monitoring.Registry, timeout time.Duration, transport string, servers ...string) (*MiekgResolver, error) { +func newMiekgResolver(reg *monitoring.Registry, timeout time.Duration, transport string, servers ...string) (*miekgResolver, error) { // Use /etc/resolv.conf if no nameservers are given. (Won't work for Windows). if len(servers) == 0 { config, err := dns.ClientConfigFromFile(etcResolvConf) @@ -106,7 +106,7 @@ func NewMiekgResolver(reg *monitoring.Registry, timeout time.Duration, transport clientTransferType = "udp" } - return &MiekgResolver{ + return &miekgResolver{ client: &dns.Client{ Net: clientTransferType, Timeout: timeout, @@ -131,7 +131,7 @@ func (e *dnsError) Error() string { } // Lookup performs a DNS query. -func (res *MiekgResolver) Lookup(q string, qt queryType) (*result, error) { +func (res *miekgResolver) Lookup(q string, qt queryType) (*result, error) { if len(res.servers) == 0 { return nil, errors.New("no dns servers configured") } @@ -215,7 +215,7 @@ func (res *MiekgResolver) Lookup(q string, qt queryType) (*result, error) { panic("dns resolver Lookup() should have returned a response.") } -func (res *MiekgResolver) getOrCreateNameserverStats(ns string) *nameserverStats { +func (res *miekgResolver) getOrCreateNameserverStats(ns string) *nameserverStats { // Trim port. ns = ns[:strings.LastIndex(ns, ":")] diff --git a/libbeat/processors/dns/resolver_test.go b/libbeat/processors/dns/resolver_test.go index 3e618ea9e75c..1786883a671c 100644 --- a/libbeat/processors/dns/resolver_test.go +++ b/libbeat/processors/dns/resolver_test.go @@ -31,7 +31,7 @@ import ( "github.com/elastic/elastic-agent-libs/monitoring" ) -var _ resolver = (*MiekgResolver)(nil) +var _ resolver = (*miekgResolver)(nil) func TestMiekgResolverLookupPTR(t *testing.T) { stop, addr, err := serveDNS(fakeDNSHandler) @@ -43,7 +43,7 @@ func TestMiekgResolverLookupPTR(t *testing.T) { }() reg := monitoring.NewRegistry() - res, err := NewMiekgResolver(reg.NewRegistry(logName), 0, "udp", addr) + res, err := newMiekgResolver(reg.NewRegistry(logName), 0, "udp", addr) if err != nil { t.Fatal(err) } @@ -94,7 +94,7 @@ func TestMiekgResolverLookupPTRTLS(t *testing.T) { reg := monitoring.NewRegistry() - res, err := NewMiekgResolver(reg.NewRegistry(logName), 0, "tls", addr) + res, err := newMiekgResolver(reg.NewRegistry(logName), 0, "tls", addr) if err != nil { t.Fatal(err) } From 64b70328c1d3221028382f1369a8c242fff92a9d Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Tue, 22 Aug 2023 10:06:46 -0400 Subject: [PATCH 5/7] Add changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a50b32d17227..a71f279cb7b8 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -147,6 +147,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - When running under Elastic-Agent the status is now reported per Unit instead of the whole Beat {issue}35874[35874] {pull}36183[36183] - Add warning message to SysV init scripts for RPM-based systems that lack `/etc/rc.d/init.d/functions`. {issue}35708[35708] {pull}36188[36188] - Mark `translate_sid` processor is GA. {issue}36279[36279] {pull}36280[36280] +- dns processor: Add support for forward lookups (`A`, `AAAA`, and `TXT`). {issue}11416[11416] {pull}36394[36394] *Auditbeat* From aab32d4d7587045080a1a02e10d9175ad61cfb55 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Wed, 23 Aug 2023 09:53:45 -0400 Subject: [PATCH 6/7] Apply efd6 review suggestions Co-authored-by: Dan Kortschak <90160302+efd6@users.noreply.github.com> --- libbeat/processors/dns/config.go | 4 ++-- libbeat/processors/dns/dns.go | 6 +++--- libbeat/processors/dns/resolver.go | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/libbeat/processors/dns/config.go b/libbeat/processors/dns/config.go index b83380f92600..07aefe5303a6 100644 --- a/libbeat/processors/dns/config.go +++ b/libbeat/processors/dns/config.go @@ -33,7 +33,7 @@ type config struct { cacheConfig `config:",inline"` Nameservers []string `config:"nameservers"` // Required on Windows. /etc/resolv.conf is used if none are given. Timeout time.Duration `config:"timeout"` // Per request timeout (with 2 nameservers the total timeout would be 2x). - Type queryType `config:"type" validate:"required"` // Reverse is the only supported type currently. + Type queryType `config:"type" validate:"required"` // One of A, AAAA, TXT or PTR (or reverse). Action fieldAction `config:"action"` // Append or replace (defaults to append) when target exists. TagOnFailure []string `config:"tag_on_failure"` // Tags to append when a failure occurs. Fields mapstr.M `config:"fields"` // Mapping of source fields to target fields. @@ -106,7 +106,7 @@ func (qt *queryType) Unpack(v string) error { case "txt": *qt = typeTXT default: - return fmt.Errorf("invalid dns lookup type '%v' specified in "+ + return fmt.Errorf("invalid dns lookup type '%s' specified in "+ "config (valid values are: A, AAAA, PTR, reverse, TXT)", v) } return nil diff --git a/libbeat/processors/dns/dns.go b/libbeat/processors/dns/dns.go index 5083aea381d6..d4f3d2ba57b9 100644 --- a/libbeat/processors/dns/dns.go +++ b/libbeat/processors/dns/dns.go @@ -102,7 +102,7 @@ func (p *processor) processField(source, target string, action fieldAction, even result, err := p.resolver.Lookup(strVal, p.Type) if err != nil { - return fmt.Errorf("dns lookup (%v) of %v value '%v' failed: %w", p.Type, source, strVal, err) + return fmt.Errorf("dns lookup (%s) of %s value '%s' failed: %w", p.Type, source, strVal, err) } // PTR lookups return a scalar. All other lookup types return a string slice. @@ -133,7 +133,7 @@ func setFieldValue(action fieldAction, event *beat.Event, key, value string) err } return err default: - panic(fmt.Errorf("unexpected dns field action value encountered: %v", action)) + panic(fmt.Errorf("unexpected dns field action value encountered: %s", action)) } } @@ -158,7 +158,7 @@ func setFieldSliceValue(action fieldAction, event *beat.Event, key string, value } return err default: - panic(fmt.Errorf("unexpected dns field action value encountered: %v", action)) + panic(fmt.Errorf("unexpected dns field action value encountered: %s", action)) } } diff --git a/libbeat/processors/dns/resolver.go b/libbeat/processors/dns/resolver.go index bbbe20bce5a8..e560ccdead6e 100644 --- a/libbeat/processors/dns/resolver.go +++ b/libbeat/processors/dns/resolver.go @@ -157,7 +157,7 @@ func (res *miekgResolver) Lookup(q string, qt queryType) (*result, error) { r, rtt, err := res.client.Exchange(m, server) if err != nil { - // Try next server if any. Otherwise, return retErr. + // Try next server if any. Otherwise, return nameserverErr. nameserverErr = err stats.failure.Inc() continue From 62358524b5ffe50300621ac36a4730b93dd1bf4b Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Wed, 23 Aug 2023 09:55:17 -0400 Subject: [PATCH 7/7] Use min(a, b uint32) uint32 --- libbeat/processors/dns/resolver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/processors/dns/resolver.go b/libbeat/processors/dns/resolver.go index e560ccdead6e..537051c5a5a3 100644 --- a/libbeat/processors/dns/resolver.go +++ b/libbeat/processors/dns/resolver.go @@ -252,7 +252,7 @@ func (res *miekgResolver) getOrCreateNameserverStats(ns string) *nameserverStats return stats } -func min[T uint32](a, b T) T { +func min(a, b uint32) uint32 { if a < b { return a }