diff --git a/bulkerapp/app/topic_manager.go b/bulkerapp/app/topic_manager.go index cc7b23f..e401657 100644 --- a/bulkerapp/app/topic_manager.go +++ b/bulkerapp/app/topic_manager.go @@ -47,11 +47,11 @@ type TopicManager struct { repository *Repository cron *Cron // consumedTopics by destinationId. Consumed topics are topics that have consumer started - consumedTopics map[string]utils.Set[string] - topicsLastMessageDates map[string]time.Time - abandonedTopics utils.Set[string] - staleTopics utils.Set[string] - allTopics utils.Set[string] + consumedTopics map[string]utils.Set[string] + nonEmptyTopics utils.Set[string] + abandonedTopics utils.Set[string] + staleTopics utils.Set[string] + allTopics utils.Set[string] //batch consumers by destinationId batchConsumers map[string][]BatchConsumer @@ -132,7 +132,7 @@ func (tm *TopicManager) Start() { } func (tm *TopicManager) LoadMetadata() { - topicsLastMessageDates := make(map[string]time.Time) + nonEmptyTopics := utils.NewSet[string]() metadata, err := tm.kaftaAdminClient.GetMetadata(nil, true, tm.config.KafkaAdminMetadataTimeoutMs) topicPartitionOffsets := make(map[kafka.TopicPartition]kafka.OffsetSpec) for _, topic := range metadata.Topics { @@ -150,30 +150,27 @@ func (tm *TopicManager) LoadMetadata() { } else { for tp, offset := range res.ResultInfos { if offset.Offset >= 0 { - topicsLastMessageDates[*tp.Topic] = time.UnixMilli(offset.Timestamp) - } else { - topicsLastMessageDates[*tp.Topic] = time.Time{} + nonEmptyTopics.Put(*tp.Topic) } } - tm.Debugf("Got topic offsets for %d topics in %v", len(topicsLastMessageDates), time.Since(start)) + tm.Debugf("Got topic offsets for %d topics in %v", len(nonEmptyTopics), time.Since(start)) } if err != nil { metrics.TopicManagerError("load_metadata_error").Inc() tm.Errorf("Error getting metadata: %v", err) } else { - tm.processMetadata(metadata, topicsLastMessageDates) + tm.processMetadata(metadata, nonEmptyTopics) } } -func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, lastMessageDates map[string]time.Time) { +func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, nonEmptyTopics utils.Set[string]) { tm.Lock() defer tm.Unlock() start := time.Now() - if len(lastMessageDates) > 0 { - tm.topicsLastMessageDates = lastMessageDates + if nonEmptyTopics.Size() > 0 { + tm.nonEmptyTopics = nonEmptyTopics } - staleTopicsCutOff := time.Now().Add(-1 * time.Duration(tm.config.KafkaTopicRetentionHours) * time.Hour) var abandonedTopicsCount float64 var otherTopicsCount float64 topicsCountByMode := make(map[string]float64) @@ -188,11 +185,11 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, lastMessageDat abandonedTopicsCount++ continue } - lastMessageDate, ok := tm.topicsLastMessageDates[topic] - if ok && (lastMessageDate.IsZero() || lastMessageDate.Before(staleTopicsCutOff)) { - //staleTopics.Put(topic) - tm.Debugf("Topic %s is stale. Last message date: %v", topic, lastMessageDate) - //continue + ok := tm.nonEmptyTopics.Contains(topic) + if !ok { + staleTopics.Put(topic) + tm.Debugf("Topic %s is stale. It doesn't contain any messages", topic) + continue } destinationId, mode, tableName, err := ParseTopicId(topic) if err != nil { @@ -310,6 +307,7 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, lastMessageDat case retryTopicMode: tm.retryConsumers[destinationId] = ExcludeConsumerForTopic(tm.retryConsumers[destinationId], topic, tm.cron) } + dstTopics.Remove(topic) } } if destination.config.Special == "backup" || destination.config.Special == "metrics" {