Skip to content

Commit

Permalink
refactor: cleanup and deduplicate the code which matches SideroLink IPs
Browse files Browse the repository at this point in the history
Use same matcher in both log receiever and event receiver. Fix panic in
event receiver.

Bump CAPI and remove cert-manager temporary hack.

Signed-off-by: Andrey Smirnov <[email protected]>
  • Loading branch information
smira committed Feb 3, 2022
1 parent 003f6a7 commit 7912509
Show file tree
Hide file tree
Showing 12 changed files with 296 additions and 232 deletions.
178 changes: 59 additions & 119 deletions app/sidero-controller-manager/cmd/events-manager/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,17 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"inet.af/netaddr"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"

sidero "github.com/talos-systems/sidero/app/caps-controller-manager/api/v1alpha3"
"github.com/talos-systems/sidero/app/sidero-controller-manager/internal/siderolink"

"github.com/talos-systems/siderolink/pkg/events"

Expand All @@ -36,45 +30,19 @@ type Adapter struct {
Sink *events.Sink

logger *zap.Logger
annotator *siderolink.Annotator
metalClient runtimeclient.Client
kubeconfig *rest.Config
nodesMu sync.Mutex
nodes map[string]types.NamespacedName
}

// NewAdapter initializes new server.
func NewAdapter(metalClient runtimeclient.Client, kubeconfig *rest.Config, logger *zap.Logger) *Adapter {
func NewAdapter(metalClient runtimeclient.Client, annotator *siderolink.Annotator, logger *zap.Logger) *Adapter {
return &Adapter{
logger: logger,
kubeconfig: kubeconfig,
annotator: annotator,
metalClient: metalClient,
nodes: map[string]types.NamespacedName{},
}
}

func (a *Adapter) Run(ctx context.Context) error {
dc, err := dynamic.NewForConfig(a.kubeconfig)
if err != nil {
return err
}

// Create a factory object that can generate informers for resource types
factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dc, 10*time.Minute, "", nil)

informerFactory := factory.ForResource(sidero.GroupVersion.WithResource("serverbindings"))
informer := informerFactory.Informer()

informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(new interface{}) { a.notify(nil, new) },
UpdateFunc: a.notify,
DeleteFunc: func(old interface{}) { a.notify(old, nil) },
})

informer.Run(ctx.Done())

return nil
}

// HandleEvent implements events.Adapter.
func (a *Adapter) HandleEvent(ctx context.Context, event events.Event) error {
logger := a.logger.With(
Expand All @@ -88,8 +56,34 @@ func (a *Adapter) HandleEvent(ctx context.Context, event events.Event) error {

var err error

parts := strings.Split(event.Node, ":")
ip := strings.Join(parts[:len(parts)-1], ":")
ipPort, err := netaddr.ParseIPPort(event.Node)
if err != nil {
return err
}

ip := ipPort.IP().String()

annotation, _ := a.annotator.Get(ip)

if annotation.ServerUUID != "" {
fields = append(fields, zap.String("server_uuid", annotation.ServerUUID))
}

if annotation.ClusterName != "" {
fields = append(fields, zap.String("cluster", annotation.ClusterName))
}

if annotation.Namespace != "" {
fields = append(fields, zap.String("namespace", annotation.Namespace))
}

if annotation.MetalMachineName != "" {
fields = append(fields, zap.String("metal_machine", annotation.MetalMachineName))
}

if annotation.MachineName != "" {
fields = append(fields, zap.String("machine", annotation.MachineName))
}

switch event := event.Payload.(type) {
case *machine.AddressEvent:
Expand Down Expand Up @@ -162,35 +156,37 @@ func (a *Adapter) HandleEvent(ctx context.Context, event events.Event) error {
}

func (a *Adapter) handleSequenceEvent(ctx context.Context, ip string, event *machine.SequenceEvent) error {
if event.GetSequence() == "install" {
var callback func(*sidero.ServerBinding)
if event.GetSequence() != "install" {
return nil
}

if event.GetAction() == machine.SequenceEvent_STOP {
if event.GetError() != nil {
callback = func(serverbinding *sidero.ServerBinding) {
conditions.MarkFalse(serverbinding, sidero.TalosInstalledCondition, sidero.TalosInstallationFailedReason, clusterv1.ConditionSeverityError, event.GetError().GetMessage())
}
} else {
callback = func(serverbinding *sidero.ServerBinding) {
conditions.MarkTrue(serverbinding, sidero.TalosInstalledCondition)
conditions.MarkTrue(serverbinding, sidero.TalosConfigValidatedCondition)
conditions.MarkTrue(serverbinding, sidero.TalosConfigLoadedCondition)
}
var callback func(*sidero.ServerBinding)

if event.GetAction() == machine.SequenceEvent_STOP {
if event.GetError() != nil {
callback = func(serverbinding *sidero.ServerBinding) {
conditions.MarkFalse(serverbinding, sidero.TalosInstalledCondition, sidero.TalosInstallationFailedReason, clusterv1.ConditionSeverityError, event.GetError().GetMessage())
}
} else if event.GetAction() == machine.SequenceEvent_START {
} else {
callback = func(serverbinding *sidero.ServerBinding) {
conditions.MarkFalse(serverbinding, sidero.TalosInstalledCondition, sidero.TalosInstallationInProgressReason, clusterv1.ConditionSeverityInfo, "")
conditions.MarkFalse(serverbinding, sidero.TalosConfigValidatedCondition, sidero.TalosInstallationInProgressReason, clusterv1.ConditionSeverityInfo, "")
conditions.MarkFalse(serverbinding, sidero.TalosConfigLoadedCondition, sidero.TalosInstallationInProgressReason, clusterv1.ConditionSeverityInfo, "")
conditions.MarkTrue(serverbinding, sidero.TalosInstalledCondition)
conditions.MarkTrue(serverbinding, sidero.TalosConfigValidatedCondition)
conditions.MarkTrue(serverbinding, sidero.TalosConfigLoadedCondition)
}
}

if e := a.patchServerBinding(ctx, ip, callback); e != nil {
return e
} else if event.GetAction() == machine.SequenceEvent_START {
callback = func(serverbinding *sidero.ServerBinding) {
conditions.MarkFalse(serverbinding, sidero.TalosInstalledCondition, sidero.TalosInstallationInProgressReason, clusterv1.ConditionSeverityInfo, "")
conditions.MarkFalse(serverbinding, sidero.TalosConfigValidatedCondition, sidero.TalosInstallationInProgressReason, clusterv1.ConditionSeverityInfo, "")
conditions.MarkFalse(serverbinding, sidero.TalosConfigLoadedCondition, sidero.TalosInstallationInProgressReason, clusterv1.ConditionSeverityInfo, "")
}
}

return nil
if callback == nil {
return nil
}

return a.patchServerBinding(ctx, ip, callback)
}

func (a *Adapter) handleConfigLoadFailedEvent(ctx context.Context, ip string, event *machine.ConfigLoadErrorEvent) error {
Expand Down Expand Up @@ -233,16 +229,13 @@ func (a *Adapter) handlePhaseEvent(ctx context.Context, ip string, event *machin
}

func (a *Adapter) patchServerBinding(ctx context.Context, ip string, callback func(serverbinding *sidero.ServerBinding)) error {
a.nodesMu.Lock()
defer a.nodesMu.Unlock()

name, ok := a.nodes[ip]
if !ok {
annotation, exists := a.annotator.Get(ip)
if !exists {
return fmt.Errorf("failed to find ServerBindings for ip %s", ip)
}

var serverbinding sidero.ServerBinding
if err := a.metalClient.Get(ctx, name, &serverbinding); err != nil {
if err := a.metalClient.Get(ctx, types.NamespacedName{Name: annotation.ServerUUID}, &serverbinding); err != nil {
return err
}

Expand All @@ -255,56 +248,3 @@ func (a *Adapter) patchServerBinding(ctx context.Context, ip string, callback fu

return patchHelper.Patch(ctx, &serverbinding)
}

func (a *Adapter) notify(old, new interface{}) {
var oldServerBinding, newServerBinding *sidero.ServerBinding

if old != nil {
oldServerBinding = &sidero.ServerBinding{}

err := runtime.DefaultUnstructuredConverter.
FromUnstructured(old.(*unstructured.Unstructured).UnstructuredContent(), oldServerBinding)
if err != nil {
a.logger.Error("failed converting old event object", zap.Error(err))

return
}
}

if new != nil {
newServerBinding = &sidero.ServerBinding{}

err := runtime.DefaultUnstructuredConverter.
FromUnstructured(new.(*unstructured.Unstructured).UnstructuredContent(), newServerBinding)
if err != nil {
a.logger.Error("failed converting new event object", zap.Error(err))

return
}
}

a.nodesMu.Lock()
defer a.nodesMu.Unlock()

if new == nil {
delete(a.nodes, oldServerBinding.Spec.SideroLink.NodeAddress)
} else {
address := newServerBinding.Spec.SideroLink.NodeAddress
if address == "" {
return
}

address = fmt.Sprintf("[%s]", strings.Split(address, "/")[0])

if old != nil {
delete(a.nodes, oldServerBinding.Spec.SideroLink.NodeAddress)
}

a.nodes[address] = types.NamespacedName{
Name: newServerBinding.GetName(),
Namespace: newServerBinding.GetNamespace(),
}

a.logger.Info("new node mapping", zap.String("ip", newServerBinding.Spec.SideroLink.NodeAddress), zap.String("server", newServerBinding.GetName()))
}
}
6 changes: 4 additions & 2 deletions app/sidero-controller-manager/cmd/events-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ func run() error {
return fmt.Errorf("error getting metal client: %w", err)
}

annotator := siderolink.NewAnnotator(client, kubeconfig, logger)

adapter := NewAdapter(client,
kubeconfig,
annotator,
logger.With(zap.String("component", "sink")),
)

Expand All @@ -70,7 +72,7 @@ func run() error {
events.RegisterEventSinkServiceServer(s, srv)

eg.Go(func() error {
return adapter.Run(ctx)
return annotator.Run(ctx)
})

eg.Go(func() error {
Expand Down
106 changes: 7 additions & 99 deletions app/sidero-controller-manager/cmd/log-receiver/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,115 +5,19 @@
package main

import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
"sync"
"time"

"go.uber.org/zap"
"inet.af/netaddr"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/talos-systems/sidero/app/sidero-controller-manager/internal/siderolink"
"github.com/talos-systems/siderolink/pkg/logreceiver"

sidero "github.com/talos-systems/sidero/app/caps-controller-manager/api/v1alpha3"
)

type sourceAnnotation struct {
ServerUUID string
MetalMachineName string
MachineName string
ClusterName string
}

var sourceMap sync.Map

func fetchSourceAnnotation(ctx context.Context, metalClient runtimeclient.Client, srcAddr netaddr.IP) (sourceAnnotation, error) {
var (
annotation sourceAnnotation
serverbindings sidero.ServerBindingList
)

if err := metalClient.List(ctx, &serverbindings); err != nil {
return annotation, fmt.Errorf("error getting server bindings: %w", err)
}

srcAddress := srcAddr.String()

var serverBinding *sidero.ServerBinding

for _, item := range serverbindings.Items {
item := item

if strings.HasPrefix(item.Spec.SideroLink.NodeAddress, srcAddress) {
serverBinding = &item

break
}
}

if serverBinding == nil {
// no matching server binding, leave things as is
return annotation, nil
}

annotation.ServerUUID = serverBinding.Name
annotation.MetalMachineName = fmt.Sprintf("%s/%s", serverBinding.Spec.MetalMachineRef.Namespace, serverBinding.Spec.MetalMachineRef.Name)
annotation.ClusterName = serverBinding.Labels[clusterv1.ClusterLabelName]

var metalMachine sidero.MetalMachine

if err := metalClient.Get(ctx,
types.NamespacedName{
Namespace: serverBinding.Spec.MetalMachineRef.Namespace,
Name: serverBinding.Spec.MetalMachineRef.Name,
},
&metalMachine); err != nil {
return annotation, fmt.Errorf("error getting metal machine: %w", err)
}

for _, ref := range metalMachine.OwnerReferences {
gv, err := schema.ParseGroupVersion(ref.APIVersion)
if err != nil {
continue
}

if ref.Kind == "Machine" && gv.Group == clusterv1.GroupVersion.Group {
annotation.MachineName = fmt.Sprintf("%s/%s", metalMachine.Namespace, ref.Name)

break
}
}

return annotation, nil
}

func logHandler(metalClient runtimeclient.Client, logger *zap.Logger) logreceiver.Handler {
func logHandler(logger *zap.Logger, annotator *siderolink.Annotator) logreceiver.Handler {
return func(srcAddr netaddr.IP, msg map[string]interface{}) {
var annotation sourceAnnotation

v, ok := sourceMap.Load(srcAddr)
if !ok {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

var err error

annotation, err = fetchSourceAnnotation(ctx, metalClient, srcAddr)
if err != nil {
logger.Error("error fetching server information", zap.Error(err), zap.Stringer("source_addr", srcAddr))
}

sourceMap.Store(srcAddr, annotation)
} else {
annotation = v.(sourceAnnotation)
}
annotation, _ := annotator.Get(srcAddr.String())

if annotation.ServerUUID != "" {
msg["server_uuid"] = annotation.ServerUUID
Expand All @@ -123,6 +27,10 @@ func logHandler(metalClient runtimeclient.Client, logger *zap.Logger) logreceive
msg["cluster"] = annotation.ClusterName
}

if annotation.Namespace != "" {
msg["namespace"] = annotation.Namespace
}

if annotation.MetalMachineName != "" {
msg["metal_machine"] = annotation.MetalMachineName
}
Expand Down
Loading

0 comments on commit 7912509

Please sign in to comment.