Skip to content

Commit

Permalink
kmsg: add new UnsafeReadFrom interface
Browse files Browse the repository at this point in the history
  • Loading branch information
twmb committed May 23, 2022
1 parent 852c5a8 commit d53c0fe
Showing 1 changed file with 31 additions and 5 deletions.
36 changes: 31 additions & 5 deletions pkg/kmsg/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,15 @@ type Response interface {
RequestKind() Request
}

// UnsafeReadFrom, implemented by all requests and responses generated in this
// package, switches to using unsafe slice-to-string conversions when reading.
// This can be used to avoid a lot of garbage, but it means to have to be
// careful when using any strings in structs: if you hold onto the string, the
// underlying response slice will not be garbage collected.
type UnsafeReadFrom interface {
UnsafeReadFrom([]byte) error
}

// ThrottleResponse represents a response that could have a throttle applied by
// Kafka.
//
Expand Down Expand Up @@ -235,6 +244,16 @@ func StringPtr(in string) *string {
// point of this type is that it does not contain a version number inside it,
// but it is versioned: if decoding v1 fails, this falls back to v0.
func (s *StickyMemberMetadata) ReadFrom(src []byte) error {
return s.readFrom(src, false)
}

// UnsafeReadFrom is the same as ReadFrom, but uses unsafe slice to string
// conversions to reduce garbage.
func (s *StickyMemberMetadata) UnsafeReadFrom(src []byte) error {
return s.readFrom(src, true)
}

func (s *StickyMemberMetadata) readFrom(src []byte, unsafe bool) error {
b := kbin.Reader{Src: src}
numAssignments := b.ArrayLen()
if numAssignments < 0 {
Expand All @@ -243,10 +262,16 @@ func (s *StickyMemberMetadata) ReadFrom(src []byte) error {
need := numAssignments - int32(cap(s.CurrentAssignment))
if need > 0 {
s.CurrentAssignment = append(s.CurrentAssignment[:cap(s.CurrentAssignment)], make([]StickyMemberMetadataCurrentAssignment, need)...)
} else {
s.CurrentAssignment = s.CurrentAssignment[:numAssignments]
}
s.CurrentAssignment = s.CurrentAssignment[:numAssignments]
for i := int32(0); i < numAssignments; i++ {
topic := b.String()
var topic string
if unsafe {
topic = b.UnsafeString()
} else {
topic = b.String()
}
numPartitions := b.ArrayLen()
if numPartitions < 0 {
numPartitions = 0
Expand All @@ -256,10 +281,11 @@ func (s *StickyMemberMetadata) ReadFrom(src []byte) error {
need := numPartitions - int32(cap(a.Partitions))
if need > 0 {
a.Partitions = append(a.Partitions[:cap(a.Partitions)], make([]int32, need)...)
} else {
a.Partitions = a.Partitions[:numPartitions]
}
a.Partitions = a.Partitions[:0]
for i := numPartitions; i > 0; i-- {
a.Partitions = append(a.Partitions, b.Int32())
for i := range a.Partitions {
a.Partitions[i] = b.Int32()
}
}
if len(b.Src) > 0 {
Expand Down

0 comments on commit d53c0fe

Please sign in to comment.