diff --git a/e2e/service.go b/e2e/service.go index 02a1f63..d8781f4 100644 --- a/e2e/service.go +++ b/e2e/service.go @@ -168,15 +168,13 @@ func (s *Service) Start(ctx context.Context) error { // Produce an init message until the consumer received at least one fetch initTicker := time.NewTicker(1000 * time.Second) isInitialized := false + // send first init message immediately + sendInitMessage(ctx, s.client, s.config.TopicManagement.Name) for !isInitialized { select { case <-initTicker.C: - s.client.Produce(ctx, &kgo.Record{ - Key: []byte("init-message"), - Value: nil, - Topic: s.config.TopicManagement.Name, - }, nil) + sendInitMessage(ctx, s.client, s.config.TopicManagement.Name) case <-initCh: isInitialized = true s.logger.Info("consumer has been successfully initialized") @@ -196,6 +194,14 @@ func (s *Service) Start(ctx context.Context) error { return nil } +func sendInitMessage(ctx context.Context, client *kgo.Client, topicName string) { + client.Produce(ctx, &kgo.Record{ + Key: []byte("init-message"), + Value: nil, + Topic: topicName, + }, nil) +} + func (s *Service) startReconciliation(ctx context.Context) { if !s.config.TopicManagement.Enabled { return