Skip to content

Commit

Permalink
franz-go: support 64 bit timestamp deltas
Browse files Browse the repository at this point in the history
Most importantly, in a backwards compatible way.
  • Loading branch information
twmb committed Jan 29, 2023
1 parent 4219922 commit 12e3c11
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 27 deletions.
6 changes: 6 additions & 0 deletions pkg/kbin/primitives.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ func UvarintLen(u uint32) int {
return int(uvarintLens[byte(bits.Len32(u))])
}

// VarlongLen returns how long i would be if it were varlong encoded.
func VarlongLen(i int64) int {
u := uint64(i)<<1 ^ uint64(i>>63)
return uvarlongLen(u)
}

func uvarlongLen(u uint64) int {
return int(uvarintLens[byte(bits.Len64(u))])
}
Expand Down
36 changes: 35 additions & 1 deletion pkg/kbin/primitives_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,35 @@ import (
"testing/quick"
)

func TestVarint(t *testing.T) {
if err := quick.Check(func(x int32) bool {
var expPut [10]byte
n := binary.PutVarint(expPut[:], int64(x))

gotPut := AppendVarint(nil, x)
if !bytes.Equal(expPut[:n], gotPut) {
return false
}
if len(gotPut) != n {
return false
}
if VarintLen(x) != n {
return false
}

expRead, expN := binary.Varint(expPut[:n])
gotRead, gotN := Varint(gotPut)

if expN != gotN || expRead != int64(gotRead) {
return false
}

return true
}, nil); err != nil {
t.Error(err)
}
}

func TestUvarint(t *testing.T) {
if err := quick.Check(func(u uint32) bool {
var expPut [10]byte
Expand Down Expand Up @@ -38,7 +67,12 @@ func TestVarlong(t *testing.T) {

gotPut := AppendVarlong(nil, x)
if !bytes.Equal(expPut[:n], gotPut) {
fmt.Println(expPut[:n], gotPut)
return false
}
if len(gotPut) != n {
return false
}
if VarlongLen(x) != n {
return false
}

Expand Down
37 changes: 22 additions & 15 deletions pkg/kgo/produce_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ func TestPromisedRecAppendTo(t *testing.T) {
{Key: "header key 1", Value: []byte("header value 1")},
{Key: "header key 2", Value: []byte("header value 2")},
},
Offset: 1<<32 | 2,
LeaderEpoch: 1,
Offset: 2,
},
}

Expand Down Expand Up @@ -103,14 +104,16 @@ func TestRecBatchAppendTo(t *testing.T) {
{"header key 1", []byte("header value 1")},
{"header key 2", []byte("header value 2")},
},
Offset: 1<<32 | 2,
LeaderEpoch: 1,
Offset: 2,
},
},
{
Record: &Record{
Key: []byte("key 2"),
Value: []byte("value 2"),
Offset: 3<<32 | 4,
Key: []byte("key 2"),
Value: []byte("value 2"),
LeaderEpoch: 3,
Offset: 4,
},
},
},
Expand Down Expand Up @@ -274,16 +277,18 @@ func TestMessageSetAppendTo(t *testing.T) {
records: []promisedRec{
{
Record: &Record{
Key: []byte("loooooong key 1"),
Value: []byte("loooooong value 1"),
Offset: 1 << 32,
Key: []byte("loooooong key 1"),
Value: []byte("loooooong value 1"),
LeaderEpoch: 1,
Offset: 0,
},
},
{
Record: &Record{
Key: []byte("loooooong key 2"),
Value: []byte("loooooong value 2"),
Offset: 3<<32 | 1,
Key: []byte("loooooong key 2"),
Value: []byte("loooooong value 2"),
LeaderEpoch: 3,
Offset: 1,
},
},
},
Expand Down Expand Up @@ -376,14 +381,16 @@ func BenchmarkAppendBatch(b *testing.B) {
{"header key 1", []byte("header value 1")},
{"header key 2", []byte("header value 2")},
},
Offset: 1<<32 | 2,
LeaderEpoch: 1,
Offset: 2,
},
},
{
Record: &Record{
Key: []byte("key 2"),
Value: bytes.Repeat([]byte("value 2"), 1000),
Offset: 3<<32 | 4,
Key: []byte("key 2"),
Value: bytes.Repeat([]byte("value 2"), 1000),
LeaderEpoch: 3,
Offset: 4,
},
},
},
Expand Down
1 change: 1 addition & 0 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ func (p *producer) finishPromises(b batchPromise) {
start:
p.promisesMu.Lock()
for i, pr := range b.recs {
pr.LeaderEpoch = 0
pr.Offset = b.baseOffset + int64(i)
pr.Partition = b.partition
pr.ProducerID = b.pid
Expand Down
9 changes: 5 additions & 4 deletions pkg/kgo/record_and_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,13 @@ type Record struct {
// When buffering records, we calculate the length and tsDelta ahead of time
// (also because number width affects encoding length). We repurpose the Offset
// field to save space.
func (r *Record) setLengthAndTimestampDelta(length, tsDelta int32) {
r.Offset = int64(uint64(uint32(length))<<32 | uint64(uint32(tsDelta)))
func (r *Record) setLengthAndTimestampDelta(length int32, tsDelta int64) {
r.LeaderEpoch = length
r.Offset = tsDelta
}

func (r *Record) lengthAndTimestampDelta() (length, tsDelta int32) {
return int32(uint32(uint64(r.Offset) >> 32)), int32(uint32(uint64(r.Offset)))
func (r *Record) lengthAndTimestampDelta() (length int32, tsDelta int64) {
return r.LeaderEpoch, r.Offset
}

// AppendFormat appends a record to b given the layout or returns an error if
Expand Down
12 changes: 6 additions & 6 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -1376,7 +1376,7 @@ type recBatch struct {

attrs int16 // updated during apending; read and converted to RecordAttrs on success
firstTimestamp int64 // since unix epoch, in millis
maxTimestampDelta int32
maxTimestampDelta int64

mu sync.Mutex // guards appendTo's reading of records against failAllRecords emptying it
records []promisedRec // record w/ length, ts calculated
Expand Down Expand Up @@ -1775,7 +1775,7 @@ func messageSet1Length(r *Record) int32 {
// Returns the numbers for a record if it were added to the record batch.
func (b *recBatch) calculateRecordNumbers(r *Record) recordNumbers {
tsMillis := r.Timestamp.UnixNano() / 1e6
tsDelta := int32(tsMillis - b.firstTimestamp)
tsDelta := tsMillis - b.firstTimestamp

// If this is to be the first record in the batch, then our timestamp
// delta is actually 0.
Expand All @@ -1786,7 +1786,7 @@ func (b *recBatch) calculateRecordNumbers(r *Record) recordNumbers {
offsetDelta := int32(len(b.records)) // since called before adding record, delta is the current end

l := 1 + // attributes, int8 unused
kbin.VarintLen(tsDelta) +
kbin.VarlongLen(tsDelta) +
kbin.VarintLen(offsetDelta) +
kbin.VarintLen(int32(len(r.Key))) +
len(r.Key) +
Expand All @@ -1813,7 +1813,7 @@ func uvarlen(l int) int32 { return int32(kbin.UvarintLen(uvar32(int32(l)))) }
// recordNumbers tracks a few numbers for a record that is buffered.
type recordNumbers struct {
lengthField int32 // the length field prefix of a record encoded on the wire
tsDelta int32 // the ms delta of when the record was added against the first timestamp
tsDelta int64 // the ms delta of when the record was added against the first timestamp
}

// wireLength is the wire length of a record including its length field prefix.
Expand Down Expand Up @@ -2036,7 +2036,7 @@ func (b seqRecBatch) appendTo(
dst = kbin.AppendInt16(dst, b.attrs)
dst = kbin.AppendInt32(dst, int32(len(b.records)-1)) // lastOffsetDelta
dst = kbin.AppendInt64(dst, b.firstTimestamp)
dst = kbin.AppendInt64(dst, b.firstTimestamp+int64(b.maxTimestampDelta))
dst = kbin.AppendInt64(dst, b.firstTimestamp+b.maxTimestampDelta)

seq := b.seq
if producerID < 0 { // a negative producer ID means we are not using idempotence
Expand Down Expand Up @@ -2092,7 +2092,7 @@ func (pr promisedRec) appendTo(dst []byte, offsetDelta int32) []byte {
length, tsDelta := pr.lengthAndTimestampDelta()
dst = kbin.AppendVarint(dst, length)
dst = kbin.AppendInt8(dst, 0) // attributes, currently unused
dst = kbin.AppendVarint(dst, tsDelta)
dst = kbin.AppendVarlong(dst, tsDelta)
dst = kbin.AppendVarint(dst, offsetDelta)
dst = kbin.AppendVarintBytes(dst, pr.Key)
dst = kbin.AppendVarintBytes(dst, pr.Value)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -1470,7 +1470,7 @@ func recordToRecord(
Offset: batch.FirstOffset + int64(record.OffsetDelta),
}
if r.Attrs.TimestampType() == 0 {
r.Timestamp = timeFromMillis(batch.FirstTimestamp + int64(record.TimestampDelta))
r.Timestamp = timeFromMillis(batch.FirstTimestamp + record.TimestampDelta64)
} else {
r.Timestamp = timeFromMillis(batch.MaxTimestamp)
}
Expand Down

0 comments on commit 12e3c11

Please sign in to comment.