Skip to content

Commit

Permalink
Account for possible offset skew
Browse files Browse the repository at this point in the history
  • Loading branch information
horkhe committed Feb 8, 2018
1 parent 541689b commit d6298f0
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 24 deletions.
20 changes: 12 additions & 8 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,10 +524,20 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes
var messages []*ConsumerMessage
var incomplete bool
prelude := true
originalOffset := child.offset

// Compacted records can lie about their offset delta, but that can be
// detected comparing the last offset delta from the record batch with
// actual offset delta of the last record. The difference (skew) should be
// applied to all records in the batch.
var offsetSkew int64
recordCount := len(batch.Records)
if recordCount > 0 {
lastRecordOffsetDelta := batch.Records[recordCount-1].OffsetDelta
offsetSkew = int64(batch.LastOffsetDelta) - lastRecordOffsetDelta
}

for _, rec := range batch.Records {
offset := batch.FirstOffset + rec.OffsetDelta
offset := batch.FirstOffset + rec.OffsetDelta + offsetSkew
if prelude && offset < child.offset {
continue
}
Expand All @@ -552,12 +562,6 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes
if incomplete {
return nil, ErrIncompleteResponse
}

child.offset = batch.FirstOffset + int64(batch.LastOffsetDelta) + 1
if child.offset <= originalOffset {
return nil, ErrConsumerOffsetNotAdvanced
}

return messages, nil
}

Expand Down
2 changes: 1 addition & 1 deletion record_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (b *RecordBatch) encode(pe packetEncoder) error {
pe.putInt8(b.Version)
pe.push(newCRC32Field(crcCastagnoli))
pe.putInt16(b.computeAttributes())
pe.putInt32(int32(len(b.Records)))
pe.putInt32(b.LastOffsetDelta)

if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
return err
Expand Down
34 changes: 19 additions & 15 deletions record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ var recordBatchTestCases = []struct {
{
name: "uncompressed record",
batch: RecordBatch{
Version: 2,
FirstTimestamp: time.Unix(1479847795, 0),
MaxTimestamp: time.Unix(0, 0),
Version: 2,
LastOffsetDelta: 1,
FirstTimestamp: time.Unix(1479847795, 0),
MaxTimestamp: time.Unix(0, 0),
Records: []*Record{{
TimestampDelta: 5 * time.Millisecond,
Key: []byte{1, 2, 3, 4},
Expand Down Expand Up @@ -115,10 +116,11 @@ var recordBatchTestCases = []struct {
{
name: "gzipped record",
batch: RecordBatch{
Version: 2,
Codec: CompressionGZIP,
FirstTimestamp: time.Unix(1479847795, 0),
MaxTimestamp: time.Unix(0, 0),
Version: 2,
Codec: CompressionGZIP,
LastOffsetDelta: 1,
FirstTimestamp: time.Unix(1479847795, 0),
MaxTimestamp: time.Unix(0, 0),
Records: []*Record{{
TimestampDelta: 5 * time.Millisecond,
Key: []byte{1, 2, 3, 4},
Expand Down Expand Up @@ -168,10 +170,11 @@ var recordBatchTestCases = []struct {
{
name: "snappy compressed record",
batch: RecordBatch{
Version: 2,
Codec: CompressionSnappy,
FirstTimestamp: time.Unix(1479847795, 0),
MaxTimestamp: time.Unix(0, 0),
Version: 2,
Codec: CompressionSnappy,
LastOffsetDelta: 1,
FirstTimestamp: time.Unix(1479847795, 0),
MaxTimestamp: time.Unix(0, 0),
Records: []*Record{{
TimestampDelta: 5 * time.Millisecond,
Key: []byte{1, 2, 3, 4},
Expand Down Expand Up @@ -203,10 +206,11 @@ var recordBatchTestCases = []struct {
{
name: "lz4 compressed record",
batch: RecordBatch{
Version: 2,
Codec: CompressionLZ4,
FirstTimestamp: time.Unix(1479847795, 0),
MaxTimestamp: time.Unix(0, 0),
Version: 2,
Codec: CompressionLZ4,
LastOffsetDelta: 1,
FirstTimestamp: time.Unix(1479847795, 0),
MaxTimestamp: time.Unix(0, 0),
Records: []*Record{{
TimestampDelta: 5 * time.Millisecond,
Key: []byte{1, 2, 3, 4},
Expand Down

0 comments on commit d6298f0

Please sign in to comment.