From d86cdd59ee7a0e0504b739a913991c272c7fb3f5 Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Thu, 25 Nov 2021 23:41:18 +0300 Subject: [PATCH] feat: implement logreceiver for kernel logs Related to: https://github.com/talos-systems/sidero/issues/527 Signed-off-by: Andrey Smirnov --- cmd/siderolink-agent/log_receiver.go | 46 ++++++++++++++ cmd/siderolink-agent/main.go | 7 ++- pkg/logreceiver/logreceiver.go | 6 ++ pkg/logreceiver/server.go | 90 ++++++++++++++++++++++++++++ 4 files changed, 148 insertions(+), 1 deletion(-) create mode 100644 cmd/siderolink-agent/log_receiver.go create mode 100644 pkg/logreceiver/logreceiver.go create mode 100644 pkg/logreceiver/server.go diff --git a/cmd/siderolink-agent/log_receiver.go b/cmd/siderolink-agent/log_receiver.go new file mode 100644 index 0000000..b76e0a0 --- /dev/null +++ b/cmd/siderolink-agent/log_receiver.go @@ -0,0 +1,46 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package main + +import ( + "context" + + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "inet.af/netaddr" + + "github.com/talos-systems/siderolink/pkg/logreceiver" +) + +var logReceiverFlags struct { + endpoint string +} + +func logHandler(logger *zap.Logger) logreceiver.Handler { + return func(srcAddress netaddr.IP, msg map[string]interface{}) { + logger.Info("kernel log message", zap.Stringer("src_address", srcAddress), zap.Any("msg", msg)) + } +} + +func logReceiver(ctx context.Context, eg *errgroup.Group, logger *zap.Logger) error { + srv, err := logreceiver.NewServer(logger, logReceiverFlags.endpoint, logHandler(logger)) + if err != nil { + return err + } + + eg.Go(func() error { + return srv.Serve() + }) + + eg.Go(func() error { + <-ctx.Done() + + srv.Stop() + + return nil + }) + + return nil +} diff --git a/cmd/siderolink-agent/main.go b/cmd/siderolink-agent/main.go index 4e06280..e11b26b 100644 --- a/cmd/siderolink-agent/main.go +++ b/cmd/siderolink-agent/main.go @@ -21,6 +21,7 @@ func main() { flag.StringVar(&sideroLinkFlags.wireguardEndpoint, "sidero-link-wireguard-endpoint", "172.20.0.1:51821", "advertised Wireguard endpoint") flag.StringVar(&sideroLinkFlags.apiEndpoint, "sidero-link-api-endpoint", ":4000", "gRPC API endpoint for the SideroLink") flag.StringVar(&eventSinkFlags.apiEndpoint, "event-sink-endpoint", ":8080", "gRPC API endpoint for the Event Sink") + flag.StringVar(&logReceiverFlags.endpoint, "log-receiver-endpoint", ":4001", "TCP log receiver endpoint") flag.Parse() ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) @@ -44,7 +45,11 @@ func run(ctx context.Context) error { } if err := eventSink(ctx, eg); err != nil { - return fmt.Errorf("SideroLink: %w", err) + return fmt.Errorf("event sink: %w", err) + } + + if err := logReceiver(ctx, eg, logger); err != nil { + return fmt.Errorf("log receiver: %w", err) } if err := eg.Wait(); err != nil && !errors.Is(err, grpc.ErrServerStopped) { diff --git a/pkg/logreceiver/logreceiver.go b/pkg/logreceiver/logreceiver.go new file mode 100644 index 0000000..a281c1f --- /dev/null +++ b/pkg/logreceiver/logreceiver.go @@ -0,0 +1,6 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// Package logreceiver implements JSON-over-TCP log receiver. +package logreceiver diff --git a/pkg/logreceiver/server.go b/pkg/logreceiver/server.go new file mode 100644 index 0000000..924eb19 --- /dev/null +++ b/pkg/logreceiver/server.go @@ -0,0 +1,90 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package logreceiver + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "io" + "net" + + "go.uber.org/zap" + "inet.af/netaddr" +) + +// Server implements TCP server to receive JSON logs. +type Server struct { + listener net.Listener + logger *zap.Logger + handler Handler +} + +// Handler is called for each received message. +type Handler func(srcAddress netaddr.IP, msg map[string]interface{}) + +// NewServer initializes new Server and starts listening. +func NewServer(logger *zap.Logger, listenAddress string, handler Handler) (*Server, error) { + lis, err := net.Listen("tcp", listenAddress) + if err != nil { + return nil, fmt.Errorf("error listening: %w", err) + } + + return &Server{ + listener: lis, + logger: logger, + handler: handler, + }, nil +} + +// Serve runs the TCP server loop. +func (srv *Server) Serve() error { + for { + conn, err := srv.listener.Accept() + if err != nil { + return fmt.Errorf("error accepting connection: %w", err) + } + + go srv.handleConnection(conn) + } +} + +// Stop serving. +// +// This has a bug that it doesn't close the connections. +func (srv *Server) Stop() { + srv.listener.Close() //nolint:errcheck +} + +func (srv *Server) handleConnection(conn net.Conn) { + defer conn.Close() //nolint:errcheck + + bufReader := bufio.NewReader(conn) + decoder := json.NewDecoder(bufReader) + + srcAddr, ok := conn.RemoteAddr().(*net.TCPAddr) + if !ok { + srv.logger.Error("error getting remote IP address") + + return + } + + srcAddress, _ := netaddr.FromStdIP(srcAddr.IP) + + for { + msg := map[string]interface{}{} + + if err := decoder.Decode(&msg); err != nil { + if !errors.Is(err, net.ErrClosed) && !errors.Is(err, io.EOF) { + srv.logger.Error("error decoding message", zap.Error(err)) + } + + return + } + + srv.handler(srcAddress, msg) + } +}