From e1c74085f07d43447b40ab93f07ac012336c76e2 Mon Sep 17 00:00:00 2001 From: Denis Machard <5562930+dmachard@users.noreply.github.com> Date: Mon, 19 Feb 2024 14:57:40 +0100 Subject: [PATCH] fluentd: remove msgpack lib and add IBM/fluent-forward-go/fluent/client (#612) * remove msgpack lib add IBM/fluent-forward-go/fluent/client * fix test --- dnsutils/message.go | 122 +++++++++++++++++------------------ go.mod | 7 +- go.sum | 23 ++++++- loggers/fluentd.go | 137 +++++++++++++++++----------------------- loggers/fluentd_test.go | 96 ++++++++++++++++++++++------ 5 files changed, 220 insertions(+), 165 deletions(-) diff --git a/dnsutils/message.go b/dnsutils/message.go index 6330e9bb..5e965c32 100644 --- a/dnsutils/message.go +++ b/dnsutils/message.go @@ -151,94 +151,94 @@ type DNSTap struct { } type PowerDNS struct { - Tags []string `json:"tags" msgpack:"tags"` - OriginalRequestSubnet string `json:"original-request-subnet" msgpack:"original-request-subnet"` - AppliedPolicy string `json:"applied-policy" msgpack:"applied-policy"` - AppliedPolicyHit string `json:"applied-policy-hit" msgpack:"applied-policy-hit"` - AppliedPolicyKind string `json:"applied-policy-kind" msgpack:"applied-policy-kind"` - AppliedPolicyTrigger string `json:"applied-policy-trigger" msgpack:"applied-policy-trigger"` - AppliedPolicyType string `json:"applied-policy-type" msgpack:"applied-policy-type"` - Metadata map[string]string `json:"metadata" msgpack:"metadata"` + Tags []string `json:"tags"` + OriginalRequestSubnet string `json:"original-request-subnet"` + AppliedPolicy string `json:"applied-policy"` + AppliedPolicyHit string `json:"applied-policy-hit"` + AppliedPolicyKind string `json:"applied-policy-kind"` + AppliedPolicyTrigger string `json:"applied-policy-trigger"` + AppliedPolicyType string `json:"applied-policy-type"` + Metadata map[string]string `json:"metadata"` } type TransformDNSGeo struct { - City string `json:"city" msgpack:"city"` - Continent string `json:"continent" msgpack:"continent"` - CountryIsoCode string `json:"country-isocode" msgpack:"country-isocode"` - AutonomousSystemNumber string `json:"as-number" msgpack:"as-number"` - AutonomousSystemOrg string `json:"as-owner" msgpack:"as-owner"` + City string `json:"city"` + Continent string `json:"continent"` + CountryIsoCode string `json:"country-isocode"` + AutonomousSystemNumber string `json:"as-number"` + AutonomousSystemOrg string `json:"as-owner"` } type TransformSuspicious struct { - Score float64 `json:"score" msgpack:"score"` - MalformedPacket bool `json:"malformed-pkt" msgpack:"malformed-pkt"` - LargePacket bool `json:"large-pkt" msgpack:"large-pkt"` - LongDomain bool `json:"long-domain" msgpack:"long-domain"` - SlowDomain bool `json:"slow-domain" msgpack:"slow-domain"` - UnallowedChars bool `json:"unallowed-chars" msgpack:"unallowed-chars"` - UncommonQtypes bool `json:"uncommon-qtypes" msgpack:"uncommon-qtypes"` - ExcessiveNumberLabels bool `json:"excessive-number-labels" msgpack:"excessive-number-labels"` - Domain string `json:"domain,omitempty" msgpack:"-"` + Score float64 `json:"score"` + MalformedPacket bool `json:"malformed-pkt"` + LargePacket bool `json:"large-pkt"` + LongDomain bool `json:"long-domain"` + SlowDomain bool `json:"slow-domain"` + UnallowedChars bool `json:"unallowed-chars"` + UncommonQtypes bool `json:"uncommon-qtypes"` + ExcessiveNumberLabels bool `json:"excessive-number-labels"` + Domain string `json:"domain,omitempty"` } type TransformPublicSuffix struct { - QnamePublicSuffix string `json:"tld" msgpack:"qname-public-suffix"` - QnameEffectiveTLDPlusOne string `json:"etld+1" msgpack:"qname-effective-tld-plus-one"` + QnamePublicSuffix string `json:"tld"` + QnameEffectiveTLDPlusOne string `json:"etld+1"` } type TransformExtracted struct { - Base64Payload []byte `json:"dns_payload" msgpack:"dns_payload"` + Base64Payload []byte `json:"dns_payload"` } type TransformReducer struct { - Occurrences int `json:"occurrences" msgpack:"occurrences"` - CumulativeLength int `json:"cumulative-length" msgpack:"cumulative-length"` + Occurrences int `json:"occurrences"` + CumulativeLength int `json:"cumulative-length"` } type TransformFiltering struct { - SampleRate int `json:"sample-rate" msgpack:"sample-rate"` + SampleRate int `json:"sample-rate"` } type TransformML struct { - Entropy float64 `json:"entropy" msgpack:"entropy"` // Entropy of query name - Length int `json:"length" msgpack:"length"` // Length of domain - Labels int `json:"labels" msgpack:"labels"` // Number of labels in the query name separated by dots - Digits int `json:"digits" msgpack:"digits"` // Count of numerical characters - Lowers int `json:"lowers" msgpack:"lowers"` // Count of lowercase characters - Uppers int `json:"uppers" msgpack:"uppers"` // Count of uppercase characters - Specials int `json:"specials" msgpack:"specials"` // Number of special characters; special characters such as dash, underscore, equal sign,... - Others int `json:"others" msgpack:"others"` - RatioDigits float64 `json:"ratio-digits" msgpack:"ratio-digits"` - RatioLetters float64 `json:"ratio-letters" msgpack:"ratio-letters"` - RatioSpecials float64 `json:"ratio-specials" msgpack:"ratio-specials"` - RatioOthers float64 `json:"ratio-others" msgpack:"ratio-others"` - ConsecutiveChars int `json:"consecutive-chars" msgpack:"consecutive-chars"` - ConsecutiveVowels int `json:"consecutive-vowels" msgpack:"consecutive-vowels"` - ConsecutiveDigits int `json:"consecutive-digits" msgpack:"consecutive-digits"` - ConsecutiveConsonants int `json:"consecutive-consonants" msgpack:"consecutive-consonants"` - Size int `json:"size" msgpack:"size"` - Occurrences int `json:"occurrences" msgpack:"occurrences"` - UncommonQtypes int `json:"uncommon-qtypes" msgpack:"uncommon-qtypes"` + Entropy float64 `json:"entropy"` // Entropy of query name + Length int `json:"length"` // Length of domain + Labels int `json:"labels"` // Number of labels in the query name separated by dots + Digits int `json:"digits"` // Count of numerical characters + Lowers int `json:"lowers"` // Count of lowercase characters + Uppers int `json:"uppers"` // Count of uppercase characters + Specials int `json:"specials"` // Number of special characters; special characters such as dash, underscore, equal sign,... + Others int `json:"others"` + RatioDigits float64 `json:"ratio-digits"` + RatioLetters float64 `json:"ratio-letters"` + RatioSpecials float64 `json:"ratio-specials"` + RatioOthers float64 `json:"ratio-others"` + ConsecutiveChars int `json:"consecutive-chars"` + ConsecutiveVowels int `json:"consecutive-vowels"` + ConsecutiveDigits int `json:"consecutive-digits"` + ConsecutiveConsonants int `json:"consecutive-consonants"` + Size int `json:"size"` + Occurrences int `json:"occurrences"` + UncommonQtypes int `json:"uncommon-qtypes"` } type TransformATags struct { - Tags []string `json:"tags" msgpack:"tags"` + Tags []string `json:"tags"` } type DNSMessage struct { - NetworkInfo DNSNetInfo `json:"network" msgpack:"network"` - DNS DNS `json:"dns" msgpack:"dns"` - EDNS DNSExtended `json:"edns" msgpack:"edns"` - DNSTap DNSTap `json:"dnstap" msgpack:"dnstap"` - Geo *TransformDNSGeo `json:"geoip,omitempty" msgpack:"geo"` - PowerDNS *PowerDNS `json:"powerdns,omitempty" msgpack:"powerdns"` - Suspicious *TransformSuspicious `json:"suspicious,omitempty" msgpack:"suspicious"` - PublicSuffix *TransformPublicSuffix `json:"publicsuffix,omitempty" msgpack:"publicsuffix"` - Extracted *TransformExtracted `json:"extracted,omitempty" msgpack:"extracted"` - Reducer *TransformReducer `json:"reducer,omitempty" msgpack:"reducer"` - MachineLearning *TransformML `json:"ml,omitempty" msgpack:"ml"` - Filtering *TransformFiltering `json:"filtering,omitempty" msgpack:"filtering"` - ATags *TransformATags `json:"atags,omitempty" msgpack:"atags"` + NetworkInfo DNSNetInfo `json:"network"` + DNS DNS `json:"dns"` + EDNS DNSExtended `json:"edns"` + DNSTap DNSTap `json:"dnstap"` + Geo *TransformDNSGeo `json:"geoip,omitempty"` + PowerDNS *PowerDNS `json:"powerdns,omitempty"` + Suspicious *TransformSuspicious `json:"suspicious,omitempty"` + PublicSuffix *TransformPublicSuffix `json:"publicsuffix,omitempty"` + Extracted *TransformExtracted `json:"extracted,omitempty"` + Reducer *TransformReducer `json:"reducer,omitempty"` + MachineLearning *TransformML `json:"ml,omitempty"` + Filtering *TransformFiltering `json:"filtering,omitempty"` + ATags *TransformATags `json:"atags,omitempty"` } func (dm *DNSMessage) Init() { diff --git a/go.mod b/go.mod index 3754ed32..53c631c0 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/dmachard/go-dnscollector go 1.21 require ( + github.com/IBM/fluent-forward-go v0.2.2 github.com/Shopify/sarama v1.38.1 github.com/cilium/ebpf v0.12.3 github.com/dmachard/go-clientsyslog v0.3.0 @@ -31,7 +32,7 @@ require ( github.com/rs/tzsp v0.0.0-20161230003637-8ce729c826b9 github.com/segmentio/kafka-go v0.4.47 github.com/stretchr/testify v1.8.4 - github.com/vmihailenco/msgpack v4.0.4+incompatible + github.com/vmihailenco/msgpack/v5 v5.4.1 golang.org/x/net v0.20.0 golang.org/x/sys v0.16.0 google.golang.org/protobuf v1.32.0 @@ -61,6 +62,7 @@ require ( github.com/gogo/status v1.1.1 // indirect github.com/google/btree v1.1.2 // indirect github.com/gorilla/mux v1.8.0 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/grafana/loki/pkg/push v0.0.0-20231211180320-2535f9bedeae // indirect github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd // indirect github.com/hashicorp/consul/api v1.20.0 // indirect @@ -94,6 +96,7 @@ require ( github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e // indirect github.com/opentracing-contrib/go-stdlib v1.0.0 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect + github.com/philhofer/fwd v1.1.2 // indirect github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/exporter-toolkit v0.9.1 // indirect @@ -103,8 +106,10 @@ require ( github.com/sirupsen/logrus v1.9.3 // indirect github.com/soheilhy/cmux v0.1.5 // indirect github.com/stretchr/objx v0.5.0 // indirect + github.com/tinylib/msgp v1.1.9 // indirect github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect github.com/uber/jaeger-lib v2.4.1+incompatible // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/weaveworks/promrus v1.2.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect diff --git a/go.sum b/go.sum index 9525328c..55aa6f6c 100644 --- a/go.sum +++ b/go.sum @@ -391,6 +391,8 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM= github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= +github.com/IBM/fluent-forward-go v0.2.2 h1:T48kAjSMOAqTcpd6zkzqLAFOWlYPYIbCFJcEjrVzV1U= +github.com/IBM/fluent-forward-go v0.2.2/go.mod h1:U1SVl6rVRGMC/QhCTZ3iQx4P/ykCeg1y6UoVnlz+OAY= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= github.com/Shopify/sarama v1.38.1 h1:lqqPUPQZ7zPqYlWpTh+LQ9bhYNu2xJL6k1SJN4WVe2A= @@ -650,6 +652,8 @@ github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grafana/dskit v0.0.0-20230804003603-740f56bd2934 h1:W1g+y6rOO7K/Jm2XNPxIXyJisJSJ25uiVVaSa7N1Zwo= github.com/grafana/dskit v0.0.0-20230804003603-740f56bd2934/go.mod h1:Xg0aN3EpqkYFW1ZxGyIl4BGEpr3QrCQOM1aWalpU3ik= github.com/grafana/loki v1.6.2-0.20231211180320-2535f9bedeae h1:KLk1jneF9/yVmxizviqWMV8uxwf5DbpYAJeru0NY1AU= @@ -821,6 +825,10 @@ github.com/nqd/flat v0.2.0 h1:g6lXtMxsxrz6PZOO+rNnAJUn/GGRrK4FgVEhy/v+cHI= github.com/nqd/flat v0.2.0/go.mod h1:FOuslZmNY082wVfVUUb7qAGWKl8z8Nor9FMg+Xj2Nss= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/onsi/ginkgo/v2 v2.2.0 h1:3ZNA3L1c5FYDFTTxbFeVGGD8jYvjYauHD30YgLxVsNI= +github.com/onsi/ginkgo/v2 v2.2.0/go.mod h1:MEH45j8TBi6u9BMogfbp0stKC5cdGjumZj5Y7AG4VIk= +github.com/onsi/gomega v1.24.0 h1:+0glovB9Jd6z3VR+ScSwQqXVTIfJcGA9UBM8yzQxhqg= +github.com/onsi/gomega v1.24.0/go.mod h1:Z/NWtiqwBrwUt4/2loMmHL63EDLnYHmVbuBpDr2vQAg= github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e h1:4cPxUYdgaGzZIT5/j0IfqOrrXmq6bG8AwvwisMXpdrg= github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e/go.mod h1:DYR5Eij8rJl8h7gblRrOZ8g0kW1umSpKqYIBTgeDtLo= github.com/opentracing-contrib/go-stdlib v1.0.0 h1:TBS7YuVotp8myLon4Pv7BtCBzOTo1DeZCld0Z63mW2w= @@ -833,6 +841,10 @@ github.com/oschwald/maxminddb-golang v1.12.0/go.mod h1:q0Nob5lTCqyQ8WT6FYgS1L7PX github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= +github.com/philhofer/fwd v1.1.1 h1:GdGcTjf5RNAxwS4QLsiMzJYj5KEvPJD3Abr261yRQXQ= +github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= +github.com/philhofer/fwd v1.1.2 h1:bnDivRJ1EWPjUIRXV5KfORO897HTbpFAQddBdE8t7Gw= +github.com/philhofer/fwd v1.1.2/go.mod h1:qkPdfjR2SIEbspLqpe1tO4n5yICnr2DY7mqEx2tUTP0= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= @@ -924,6 +936,10 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/tinylib/msgp v1.1.6 h1:i+SbKraHhnrf9M5MYmvQhFnbLhAXSDWF8WWsuyRdocw= +github.com/tinylib/msgp v1.1.6/go.mod h1:75BAfg2hauQhs3qedfdDZmWAPcFMAvJE5b9rGOMufyw= +github.com/tinylib/msgp v1.1.9 h1:SHf3yoO2sGA0veCJeCBYLHuttAVFHGm2RHgNodW7wQU= +github.com/tinylib/msgp v1.1.9/go.mod h1:BCXGB54lDD8qUEPmiG0cQQUANC4IUQyB2ItS2UDlO/k= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/uber/jaeger-client-go v2.30.0+incompatible h1:D6wyKGCecFaSRUpo8lCVbaOOb6ThwMmTEbhRwtKR97o= github.com/uber/jaeger-client-go v2.30.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= @@ -932,8 +948,10 @@ github.com/uber/jaeger-lib v2.4.1+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6 github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= github.com/valyala/fasttemplate v1.1.0/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= -github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI= -github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/weaveworks/promrus v1.2.0 h1:jOLf6pe6/vss4qGHjXmGz4oDJQA+AOCqEL3FvvZGz7M= github.com/weaveworks/promrus v1.2.0/go.mod h1:SaE82+OJ91yqjrE1rsvBWVzNZKcHYFtMUyS1+Ogs/KA= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= @@ -1326,6 +1344,7 @@ golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82ur9kSqwfTHTeVxaDqrfMjpcNT6bE= +golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= diff --git a/loggers/fluentd.go b/loggers/fluentd.go index ccd31512..b8ec0684 100644 --- a/loggers/fluentd.go +++ b/loggers/fluentd.go @@ -2,19 +2,17 @@ package loggers import ( "crypto/tls" - "errors" - "io" - "net" "strconv" "time" + "github.com/IBM/fluent-forward-go/fluent/client" + "github.com/IBM/fluent-forward-go/fluent/protocol" "github.com/dmachard/go-dnscollector/dnsutils" "github.com/dmachard/go-dnscollector/netlib" "github.com/dmachard/go-dnscollector/pkgconfig" "github.com/dmachard/go-dnscollector/pkgutils" "github.com/dmachard/go-dnscollector/transformers" "github.com/dmachard/go-logger" - "github.com/vmihailenco/msgpack" ) type FluentdClient struct { @@ -22,15 +20,13 @@ type FluentdClient struct { doneProcess chan bool stopRun chan bool doneRun chan bool - stopRead chan bool - doneRead chan bool inputChan chan dnsutils.DNSMessage outputChan chan dnsutils.DNSMessage config *pkgconfig.Config configChan chan *pkgconfig.Config logger *logger.Logger transport string - transportConn net.Conn + fluentConn *client.Client transportReady chan bool transportReconnect chan bool writerReady bool @@ -45,8 +41,6 @@ func NewFluentdClient(config *pkgconfig.Config, logger *logger.Logger, name stri doneProcess: make(chan bool), stopRun: make(chan bool), doneRun: make(chan bool), - stopRead: make(chan bool), - doneRead: make(chan bool), inputChan: make(chan dnsutils.DNSMessage, config.Loggers.Fluentd.ChannelBufferSize), outputChan: make(chan dnsutils.DNSMessage, config.Loggers.Fluentd.ChannelBufferSize), transportReady: make(chan bool), @@ -111,58 +105,30 @@ func (fc *FluentdClient) Stop() { fc.stopRun <- true <-fc.doneRun - fc.LogInfo("stopping to read...") - fc.stopRead <- true - <-fc.doneRead - fc.LogInfo("stopping to process...") fc.stopProcess <- true <-fc.doneProcess } func (fc *FluentdClient) Disconnect() { - if fc.transportConn != nil { - fc.LogInfo("closing tcp connection") - fc.transportConn.Close() + if fc.fluentConn != nil { + fc.LogInfo("closing fluentd connection") + fc.fluentConn.Disconnect() } } -func (fc *FluentdClient) ReadFromConnection() { - buffer := make([]byte, 4096) - - go func() { - for { - _, err := fc.transportConn.Read(buffer) - if err != nil { - if errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) { - fc.LogInfo("read from connection terminated") - break - } - fc.LogError("Error on reading: %s", err.Error()) - } - // We just discard the data - } - }() - - // block goroutine until receive true event in stopRead channel - <-fc.stopRead - fc.doneRead <- true - - fc.LogInfo("read goroutine terminated") -} - func (fc *FluentdClient) ConnectToRemote() { for { - if fc.transportConn != nil { - fc.transportConn.Close() - fc.transportConn = nil + if fc.fluentConn != nil { + fc.fluentConn.Disconnect() + fc.fluentConn = nil } address := fc.config.Loggers.Fluentd.RemoteAddress + ":" + strconv.Itoa(fc.config.Loggers.Fluentd.RemotePort) connTimeout := time.Duration(fc.config.Loggers.Fluentd.ConnectTimeout) * time.Second // make the connection - var conn net.Conn + var c *client.Client var err error switch fc.transport { @@ -172,11 +138,23 @@ func (fc *FluentdClient) ConnectToRemote() { address = fc.config.Loggers.Fluentd.SockPath } fc.LogInfo("connecting to %s://%s", fc.transport, address) - conn, err = net.DialTimeout(fc.transport, address, connTimeout) + c = client.New(client.ConnectionOptions{ + Factory: &client.ConnFactory{ + Network: "unix", + Address: address, + }, + ConnectionTimeout: connTimeout, + }) case netlib.SocketTCP: fc.LogInfo("connecting to %s://%s", fc.transport, address) - conn, err = net.DialTimeout(fc.transport, address, connTimeout) + c = client.New(client.ConnectionOptions{ + Factory: &client.ConnFactory{ + Network: "tcp", + Address: address, + }, + ConnectionTimeout: connTimeout, + }) case netlib.SocketTLS: fc.LogInfo("connecting to %s://%s", fc.transport, address) @@ -190,17 +168,23 @@ func (fc *FluentdClient) ConnectToRemote() { CertFile: fc.config.Loggers.Fluentd.CertFile, KeyFile: fc.config.Loggers.Fluentd.KeyFile, } + tlsConfig, _ = pkgconfig.TLSClientConfig(tlsOptions) + + c = client.New(client.ConnectionOptions{ + Factory: &client.ConnFactory{ + Network: "tcp+tls", + Address: address, + TLSConfig: tlsConfig, + }, + ConnectionTimeout: connTimeout, + }) - tlsConfig, err = pkgconfig.TLSClientConfig(tlsOptions) - if err == nil { - dialer := &net.Dialer{Timeout: connTimeout} - conn, err = tls.DialWithDialer(dialer, netlib.SocketTCP, address, tlsConfig) - } default: fc.logger.Fatal("logger=fluent - invalid transport:", fc.transport) } // something is wrong during connection ? + err = c.Connect() if err != nil { fc.LogError("connect error: %s", err) fc.LogInfo("retry to connect in %d seconds", fc.config.Loggers.Fluentd.RetryInterval) @@ -208,7 +192,8 @@ func (fc *FluentdClient) ConnectToRemote() { continue } - fc.transportConn = conn + // save current connection + fc.fluentConn = c // block until framestream is ready fc.transportReady <- true @@ -220,37 +205,32 @@ func (fc *FluentdClient) ConnectToRemote() { func (fc *FluentdClient) FlushBuffer(buf *[]dnsutils.DNSMessage) { - tag, _ := msgpack.Marshal(fc.config.Loggers.Fluentd.Tag) + entries := []protocol.EntryExt{} for _, dm := range *buf { - // prepare event - tm, _ := msgpack.Marshal(dm.DNSTap.TimeSec) - record, err := msgpack.Marshal(dm) - if err != nil { - fc.LogError("msgpack error:", err.Error()) - continue - } + // Convert DNSMessage to map[] + flatDm, _ := dm.Flatten() - // Message ::= [ Tag, Time, Record, Option? ] - encoded := []byte{} - // array, size 3 - encoded = append(encoded, 0x93) - // append tag, time and record - encoded = append(encoded, tag...) - encoded = append(encoded, tm...) - encoded = append(encoded, record...) + // get timestamp from DNSMessage + timestamp, _ := time.Parse(time.RFC3339, dm.DNSTap.TimestampRFC3339) - // write event message - _, err = fc.transportConn.Write(encoded) + // append DNSMessage to the list + entries = append(entries, protocol.EntryExt{ + Timestamp: protocol.EventTime{Time: timestamp}, + Record: flatDm, + }) + } - // flusth the buffer - if err != nil { - fc.LogError("send transport error", err.Error()) - fc.writerReady = false - <-fc.transportReconnect - break - } + // send all entries with tag, check error on write ? + err := fc.fluentConn.SendForward(fc.config.Loggers.Fluentd.Tag, entries) + if err != nil { + fc.LogError("forward fluent error", err.Error()) + fc.writerReady = false + <-fc.transportReconnect } + + // reset buffer + *buf = nil } func (fc *FluentdClient) Run() { @@ -334,9 +314,6 @@ PROCESS_LOOP: fc.LogInfo("connected") fc.writerReady = true - // read from the connection until we stop - go fc.ReadFromConnection() - // incoming dns message to process case dm, opened := <-fc.outputChan: if !opened { diff --git a/loggers/fluentd_test.go b/loggers/fluentd_test.go index 1afe8a7d..77d6e0fb 100644 --- a/loggers/fluentd_test.go +++ b/loggers/fluentd_test.go @@ -1,33 +1,41 @@ package loggers import ( + "bytes" "net" "testing" "time" + "github.com/IBM/fluent-forward-go/fluent/protocol" "github.com/dmachard/go-dnscollector/dnsutils" "github.com/dmachard/go-dnscollector/netlib" "github.com/dmachard/go-dnscollector/pkgconfig" "github.com/dmachard/go-logger" - "github.com/vmihailenco/msgpack" + "github.com/tinylib/msgp/msgp" ) func Test_FluentdClient(t *testing.T) { testcases := []struct { - transport string - address string + name string + transport string + address string + bufferSize int + flushInterval int }{ { - transport: netlib.SocketTCP, - address: ":24224", + name: "with_buffer", + transport: netlib.SocketTCP, + address: ":24224", + bufferSize: 100, + flushInterval: 1, }, } for _, tc := range testcases { - t.Run(tc.transport, func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { // init logger cfg := pkgconfig.GetFakeConfig() - cfg.Loggers.Fluentd.FlushInterval = 1 - cfg.Loggers.Fluentd.BufferSize = 0 + cfg.Loggers.Fluentd.FlushInterval = tc.flushInterval + cfg.Loggers.Fluentd.BufferSize = tc.bufferSize g := NewFluentdClient(cfg, logger.New(false), "test") // fake msgpack receiver @@ -46,27 +54,73 @@ func Test_FluentdClient(t *testing.T) { return } defer conn.Close() + time.Sleep(time.Second) // send fake dns message to logger - time.Sleep(time.Second) dm := dnsutils.GetFakeDNSMessage() - g.GetInputChannel() <- dm + maxDm := 256 + for i := 0; i < maxDm; i++ { + g.GetInputChannel() <- dm + } + time.Sleep(time.Second) // read data on fake server side - buf := make([]byte, 4096) - _, err = conn.Read(buf) - if err != nil { - t.Errorf("error to read msgpack: %s", err) + nb := 0 + bytesSize := 0 + conn.SetReadDeadline(time.Now().Add(5 * time.Second)) + fullBuffer := make([]byte, 0) + for { + buf := make([]byte, 4096) + n, _ := conn.Read(buf) + if n == 0 { + break + } + bytesSize += n + fullBuffer = append(fullBuffer, buf[:n]...) } - // unpack msgpack - var dmRcv dnsutils.DNSMessage - err = msgpack.Unmarshal(buf[24:], &dmRcv) - if err != nil { - t.Errorf("error to unpack msgpack: %s", err) + // code msgpack + msgpr := msgp.NewReader(bytes.NewReader(fullBuffer[:bytesSize])) + for { + sz, err := msgpr.ReadArrayHeader() + if err != nil { + t.Errorf("decode Array Header failed: %v", err) + break + } + if sz != 3 { + t.Errorf("decode expect 3 elements: %d", sz) + break + } + tag, err := msgpr.ReadString() + if err != nil { + t.Errorf("Decode tag: %v", err) + break + } + if tag != "dns.collector" { + t.Errorf("invalid tag: %s", tag) + break + } + + entries := protocol.EntryList{} + if err = entries.DecodeMsg(msgpr); err != nil { + t.Errorf("decode Entries: %v", err) + break + } + nb += len(entries) + + options := &protocol.MessageOptions{} + if err = options.DecodeMsg(msgpr); err != nil { + t.Errorf("decode options: %v", err) + break + } + + if msgpr.Buffered() == 0 { + break + } } - if dm.DNS.Qname != dmRcv.DNS.Qname { - t.Errorf("qname error want %s, got %s", dm.DNS.Qname, dmRcv.DNS.Qname) + + if nb != maxDm { + t.Errorf("invalid numer of msgpack: expected=%d received=%d", maxDm, nb) } // stop all