Skip to content

Commit

Permalink
Minimize diff with master
Browse files Browse the repository at this point in the history
  • Loading branch information
bobrik committed Jan 19, 2018
1 parent d7d2bb7 commit c8284bb
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 74 deletions.
35 changes: 22 additions & 13 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,10 +570,18 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
return nil, block.Err
}

if block.numRecords() == 0 {
nRecs, err := block.numRecords()
if err != nil {
return nil, err
}
if nRecs == 0 {
partialTrailingMessage, err := block.isPartial()
if err != nil {
return nil, err
}
// We got no messages. If we got a trailing one then we need to ask for more data.
// Otherwise we just poll again and wait for one to be produced...
if block.isPartial() {
if partialTrailingMessage {
if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
// we can't ask for more data, we've hit the configured limit
child.sendError(ErrMessageTooLarge)
Expand All @@ -594,27 +602,28 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

messages := []*ConsumerMessage{}
for _, chunk := range block.RecordsSet {
if chunk.msgSet != nil {
messageSetMessages, err := child.parseMessages(chunk.msgSet)
for _, records := range block.RecordsSet {
if control, err := records.isControl(); err != nil || control {
continue
}

switch records.recordsType {
case legacyRecords:
messageSetMessages, err := child.parseMessages(records.msgSet)
if err != nil {
return nil, err
}

messages = append(messages, messageSetMessages...)
}

if chunk.recordBatch != nil {
if chunk.recordBatch.Control {
continue
}

recordBatchMessages, err := child.parseRecords(chunk.recordBatch)
case defaultRecords:
recordBatchMessages, err := child.parseRecords(records.recordBatch)
if err != nil {
return nil, err
}

messages = append(messages, recordBatchMessages...)
default:
return nil, fmt.Errorf("unknown records type: %v", records.recordsType)
}
}

Expand Down
57 changes: 39 additions & 18 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error)
break
}

chunk := &Records{}
if err := chunk.decode(recordsDecoder); err != nil {
// If we have at least one decoded record chunk, this is not an error
records := &Records{}
if err := records.decode(recordsDecoder); err != nil {
// If we have at least one decoded records, this is not an error
if err == ErrInsufficientData {
if len(b.RecordsSet) == 0 {
b.Partial = true
Expand All @@ -102,29 +102,47 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error)
return err
}

// If we have at least one full record chunk, we skip incomplete ones
if chunk.isPartial() && len(b.RecordsSet) > 0 {
partial, err := records.isPartial()
if err != nil {
return err
}

// If we have at least one full records, we skip incomplete ones
if partial && len(b.RecordsSet) > 0 {
break
}

b.RecordsSet = append(b.RecordsSet, chunk)
b.RecordsSet = append(b.RecordsSet, records)
}

return nil
}

func (b *FetchResponseBlock) numRecords() int {
s := 0
func (b *FetchResponseBlock) numRecords() (int, error) {
sum := 0

for _, chunk := range b.RecordsSet {
s += chunk.numRecords()
for _, records := range b.RecordsSet {
count, err := records.numRecords()
if err != nil {
return 0, err
}

sum += count
}

return s
return sum, nil
}

func (b *FetchResponseBlock) isPartial() bool {
return b.Partial || len(b.RecordsSet) == 1 && b.RecordsSet[0].isPartial()
func (b *FetchResponseBlock) isPartial() (bool, error) {
if b.Partial {
return true, nil
}

if len(b.RecordsSet) == 1 {
return b.RecordsSet[0].isPartial()
}

return false, nil
}

func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
Expand All @@ -146,8 +164,8 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error)
}

pe.push(&lengthField{})
for _, chunk := range b.RecordsSet {
err = chunk.encode(pe)
for _, records := range b.RecordsSet {
err = records.encode(pe)
if err != nil {
return err
}
Expand Down Expand Up @@ -331,7 +349,8 @@ func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Enc
msg := &Message{Key: kb, Value: vb}
msgBlock := &MessageBlock{Msg: msg, Offset: offset}
if len(frb.RecordsSet) == 0 {
frb.RecordsSet = []*Records{&Records{msgSet: &MessageSet{}}}
records := newLegacyRecords(&MessageSet{})
frb.RecordsSet = []*Records{&records}
}
set := frb.RecordsSet[0].msgSet
set.Messages = append(set.Messages, msgBlock)
Expand All @@ -342,7 +361,8 @@ func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Enco
kb, vb := encodeKV(key, value)
rec := &Record{Key: kb, Value: vb, OffsetDelta: offset}
if len(frb.RecordsSet) == 0 {
frb.RecordsSet = []*Records{&Records{recordBatch: &RecordBatch{Version: 2}}}
records := newDefaultRecords(&RecordBatch{Version: 2})
frb.RecordsSet = []*Records{&records}
}
batch := frb.RecordsSet[0].recordBatch
batch.addRecord(rec)
Expand All @@ -351,7 +371,8 @@ func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Enco
func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
frb := r.getOrCreateBlock(topic, partition)
if len(frb.RecordsSet) == 0 {
frb.RecordsSet = []*Records{&Records{recordBatch: &RecordBatch{Version: 2}}}
records := newDefaultRecords(&RecordBatch{Version: 2})
frb.RecordsSet = []*Records{&records}
}
batch := frb.RecordsSet[0].recordBatch
batch.LastOffsetDelta = offset
Expand Down
30 changes: 24 additions & 6 deletions fetch_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,18 @@ func TestOneMessageFetchResponse(t *testing.T) {
if block.HighWaterMarkOffset != 0x10101010 {
t.Error("Decoding didn't produce correct high water mark offset.")
}
partial := block.RecordsSet[0].isPartial()
partial, err := block.isPartial()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if partial {
t.Error("Decoding detected a partial trailing message where there wasn't one.")
}

n := block.RecordsSet[0].numRecords()
n, err := block.numRecords()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if n != 1 {
t.Fatal("Decoding produced incorrect number of messages.")
}
Expand Down Expand Up @@ -164,12 +170,18 @@ func TestOneRecordFetchResponse(t *testing.T) {
if block.HighWaterMarkOffset != 0x10101010 {
t.Error("Decoding didn't produce correct high water mark offset.")
}
partial := block.RecordsSet[0].isPartial()
partial, err := block.isPartial()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if partial {
t.Error("Decoding detected a partial trailing record where there wasn't one.")
}

n := block.RecordsSet[0].numRecords()
n, err := block.numRecords()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if n != 1 {
t.Fatal("Decoding produced incorrect number of records.")
}
Expand Down Expand Up @@ -204,12 +216,18 @@ func TestOneMessageFetchResponseV4(t *testing.T) {
if block.HighWaterMarkOffset != 0x10101010 {
t.Error("Decoding didn't produce correct high water mark offset.")
}
partial := block.RecordsSet[0].isPartial()
partial, err := block.isPartial()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if partial {
t.Error("Decoding detected a partial trailing record where there wasn't one.")
}

n := block.RecordsSet[0].numRecords()
n, err := block.numRecords()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if n != 1 {
t.Fatal("Decoding produced incorrect number of records.")
}
Expand Down
6 changes: 3 additions & 3 deletions produce_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,18 +235,18 @@ func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message)

if set == nil {
set = new(MessageSet)
r.records[topic][partition] = Records{msgSet: set}
r.records[topic][partition] = newLegacyRecords(set)
}

set.addMessage(msg)
}

func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) {
r.ensureRecords(topic, partition)
r.records[topic][partition] = Records{msgSet: set}
r.records[topic][partition] = newLegacyRecords(set)
}

func (r *ProduceRequest) AddBatch(topic string, partition int32, batch *RecordBatch) {
r.ensureRecords(topic, partition)
r.records[topic][partition] = Records{recordBatch: batch}
r.records[topic][partition] = newDefaultRecords(batch)
}
4 changes: 2 additions & 2 deletions produce_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
ProducerID: -1, /* No producer id */
Codec: ps.parent.conf.Producer.Compression,
}
set = &partitionSet{recordsToSend: Records{recordBatch: batch}}
set = &partitionSet{recordsToSend: newDefaultRecords(batch)}
size = recordBatchOverhead
} else {
set = &partitionSet{recordsToSend: Records{msgSet: &MessageSet{}}}
set = &partitionSet{recordsToSend: newLegacyRecords(new(MessageSet))}
}
partitions[msg.Partition] = set
}
Expand Down
Loading

0 comments on commit c8284bb

Please sign in to comment.