consuming: add HookFetchBatchRead
This is the counterpart to the prior commit, and can be used for the
same purposes except now via fetching. There are a few additional
caveats with the numbers here, which are documented in the
FetchBatchMetrics struct.
twmb committed Jun 13, 2021
1 parent 9810427 commit fa1fd35
Showing 2 changed files with 113 additions and 22 deletions.
63 changes: 63 additions & 0 deletions pkg/kgo/hooks.go
@@ -182,11 +182,21 @@ type ProduceBatchMetrics struct {

// UncompressedBytes is the number of bytes the records serialized as
// before compression.
//
// For record batches (Kafka v0.11.0+), this is the size of the records
// in a batch, and does not include record batch overhead.
//
// For message sets, this size includes message set overhead.
UncompressedBytes int

// CompressedBytes is the number of bytes actually written for this
// batch, after compression. If compression is not used, this will be
// equal to UncompressedBytes.
//
// For record batches, this is the size of the compressed records, and
// does not include record batch overhead.
//
// For message sets, this is the size of the compressed message set.
CompressedBytes int

// CompressionType signifies which algorithm the batch was compressed
@@ -204,3 +214,56 @@ type HookProduceBatchWritten interface {
// topic partition
OnProduceBatchWritten(meta BrokerMetadata, topic string, partition int32, metrics ProduceBatchMetrics)
}
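Given the produce-side byte counts documented above, a hook can derive the effective compression ratio per batch. A minimal sketch, assuming the franz-go import path and a stdlib logger (the type name and log format are illustrative, not part of this commit):

```go
package hooks

import (
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

// CompressionRatio logs how well each produced batch compressed.
// Register it with kgo.WithHooks(CompressionRatio{}).
type CompressionRatio struct{}

// OnProduceBatchWritten implements kgo.HookProduceBatchWritten.
func (CompressionRatio) OnProduceBatchWritten(_ kgo.BrokerMetadata, topic string, partition int32, m kgo.ProduceBatchMetrics) {
	if m.UncompressedBytes == 0 {
		return // nothing serialized; avoid dividing by zero
	}
	ratio := float64(m.CompressedBytes) / float64(m.UncompressedBytes)
	log.Printf("produced to %s[%d]: compressed/uncompressed = %.2f",
		topic, partition, ratio)
}
```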

// FetchBatchMetrics tracks information about fetches of batches.
type FetchBatchMetrics struct {
// NumRecords is the number of records that were fetched in this batch.
//
// Note that this number includes transaction markers, which are not
// actually returned to the user.
//
// If the batch has an encoding error, this will be 0.
NumRecords int

// UncompressedBytes is the number of bytes the records deserialized
// into after decompression.
//
// For record batches (Kafka v0.11.0+), this is the size of the records
// in a batch, and does not include record batch overhead.
//
// For message sets, this size includes message set overhead.
//
// Note that this number may be higher than the corresponding number
// when producing, because as an "optimization", Kafka can return
// partial batches when fetching.
UncompressedBytes int

// CompressedBytes is the number of bytes actually read for this batch,
// before decompression. If the batch was not compressed, this will be
// equal to UncompressedBytes.
//
// For record batches, this is the size of the compressed records, and
// does not include record batch overhead.
//
// For message sets, this is the size of the compressed message set.
CompressedBytes int

// CompressionType signifies which algorithm the batch was compressed
// with.
//
// 0 is no compression, 1 is gzip, 2 is snappy, 3 is lz4, and 4 is
// zstd.
CompressionType uint8
}

// HookFetchBatchRead is called whenever a batch is read within the client.
//
// Note that this hook is called when processing, but a batch may be internally
// discarded after processing in some uncommon, specific circumstances.
//
// If the client reads v0 or v1 message sets, and they are not compressed, then
// this hook will be called per record.
type HookFetchBatchRead interface {
// OnFetchBatchRead is called per batch read from a topic partition.
OnFetchBatchRead(meta BrokerMetadata, topic string, partition int32, metrics FetchBatchMetrics)
}
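To sketch how the new hook is meant to be consumed: the implementation below tallies bytes read per topic and names the codec from CompressionType. This is a minimal example against the kgo API shown above; the seed broker address is a placeholder and the helper names are illustrative:

```go
package main

import (
	"log"
	"sync"

	"github.com/twmb/franz-go/pkg/kgo"
)

// codecNames translates FetchBatchMetrics.CompressionType into a name,
// following the codes documented on the struct.
var codecNames = []string{"none", "gzip", "snappy", "lz4", "zstd"}

// fetchBytes tallies compressed and uncompressed bytes read per topic.
// Hooks can fire concurrently from multiple broker connections, so the
// counters are guarded by a mutex.
type fetchBytes struct {
	mu           sync.Mutex
	compressed   map[string]int64
	uncompressed map[string]int64
}

// OnFetchBatchRead implements kgo.HookFetchBatchRead. Per the docs above, it
// runs once per batch, or once per record for uncompressed v0/v1 message sets.
func (f *fetchBytes) OnFetchBatchRead(_ kgo.BrokerMetadata, topic string, _ int32, m kgo.FetchBatchMetrics) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.compressed[topic] += int64(m.CompressedBytes)
	f.uncompressed[topic] += int64(m.UncompressedBytes)

	name := "unknown"
	if int(m.CompressionType) < len(codecNames) {
		name = codecNames[m.CompressionType]
	}
	log.Printf("read batch from %s: %d records, codec %s", topic, m.NumRecords, name)
}

func main() {
	hook := &fetchBytes{
		compressed:   make(map[string]int64),
		uncompressed: make(map[string]int64),
	}
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder address
		kgo.WithHooks(hook),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
	// Consume as usual; the hook observes every batch the client reads.
}
```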
72 changes: 50 additions & 22 deletions pkg/kgo/source.go
@@ -577,7 +577,7 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
// Processing the response only needs the source's nodeID and client.
go func() {
defer close(handled)
fetch, reloadOffsets, preferreds, updateMeta = s.handleReqResp(req, resp)
fetch, reloadOffsets, preferreds, updateMeta = s.handleReqResp(br, req, resp)
}()

select {
@@ -676,7 +676,7 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
// the source mutex.
//
// This function, and everything it calls, is side effect free.
func (s *source) handleReqResp(req *fetchRequest, resp *kmsg.FetchResponse) (Fetch, listOrEpochLoads, cursorPreferreds, bool) {
func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchResponse) (Fetch, listOrEpochLoads, cursorPreferreds, bool) {
var (
f = Fetch{
Topics: make([]FetchTopic, 0, len(resp.Topics)),
@@ -719,7 +719,7 @@ func (s *source) handleReqResp(req *fetchRequest, resp *kmsg.FetchResponse) (Fet
continue
}

fetchTopic.Partitions = append(fetchTopic.Partitions, partOffset.processRespPartition(resp.Version, rp, s.cl.decompressor))
fetchTopic.Partitions = append(fetchTopic.Partitions, partOffset.processRespPartition(br, resp.Version, rp, s.cl.decompressor, s.cl.cfg.hooks))
fp := &fetchTopic.Partitions[len(fetchTopic.Partitions)-1]
updateMeta = updateMeta || fp.Err != nil

@@ -815,7 +815,7 @@ func (s *source) handleReqResp(req *fetchRequest, resp *kmsg.FetchResponse) (Fet

// processRespPartition processes all records in all potentially compressed
// batches (or message sets).
func (o *cursorOffsetNext) processRespPartition(version int16, rp *kmsg.FetchResponseTopicPartition, decompressor *decompressor) FetchPartition {
func (o *cursorOffsetNext) processRespPartition(br *broker, version int16, rp *kmsg.FetchResponseTopicPartition, decompressor *decompressor, hooks hooks) FetchPartition {
fp := FetchPartition{
Partition: rp.Partition,
Err: kerr.ErrorForCode(rp.ErrorCode),
@@ -917,14 +917,33 @@ func (o *cursorOffsetNext) processRespPartition(version int16, rp *kmsg.FetchRes

in = in[length:]

var m FetchBatchMetrics

switch t := r.(type) {
case *kmsg.MessageV0:
o.processV0OuterMessage(&fp, t, decompressor)
m.CompressedBytes = int(length) // for message sets, we include the message set overhead in length
m.CompressionType = uint8(t.Attributes) & 0b0000_0011
m.NumRecords, m.UncompressedBytes = o.processV0OuterMessage(&fp, t, decompressor)

case *kmsg.MessageV1:
o.processV1OuterMessage(&fp, t, decompressor)
m.CompressedBytes = int(length)
m.CompressionType = uint8(t.Attributes) & 0b0000_0011
m.NumRecords, m.UncompressedBytes = o.processV1OuterMessage(&fp, t, decompressor)

case *kmsg.RecordBatch:
o.processRecordBatch(&fp, t, aborter, decompressor)
m.CompressedBytes = len(t.Records) // for record batches, we only track the record batch length
m.CompressionType = uint8(t.Attributes) & 0b0000_0111
m.NumRecords, m.UncompressedBytes = o.processRecordBatch(&fp, t, aborter, decompressor)
}

if m.UncompressedBytes == 0 {
m.UncompressedBytes = m.CompressedBytes
}
hooks.each(func(h Hook) {
if h, ok := h.(HookFetchBatchRead); ok {
h.OnFetchBatchRead(br.meta, o.from.topic, o.from.partition, m)
}
})
}

return fp
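Two details in the hunk above are worth spelling out. First, the masks differ by batch format: the client keeps the low two attribute bits as the codec for v0/v1 message sets and the low three bits for v2 record batches, since zstd (codec 4) needs the third bit. Second, when the data was not compressed, the processing functions report 0 uncompressed bytes and the caller backfills the compressed size, which is why the struct docs say the two counts are equal without compression. A standalone sketch of the mask logic, with illustrative names:

```go
package kafkacodec

// codecFromAttrs extracts the compression codec from batch attributes,
// mirroring the masks above: message sets (v0/v1) keep the codec in the
// low 2 bits, record batches (v2) in the low 3 bits so zstd (4) fits.
func codecFromAttrs(attrs int16, isRecordBatch bool) uint8 {
	if isRecordBatch {
		return uint8(attrs) & 0b0000_0111
	}
	return uint8(attrs) & 0b0000_0011
}
```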
@@ -995,27 +1014,29 @@ func (o *cursorOffsetNext) processRecordBatch(
batch *kmsg.RecordBatch,
aborter aborter,
decompressor *decompressor,
) {
) (int, int) {
if batch.Magic != 2 {
fp.Err = fmt.Errorf("unknown batch magic %d", batch.Magic)
return
return 0, 0
}
lastOffset := batch.FirstOffset + int64(batch.LastOffsetDelta)
if lastOffset < o.offset {
// If the last offset in this batch is less than what we asked
// for, we got a batch that we entirely do not need. We can
// avoid all work (although we should not get this batch).
return
return 0, 0
}

rawRecords := batch.Records
if compression := byte(batch.Attributes & 0x0007); compression != 0 {
var err error
if rawRecords, err = decompressor.decompress(rawRecords, compression); err != nil {
return // truncated batch
return 0, 0 // truncated batch
}
}

uncompressedBytes := len(rawRecords)

numRecords := int(batch.NumRecords)
krecords := readRawRecords(numRecords, rawRecords)

@@ -1056,6 +1077,7 @@ }
}
}

return len(krecords), uncompressedBytes
}

// Processes an outer v1 message. There could be no inner message, which makes
@@ -1066,18 +1088,20 @@ func (o *cursorOffsetNext) processV1OuterMessage(
fp *FetchPartition,
message *kmsg.MessageV1,
decompressor *decompressor,
) {
) (int, int) {
compression := byte(message.Attributes & 0x0003)
if compression == 0 {
o.processV1Message(fp, message)
return
return 1, 0
}

rawInner, err := decompressor.decompress(message.Value, compression)
if err != nil {
return // truncated batch
return 0, 0 // truncated batch
}

uncompressedBytes := len(rawInner)

var innerMessages []readerFrom
out:
for len(rawInner) > 17 { // magic at byte 17
@@ -1128,7 +1152,7 @@ out:
rawInner = rawInner[length:]
}
if len(innerMessages) == 0 {
return
return 0, uncompressedBytes
}

firstOffset := message.Offset - int64(len(innerMessages)) + 1
@@ -1138,15 +1162,16 @@
case *kmsg.MessageV0:
innerMessage.Offset = firstOffset + int64(i)
if !o.processV0Message(fp, innerMessage) {
return
return i, uncompressedBytes
}
case *kmsg.MessageV1:
innerMessage.Offset = firstOffset + int64(i)
if !o.processV1Message(fp, innerMessage) {
return
return i, uncompressedBytes
}
}
}
return len(innerMessages), uncompressedBytes
}

func (o *cursorOffsetNext) processV1Message(
@@ -1172,18 +1197,20 @@ func (o *cursorOffsetNext) processV0OuterMessage(
fp *FetchPartition,
message *kmsg.MessageV0,
decompressor *decompressor,
) {
) (int, int) {
compression := byte(message.Attributes & 0x0003)
if compression == 0 {
o.processV0Message(fp, message)
return
return 1, 0 // uncompressed bytes is 0; set to compressed bytes on return
}

rawInner, err := decompressor.decompress(message.Value, compression)
if err != nil {
return // truncated batch
return 0, 0 // truncated batch
}

uncompressedBytes := len(rawInner)

var innerMessages []kmsg.MessageV0
for len(rawInner) > 17 { // magic at byte 17
length := int32(binary.BigEndian.Uint32(rawInner[8:]))
@@ -1208,17 +1235,18 @@
rawInner = rawInner[length:]
}
if len(innerMessages) == 0 {
return
return 0, uncompressedBytes
}

firstOffset := message.Offset - int64(len(innerMessages)) + 1
for i := range innerMessages {
innerMessage := &innerMessages[i]
innerMessage.Offset = firstOffset + int64(i)
if !o.processV0Message(fp, innerMessage) {
return
return i, uncompressedBytes
}
}
return len(innerMessages), uncompressedBytes
}

func (o *cursorOffsetNext) processV0Message(