Skip to content

Commit

Permalink
feat: implement logreceiver for kernel logs
Browse files Browse the repository at this point in the history
Related to: siderolabs/sidero#527

Signed-off-by: Andrey Smirnov <[email protected]>
  • Loading branch information
smira committed Nov 25, 2021
1 parent f7cadbc commit d86cdd5
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 1 deletion.
46 changes: 46 additions & 0 deletions cmd/siderolink-agent/log_receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package main

import (
"context"

"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"inet.af/netaddr"

"github.com/talos-systems/siderolink/pkg/logreceiver"
)

var logReceiverFlags struct {
endpoint string
}

func logHandler(logger *zap.Logger) logreceiver.Handler {
return func(srcAddress netaddr.IP, msg map[string]interface{}) {
logger.Info("kernel log message", zap.Stringer("src_address", srcAddress), zap.Any("msg", msg))
}
}

func logReceiver(ctx context.Context, eg *errgroup.Group, logger *zap.Logger) error {
srv, err := logreceiver.NewServer(logger, logReceiverFlags.endpoint, logHandler(logger))
if err != nil {
return err
}

eg.Go(func() error {
return srv.Serve()
})

eg.Go(func() error {
<-ctx.Done()

srv.Stop()

return nil
})

return nil
}
7 changes: 6 additions & 1 deletion cmd/siderolink-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func main() {
flag.StringVar(&sideroLinkFlags.wireguardEndpoint, "sidero-link-wireguard-endpoint", "172.20.0.1:51821", "advertised Wireguard endpoint")
flag.StringVar(&sideroLinkFlags.apiEndpoint, "sidero-link-api-endpoint", ":4000", "gRPC API endpoint for the SideroLink")
flag.StringVar(&eventSinkFlags.apiEndpoint, "event-sink-endpoint", ":8080", "gRPC API endpoint for the Event Sink")
flag.StringVar(&logReceiverFlags.endpoint, "log-receiver-endpoint", ":4001", "TCP log receiver endpoint")
flag.Parse()

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
Expand All @@ -44,7 +45,11 @@ func run(ctx context.Context) error {
}

if err := eventSink(ctx, eg); err != nil {
return fmt.Errorf("SideroLink: %w", err)
return fmt.Errorf("event sink: %w", err)
}

if err := logReceiver(ctx, eg, logger); err != nil {
return fmt.Errorf("log receiver: %w", err)
}

if err := eg.Wait(); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/logreceiver/logreceiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

// Package logreceiver implements JSON-over-TCP log receiver.
package logreceiver
90 changes: 90 additions & 0 deletions pkg/logreceiver/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package logreceiver

import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io"
"net"

"go.uber.org/zap"
"inet.af/netaddr"
)

// Server implements TCP server to receive JSON logs.
type Server struct {
listener net.Listener
logger *zap.Logger
handler Handler
}

// Handler is called for each received message.
type Handler func(srcAddress netaddr.IP, msg map[string]interface{})

// NewServer initializes new Server and starts listening.
func NewServer(logger *zap.Logger, listenAddress string, handler Handler) (*Server, error) {
lis, err := net.Listen("tcp", listenAddress)
if err != nil {
return nil, fmt.Errorf("error listening: %w", err)
}

return &Server{
listener: lis,
logger: logger,
handler: handler,
}, nil
}

// Serve runs the TCP server loop.
func (srv *Server) Serve() error {
for {
conn, err := srv.listener.Accept()
if err != nil {
return fmt.Errorf("error accepting connection: %w", err)
}

go srv.handleConnection(conn)
}
}

// Stop serving.
//
// This has a bug that it doesn't close the connections.
func (srv *Server) Stop() {
srv.listener.Close() //nolint:errcheck
}

func (srv *Server) handleConnection(conn net.Conn) {
defer conn.Close() //nolint:errcheck

bufReader := bufio.NewReader(conn)
decoder := json.NewDecoder(bufReader)

srcAddr, ok := conn.RemoteAddr().(*net.TCPAddr)
if !ok {
srv.logger.Error("error getting remote IP address")

return
}

srcAddress, _ := netaddr.FromStdIP(srcAddr.IP)

for {
msg := map[string]interface{}{}

if err := decoder.Decode(&msg); err != nil {
if !errors.Is(err, net.ErrClosed) && !errors.Is(err, io.EOF) {
srv.logger.Error("error decoding message", zap.Error(err))
}

return
}

srv.handler(srcAddress, msg)
}
}

0 comments on commit d86cdd5

Please sign in to comment.