Skip to content

Commit

Permalink
Added SendPartialBatch Setting
Browse files Browse the repository at this point in the history
Send Partial Batch will force partial batches to send right away when
calling Writer.WriteMessages instead of blocking until a full batch is
made or the timeout is seen.
  • Loading branch information
Paul Montag committed Oct 17, 2024
1 parent a8e5eab commit 8d701a8
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 18 deletions.
69 changes: 51 additions & 18 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type Writer struct {
BatchBytes int64

// Time limit on how often incomplete message batches will be flushed to
// kafka.
// kafka. This is ignored if SendPartialBatch is set to true
//
// The default is to flush at least every second.
BatchTimeout time.Duration
Expand Down Expand Up @@ -158,6 +158,13 @@ type Writer struct {
// Defaults to false.
Async bool

// SendPartialBatch forces WriteMessages to send a partial batch instead of
// blocking until a full batch is made. This is useful if you are already batching
// messages before the producer, and want to flush everything sent to WriteMessages
// without blocking while also still being synchronous. When set BatchTimeout does
// nothing.
SendPartialBatch bool

// An optional function called when the writer succeeds or fails the
// delivery of messages to a kafka partition. When writing the messages
// fails, the `err` parameter will be non-nil.
Expand Down Expand Up @@ -276,6 +283,13 @@ type WriterConfig struct {
// The default is to use a kafka default value of 1048576.
BatchBytes int

// SendPartialBatch forces WriteMessages to send a partial batch instead of
// blocking until a full batch is made. This is useful if you are already batching
// messages before the producer, and want to flush everything sent to WriteMessages
// without blocking while also still being synchronous. When set BatchTimeout does
// nothing.
SendPartialBatch bool

// Time limit on how often incomplete message batches will be flushed to
// kafka.
//
Expand Down Expand Up @@ -487,22 +501,23 @@ func NewWriter(config WriterConfig) *Writer {
}

w := &Writer{
Addr: TCP(config.Brokers...),
Topic: config.Topic,
MaxAttempts: config.MaxAttempts,
BatchSize: config.BatchSize,
Balancer: config.Balancer,
BatchBytes: int64(config.BatchBytes),
BatchTimeout: config.BatchTimeout,
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
RequiredAcks: RequiredAcks(config.RequiredAcks),
Async: config.Async,
Logger: config.Logger,
ErrorLogger: config.ErrorLogger,
Transport: transport,
transport: transport,
writerStats: stats,
Addr: TCP(config.Brokers...),
Topic: config.Topic,
MaxAttempts: config.MaxAttempts,
BatchSize: config.BatchSize,
Balancer: config.Balancer,
BatchBytes: int64(config.BatchBytes),
BatchTimeout: config.BatchTimeout,
SendPartialBatch: config.SendPartialBatch,
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
RequiredAcks: RequiredAcks(config.RequiredAcks),
Async: config.Async,
Logger: config.Logger,
ErrorLogger: config.ErrorLogger,
Transport: transport,
transport: transport,
writerStats: stats,
}

if config.RequiredAcks == 0 {
Expand Down Expand Up @@ -1059,13 +1074,26 @@ func (ptw *partitionWriter) writeMessages(msgs []Message, indexes []int32) map[*
batches[batch] = append(batches[batch], i)
}
}

// if we are sending partial batches and the current batch is not empty send
// the batch right away instead of lagging.
if ptw.w.SendPartialBatch && !ptw.currBatch.empty() {
ptw.currBatch.trigger()
ptw.queue.Put(ptw.currBatch)
ptw.currBatch = nil
}

return batches
}

// ptw.w can be accessed here because this is called with the lock ptw.mutex already held.
func (ptw *partitionWriter) newWriteBatch() *writeBatch {
batch := newWriteBatch(time.Now(), ptw.w.batchTimeout())
ptw.w.spawn(func() { ptw.awaitBatch(batch) })
// if we are sending partial batches we don't need to wait for a timeout
if !ptw.w.SendPartialBatch {
ptw.w.spawn(func() { ptw.awaitBatch(batch) })
}

return batch
}

Expand Down Expand Up @@ -1239,6 +1267,11 @@ func (b *writeBatch) full(maxSize int, maxBytes int64) bool {
return b.size >= maxSize || b.bytes >= maxBytes
}

// empty returns if the batch has no data in it at all
func (b *writeBatch) empty() bool {
return b == nil || b.size == 0
}

func (b *writeBatch) trigger() {
close(b.ready)
}
Expand Down
55 changes: 55 additions & 0 deletions writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,61 @@ func readPartition(topic string, partition int, offset int64) (msgs []Message, e
}
}

func testWriterPartailBatch(t *testing.T) {
topic := makeTopic()
createTopic(t, topic, 1)
defer deleteTopic(t, topic)

offset, err := readOffset(topic, 0)
if err != nil {
t.Fatal(err)
}

w := newTestWriter(WriterConfig{
Topic: topic,
BatchBytes: 75,
SendPartialBatch: true,
Balancer: &RoundRobin{},
})
defer w.Close()

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := w.WriteMessages(ctx, []Message{
// first batch
{Value: []byte("M0")}, // 25 Bytes
{Value: []byte("M1")}, // 25 Bytes
{Value: []byte("M2")}, // 25 Bytes
// second batch
{Value: []byte("M3")}, // 25 Bytes
}...); err != nil {
t.Error(err)
return
}

if w.Stats().Writes != 2 {
t.Error("didn't create expected batches")
return
}
msgs, err := readPartition(topic, 0, offset)
if err != nil {
t.Error("error reading partition", err)
return
}

if len(msgs) != 4 {
t.Error("bad messages in partition", msgs)
return
}

for i, m := range msgs {
if string(m.Value) == "M"+strconv.Itoa(i) {
continue
}
t.Error("bad messages in partition", string(m.Value))
}
}

func testWriterBatchBytes(t *testing.T) {
topic := makeTopic()
createTopic(t, topic, 1)
Expand Down

0 comments on commit 8d701a8

Please sign in to comment.