Skip to content

Commit

Permalink
Log: Add 'country' to geoip pipeline-stage (#5924)
Browse files Browse the repository at this point in the history
* Add 'country' to log pipeline-stage geoip

* Add tests for log pipeline-stage geoip

* Add 'country' to log pipeline-stage geoip

* Lint-fix for tests of pipeline-stage geoip
  • Loading branch information
superstes authored Dec 14, 2023
1 parent 881722c commit bb1d64d
Show file tree
Hide file tree
Showing 8 changed files with 282 additions and 10 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ Main (unreleased)

- Added support for `loki.write` to flush WAL on agent shutdown. (@thepalbi)

- Added 'country' mmdb-type to log pipeline-stage geoip. (@superstes)

### Bugfixes

- Update `pyroscope.ebpf` to fix a logical bug causing it to profile too many kthreads instead of regular processes https://github.com/grafana/pyroscope/pull/2778 (@korniltsev)
Expand Down
68 changes: 59 additions & 9 deletions component/loki/process/stages/geoip.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,31 @@ type GeoIPFields int
// Identifiers for the individual values the GeoIP stage can write into the
// extracted data. Each identifier is a key of the fields map, which supplies
// the label the value is stored under.
const (
// City-database values.
CITYNAME GeoIPFields = iota
// Country and continent values; these are filled from both city and country
// records. The country code is the record's ISO code.
COUNTRYNAME
COUNTRYCODE
CONTINENTNAME
CONTINENTCODE
// Remaining city-database values.
LOCATION
POSTALCODE
TIMEZONE
SUBDIVISIONNAME
SUBDIVISIONCODE
// ASN-database values: the autonomous system number and its organization.
ASN
ASNORG
)

// fields maps every GeoIPFields identifier to the label under which the
// populateExtractedWith* helpers store the corresponding value in the
// extracted map.
var fields = map[GeoIPFields]string{
CITYNAME: "geoip_city_name",
COUNTRYNAME: "geoip_country_name",
COUNTRYCODE: "geoip_country_code",
CONTINENTNAME: "geoip_continent_name",
CONTINENTCODE: "geoip_continent_code",
// NOTE(review): the tests look for this label with "_latitude" and
// "_longitude" suffixes rather than the bare label — confirm against
// populateExtractedWithCityData.
LOCATION: "geoip_location",
POSTALCODE: "geoip_postal_code",
TIMEZONE: "geoip_timezone",
SUBDIVISIONNAME: "geoip_subdivision_name",
SUBDIVISIONCODE: "geoip_subdivision_code",
ASN: "geoip_autonomous_system_number",
ASNORG: "geoip_autonomous_system_organization",
}

// GeoIPConfig represents GeoIP stage config
Expand All @@ -69,7 +75,7 @@ func validateGeoIPConfig(c GeoIPConfig) (map[string]*jmespath.JMESPath, error) {
}

switch c.DBType {
case "", "asn", "city":
case "", "asn", "city", "country":
default:
return nil, ErrEmptyDBTypeGeoIPStageConfig
}
Expand Down Expand Up @@ -182,6 +188,14 @@ func (g *geoIPStage) process(_ model.LabelSet, extracted map[string]interface{})
return
}
g.populateExtractedWithASNData(extracted, &record)
case "country":
var record geoip2.Country
err := g.mmdb.Lookup(ip, &record)
if err != nil {
level.Error(g.logger).Log("msg", "unable to get Country record for the ip", "err", err, "ip", ip)
return
}
g.populateExtractedWithCountryData(extracted, &record)
default:
level.Error(g.logger).Log("msg", "unknown database type")
}
Expand Down Expand Up @@ -210,6 +224,11 @@ func (g *geoIPStage) populateExtractedWithCityData(extracted map[string]interfac
if contryName != "" {
extracted[label] = contryName
}
case COUNTRYCODE:
contryCode := record.Country.IsoCode
if contryCode != "" {
extracted[label] = contryCode
}
case CONTINENTNAME:
continentName := record.Continent.Names["en"]
if continentName != "" {
Expand Down Expand Up @@ -252,20 +271,51 @@ func (g *geoIPStage) populateExtractedWithCityData(extracted map[string]interfac
extracted[label] = subdivisionCode
}
}
default:
level.Error(g.logger).Log("msg", "unknown geoip field")
}
}
}

func (g *geoIPStage) populateExtractedWithASNData(extracted map[string]interface{}, record *geoip2.ASN) {
autonomousSystemNumber := record.AutonomousSystemNumber
autonomousSystemOrganization := record.AutonomousSystemOrganization
if autonomousSystemNumber != 0 {
extracted["geoip_autonomous_system_number"] = autonomousSystemNumber
for field, label := range fields {
switch field {
case ASN:
autonomousSystemNumber := record.AutonomousSystemNumber
if autonomousSystemNumber != 0 {
extracted[label] = autonomousSystemNumber
}
case ASNORG:
autonomousSystemOrganization := record.AutonomousSystemOrganization
if autonomousSystemOrganization != "" {
extracted[label] = autonomousSystemOrganization
}
}
}
if autonomousSystemOrganization != "" {
extracted["geoip_autonomous_system_organization"] = autonomousSystemOrganization
}

// populateExtractedWithCountryData writes the English country name, the
// country ISO code, the English continent name and the continent code of a
// Country record into extracted, keyed by the labels defined in fields.
//
// Empty values are skipped, so a label is only set when the record actually
// carries data for it.
func (g *geoIPStage) populateExtractedWithCountryData(extracted map[string]interface{}, record *geoip2.Country) {
	// Gather the candidate values up front; the loop below decides which of
	// them are worth storing.
	candidates := map[GeoIPFields]string{
		COUNTRYNAME:   record.Country.Names["en"],
		COUNTRYCODE:   record.Country.IsoCode,
		CONTINENTNAME: record.Continent.Names["en"],
		CONTINENTCODE: record.Continent.Code,
	}

	for field, value := range candidates {
		label, known := fields[field]
		if !known || value == "" {
			continue
		}
		extracted[label] = value
	}
}

Expand Down
168 changes: 168 additions & 0 deletions component/loki/process/stages/geoip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,21 @@ package stages

import (
"errors"
"fmt"
"net"
"testing"

util_log "github.com/grafana/loki/pkg/util/log"
"github.com/oschwald/geoip2-golang"
"github.com/oschwald/maxminddb-golang"
"github.com/stretchr/testify/require"
)

var (
geoipTestIP string = "192.0.2.1"
geoipTestSource string = "dummy"
)

func Test_ValidateConfigs(t *testing.T) {
source := "ip"
tests := []struct {
Expand All @@ -21,6 +31,14 @@ func Test_ValidateConfigs(t *testing.T) {
},
nil,
},
{
GeoIPConfig{
DB: "test",
Source: &source,
DBType: "country",
},
nil,
},
{
GeoIPConfig{
DB: "test",
Expand Down Expand Up @@ -81,3 +99,153 @@ func Test_ValidateConfigs(t *testing.T) {
}
}
}

/*
NOTE:
database schema: https://github.com/maxmind/MaxMind-DB/tree/main/source-data
Script used to build the minimal binaries: https://github.com/vimt/MaxMind-DB-Writer-python
*/

// Test_MaxmindAsn looks up geoipTestIP in the minimal ASN test database and
// verifies that populateExtractedWithASNData writes both ASN labels.
func Test_MaxmindAsn(t *testing.T) {
	mmdb, err := maxminddb.Open("testdata/geoip_maxmind_asn.mmdb")
	if err != nil {
		t.Fatal(err)
	}
	defer mmdb.Close()

	var record geoip2.ASN
	if err := mmdb.Lookup(net.ParseIP(geoipTestIP), &record); err != nil {
		// Without a record every label check below would fail with a
		// misleading "not present" message, so stop right here.
		t.Fatal(err)
	}

	config := GeoIPConfig{
		DB:     "test",
		Source: &geoipTestSource,
		DBType: "asn",
	}
	valuesExpressions, err := validateGeoIPConfig(config)
	if err != nil {
		t.Fatalf("Error validating test-config: %v", err)
	}
	testStage := &geoIPStage{
		mmdb:              mmdb,
		logger:            util_log.Logger,
		valuesExpressions: valuesExpressions,
		cfgs:              config,
	}

	extracted := map[string]interface{}{}
	testStage.populateExtractedWithASNData(extracted, &record)

	for _, field := range []string{
		fields[ASN],
		fields[ASNORG],
	} {
		if _, present := extracted[field]; !present {
			t.Errorf("GeoIP label %v not present", field)
		}
	}
}

// Test_MaxmindCity looks up geoipTestIP in the minimal city test database and
// verifies that populateExtractedWithCityData writes every city-level label,
// including the latitude/longitude labels derived from the location label.
func Test_MaxmindCity(t *testing.T) {
	mmdb, err := maxminddb.Open("testdata/geoip_maxmind_city.mmdb")
	if err != nil {
		t.Fatal(err)
	}
	defer mmdb.Close()

	var record geoip2.City
	if err := mmdb.Lookup(net.ParseIP(geoipTestIP), &record); err != nil {
		// Without a record every label check below would fail with a
		// misleading "not present" message, so stop right here.
		t.Fatal(err)
	}

	config := GeoIPConfig{
		DB:     "test",
		Source: &geoipTestSource,
		DBType: "city",
	}
	valuesExpressions, err := validateGeoIPConfig(config)
	if err != nil {
		t.Fatalf("Error validating test-config: %v", err)
	}
	testStage := &geoIPStage{
		mmdb:              mmdb,
		logger:            util_log.Logger,
		valuesExpressions: valuesExpressions,
		cfgs:              config,
	}

	extracted := map[string]interface{}{}
	testStage.populateExtractedWithCityData(extracted, &record)

	// Each expected label is listed exactly once.
	for _, field := range []string{
		fields[COUNTRYNAME],
		fields[COUNTRYCODE],
		fields[CONTINENTNAME],
		fields[CONTINENTCODE],
		fields[CITYNAME],
		fmt.Sprintf("%s_latitude", fields[LOCATION]),
		fmt.Sprintf("%s_longitude", fields[LOCATION]),
		fields[POSTALCODE],
		fields[TIMEZONE],
		fields[SUBDIVISIONNAME],
		fields[SUBDIVISIONCODE],
	} {
		if _, present := extracted[field]; !present {
			t.Errorf("GeoIP label %v not present", field)
		}
	}
}

// Test_MaxmindCountry looks up geoipTestIP in the minimal country test
// database and verifies that populateExtractedWithCountryData writes the
// country and continent labels.
func Test_MaxmindCountry(t *testing.T) {
	mmdb, err := maxminddb.Open("testdata/geoip_maxmind_country.mmdb")
	if err != nil {
		t.Fatal(err)
	}
	defer mmdb.Close()

	var record geoip2.Country
	if err := mmdb.Lookup(net.ParseIP(geoipTestIP), &record); err != nil {
		// Without a record every label check below would fail with a
		// misleading "not present" message, so stop right here.
		t.Fatal(err)
	}

	config := GeoIPConfig{
		DB:     "test",
		Source: &geoipTestSource,
		DBType: "country",
	}
	valuesExpressions, err := validateGeoIPConfig(config)
	if err != nil {
		t.Fatalf("Error validating test-config: %v", err)
	}
	testStage := &geoIPStage{
		mmdb:              mmdb,
		logger:            util_log.Logger,
		valuesExpressions: valuesExpressions,
		cfgs:              config,
	}

	extracted := map[string]interface{}{}
	testStage.populateExtractedWithCountryData(extracted, &record)

	for _, field := range []string{
		fields[COUNTRYNAME],
		fields[COUNTRYCODE],
		fields[CONTINENTNAME],
		fields[CONTINENTCODE],
	} {
		if _, present := extracted[field]; !present {
			t.Errorf("GeoIP label %v not present", field)
		}
	}
}
Binary file not shown.
Binary file not shown.
Binary file not shown.
14 changes: 14 additions & 0 deletions component/loki/process/stages/testdata/geoip_source.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[
{
"type": "GeoLite2-ASN",
"data": {"192.0.2.0/24": {"autonomous_system_number": 1337, "autonomous_system_organization": "Just a Test"}}
},
{
"type": "GeoIP2-Country",
"data": {"192.0.2.0/24": {"continent": {"code": "NA", "geoname_id": 6255149, "names": {"de": "Nordamerika", "en": "North America", "es": "Norteam\u00e9rica", "fr": "Am\u00e9rique du Nord", "ja": "\u5317\u30a2\u30e1\u30ea\u30ab", "pt-BR": "Am\u00e9rica do Norte", "ru": "\u0421\u0435\u0432\u0435\u0440\u043d\u0430\u044f \u0410\u043c\u0435\u0440\u0438\u043a", "zh-CN": "\u5317\u7f8e\u6d32"}}, "country": {"geoname_id": 6252001, "is_in_european_union": false, "iso_code": "US", "names": {"de": "Vereinigte Staaten", "en": "United States", "es": "Estados Unidos", "fr": "\u00c9tats Unis", "ja": "\u30a2\u30e1\u30ea\u30ab", "pt-BR": "EUA", "ru": "\u0421\u0428\u0410", "zh-CN": "\u7f8e\u56fd"}}, "registered_country": {"geoname_id": 6252001, "is_in_european_union": false, "iso_code": "US", "names": {"de": "Vereinigte Staaten", "en": "United States", "es": "Estados Unidos", "fr": "\u00c9tats Unis", "ja": "\u30a2\u30e1\u30ea\u30ab", "pt-BR": "EUA", "ru": "\u0421\u0428\u0410", "zh-CN": "\u7f8e\u56fd"}}, "traits": {"is_anonymous_proxy": true, "is_satellite_provider": true, "is_anycast": true}}}
},
{
"type": "GeoIP2-City",
"data": {"192.0.2.0/24": {"continent": {"code": "EU", "geoname_id": 6255148, "names": {"de": "Europa", "en": "Europe", "es": "Europa", "fr": "Europe", "ja": "\u30e8\u30fc\u30ed\u30c3\u30d1", "pt-BR": "Europa", "ru": "\u0415\u0432\u0440\u043e\u043f\u0430", "zh-CN": "\u6b27\u6d32"}}, "country": {"geoname_id": 2635167, "is_in_european_union": false, "iso_code": "GB", "names": {"de": "Vereinigtes K\u00f6nigreich", "en": "United Kingdom", "es": "Reino Unido", "fr": "Royaume-Uni", "ja": "\u30a4\u30ae\u30ea\u30b9", "pt-BR": "Reino Unido", "ru": "\u0412\u0435\u043b\u0438\u043a\u043e\u0431\u0440\u0438\u0442\u0430\u043d\u0438\u044f", "zh-CN": "\u82f1\u56fd"}}, "registered_country": {"geoname_id": 6252001, "is_in_european_union": false, "iso_code": "US", "names": {"de": "USA", "en": "United States", "es": "Estados Unidos", "fr": "\u00c9tats-Unis", "ja": "\u30a2\u30e1\u30ea\u30ab\u5408\u8846\u56fd", "pt-BR": "Estados Unidos", "ru": "\u0421\u0428\u0410", "zh-CN": "\u7f8e\u56fd"}}, "traits": {"is_anonymous_proxy": true, "is_satellite_provider": true, "is_anycast": true}, "location": {"accuracy_radius": 100, "latitude": 51.5142, "longitude": -0.0931, "time_zone": "Europe/London"}, "postal": {"code": "OX1"}, "city": {"geoname_id": 2643743, "names": {"de": "London", "en": "London", "es": "Londres", "fr": "Londres", "ja": "\u30ed\u30f3\u30c9\u30f3", "pt-BR": "Londres", "ru": "\u041b\u043e\u043d\u0434\u043e\u043d"}}, "subdivisions": [{"geoname_id": 6269131, "iso_code": "ENG", "names": {"en": "England", "es": "Inglaterra", "fr": "Angleterre", "pt-BR": "Inglaterra"}}]}}
}
]
Loading

0 comments on commit bb1d64d

Please sign in to comment.