From fa1fd358920c21c6e982601a5fa45ec6e256d2b5 Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Sat, 12 Jun 2021 19:19:44 -0600
Subject: [PATCH] consuming: add HookFetchBatchRead

This is the counterpart to the prior commit, and can be used for the
same purposes, except now via fetching. There are a few additional
caveats with the numbers here, which are documented in the
FetchBatchMetrics struct.
---
 pkg/kgo/hooks.go  | 63 +++++++++++++++++++++++++++++++++++++++++
 pkg/kgo/source.go | 72 ++++++++++++++++++++++++++++++++---------------
 2 files changed, 113 insertions(+), 22 deletions(-)

diff --git a/pkg/kgo/hooks.go b/pkg/kgo/hooks.go
index 599509da..8a9c32d2 100644
--- a/pkg/kgo/hooks.go
+++ b/pkg/kgo/hooks.go
@@ -182,11 +182,21 @@ type ProduceBatchMetrics struct {
 
 	// UncompressedBytes is the number of bytes the records serialized as
 	// before compression.
+	//
+	// For record batches (Kafka v0.11.0+), this is the size of the records
+	// in a batch, and does not include record batch overhead.
+	//
+	// For message sets, this size includes message set overhead.
 	UncompressedBytes int
 
 	// CompressedBytes is the number of bytes actually written for this
 	// batch, after compression. If compression is not used, this will be
 	// equal to UncompresedBytes.
+	//
+	// For record batches, this is the size of the compressed records, and
+	// does not include record batch overhead.
+	//
+	// For message sets, this is the size of the compressed message set.
 	CompressedBytes int
 
 	// CompressionType signifies which algorithm the batch was compressed
@@ -204,3 +214,56 @@ type HookProduceBatchWritten interface {
 	// topic partition
 	OnProduceBatchWritten(meta BrokerMetadata, topic string, partition int32, metrics ProduceBatchMetrics)
 }
+
+// FetchBatchMetrics tracks information about fetches of batches.
+type FetchBatchMetrics struct {
+	// NumRecords is the number of records that were fetched in this batch.
+	//
+	// Note that this number includes transaction markers, which are not
+	// actually returned to the user.
+	//
+	// If the batch has an encoding error, this will be 0.
+	NumRecords int
+
+	// UncompressedBytes is the number of bytes the records deserialized
+	// into after decompression.
+	//
+	// For record batches (Kafka v0.11.0+), this is the size of the records
+	// in a batch, and does not include record batch overhead.
+	//
+	// For message sets, this size includes message set overhead.
+	//
+	// Note that this number may be higher than the corresponding number
+	// when producing, because as an "optimization", Kafka can return
+	// partial batches when fetching.
+	UncompressedBytes int
+
+	// CompressedBytes is the number of bytes actually read for this batch,
+	// before decompression. If the batch was not compressed, this will be
+	// equal to UncompressedBytes.
+	//
+	// For record batches, this is the size of the compressed records, and
+	// does not include record batch overhead.
+	//
+	// For message sets, this is the size of the compressed message set.
+	CompressedBytes int
+
+	// CompressionType signifies which algorithm the batch was compressed
+	// with.
+	//
+	// 0 is no compression, 1 is gzip, 2 is snappy, 3 is lz4, and 4 is
+	// zstd.
+	CompressionType uint8
+}
+
+// HookFetchBatchRead is called whenever a batch is read within the client.
+//
+// Note that this hook is called when processing, but a batch may be internally
+// discarded after processing in some uncommon, specific circumstances.
+//
+// If the client reads v0 or v1 message sets, and they are not compressed, then
+// this hook will be called per record.
+type HookFetchBatchRead interface {
+	// OnFetchBatchRead is called per batch read from a topic partition.
+	OnFetchBatchRead(meta BrokerMetadata, topic string, partition int32, metrics FetchBatchMetrics)
+}
diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go
index 42fe2b12..1e97d9cc 100644
--- a/pkg/kgo/source.go
+++ b/pkg/kgo/source.go
@@ -577,7 +577,7 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
 	// Processing the response only needs the source's nodeID and client.
 	go func() {
 		defer close(handled)
-		fetch, reloadOffsets, preferreds, updateMeta = s.handleReqResp(req, resp)
+		fetch, reloadOffsets, preferreds, updateMeta = s.handleReqResp(br, req, resp)
 	}()
 
 	select {
@@ -676,7 +676,7 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
 // the source mutex.
 //
 // This function, and everything it calls, is side effect free.
-func (s *source) handleReqResp(req *fetchRequest, resp *kmsg.FetchResponse) (Fetch, listOrEpochLoads, cursorPreferreds, bool) {
+func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchResponse) (Fetch, listOrEpochLoads, cursorPreferreds, bool) {
 	var (
 		f = Fetch{
 			Topics: make([]FetchTopic, 0, len(resp.Topics)),
@@ -719,7 +719,7 @@ func (s *source) handleReqResp(req *fetchRequest, resp *kmsg.FetchResponse) (Fet
 			continue
 		}
 
-		fetchTopic.Partitions = append(fetchTopic.Partitions, partOffset.processRespPartition(resp.Version, rp, s.cl.decompressor))
+		fetchTopic.Partitions = append(fetchTopic.Partitions, partOffset.processRespPartition(br, resp.Version, rp, s.cl.decompressor, s.cl.cfg.hooks))
 		fp := &fetchTopic.Partitions[len(fetchTopic.Partitions)-1]
 		updateMeta = updateMeta || fp.Err != nil
 
@@ -815,7 +815,7 @@ func (s *source) handleReqResp(req *fetchRequest, resp *kmsg.FetchResponse) (Fet
 
 // processRespPartition processes all records in all potentially compressed
 // batches (or message sets).
-func (o *cursorOffsetNext) processRespPartition(version int16, rp *kmsg.FetchResponseTopicPartition, decompressor *decompressor) FetchPartition {
+func (o *cursorOffsetNext) processRespPartition(br *broker, version int16, rp *kmsg.FetchResponseTopicPartition, decompressor *decompressor, hooks hooks) FetchPartition {
 	fp := FetchPartition{
 		Partition: rp.Partition,
 		Err:       kerr.ErrorForCode(rp.ErrorCode),
@@ -917,14 +917,33 @@ func (o *cursorOffsetNext) processRespPartition(version int16, rp *kmsg.FetchRes
 		in = in[length:]
 
+		var m FetchBatchMetrics
+
 		switch t := r.(type) {
 		case *kmsg.MessageV0:
-			o.processV0OuterMessage(&fp, t, decompressor)
+			m.CompressedBytes = int(length) // for message sets, we include the message set overhead in length
+			m.CompressionType = uint8(t.Attributes) & 0b0000_0011
+			m.NumRecords, m.UncompressedBytes = o.processV0OuterMessage(&fp, t, decompressor)
+
 		case *kmsg.MessageV1:
-			o.processV1OuterMessage(&fp, t, decompressor)
+			m.CompressedBytes = int(length)
+			m.CompressionType = uint8(t.Attributes) & 0b0000_0011
+			m.NumRecords, m.UncompressedBytes = o.processV1OuterMessage(&fp, t, decompressor)
+
 		case *kmsg.RecordBatch:
-			o.processRecordBatch(&fp, t, aborter, decompressor)
+			m.CompressedBytes = len(t.Records) // for record batches, we only track the record batch length
+			m.CompressionType = uint8(t.Attributes) & 0b0000_0111
+			m.NumRecords, m.UncompressedBytes = o.processRecordBatch(&fp, t, aborter, decompressor)
+		}
+
+		if m.UncompressedBytes == 0 {
+			m.UncompressedBytes = m.CompressedBytes
 		}
+
+		hooks.each(func(h Hook) {
+			if h, ok := h.(HookFetchBatchRead); ok {
+				h.OnFetchBatchRead(br.meta, o.from.topic, o.from.partition, m)
+			}
+		})
 	}
 
 	return fp
@@ -995,27 +1014,29 @@ func (o *cursorOffsetNext) processRecordBatch(
 	batch *kmsg.RecordBatch,
 	aborter aborter,
 	decompressor *decompressor,
-) {
+) (int, int) {
 	if batch.Magic != 2 {
 		fp.Err = fmt.Errorf("unknown batch magic %d", batch.Magic)
-		return
+		return 0, 0
 	}
 	lastOffset := batch.FirstOffset + int64(batch.LastOffsetDelta)
 	if lastOffset < o.offset {
 		// If the last offset in this batch is less than what we asked
 		// for, we got a batch that we entirely do not need. We can
 		// avoid all work (although we should not get this batch).
-		return
+		return 0, 0
 	}
 
 	rawRecords := batch.Records
 	if compression := byte(batch.Attributes & 0x0007); compression != 0 {
 		var err error
 		if rawRecords, err = decompressor.decompress(rawRecords, compression); err != nil {
-			return // truncated batch
+			return 0, 0 // truncated batch
 		}
 	}
 
+	uncompressedBytes := len(rawRecords)
+
 	numRecords := int(batch.NumRecords)
 	krecords := readRawRecords(numRecords, rawRecords)
@@ -1056,6 +1077,7 @@ func (o *cursorOffsetNext) processRecordBatch(
 		}
 	}
 
+	return len(krecords), uncompressedBytes
 }
 
 // Processes an outer v1 message. There could be no inner message, which makes
@@ -1066,18 +1088,20 @@ func (o *cursorOffsetNext) processV1OuterMessage(
 	fp *FetchPartition,
 	message *kmsg.MessageV1,
 	decompressor *decompressor,
-) {
+) (int, int) {
 	compression := byte(message.Attributes & 0x0003)
 	if compression == 0 {
 		o.processV1Message(fp, message)
-		return
+		return 1, 0
 	}
 
 	rawInner, err := decompressor.decompress(message.Value, compression)
 	if err != nil {
-		return // truncated batch
+		return 0, 0 // truncated batch
 	}
 
+	uncompressedBytes := len(rawInner)
+
 	var innerMessages []readerFrom
 out:
 	for len(rawInner) > 17 { // magic at byte 17
@@ -1128,7 +1152,7 @@ out:
 		rawInner = rawInner[length:]
 	}
 	if len(innerMessages) == 0 {
-		return
+		return 0, uncompressedBytes
 	}
 
 	firstOffset := message.Offset - int64(len(innerMessages)) + 1
@@ -1138,15 +1162,16 @@ out:
 		case *kmsg.MessageV0:
 			innerMessage.Offset = firstOffset + int64(i)
 			if !o.processV0Message(fp, innerMessage) {
-				return
+				return i, uncompressedBytes
 			}
 		case *kmsg.MessageV1:
 			innerMessage.Offset = firstOffset + int64(i)
 			if !o.processV1Message(fp, innerMessage) {
-				return
+				return i, uncompressedBytes
 			}
 		}
 	}
+	return len(innerMessages), uncompressedBytes
 }
 
 func (o *cursorOffsetNext) processV1Message(
@@ -1172,18 +1197,20 @@ func (o *cursorOffsetNext) processV0OuterMessage(
 	fp *FetchPartition,
 	message *kmsg.MessageV0,
 	decompressor *decompressor,
-) {
+) (int, int) {
 	compression := byte(message.Attributes & 0x0003)
 	if compression == 0 {
 		o.processV0Message(fp, message)
-		return
+		return 1, 0 // uncompressed bytes is 0; set to compressed bytes on return
 	}
 
 	rawInner, err := decompressor.decompress(message.Value, compression)
 	if err != nil {
-		return // truncated batch
+		return 0, 0 // truncated batch
 	}
 
+	uncompressedBytes := len(rawInner)
+
 	var innerMessages []kmsg.MessageV0
 	for len(rawInner) > 17 { // magic at byte 17
 		length := int32(binary.BigEndian.Uint32(rawInner[8:]))
@@ -1208,7 +1235,7 @@ func (o *cursorOffsetNext) processV0OuterMessage(
 		rawInner = rawInner[length:]
 	}
 	if len(innerMessages) == 0 {
-		return
+		return 0, uncompressedBytes
 	}
 
 	firstOffset := message.Offset - int64(len(innerMessages)) + 1
@@ -1216,9 +1243,10 @@
 		innerMessage := &innerMessages[i]
 		innerMessage.Offset = firstOffset + int64(i)
 		if !o.processV0Message(fp, innerMessage) {
-			return
+			return i, uncompressedBytes
 		}
 	}
+	return len(innerMessages), uncompressedBytes
 }
 
 func (o *cursorOffsetNext) processV0Message(
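---

For reference, below is a minimal sketch of a consumer-side implementation of the
new hook. The HookFetchBatchRead interface and the FetchBatchMetrics fields are
from this patch; the counter struct, its field names, and the seed broker address
are illustrative assumptions, while kgo.NewClient, kgo.SeedBrokers, and
kgo.WithHooks are the client's existing constructor and options.

package main

import (
	"fmt"
	"sync/atomic"

	"github.com/twmb/franz-go/pkg/kgo"
)

// batchReadHook aggregates fetch-side batch metrics. This type and its
// field names are an illustrative assumption, not part of the patch.
type batchReadHook struct {
	batches           int64
	records           int64
	compressedBytes   int64
	uncompressedBytes int64
}

// Compile-time check that batchReadHook satisfies the new hook interface.
var _ kgo.HookFetchBatchRead = (*batchReadHook)(nil)

// OnFetchBatchRead is called per batch read (or, for uncompressed v0/v1
// message sets, per record), as documented on HookFetchBatchRead above.
func (h *batchReadHook) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, partition int32, m kgo.FetchBatchMetrics) {
	atomic.AddInt64(&h.batches, 1)
	atomic.AddInt64(&h.records, int64(m.NumRecords))
	atomic.AddInt64(&h.compressedBytes, int64(m.CompressedBytes))
	atomic.AddInt64(&h.uncompressedBytes, int64(m.UncompressedBytes))
}

func main() {
	hook := new(batchReadHook)
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // assumed broker address
		kgo.WithHooks(hook),               // register the hook with the client
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// ... consume as usual; the hook fires as each batch is processed ...
	fmt.Printf("batches=%d records=%d\n",
		atomic.LoadInt64(&hook.batches), atomic.LoadInt64(&hook.records))
}

Since the hook runs inside the client's fetch-processing path, an
implementation like this should stay cheap and non-blocking, mirroring how
one would use the produce-side counterpart, HookProduceBatchWritten.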