diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index b8ea8156..830ff08e 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -95,12 +95,6 @@ func main() { ).Run(ctx) }) - //upsertPodChannelSpreader := sync.NewChannelSpreader[database.Entity](forwardUpsertPodsChannel) - //deletePodChannelSpreader := sync.NewChannelSpreader[any](forwardDeletePodsChannel) - - //upsertPodChannel := upsertPodChannelSpreader.NewChannel() - //deletePodChannel := deletePodChannelSpreader.NewChannel() - forwardUpsertPodsToLogChannel := make(chan contracts.KUpsert) forwardDeletePodsToLogChannel := make(chan contracts.KDelete) @@ -114,9 +108,11 @@ func main() { schema.NewPod, informers.Core().V1().Pods().Informer(), logs.GetChildLogger("Pods"), + ).Run( + ctx, sync.WithForwardUpsertToLog(forwardUpsertPodsToLogChannel), sync.WithForwardDeleteToLog(forwardDeletePodsToLogChannel), - ).Run(ctx) + ) }) logSync := sync.NewLogSync(k, db, logs.GetChildLogger("ContainerLogs")) diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index de041a37..acecbf30 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -14,31 +14,15 @@ import ( kcache "k8s.io/client-go/tools/cache" ) -type Option func(s *sync) - -func WithForwardUpsertToLog(channel chan<- contracts.KUpsert) Option { - return func(s *sync) { - s.forwardUpsertToLogChannel = channel - } -} - -func WithForwardDeleteToLog(channel chan<- contracts.KDelete) Option { - return func(s *sync) { - s.forwardDeleteToLogChannel = channel - } -} - type Sync interface { - Run(context.Context) error + Run(context.Context, ...SyncOption) error } type sync struct { - db *database.DB - factory func() contracts.Resource - informer kcache.SharedInformer - logger *logging.Logger - forwardUpsertToLogChannel chan<- contracts.KUpsert - forwardDeleteToLogChannel chan<- contracts.KDelete + db *database.DB + factory func() contracts.Resource + informer kcache.SharedInformer + logger *logging.Logger } func NewSync( @@ -46,7 +30,6 @@ func NewSync( factory func() contracts.Resource, informer kcache.SharedInformer, logger *logging.Logger, - options ...Option, ) Sync { s := &sync{ db: db, @@ -55,14 +38,39 @@ func NewSync( factory: factory, } - for _, option := range options { - option(s) + return s +} + +func WithForwardUpsertToLog(channel chan<- contracts.KUpsert) SyncOption { + return func(options *SyncOptions) { + options.forwardUpsertToLogChannel = channel + } +} + +func WithForwardDeleteToLog(channel chan<- contracts.KDelete) SyncOption { + return func(options *SyncOptions) { + options.forwardDeleteToLogChannel = channel } +} - return s +type SyncOption func(options *SyncOptions) + +type SyncOptions struct { + forwardUpsertToLogChannel chan<- contracts.KUpsert + forwardDeleteToLogChannel chan<- contracts.KDelete } -func (s *sync) Run(ctx context.Context) error { +func NewOptionStorage(execOptions ...SyncOption) *SyncOptions { + optionStorage := &SyncOptions{} + + for _, option := range execOptions { + option(optionStorage) + } + + return optionStorage +} + +func (s *sync) Run(ctx context.Context, execOptions ...SyncOption) error { s.logger.Info("Starting sync") s.logger.Debug("Warming up") @@ -90,6 +98,8 @@ func (s *sync) Run(ctx context.Context) error { s.factory().GetResourceVersion() + syncOptions := NewOptionStorage(execOptions...) + // init upsert channel spreader multiplexUpsertChannel := make(chan contracts.KUpsert) defer close(multiplexUpsertChannel) @@ -98,8 +108,8 @@ func (s *sync) Run(ctx context.Context) error { upsertChannel := multiplexUpsert.NewChannel() - if s.forwardUpsertToLogChannel != nil { - multiplexUpsert.AddChannel(s.forwardUpsertToLogChannel) + if syncOptions.forwardUpsertToLogChannel != nil { + multiplexUpsert.AddChannel(syncOptions.forwardUpsertToLogChannel) } // run upsert channel spreader @@ -172,8 +182,8 @@ func (s *sync) Run(ctx context.Context) error { deleteChannel := multiplexDelete.NewChannel() - if s.forwardDeleteToLogChannel != nil { - multiplexDelete.AddChannel(s.forwardDeleteToLogChannel) + if syncOptions.forwardDeleteToLogChannel != nil { + multiplexDelete.AddChannel(syncOptions.forwardDeleteToLogChannel) } // run delete channel spreader