Skip to content

Commit

Permalink
explicitly defined params should take precedence
Browse files Browse the repository at this point in the history
  • Loading branch information
d-rk committed Aug 14, 2024
1 parent f71fdeb commit 8e915d5
Showing 1 changed file with 23 additions and 24 deletions.
47 changes: 23 additions & 24 deletions internal/topic/topic-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,22 +60,6 @@ type CreateTopicFlags struct {
Configs []string
}

type CreateTopicConfig struct {
Name string `json:"Name"`
Partitions []struct {
ID int `json:"ID"`
OldestOffset int `json:"oldestOffset"`
NewestOffset int `json:"newestOffset"`
Leader string `json:"Leader"`
Replicas []int `json:"Replicas"`
InSyncReplicas []int `json:"inSyncReplicas"`
} `json:"Partitions"`
Configs []struct {
Name string `json:"Name"`
Value string `json:"Value"`
} `json:"Configs"`
}

type AlterTopicFlags struct {
Partitions int32
ReplicationFactor int16
Expand Down Expand Up @@ -137,25 +121,40 @@ func (operation *Operation) CreateTopics(topics []string, flags CreateTopicFlags
return errors.Wrap(err, "could not read topic description file")
}

createTopicConfig := CreateTopicConfig{}
fileTopicConfig := Topic{}
ext := path.Ext(flags.File)
var unmarshalErr error
switch ext {
case ".yml", ".yaml":
unmarshalErr = yaml.Unmarshal(fileContent, &createTopicConfig)
unmarshalErr = yaml.Unmarshal(fileContent, &fileTopicConfig)
case ".json":
unmarshalErr = json.Unmarshal(fileContent, &createTopicConfig)
unmarshalErr = json.Unmarshal(fileContent, &fileTopicConfig)
default:
return errors.Wrapf(err, "unsupported file format '%s'", ext)
}
if unmarshalErr != nil {
return errors.Wrap(err, "could not umarshal config file")
return errors.Wrap(err, "could not unmarshal config file")
}

numPartitions := int32(len(fileTopicConfig.Partitions))
if flags.Partitions == 1 {
topicDetails.NumPartitions = numPartitions
}

topicDetails.NumPartitions = int32(len(createTopicConfig.Partitions))
topicDetails.ReplicationFactor = int16(len(createTopicConfig.Partitions[0].Replicas))
for _, v := range createTopicConfig.Configs {
topicDetails.ConfigEntries[v.Name] = &v.Value
replicationFactors := map[int16]struct{}{}
for _, partition := range fileTopicConfig.Partitions {
replicationFactors[int16(len(partition.Replicas))] = struct{}{}
}
if flags.ReplicationFactor == -1 && len(replicationFactors) == 1 {
topicDetails.ReplicationFactor = int16(len(fileTopicConfig.Partitions[0].Replicas))
} else if flags.ReplicationFactor == -1 && len(replicationFactors) != 1 {
output.Warnf("replication factor from file ignored. partitions have different replicaCounts.")
}

for _, v := range fileTopicConfig.Configs {
if _, ok := topicDetails.ConfigEntries[v.Name]; !ok {
topicDetails.ConfigEntries[v.Name] = &v.Value
}
}
}

Expand Down

0 comments on commit 8e915d5

Please sign in to comment.