Skip to content

Commit

Permalink
bulker: disable consumers for stale topics
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Jan 15, 2024
1 parent 3a409df commit 525fe5d
Showing 1 changed file with 1 addition and 1 deletion.
2 changes: 1 addition & 1 deletion bulkerapp/app/topic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, lastMessageDat
}
for _, table := range tables {
topicId, _ := MakeTopicId(destination.Id(), "batch", table, false)
if !hasTopics || !dstTopics.Contains(topicId) {
if (!hasTopics || !dstTopics.Contains(topicId)) && !staleTopics.Contains(topicId) {
tm.Infof("Creating topic %s for destination %s", topicId, destination.Id())
err := tm.createDestinationTopic(topicId, nil)
if err != nil {
Expand Down

0 comments on commit 525fe5d

Please sign in to comment.