From 8e915d5934194262e44a208dea6f8b31e2cb72f5 Mon Sep 17 00:00:00 2001 From: Dirk Wilden Date: Wed, 14 Aug 2024 13:53:27 +0200 Subject: [PATCH] explicitly defined params should take precedence --- internal/topic/topic-operation.go | 47 +++++++++++++++---------------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/internal/topic/topic-operation.go b/internal/topic/topic-operation.go index 417d322..4337f43 100644 --- a/internal/topic/topic-operation.go +++ b/internal/topic/topic-operation.go @@ -60,22 +60,6 @@ type CreateTopicFlags struct { Configs []string } -type CreateTopicConfig struct { - Name string `json:"Name"` - Partitions []struct { - ID int `json:"ID"` - OldestOffset int `json:"oldestOffset"` - NewestOffset int `json:"newestOffset"` - Leader string `json:"Leader"` - Replicas []int `json:"Replicas"` - InSyncReplicas []int `json:"inSyncReplicas"` - } `json:"Partitions"` - Configs []struct { - Name string `json:"Name"` - Value string `json:"Value"` - } `json:"Configs"` -} - type AlterTopicFlags struct { Partitions int32 ReplicationFactor int16 @@ -137,25 +121,40 @@ func (operation *Operation) CreateTopics(topics []string, flags CreateTopicFlags return errors.Wrap(err, "could not read topic description file") } - createTopicConfig := CreateTopicConfig{} + fileTopicConfig := Topic{} ext := path.Ext(flags.File) var unmarshalErr error switch ext { case ".yml", ".yaml": - unmarshalErr = yaml.Unmarshal(fileContent, &createTopicConfig) + unmarshalErr = yaml.Unmarshal(fileContent, &fileTopicConfig) case ".json": - unmarshalErr = json.Unmarshal(fileContent, &createTopicConfig) + unmarshalErr = json.Unmarshal(fileContent, &fileTopicConfig) default: return errors.Wrapf(err, "unsupported file format '%s'", ext) } if unmarshalErr != nil { - return errors.Wrap(err, "could not umarshal config file") + return errors.Wrap(err, "could not unmarshal config file") + } + + numPartitions := int32(len(fileTopicConfig.Partitions)) + if flags.Partitions == 1 { + topicDetails.NumPartitions = numPartitions } - topicDetails.NumPartitions = int32(len(createTopicConfig.Partitions)) - topicDetails.ReplicationFactor = int16(len(createTopicConfig.Partitions[0].Replicas)) - for _, v := range createTopicConfig.Configs { - topicDetails.ConfigEntries[v.Name] = &v.Value + replicationFactors := map[int16]struct{}{} + for _, partition := range fileTopicConfig.Partitions { + replicationFactors[int16(len(partition.Replicas))] = struct{}{} + } + if flags.ReplicationFactor == -1 && len(replicationFactors) == 1 { + topicDetails.ReplicationFactor = int16(len(fileTopicConfig.Partitions[0].Replicas)) + } else if flags.ReplicationFactor == -1 && len(replicationFactors) != 1 { + output.Warnf("replication factor from file ignored. partitions have different replicaCounts.") + } + + for _, v := range fileTopicConfig.Configs { + if _, ok := topicDetails.ConfigEntries[v.Name]; !ok { + topicDetails.ConfigEntries[v.Name] = &v.Value + } } }