Skip to content

Commit

Permalink
goflow2: use slog instead of logrus
Browse files Browse the repository at this point in the history
  • Loading branch information
lspgn committed Jan 6, 2024
1 parent 6b6d464 commit cb08412
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 63 deletions.
43 changes: 28 additions & 15 deletions cmd/enricher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"flag"
"fmt"
"io"
"log"
"log/slog"
"net"
"os"
"strings"
Expand All @@ -24,7 +26,6 @@ import (
_ "github.com/netsampler/goflow2/v2/transport/kafka"

"github.com/oschwald/geoip2-golang"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/encoding/protodelim"
)

Expand Down Expand Up @@ -81,23 +82,39 @@ func main() {
os.Exit(0)
}

lvl, _ := log.ParseLevel(*LogLevel)
log.SetLevel(lvl)
var loglevel slog.Level
if err := loglevel.UnmarshalText([]byte(*LogLevel)); err != nil {
log.Fatal("error parsing log level")
}

lo := slog.HandlerOptions{
Level: loglevel,
}
logger := slog.New(slog.NewTextHandler(os.Stderr, &lo))

switch *LogFmt {
case "json":
logger = slog.New(slog.NewJSONHandler(os.Stderr, &lo))
}

slog.SetDefault(logger)

var dbAsn, dbCountry *geoip2.Reader
var err error
if *DbAsn != "" {
dbAsn, err = geoip2.Open(*DbAsn)
if err != nil {
log.Fatal(err)
slog.Error("error opening asn db", slog.String("error", err.Error()))
os.Exit(1)
}
defer dbAsn.Close()
}

if *DbCountry != "" {
dbCountry, err = geoip2.Open(*DbCountry)
if err != nil {
log.Fatal(err)
slog.Error("error opening country db", slog.String("error", err.Error()))
os.Exit(1)
}
defer dbCountry.Close()
}
Expand All @@ -109,16 +126,12 @@ func main() {

transporter, err := transport.FindTransport(*Transport)
if err != nil {
log.Fatal(err)
slog.Error("error transporter", slog.String("error", err.Error()))
os.Exit(1)
}
defer transporter.Close()

switch *LogFmt {
case "json":
log.SetFormatter(&log.JSONFormatter{})
}

log.Info("starting enricher")
logger.Info("starting enricher")

rdr := bufio.NewReader(os.Stdin)

Expand All @@ -127,7 +140,7 @@ func main() {
if err := protodelim.UnmarshalFrom(rdr, msg); err != nil && errors.Is(err, io.EOF) {
return
} else if err != nil {
log.Error(err)
slog.Error("error unmarshalling message", slog.String("error", err.Error()))
continue
}

Expand All @@ -139,13 +152,13 @@ func main() {

key, data, err := formatter.Format(msg)
if err != nil {
log.Error(err)
slog.Error("error formatting message", slog.String("error", err.Error()))
continue
}

err = transporter.Send(key, data)
if err != nil {
log.Error(err)
slog.Error("error sending message", slog.String("error", err.Error()))
continue
}
}
Expand Down
103 changes: 60 additions & 43 deletions cmd/goflow2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"flag"
"fmt"
"io"
"log"
"log/slog"
"net"
"net/http"
"net/url"
Expand Down Expand Up @@ -41,7 +43,6 @@ import (
"github.com/netsampler/goflow2/v2/utils"

"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
)

Expand Down Expand Up @@ -83,22 +84,33 @@ func main() {
os.Exit(0)
}

lvl, _ := log.ParseLevel(*LogLevel)
log.SetLevel(lvl)
var loglevel slog.Level
if err := loglevel.UnmarshalText([]byte(*LogLevel)); err != nil {
log.Fatal("error parsing log level")
}

lo := slog.HandlerOptions{
Level: loglevel,
}
logger := slog.New(slog.NewTextHandler(os.Stderr, &lo))

switch *LogFmt {
case "json":
log.SetFormatter(&log.JSONFormatter{})
logger = slog.New(slog.NewJSONHandler(os.Stderr, &lo))
}

slog.SetDefault(logger)

formatter, err := format.FindFormat(*Format)
if err != nil {
log.Fatal(err)
slog.Error("error formatter", slog.String("error", err.Error()))
os.Exit(1)
}

transporter, err := transport.FindTransport(*Transport)
if err != nil {
log.Fatal(err)
slog.Error("error transporter", slog.String("error", err.Error()))
os.Exit(1)
}

var flowProducer producer.ProducerInterface
Expand All @@ -109,23 +121,27 @@ func main() {
if *MappingFile != "" {
f, err := os.Open(*MappingFile)
if err != nil {
log.Fatal(err)
slog.Error("error opening mapping", slog.String("error", err.Error()))
os.Exit(1)
}
cfgProducer, err = LoadMapping(f)
f.Close()
if err != nil {
log.Fatal(err)
slog.Error("error loading mapping", slog.String("error", err.Error()))
os.Exit(1)
}
}

flowProducer, err = protoproducer.CreateProtoProducer(cfgProducer, protoproducer.CreateSamplingSystem)
if err != nil {
log.Fatal(err)
slog.Error("error producer", slog.String("error", err.Error()))
os.Exit(1)
}
} else if *Produce == "raw" {
flowProducer = &rawproducer.RawProducer{}
} else {
log.Fatalf("producer %s does not exist", *Produce)
slog.Error("producer does not exist", slog.String("error", err.Error()), slog.String("producer", *Produce))
os.Exit(1)
}

// wrap producer with Prometheus metrics
Expand All @@ -139,12 +155,13 @@ func main() {
if !collecting {
wr.WriteHeader(http.StatusServiceUnavailable)
if _, err := wr.Write([]byte("Not OK\n")); err != nil {
log.WithError(err).Error("error writing HTTP")
slog.Error("error writing HTTP", slog.String("error", err.Error()))
}
} else {
wr.WriteHeader(http.StatusOK)
if _, err := wr.Write([]byte("OK\n")); err != nil {
log.WithError(err).Error("error writing HTTP")
slog.Error("error writing HTTP", slog.String("error", err.Error()))

}
}
})
Expand All @@ -156,18 +173,17 @@ func main() {
wg.Add(1)
go func() {
defer wg.Done()
l := log.WithFields(log.Fields{
"http": *Addr,
})
logger := logger.With(slog.String("http", *Addr))
err := srv.ListenAndServe()
if err != nil && !errors.Is(err, http.ErrServerClosed) {
l.WithError(err).Fatal("HTTP server error")
slog.Error("HTTP server error", slog.String("error", err.Error()))
os.Exit(1)
}
l.Info("closed HTTP server")
logger.Info("closed HTTP server")
}()
}

log.Info("starting GoFlow2")
logger.Info("starting GoFlow2")

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
Expand All @@ -179,12 +195,14 @@ func main() {
for _, listenAddress := range strings.Split(*ListenAddresses, ",") {
listenAddrUrl, err := url.Parse(listenAddress)
if err != nil {
log.Fatal(err)
logger.Error("error parsing address", slog.String("error", err.Error()))
os.Exit(1)
}
numSockets := 1
if listenAddrUrl.Query().Has("count") {
if numSocketsTmp, err := strconv.ParseUint(listenAddrUrl.Query().Get("count"), 10, 64); err != nil {
log.Fatal(err)
slog.Error("error parsing count in URL", slog.String("error", err.Error()))
os.Exit(1)
} else {
numSockets = int(numSocketsTmp)
}
Expand All @@ -196,26 +214,26 @@ func main() {
hostname := listenAddrUrl.Hostname()
port, err := strconv.ParseUint(listenAddrUrl.Port(), 10, 64)
if err != nil {
log.Errorf("Port %s could not be converted to integer", listenAddrUrl.Port())
return
slog.Error("port could not be converted to integer", slog.String("port", listenAddrUrl.Port()))
os.Exit(1)
}

logFields := log.Fields{
"scheme": listenAddrUrl.Scheme,
"hostname": hostname,
"port": port,
"count": numSockets,
logAttr := []any{
slog.String("scheme", listenAddrUrl.Scheme),
slog.String("hostname", hostname),
slog.Int64("port", int64(port)),
slog.Int("count", numSockets),
}
l := log.WithFields(logFields)

l.Info("starting collection")
logger := logger.With(logAttr...)
logger.Info("starting collection")

cfg := &utils.UDPReceiverConfig{
Sockets: numSockets,
}
recv, err := utils.NewUDPReceiver(cfg)
if err != nil {
log.WithError(err).Fatal("error creating UDP receiver")
logger.Error("error creating UDP receiver", slog.String("error", err.Error()))
os.Exit(1)
}

cfgPipe := &utils.PipeConfig{
Expand All @@ -232,16 +250,17 @@ func main() {
} else if listenAddrUrl.Scheme == "netflow" {
p = utils.NewNetFlowPipe(cfgPipe)
} else {
l.Errorf("scheme %s does not exist", listenAddrUrl.Scheme)
return
logger.Error("scheme does not exist", slog.String("error", listenAddrUrl.Scheme))
os.Exit(1)
}
decodeFunc = metrics.PromDecoderWrapper(p.DecodeFlow, listenAddrUrl.Scheme)
pipes = append(pipes, p)

// starts receivers
// the function either returns an error
if err := recv.Start(hostname, int(port), decodeFunc); err != nil {
l.Fatal(err)
logger.Error("error starting", slog.String("error", listenAddrUrl.Scheme))
os.Exit(1)
} else {
wg.Add(1)
go func() {
Expand All @@ -252,13 +271,12 @@ func main() {
case <-q:
return
case err := <-recv.Errors():
l := l.WithError(err)
if errors.Is(err, netflow.ErrorTemplateNotFound) {
l.Warn("template error")
logger.Warn("template error", slog.String("error", err.Error()))
} else if errors.Is(err, net.ErrClosed) {
l.Info("closed receiver")
logger.Info("receiver closed")
} else {
l.Error("error")
logger.Info("error", slog.String("error", err.Error()))
}

}
Expand Down Expand Up @@ -288,8 +306,7 @@ func main() {
if err == nil {
return
}
l := log.WithError(err)
l.Error("transport error")
logger.Error("transporter error", slog.String("error", err.Error()))
}
}
}()
Expand All @@ -303,7 +320,7 @@ func main() {
// stops receivers first, udp sockets will be down
for _, recv := range receivers {
if err := recv.Stop(); err != nil {
log.WithError(err).Error("error stopping receiver")
logger.Error("error stopping receiver", slog.String("error", err.Error()))
}
}
// then stop pipe
Expand All @@ -314,11 +331,11 @@ func main() {
flowProducer.Close()
// close transporter (eg: flushes message to Kafka)
transporter.Close()
log.Info("closed transporter")
logger.Info("transporter closed")
// close http server (prometheus + health check)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
if err := srv.Shutdown(ctx); err != nil {
log.WithError(err).Error("error shutting-down HTTP server")
logger.Error("error shutting-down HTTP server", slog.String("error", err.Error()))
}
cancel()
close(q) // close errors
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ require (
github.com/libp2p/go-reuseport v0.3.0
github.com/oschwald/geoip2-golang v1.9.0
github.com/prometheus/client_golang v1.16.0
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.8.4
github.com/xdg-go/scram v1.1.2
google.golang.org/protobuf v1.31.0
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,9 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5X
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
Expand Down Expand Up @@ -114,7 +111,6 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s=
golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down

0 comments on commit cb08412

Please sign in to comment.