Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow creating topic using topic description from file (rebased) #212

Merged
merged 3 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Added
- [#203](https://github.com/deviceinsight/kafkactl/pull/203) Add pod override fields affinity and tolerations
- [#210](https://github.com/deviceinsight/kafkactl/pull/210) Create topic from file

## 5.2.0 - 2024-08-08

Expand Down
36 changes: 36 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,42 @@ or with protoset
kafkactl consume <topic> --key-proto-type TopicKey --value-proto-type TopicValue --protoset-file kafkamsg.protoset
----

=== Create topics

The `create topic` allows you to create one or multiple topics.

Basic usage:
[,bash]
----
kafkactl create topic my-topic
----

The partition count can be specified with:
[,bash]
----
kafkactl create topic my-topic --partitions 32
----

The replication factor can be specified with:
[,bash]
----
kafkactl create topic my-topic --replication-factor 3
----

Configs can also be provided:
[,bash]
----
kafkactl create topic my-topic --config retention.ms=3600000 --config=cleanup.policy=compact
----

The topic configuration can also be taken from an existing topic using the following:
[,bash]
----
kafkactl describe topic my-topic -o json > my-topic-config.json
kafkactl create topic my-topic-clone --file my-topic-config.json
----


=== Altering topics

Using the `alter topic` command allows you to change the partition count, replication factor and topic-level
Expand Down
1 change: 1 addition & 0 deletions cmd/create/create-topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func newCreateTopicCmd() *cobra.Command {
cmdCreateTopic.Flags().Int32VarP(&flags.Partitions, "partitions", "p", 1, "number of partitions")
cmdCreateTopic.Flags().Int16VarP(&flags.ReplicationFactor, "replication-factor", "r", -1, "replication factor")
cmdCreateTopic.Flags().BoolVarP(&flags.ValidateOnly, "validate-only", "v", false, "validate only")
cmdCreateTopic.Flags().StringVarP(&flags.File, "file", "f", "", "file with topic description")
cmdCreateTopic.Flags().StringArrayVarP(&flags.Configs, "config", "c", flags.Configs, "configs in format `key=value`")

return cmdCreateTopic
Expand Down
96 changes: 96 additions & 0 deletions cmd/create/create-topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package create_test

import (
"fmt"
"os"
"testing"
"time"

Expand Down Expand Up @@ -109,6 +110,101 @@ partitions:
testutil.AssertEquals(t, fmt.Sprintf(expected, topicName), stdOut)
}

func TestCreateTopicWithConfigFileIntegration(t *testing.T) {

testutil.StartIntegrationTest(t)

kafkaCtl := testutil.CreateKafkaCtlCommand()

topicName := testutil.GetPrefixedName("new-topic")
configFile := fmt.Sprintf(`
name: %s
partitions:
- id: 0
oldestOffset: 0
newestOffset: 290
leader: kafka:9092
replicas: [1]
inSyncReplicas: [1]
- id: 1
oldestOffset: 0
newestOffset: 258
leader: kafka:9092
replicas: [1]
inSyncReplicas: [1]
- id: 2
oldestOffset: 0
newestOffset: 290
leader: kafka:9092
replicas: [1]
inSyncReplicas: [1]
configs:
- name: cleanup.policy
value: compact
- name: max.message.bytes
value: "10485880"
- name: min.cleanable.dirty.ratio
value: "1.0E-4"
- name: delete.retention.ms
value: "0"
- name: segment.ms
value: "100"
`, topicName)

tmpFile, err := os.CreateTemp("", fmt.Sprintf("%s-*.yaml", topicName))
if err != nil {
t.Fatalf("could not create temp file with topic config: %s", err)
}
defer tmpFile.Close()
if _, err := tmpFile.WriteString(configFile); err != nil {
t.Fatalf("could not write temp config file: %s", err)
}

if _, err := kafkaCtl.Execute("create", "topic", topicName, "-f", tmpFile.Name()); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

testutil.AssertEquals(t, fmt.Sprintf("topic created: %s", topicName), kafkaCtl.GetStdOut())

describeTopic(t, kafkaCtl, topicName)
stdOut := testutil.WithoutBrokerReferences(kafkaCtl.GetStdOut())

expected := `
name: %s
partitions:
- id: 0
oldestOffset: 0
newestOffset: 0
leader: any-broker
replicas: [any-broker-id]
inSyncReplicas: [any-broker-id]
- id: 1
oldestOffset: 0
newestOffset: 0
leader: any-broker
replicas: [any-broker-id]
inSyncReplicas: [any-broker-id]
- id: 2
oldestOffset: 0
newestOffset: 0
leader: any-broker
replicas: [any-broker-id]
inSyncReplicas: [any-broker-id]
configs:
- name: cleanup.policy
value: compact
- name: max.message.bytes
value: "10485880"
- name: min.cleanable.dirty.ratio
value: "1.0E-4"
- name: delete.retention.ms
value: "0"
- name: segment.ms
value: "100"`

testutil.AssertEquals(t, fmt.Sprintf(expected, topicName), stdOut)
}

func describeTopic(t *testing.T, kafkaCtl testutil.KafkaCtlTestCommand, topicName string) {
describeTopic := func(_ uint) error {
_, err := kafkaCtl.Execute("describe", "topic", topicName, "-o", "yaml")
Expand Down
47 changes: 47 additions & 0 deletions internal/topic/topic-operation.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package topic

import (
"encoding/json"
"fmt"
"os"
"path"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -53,6 +56,7 @@ type CreateTopicFlags struct {
Partitions int32
ReplicationFactor int16
ValidateOnly bool
File string
Configs []string
}

Expand Down Expand Up @@ -111,6 +115,49 @@ func (operation *Operation) CreateTopics(topics []string, flags CreateTopicFlags
topicDetails.ConfigEntries[configParts[0]] = &configParts[1]
}

if flags.File != "" {
fileContent, err := os.ReadFile(flags.File)
if err != nil {
return errors.Wrap(err, "could not read topic description file")
}

fileTopicConfig := Topic{}
ext := path.Ext(flags.File)
var unmarshalErr error
switch ext {
case ".yml", ".yaml":
unmarshalErr = yaml.Unmarshal(fileContent, &fileTopicConfig)
case ".json":
unmarshalErr = json.Unmarshal(fileContent, &fileTopicConfig)
default:
return errors.Wrapf(err, "unsupported file format '%s'", ext)
}
if unmarshalErr != nil {
return errors.Wrap(err, "could not unmarshal config file")
}

numPartitions := int32(len(fileTopicConfig.Partitions))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gotha I changed your implementation so that explicitly defined parameters always take precedence over parameters from file

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, cool.
Thank you for merging this.

if flags.Partitions == 1 {
topicDetails.NumPartitions = numPartitions
}

replicationFactors := map[int16]struct{}{}
for _, partition := range fileTopicConfig.Partitions {
replicationFactors[int16(len(partition.Replicas))] = struct{}{}
}
if flags.ReplicationFactor == -1 && len(replicationFactors) == 1 {
topicDetails.ReplicationFactor = int16(len(fileTopicConfig.Partitions[0].Replicas))
} else if flags.ReplicationFactor == -1 && len(replicationFactors) != 1 {
output.Warnf("replication factor from file ignored. partitions have different replicaCounts.")
}

for _, v := range fileTopicConfig.Configs {
if _, ok := topicDetails.ConfigEntries[v.Name]; !ok {
topicDetails.ConfigEntries[v.Name] = &v.Value
}
}
}

for _, topic := range topics {
if err = admin.CreateTopic(topic, &topicDetails, flags.ValidateOnly); err != nil {
return errors.Wrap(err, "failed to create topic")
Expand Down
Loading