diff --git a/.github/workflows/testing-go.yml b/.github/workflows/testing-go.yml index b3134c25..c9331893 100644 --- a/.github/workflows/testing-go.yml +++ b/.github/workflows/testing-go.yml @@ -141,7 +141,7 @@ jobs: - id: count_tests run: | - data=$(sudo go test -timeout 360s -v ./workers ./dnsutils ./transformers ./pkgconfig ./pkginit ././ 2>&1 | grep -c RUN) + data=$(sudo go test -timeout 360s -v ./workers ./dnsutils ./transformers ./pkgconfig ./pkginit ./telemetry ././ 2>&1 | grep -c RUN) echo "Count of Tests: $data" echo "data=$data" >> $GITHUB_OUTPUT diff --git a/Dockerfile b/Dockerfile index 81936f34..71562971 100644 --- a/Dockerfile +++ b/Dockerfile @@ -20,7 +20,7 @@ USER dnscollector COPY --from=builder /build/go-dnscollector /bin/go-dnscollector COPY --from=builder /build/docker-config.yml ./etc/dnscollector/config.yml -EXPOSE 6000/tcp 8080/tcp +EXPOSE 6000/tcp 8080/tcp 9165/tcp ENTRYPOINT ["/bin/go-dnscollector"] diff --git a/Makefile b/Makefile index 771d25e2..1ffafb69 100644 --- a/Makefile +++ b/Makefile @@ -74,6 +74,7 @@ tests: check-go @go test ./pkgconfig/ -race -cover -v @go test ./pkginit/ -race -cover -v @go test ./netutils/ -race -cover -v + @go test ./telemetry/ -race -cover -v @go test -timeout 90s ./transformers/ -race -cover -v @go test -timeout 180s ./workers/ -race -cover -v diff --git a/README.md b/README.md index 5fa8ca02..e1d74bb0 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,9 @@

Go Report Go version -Go tests +Go tests Go bench -Go lines +Go lines

@@ -115,6 +115,17 @@ The [`_integration`](./docs/_integration) folder contains DNS-collector `configu - [Elasticsearch](./docs/_integration/elasticsearch/README.md) - [Kafka](./docs/_integration/kafka/README.md) +## Telemetry + +Performance metrics are available to evaluate the efficiency of your pipelines. These metrics allow you to track: +- The number of incoming and outgoing packets processed by each worker +- The number of packets matching the policies applied (forwarded, dropped) +- The number of "discarded" packets +- Memory consumption +- CPU consumption + +A [build-in](./docs/dashboards/grafana_exporter.json) dashboard is available for monitoring these metrics. + ## Performance Tuning may be necessary to deal with a large traffic loads. diff --git a/config.yml b/config.yml index f721ef5c..3f0b9403 100644 --- a/config.yml +++ b/config.yml @@ -10,6 +10,20 @@ global: text-format-delimiter: " " text-format-boundary: "\"" pid-file: "" + worker: + interval-monitor: 10 + telemetry: + enabled: true + web-path: "/metrics" + web-listen: ":9165" + prometheus-prefix: "dnscollector_exporter" + tls-support: false + tls-cert-file: "" + tls-key-file: "" + client-ca-file: "" + basic-auth-enable: false + basic-auth-login: admin + basic-auth-pwd: changeme ################################################ # Pipelining configuration @@ -27,7 +41,7 @@ pipelines: qname-lowercase: true routing-policy: forward: [ console ] - dropped: [] + dropped: [ ] - name: console stdout: diff --git a/dnscollector.go b/dnscollector.go index dbb0e4ea..40691467 100644 --- a/dnscollector.go +++ b/dnscollector.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "os" "os/signal" @@ -12,6 +13,7 @@ import ( "github.com/dmachard/go-dnscollector/pkgconfig" "github.com/dmachard/go-dnscollector/pkginit" + "github.com/dmachard/go-dnscollector/telemetry" "github.com/dmachard/go-dnscollector/workers" "github.com/dmachard/go-logger" "github.com/natefinch/lumberjack" @@ -150,6 +152,12 @@ func main() { InitLogger(logger, config) logger.Info("main - version=%s revision=%s", version.Version, version.Revision) + // // telemetry + if config.Global.Telemetry.Enabled { + logger.Info("main - telemetry enabled on local address: %s", config.Global.Telemetry.WebListen) + } + promServer, metrics, errTelemetry := telemetry.InitTelemetryServer(config, logger) + // init active collectors and loggers mapLoggers := make(map[string]workers.Worker) mapCollectors := make(map[string]workers.Worker) @@ -164,8 +172,8 @@ func main() { // or pipeline ? if pkginit.IsPipelinesEnabled(config) { - logger.Info("main - running in pipelines mode") - err := pkginit.InitPipelines(mapLoggers, mapCollectors, config, logger) + logger.Info("main - running in pipeline mode") + err := pkginit.InitPipelines(mapLoggers, mapCollectors, config, logger, metrics) if err != nil { logger.Error("main - %s", err.Error()) removePIDFile(config) @@ -183,6 +191,11 @@ func main() { go func() { for { select { + case err := <-errTelemetry: + logger.Error("main - unable to start telemetry: %v", err) + removePIDFile(config) + os.Exit(1) + case <-sigHUP: logger.Warning("main - SIGHUP received") @@ -205,6 +218,19 @@ func main() { case <-sigTerm: logger.Warning("main - exiting...") + + // gracefully shutdown the HTTP server + if config.Global.Telemetry.Enabled { + logger.Info("main - telemetry is stopping") + metrics.Stop() + + if err := promServer.Shutdown(context.Background()); err != nil { + logger.Error("main - telemetry error shutting down http server - %s", err.Error()) + } + + } + + // and stop all workers for _, c := range mapCollectors { c.Stop() } diff --git a/docker-compose.yml b/docker-compose.yml index cd2784b7..e388b967 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,4 +12,5 @@ services: ports: - "6000:6000/tcp" - "8080:8080/tcp" + - "9165:9165/tcp" restart: always \ No newline at end of file diff --git a/docs/_examples/use-case-24.yml b/docs/_examples/use-case-24.yml index 6bfdd1df..dca37544 100644 --- a/docs/_examples/use-case-24.yml +++ b/docs/_examples/use-case-24.yml @@ -31,7 +31,7 @@ pipelines: dns.qtype: "TXT" transforms: atags: - tags: [ "TAG:TXT-QUERIES" ] + add-tags: [ "TAG:TXT-QUERIES" ] routing-policy: forward: [ apple-txt, all-txt ] @@ -51,7 +51,7 @@ pipelines: dns.qname: "^*.apple.com$" transforms: atags: - tags: [ "TXT:apple" ] + add-tags: [ "TXT:apple" ] routing-policy: forward: [ outputfile-apple ] diff --git a/docs/configuration.md b/docs/configuration.md index 108cd408..748eb027 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -13,6 +13,7 @@ You can find the global settings below - [Custom text format](#custom-text-format) - [Server identity](#server-identity) - [Pid file](#pid-file) + - [Telemetry](#telemetry) ### Trace @@ -127,9 +128,30 @@ Output example: ### Pid file -Set path to create pid file. +Set path to create DNS-collector PID. +By default, this settings is empty. ```yaml global: pid-file: "/path/to/your/pidfile.pid" -``` \ No newline at end of file +``` + +### Telemetry + +Enable and configure telemetry + +```yaml +global: + telemetry: + enabled: true + web-path: "/metrics" + web-listen: ":9165" + prometheus-prefix: "dnscollector_exporter" + tls-support: false + tls-cert-file: "" + tls-key-file: "" + client-ca-file: "" + basic-auth-enable: false + basic-auth-login: admin + basic-auth-pwd: changeme +``` diff --git a/docs/dashboards/grafana_exporter.json b/docs/dashboards/grafana_exporter.json new file mode 100644 index 00000000..e9000c2e --- /dev/null +++ b/docs/dashboards/grafana_exporter.json @@ -0,0 +1,954 @@ +{ + "__inputs": [ + { + "name": "DS_PROMETHEUS", + "label": "Prometheus", + "description": "", + "type": "datasource", + "pluginId": "prometheus", + "pluginName": "Prometheus" + } + ], + "__elements": {}, + "__requires": [ + { + "type": "grafana", + "id": "grafana", + "name": "Grafana", + "version": "11.0.0" + }, + { + "type": "datasource", + "id": "prometheus", + "name": "Prometheus", + "version": "1.0.0" + }, + { + "type": "panel", + "id": "timeseries", + "name": "Time series", + "version": "" + } + ], + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": null, + "links": [], + "panels": [ + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 9, + "panels": [], + "title": "Go", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 8, + "x": 0, + "y": 1 + }, + "id": 6, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "maxHeight": 600, + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "go_goroutines{job=~\"$job\"}", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "dnscollector", + "range": true, + "refId": "A" + } + ], + "title": "Goroutines", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "decbytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 8, + "x": 8, + "y": 1 + }, + "id": 10, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "maxHeight": 600, + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "go_memstats_sys_bytes{job=~\"$job\"}", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "dnscollector", + "range": true, + "refId": "A" + } + ], + "title": "Total Used Memory", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "percentunit" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 8, + "x": 16, + "y": 1 + }, + "id": 11, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "maxHeight": 600, + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "rate(process_cpu_seconds_total{job=\"$job\"}[2m])", + "legendFormat": "dnscollector", + "range": true, + "refId": "A" + } + ], + "title": "Process cpu", + "type": "timeseries" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 10 + }, + "id": 7, + "panels": [], + "title": "Workers", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [ + { + "__systemRef": "hideSeriesFrom", + "matcher": { + "id": "byNames", + "options": { + "mode": "exclude", + "names": [ + "tap" + ], + "prefix": "All except:", + "readOnly": true + } + }, + "properties": [ + { + "id": "custom.hideFrom", + "value": { + "legend": false, + "tooltip": false, + "viz": true + } + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 8, + "x": 0, + "y": 11 + }, + "id": 3, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "maxHeight": 600, + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "disableTextWrap": false, + "editorMode": "code", + "expr": "increase(dnscollector_exporter_worker_ingress_traffic_total{job=~\"$job\"}[$__rate_interval])", + "fullMetaSearch": false, + "includeNullMetadata": false, + "instant": false, + "legendFormat": "{{worker}}", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Worker - Ingress traffic", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 8, + "x": 8, + "y": 11 + }, + "id": 1, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "maxHeight": 600, + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "disableTextWrap": false, + "editorMode": "code", + "expr": "increase(dnscollector_exporter_worker_egress_traffic_total{job=~\"$job\"}[$__rate_interval])", + "fullMetaSearch": false, + "includeNullMetadata": false, + "instant": false, + "legendFormat": "{{worker}}", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Worker - Egress traffic", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 8, + "x": 16, + "y": 11 + }, + "id": 5, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "maxHeight": 600, + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "disableTextWrap": false, + "editorMode": "code", + "expr": "increase(dnscollector_exporter_worker_discarded_traffic_total{job=~\"$job\"}[$__rate_interval])", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": false, + "instant": false, + "legendFormat": "{{worker}}", + "range": true, + "refId": "B", + "useBackend": false + } + ], + "title": "Worker - Discarded", + "type": "timeseries" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 19 + }, + "id": 8, + "panels": [], + "title": "Policies", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 8, + "x": 0, + "y": 20 + }, + "id": 2, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "maxHeight": 600, + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "disableTextWrap": false, + "editorMode": "code", + "expr": "increase(dnscollector_exporter_policy_forwarded_total{job=~\"$job\"}[$__rate_interval])", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": false, + "instant": false, + "legendFormat": "{{worker}}", + "range": true, + "refId": "B", + "useBackend": false + } + ], + "title": "Policy - Forwarded", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 8, + "x": 8, + "y": 20 + }, + "id": 4, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "maxHeight": 600, + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "disableTextWrap": false, + "editorMode": "code", + "expr": "increase(dnscollector_exporter_policy_dropped_total{job=~\"$job\"}[$__rate_interval])", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": false, + "instant": false, + "legendFormat": "{{worker}}", + "range": true, + "refId": "B", + "useBackend": false + } + ], + "title": "Policy - Dropped", + "type": "timeseries" + } + ], + "refresh": "", + "schemaVersion": 39, + "tags": [], + "templating": { + "list": [ + { + "current": { + "selected": false, + "text": "dnscollector-exporter", + "value": "dnscollector-exporter" + }, + "description": "", + "hide": 0, + "label": "job", + "name": "job", + "options": [ + { + "selected": true, + "text": "dnscollector-exporter", + "value": "dnscollector-exporter" + } + ], + "query": "dnscollector-exporter", + "skipUrlSync": false, + "type": "textbox" + } + ] + }, + "time": { + "from": "now-6h", + "to": "now" + }, + "timeRangeUpdatedDuringEditOrView": false, + "timepicker": {}, + "timezone": "browser", + "title": "DNScollector - Exporter", + "uid": "bdo8oaa6fq7lse", + "version": 11, + "weekStart": "" +} \ No newline at end of file diff --git a/docs/loggers/logger_devnull.md b/docs/loggers/logger_devnull.md new file mode 100644 index 00000000..d2fb060d --- /dev/null +++ b/docs/loggers/logger_devnull.md @@ -0,0 +1,11 @@ +# Logger: DevNull + +Devnull plugin Logger + +Options: + +Default values: + +```yaml +devnull: +``` diff --git a/docs/workers.md b/docs/workers.md index 4e1af7fe..534342d5 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -28,3 +28,4 @@ A worker can act as a collector or a logger. | [Kafka Producer](loggers/logger_kafka.md) | Logger | Kafka DNS producer | | [Falco](loggers/logger_falco.md) | Logger | Falco plugin logger | | [ClickHouse](loggers/logger_clickhouse.md) | Logger | ClickHouse logger | +| [DevNull](loggers/logger_devnull.md) | Logger | For testing purpose | diff --git a/pkgconfig/global.go b/pkgconfig/global.go index 647596e2..f316bdb9 100644 --- a/pkgconfig/global.go +++ b/pkgconfig/global.go @@ -19,6 +19,22 @@ type ConfigGlobal struct { } `yaml:"trace"` ServerIdentity string `yaml:"server-identity" default:""` PidFile string `yaml:"pid-file" default:""` + Worker struct { + InternalMonitor int `yaml:"interval-monitor" default:"10"` + } `yaml:"worker"` + Telemetry struct { + Enabled bool `yaml:"enabled" default:"true"` + WebPath string `yaml:"web-path" default:"/metrics"` + WebListen string `yaml:"web-listen" default:":9165"` + PromPrefix string `yaml:"prometheus-prefix" default:"dnscollector_exporter"` + TLSSupport bool `yaml:"tls-support" default:"false"` + TLSCertFile string `yaml:"tls-cert-file" default:""` + TLSKeyFile string `yaml:"tls-key-file" default:""` + ClientCAFile string `yaml:"client-ca-file" default:""` + BasicAuthEnable bool `yaml:"basic-auth-enable" default:"false"` + BasicAuthLogin string `yaml:"basic-auth-login" default:"admin"` + BasicAuthPwd string `yaml:"basic-auth-pwd" default:"changeme"` + } `yaml:"telemetry"` } func (c *ConfigGlobal) SetDefault() { diff --git a/pkginit/pipelines.go b/pkginit/pipelines.go index cc7605aa..50e97e17 100644 --- a/pkginit/pipelines.go +++ b/pkginit/pipelines.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/dmachard/go-dnscollector/pkgconfig" + "github.com/dmachard/go-dnscollector/telemetry" "github.com/dmachard/go-dnscollector/workers" "github.com/dmachard/go-logger" "github.com/pkg/errors" @@ -124,91 +125,121 @@ func CreateRouting(stanza pkgconfig.ConfigPipelines, mapCollectors map[string]wo return nil } -func CreateStanza(stanzaName string, config *pkgconfig.Config, mapCollectors map[string]workers.Worker, mapLoggers map[string]workers.Worker, logger *logger.Logger) { +func CreateStanza(stanzaName string, config *pkgconfig.Config, mapCollectors map[string]workers.Worker, mapLoggers map[string]workers.Worker, logger *logger.Logger, metrics *telemetry.PrometheusCollector) { // register the logger if enabled if config.Loggers.RestAPI.Enable { mapLoggers[stanzaName] = workers.NewRestAPI(config, logger, stanzaName) + mapLoggers[stanzaName].SetMetrics(metrics) } if config.Loggers.Prometheus.Enable { mapLoggers[stanzaName] = workers.NewPrometheus(config, logger, stanzaName) + mapLoggers[stanzaName].SetMetrics(metrics) } if config.Loggers.Stdout.Enable { mapLoggers[stanzaName] = workers.NewStdOut(config, logger, stanzaName) + mapLoggers[stanzaName].SetMetrics(metrics) } if config.Loggers.LogFile.Enable { mapLoggers[stanzaName] = workers.NewLogFile(config, logger, stanzaName) + mapLoggers[stanzaName].SetMetrics(metrics) } if config.Loggers.DNSTap.Enable { mapLoggers[stanzaName] = workers.NewDnstapSender(config, logger, stanzaName) + mapLoggers[stanzaName].SetMetrics(metrics) } if config.Loggers.TCPClient.Enable { mapLoggers[stanzaName] = workers.NewTCPClient(config, logger, stanzaName) + mapLoggers[stanzaName].SetMetrics(metrics) } if config.Loggers.Syslog.Enable { mapLoggers[stanzaName] = workers.NewSyslog(config, logger, stanzaName) + mapLoggers[stanzaName].SetMetrics(metrics) } if config.Loggers.Fluentd.Enable { mapLoggers[stanzaName] = workers.NewFluentdClient(config, logger, stanzaName) + mapLoggers[stanzaName].SetMetrics(metrics) } if config.Loggers.InfluxDB.Enable { mapLoggers[stanzaName] = workers.NewInfluxDBClient(config, logger, stanzaName) + mapLoggers[stanzaName].SetMetrics(metrics) } if config.Loggers.LokiClient.Enable { mapLoggers[stanzaName] = workers.NewLokiClient(config, logger, stanzaName) + mapLoggers[stanzaName].SetMetrics(metrics) } if config.Loggers.Statsd.Enable { mapLoggers[stanzaName] = workers.NewStatsdClient(config, logger, stanzaName) + mapLoggers[stanzaName].SetMetrics(metrics) } if config.Loggers.ElasticSearchClient.Enable { mapLoggers[stanzaName] = workers.NewElasticSearchClient(config, logger, stanzaName) + mapLoggers[stanzaName].SetMetrics(metrics) } if config.Loggers.ScalyrClient.Enable { mapLoggers[stanzaName] = workers.NewScalyrClient(config, logger, stanzaName) + mapLoggers[stanzaName].SetMetrics(metrics) } if config.Loggers.RedisPub.Enable { mapLoggers[stanzaName] = workers.NewRedisPub(config, logger, stanzaName) + mapLoggers[stanzaName].SetMetrics(metrics) } if config.Loggers.KafkaProducer.Enable { mapLoggers[stanzaName] = workers.NewKafkaProducer(config, logger, stanzaName) + mapLoggers[stanzaName].SetMetrics(metrics) } if config.Loggers.FalcoClient.Enable { mapLoggers[stanzaName] = workers.NewFalcoClient(config, logger, stanzaName) + mapLoggers[stanzaName].SetMetrics(metrics) } if config.Loggers.ClickhouseClient.Enable { mapLoggers[stanzaName] = workers.NewClickhouseClient(config, logger, stanzaName) + mapLoggers[stanzaName].SetMetrics(metrics) + } + if config.Loggers.DevNull.Enable { + mapLoggers[stanzaName] = workers.NewDevNull(config, logger, stanzaName) + mapLoggers[stanzaName].SetMetrics(metrics) } // register the collector if enabled if config.Collectors.DNSMessage.Enable { mapCollectors[stanzaName] = workers.NewDNSMessage(nil, config, logger, stanzaName) + mapCollectors[stanzaName].SetMetrics(metrics) } if config.Collectors.Dnstap.Enable { mapCollectors[stanzaName] = workers.NewDnstapServer(nil, config, logger, stanzaName) + mapCollectors[stanzaName].SetMetrics(metrics) } if config.Collectors.DnstapProxifier.Enable { mapCollectors[stanzaName] = workers.NewDnstapProxifier(nil, config, logger, stanzaName) + mapCollectors[stanzaName].SetMetrics(metrics) } if config.Collectors.AfpacketLiveCapture.Enable { mapCollectors[stanzaName] = workers.NewAfpacketSniffer(nil, config, logger, stanzaName) + mapCollectors[stanzaName].SetMetrics(metrics) } if config.Collectors.XdpLiveCapture.Enable { mapCollectors[stanzaName] = workers.NewXDPSniffer(nil, config, logger, stanzaName) + mapCollectors[stanzaName].SetMetrics(metrics) } if config.Collectors.Tail.Enable { mapCollectors[stanzaName] = workers.NewTail(nil, config, logger, stanzaName) + mapCollectors[stanzaName].SetMetrics(metrics) } if config.Collectors.PowerDNS.Enable { mapCollectors[stanzaName] = workers.NewPdnsServer(nil, config, logger, stanzaName) + mapCollectors[stanzaName].SetMetrics(metrics) } if config.Collectors.FileIngestor.Enable { mapCollectors[stanzaName] = workers.NewFileIngestor(nil, config, logger, stanzaName) + mapCollectors[stanzaName].SetMetrics(metrics) } if config.Collectors.Tzsp.Enable { mapCollectors[stanzaName] = workers.NewTZSP(nil, config, logger, stanzaName) + mapCollectors[stanzaName].SetMetrics(metrics) } } -func InitPipelines(mapLoggers map[string]workers.Worker, mapCollectors map[string]workers.Worker, config *pkgconfig.Config, logger *logger.Logger) error { +func InitPipelines(mapLoggers map[string]workers.Worker, mapCollectors map[string]workers.Worker, config *pkgconfig.Config, logger *logger.Logger, telemetry *telemetry.PrometheusCollector) error { // check if the name of each stanza is uniq routesDefined := false for _, stanza := range config.Pipelines { @@ -241,7 +272,7 @@ func InitPipelines(mapLoggers map[string]workers.Worker, mapCollectors map[strin // read each stanza and init for _, stanza := range config.Pipelines { stanzaConfig := GetStanzaConfig(config, stanza) - CreateStanza(stanza.Name, stanzaConfig, mapCollectors, mapLoggers, logger) + CreateStanza(stanza.Name, stanzaConfig, mapCollectors, mapLoggers, logger, telemetry) } diff --git a/pkginit/pipelines_test.go b/pkginit/pipelines_test.go index 4615ac6b..36c8d975 100644 --- a/pkginit/pipelines_test.go +++ b/pkginit/pipelines_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/dmachard/go-dnscollector/pkgconfig" + "github.com/dmachard/go-dnscollector/telemetry" "github.com/dmachard/go-dnscollector/workers" "github.com/dmachard/go-logger" ) @@ -76,7 +77,8 @@ func TestPipelines_NoRoutesDefined(t *testing.T) { mapLoggers := make(map[string]workers.Worker) mapCollectors := make(map[string]workers.Worker) - err := InitPipelines(mapLoggers, mapCollectors, config, logger.New(false)) + metrics := telemetry.NewPrometheusCollector(config) + err := InitPipelines(mapLoggers, mapCollectors, config, logger.New(false), metrics) if err == nil { t.Errorf("Want err, got nil") } else if err.Error() != "no routes are defined" { @@ -100,7 +102,8 @@ func TestPipelines_RoutingLoop(t *testing.T) { mapLoggers := make(map[string]workers.Worker) mapCollectors := make(map[string]workers.Worker) - err := InitPipelines(mapLoggers, mapCollectors, config, logger.New(false)) + metrics := telemetry.NewPrometheusCollector(config) + err := InitPipelines(mapLoggers, mapCollectors, config, logger.New(false), metrics) if err == nil { t.Errorf("Want err, got nil") } else if !strings.Contains(err.Error(), "routing error loop") { diff --git a/telemetry/prometheus.go b/telemetry/prometheus.go new file mode 100644 index 00000000..79945c82 --- /dev/null +++ b/telemetry/prometheus.go @@ -0,0 +1,252 @@ +package telemetry + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "net/http" + "os" + "regexp" + "sync" + "time" + + "github.com/dmachard/go-dnscollector/pkgconfig" + "github.com/dmachard/go-logger" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/prometheus/common/version" +) + +/* +OpenMetrics and the Prometheus exposition format require the metric name +to consist only of alphanumericals and "_", ":" and they must not start +with digits. +*/ +var metricNameRegex = regexp.MustCompile(`_*[^0-9A-Za-z_]+_*`) + +func SanitizeMetricName(metricName string) string { + return metricNameRegex.ReplaceAllString(metricName, "_") +} + +type WorkerStats struct { + Name string + TotalIngress int + TotalEgress int + TotalForwardedPolicy int + TotalDroppedPolicy int + TotalDiscarded int +} + +type PrometheusCollector struct { + sync.Mutex + config *pkgconfig.Config + metrics map[string]*prometheus.Desc + Record chan WorkerStats + data map[string]WorkerStats // To store the worker stats + stop chan struct{} // Channel to signal stopping + promPrefix string +} + +func NewPrometheusCollector(config *pkgconfig.Config) *PrometheusCollector { + t := &PrometheusCollector{ + config: config, + Record: make(chan WorkerStats), + data: make(map[string]WorkerStats), + stop: make(chan struct{}), + } + + t.promPrefix = SanitizeMetricName(config.Global.Telemetry.PromPrefix) + + t.metrics = map[string]*prometheus.Desc{ + "worker_ingress_total": prometheus.NewDesc( + fmt.Sprintf("%s_worker_ingress_traffic_total", t.promPrefix), + "Ingress traffic associated to each worker", []string{"worker"}, nil), + "worker_egress_total": prometheus.NewDesc( + fmt.Sprintf("%s_worker_egress_traffic_total", t.promPrefix), + "Egress traffic associated to each worker", []string{"worker"}, nil), + "worker_discarded_total": prometheus.NewDesc( + fmt.Sprintf("%s_worker_discarded_traffic_total", t.promPrefix), + "Discarded traffic associated to each worker", []string{"worker"}, nil), + "policy_forwarded_total": prometheus.NewDesc( + fmt.Sprintf("%s_policy_forwarded_total", t.promPrefix), + "Total number of forwarded policy", []string{"worker"}, nil), + "policy_dropped_total": prometheus.NewDesc( + fmt.Sprintf("%s_policy_dropped_total", t.promPrefix), + "Total number of dropped policy", []string{"worker"}, nil), + } + return t +} + +func (t *PrometheusCollector) UpdateStats() { + for { + select { + case ws := <-t.Record: + t.Lock() + if _, ok := t.data[ws.Name]; !ok { + t.data[ws.Name] = ws + } else { + updatedWs := t.data[ws.Name] + updatedWs.TotalForwardedPolicy += ws.TotalForwardedPolicy + updatedWs.TotalDroppedPolicy += ws.TotalDroppedPolicy + updatedWs.TotalIngress += ws.TotalIngress + updatedWs.TotalEgress += ws.TotalEgress + updatedWs.TotalDiscarded += ws.TotalDiscarded + t.data[ws.Name] = updatedWs + } + t.Unlock() + case <-t.stop: + // Received stop signal, exit the goroutine + return + } + } +} +func (t *PrometheusCollector) Collect(ch chan<- prometheus.Metric) { + t.Lock() + defer t.Unlock() + + // Collect the forwarded and dropped metrics for each worker + for _, ws := range t.data { + ch <- prometheus.MustNewConstMetric( + t.metrics["worker_discarded_total"], + prometheus.CounterValue, + float64(ws.TotalDiscarded), + ws.Name, + ) + ch <- prometheus.MustNewConstMetric( + t.metrics["worker_ingress_total"], + prometheus.CounterValue, + float64(ws.TotalIngress), + ws.Name, + ) + ch <- prometheus.MustNewConstMetric( + t.metrics["worker_egress_total"], + prometheus.CounterValue, + float64(ws.TotalEgress), + ws.Name, + ) + ch <- prometheus.MustNewConstMetric( + t.metrics["policy_forwarded_total"], + prometheus.CounterValue, + float64(ws.TotalForwardedPolicy), + ws.Name, + ) + ch <- prometheus.MustNewConstMetric( + t.metrics["policy_dropped_total"], + prometheus.CounterValue, + float64(ws.TotalDroppedPolicy), + ws.Name, + ) + } +} + +func (t *PrometheusCollector) Describe(ch chan<- *prometheus.Desc) { + for _, m := range t.metrics { + ch <- m + } +} + +func (t *PrometheusCollector) Stop() { + close(t.stop) // Signal the stop channel to stop the goroutine +} + +func InitTelemetryServer(config *pkgconfig.Config, logger *logger.Logger) (*http.Server, *PrometheusCollector, chan error) { + // channel for error + errChan := make(chan error) + + // Prometheus collectors + metrics := NewPrometheusCollector(config) + + // HTTP server + promServer := &http.Server{ + Addr: config.Global.Telemetry.WebListen, + ReadHeaderTimeout: 5 * time.Second, + } + + if config.Global.Telemetry.Enabled { + go func() { + // start metrics + go metrics.UpdateStats() + + // register metrics + prometheus.MustRegister(metrics) + prometheus.MustRegister(version.NewCollector(config.Global.Telemetry.PromPrefix)) + + // handle /metrics + http.Handle(config.Global.Telemetry.WebPath, promhttp.Handler()) + + // handle http error + http.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) { + _, err := w.Write([]byte(` + DNScollector Exporter + +

DNScollector Exporter

+

Metrics

+ + `)) + if err != nil { + errChan <- err + } + }) + + if config.Global.Telemetry.TLSSupport { + // Load server certificate and key + cert, err := tls.LoadX509KeyPair(config.Global.Telemetry.TLSCertFile, config.Global.Telemetry.TLSKeyFile) + if err != nil { + errChan <- fmt.Errorf("failed to load server certificate and key: %w", err) + return + } + + // Load client CA certificate + clientCACert, err := os.ReadFile(config.Global.Telemetry.ClientCAFile) + if err != nil { + errChan <- fmt.Errorf("failed to load client CA certificate: %w", err) + return + } + clientCAs := x509.NewCertPool() + clientCAs.AppendCertsFromPEM(clientCACert) + + // Configure TLS + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + ClientCAs: clientCAs, + ClientAuth: tls.RequireAndVerifyClientCert, + } + + // Update the promServer with TLS configuration and handler + promServer.TLSConfig = tlsConfig + } + + if config.Global.Telemetry.BasicAuthEnable { + promServer.Handler = basicAuthMiddleware(http.DefaultServeMux, config.Global.Telemetry.BasicAuthLogin, config.Global.Telemetry.BasicAuthPwd) + } else { + promServer.Handler = http.DefaultServeMux + } + + // start https server + if config.Global.Telemetry.TLSSupport { + if err := promServer.ListenAndServeTLS("", ""); err != nil && err != http.ErrServerClosed { + errChan <- err + } + } else { + if err := promServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + errChan <- err + } + } + }() + } + + return promServer, metrics, errChan +} + +// BasicAuth middleware +func basicAuthMiddleware(next http.Handler, username, password string) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + u, p, ok := r.BasicAuth() + if !ok || u != username || p != password { + w.Header().Set("WWW-Authenticate", `Basic realm="Restricted"`) + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return + } + next.ServeHTTP(w, r) + }) +} diff --git a/telemetry/prometheus_test.go b/telemetry/prometheus_test.go new file mode 100644 index 00000000..b618ed82 --- /dev/null +++ b/telemetry/prometheus_test.go @@ -0,0 +1,50 @@ +package telemetry + +import ( + "testing" + + "github.com/dmachard/go-dnscollector/pkgconfig" + "github.com/stretchr/testify/assert" +) + +func TestTelemetry_SanitizeMetricName(t *testing.T) { + testCases := []struct { + input string + expected string + }{ + {"metric:name", "metric_name"}, + {"metric-name", "metric_name"}, + {"metric.name", "metric_name"}, + } + + for _, tc := range testCases { + actual := SanitizeMetricName(tc.input) + assert.Equal(t, tc.expected, actual) + } +} + +func TestTelemetry_PrometheusCollectorUpdateStats(t *testing.T) { + config := pkgconfig.Config{} + + collector := NewPrometheusCollector(&config) + + // Create a sample WorkerStats + ws := WorkerStats{ + Name: "worker1", + TotalIngress: 10, TotalEgress: 5, + TotalForwardedPolicy: 2, TotalDroppedPolicy: 1, TotalDiscarded: 3, + } + + // Send the stats to the collector + go collector.UpdateStats() + collector.Record <- ws + + // Verify that the stats were updated + storedWS, ok := collector.data["worker1"] + assert.True(t, ok, "Worker stats should be present in the collector") + assert.Equal(t, ws.TotalIngress, storedWS.TotalIngress) + assert.Equal(t, ws.TotalEgress, storedWS.TotalEgress) + assert.Equal(t, ws.TotalForwardedPolicy, storedWS.TotalForwardedPolicy) + assert.Equal(t, ws.TotalDroppedPolicy, storedWS.TotalDroppedPolicy) + assert.Equal(t, ws.TotalDiscarded, storedWS.TotalDiscarded) +} diff --git a/workers/clickhouse.go b/workers/clickhouse.go index cf0f243b..fdb1179e 100644 --- a/workers/clickhouse.go +++ b/workers/clickhouse.go @@ -70,6 +70,8 @@ func (w *ClickhouseClient) StartCollect() { w.LogInfo("input channel closed!") return } + // count global messages + w.CountIngressTraffic() // apply tranforms, init dns message with additionnals parts if necessary transformResult, err := subprocessors.ProcessMessage(&dm) @@ -77,15 +79,16 @@ func (w *ClickhouseClient) StartCollect() { w.LogError(err.Error()) } if transformResult == transformers.ReturnDrop { - w.SendTo(droppedRoutes, droppedNames, dm) + w.SendDroppedTo(droppedRoutes, droppedNames, dm) continue } // send to output channel + w.CountEgressTraffic() w.GetOutputChannel() <- dm // send to next ? - w.SendTo(defaultRoutes, defaultNames, dm) + w.SendForwardedTo(defaultRoutes, defaultNames, dm) } } diff --git a/workers/devnull.go b/workers/devnull.go index 1b3a0111..e2df41f7 100644 --- a/workers/devnull.go +++ b/workers/devnull.go @@ -33,25 +33,29 @@ func (w *DevNull) StartCollect() { w.LogInfo("run: input channel closed!") return } + + // count global messages + w.CountIngressTraffic() + } } } -func (w *DevNull) StartLogging() { - w.LogInfo("logging has started") - defer w.LoggingDone() +// func (w *DevNull) StartLogging() { +// w.LogInfo("logging has started") +// defer w.LoggingDone() - for { - select { - case <-w.OnLoggerStopped(): - return +// for { +// select { +// case <-w.OnLoggerStopped(): +// return - case _, opened := <-w.GetOutputChannel(): - if !opened { - w.LogInfo("process: output channel closed!") - return - } +// case _, opened := <-w.GetOutputChannel(): +// if !opened { +// w.LogInfo("process: output channel closed!") +// return +// } - } - } -} +// } +// } +// } diff --git a/workers/dnsmessage.go b/workers/dnsmessage.go index 279251fc..2f180d11 100644 --- a/workers/dnsmessage.go +++ b/workers/dnsmessage.go @@ -190,6 +190,8 @@ func (w *DNSMessage) StartCollect() { w.LogInfo("channel closed, exit") return } + // count global messages + w.CountIngressTraffic() // matching enabled, filtering DNS messages ? matched := true @@ -220,6 +222,9 @@ func (w *DNSMessage) StartCollect() { } } + // count output packets + w.CountEgressTraffic() + // apply tranforms on matched packets only // init dns message with additionnals parts if necessary if matched { @@ -228,19 +233,19 @@ func (w *DNSMessage) StartCollect() { w.LogError(err.Error()) } if transformResult == transformers.ReturnDrop { - w.SendTo(droppedRoutes, droppedNames, dm) + w.SendDroppedTo(droppedRoutes, droppedNames, dm) continue } } // drop packet ? if !matched { - w.SendTo(droppedRoutes, droppedNames, dm) + w.SendDroppedTo(droppedRoutes, droppedNames, dm) continue } // send to next - w.SendTo(defaultRoutes, defaultNames, dm) + w.SendForwardedTo(defaultRoutes, defaultNames, dm) } } } diff --git a/workers/dnsprocessor.go b/workers/dnsprocessor.go index 0f1a12c5..a5ff1bad 100644 --- a/workers/dnsprocessor.go +++ b/workers/dnsprocessor.go @@ -46,6 +46,8 @@ func (w *DNSProcessor) StartCollect() { w.LogInfo("channel closed, exit") return } + // count global messages + w.CountIngressTraffic() // compute timestamp ts := time.Unix(int64(dm.DNSTap.TimeSec), int64(dm.DNSTap.TimeNsec)) @@ -84,13 +86,16 @@ func (w *DNSProcessor) StartCollect() { } } + // count output packets + w.CountEgressTraffic() + // apply all enabled transformers transformResult, err := transforms.ProcessMessage(&dm) if err != nil { w.LogError(err.Error()) } if transformResult == transformers.ReturnDrop { - w.SendTo(droppedRoutes, droppedNames, dm) + w.SendDroppedTo(droppedRoutes, droppedNames, dm) continue } @@ -98,7 +103,7 @@ func (w *DNSProcessor) StartCollect() { dm.DNSTap.LatencySec = fmt.Sprintf("%.6f", dm.DNSTap.Latency) // dispatch dns message to all generators - w.SendTo(defaultRoutes, defaultNames, dm) + w.SendForwardedTo(defaultRoutes, defaultNames, dm) } } } diff --git a/workers/dnstapclient.go b/workers/dnstapclient.go index 5a205b2c..d224e90a 100644 --- a/workers/dnstapclient.go +++ b/workers/dnstapclient.go @@ -219,6 +219,8 @@ func (w *DnstapSender) StartCollect() { w.LogInfo("input channel closed!") return } + // count global messages + w.CountIngressTraffic() // apply tranforms, init dns message with additionnals parts if necessary transformResult, err := subprocessors.ProcessMessage(&dm) @@ -226,15 +228,16 @@ func (w *DnstapSender) StartCollect() { w.LogError(err.Error()) } if transformResult == transformers.ReturnDrop { - w.SendTo(droppedRoutes, droppedNames, dm) + w.SendDroppedTo(droppedRoutes, droppedNames, dm) continue } // send to output channel + w.CountEgressTraffic() w.GetOutputChannel() <- dm // send to next ? - w.SendTo(defaultRoutes, defaultNames, dm) + w.SendForwardedTo(defaultRoutes, defaultNames, dm) } } } diff --git a/workers/dnstapserver.go b/workers/dnstapserver.go index 9c70401d..fe3c0449 100644 --- a/workers/dnstapserver.go +++ b/workers/dnstapserver.go @@ -56,6 +56,7 @@ func (w *DnstapServer) HandleConn(conn net.Conn, connID uint64, forceClose chan // start dnstap processor and run it dnstapProcessor := NewDNSTapProcessor(int(connID), peerName, w.GetConfig(), w.GetLogger(), w.GetName(), w.GetConfig().Collectors.Dnstap.ChannelBufferSize) + dnstapProcessor.SetMetrics(w.metrics) dnstapProcessor.SetDefaultRoutes(w.GetDefaultRoutes()) dnstapProcessor.SetDefaultDropped(w.GetDroppedRoutes()) go dnstapProcessor.StartCollect() @@ -328,12 +329,17 @@ func (w *DNSTapProcessor) StartCollect() { w.LogInfo("channel closed, exit") return } + // count global messages + w.CountIngressTraffic() err := proto.Unmarshal(data, dt) if err != nil { continue } + // count global messages + w.CountIngressTraffic() + // init dns message dm := dnsutils.DNSMessage{} dm.Init() @@ -508,13 +514,16 @@ func (w *DNSTapProcessor) StartCollect() { } } + // count output packets + w.CountEgressTraffic() + // apply all enabled transformers transformResult, err := transforms.ProcessMessage(&dm) if err != nil { w.LogError(err.Error()) } if transformResult == transformers.ReturnDrop { - w.SendTo(droppedRoutes, droppedNames, dm) + w.SendDroppedTo(droppedRoutes, droppedNames, dm) continue } @@ -522,7 +531,7 @@ func (w *DNSTapProcessor) StartCollect() { dm.DNSTap.LatencySec = fmt.Sprintf("%.6f", dm.DNSTap.Latency) // dispatch dns message to connected routes - w.SendTo(defaultRoutes, defaultNames, dm) + w.SendForwardedTo(defaultRoutes, defaultNames, dm) } } } diff --git a/workers/elasticsearch.go b/workers/elasticsearch.go index 591f1dde..6e74184f 100644 --- a/workers/elasticsearch.go +++ b/workers/elasticsearch.go @@ -84,6 +84,8 @@ func (w *ElasticSearchClient) StartCollect() { w.LogInfo("input channel closed!") return } + // count global messages + w.CountIngressTraffic() // apply tranforms, init dns message with additionnals parts if necessary transformResult, err := subprocessors.ProcessMessage(&dm) @@ -91,15 +93,16 @@ func (w *ElasticSearchClient) StartCollect() { w.LogError(err.Error()) } if transformResult == transformers.ReturnDrop { - w.SendTo(droppedRoutes, droppedNames, dm) + w.SendDroppedTo(droppedRoutes, droppedNames, dm) continue } // send to output channel + w.CountEgressTraffic() w.GetOutputChannel() <- dm // send to next ? - w.SendTo(defaultRoutes, defaultNames, dm) + w.SendForwardedTo(defaultRoutes, defaultNames, dm) } } } diff --git a/workers/falco.go b/workers/falco.go index 7bad4cb4..a7ba1bb3 100644 --- a/workers/falco.go +++ b/workers/falco.go @@ -53,6 +53,8 @@ func (w *FalcoClient) StartCollect() { w.LogInfo("input channel closed!") return } + // count global messages + w.CountIngressTraffic() // apply tranforms, init dns message with additionnals parts if necessary transformResult, err := subprocessors.ProcessMessage(&dm) @@ -60,15 +62,16 @@ func (w *FalcoClient) StartCollect() { w.LogError(err.Error()) } if transformResult == transformers.ReturnDrop { - w.SendTo(droppedRoutes, droppedNames, dm) + w.SendDroppedTo(droppedRoutes, droppedNames, dm) continue } // send to output channel + w.CountEgressTraffic() w.GetOutputChannel() <- dm // send to next ? - w.SendTo(defaultRoutes, defaultNames, dm) + w.SendForwardedTo(defaultRoutes, defaultNames, dm) } } } diff --git a/workers/file_tail.go b/workers/file_tail.go index e7286d83..8bb6f844 100644 --- a/workers/file_tail.go +++ b/workers/file_tail.go @@ -218,18 +218,21 @@ func (w *Tail) StartCollect() { dm.DNS.Payload, _ = dnspkt.Pack() dm.DNS.Length = len(dm.DNS.Payload) + // count output packets + w.CountEgressTraffic() + // apply all enabled transformers transformResult, err := subprocessors.ProcessMessage(&dm) if err != nil { w.LogError(err.Error()) } if transformResult == transformers.ReturnDrop { - w.SendTo(droppedRoutes, droppedNames, dm) + w.SendDroppedTo(droppedRoutes, droppedNames, dm) continue } // send to next ? - w.SendTo(defaultRoutes, defaultNames, dm) + w.SendForwardedTo(defaultRoutes, defaultNames, dm) } } } diff --git a/workers/fluentd.go b/workers/fluentd.go index b873e46d..8520cd51 100644 --- a/workers/fluentd.go +++ b/workers/fluentd.go @@ -201,6 +201,8 @@ func (w *FluentdClient) StartCollect() { w.LogInfo("input channel closed!") return } + // count global messages + w.CountIngressTraffic() // apply tranforms, init dns message with additionnals parts if necessary transformResult, err := subprocessors.ProcessMessage(&dm) @@ -208,15 +210,16 @@ func (w *FluentdClient) StartCollect() { w.LogError(err.Error()) } if transformResult == transformers.ReturnDrop { - w.SendTo(droppedRoutes, droppedNames, dm) + w.SendDroppedTo(droppedRoutes, droppedNames, dm) continue } // send to output channel + w.CountEgressTraffic() w.GetOutputChannel() <- dm // send to next ? - w.SendTo(defaultRoutes, defaultNames, dm) + w.SendForwardedTo(defaultRoutes, defaultNames, dm) } } } diff --git a/workers/influxdb.go b/workers/influxdb.go index 006bb62c..0521d8c5 100644 --- a/workers/influxdb.go +++ b/workers/influxdb.go @@ -57,6 +57,8 @@ func (w *InfluxDBClient) StartCollect() { w.LogInfo("input channel closed!") return } + // count global messages + w.CountIngressTraffic() // apply tranforms, init dns message with additionnals parts if necessary transformResult, err := subprocessors.ProcessMessage(&dm) @@ -64,15 +66,16 @@ func (w *InfluxDBClient) StartCollect() { w.LogError(err.Error()) } if transformResult == transformers.ReturnDrop { - w.SendTo(droppedRoutes, droppedNames, dm) + w.SendDroppedTo(droppedRoutes, droppedNames, dm) continue } // send to output channel + w.CountEgressTraffic() w.GetOutputChannel() <- dm // send to next ? - w.SendTo(defaultRoutes, defaultNames, dm) + w.SendForwardedTo(defaultRoutes, defaultNames, dm) } } } diff --git a/workers/kafkaproducer.go b/workers/kafkaproducer.go index 72f8d1d0..9385c024 100644 --- a/workers/kafkaproducer.go +++ b/workers/kafkaproducer.go @@ -228,6 +228,8 @@ func (w *KafkaProducer) StartCollect() { w.LogInfo("input channel closed!") return } + // count global messages + w.CountIngressTraffic() // apply tranforms, init dns message with additionnals parts if necessary transformResult, err := subprocessors.ProcessMessage(&dm) @@ -235,15 +237,16 @@ func (w *KafkaProducer) StartCollect() { w.LogError(err.Error()) } if transformResult == transformers.ReturnDrop { - w.SendTo(droppedRoutes, droppedNames, dm) + w.SendDroppedTo(droppedRoutes, droppedNames, dm) continue } // send to output channel + w.CountEgressTraffic() w.GetOutputChannel() <- dm // send to next ? - w.SendTo(defaultRoutes, defaultNames, dm) + w.SendForwardedTo(defaultRoutes, defaultNames, dm) } } } diff --git a/workers/logfile.go b/workers/logfile.go index e71ce115..d47ec353 100644 --- a/workers/logfile.go +++ b/workers/logfile.go @@ -428,6 +428,8 @@ func (w *LogFile) StartCollect() { w.LogInfo("input channel closed!") return } + // count global messages + w.CountIngressTraffic() // apply tranforms, init dns message with additionnals parts if necessary transformResult, err := subprocessors.ProcessMessage(&dm) @@ -435,15 +437,16 @@ func (w *LogFile) StartCollect() { w.LogError(err.Error()) } if transformResult == transformers.ReturnDrop { - w.SendTo(droppedRoutes, droppedNames, dm) + w.SendDroppedTo(droppedRoutes, droppedNames, dm) continue } // send to output channel + w.CountEgressTraffic() w.GetOutputChannel() <- dm // send to next ? - w.SendTo(defaultRoutes, defaultNames, dm) + w.SendForwardedTo(defaultRoutes, defaultNames, dm) } } } diff --git a/workers/lokiclient.go b/workers/lokiclient.go index 58f39648..6c4860b2 100644 --- a/workers/lokiclient.go +++ b/workers/lokiclient.go @@ -170,6 +170,8 @@ func (w *LokiClient) StartCollect() { w.LogInfo("input channel closed!") return } + // count global messages + w.CountIngressTraffic() // apply tranforms, init dns message with additionnals parts if necessary transformResult, err := subprocessors.ProcessMessage(&dm) @@ -177,15 +179,16 @@ func (w *LokiClient) StartCollect() { w.LogError(err.Error()) } if transformResult == transformers.ReturnDrop { - w.SendTo(droppedRoutes, droppedNames, dm) + w.SendDroppedTo(droppedRoutes, droppedNames, dm) continue } // send to output channel + w.CountEgressTraffic() w.GetOutputChannel() <- dm // send to next ? - w.SendTo(defaultRoutes, defaultNames, dm) + w.SendForwardedTo(defaultRoutes, defaultNames, dm) } } } diff --git a/workers/powerdns.go b/workers/powerdns.go index 7e39518e..37950bc6 100644 --- a/workers/powerdns.go +++ b/workers/powerdns.go @@ -240,6 +240,8 @@ func (w *PdnsProcessor) StartCollect() { w.LogInfo("channel closed, exit") return } + // count global messages + w.CountIngressTraffic() err := proto.Unmarshal(data, pbdm) if err != nil { @@ -440,18 +442,21 @@ func (w *PdnsProcessor) StartCollect() { } } + // count output packets + w.CountEgressTraffic() + // apply all enabled transformers transformResult, err := transforms.ProcessMessage(&dm) if err != nil { w.LogError(err.Error()) } if transformResult == transformers.ReturnDrop { - w.SendTo(droppedRoutes, droppedNames, dm) + w.SendDroppedTo(droppedRoutes, droppedNames, dm) continue } // dispatch dns messages to connected loggers - w.SendTo(defaultRoutes, defaultNames, dm) + w.SendForwardedTo(defaultRoutes, defaultNames, dm) } } } diff --git a/workers/prometheus.go b/workers/prometheus.go index d6eaf526..78618966 100644 --- a/workers/prometheus.go +++ b/workers/prometheus.go @@ -7,7 +7,6 @@ import ( "net" "net/http" "os" - "regexp" "strconv" "strings" "sync" @@ -15,6 +14,7 @@ import ( "github.com/dmachard/go-dnscollector/dnsutils" "github.com/dmachard/go-dnscollector/pkgconfig" + "github.com/dmachard/go-dnscollector/telemetry" "github.com/dmachard/go-dnscollector/transformers" "github.com/dmachard/go-logger" "github.com/dmachard/go-netutils" @@ -27,8 +27,6 @@ import ( // _ "net/http/pprof" ) -var metricNameRegex = regexp.MustCompile(`_*[^0-9A-Za-z_]+_*`) - /* This is the list of available label values selectors. Configuration may specify a list of lables to use for metrics. @@ -40,15 +38,6 @@ var catalogueSelectors map[string]func(*dnsutils.DNSMessage) string = map[string "stream_global": GetStreamGlobal, } -/* -OpenMetrics and the Prometheus exposition format require the metric name -to consist only of alphanumericals and "_", ":" and they must not start -with digits. -*/ -func SanitizeMetricName(metricName string) string { - return metricNameRegex.ReplaceAllString(metricName, "_") -} - /* EpsCounters (Events Per Second) - is a set of metrics we calculate on per-second basis. For others we rely on averaging by collector @@ -740,7 +729,7 @@ func NewPrometheus(config *pkgconfig.Config, logger *logger.Logger, name string) func (w *Prometheus) InitProm() { - promPrefix := SanitizeMetricName(w.GetConfig().Loggers.Prometheus.PromPrefix) + promPrefix := telemetry.SanitizeMetricName(w.GetConfig().Loggers.Prometheus.PromPrefix) // register metric about current version information. w.promRegistry.MustRegister(version.NewCollector(promPrefix)) @@ -1148,6 +1137,8 @@ func (w *Prometheus) StartCollect() { w.LogInfo("input channel closed!") return } + // count global messages + w.CountIngressTraffic() // apply tranforms, init dns message with additionnals parts if necessary transformResult, err := subprocessors.ProcessMessage(&dm) @@ -1155,15 +1146,16 @@ func (w *Prometheus) StartCollect() { w.LogError(err.Error()) } if transformResult == transformers.ReturnDrop { - w.SendTo(droppedRoutes, droppedNames, dm) + w.SendDroppedTo(droppedRoutes, droppedNames, dm) continue } // send to output channel + w.CountEgressTraffic() w.GetOutputChannel() <- dm // send to next ? - w.SendTo(defaultRoutes, defaultNames, dm) + w.SendForwardedTo(defaultRoutes, defaultNames, dm) } } } diff --git a/workers/redispub.go b/workers/redispub.go index ebe609d6..19696d12 100644 --- a/workers/redispub.go +++ b/workers/redispub.go @@ -243,6 +243,8 @@ func (w *RedisPub) StartCollect() { w.LogInfo("input channel closed!") return } + // count global messages + w.CountIngressTraffic() // apply tranforms, init dns message with additionnals parts if necessary transformResult, err := subprocessors.ProcessMessage(&dm) @@ -250,15 +252,16 @@ func (w *RedisPub) StartCollect() { w.LogError(err.Error()) } if transformResult == transformers.ReturnDrop { - w.SendTo(droppedRoutes, droppedNames, dm) + w.SendDroppedTo(droppedRoutes, droppedNames, dm) continue } // send to output channel + w.CountEgressTraffic() w.GetOutputChannel() <- dm // send to next ? - w.SendTo(defaultRoutes, defaultNames, dm) + w.SendForwardedTo(defaultRoutes, defaultNames, dm) } } } diff --git a/workers/restapi.go b/workers/restapi.go index cfd2abda..a1eba404 100644 --- a/workers/restapi.go +++ b/workers/restapi.go @@ -676,6 +676,8 @@ func (w *RestAPI) StartCollect() { w.LogInfo("input channel closed!") return } + // count global messages + w.CountIngressTraffic() // apply tranforms, init dns message with additionnals parts if necessary transformResult, err := subprocessors.ProcessMessage(&dm) @@ -683,15 +685,16 @@ func (w *RestAPI) StartCollect() { w.LogError(err.Error()) } if transformResult == transformers.ReturnDrop { - w.SendTo(droppedRoutes, droppedNames, dm) + w.SendDroppedTo(droppedRoutes, droppedNames, dm) continue } // send to output channel + w.CountEgressTraffic() w.GetOutputChannel() <- dm // send to next ? - w.SendTo(defaultRoutes, defaultNames, dm) + w.SendForwardedTo(defaultRoutes, defaultNames, dm) } } } diff --git a/workers/scalyr.go b/workers/scalyr.go index b5cc5e7d..11a07bbe 100644 --- a/workers/scalyr.go +++ b/workers/scalyr.go @@ -154,6 +154,8 @@ func (w *ScalyrClient) StartCollect() { w.LogInfo("input channel closed!") return } + // count global messages + w.CountIngressTraffic() // apply tranforms, init dns message with additionnals parts if necessary transformResult, err := subprocessors.ProcessMessage(&dm) @@ -161,15 +163,16 @@ func (w *ScalyrClient) StartCollect() { w.LogError(err.Error()) } if transformResult == transformers.ReturnDrop { - w.SendTo(droppedRoutes, droppedNames, dm) + w.SendDroppedTo(droppedRoutes, droppedNames, dm) continue } // send to output channel + w.CountEgressTraffic() w.GetOutputChannel() <- dm // send to next ? - w.SendTo(defaultRoutes, defaultNames, dm) + w.SendForwardedTo(defaultRoutes, defaultNames, dm) } } } diff --git a/workers/statsd.go b/workers/statsd.go index 01f4f866..5ff933a0 100644 --- a/workers/statsd.go +++ b/workers/statsd.go @@ -178,6 +178,8 @@ func (w *StatsdClient) StartCollect() { w.LogInfo("input channel closed!") return } + // count global messages + w.CountIngressTraffic() // apply tranforms, init dns message with additionnals parts if necessary transformResult, err := subprocessors.ProcessMessage(&dm) @@ -185,15 +187,16 @@ func (w *StatsdClient) StartCollect() { w.LogError(err.Error()) } if transformResult == transformers.ReturnDrop { - w.SendTo(droppedRoutes, droppedNames, dm) + w.SendDroppedTo(droppedRoutes, droppedNames, dm) continue } // send to output channel + w.CountEgressTraffic() w.GetOutputChannel() <- dm // send to next ? - w.SendTo(defaultRoutes, defaultNames, dm) + w.SendForwardedTo(defaultRoutes, defaultNames, dm) } } } diff --git a/workers/stdout.go b/workers/stdout.go index 348f23b7..d0e5e5e9 100644 --- a/workers/stdout.go +++ b/workers/stdout.go @@ -103,21 +103,25 @@ func (w *StdOut) StartCollect() { return } + // count global messages + w.CountIngressTraffic() + // apply tranforms, init dns message with additionnals parts if necessary transformResult, err := subprocessors.ProcessMessage(&dm) if err != nil { w.LogError(err.Error()) } if transformResult == transformers.ReturnDrop { - w.SendTo(droppedRoutes, droppedNames, dm) + w.SendDroppedTo(droppedRoutes, droppedNames, dm) continue } // send to output channel + w.CountEgressTraffic() w.GetOutputChannel() <- dm // send to next ? - w.SendTo(defaultRoutes, defaultNames, dm) + w.SendForwardedTo(defaultRoutes, defaultNames, dm) } } } diff --git a/workers/syslog.go b/workers/syslog.go index 16deb311..b6ed03f6 100644 --- a/workers/syslog.go +++ b/workers/syslog.go @@ -194,6 +194,8 @@ func (w *Syslog) StartCollect() { w.LogInfo("input channel closed!") return } + // count global messages + w.CountIngressTraffic() // apply tranforms, init dns message with additionnals parts if necessary transformResult, err := subprocessors.ProcessMessage(&dm) @@ -201,15 +203,16 @@ func (w *Syslog) StartCollect() { w.LogError(err.Error()) } if transformResult == transformers.ReturnDrop { - w.SendTo(droppedRoutes, droppedNames, dm) + w.SendDroppedTo(droppedRoutes, droppedNames, dm) continue } // send to output channel + w.CountEgressTraffic() w.GetOutputChannel() <- dm // send to next ? - w.SendTo(defaultRoutes, defaultNames, dm) + w.SendForwardedTo(defaultRoutes, defaultNames, dm) } } } diff --git a/workers/tcpclient.go b/workers/tcpclient.go index ed8f6b5f..09e6b583 100644 --- a/workers/tcpclient.go +++ b/workers/tcpclient.go @@ -230,6 +230,8 @@ func (w *TCPClient) StartCollect() { w.LogInfo("input channel closed!") return } + // count global messages + w.CountIngressTraffic() // apply tranforms, init dns message with additionnals parts if necessary transformResult, err := subprocessors.ProcessMessage(&dm) @@ -237,15 +239,16 @@ func (w *TCPClient) StartCollect() { w.LogError(err.Error()) } if transformResult == transformers.ReturnDrop { - w.SendTo(droppedRoutes, droppedNames, dm) + w.SendDroppedTo(droppedRoutes, droppedNames, dm) continue } // send to output channel + w.CountEgressTraffic() w.GetOutputChannel() <- dm // send to next ? - w.SendTo(defaultRoutes, defaultNames, dm) + w.SendForwardedTo(defaultRoutes, defaultNames, dm) } } } diff --git a/workers/worker.go b/workers/worker.go index 079f2e05..02d012f8 100644 --- a/workers/worker.go +++ b/workers/worker.go @@ -5,16 +5,20 @@ import ( "github.com/dmachard/go-dnscollector/dnsutils" "github.com/dmachard/go-dnscollector/pkgconfig" + "github.com/dmachard/go-dnscollector/telemetry" "github.com/dmachard/go-logger" ) type Worker interface { + SetMetrics(metrics *telemetry.PrometheusCollector) AddDefaultRoute(wrk Worker) AddDroppedRoute(wrk Worker) SetLoggers(loggers []Worker) GetName() string Stop() StartCollect() + CountIngressTraffic() + CountEgressTraffic() GetInputChannel() chan dnsutils.DNSMessage ReadConfig() ReloadConfig(config *pkgconfig.Config) @@ -30,6 +34,10 @@ type GenericWorker struct { droppedWorker chan string droppedWorkerCount map[string]int dnsMessageIn, dnsMessageOut chan dnsutils.DNSMessage + + metrics *telemetry.PrometheusCollector + countIngress, countEgress, countForwarded, countDropped, countDiscarded chan int + totalIngress, totalEgress, totalForwarded, totalDropped, totalDiscarded int } func NewGenericWorker(config *pkgconfig.Config, logger *logger.Logger, name string, descr string, bufferSize int, monitor bool) *GenericWorker { @@ -50,6 +58,11 @@ func NewGenericWorker(config *pkgconfig.Config, logger *logger.Logger, name stri droppedWorkerCount: map[string]int{}, dnsMessageIn: make(chan dnsutils.DNSMessage, bufferSize), dnsMessageOut: make(chan dnsutils.DNSMessage, bufferSize), + countIngress: make(chan int), + countEgress: make(chan int), + countDiscarded: make(chan int), + countForwarded: make(chan int), + countDropped: make(chan int), } if monitor { go w.Monitor() @@ -57,6 +70,10 @@ func NewGenericWorker(config *pkgconfig.Config, logger *logger.Logger, name stri return w } +func (w *GenericWorker) SetMetrics(metrics *telemetry.PrometheusCollector) { + w.metrics = metrics +} + func (w *GenericWorker) GetName() string { return w.name } func (w *GenericWorker) GetConfig() *pkgconfig.Config { return w.config } @@ -163,15 +180,32 @@ func (w *GenericWorker) Stop() { func (w *GenericWorker) Monitor() { defer func() { + if r := recover(); r != nil { + w.LogError("monitor - recovered panic: %v", r) + } w.LogInfo("monitor terminated") w.doneMonitor <- true }() - w.LogInfo("starting monitoring") - watchInterval := 10 * time.Second - bufferFull := time.NewTimer(watchInterval) + w.LogInfo("starting monitoring - refresh every %ds", w.config.Global.Worker.InternalMonitor) + timerMonitor := time.NewTimer(time.Duration(w.config.Global.Worker.InternalMonitor) * time.Second) for { select { + case <-w.countDiscarded: + w.totalDiscarded++ + + case <-w.countIngress: + w.totalIngress++ + + case <-w.countEgress: + w.totalEgress++ + + case <-w.countForwarded: + w.totalForwarded++ + + case <-w.countDropped: + w.totalDropped++ + case loggerName := <-w.droppedWorker: if _, ok := w.droppedWorkerCount[loggerName]; !ok { w.droppedWorkerCount[loggerName] = 1 @@ -181,10 +215,10 @@ func (w *GenericWorker) Monitor() { case <-w.stopMonitor: close(w.droppedWorker) - bufferFull.Stop() + timerMonitor.Stop() return - case <-bufferFull.C: + case <-timerMonitor.C: for v, k := range w.droppedWorkerCount { if k > 0 { w.LogError("worker[%s] buffer is full, %d dnsmessage(s) dropped", v, k) @@ -192,7 +226,26 @@ func (w *GenericWorker) Monitor() { } } - bufferFull.Reset(watchInterval) + // // send to telemetry? + if w.config.Global.Telemetry.Enabled && w.metrics != nil { + if w.totalIngress > 0 || w.totalForwarded > 0 || w.totalDropped > 0 { + w.metrics.Record <- telemetry.WorkerStats{ + Name: w.GetName(), + TotalIngress: w.totalIngress, + TotalEgress: w.totalEgress, + TotalForwardedPolicy: w.totalForwarded, + TotalDroppedPolicy: w.totalDropped, + TotalDiscarded: w.totalDiscarded, + } + w.totalIngress = 0 + w.totalEgress = 0 + w.totalForwarded = 0 + w.totalDropped = 0 + w.totalDiscarded = 0 + } + } + + timerMonitor.Reset(time.Duration(w.config.Global.Worker.InternalMonitor) * time.Second) } } } @@ -211,16 +264,60 @@ func (w *GenericWorker) StartLogging() { defer w.LoggingDone() } -func (w *GenericWorker) SendTo(routes []chan dnsutils.DNSMessage, routesName []string, dm dnsutils.DNSMessage) { +func (w *GenericWorker) CountIngressTraffic() { + if w.config.Global.Telemetry.Enabled { + w.countIngress <- 1 + } +} + +func (w *GenericWorker) CountEgressTraffic() { + if w.config.Global.Telemetry.Enabled { + w.countEgress <- 1 + } +} + +func (w *GenericWorker) SendDroppedTo(routes []chan dnsutils.DNSMessage, routesName []string, dm dnsutils.DNSMessage) { + for i := range routes { + select { + case routes[i] <- dm: + if w.config.Global.Telemetry.Enabled { + w.countDropped <- 1 + } + default: + if w.config.Global.Telemetry.Enabled { + w.countDiscarded <- 1 + } + w.WorkerIsBusy(routesName[i]) + } + } +} + +func (w *GenericWorker) SendForwardedTo(routes []chan dnsutils.DNSMessage, routesName []string, dm dnsutils.DNSMessage) { for i := range routes { select { case routes[i] <- dm: + if w.config.Global.Telemetry.Enabled { + w.countForwarded <- 1 + } default: + if w.config.Global.Telemetry.Enabled { + w.countDiscarded <- 1 + } w.WorkerIsBusy(routesName[i]) } } } +// func (w *GenericWorker) SendTo(routes []chan dnsutils.DNSMessage, routesName []string, dm dnsutils.DNSMessage) { +// for i := range routes { +// select { +// case routes[i] <- dm: +// default: +// w.WorkerIsBusy(routesName[i]) +// } +// } +// } + func GetRoutes(routes []Worker) ([]chan dnsutils.DNSMessage, []string) { channels := []chan dnsutils.DNSMessage{} names := []string{}