diff --git a/Dockerfile b/Dockerfile index 105962d5c..61f2e7ef1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -152,6 +152,14 @@ ARG GO_LDFLAGS RUN --mount=type=cache,target=/.cache GOOS=linux GOARCH=${TARGETARCH} go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS} -X main.TalosRelease=${TALOS_RELEASE}" -o /log-receiver ./app/sidero-controller-manager/cmd/log-receiver RUN chmod +x /log-receiver +FROM base AS build-events-manager +ARG TALOS_RELEASE +ARG TARGETARCH +ARG GO_BUILDFLAGS +ARG GO_LDFLAGS +RUN --mount=type=cache,target=/.cache GOOS=linux GOARCH=${TARGETARCH} go build ${GO_BUILDFLAGS} -ldflags "${GO_LDFLAGS} -X main.TalosRelease=${TALOS_RELEASE}" -o /events-manager ./app/sidero-controller-manager/cmd/events-manager +RUN chmod +x /events-manager + FROM base AS agent-build-amd64 ARG GO_BUILDFLAGS ARG GO_LDFLAGS @@ -203,6 +211,7 @@ COPY --from=pkg-kernel-arm64 /boot/vmlinuz /var/lib/sidero/env/agent-arm64/vmlin COPY --from=build-sidero-controller-manager /manager /manager COPY --from=build-siderolink-manager /siderolink-manager /siderolink-manager COPY --from=build-log-receiver /log-receiver /log-receiver +COPY --from=build-events-manager /events-manager /events-manager FROM sidero-controller-manager-image AS sidero-controller-manager LABEL org.opencontainers.image.source https://github.com/talos-systems/sidero diff --git a/app/sidero-controller-manager/cmd/events-manager/adapter.go b/app/sidero-controller-manager/cmd/events-manager/adapter.go new file mode 100644 index 000000000..708a3b477 --- /dev/null +++ b/app/sidero-controller-manager/cmd/events-manager/adapter.go @@ -0,0 +1,93 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package main + +import ( + "context" + "fmt" + "strings" + + "go.uber.org/zap" + runtimeclient "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/talos-systems/siderolink/pkg/events" + + "github.com/talos-systems/talos/pkg/machinery/api/machine" +) + +// Adapter implents gRPC API. +type Adapter struct { + Sink *events.Sink + + logger *zap.Logger + metalClient runtimeclient.Client +} + +// NewAdapter initializes new server. +func NewAdapter(metalClient runtimeclient.Client, logger *zap.Logger) *Adapter { + return &Adapter{ + logger: logger, + metalClient: metalClient, + } +} + +// HandleEvent implements events.Adapter. +func (a *Adapter) HandleEvent(ctx context.Context, event events.Event) error { + logger := a.logger.With( + zap.String("node", event.Node), + zap.String("id", event.ID), + zap.String("type", event.TypeURL), + ) + + fields := []zap.Field{} + message := "incoming event" + + var err error + + switch event := event.Payload.(type) { + case *machine.AddressEvent: + fields = append(fields, zap.String("hostname", event.GetHostname()), zap.String("addresses", strings.Join(event.GetAddresses(), ","))) + case *machine.ConfigValidationErrorEvent: + fields = append(fields, zap.Error(fmt.Errorf(event.GetError()))) + case *machine.ConfigLoadErrorEvent: + fields = append(fields, zap.Error(fmt.Errorf(event.GetError()))) + case *machine.PhaseEvent: + fields = append(fields, zap.String("phase", event.GetPhase()), zap.String("action", event.GetAction().String())) + case *machine.TaskEvent: + fields = append(fields, zap.String("task", event.GetTask()), zap.String("action", event.GetAction().String())) + case *machine.ServiceStateEvent: + message = "service " + event.GetMessage() + fields = append(fields, zap.String("service", event.GetService()), zap.String("action", event.GetAction().String())) + case *machine.SequenceEvent: + fields = append(fields, zap.String("sequence", event.GetSequence()), zap.String("action", event.GetAction().String())) + + if event.GetError() != nil { + err = fmt.Errorf(event.GetError().GetMessage()) + } + + if event.GetSequence() == "install" && + event.GetAction() == machine.SequenceEvent_STOP { + if event.GetError() != nil { + message = "failed to install Talos" + + break + } + + message = "successfully installed Talos" + } + } + + if err != nil { + fields = append(fields, zap.Error(err)) + + logger.Error(message, fields...) + + return nil + } + + logger.Info(message, fields...) + + return nil +} diff --git a/app/sidero-controller-manager/cmd/events-manager/kubernetes.go b/app/sidero-controller-manager/cmd/events-manager/kubernetes.go new file mode 100644 index 000000000..b43a0d1dc --- /dev/null +++ b/app/sidero-controller-manager/cmd/events-manager/kubernetes.go @@ -0,0 +1,32 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package main + +import ( + "k8s.io/apimachinery/pkg/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + ctrl "sigs.k8s.io/controller-runtime" + runtimeclient "sigs.k8s.io/controller-runtime/pkg/client" + + sidero "github.com/talos-systems/sidero/app/caps-controller-manager/api/v1alpha3" +) + +func getMetalClient() (runtimeclient.Client, error) { + kubeconfig := ctrl.GetConfigOrDie() + + scheme := runtime.NewScheme() + + if err := clientgoscheme.AddToScheme(scheme); err != nil { + return nil, err + } + + if err := sidero.AddToScheme(scheme); err != nil { + return nil, err + } + + client, err := runtimeclient.New(kubeconfig, runtimeclient.Options{Scheme: scheme}) + + return client, err +} diff --git a/app/sidero-controller-manager/cmd/events-manager/main.go b/app/sidero-controller-manager/cmd/events-manager/main.go new file mode 100644 index 000000000..24218a460 --- /dev/null +++ b/app/sidero-controller-manager/cmd/events-manager/main.go @@ -0,0 +1,90 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "net" + "os" + "os/signal" + "syscall" + + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + + "github.com/talos-systems/sidero/app/sidero-controller-manager/internal/siderolink" + "github.com/talos-systems/siderolink/api/events" + sink "github.com/talos-systems/siderolink/pkg/events" +) + +func main() { + flag.Parse() + + if err := run(); err != nil { + fmt.Fprintf(os.Stderr, "error: %s", err) + os.Exit(1) + } +} + +func run() error { + logger, err := zap.NewProduction() + if err != nil { + return fmt.Errorf("failed to initialize logger: %w", err) + } + + zap.ReplaceGlobals(logger) + zap.RedirectStdLog(logger) + + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer cancel() + + eg, ctx := errgroup.WithContext(ctx) + + address := fmt.Sprintf(":%d", siderolink.EventsSinkPort) + + lis, err := net.Listen("tcp", address) + if err != nil { + return fmt.Errorf("error listening for gRPC API: %w", err) + } + + s := grpc.NewServer() + + client, err := getMetalClient() + if err != nil { + return fmt.Errorf("error getting metal client: %w", err) + } + + srv := sink.NewSink( + NewAdapter(client, + logger.With(zap.String("component", "sink")), + ), + ) + + events.RegisterEventSinkServiceServer(s, srv) + + eg.Go(func() error { + logger.Info("started gRPC event sink", zap.String("address", address)) + + return s.Serve(lis) + }) + + eg.Go(func() error { + <-ctx.Done() + + s.Stop() + + return nil + }) + + if err := eg.Wait(); err != nil && !errors.Is(err, grpc.ErrServerStopped) && errors.Is(err, context.Canceled) { + return err + } + + return nil +} diff --git a/app/sidero-controller-manager/config/manager/manager.yaml b/app/sidero-controller-manager/config/manager/manager.yaml index 0e72c6f55..60c08810a 100644 --- a/app/sidero-controller-manager/config/manager/manager.yaml +++ b/app/sidero-controller-manager/config/manager/manager.yaml @@ -147,6 +147,18 @@ spec: requests: cpu: 50m memory: 128Mi + - command: + - /events-manager + image: controller:latest + imagePullPolicy: Always + name: serverevents + resources: + limits: + cpu: 256m + memory: 256Mi + requests: + cpu: 50m + memory: 128Mi volumes: - hostPath: path: /dev/net/tun diff --git a/app/sidero-controller-manager/internal/ipxe/ipxe_server.go b/app/sidero-controller-manager/internal/ipxe/ipxe_server.go index b533c6caa..381b361e6 100644 --- a/app/sidero-controller-manager/internal/ipxe/ipxe_server.go +++ b/app/sidero-controller-manager/internal/ipxe/ipxe_server.go @@ -411,11 +411,13 @@ func appendTalosArguments(env *metalv1alpha1.Environment) { talosConfigPrefix := talosconstants.KernelParamConfig + "=" sideroLinkPrefix := talosconstants.KernelParamSideroLink + "=" logDeliveryPrefix := talosconstants.KernelParamLoggingKernel + "=" + eventsSinkPrefix := talosconstants.KernelParamEventsSink + "=" for _, prefix := range []string{ talosConfigPrefix, sideroLinkPrefix, logDeliveryPrefix, + eventsSinkPrefix, } { for _, arg := range args { if strings.HasPrefix(arg, prefix) { @@ -440,6 +442,11 @@ func appendTalosArguments(env *metalv1alpha1.Environment) { env.Spec.Kernel.Args = append(env.Spec.Kernel.Args, fmt.Sprintf("%s=tcp://[%s]:%d", talosconstants.KernelParamLoggingKernel, siderolink.Cfg.ServerAddress.IP(), siderolink.LogReceiverPort), ) + case eventsSinkPrefix: + // patch environment with the events sink endpoint + env.Spec.Kernel.Args = append(env.Spec.Kernel.Args, + fmt.Sprintf("%s=[%s]:%d", talosconstants.KernelParamEventsSink, siderolink.Cfg.ServerAddress.IP(), siderolink.EventsSinkPort), + ) } } } diff --git a/app/sidero-controller-manager/internal/siderolink/siderolink.go b/app/sidero-controller-manager/internal/siderolink/siderolink.go index b67e5b501..1ca3b07bd 100644 --- a/app/sidero-controller-manager/internal/siderolink/siderolink.go +++ b/app/sidero-controller-manager/internal/siderolink/siderolink.go @@ -15,6 +15,11 @@ const SecretName = "siderolink" // LogReceiverPort is working only over Wireguard. const LogReceiverPort = 4001 +// EventsSinkPort is the port of the events sink container. +// +// EventsSinkPort is working only over wireguard. +const EventsSinkPort = 4002 + // Cfg is a default global instance of the SideroLink configuration. // // Cfg should be initialized first with `LoadOrCreate`.