Skip to content

Commit

Permalink
Allow Writer to automatically create topics. (#775)
Browse files Browse the repository at this point in the history
* set AutoCreateTopic when requesting metadata in Writer

if a metadata request is made with auto create topic set,
we now make a metadata request to trigger the topic creation
if the topic is not already in the cache and update the cache
after the request is made.
  • Loading branch information
rhansen2 authored Nov 2, 2021
1 parent 40933d2 commit ea83b29
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 3 deletions.
30 changes: 28 additions & 2 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,14 +344,32 @@ func (p *connPool) roundTrip(ctx context.Context, req Request) (Response, error)

switch m := req.(type) {
case *meta.Request:
// We serve metadata requests directly from the transport cache.
// We serve metadata requests directly from the transport cache unless
// we would like to auto create a topic that isn't in our cache.
//
// This reduces the number of round trips to kafka brokers while keeping
// the logic simple when applying partitioning strategies.
if state.err != nil {
return nil, state.err
}
return filterMetadataResponse(m, state.metadata), nil

cachedMeta := filterMetadataResponse(m, state.metadata)
// requestNeeded indicates if we need to send this metadata request to the server.
// It's true when we want to auto-create topics and we don't have the topic in our
// cache.
var requestNeeded bool
if m.AllowAutoTopicCreation {
for _, topic := range cachedMeta.Topics {
if topic.ErrorCode == int16(UnknownTopicOrPartition) {
requestNeeded = true
break
}
}
}

if !requestNeeded {
return cachedMeta, nil
}

case protocol.Splitter:
// Messages that implement the Splitter interface trigger the creation of
Expand Down Expand Up @@ -392,6 +410,14 @@ func (p *connPool) roundTrip(ctx context.Context, req Request) (Response, error)
}

p.refreshMetadata(ctx, topicsToRefresh)
case *meta.Response:
m := req.(*meta.Request)
// If we get here with allow auto topic creation then
// we didn't have that topic in our cache so we should update
// the cache.
if m.AllowAutoTopicCreation {
p.refreshMetadata(ctx, m.TopicNames)
}
}

return r, nil
Expand Down
3 changes: 2 additions & 1 deletion writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,8 @@ func (w *Writer) partitions(ctx context.Context, topic string) (int, error) {
// It is expected that the transport will optimize this request by
// caching recent results (the kafka.Transport types does).
r, err := client.transport().RoundTrip(ctx, client.Addr, &metadataAPI.Request{
TopicNames: []string{topic},
TopicNames: []string{topic},
AllowAutoTopicCreation: true,
})
if err != nil {
return 0, err
Expand Down
39 changes: 39 additions & 0 deletions writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka

import (
"context"
"errors"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -155,6 +156,10 @@ func TestWriter(t *testing.T) {
scenario: "writing a message to an invalid partition",
function: testWriterInvalidPartition,
},
{
scenario: "writing a message to a non-existant topic creates the topic",
function: testWriterAutoCreateTopic,
},
}

for _, test := range tests {
Expand Down Expand Up @@ -698,6 +703,40 @@ func testWriterUnexpectedMessageTopic(t *testing.T) {
}
}

func testWriterAutoCreateTopic(t *testing.T) {
topic := makeTopic()
// Assume it's going to get created.
defer deleteTopic(t, topic)

w := newTestWriter(WriterConfig{
Topic: topic,
Balancer: &RoundRobin{},
})
defer w.Close()

msg := Message{Key: []byte("key"), Value: []byte("Hello World")}

var err error
const retries = 5
for i := 0; i < retries; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err = w.WriteMessages(ctx, msg)
if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
time.Sleep(time.Millisecond * 250)
continue
}

if err != nil {
t.Errorf("unexpected error %v", err)
return
}
}
if err != nil {
t.Errorf("unable to create topic %v", err)
}
}

type staticBalancer struct {
partition int
}
Expand Down

0 comments on commit ea83b29

Please sign in to comment.