Skip to content

Commit

Permalink
update e2e topic config
Browse files Browse the repository at this point in the history
  • Loading branch information
rikimaru0345 committed Apr 29, 2021
1 parent 9813ad2 commit a1ca198
Showing 1 changed file with 21 additions and 23 deletions.
44 changes: 21 additions & 23 deletions minion/endtoend_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package minion
import (
"context"
"fmt"
"strconv"
"time"

"github.com/twmb/franz-go/pkg/kmsg"
Expand Down Expand Up @@ -125,31 +124,30 @@ func (s *Service) validateManagementTopic(ctx context.Context) error {

func createTopicConfig(cfgTopic EndToEndTopicConfig) []kmsg.CreateTopicsRequestTopicConfig {

minISRConf := kmsg.NewCreateTopicsRequestTopicConfig()
minISR := strconv.Itoa(cfgTopic.ReplicationFactor)
minISRConf.Name = "min.insync.replicas"
minISRConf.Value = &minISR

cleanupPolicyConf := kmsg.NewCreateTopicsRequestTopicConfig()
cleanupStr := "delete"
cleanupPolicyConf.Name = "cleanup.policy"
cleanupPolicyConf.Value = &cleanupStr

retentionByteConf := kmsg.NewCreateTopicsRequestTopicConfig()
retentionStr := "10000000"
retentionByteConf.Name = "retention.bytes"
retentionByteConf.Value = &retentionStr
topicConfig := func(name string, value interface{}) kmsg.CreateTopicsRequestTopicConfig {
prop := kmsg.NewCreateTopicsRequestTopicConfig()
prop.Name = name
valStr := string(fmt.Sprintf("%v", value))
prop.Value = &valStr
return prop
}

segmentByteConf := kmsg.NewCreateTopicsRequestTopicConfig()
segmentStr := "1000000"
segmentByteConf.Name = "segment.bytes"
segmentByteConf.Value = &segmentStr
minISR := 1
if cfgTopic.ReplicationFactor >= 3 {
// Only with 3+ replicas does it make sense to require acks from 2 brokers
// todo: think about if we should change how 'producer.requiredAcks' works.
// we probably don't even need this configured on the topic directly...
minISR = 2
}

// Even though kminion's end-to-end feature actually does not require any
// real persistence beyond a few minutes; it might be good too keep messages
// around a bit for debugging.
return []kmsg.CreateTopicsRequestTopicConfig{
minISRConf,
cleanupPolicyConf,
retentionByteConf,
segmentByteConf,
topicConfig("cleanup.policy", "delete"),
topicConfig("segment.ms", (time.Hour * 12).Milliseconds()), // new segment every 12h
topicConfig("retention.ms", (time.Hour * 24).Milliseconds()), // discard segments older than 24h
topicConfig("min.insync.replicas", minISR),
}
}

Expand Down

0 comments on commit a1ca198

Please sign in to comment.