Skip to content

Commit

Permalink
bulker: fix stale topic logic
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Jan 27, 2024
1 parent 260be8e commit bdb1c05
Showing 1 changed file with 18 additions and 20 deletions.
38 changes: 18 additions & 20 deletions bulkerapp/app/topic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ type TopicManager struct {
repository *Repository
cron *Cron
// consumedTopics by destinationId. Consumed topics are topics that have consumer started
consumedTopics map[string]utils.Set[string]
topicsLastMessageDates map[string]time.Time
abandonedTopics utils.Set[string]
staleTopics utils.Set[string]
allTopics utils.Set[string]
consumedTopics map[string]utils.Set[string]
nonEmptyTopics utils.Set[string]
abandonedTopics utils.Set[string]
staleTopics utils.Set[string]
allTopics utils.Set[string]

//batch consumers by destinationId
batchConsumers map[string][]BatchConsumer
Expand Down Expand Up @@ -132,7 +132,7 @@ func (tm *TopicManager) Start() {
}

func (tm *TopicManager) LoadMetadata() {
topicsLastMessageDates := make(map[string]time.Time)
nonEmptyTopics := utils.NewSet[string]()
metadata, err := tm.kaftaAdminClient.GetMetadata(nil, true, tm.config.KafkaAdminMetadataTimeoutMs)
topicPartitionOffsets := make(map[kafka.TopicPartition]kafka.OffsetSpec)
for _, topic := range metadata.Topics {
Expand All @@ -150,30 +150,27 @@ func (tm *TopicManager) LoadMetadata() {
} else {
for tp, offset := range res.ResultInfos {
if offset.Offset >= 0 {
topicsLastMessageDates[*tp.Topic] = time.UnixMilli(offset.Timestamp)
} else {
topicsLastMessageDates[*tp.Topic] = time.Time{}
nonEmptyTopics.Put(*tp.Topic)
}
}
tm.Debugf("Got topic offsets for %d topics in %v", len(topicsLastMessageDates), time.Since(start))
tm.Debugf("Got topic offsets for %d topics in %v", len(nonEmptyTopics), time.Since(start))
}

if err != nil {
metrics.TopicManagerError("load_metadata_error").Inc()
tm.Errorf("Error getting metadata: %v", err)
} else {
tm.processMetadata(metadata, topicsLastMessageDates)
tm.processMetadata(metadata, nonEmptyTopics)
}
}

func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, lastMessageDates map[string]time.Time) {
func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, nonEmptyTopics utils.Set[string]) {
tm.Lock()
defer tm.Unlock()
start := time.Now()
if len(lastMessageDates) > 0 {
tm.topicsLastMessageDates = lastMessageDates
if nonEmptyTopics.Size() > 0 {
tm.nonEmptyTopics = nonEmptyTopics
}
staleTopicsCutOff := time.Now().Add(-1 * time.Duration(tm.config.KafkaTopicRetentionHours) * time.Hour)
var abandonedTopicsCount float64
var otherTopicsCount float64
topicsCountByMode := make(map[string]float64)
Expand All @@ -188,11 +185,11 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, lastMessageDat
abandonedTopicsCount++
continue
}
lastMessageDate, ok := tm.topicsLastMessageDates[topic]
if ok && (lastMessageDate.IsZero() || lastMessageDate.Before(staleTopicsCutOff)) {
//staleTopics.Put(topic)
tm.Debugf("Topic %s is stale. Last message date: %v", topic, lastMessageDate)
//continue
ok := tm.nonEmptyTopics.Contains(topic)
if !ok {
staleTopics.Put(topic)
tm.Debugf("Topic %s is stale. It doesn't contain any messages", topic)
continue
}
destinationId, mode, tableName, err := ParseTopicId(topic)
if err != nil {
Expand Down Expand Up @@ -310,6 +307,7 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, lastMessageDat
case retryTopicMode:
tm.retryConsumers[destinationId] = ExcludeConsumerForTopic(tm.retryConsumers[destinationId], topic, tm.cron)
}
dstTopics.Remove(topic)
}
}
if destination.config.Special == "backup" || destination.config.Special == "metrics" {
Expand Down

0 comments on commit bdb1c05

Please sign in to comment.