Skip to content

Commit

Permalink
feat: implement events manager container
Browse files Browse the repository at this point in the history
- set up `talos.events.sink` kernel args.
- build and run additional container to receive talos events.
- log all events in the adapter.

Signed-off-by: Artem Chernyshev <[email protected]>
  • Loading branch information
Unix4ever committed Dec 2, 2021
1 parent ab12b81 commit d0df929
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 0 deletions.
9 changes: 9 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,14 @@ ARG GO_LDFLAGS
RUN --mount=type=cache,target=/.cache GOOS=linux GOARCH=${TARGETARCH} go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS} -X main.TalosRelease=${TALOS_RELEASE}" -o /log-receiver ./app/sidero-controller-manager/cmd/log-receiver
RUN chmod +x /log-receiver

FROM base AS build-events-manager
ARG TALOS_RELEASE
ARG TARGETARCH
ARG GO_BUILDFLAGS
ARG GO_LDFLAGS
RUN --mount=type=cache,target=/.cache GOOS=linux GOARCH=${TARGETARCH} go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS} -X main.TalosRelease=${TALOS_RELEASE}" -o /events-manager ./app/sidero-controller-manager/cmd/events-manager
RUN chmod +x /events-manager

FROM base AS agent-build-amd64
ARG GO_BUILDFLAGS
ARG GO_LDFLAGS
Expand Down Expand Up @@ -203,6 +211,7 @@ COPY --from=pkg-kernel-arm64 /boot/vmlinuz /var/lib/sidero/env/agent-arm64/vmlin
COPY --from=build-sidero-controller-manager /manager /manager
COPY --from=build-siderolink-manager /siderolink-manager /siderolink-manager
COPY --from=build-log-receiver /log-receiver /log-receiver
COPY --from=build-events-manager /events-manager /events-manager

FROM sidero-controller-manager-image AS sidero-controller-manager
LABEL org.opencontainers.image.source https://github.com/talos-systems/sidero
Expand Down
93 changes: 93 additions & 0 deletions app/sidero-controller-manager/cmd/events-manager/adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package main

import (
"context"
"fmt"
"strings"

"go.uber.org/zap"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/talos-systems/siderolink/pkg/events"

"github.com/talos-systems/talos/pkg/machinery/api/machine"
)

// Adapter implents gRPC API.
type Adapter struct {
Sink *events.Sink

logger *zap.Logger
metalClient runtimeclient.Client
}

// NewAdapter initializes new server.
func NewAdapter(metalClient runtimeclient.Client, logger *zap.Logger) *Adapter {
return &Adapter{
logger: logger,
metalClient: metalClient,
}
}

// HandleEvent implements events.Adapter.
func (a *Adapter) HandleEvent(ctx context.Context, event events.Event) error {
logger := a.logger.With(
zap.String("node", event.Node),
zap.String("id", event.ID),
zap.String("type", event.TypeURL),
)

fields := []zap.Field{}
message := "incoming event"

var err error

switch event := event.Payload.(type) {
case *machine.AddressEvent:
fields = append(fields, zap.String("hostname", event.GetHostname()), zap.String("addresses", strings.Join(event.GetAddresses(), ",")))
case *machine.ConfigValidationErrorEvent:
fields = append(fields, zap.Error(fmt.Errorf(event.GetError())))
case *machine.ConfigLoadErrorEvent:
fields = append(fields, zap.Error(fmt.Errorf(event.GetError())))
case *machine.PhaseEvent:
fields = append(fields, zap.String("phase", event.GetPhase()), zap.String("action", event.GetAction().String()))
case *machine.TaskEvent:
fields = append(fields, zap.String("task", event.GetTask()), zap.String("action", event.GetAction().String()))
case *machine.ServiceStateEvent:
message = "service " + event.GetMessage()
fields = append(fields, zap.String("service", event.GetService()), zap.String("action", event.GetAction().String()))
case *machine.SequenceEvent:
fields = append(fields, zap.String("sequence", event.GetSequence()), zap.String("action", event.GetAction().String()))

if event.GetError() != nil {
err = fmt.Errorf(event.GetError().GetMessage())
}

if event.GetSequence() == "install" &&
event.GetAction() == machine.SequenceEvent_STOP {
if event.GetError() != nil {
message = "failed to install Talos"

break
}

message = "successfully installed Talos"
}
}

if err != nil {
fields = append(fields, zap.Error(err))

logger.Error(message, fields...)

return nil
}

logger.Info(message, fields...)

return nil
}
32 changes: 32 additions & 0 deletions app/sidero-controller-manager/cmd/events-manager/kubernetes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package main

import (
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"

sidero "github.com/talos-systems/sidero/app/caps-controller-manager/api/v1alpha3"
)

func getMetalClient() (runtimeclient.Client, error) {
kubeconfig := ctrl.GetConfigOrDie()

scheme := runtime.NewScheme()

if err := clientgoscheme.AddToScheme(scheme); err != nil {
return nil, err
}

if err := sidero.AddToScheme(scheme); err != nil {
return nil, err
}

client, err := runtimeclient.New(kubeconfig, runtimeclient.Options{Scheme: scheme})

return client, err
}
90 changes: 90 additions & 0 deletions app/sidero-controller-manager/cmd/events-manager/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package main

import (
"context"
"errors"
"flag"
"fmt"
"net"
"os"
"os/signal"
"syscall"

"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"

"github.com/talos-systems/sidero/app/sidero-controller-manager/internal/siderolink"
"github.com/talos-systems/siderolink/api/events"
sink "github.com/talos-systems/siderolink/pkg/events"
)

func main() {
flag.Parse()

if err := run(); err != nil {
fmt.Fprintf(os.Stderr, "error: %s", err)
os.Exit(1)
}
}

func run() error {
logger, err := zap.NewProduction()
if err != nil {
return fmt.Errorf("failed to initialize logger: %w", err)
}

zap.ReplaceGlobals(logger)
zap.RedirectStdLog(logger)

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()

eg, ctx := errgroup.WithContext(ctx)

address := fmt.Sprintf(":%d", siderolink.EventsSinkPort)

lis, err := net.Listen("tcp", address)
if err != nil {
return fmt.Errorf("error listening for gRPC API: %w", err)
}

s := grpc.NewServer()

client, err := getMetalClient()
if err != nil {
return fmt.Errorf("error getting metal client: %w", err)
}

srv := sink.NewSink(
NewAdapter(client,
logger.With(zap.String("component", "sink")),
),
)

events.RegisterEventSinkServiceServer(s, srv)

eg.Go(func() error {
logger.Info("started gRPC event sink", zap.String("address", address))

return s.Serve(lis)
})

eg.Go(func() error {
<-ctx.Done()

s.Stop()

return nil
})

if err := eg.Wait(); err != nil && !errors.Is(err, grpc.ErrServerStopped) && errors.Is(err, context.Canceled) {
return err
}

return nil
}
12 changes: 12 additions & 0 deletions app/sidero-controller-manager/config/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,18 @@ spec:
requests:
cpu: 50m
memory: 128Mi
- command:
- /events-manager
image: controller:latest
imagePullPolicy: Always
name: serverevents
resources:
limits:
cpu: 256m
memory: 256Mi
requests:
cpu: 50m
memory: 128Mi
volumes:
- hostPath:
path: /dev/net/tun
Expand Down
7 changes: 7 additions & 0 deletions app/sidero-controller-manager/internal/ipxe/ipxe_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,11 +411,13 @@ func appendTalosArguments(env *metalv1alpha1.Environment) {
talosConfigPrefix := talosconstants.KernelParamConfig + "="
sideroLinkPrefix := talosconstants.KernelParamSideroLink + "="
logDeliveryPrefix := talosconstants.KernelParamLoggingKernel + "="
eventsSinkPrefix := talosconstants.KernelParamEventsSink + "="

for _, prefix := range []string{
talosConfigPrefix,
sideroLinkPrefix,
logDeliveryPrefix,
eventsSinkPrefix,
} {
for _, arg := range args {
if strings.HasPrefix(arg, prefix) {
Expand All @@ -440,6 +442,11 @@ func appendTalosArguments(env *metalv1alpha1.Environment) {
env.Spec.Kernel.Args = append(env.Spec.Kernel.Args,
fmt.Sprintf("%s=tcp://[%s]:%d", talosconstants.KernelParamLoggingKernel, siderolink.Cfg.ServerAddress.IP(), siderolink.LogReceiverPort),
)
case eventsSinkPrefix:
// patch environment with the events sink endpoint
env.Spec.Kernel.Args = append(env.Spec.Kernel.Args,
fmt.Sprintf("%s=[%s]:%d", talosconstants.KernelParamEventsSink, siderolink.Cfg.ServerAddress.IP(), siderolink.EventsSinkPort),
)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ const SecretName = "siderolink"
// LogReceiverPort is working only over Wireguard.
const LogReceiverPort = 4001

// EventsSinkPort is the port of the events sink container.
//
// EventsSinkPort is working only over wireguard.
const EventsSinkPort = 4002

// Cfg is a default global instance of the SideroLink configuration.
//
// Cfg should be initialized first with `LoadOrCreate`.
Expand Down

0 comments on commit d0df929

Please sign in to comment.