Skip to content

Commit

Permalink
bulker: fix for possible panic
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Jan 28, 2024
1 parent 2186540 commit 443e249
Showing 1 changed file with 23 additions and 23 deletions.
46 changes: 23 additions & 23 deletions bulkerapp/app/topic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,35 +134,35 @@ func (tm *TopicManager) Start() {
}

func (tm *TopicManager) LoadMetadata() {
topicsLastMessageDates := map[string]*time.Time{}
metadata, err := tm.kaftaAdminClient.GetMetadata(nil, true, tm.config.KafkaAdminMetadataTimeoutMs)
topicPartitionOffsets := make(map[kafka.TopicPartition]kafka.OffsetSpec)
for _, topic := range metadata.Topics {
t := topic.Topic
if !strings.HasPrefix(t, "__") {
for _, partition := range topic.Partitions {
topicPartitionOffsets[kafka.TopicPartition{Topic: &t, Partition: partition.ID}] = kafka.MaxTimestampOffsetSpec
}
}
}
start := time.Now()
res, err := tm.kaftaAdminClient.ListOffsets(context.Background(), topicPartitionOffsets)
if err != nil {
tm.Errorf("Error getting topic offsets: %v", err)
metrics.TopicManagerError("load_metadata_error").Inc()
tm.Errorf("Error getting metadata: %v", err)
} else {
for tp, offset := range res.ResultInfos {
if offset.Offset >= 0 && offset.Timestamp > 0 {
lastMessageDate := time.UnixMilli(offset.Timestamp)
topicsLastMessageDates[*tp.Topic] = &lastMessageDate
topicsLastMessageDates := map[string]*time.Time{}
topicPartitionOffsets := make(map[kafka.TopicPartition]kafka.OffsetSpec)
for _, topic := range metadata.Topics {
t := topic.Topic
if !strings.HasPrefix(t, "__") {
for _, partition := range topic.Partitions {
topicPartitionOffsets[kafka.TopicPartition{Topic: &t, Partition: partition.ID}] = kafka.MaxTimestampOffsetSpec
}
}
}
tm.Debugf("Got topic offsets for %d topics in %v", len(topicsLastMessageDates), time.Since(start))
}
start := time.Now()
res, err := tm.kaftaAdminClient.ListOffsets(context.Background(), topicPartitionOffsets)
if err != nil {
tm.Errorf("Error getting topic offsets: %v", err)
} else {
for tp, offset := range res.ResultInfos {
if offset.Offset >= 0 && offset.Timestamp > 0 {
lastMessageDate := time.UnixMilli(offset.Timestamp)
topicsLastMessageDates[*tp.Topic] = &lastMessageDate
}
}
tm.Debugf("Got topic offsets for %d topics in %v", len(topicsLastMessageDates), time.Since(start))
}

if err != nil {
metrics.TopicManagerError("load_metadata_error").Inc()
tm.Errorf("Error getting metadata: %v", err)
} else {
tm.processMetadata(metadata, topicsLastMessageDates)
}
}
Expand Down

0 comments on commit 443e249

Please sign in to comment.