Skip to content

Commit

Permalink
Send first e2e init message immediately
Browse files Browse the repository at this point in the history
Otherwise right now we wait for 16 minutes until that happens.
  • Loading branch information
alenkacz committed Nov 3, 2021
1 parent 64d41d5 commit 7c2e82e
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions e2e/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,13 @@ func (s *Service) Start(ctx context.Context) error {
// Produce an init message until the consumer received at least one fetch
initTicker := time.NewTicker(1000 * time.Second)
isInitialized := false
// send first init message immediately
sendInitMessage(ctx, s.client, s.config.TopicManagement.Name)

for !isInitialized {
select {
case <-initTicker.C:
s.client.Produce(ctx, &kgo.Record{
Key: []byte("init-message"),
Value: nil,
Topic: s.config.TopicManagement.Name,
}, nil)
sendInitMessage(ctx, s.client, s.config.TopicManagement.Name)
case <-initCh:
isInitialized = true
s.logger.Info("consumer has been successfully initialized")
Expand All @@ -196,6 +194,14 @@ func (s *Service) Start(ctx context.Context) error {
return nil
}

func sendInitMessage(ctx context.Context, client *kgo.Client, topicName string) {
client.Produce(ctx, &kgo.Record{
Key: []byte("init-message"),
Value: nil,
Topic: topicName,
}, nil)
}

func (s *Service) startReconciliation(ctx context.Context) {
if !s.config.TopicManagement.Enabled {
return
Expand Down

0 comments on commit 7c2e82e

Please sign in to comment.