From 054fc07ab0e388b2d810d2618f16ba8b5702350f Mon Sep 17 00:00:00 2001 From: Dmitry Kisler Date: Sun, 27 Mar 2022 18:50:31 +0200 Subject: [PATCH 1/7] Add the field to avoid creating missing topic. Add field AllowAutoTopicCreation to Writer struct. It prevents the side-effect of creating missing topic. Closes #872 --- writer.go | 7 +++++-- writer_test.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/writer.go b/writer.go index cdbe07129..89ac87192 100644 --- a/writer.go +++ b/writer.go @@ -185,6 +185,9 @@ type Writer struct { // If nil, DefaultTransport is used. Transport RoundTripper + // AllowAutoTopicCreation notifies writer to create topic is missing. + AllowAutoTopicCreation bool + // Manages the current set of partition-topic writers. group sync.WaitGroup mutex sync.Mutex @@ -733,7 +736,7 @@ func (w *Writer) partitions(ctx context.Context, topic string) (int, error) { // caching recent results (the kafka.Transport types does). r, err := client.transport().RoundTrip(ctx, client.Addr, &metadataAPI.Request{ TopicNames: []string{topic}, - AllowAutoTopicCreation: true, + AllowAutoTopicCreation: w.AllowAutoTopicCreation, }) if err != nil { return 0, err @@ -941,7 +944,7 @@ func newBatchQueue(initialSize int) batchQueue { bq := batchQueue{ queue: make([]*writeBatch, 0, initialSize), mutex: &sync.Mutex{}, - cond: &sync.Cond{}, + cond: &sync.Cond{}, } bq.cond.L = bq.mutex diff --git a/writer_test.go b/writer_test.go index 8737809f8..ed4778466 100644 --- a/writer_test.go +++ b/writer_test.go @@ -160,6 +160,10 @@ func TestWriter(t *testing.T) { scenario: "writing a message to a non-existant topic creates the topic", function: testWriterAutoCreateTopic, }, + { + scenario: "terminates on an attempts to write a message to a non-existant topic", + function: testWriterTerminateMissingTopic, + }, } for _, test := range tests { @@ -737,6 +741,30 @@ func testWriterAutoCreateTopic(t *testing.T) { } } +func testWriterTerminateMissingTopic(t *testing.T) { + topic := makeTopic() + + transport := &Transport{} + defer transport.CloseIdleConnections() + + writer := &Writer{ + Addr: TCP("localhost:9092"), + Topic: topic, + Balancer: &RoundRobin{}, + RequiredAcks: RequireNone, + AllowAutoTopicCreation: false, + Transport: transport, + } + defer writer.Close() + + msg := Message{Value: []byte("FooBar")} + + if err := writer.WriteMessages(context.Background(), msg); err == nil { + t.Fatal("Kafka error [3] UNKNOWN_TOPIC_OR_PARTITION is expected") + return + } +} + type staticBalancer struct { partition int } From 59a3338d7c735111c445bec97a6afc960ece49e7 Mon Sep 17 00:00:00 2001 From: Dmitry Kisler Date: Sun, 27 Mar 2022 23:56:53 +0200 Subject: [PATCH 2/7] Fix the test when missing topic creation is expected Closes #872 --- writer_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/writer_test.go b/writer_test.go index ed4778466..849771053 100644 --- a/writer_test.go +++ b/writer_test.go @@ -716,6 +716,7 @@ func testWriterAutoCreateTopic(t *testing.T) { Topic: topic, Balancer: &RoundRobin{}, }) + w.AllowAutoTopicCreation = true defer w.Close() msg := Message{Key: []byte("key"), Value: []byte("Hello World")} From 682d91a1d33ea10513356edc977413c6642e2c6e Mon Sep 17 00:00:00 2001 From: Dmitry Kisler Date: Sun, 27 Mar 2022 23:58:17 +0200 Subject: [PATCH 3/7] Add documentation to create missing topic. Add the feature on the Writer's features list. Add a code snippet to create a topic before messages publication. Closes #872 --- README.md | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/README.md b/README.md index 33e452b38..a68255434 100644 --- a/README.md +++ b/README.md @@ -337,6 +337,7 @@ to use in most cases as it provides additional features: - Synchronous or asynchronous writes of messages to Kafka. - Asynchronous cancellation using contexts. - Flushing of pending messages on close to support graceful shutdowns. +- Creation of a missing topic before publishing a message. *Note!* it was the default behaviour up to the version `v0.4.30`. ```go // make a writer that produces to topic-A, using the least-bytes distribution @@ -369,6 +370,55 @@ if err := w.Close(); err != nil { } ``` +### Missing topic creation before publication + +```go +// Make a writer that publishes messages to topic-A. +// The topic is being if missing. +w := &Writer{ + Addr: TCP("localhost:9092"), + Topic: "topic-A", + AllowAutoTopicCreation: true, +} + +messages := []kafka.Message{ + { + Key: []byte("Key-A"), + Value: []byte("Hello World!"), + }, + { + Key: []byte("Key-B"), + Value: []byte("One!"), + }, + { + Key: []byte("Key-C"), + Value: []byte("Two!"), + }, +} + +var err error +const retries = 3 +for i := 0; i < retries; i++ { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // an attempt to creating topic prior to publishing the message is being made. + err = w.WriteMessages(ctx, messages...) + if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) { + time.Sleep(time.Millisecond * 250) + continue + } + + if err != nil { + log.Fatalf("unexpected error %v", err) + } +} + +if err := w.Close(); err != nil { + log.Fatal("failed to close writer:", err) +} +``` + ### Writing to multiple topics Normally, the `WriterConfig.Topic` is used to initialize a single-topic writer. From dc5d0c671591d08dabbe435e73ebb25171e8de5f Mon Sep 17 00:00:00 2001 From: Dmitry Kisler Date: Mon, 28 Mar 2022 00:03:26 +0200 Subject: [PATCH 4/7] Fixes missing word. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index a68255434..7aab85df7 100644 --- a/README.md +++ b/README.md @@ -374,7 +374,7 @@ if err := w.Close(); err != nil { ```go // Make a writer that publishes messages to topic-A. -// The topic is being if missing. +// The topic is being created if missing. w := &Writer{ Addr: TCP("localhost:9092"), Topic: "topic-A", From 55102ba6b3bc903b3c292192caec7ea6aa1099f7 Mon Sep 17 00:00:00 2001 From: Dmitry Kisler Date: Wed, 30 Mar 2022 08:46:37 +0200 Subject: [PATCH 5/7] Update writer_test.go Co-authored-by: Erik Weathers --- writer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/writer_test.go b/writer_test.go index 849771053..da95423fa 100644 --- a/writer_test.go +++ b/writer_test.go @@ -161,7 +161,7 @@ func TestWriter(t *testing.T) { function: testWriterAutoCreateTopic, }, { - scenario: "terminates on an attempts to write a message to a non-existant topic", + scenario: "terminates on an attempt to write a message to a nonexistent topic", function: testWriterTerminateMissingTopic, }, } From 5d70fac38de0288336822629dde20bd591b834e3 Mon Sep 17 00:00:00 2001 From: Dmitry Kisler Date: Wed, 30 Mar 2022 08:47:05 +0200 Subject: [PATCH 6/7] Update README.md language adjustment @erikdw Co-authored-by: Erik Weathers --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 7aab85df7..cebd0c54d 100644 --- a/README.md +++ b/README.md @@ -374,7 +374,7 @@ if err := w.Close(); err != nil { ```go // Make a writer that publishes messages to topic-A. -// The topic is being created if missing. +// The topic will be created if it is missing. w := &Writer{ Addr: TCP("localhost:9092"), Topic: "topic-A", From f6edf1dfd1e9149540a81cc6c7554042632bd097 Mon Sep 17 00:00:00 2001 From: Dmitry Kisler Date: Wed, 30 Mar 2022 08:47:45 +0200 Subject: [PATCH 7/7] Update README.md Adjust styling by @erikdw Co-authored-by: Erik Weathers --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index cebd0c54d..d08e67f0f 100644 --- a/README.md +++ b/README.md @@ -402,7 +402,7 @@ for i := 0; i < retries; i++ { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - // an attempt to creating topic prior to publishing the message is being made. + // attempt to create topic prior to publishing the message err = w.WriteMessages(ctx, messages...) if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) { time.Sleep(time.Millisecond * 250)