Skip to content

Commit

Permalink
bulker: disable consumers for stale topics
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Jan 16, 2024
1 parent 525fe5d commit e414e24
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions bulkerapp/app/topic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,20 +142,20 @@ func (tm *TopicManager) LoadMetadata() {
}
}
}
start := time.Now()
res, err := tm.kaftaAdminClient.ListOffsets(context.Background(), topicPartitionOffsets)
if err != nil {
tm.Errorf("Error getting topic offsets: %v", err)
} else {
for tp, offset := range res.ResultInfos {
if offset.Offset >= 0 {
topicsLastMessageDates[*tp.Topic] = time.UnixMilli(offset.Timestamp)
} else {
topicsLastMessageDates[*tp.Topic] = time.Time{}
}
}
}
tm.Infof("Got topic offsets in %v", time.Since(start))
//start := time.Now()
//res, err := tm.kaftaAdminClient.ListOffsets(context.Background(), topicPartitionOffsets)
//if err != nil {
// tm.Errorf("Error getting topic offsets: %v", err)
//} else {
// for tp, offset := range res.ResultInfos {
// if offset.Offset >= 0 {
// topicsLastMessageDates[*tp.Topic] = time.UnixMilli(offset.Timestamp)
// } else {
// topicsLastMessageDates[*tp.Topic] = time.Time{}
// }
// }
//}
//tm.Infof("Got topic offsets in %v", time.Since(start))

if err != nil {
metrics.TopicManagerError("load_metadata_error").Inc()
Expand Down

0 comments on commit e414e24

Please sign in to comment.