From bb1d64dee68f983f2040eb511acd4ecd2903d1be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Rath?= <50269951+superstes@users.noreply.github.com> Date: Thu, 14 Dec 2023 22:25:47 +0100 Subject: [PATCH] Log: Add 'country' to geoip pipeline-stage (#5924) * Add 'country' to log pipeline-stage geoip * Add tests for log pipeline-stage geoip * Add 'country' to log pipeline-stage geoip * Lint-fix for tests of pipeline-stage geoip --- CHANGELOG.md | 2 + component/loki/process/stages/geoip.go | 68 ++++++- component/loki/process/stages/geoip_test.go | 168 ++++++++++++++++++ .../stages/testdata/geoip_maxmind_asn.mmdb | Bin 0 -> 435 bytes .../stages/testdata/geoip_maxmind_city.mmdb | Bin 0 -> 1439 bytes .../testdata/geoip_maxmind_country.mmdb | Bin 0 -> 1331 bytes .../process/stages/testdata/geoip_source.json | 14 ++ .../flow/reference/components/loki.process.md | 40 ++++- 8 files changed, 282 insertions(+), 10 deletions(-) create mode 100644 component/loki/process/stages/testdata/geoip_maxmind_asn.mmdb create mode 100644 component/loki/process/stages/testdata/geoip_maxmind_city.mmdb create mode 100644 component/loki/process/stages/testdata/geoip_maxmind_country.mmdb create mode 100644 component/loki/process/stages/testdata/geoip_source.json diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ff616c25480..a63fddbbc7ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,8 @@ Main (unreleased) - Added support for `loki.write` to flush WAL on agent shutdown. (@thepalbi) +- Added 'country' mmdb-type to log pipeline-stage geoip. (@superstes) + ### Bugfixes - Update `pyroscope.ebpf` to fix a logical bug causing to profile to many kthreads instead of regular processes https://github.com/grafana/pyroscope/pull/2778 (@korniltsev) diff --git a/component/loki/process/stages/geoip.go b/component/loki/process/stages/geoip.go index 0b4787bd0e2a..a157e236d9b9 100644 --- a/component/loki/process/stages/geoip.go +++ b/component/loki/process/stages/geoip.go @@ -27,6 +27,7 @@ type GeoIPFields int const ( CITYNAME GeoIPFields = iota COUNTRYNAME + COUNTRYCODE CONTINENTNAME CONTINENTCODE LOCATION @@ -34,11 +35,14 @@ const ( TIMEZONE SUBDIVISIONNAME SUBDIVISIONCODE + ASN + ASNORG ) var fields = map[GeoIPFields]string{ CITYNAME: "geoip_city_name", COUNTRYNAME: "geoip_country_name", + COUNTRYCODE: "geoip_country_code", CONTINENTNAME: "geoip_continent_name", CONTINENTCODE: "geoip_continent_code", LOCATION: "geoip_location", @@ -46,6 +50,8 @@ var fields = map[GeoIPFields]string{ TIMEZONE: "geoip_timezone", SUBDIVISIONNAME: "geoip_subdivision_name", SUBDIVISIONCODE: "geoip_subdivision_code", + ASN: "geoip_autonomous_system_number", + ASNORG: "geoip_autonomous_system_organization", } // GeoIPConfig represents GeoIP stage config @@ -69,7 +75,7 @@ func validateGeoIPConfig(c GeoIPConfig) (map[string]*jmespath.JMESPath, error) { } switch c.DBType { - case "", "asn", "city": + case "", "asn", "city", "country": default: return nil, ErrEmptyDBTypeGeoIPStageConfig } @@ -182,6 +188,14 @@ func (g *geoIPStage) process(_ model.LabelSet, extracted map[string]interface{}) return } g.populateExtractedWithASNData(extracted, &record) + case "country": + var record geoip2.Country + err := g.mmdb.Lookup(ip, &record) + if err != nil { + level.Error(g.logger).Log("msg", "unable to get Country record for the ip", "err", err, "ip", ip) + return + } + g.populateExtractedWithCountryData(extracted, &record) default: level.Error(g.logger).Log("msg", "unknown database type") } @@ -210,6 +224,11 @@ func (g *geoIPStage) populateExtractedWithCityData(extracted map[string]interfac if contryName != "" { extracted[label] = contryName } + case COUNTRYCODE: + contryCode := record.Country.IsoCode + if contryCode != "" { + extracted[label] = contryCode + } case CONTINENTNAME: continentName := record.Continent.Names["en"] if continentName != "" { @@ -252,20 +271,51 @@ func (g *geoIPStage) populateExtractedWithCityData(extracted map[string]interfac extracted[label] = subdivisionCode } } - default: - level.Error(g.logger).Log("msg", "unknown geoip field") } } } func (g *geoIPStage) populateExtractedWithASNData(extracted map[string]interface{}, record *geoip2.ASN) { - autonomousSystemNumber := record.AutonomousSystemNumber - autonomousSystemOrganization := record.AutonomousSystemOrganization - if autonomousSystemNumber != 0 { - extracted["geoip_autonomous_system_number"] = autonomousSystemNumber + for field, label := range fields { + switch field { + case ASN: + autonomousSystemNumber := record.AutonomousSystemNumber + if autonomousSystemNumber != 0 { + extracted[label] = autonomousSystemNumber + } + case ASNORG: + autonomousSystemOrganization := record.AutonomousSystemOrganization + if autonomousSystemOrganization != "" { + extracted[label] = autonomousSystemOrganization + } + } } - if autonomousSystemOrganization != "" { - extracted["geoip_autonomous_system_organization"] = autonomousSystemOrganization +} + +func (g *geoIPStage) populateExtractedWithCountryData(extracted map[string]interface{}, record *geoip2.Country) { + for field, label := range fields { + switch field { + case COUNTRYNAME: + contryName := record.Country.Names["en"] + if contryName != "" { + extracted[label] = contryName + } + case COUNTRYCODE: + contryCode := record.Country.IsoCode + if contryCode != "" { + extracted[label] = contryCode + } + case CONTINENTNAME: + continentName := record.Continent.Names["en"] + if continentName != "" { + extracted[label] = continentName + } + case CONTINENTCODE: + continentCode := record.Continent.Code + if continentCode != "" { + extracted[label] = continentCode + } + } } } diff --git a/component/loki/process/stages/geoip_test.go b/component/loki/process/stages/geoip_test.go index 2e53afa025f8..26d1802f74de 100644 --- a/component/loki/process/stages/geoip_test.go +++ b/component/loki/process/stages/geoip_test.go @@ -2,11 +2,21 @@ package stages import ( "errors" + "fmt" + "net" "testing" + util_log "github.com/grafana/loki/pkg/util/log" + "github.com/oschwald/geoip2-golang" + "github.com/oschwald/maxminddb-golang" "github.com/stretchr/testify/require" ) +var ( + geoipTestIP string = "192.0.2.1" + geoipTestSource string = "dummy" +) + func Test_ValidateConfigs(t *testing.T) { source := "ip" tests := []struct { @@ -21,6 +31,14 @@ func Test_ValidateConfigs(t *testing.T) { }, nil, }, + { + GeoIPConfig{ + DB: "test", + Source: &source, + DBType: "country", + }, + nil, + }, { GeoIPConfig{ DB: "test", @@ -81,3 +99,153 @@ func Test_ValidateConfigs(t *testing.T) { } } } + +/* + NOTE: + database schema: https://github.com/maxmind/MaxMind-DB/tree/main/source-data + Script used to build the minimal binaries: https://github.com/vimt/MaxMind-DB-Writer-python +*/ + +func Test_MaxmindAsn(t *testing.T) { + mmdb, err := maxminddb.Open("testdata/geoip_maxmind_asn.mmdb") + if err != nil { + t.Error(err) + return + } + defer mmdb.Close() + + var record geoip2.ASN + err = mmdb.Lookup(net.ParseIP(geoipTestIP), &record) + if err != nil { + t.Error(err) + } + + config := GeoIPConfig{ + DB: "test", + Source: &geoipTestSource, + DBType: "asn", + } + valuesExpressions, err := validateGeoIPConfig(config) + if err != nil { + t.Errorf("Error validating test-config: %v", err) + } + testStage := &geoIPStage{ + mmdb: mmdb, + logger: util_log.Logger, + valuesExpressions: valuesExpressions, + cfgs: config, + } + + extracted := map[string]interface{}{} + testStage.populateExtractedWithASNData(extracted, &record) + + for _, field := range []string{ + fields[ASN], + fields[ASNORG], + } { + _, present := extracted[field] + if !present { + t.Errorf("GeoIP label %v not present", field) + } + } +} + +func Test_MaxmindCity(t *testing.T) { + mmdb, err := maxminddb.Open("testdata/geoip_maxmind_city.mmdb") + if err != nil { + t.Error(err) + return + } + defer mmdb.Close() + + var record geoip2.City + err = mmdb.Lookup(net.ParseIP(geoipTestIP), &record) + if err != nil { + t.Error(err) + } + + config := GeoIPConfig{ + DB: "test", + Source: &geoipTestSource, + DBType: "city", + } + valuesExpressions, err := validateGeoIPConfig(config) + if err != nil { + t.Errorf("Error validating test-config: %v", err) + } + testStage := &geoIPStage{ + mmdb: mmdb, + logger: util_log.Logger, + valuesExpressions: valuesExpressions, + cfgs: config, + } + + extracted := map[string]interface{}{} + testStage.populateExtractedWithCityData(extracted, &record) + + for _, field := range []string{ + fields[COUNTRYNAME], + fields[COUNTRYCODE], + fields[CONTINENTNAME], + fields[CONTINENTCODE], + fields[CITYNAME], + fmt.Sprintf("%s_latitude", fields[LOCATION]), + fmt.Sprintf("%s_longitude", fields[LOCATION]), + fields[POSTALCODE], + fields[TIMEZONE], + fields[SUBDIVISIONNAME], + fields[SUBDIVISIONCODE], + fields[COUNTRYNAME], + } { + _, present := extracted[field] + if !present { + t.Errorf("GeoIP label %v not present", field) + } + } +} + +func Test_MaxmindCountry(t *testing.T) { + mmdb, err := maxminddb.Open("testdata/geoip_maxmind_country.mmdb") + if err != nil { + t.Error(err) + return + } + defer mmdb.Close() + + var record geoip2.Country + err = mmdb.Lookup(net.ParseIP(geoipTestIP), &record) + if err != nil { + t.Error(err) + } + + config := GeoIPConfig{ + DB: "test", + Source: &geoipTestSource, + DBType: "country", + } + valuesExpressions, err := validateGeoIPConfig(config) + if err != nil { + t.Errorf("Error validating test-config: %v", err) + } + testStage := &geoIPStage{ + mmdb: mmdb, + logger: util_log.Logger, + valuesExpressions: valuesExpressions, + cfgs: config, + } + + extracted := map[string]interface{}{} + testStage.populateExtractedWithCountryData(extracted, &record) + + for _, field := range []string{ + fields[COUNTRYNAME], + fields[COUNTRYCODE], + fields[CONTINENTNAME], + fields[CONTINENTCODE], + } { + _, present := extracted[field] + if !present { + t.Errorf("GeoIP label %v not present", field) + } + } +} diff --git a/component/loki/process/stages/testdata/geoip_maxmind_asn.mmdb b/component/loki/process/stages/testdata/geoip_maxmind_asn.mmdb new file mode 100644 index 0000000000000000000000000000000000000000..4abd5e255736d5c4166b0a7f005a697f302ce0de GIT binary patch literal 435 zcmZY4J5B>Z3;<9kZ214bk33z%H^&>@vH;uCib`X#jw$hK9W-R=F8qr)DWZp1EmNG>vE(Dmh&bsB@zcw^o7lIq{CGC$pi aG>fWNNGff9om5Gmy`%;C+^KT>(c2fYD{hJa literal 0 HcmV?d00001 diff --git a/component/loki/process/stages/testdata/geoip_maxmind_city.mmdb b/component/loki/process/stages/testdata/geoip_maxmind_city.mmdb new file mode 100644 index 0000000000000000000000000000000000000000..72d5e818ec7fe7070e0275dad7e2b31741dfa411 GIT binary patch literal 1439 zcmZ9MUu+ab9LE>D+JY6q`lm01-^?5d$ytpS3@;?~j=OTbwkM?p8-(q4cQ}^Z&0Kc( znmYo)^nw9x6ij&{L`zI0Rt!)yh_+(-L`=hy^3oR&x0K3@JQN;Cn)ur(A<;|bGr#$> z`}=-petVkMsAE2#g`RVDZZk(T5*lytBMVZ zuPMH+_=X~8e^ZMH6>Itlby|INP$N0NsQG$lG=C^h%}w{ur6!|*&b!)9oK6lAgQ#=aN! zK@sCJ9DyMihO+_GLjyFzdWgYUxZY~YlJJ}g+??F7=_{D&wq4y8IuFHiL+B-8O96Ma zT6nVEcBQTyKaID>NzLwE+~Kyca~^ugO#Xxsj?DvPWoM7DJ#In97CB26NHo)h8ztbh zCCR2tm#i420&l`GNwf!p-vom{1l8|?!Jo;FkNhwFAN^natNt~#c?r#4y*21x^vALP zg7l^4Kf84Q%s4rUR>?|eAyLSZP0&pe&`Wke5#uuHgCTO19G^v-GjJE4z*ABKF>;)Y zb~!v}yB-R+(28AfNoj? zy|e*EFF={bV2Ey{TTnEaekOVbVss1bOL>lAdoFdm@Nx_xMWrB1u3mIxxk3*wtEH!U zaKS~b`8=L`xG-Q_+@Yu2)B_dMa6Nhh&CG-rY+06vZfw150E*a` z*)cZ!7c{cH5M#q^tlcg3TlRqMqHmYs&7c2+O(l|@t!%nADROy3SZt;(9Y$Crj=_Sq z798+!z}cf&OfrK>0!;D<69jBAz?V*hg&>u{5dk_p;JU3!h) zU`=c*%d%Z;j7>z>Mc0o`{nKHTJ8WUSZOX#mZ36#FmP+#X#*D*F=~%jJAL1j8ZFW%~ z;7%xcB+_9Sp3!f(T=yzPo@wPW{lR-p2|T?tiqDmd9Cx+Io_<>xPDS4@or2-%1>>M} qp1))H|5x$Te7^d1roUw8Eu9yoc_6Z!7xvU7%Q6vwxzLp}(F5I%gVcr(*Vm5f0p4a%ibv#FgpPGaKNR2Hb4&F<9M%HCP+ zt}8ogRT@hug$NSVoT>_pkf;cupP&**DDtTX)UDb}E-kQW5_;j1_E4#U@g%AQE6s1- z%$wc)|7YGvQc#k%Nj26<(smI91nUK#5o{1_6l@Y~7JOE)MesSn9fHpbz985t_@ZE& z;7fup3%(-A|Ng49{T~ra|9N5>GhM@^rVG+Rjp-Cx(#55;mkU;Lnm}W6cW5o@je5G z!QycdPQVnLfpb2zKpO;MH-z9ET<_6X!F25s+^p{D_#Bo~hNBp!LTh5FYAOZOU?$v? z8sJf{;V?xwek}LMsQ%Eq)TV}MWL@qdUj2bbe5{S3=E1aSxKxL@VP;!D3=5f0|=PfxB_KI9T{Thyy~=e=nhimokvQk&UV6MdJ9Ug4rQAcgB8 zgWI_14JhIeOyPZaKNpQ|Z;4)p5Z;eRqOPqPu7jNv zPdHp_F30!1mN;hU)W)-Y;)Rl?Ixe2~@o_wZXL+0FZJ{^EG}Yy&i*F=UO)J={R#I$L zHwq5EQ_=BiZ;q$kf==;+K%+dk<0JeqmSbjibM8r?adBzxZPFBT4PN!*%%r4sKomKB zv!9tdGs(^sdpMIw9|5<)ro@T#@j~0xau273