diff --git a/.github/workflows/bench-go.yml b/.github/workflows/bench-go.yml index 819d55fe..95ac0c0f 100644 --- a/.github/workflows/bench-go.yml +++ b/.github/workflows/bench-go.yml @@ -22,7 +22,7 @@ jobs: strategy: matrix: os-version: [ 'ubuntu-latest' ] - go-version: [ '1.21', '1.22', '1.23' ] + go-version: [ '1.22', '1.23' ] runs-on: ${{ matrix.os-version }} steps: diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 8ab1e5c7..6f7dc7b5 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -23,7 +23,7 @@ jobs: strategy: matrix: - go-version: ['1.21', '1.22', '1.23'] + go-version: ['1.22', '1.23'] steps: - uses: actions/checkout@v4 @@ -50,7 +50,7 @@ jobs: strategy: matrix: os: [ubuntu-latest, macos-latest, windows-latest] - go-version: ['1.21', '1.22', '1.23'] + go-version: ['1.22', '1.23'] goos: [linux, freebsd, darwin, windows] exclude: - { os: macos-latest, goos: linux } diff --git a/.github/workflows/testing-go.yml b/.github/workflows/testing-go.yml index 60471b80..9832d474 100644 --- a/.github/workflows/testing-go.yml +++ b/.github/workflows/testing-go.yml @@ -46,7 +46,7 @@ jobs: strategy: matrix: os-version: [ 'ubuntu-24.04' ] - go-version: [ '1.21', '1.22', '1.23' ] + go-version: [ '1.22', '1.23' ] package: - '.' - 'pkgconfig' diff --git a/README.md b/README.md index b2f5111a..9adc627c 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,9 @@
- - + + - +
@@ -56,6 +56,7 @@ - [`File`](docs/loggers/logger_file.md) with automatic rotation and compression - *Provide metrics and API* - [`Prometheus`](docs/loggers/logger_prometheus.md) exporter + - [`OpenTelemetry`](docs/loggers/logger_opentelemetry.md) tracing dns - [`Statsd`](docs/loggers/logger_statsd.md) support - [`REST API`](docs/loggers/logger_restapi.md) with [swagger](https://generator.swagger.io/?url=https://raw.githubusercontent.com/dmachard/go-dnscollector/main/docs/swagger.yml) to search DNS domains - *Send to remote host with generic transport protocol* diff --git a/dnsutils/dnsmessage.go b/dnsutils/dnsmessage.go index d1993399..be638fcc 100644 --- a/dnsutils/dnsmessage.go +++ b/dnsutils/dnsmessage.go @@ -5,20 +5,10 @@ import ( ) var ( - DNSQuery = "QUERY" - DNSQueryQuiet = "Q" - DNSReply = "REPLY" - DNSReplyQuiet = "R" - PdnsDirectives = regexp.MustCompile(`^powerdns-*`) - GeoIPDirectives = regexp.MustCompile(`^geoip-*`) - SuspiciousDirectives = regexp.MustCompile(`^suspicious-*`) - PublicSuffixDirectives = regexp.MustCompile(`^publixsuffix-*`) - ExtractedDirectives = regexp.MustCompile(`^extracted-*`) - ReducerDirectives = regexp.MustCompile(`^reducer-*`) - MachineLearningDirectives = regexp.MustCompile(`^ml-*`) - FilteringDirectives = regexp.MustCompile(`^filtering-*`) - RawTextDirective = regexp.MustCompile(`^ *\{.*\}`) - ATagsDirectives = regexp.MustCompile(`^atags*`) + DNSQuery = "QUERY" + DNSQueryQuiet = "Q" + DNSReply = "REPLY" + DNSReplyQuiet = "R" ) type DNSAnswer struct { @@ -112,7 +102,7 @@ type DNSTap struct { QueryZone string `json:"query-zone"` } -type PowerDNS struct { +type CollectorPowerDNS struct { Tags []string `json:"tags"` OriginalRequestSubnet string `json:"original-request-subnet"` AppliedPolicy string `json:"applied-policy"` @@ -129,6 +119,10 @@ type PowerDNS struct { DeviceID string `json:"device-id"` } +type LoggerOpenTelemetry struct { + TraceID string `json:"trace-id"` +} + type TransformDNSGeo struct { City string `json:"city"` 
Continent string `json:"continent"` @@ -209,8 +203,9 @@ type DNSMessage struct { DNS DNS `json:"dns"` EDNS DNSExtended `json:"edns"` DNSTap DNSTap `json:"dnstap"` + PowerDNS *CollectorPowerDNS `json:"powerdns,omitempty"` + OpenTelemetry *LoggerOpenTelemetry `json:"opentelemetry,omitempty"` Geo *TransformDNSGeo `json:"geoip,omitempty"` - PowerDNS *PowerDNS `json:"powerdns,omitempty"` Suspicious *TransformSuspicious `json:"suspicious,omitempty"` PublicSuffix *TransformPublicSuffix `json:"publicsuffix,omitempty"` Extracted *TransformExtracted `json:"extracted,omitempty"` @@ -264,6 +259,7 @@ func (dm *DNSMessage) Init() { } func (dm *DNSMessage) InitTransforms() { + // init transforms dm.ATags = &TransformATags{} dm.Filtering = &TransformFiltering{} dm.MachineLearning = &TransformML{} @@ -271,7 +267,9 @@ func (dm *DNSMessage) InitTransforms() { dm.Extracted = &TransformExtracted{} dm.PublicSuffix = &TransformPublicSuffix{} dm.Suspicious = &TransformSuspicious{} - dm.PowerDNS = &PowerDNS{} dm.Geo = &TransformDNSGeo{} dm.Relabeling = &TransformRelabeling{} + // init collectors & loggers + dm.PowerDNS = &CollectorPowerDNS{} + dm.OpenTelemetry = &LoggerOpenTelemetry{} } diff --git a/dnsutils/dnsmessage_json_test.go b/dnsutils/dnsmessage_json_test.go index 7c4e32fd..7da12266 100644 --- a/dnsutils/dnsmessage_json_test.go +++ b/dnsutils/dnsmessage_json_test.go @@ -101,7 +101,7 @@ func TestDnsMessage_Json_Collectors_Reference(t *testing.T) { }{ { collector: "powerdns", - dmRef: DNSMessage{PowerDNS: &PowerDNS{ + dmRef: DNSMessage{PowerDNS: &CollectorPowerDNS{ OriginalRequestSubnet: "subnet", AppliedPolicy: "basicrpz", AppliedPolicyHit: "hit", @@ -164,6 +164,49 @@ func TestDnsMessage_Json_Collectors_Reference(t *testing.T) { } } +func TestDnsMessage_Json_Loggers_Reference(t *testing.T) { + testcases := []struct { + logger string + dmRef DNSMessage + jsonRef string + }{ + { + logger: "otel", + dmRef: DNSMessage{OpenTelemetry: &LoggerOpenTelemetry{ + TraceID: 
"27c3e94ad6284eec9a50cfc5bd7384d6", + }}, + + jsonRef: `{ + "opentelemetry": { + "trace-id": "27c3e94ad6284eec9a50cfc5bd7384d6" + } + }`, + }, + } + for _, tc := range testcases { + t.Run(tc.logger, func(t *testing.T) { + + tc.dmRef.Init() + + var dmMap map[string]interface{} + err := json.Unmarshal([]byte(tc.dmRef.ToJSON()), &dmMap) + if err != nil { + t.Fatalf("could not unmarshal dm json: %s\n", err) + } + + var refMap map[string]interface{} + err = json.Unmarshal([]byte(tc.jsonRef), &refMap) + if err != nil { + t.Fatalf("could not unmarshal ref json: %s\n", err) + } + + if !reflect.DeepEqual(dmMap[tc.logger], refMap[tc.logger]) { + t.Errorf("json format different from reference, Get=%s Want=%s", dmMap[tc.logger], refMap[tc.logger]) + } + }) + } +} + func TestDnsMessage_Json_Transforms_Reference(t *testing.T) { testcases := []struct { @@ -558,7 +601,7 @@ func TestDnsMessage_JsonFlatten_Collectors_Reference(t *testing.T) { }{ { collector: "powerdns", - dm: DNSMessage{PowerDNS: &PowerDNS{ + dm: DNSMessage{PowerDNS: &CollectorPowerDNS{ OriginalRequestSubnet: "subnet", AppliedPolicy: "basicrpz", AppliedPolicyHit: "hit", diff --git a/dnsutils/dnsmessage_text.go b/dnsutils/dnsmessage_text.go index 75d4323c..4372ad75 100644 --- a/dnsutils/dnsmessage_text.go +++ b/dnsutils/dnsmessage_text.go @@ -5,11 +5,40 @@ import ( "errors" "fmt" "log" + "regexp" "strconv" "strings" "time" ) +var ( + OtelDirectives = regexp.MustCompile(`^otel-*`) + PdnsDirectives = regexp.MustCompile(`^powerdns-*`) + GeoIPDirectives = regexp.MustCompile(`^geoip-*`) + SuspiciousDirectives = regexp.MustCompile(`^suspicious-*`) + PublicSuffixDirectives = regexp.MustCompile(`^publixsuffix-*`) + ExtractedDirectives = regexp.MustCompile(`^extracted-*`) + ReducerDirectives = regexp.MustCompile(`^reducer-*`) + MachineLearningDirectives = regexp.MustCompile(`^ml-*`) + FilteringDirectives = regexp.MustCompile(`^filtering-*`) + RawTextDirective = regexp.MustCompile(`^ *\{.*\}`) + ATagsDirectives = 
regexp.MustCompile(`^atags*`) +) + +func (dm *DNSMessage) handleOpenTelemetryDirectives(directive string, s *strings.Builder) error { + if dm.OpenTelemetry == nil { + s.WriteString("-") + } else { + switch { + case directive == "otel-trace-id": + s.WriteString(dm.OpenTelemetry.TraceID) + default: + return errors.New(ErrorUnexpectedDirective + directive) + } + } + return nil +} + func (dm *DNSMessage) handleGeoIPDirectives(directive string, s *strings.Builder) error { if dm.Geo == nil { s.WriteString("-") @@ -532,6 +561,13 @@ func (dm *DNSMessage) ToTextLine(format []string, fieldDelimiter string, fieldBo s.WriteByte('-') } + // more directives from loggers + case OtelDirectives.MatchString(directive): + err := dm.handleOpenTelemetryDirectives(directive, &s) + if err != nil { + return nil, err + } + // more directives from collectors case PdnsDirectives.MatchString(directive): err := dm.handlePdnsDirectives(directive, &s) diff --git a/dnsutils/dnsmessage_text_test.go b/dnsutils/dnsmessage_text_test.go index 138cbd75..1f0ac2fb 100644 --- a/dnsutils/dnsmessage_text_test.go +++ b/dnsutils/dnsmessage_text_test.go @@ -274,7 +274,7 @@ func TestDnsMessage_TextFormat_InvalidDirectives(t *testing.T) { }, { name: "powerdns", - dm: DNSMessage{PowerDNS: &PowerDNS{}}, + dm: DNSMessage{PowerDNS: &CollectorPowerDNS{}}, format: "powerdns-invalid", }, { @@ -395,6 +395,43 @@ func TestDnsMessage_TextFormat_Directives_Geo(t *testing.T) { } } +func TestDnsMessage_TextFormat_Directives_OpenTelemetry(t *testing.T) { + config := pkgconfig.GetDefaultConfig() + + testcases := []struct { + name string + format string + dm DNSMessage + expected string + }{ + { + name: "undefined", + format: "otel-trace-id", + dm: DNSMessage{}, + expected: "-", + }, + { + name: "default", + format: "otel-trace-id", + dm: DNSMessage{OpenTelemetry: &LoggerOpenTelemetry{TraceID: "27c3e94ad6284eec9a50cfc5bd7384d6"}}, + expected: "27c3e94ad6284eec9a50cfc5bd7384d6", + }, + } + + for _, tc := range testcases { + 
t.Run(tc.name, func(t *testing.T) { + line := tc.dm.String( + strings.Fields(tc.format), + config.Global.TextFormatDelimiter, + config.Global.TextFormatBoundary, + ) + if line != tc.expected { + t.Errorf("Want: %s, got: %s", tc.expected, line) + } + }) + } +} + func TestDnsMessage_TextFormat_Directives_Pdns(t *testing.T) { config := pkgconfig.GetDefaultConfig() @@ -413,13 +450,13 @@ func TestDnsMessage_TextFormat_Directives_Pdns(t *testing.T) { { name: "empty_attributes", format: "powerdns-tags powerdns-applied-policy powerdns-original-request-subnet powerdns-metadata", - dm: DNSMessage{PowerDNS: &PowerDNS{}}, + dm: DNSMessage{PowerDNS: &CollectorPowerDNS{}}, expected: "- - - -", }, { name: "applied_policy", format: "powerdns-applied-policy powerdns-applied-policy-hit powerdns-applied-policy-kind powerdns-applied-policy-trigger powerdns-applied-policy-type", - dm: DNSMessage{PowerDNS: &PowerDNS{ + dm: DNSMessage{PowerDNS: &CollectorPowerDNS{ AppliedPolicy: "policy", AppliedPolicyHit: "hit", AppliedPolicyKind: "kind", @@ -431,67 +468,67 @@ func TestDnsMessage_TextFormat_Directives_Pdns(t *testing.T) { { name: "original_request_subnet", format: "powerdns-original-request-subnet", - dm: DNSMessage{PowerDNS: &PowerDNS{OriginalRequestSubnet: "test"}}, + dm: DNSMessage{PowerDNS: &CollectorPowerDNS{OriginalRequestSubnet: "test"}}, expected: "test", }, { name: "metadata_badsyntax", format: "powerdns-metadata", - dm: DNSMessage{PowerDNS: &PowerDNS{Metadata: map[string]string{"test_key1": "test_value1"}}}, + dm: DNSMessage{PowerDNS: &CollectorPowerDNS{Metadata: map[string]string{"test_key1": "test_value1"}}}, expected: "-", }, { name: "metadata", format: "powerdns-metadata:test_key1", - dm: DNSMessage{PowerDNS: &PowerDNS{Metadata: map[string]string{"test_key1": "test_value1"}}}, + dm: DNSMessage{PowerDNS: &CollectorPowerDNS{Metadata: map[string]string{"test_key1": "test_value1"}}}, expected: "test_value1", }, { name: "metadata_invalid", format: "powerdns-metadata:test_key2", 
- dm: DNSMessage{PowerDNS: &PowerDNS{Metadata: map[string]string{"test_key1": "test_value1"}}}, + dm: DNSMessage{PowerDNS: &CollectorPowerDNS{Metadata: map[string]string{"test_key1": "test_value1"}}}, expected: "-", }, { name: "tags_all", format: "powerdns-tags", - dm: DNSMessage{PowerDNS: &PowerDNS{Tags: []string{"tag1", "tag2"}}}, + dm: DNSMessage{PowerDNS: &CollectorPowerDNS{Tags: []string{"tag1", "tag2"}}}, expected: "tag1,tag2", }, { name: "tags_index", format: "powerdns-tags:1", - dm: DNSMessage{PowerDNS: &PowerDNS{Tags: []string{"tag1", "tag2"}}}, + dm: DNSMessage{PowerDNS: &CollectorPowerDNS{Tags: []string{"tag1", "tag2"}}}, expected: "tag2", }, { name: "tags_invalid_index", format: "powerdns-tags:3", - dm: DNSMessage{PowerDNS: &PowerDNS{Tags: []string{"tag1", "tag2"}}}, + dm: DNSMessage{PowerDNS: &CollectorPowerDNS{Tags: []string{"tag1", "tag2"}}}, expected: "-", }, { name: "message_id", format: "powerdns-message-id", - dm: DNSMessage{PowerDNS: &PowerDNS{MessageID: "27c3e94ad6284eec9a50cfc5bd7384d6"}}, + dm: DNSMessage{PowerDNS: &CollectorPowerDNS{MessageID: "27c3e94ad6284eec9a50cfc5bd7384d6"}}, expected: "27c3e94ad6284eec9a50cfc5bd7384d6", }, { name: "initial_requestor_id", format: "powerdns-initial-requestor-id", - dm: DNSMessage{PowerDNS: &PowerDNS{InitialRequestorID: "5e006236c8a74f7eafc6af126e6d0689"}}, + dm: DNSMessage{PowerDNS: &CollectorPowerDNS{InitialRequestorID: "5e006236c8a74f7eafc6af126e6d0689"}}, expected: "5e006236c8a74f7eafc6af126e6d0689", }, { name: "requestor_id", format: "powerdns-requestor-id", - dm: DNSMessage{PowerDNS: &PowerDNS{RequestorID: "5e006236c8a74f7eafc6af126e6d0689"}}, + dm: DNSMessage{PowerDNS: &CollectorPowerDNS{RequestorID: "5e006236c8a74f7eafc6af126e6d0689"}}, expected: "5e006236c8a74f7eafc6af126e6d0689", }, { name: "device_id_name", format: "powerdns-device-id powerdns-device-name", - dm: DNSMessage{PowerDNS: &PowerDNS{DeviceID: "5e006236c8a74f7eafc6af126e6d0689", DeviceName: "test"}}, + dm: DNSMessage{PowerDNS: 
&CollectorPowerDNS{DeviceID: "5e006236c8a74f7eafc6af126e6d0689", DeviceName: "test"}}, expected: "5e006236c8a74f7eafc6af126e6d0689 test", }, } diff --git a/docs/_examples/use-case-32.yml b/docs/_examples/use-case-32.yml new file mode 100644 index 00000000..c5287d28 --- /dev/null +++ b/docs/_examples/use-case-32.yml @@ -0,0 +1,46 @@ +global: + trace: + verbose: true + +pipelines: + - name: tap + powerdns: + listen-ip: 0.0.0.0 + listen-port: 6000 + tls-support: false + tls-min-version: 1.2 + cert-file: "" + key-file: "" + reset-conn: true + chan-buffer-size: 0 + add-dns-payload: false + transforms: + normalize: + qname-lowercase: true + qname-replace-nonprintable: true + routing-policy: + forward: [ reordering ] + dropped: [ ] + + - name: reordering + dnsmessage: + matching: + include: + transforms: + reordering: + flush-interval: 5 + routing-policy: + forward: [ otel ] + dropped: [ ] + + - name: otel + opentelemetry: + otel-endpoint: "192.168.1.253:4317" + routing-policy: + forward: [ console ] + dropped: [ ] + + - name: console + stdout: + mode: text + text-format: "timestamp-rfc3339ns identity operation qname qtype queryip responseip otel-trace-id" diff --git a/docs/_images/otel_tracing.png b/docs/_images/otel_tracing.png new file mode 100644 index 00000000..b0d1f6d3 Binary files /dev/null and b/docs/_images/otel_tracing.png differ diff --git a/docs/_images/otel_tracing_error.png b/docs/_images/otel_tracing_error.png new file mode 100644 index 00000000..b0b89e39 Binary files /dev/null and b/docs/_images/otel_tracing_error.png differ diff --git a/docs/collectors/collector_dnstap.md b/docs/collectors/collector_dnstap.md index 754d1bca..6fd007e9 100644 --- a/docs/collectors/collector_dnstap.md +++ b/docs/collectors/collector_dnstap.md @@ -2,9 +2,11 @@ ## DNS tap -Collector to logging DNStap stream from DNS servers. +Collector to logging [DNStap](https://dnstap.info/) stream from DNS servers. The traffic can be a tcp or unix DNStap stream. TLS is also supported. 
+> Follow this guide to enable DNStap on your DNS servers: [Enabling DNStap logging on most popular DNS servers](https://dmachard.github.io/posts/0001-dnstap-testing/). + Options: * `listen-ip` (str) diff --git a/docs/examples.md b/docs/examples.md index 18e1694a..8952fb11 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -50,3 +50,6 @@ You will find below some examples of configurations to manage your DNS logs. - Security: suspicious traffic detector - [x] [Capture DNS packets and flag suspicious traffic](./_examples/use-case-19.yml) + +- Telemetry + - [x] [OpenTelemetry tracing of your DNS traffic](./_examples/use-case-32.yml) diff --git a/docs/extended_dnstap.md b/docs/extended_dnstap.md index 4e92390a..ac336fc2 100644 --- a/docs/extended_dnstap.md +++ b/docs/extended_dnstap.md @@ -20,7 +20,7 @@ The following codec are supported: DNSTAP message can be extended by incorporating additional metadata added through transformations, such as filtering, geo, ATags. -These metadata are encoded in the extra field with the following [protobuf structure](./../../dnsutils/extended_dnstap.proto). +These metadata are encoded in the extra field with the following [protobuf structure](../dnsutils/extended_dnstap.proto). The following transformers are supported: diff --git a/docs/loggers/logger_opentelemetry.md b/docs/loggers/logger_opentelemetry.md new file mode 100644 index 00000000..43ce091f --- /dev/null +++ b/docs/loggers/logger_opentelemetry.md @@ -0,0 +1,29 @@ +# Logger: OpenTelemetry + +OpenTelemetry plugin Logger + +**Experimental**: This feature is experimental and currently works only with the DNSDist and Recursor products from PowerDNS. + +Options: +* `otel-endpoint` (string) + > Specifies the endpoint for sending telemetry data to an OpenTelemetry collector. + > The endpoint should be specified in the format `host:port`. + +Default values: + +```yaml +opentelemetry: + otel-endpoint: "" +``` + +Example of result with Tempo from Grafana + +
+ +
+ +Example with DNS error (NXDOMAIN) + ++ +
diff --git a/docs/transformers.md b/docs/transformers.md index 663fb61a..ee50a310 100644 --- a/docs/transformers.md +++ b/docs/transformers.md @@ -29,3 +29,4 @@ Transformers processing is currently in this order : | [JSON relabeling](transformers/transform_relabeling.md) | JSON relabeling to rename or remove keys | | [DNS message rewrite](transformers/transform_rewrite.md) | Rewrite value for DNS messages structure | | [Newly Observed Domains](transformers/transform_newdomaintracker.md) | Detect Newly Observed Domains | +| [Reordering](transformers/transform_reordering.md) | Reordering DNS messages based on timestamps | diff --git a/docs/workers.md b/docs/workers.md index 534342d5..2811739b 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -29,3 +29,4 @@ A worker can act as a collector or a logger. | [Falco](loggers/logger_falco.md) | Logger | Falco plugin logger | | [ClickHouse](loggers/logger_clickhouse.md) | Logger | ClickHouse logger | | [DevNull](loggers/logger_devnull.md) | Logger | For testing purpose | +| [OpenTelemetry](loggers/logger_opentelemetry.md) | Logger | Open Telemetry tracing - Experimental | diff --git a/go.mod b/go.mod index cda15045..bca1e1f3 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/dmachard/go-dnscollector -go 1.21.8 +go 1.22.0 toolchain go1.23.0 @@ -37,6 +37,11 @@ require ( github.com/segmentio/kafka-go v0.4.47 github.com/stretchr/testify v1.10.0 github.com/tinylib/msgp v1.2.5 + go.opentelemetry.io/otel v1.33.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 + go.opentelemetry.io/otel/sdk v1.28.0 + go.opentelemetry.io/otel/trace v1.33.0 golang.org/x/net v0.31.0 golang.org/x/sys v0.28.0 google.golang.org/protobuf v1.35.2 @@ -52,6 +57,7 @@ require ( github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/armon/go-metrics v0.4.1 // indirect github.com/c2h5oh/datasize v0.0.0-20231215233829-aa82cc1e6500 // indirect + 
github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash v1.1.0 // indirect github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect @@ -82,6 +88,7 @@ require ( github.com/grafana/loki/pkg/push v0.0.0-20240402204250-824f5aa20aaa // indirect github.com/grafana/pyroscope-go/godeltaprof v0.1.8 // indirect github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect github.com/hashicorp/consul/api v1.29.4 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect @@ -125,7 +132,6 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/exporter-toolkit v0.11.0 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect - github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect github.com/sercand/kuberesolver/v5 v5.1.1 // indirect github.com/shopspring/decimal v1.2.0 // indirect @@ -140,10 +146,10 @@ require ( go.etcd.io/etcd/api/v3 v3.5.4 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.4 // indirect go.etcd.io/etcd/client/v3 v3.5.4 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/collector/pdata v1.12.0 // indirect - go.opentelemetry.io/otel v1.28.0 // indirect - go.opentelemetry.io/otel/metric v1.28.0 // indirect - go.opentelemetry.io/otel/trace v1.28.0 // indirect + go.opentelemetry.io/otel/metric v1.33.0 // indirect + go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.21.0 // indirect diff --git a/go.sum b/go.sum index 5c87006f..59eb4f54 100644 --- a/go.sum +++ b/go.sum @@ -67,6 +67,8 @@ github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kB github.com/bmatcuk/doublestar 
v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= github.com/c2h5oh/datasize v0.0.0-20231215233829-aa82cc1e6500 h1:6lhrsTEnloDPXyeZBvSYvQf8u86jbKehZPVDDlkgDl4= github.com/c2h5oh/datasize v0.0.0-20231215233829-aa82cc1e6500/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= @@ -253,6 +255,8 @@ github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc h1:GN2Lv3MGO7AS6PrR github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc/go.mod h1:+JKpmjMGhpgPL+rXZ5nsZieVzvarn86asRlBg4uNGnk= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k= github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw= github.com/hashicorp/consul/api v1.29.4 h1:P6slzxDLBOxUSj3fWo2o65VuKtbtOXFi7TSSgtXutuE= github.com/hashicorp/consul/api v1.29.4/go.mod h1:HUlfw+l2Zy68ceJavv2zAyArl2fqhGWnMycyt56sBgg= @@ -492,8 +496,8 @@ github.com/prometheus/prometheus v0.54.1/go.mod h1:xlLByHhk2g3ycakQGrMaU8K7OySZx github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod 
h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= -github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= -github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/rs/tzsp v0.0.0-20161230003637-8ce729c826b9 h1:upQjqUCvtoYMwHSXn0eGc1lsVJpEi90u3oMjmLKa9ac= github.com/rs/tzsp v0.0.0-20161230003637-8ce729c826b9/go.mod h1:pFz3aQBXB8wqK0Mnt7iOEgcrpRHgpP+1xNnOy7Ok1Bw= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= @@ -563,14 +567,24 @@ go.etcd.io/etcd/client/pkg/v3 v3.5.4 h1:lrneYvz923dvC14R54XcA7FXoZ3mlGZAgmwhfm7H go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= go.etcd.io/etcd/client/v3 v3.5.4 h1:p83BUL3tAYS0OT/r0qglgc3M1JjhM0diV8DSWAhVXv4= go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= go.opentelemetry.io/collector/pdata v1.12.0 h1:Xx5VK1p4VO0md8MWm2icwC1MnJ7f8EimKItMWw46BmA= go.opentelemetry.io/collector/pdata v1.12.0/go.mod h1:MYeB0MmMAxeM0hstCFrCqWLzdyeYySim2dG6pDT6nYI= -go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= -go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= -go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= -go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= -go.opentelemetry.io/otel/trace v1.28.0 
h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= -go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= +go.opentelemetry.io/otel v1.33.0 h1:/FerN9bax5LoK51X/sI0SVYrjSE0/yUL7DpxW4K3FWw= +go.opentelemetry.io/otel v1.33.0/go.mod h1:SUUkR6csvUQl+yjReHu5uM3EtVV7MBm5FHKRlNx4I8I= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 h1:3Q/xZUyC1BBkualc9ROb4G8qkH90LXEIICcs5zv1OYY= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0/go.mod h1:s75jGIWA9OfCMzF0xr+ZgfrB5FEbbV7UuYo32ahUiFI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 h1:R3X6ZXmNPRR8ul6i3WgFURCHzaXjHdm0karRG/+dj3s= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0/go.mod h1:QWFXnDavXWwMx2EEcZsf3yxgEKAqsxQ+Syjp+seyInw= +go.opentelemetry.io/otel/metric v1.33.0 h1:r+JOocAyeRVXD8lZpjdQjzMadVZp2M4WmQ+5WtEnklQ= +go.opentelemetry.io/otel/metric v1.33.0/go.mod h1:L9+Fyctbp6HFTddIxClbQkjtubW6O9QS3Ann/M82u6M= +go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE= +go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg= +go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s= +go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck= +go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= +go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= diff --git a/pkgconfig/loggers.go b/pkgconfig/loggers.go index 60a147be..c427d9b4 100644 --- a/pkgconfig/loggers.go +++ b/pkgconfig/loggers.go @@ -235,6 +235,13 @@ type ConfigLoggers struct { BasicAuthLogin string `yaml:"basic-auth-login" 
default:""` BasicAuthPwd string `yaml:"basic-auth-pwd" default:""` } `yaml:"elasticsearch"` + OpenTelemetryClient struct { + Enable bool `yaml:"enable" default:"false"` + ChannelBufferSize int `yaml:"chan-buffer-size" default:"0"` + CleanupSpansInterval int `yaml:"cleanup-spans-interval" default:"30"` + MaxSpanTime int `yaml:"max-span-time" default:"120"` + OtelEndpoint string `yaml:"otel-endpoint" default:""` + } `yaml:"opentelemetry"` ScalyrClient struct { Enable bool `yaml:"enable" default:"false"` Mode string `yaml:"mode" default:"text"` diff --git a/pkginit/pipelines.go b/pkginit/pipelines.go index 64b195ab..5dcf9853 100644 --- a/pkginit/pipelines.go +++ b/pkginit/pipelines.go @@ -199,6 +199,10 @@ func CreateStanza(stanzaName string, config *pkgconfig.Config, mapCollectors map mapLoggers[stanzaName] = workers.NewDevNull(config, logger, stanzaName) mapLoggers[stanzaName].SetMetrics(metrics) } + if config.Loggers.OpenTelemetryClient.Enable { + mapLoggers[stanzaName] = workers.NewOpenTelemetryClient(config, logger, stanzaName) + mapLoggers[stanzaName].SetMetrics(metrics) + } // register the collector if enabled if config.Collectors.DNSMessage.Enable { diff --git a/workers/otel.go b/workers/otel.go new file mode 100644 index 00000000..3d76765f --- /dev/null +++ b/workers/otel.go @@ -0,0 +1,299 @@ +package workers + +import ( + "context" + "log" + "sync" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" + + "github.com/dmachard/go-dnscollector/dnsutils" + "github.com/dmachard/go-dnscollector/pkgconfig" + "github.com/dmachard/go-dnscollector/transformers" + "github.com/dmachard/go-logger" +) + +type trackedSpan struct { + span trace.Span + startTime time.Time +} + +type OpenTelemetryClient struct { + *GenericWorker + 
tracerProviders map[string]*sdktrace.TracerProvider +} + +func NewOpenTelemetryClient(config *pkgconfig.Config, console *logger.Logger, name string) *OpenTelemetryClient { + bufSize := config.Global.Worker.ChannelBufferSize + if config.Loggers.OpenTelemetryClient.ChannelBufferSize > 0 { + bufSize = config.Loggers.OpenTelemetryClient.ChannelBufferSize + } + + w := &OpenTelemetryClient{ + GenericWorker: NewGenericWorker(config, console, name, "opentelemetry", bufSize, pkgconfig.DefaultMonitor), + tracerProviders: make(map[string]*sdktrace.TracerProvider), + } + return w +} + +func (w *OpenTelemetryClient) initTracerProvider(serviceName string) *sdktrace.TracerProvider { + exporter, err := otlptrace.New(context.Background(), otlptracegrpc.NewClient( + otlptracegrpc.WithEndpoint(w.config.Loggers.OpenTelemetryClient.OtelEndpoint), + otlptracegrpc.WithInsecure(), + )) + if err != nil { + log.Fatalf("failed to create OTLP exporter: %v", err) + } + + tracerProvider := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exporter), + sdktrace.WithResource(resource.NewSchemaless( + attribute.String("service.name", serviceName), + )), + ) + return tracerProvider +} + +func (w *OpenTelemetryClient) getTracer(serviceName string) trace.Tracer { + if tp, exists := w.tracerProviders[serviceName]; exists { + return tp.Tracer("") + } + + tp := w.initTracerProvider(serviceName) + w.tracerProviders[serviceName] = tp + return tp.Tracer("") +} + +func (w *OpenTelemetryClient) StartCollect() { + w.LogInfo("starting data collection") + defer w.CollectDone() + + // prepare next channels + droppedRoutes, droppedNames := GetRoutes(w.GetDroppedRoutes()) + + // prepare transforms + subprocessors := transformers.NewTransforms(&w.GetConfig().OutgoingTransformers, w.GetLogger(), w.GetName(), w.GetOutputChannelAsList(), 0) + + // goroutine to process transformed dns messages + go w.StartLogging() + + // loop to process incoming messages + for { + select { + case <-w.OnStop(): + w.StopLogger() + 
subprocessors.Reset() + return + + // new config provided? + case cfg := <-w.NewConfig(): + w.SetConfig(cfg) + w.ReadConfig() + subprocessors.ReloadConfig(&cfg.OutgoingTransformers) + + case dm, opened := <-w.GetInputChannel(): + if !opened { + w.LogInfo("input channel closed!") + return + } + + // count global messages + w.CountIngressTraffic() + + // apply tranforms, init dns message with additionnals parts if necessary + transformResult, err := subprocessors.ProcessMessage(&dm) + if err != nil { + w.LogError(err.Error()) + } + if transformResult == transformers.ReturnDrop { + w.SendDroppedTo(droppedRoutes, droppedNames, dm) + continue + } + + // send to output channel + w.CountEgressTraffic() + w.GetOutputChannel() <- dm + } + } +} + +func (w *OpenTelemetryClient) StartLogging() { + w.LogInfo("logging has started") + defer w.LoggingDone() + + // prepare next channels + defaultRoutes, defaultNames := GetRoutes(w.GetDefaultRoutes()) + + // Maps to follow the state of the spans + requestorSpans := sync.Map{} + messageSpans := sync.Map{} + resolverSpans := sync.Map{} + + go w.cleanupSpans(&requestorSpans, &messageSpans, &resolverSpans, time.Duration(w.config.Loggers.OpenTelemetryClient.MaxSpanTime)*time.Second) + + for { + select { + case <-w.OnLoggerStopped(): + return + + // incoming dns message to process + case dm, opened := <-w.GetOutputChannel(): + if !opened { + w.LogInfo("output channel closed!") + return + } + + timestamp, err := time.Parse(time.RFC3339, dm.DNSTap.TimestampRFC3339) + if err != nil { + w.LogWarning("invalid timestamp: %v", err) + continue + } + tracer := w.getTracer(dm.DNSTap.Identity) + + // ini opentelemetry with default values + dm.OpenTelemetry = &dnsutils.LoggerOpenTelemetry{} + + switch dm.DNSTap.Operation { + case "CLIENT_QUERY": + w.handleClientQuery(&requestorSpans, &messageSpans, tracer, &dm, timestamp) + case "CLIENT_RESPONSE": + w.handleClientResponse(&requestorSpans, &messageSpans, &dm, timestamp) + case "RESOLVER_QUERY": + 
w.handleResolverQuery(&messageSpans, &resolverSpans, tracer, &dm, timestamp)
+			case "RESOLVER_RESPONSE":
+				w.handleResolverResponse(&resolverSpans, &dm, timestamp)
+			}
+
+			// send to next ?
+			w.SendForwardedTo(defaultRoutes, defaultNames, dm)
+		}
+	}
+}
+
+// otelStartSpan starts a span called name at timestamp under ctx and tags it
+// with the query name and the query/response addresses of dm.
+func otelStartSpan(ctx context.Context, tracer trace.Tracer, name string, dm *dnsutils.DNSMessage, timestamp time.Time) trace.Span {
+	_, span := tracer.Start(ctx, name, trace.WithTimestamp(timestamp))
+	span.SetAttributes(
+		attribute.String("dns.qname", dm.DNS.Qname),
+		attribute.String("source.ip", dm.NetworkInfo.QueryIP),
+		attribute.String("destination.ip", dm.NetworkInfo.ResponseIP),
+	)
+	return span
+}
+
+// otelEndSpan records rcode on span, flags it as an error when rcode is not
+// the NOERROR value, and ends the span at timestamp.
+func otelEndSpan(span trace.Span, rcode string, timestamp time.Time) {
+	attrs := []attribute.KeyValue{attribute.String("dns.rcode", rcode)}
+	if rcode != dnsutils.DNSRcodeNoError {
+		attrs = append(attrs,
+			attribute.String("error", "true"),
+			attribute.String("error.message", "Non-successful DNS response code"),
+		)
+	}
+	span.SetAttributes(attrs...)
+	span.End(trace.WithTimestamp(timestamp))
+}
+
+// handleClientQuery opens a span for a client query and stores the trace ID
+// in dm.OpenTelemetry.
+//
+// When a span is already tracked for the requestor ID the new span becomes its
+// child and is tracked under the message ID only; otherwise a root span is
+// started and tracked under both the requestor ID and the message ID.
+// Messages without PowerDNS metadata are ignored.
+func (w *OpenTelemetryClient) handleClientQuery(requestorSpans, messageSpans *sync.Map, tracer trace.Tracer, dm *dnsutils.DNSMessage, timestamp time.Time) {
+	// dm.PowerDNS is a pointer; without it there are no IDs to correlate on.
+	if dm.PowerDNS == nil {
+		return
+	}
+
+	ctx := context.Background()
+	parent, hasParent := requestorSpans.Load(dm.PowerDNS.RequestorID)
+	if hasParent {
+		ctx = trace.ContextWithSpan(ctx, parent.(trackedSpan).span)
+	}
+
+	name := "Client Query " + dm.NetworkInfo.ResponseIP + " (" + dm.DNS.Qname + " / " + dm.DNS.Qtype + " )"
+	span := otelStartSpan(ctx, tracer, name, dm, timestamp)
+
+	tracked := trackedSpan{span: span, startTime: timestamp}
+	if !hasParent {
+		requestorSpans.Store(dm.PowerDNS.RequestorID, tracked)
+	}
+	messageSpans.Store(dm.PowerDNS.MessageID, tracked)
+	dm.OpenTelemetry.TraceID = span.SpanContext().TraceID().String()
+}
+
+// handleClientResponse closes the spans opened for a client query: the one
+// tracked under the message ID (whose trace ID is copied to dm.OpenTelemetry)
+// and the one tracked under the requestor ID. Each is annotated with the
+// response code and removed from its map. Messages without PowerDNS metadata
+// are ignored.
+func (w *OpenTelemetryClient) handleClientResponse(requestorSpans, messageSpans *sync.Map, dm *dnsutils.DNSMessage, timestamp time.Time) {
+	if dm.PowerDNS == nil {
+		return
+	}
+
+	if value, ok := messageSpans.LoadAndDelete(dm.PowerDNS.MessageID); ok {
+		span := value.(trackedSpan).span
+		otelEndSpan(span, dm.DNS.Rcode, timestamp)
+		dm.OpenTelemetry.TraceID = span.SpanContext().TraceID().String()
+	}
+
+	// For a root query this is the same span as above; ending it twice is harmless.
+	if value, ok := requestorSpans.LoadAndDelete(dm.PowerDNS.RequestorID); ok {
+		otelEndSpan(value.(trackedSpan).span, dm.DNS.Rcode, timestamp)
+	}
+}
+
+// handleResolverQuery opens a span for a resolver query, tracks it under the
+// message ID and stores the trace ID in dm.OpenTelemetry.
+//
+// The span is a child of the client span tracked under the initial requestor
+// ID when there is one, and a root span (with a shorter name) otherwise.
+// Messages without PowerDNS metadata are ignored.
+func (w *OpenTelemetryClient) handleResolverQuery(messageSpans, resolverSpans *sync.Map, tracer trace.Tracer, dm *dnsutils.DNSMessage, timestamp time.Time) {
+	if dm.PowerDNS == nil {
+		return
+	}
+
+	// Defaults describe the "no parent span found" case.
+	ctx := context.Background()
+	name := "Resolver Query (" + dm.DNS.Qname + ")"
+	if parent, ok := messageSpans.Load(dm.PowerDNS.InitialRequestorID); ok {
+		ctx = trace.ContextWithSpan(ctx, parent.(trackedSpan).span)
+		name = "Resolver Query " + dm.NetworkInfo.ResponseIP + " (" + dm.DNS.Qname + " / " + dm.DNS.Qtype + " )"
+	}
+
+	span := otelStartSpan(ctx, tracer, name, dm, timestamp)
+	resolverSpans.Store(dm.PowerDNS.MessageID, trackedSpan{span: span, startTime: timestamp})
+	dm.OpenTelemetry.TraceID = span.SpanContext().TraceID().String()
+}
+
+// handleResolverResponse closes the resolver span tracked under the message
+// ID, annotates it with the response code, removes it from the map and copies
+// its trace ID to dm.OpenTelemetry. Messages without PowerDNS metadata, or
+// without a tracked span, are left untouched.
+func (w *OpenTelemetryClient) handleResolverResponse(resolverSpans *sync.Map, dm *dnsutils.DNSMessage, timestamp time.Time) {
+	if dm.PowerDNS == nil {
+		return
+	}
+
+	if value, ok := resolverSpans.LoadAndDelete(dm.PowerDNS.MessageID); ok {
+		span := value.(trackedSpan).span
+		otelEndSpan(span, dm.DNS.Rcode, timestamp)
+		dm.OpenTelemetry.TraceID = span.SpanContext().TraceID().String()
+	}
+}
+
+func (w *OpenTelemetryClient) cleanupSpans(requestorSpans, messageSpans, resolverSpans *sync.Map, maxSpanDuration time.Duration) {
+	ticker := time.NewTicker(time.Duration(w.config.Loggers.OpenTelemetryClient.CleanupSpansInterval) * time.Second)
+	defer ticker.Stop()
+
+	for range ticker.C {
+		now := time.Now()
+
+		requestorSpans.Range(func(key, value interface{}) bool {
+			tracked := value.(trackedSpan)
+			if 
tracked.startTime.Add(maxSpanDuration).Before(now) { + tracked.span.SetAttributes(attribute.String("error", "timeout")) + tracked.span.End() + messageSpans.Delete(key) + } + return true + }) + + resolverSpans.Range(func(key, value interface{}) bool { + tracked := value.(trackedSpan) + if tracked.startTime.Add(maxSpanDuration).Before(now) { + tracked.span.SetAttributes(attribute.String("error", "timeout")) + tracked.span.End() + resolverSpans.Delete(key) + } + return true + }) + } +} diff --git a/workers/otel_test.go b/workers/otel_test.go new file mode 100644 index 00000000..a10f86a7 --- /dev/null +++ b/workers/otel_test.go @@ -0,0 +1,24 @@ +package workers + +import ( + "testing" + + "github.com/dmachard/go-dnscollector/pkgconfig" + "github.com/dmachard/go-logger" + "github.com/stretchr/testify/assert" +) + +func TestOpenTelemetry_InitTracerProvider(t *testing.T) { + cfg := pkgconfig.GetDefaultConfig() + cfg.Loggers.OpenTelemetryClient.Enable = true + cfg.Loggers.OpenTelemetryClient.OtelEndpoint = "localhost:4317" + + logger := logger.New(false) // Disable verbose logging for tests + client := NewOpenTelemetryClient(cfg, logger, "test-client") + + // Initialize tracer provider + tracer := client.getTracer("test-service") + + // Assert tracer is not nil + assert.NotNil(t, tracer, "Tracer should not be nil") +} diff --git a/workers/powerdns.go b/workers/powerdns.go index c84271c7..ea1af825 100644 --- a/workers/powerdns.go +++ b/workers/powerdns.go @@ -263,7 +263,7 @@ func (w *PdnsProcessor) StartCollect() { dm.Init() // init powerdns with default values - dm.PowerDNS = &dnsutils.PowerDNS{ + dm.PowerDNS = &dnsutils.CollectorPowerDNS{ Tags: []string{}, OriginalRequestSubnet: "", AppliedPolicy: "", @@ -318,7 +318,7 @@ func (w *PdnsProcessor) StartCollect() { dm.DNS.Qtype = dnsutils.RdatatypeToString(int(pbdm.Question.GetQType())) // get specific powerdns params - pdns := dnsutils.PowerDNS{} + pdns := dnsutils.CollectorPowerDNS{} // get PowerDNS OriginalRequestSubnet ip 
:= pbdm.GetOriginalRequestorSubnet()