From 12e7389450044a58e2d2ed6239ec9bbc708c4c3b Mon Sep 17 00:00:00 2001 From: Rath Rene Date: Wed, 6 Dec 2023 16:03:44 +0100 Subject: [PATCH 1/4] Add 'country' to log pipeline-stage geoip --- component/loki/process/stages/geoip.go | 68 ++++++++++++++++--- .../flow/reference/components/loki.process.md | 40 ++++++++++- 2 files changed, 98 insertions(+), 10 deletions(-) diff --git a/component/loki/process/stages/geoip.go b/component/loki/process/stages/geoip.go index 0b4787bd0e2a..a157e236d9b9 100644 --- a/component/loki/process/stages/geoip.go +++ b/component/loki/process/stages/geoip.go @@ -27,6 +27,7 @@ type GeoIPFields int const ( CITYNAME GeoIPFields = iota COUNTRYNAME + COUNTRYCODE CONTINENTNAME CONTINENTCODE LOCATION @@ -34,11 +35,14 @@ const ( TIMEZONE SUBDIVISIONNAME SUBDIVISIONCODE + ASN + ASNORG ) var fields = map[GeoIPFields]string{ CITYNAME: "geoip_city_name", COUNTRYNAME: "geoip_country_name", + COUNTRYCODE: "geoip_country_code", CONTINENTNAME: "geoip_continent_name", CONTINENTCODE: "geoip_continent_code", LOCATION: "geoip_location", @@ -46,6 +50,8 @@ var fields = map[GeoIPFields]string{ TIMEZONE: "geoip_timezone", SUBDIVISIONNAME: "geoip_subdivision_name", SUBDIVISIONCODE: "geoip_subdivision_code", + ASN: "geoip_autonomous_system_number", + ASNORG: "geoip_autonomous_system_organization", } // GeoIPConfig represents GeoIP stage config @@ -69,7 +75,7 @@ func validateGeoIPConfig(c GeoIPConfig) (map[string]*jmespath.JMESPath, error) { } switch c.DBType { - case "", "asn", "city": + case "", "asn", "city", "country": default: return nil, ErrEmptyDBTypeGeoIPStageConfig } @@ -182,6 +188,14 @@ func (g *geoIPStage) process(_ model.LabelSet, extracted map[string]interface{}) return } g.populateExtractedWithASNData(extracted, &record) + case "country": + var record geoip2.Country + err := g.mmdb.Lookup(ip, &record) + if err != nil { + level.Error(g.logger).Log("msg", "unable to get Country record for the ip", "err", err, "ip", ip) + return + } + 
g.populateExtractedWithCountryData(extracted, &record) default: level.Error(g.logger).Log("msg", "unknown database type") } @@ -210,6 +224,11 @@ func (g *geoIPStage) populateExtractedWithCityData(extracted map[string]interfac if contryName != "" { extracted[label] = contryName } + case COUNTRYCODE: + contryCode := record.Country.IsoCode + if contryCode != "" { + extracted[label] = contryCode + } case CONTINENTNAME: continentName := record.Continent.Names["en"] if continentName != "" { @@ -252,20 +271,51 @@ func (g *geoIPStage) populateExtractedWithCityData(extracted map[string]interfac extracted[label] = subdivisionCode } } - default: - level.Error(g.logger).Log("msg", "unknown geoip field") } } } func (g *geoIPStage) populateExtractedWithASNData(extracted map[string]interface{}, record *geoip2.ASN) { - autonomousSystemNumber := record.AutonomousSystemNumber - autonomousSystemOrganization := record.AutonomousSystemOrganization - if autonomousSystemNumber != 0 { - extracted["geoip_autonomous_system_number"] = autonomousSystemNumber + for field, label := range fields { + switch field { + case ASN: + autonomousSystemNumber := record.AutonomousSystemNumber + if autonomousSystemNumber != 0 { + extracted[label] = autonomousSystemNumber + } + case ASNORG: + autonomousSystemOrganization := record.AutonomousSystemOrganization + if autonomousSystemOrganization != "" { + extracted[label] = autonomousSystemOrganization + } + } } - if autonomousSystemOrganization != "" { - extracted["geoip_autonomous_system_organization"] = autonomousSystemOrganization +} + +func (g *geoIPStage) populateExtractedWithCountryData(extracted map[string]interface{}, record *geoip2.Country) { + for field, label := range fields { + switch field { + case COUNTRYNAME: + contryName := record.Country.Names["en"] + if contryName != "" { + extracted[label] = contryName + } + case COUNTRYCODE: + contryCode := record.Country.IsoCode + if contryCode != "" { + extracted[label] = contryCode + } + case 
CONTINENTNAME: + continentName := record.Continent.Names["en"] + if continentName != "" { + extracted[label] = continentName + } + case CONTINENTCODE: + continentCode := record.Continent.Code + if continentCode != "" { + extracted[label] = continentCode + } + } } } diff --git a/docs/sources/flow/reference/components/loki.process.md b/docs/sources/flow/reference/components/loki.process.md index 9af6bea9e703..c2793abbfe2e 100644 --- a/docs/sources/flow/reference/components/loki.process.md +++ b/docs/sources/flow/reference/components/loki.process.md @@ -1538,7 +1538,7 @@ The following arguments are supported: | ---------------- | ------------- | -------------------------------------------------- | ------- | -------- | | `db` | `string` | Path to the Maxmind DB file. | | yes | | `source` | `string` | IP from extracted data to parse. | | yes | -| `db_type` | `string` | Maxmind DB type. Allowed values are "city", "asn". | | no | +| `db_type` | `string` | Maxmind DB type. Allowed values are "city", "asn", "country". | | no | | `custom_lookups` | `map(string)` | Key-value pairs of JMESPath expressions. 
| | no | @@ -1562,6 +1562,7 @@ loki.process "example" { values = { geoip_city_name = "", geoip_country_name = "", + geoip_country_code = "", geoip_continent_name = "", geoip_continent_code = "", geoip_location_latitude = "", @@ -1582,6 +1583,7 @@ The extracted data from the IP used in this example: - geoip_city_name: Kansas City - geoip_country_name: United States +- geoip_country_code: US - geoip_continent_name: North America - geoip_continent_code: NA - geoip_location_latitude: 39.1027 @@ -1622,6 +1624,42 @@ The extracted data from the IP used in this example: - geoip_autonomous_system_number: 396982 - geoip_autonomous_system_organization: GOOGLE-CLOUD-PLATFORM +#### GeoIP with Country database example: + +``` +{"log":"log message","client_ip":"34.120.177.193"} + +loki.process "example" { + stage.json { + expressions = {ip = "client_ip"} + } + + stage.geoip { + source = "ip" + db = "/path/to/db/GeoLite2-Country.mmdb" + db_type = "country" + } + + stage.labels { + values = { + geoip_country_name = "", + geoip_country_code = "", + geoip_continent_name = "", + geoip_continent_code = "", + } + } +} +``` + +The `json` stage extracts the IP address from the `client_ip` key in the log line. +Then the extracted `ip` value is given as the source to the geoip stage. The geoip stage performs a lookup on the IP and populates the following fields in the shared map, which are then added as labels by the `labels` stage. 
+ +The extracted data from the IP used in this example: + +- geoip_country_name: United States +- geoip_country_code: US +- geoip_continent_name: North America +- geoip_continent_code: NA #### GeoIP with custom fields example From 3f2cf721d306b267bd33202bb22796bde178e854 Mon Sep 17 00:00:00 2001 From: Rath Rene Date: Wed, 6 Dec 2023 16:04:30 +0100 Subject: [PATCH 2/4] Add tests for log pipeline-stage geoip --- component/loki/process/stages/geoip_test.go | 169 ++++++++++++++++++ .../stages/testdata/geoip_maxmind_asn.mmdb | Bin 0 -> 435 bytes .../stages/testdata/geoip_maxmind_city.mmdb | Bin 0 -> 1439 bytes .../testdata/geoip_maxmind_country.mmdb | Bin 0 -> 1331 bytes .../process/stages/testdata/geoip_source.json | 14 ++ 5 files changed, 183 insertions(+) create mode 100644 component/loki/process/stages/testdata/geoip_maxmind_asn.mmdb create mode 100644 component/loki/process/stages/testdata/geoip_maxmind_city.mmdb create mode 100644 component/loki/process/stages/testdata/geoip_maxmind_country.mmdb create mode 100644 component/loki/process/stages/testdata/geoip_source.json diff --git a/component/loki/process/stages/geoip_test.go b/component/loki/process/stages/geoip_test.go index 2e53afa025f8..4c927fc24b3e 100644 --- a/component/loki/process/stages/geoip_test.go +++ b/component/loki/process/stages/geoip_test.go @@ -2,8 +2,13 @@ package stages import ( "errors" + "fmt" + "net" "testing" + util_log "github.com/grafana/loki/pkg/util/log" + "github.com/oschwald/geoip2-golang" + "github.com/oschwald/maxminddb-golang" "github.com/stretchr/testify/require" ) @@ -21,6 +26,14 @@ func Test_ValidateConfigs(t *testing.T) { }, nil, }, + { + GeoIPConfig{ + DB: "test", + Source: &source, + DBType: "country", + }, + nil, + }, { GeoIPConfig{ DB: "test", @@ -81,3 +94,159 @@ func Test_ValidateConfigs(t *testing.T) { } } } + +/* + NOTE: + database schema: https://github.com/maxmind/MaxMind-DB/tree/main/source-data + Script used to build the minimal binaries: 
https://github.com/vimt/MaxMind-DB-Writer-python +*/ + +func Test_MaxmindAsn(t *testing.T) { + mmdb, err := maxminddb.Open("testdata/geoip_maxmind_asn.mmdb") + if err != nil { + t.Error(err) + return + } + defer mmdb.Close() + ip := "192.0.2.1" + + var record geoip2.ASN + err = mmdb.Lookup(net.ParseIP(ip), &record) + if err != nil { + t.Error(err) + } + + source := "dummy" + config := GeoIPConfig{ + DB: "test", + Source: &source, + DBType: "asn", + } + valuesExpressions, err := validateGeoIPConfig(config) + if err != nil { + t.Errorf("Error validating test-config: %v", err) + } + testStage := &geoIPStage{ + mmdb: mmdb, + logger: util_log.Logger, + valuesExpressions: valuesExpressions, + cfgs: config, + } + + extracted := map[string]interface{}{} + testStage.populateExtractedWithASNData(extracted, &record) + + for _, field := range []string{ + fields[ASN], + fields[ASNORG], + } { + _, present := extracted[field] + if !present { + t.Errorf("GeoIP label %v not present", field) + } + } +} + +func Test_MaxmindCity(t *testing.T) { + mmdb, err := maxminddb.Open("testdata/geoip_maxmind_city.mmdb") + if err != nil { + t.Error(err) + return + } + defer mmdb.Close() + ip := "192.0.2.1" + + var record geoip2.City + err = mmdb.Lookup(net.ParseIP(ip), &record) + if err != nil { + t.Error(err) + } + + source := "dummy" + config := GeoIPConfig{ + DB: "test", + Source: &source, + DBType: "city", + } + valuesExpressions, err := validateGeoIPConfig(config) + if err != nil { + t.Errorf("Error validating test-config: %v", err) + } + testStage := &geoIPStage{ + mmdb: mmdb, + logger: util_log.Logger, + valuesExpressions: valuesExpressions, + cfgs: config, + } + + extracted := map[string]interface{}{} + testStage.populateExtractedWithCityData(extracted, &record) + + for _, field := range []string{ + fields[COUNTRYNAME], + fields[COUNTRYCODE], + fields[CONTINENTNAME], + fields[CONTINENTCODE], + fields[CITYNAME], + fmt.Sprintf("%s_latitude", fields[LOCATION]), + fmt.Sprintf("%s_longitude", 
fields[LOCATION]), + fields[POSTALCODE], + fields[TIMEZONE], + fields[SUBDIVISIONNAME], + fields[SUBDIVISIONCODE], + fields[COUNTRYNAME], + } { + _, present := extracted[field] + if !present { + t.Errorf("GeoIP label %v not present", field) + } + } +} + +func Test_MaxmindCountry(t *testing.T) { + mmdb, err := maxminddb.Open("testdata/geoip_maxmind_country.mmdb") + if err != nil { + t.Error(err) + return + } + defer mmdb.Close() + ip := "192.0.2.1" + + var record geoip2.Country + err = mmdb.Lookup(net.ParseIP(ip), &record) + if err != nil { + t.Error(err) + } + + source := "dummy" + config := GeoIPConfig{ + DB: "test", + Source: &source, + DBType: "country", + } + valuesExpressions, err := validateGeoIPConfig(config) + if err != nil { + t.Errorf("Error validating test-config: %v", err) + } + testStage := &geoIPStage{ + mmdb: mmdb, + logger: util_log.Logger, + valuesExpressions: valuesExpressions, + cfgs: config, + } + + extracted := map[string]interface{}{} + testStage.populateExtractedWithCountryData(extracted, &record) + + for _, field := range []string{ + fields[COUNTRYNAME], + fields[COUNTRYCODE], + fields[CONTINENTNAME], + fields[CONTINENTCODE], + } { + _, present := extracted[field] + if !present { + t.Errorf("GeoIP label %v not present", field) + } + } +} diff --git a/component/loki/process/stages/testdata/geoip_maxmind_asn.mmdb b/component/loki/process/stages/testdata/geoip_maxmind_asn.mmdb new file mode 100644 index 0000000000000000000000000000000000000000..4abd5e255736d5c4166b0a7f005a697f302ce0de GIT binary patch literal 435 zcmZY4J5B>Z3;<9kZ214bk33z%H^&>@vH;uCib`X#jw$hK9W-R=F8qr)DWZp1EmNG>vE(Dmh&bsB@zcw^o7lIq{CGC$pi aG>fWNNGff9om5Gmy`%;C+^KT>(c2fYD{hJa literal 0 HcmV?d00001 diff --git a/component/loki/process/stages/testdata/geoip_maxmind_city.mmdb b/component/loki/process/stages/testdata/geoip_maxmind_city.mmdb new file mode 100644 index 0000000000000000000000000000000000000000..72d5e818ec7fe7070e0275dad7e2b31741dfa411 GIT binary patch literal 1439 
zcmZ9MUu+ab9LE>D+JY6q`lm01-^?5d$ytpS3@;?~j=OTbwkM?p8-(q4cQ}^Z&0Kc( znmYo)^nw9x6ij&{L`zI0Rt!)yh_+(-L`=hy^3oR&x0K3@JQN;Cn)ur(A<;|bGr#$> z`}=-petVkMsAE2#g`RVDZZk(T5*lytBMVZ zuPMH+_=X~8e^ZMH6>Itlby|INP$N0NsQG$lG=C^h%}w{ur6!|*&b!)9oK6lAgQ#=aN! zK@sCJ9DyMihO+_GLjyFzdWgYUxZY~YlJJ}g+??F7=_{D&wq4y8IuFHiL+B-8O96Ma zT6nVEcBQTyKaID>NzLwE+~Kyca~^ugO#Xxsj?DvPWoM7DJ#In97CB26NHo)h8ztbh zCCR2tm#i420&l`GNwf!p-vom{1l8|?!Jo;FkNhwFAN^natNt~#c?r#4y*21x^vALP zg7l^4Kf84Q%s4rUR>?|eAyLSZP0&pe&`Wke5#uuHgCTO19G^v-GjJE4z*ABKF>;)Y zb~!v}yB-R+(28AfNoj? zy|e*EFF={bV2Ey{TTnEaekOVbVss1bOL>lAdoFdm@Nx_xMWrB1u3mIxxk3*wtEH!U zaKS~b`8=L`xG-Q_+@Yu2)B_dMa6Nhh&CG-rY+06vZfw150E*a` z*)cZ!7c{cH5M#q^tlcg3TlRqMqHmYs&7c2+O(l|@t!%nADROy3SZt;(9Y$Crj=_Sq z798+!z}cf&OfrK>0!;D<69jBAz?V*hg&>u{5dk_p;JU3!h) zU`=c*%d%Z;j7>z>Mc0o`{nKHTJ8WUSZOX#mZ36#FmP+#X#*D*F=~%jJAL1j8ZFW%~ z;7%xcB+_9Sp3!f(T=yzPo@wPW{lR-p2|T?tiqDmd9Cx+Io_<>xPDS4@or2-%1>>M} qp1))H|5x$Te7^d1roUw8Eu9yoc_6Z!7xvU7%Q6vwxzLp}(F5I%gVcr(*Vm5f0p4a%ibv#FgpPGaKNR2Hb4&F<9M%HCP+ zt}8ogRT@hug$NSVoT>_pkf;cupP&**DDtTX)UDb}E-kQW5_;j1_E4#U@g%AQE6s1- z%$wc)|7YGvQc#k%Nj26<(smI91nUK#5o{1_6l@Y~7JOE)MesSn9fHpbz985t_@ZE& z;7fup3%(-A|Ng49{T~ra|9N5>GhM@^rVG+Rjp-Cx(#55;mkU;Lnm}W6cW5o@je5G z!QycdPQVnLfpb2zKpO;MH-z9ET<_6X!F25s+^p{D_#Bo~hNBp!LTh5FYAOZOU?$v? z8sJf{;V?xwek}LMsQ%Eq)TV}MWL@qdUj2bbe5{S3=E1aSxKxL@VP;!D3=5f0|=PfxB_KI9T{Thyy~=e=nhimokvQk&UV6MdJ9Ug4rQAcgB8 zgWI_14JhIeOyPZaKNpQ|Z;4)p5Z;eRqOPqPu7jNv zPdHp_F30!1mN;hU)W)-Y;)Rl?Ixe2~@o_wZXL+0FZJ{^EG}Yy&i*F=UO)J={R#I$L zHwq5EQ_=BiZ;q$kf==;+K%+dk<0JeqmSbjibM8r?adBzxZPFBT4PN!*%%r4sKomKB zv!9tdGs(^sdpMIw9|5<)ro@T#@j~0xau273 Date: Wed, 6 Dec 2023 16:09:52 +0100 Subject: [PATCH 3/4] Add 'country' to log pipeline-stage geoip --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e532812bb40c..97004ca0e73a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,8 @@ Main (unreleased) - Added links between compatible components in the documentation to make it easier to discover them. 
(@thampiotr) +- Added support for the 'country' MaxMind database type to the `stage.geoip` pipeline stage in `loki.process`. (@superstes) + ### Bugfixes - Update `pyroscope.ebpf` to fix a logical bug causing to profile to many kthreads instead of regular processes https://github.com/grafana/pyroscope/pull/2778 (@korniltsev) From 516dbd173e05c3354bf87b2ed4d9394e5da22407 Mon Sep 17 00:00:00 2001 From: Rath Rene Date: Sat, 9 Dec 2023 21:52:19 +0100 Subject: [PATCH 4/4] Lint-fix for tests of pipeline-stage geoip --- component/loki/process/stages/geoip_test.go | 23 ++++++++++----------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/component/loki/process/stages/geoip_test.go b/component/loki/process/stages/geoip_test.go index 4c927fc24b3e..26d1802f74de 100644 --- a/component/loki/process/stages/geoip_test.go +++ b/component/loki/process/stages/geoip_test.go @@ -12,6 +12,11 @@ import ( "github.com/stretchr/testify/require" ) +var ( + geoipTestIP string = "192.0.2.1" + geoipTestSource string = "dummy" +) + func Test_ValidateConfigs(t *testing.T) { source := "ip" tests := []struct { @@ -108,18 +113,16 @@ func Test_MaxmindAsn(t *testing.T) { return } defer mmdb.Close() - ip := "192.0.2.1" var record geoip2.ASN - err = mmdb.Lookup(net.ParseIP(ip), &record) + err = mmdb.Lookup(net.ParseIP(geoipTestIP), &record) if err != nil { t.Error(err) } - source := "dummy" config := GeoIPConfig{ DB: "test", - Source: &source, + Source: &geoipTestSource, DBType: "asn", } valuesExpressions, err := validateGeoIPConfig(config) @@ -154,18 +157,16 @@ func Test_MaxmindCity(t *testing.T) { return } defer mmdb.Close() - ip := "192.0.2.1" var record geoip2.City - err = mmdb.Lookup(net.ParseIP(ip), &record) + err = mmdb.Lookup(net.ParseIP(geoipTestIP), &record) if err != nil { t.Error(err) } - source := "dummy" config := GeoIPConfig{ DB: "test", - Source: &source, + Source: &geoipTestSource, DBType: "city", } valuesExpressions, err := validateGeoIPConfig(config) @@ -210,18 +211,16 @@ func Test_MaxmindCountry(t *testing.T) { return } 
defer mmdb.Close() - ip := "192.0.2.1" var record geoip2.Country - err = mmdb.Lookup(net.ParseIP(ip), &record) + err = mmdb.Lookup(net.ParseIP(geoipTestIP), &record) if err != nil { t.Error(err) } - source := "dummy" config := GeoIPConfig{ DB: "test", - Source: &source, + Source: &geoipTestSource, DBType: "country", } valuesExpressions, err := validateGeoIPConfig(config)