diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index 9ac8e6bd..c504d444 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -96,10 +96,12 @@ func main() { ) }) - logSync := sync.NewContainerLogSync(k, db, logs.GetChildLogger("ContainerLogs")) - g.Go(func() error { - return logSync.Run(ctx, podUpserts, podDeletes) + return sync.NewContainerLogSync( + k, db, logs.GetChildLogger("ContainerLogs"), + ).Run( + ctx, podUpserts, podDeletes, + ) }) if err := g.Wait(); err != nil { diff --git a/pkg/sync/channel-mux.go b/pkg/sync/channel-mux.go index 69635ba9..16ec177b 100644 --- a/pkg/sync/channel-mux.go +++ b/pkg/sync/channel-mux.go @@ -7,79 +7,82 @@ import ( ) // ChannelMux is a multiplexer for channels of variable types. -// It fans all input channels to all output channels. +// It fans out all input channels to all output channels. type ChannelMux[T any] interface { + // In adds the given input channel reading. + In(<-chan T) - // AddInChannel adds given input channel to the list of input channels. - AddInChannel(<-chan T) + // Out returns a new output channel that receives from all input channels. + Out() <-chan T - // NewOutChannel returns and adds new output channel to the pods of created addedOutChannels. - NewOutChannel() <-chan T + // AddOut registers the given output channel to receive from all input channels. + AddOut(chan<- T) - // AddOutChannel adds given output channel to the list of added addedOutChannels. - AddOutChannel(chan<- T) - - // Run combines output channel lists and starts multiplexing. + // Run starts multiplexing of all input channels to all output channels. Run(context.Context) error } type channelMux[T any] struct { - inChannels []<-chan T - createdOutChannels []chan<- T - addedOutChannels []chan<- T - started atomic.Bool + in []<-chan T + out []chan<- T + outAdded []chan<- T + started atomic.Bool } -// NewChannelMux creates new ChannelMux initialized with at least one input channel -func NewChannelMux[T any](initInChannel <-chan T, inChannels ...<-chan T) ChannelMux[T] { +// NewChannelMux returns a new ChannelMux initialized with at least one input channel. +func NewChannelMux[T any](inChannel <-chan T, inChannels ...<-chan T) ChannelMux[T] { return &channelMux[T]{ - inChannels: append(make([]<-chan T, 0), append(inChannels, initInChannel)...), + in: append(inChannels, inChannel), } } -func (mux *channelMux[T]) AddInChannel(channel <-chan T) { +func (mux *channelMux[T]) In(channel <-chan T) { if mux.started.Load() { panic("channelMux already started") } - mux.inChannels = append(mux.inChannels, channel) + mux.in = append(mux.in, channel) } -func (mux *channelMux[T]) NewOutChannel() <-chan T { +func (mux *channelMux[T]) Out() <-chan T { if mux.started.Load() { panic("channelMux already started") } channel := make(chan T) - mux.createdOutChannels = append(mux.createdOutChannels, channel) + mux.out = append(mux.out, channel) return channel } -func (mux *channelMux[T]) AddOutChannel(channel chan<- T) { +func (mux *channelMux[T]) AddOut(channel chan<- T) { if mux.started.Load() { panic("channelMux already started") } - mux.addedOutChannels = append(mux.addedOutChannels, channel) + mux.outAdded = append(mux.outAdded, channel) } func (mux *channelMux[T]) Run(ctx context.Context) error { + if mux.started.Load() { + panic("channelMux already started") + } + mux.started.Store(true) defer func() { - for _, channelToClose := range mux.createdOutChannels { + for _, channelToClose := range mux.out { close(channelToClose) } }() - outChannels := append(mux.addedOutChannels, mux.createdOutChannels...) + outs := append(mux.outAdded, mux.out...) sink := make(chan T) g, ctx := errgroup.WithContext(ctx) - for _, ch := range mux.inChannels { + for _, ch := range mux.in { ch := ch g.Go(func() error { @@ -105,9 +108,9 @@ func (mux *channelMux[T]) Run(ctx context.Context) error { return nil } - for _, outChannel := range outChannels { + for _, ch := range outs { select { - case outChannel <- spread: + case ch <- spread: case <-ctx.Done(): return ctx.Err() } diff --git a/pkg/sync/channel-mux_test.go b/pkg/sync/channel-mux_test.go index 37e90b6e..467c527d 100644 --- a/pkg/sync/channel-mux_test.go +++ b/pkg/sync/channel-mux_test.go @@ -27,9 +27,9 @@ func TestAddedOutputChannels(t *testing.T) { outputChannel1 := make(chan int) outputChannel2 := make(chan int) outputChannel3 := make(chan int) - multiplexer.AddOutChannel(outputChannel1) - multiplexer.AddOutChannel(outputChannel2) - multiplexer.AddOutChannel(outputChannel3) + multiplexer.AddOut(outputChannel1) + multiplexer.AddOut(outputChannel2) + multiplexer.AddOut(outputChannel3) g, ctx := errgroup.WithContext(context.Background()) @@ -56,9 +56,9 @@ func TestCreatedOutputChannels(t *testing.T) { multiplexChannel := make(chan int) multiplexer := NewChannelMux(multiplexChannel) - outputChannel1 := multiplexer.NewOutChannel() - outputChannel2 := multiplexer.NewOutChannel() - outputChannel3 := multiplexer.NewOutChannel() + outputChannel1 := multiplexer.Out() + outputChannel2 := multiplexer.Out() + outputChannel3 := multiplexer.Out() g, ctx := errgroup.WithContext(context.Background()) @@ -100,7 +100,7 @@ func TestAddedInputChannels(t *testing.T) { multiplexer := NewChannelMux(multiplexChannel1, multiplexChannel2, multiplexChannel3) - outputChannel := multiplexer.NewOutChannel() + outputChannel := multiplexer.Out() ctx, cancel := context.WithCancel(context.Background()) g, ctx := errgroup.WithContext(ctx) @@ -173,9 +173,9 @@ func TestClosedChannels(t *testing.T) { multiplexChannel := make(chan int) multiplexer := NewChannelMux(multiplexChannel) - outputChannel1 := multiplexer.NewOutChannel() - outputChannel2 := multiplexer.NewOutChannel() - outputChannel3 := multiplexer.NewOutChannel() + outputChannel1 := multiplexer.Out() + outputChannel2 := multiplexer.Out() + outputChannel3 := multiplexer.Out() ctx, cancel := context.WithCancel(context.Background()) g, ctx := errgroup.WithContext(ctx) diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index e04b5125..17c9852e 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -31,35 +31,38 @@ func NewSync( informer kcache.SharedInformer, logger *logging.Logger, ) Sync { - s := &sync{ + return &sync{ db: db, informer: informer, logger: logger, factory: factory, } - - return s } +// WithForwardUpserts adds a new channel to get upserts from multiplexer. func WithForwardUpserts(channel chan<- contracts.KUpsert) SyncOption { return func(options *SyncOptions) { options.forwardUpserts = channel } } +// WithForwardDeletes adds a new channel to get deletes from multiplexer. func WithForwardDeletes(channel chan<- contracts.KDelete) SyncOption { return func(options *SyncOptions) { options.forwardDeletes = channel } } +// SyncOption is a single option for sync. type SyncOption func(options *SyncOptions) +// SyncOptions stores options for sync. type SyncOptions struct { forwardUpserts chan<- contracts.KUpsert forwardDeletes chan<- contracts.KDelete } +// NewSyncOptions returns a new SyncOptions and initializes with given options. func NewSyncOptions(options ...SyncOption) *SyncOptions { syncOptions := &SyncOptions{} @@ -96,132 +99,77 @@ func (s *sync) Run(ctx context.Context, execOptions ...SyncOption) error { s.logger.Debug("Finished warming up") - s.factory().GetResourceVersion() - syncOptions := NewSyncOptions(execOptions...) - // init upsert channel spreader - multiplexUpsertChannel := make(chan contracts.KUpsert) - defer close(multiplexUpsertChannel) - - multiplexUpsert := NewChannelMux(multiplexUpsertChannel) - - upsertChannel := multiplexUpsert.NewOutChannel() + kupsertMux := NewChannelMux(changes.Adds(), changes.Updates()) + kupserts := kupsertMux.Out() if syncOptions.forwardUpserts != nil { - multiplexUpsert.AddOutChannel(syncOptions.forwardUpserts) + kupsertMux.AddOut(syncOptions.forwardUpserts) } - // run upsert channel spreader g.Go(func() error { - return multiplexUpsert.Run(ctx) + return kupsertMux.Run(ctx) }) - upsertToStream := make(chan database.Entity) - defer close(upsertToStream) - - for _, ch := range []<-chan contracts.KUpsert{changes.Adds(), changes.Updates()} { - ch := ch + databaseUpserts := make(chan database.Entity) + defer close(databaseUpserts) - g.Go(func() error { - for { - select { - case kupsert, more := <-ch: - if !more { - return nil - } - - select { - case multiplexUpsertChannel <- kupsert: - case <-ctx.Done(): - return ctx.Err() - } - case <-ctx.Done(): - return ctx.Err() + g.Go(func() error { + for { + select { + case kupsert, more := <-kupserts: + if !more { + return nil } - } - }) - g.Go(func() error { - for { + entity := s.factory() + entity.SetID(kupsert.ID()) + entity.SetCanonicalName(kupsert.GetCanonicalName()) + entity.Obtain(kupsert.KObject()) + select { - case kupsert, more := <-upsertChannel: - if !more { - return nil - } - - entity := s.factory() - entity.SetID(kupsert.ID()) - entity.SetCanonicalName(kupsert.GetCanonicalName()) - entity.Obtain(kupsert.KObject()) - - select { - case upsertToStream <- entity: - s.logger.Debugw( - fmt.Sprintf("Sync: Upserted %s", kupsert.GetCanonicalName()), - zap.String("id", kupsert.ID().String())) - case <-ctx.Done(): - return ctx.Err() - } + case databaseUpserts <- entity: + s.logger.Debugw( + fmt.Sprintf("Sync: Upserted %s", kupsert.GetCanonicalName()), + zap.String("id", kupsert.ID().String())) case <-ctx.Done(): return ctx.Err() } + case <-ctx.Done(): + return ctx.Err() } - }) - } + } + }) g.Go(func() error { - return database.NewUpsert(s.db).Stream(ctx, upsertToStream) + return database.NewUpsert(s.db).Stream(ctx, databaseUpserts) }) - // init delete channel spreader - multiplexDeleteChannel := make(chan contracts.KDelete) - defer close(multiplexDeleteChannel) - - multiplexDelete := NewChannelMux(multiplexDeleteChannel) - - deleteChannel := multiplexDelete.NewOutChannel() + kdeleteMux := NewChannelMux(changes.Deletes()) + kdeletes := kdeleteMux.Out() if syncOptions.forwardDeletes != nil { - multiplexDelete.AddOutChannel(syncOptions.forwardDeletes) + kdeleteMux.AddOut(syncOptions.forwardDeletes) } - // run delete channel spreader - g.Go(func() error { - return multiplexDelete.Run(ctx) - }) - g.Go(func() error { - for { - select { - case kdelete, more := <-changes.Deletes(): - if !more { - return nil - } - select { - case multiplexDeleteChannel <- kdelete: - case <-ctx.Done(): - return ctx.Err() - } - case <-ctx.Done(): - return ctx.Err() - } - } + return kdeleteMux.Run(ctx) }) - deleteToStream := make(chan any) + databaseDeletes := make(chan any) g.Go(func() error { - defer close(deleteToStream) + defer close(databaseDeletes) for { select { - case kdelete, more := <-deleteChannel: + case kdelete, more := <-kdeletes: if !more { return nil } select { - case deleteToStream <- kdelete.ID(): + case databaseDeletes <- kdelete.ID(): s.logger.Debugw( fmt.Sprintf("Sync: Deleted %s", kdelete.GetCanonicalName()), zap.String("id", kdelete.ID().String())) @@ -235,7 +183,7 @@ func (s *sync) Run(ctx context.Context, execOptions ...SyncOption) error { }) g.Go(func() error { - return database.NewDelete(s.db).Stream(ctx, s.factory(), deleteToStream) + return database.NewDelete(s.db).Stream(ctx, s.factory(), databaseDeletes) }) g.Go(func() error {