From 86705144cb51c5b6001adbd40f2a36746b4039ae Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Sun, 26 Mar 2023 21:33:01 -0600
Subject: [PATCH 1/3] kfake: AddNode, RemoveNode, ShufflePartitionLeaders

Also adds better faked replicas, adds per-partition epochs, and adds
ListOffsets searching by timestamps.

Removes the dependency on slices.BinarySearchFunc in favor of sort.Find.
---
 pkg/kfake/02_list_offsets.go  |  29 ++++--
 pkg/kfake/03_metadata.go      |  30 ++++--
 pkg/kfake/19_create_topics.go |   8 +-
 pkg/kfake/cluster.go          | 183 ++++++++++++++++++++++++++--------
 pkg/kfake/data.go             |  52 ++++++++--
 pkg/kfake/go.mod              |   1 -
 pkg/kfake/go.sum              |   2 -
 pkg/kfake/topic_partition.go  |   8 ++
 8 files changed, 241 insertions(+), 72 deletions(-)

diff --git a/pkg/kfake/02_list_offsets.go b/pkg/kfake/02_list_offsets.go
index 0e32115d..5ab7843d 100644
--- a/pkg/kfake/02_list_offsets.go
+++ b/pkg/kfake/02_list_offsets.go
@@ -1,15 +1,12 @@
 package kfake
 
 import (
+	"sort"
+
 	"github.com/twmb/franz-go/pkg/kerr"
 	"github.com/twmb/franz-go/pkg/kmsg"
 )
 
-// TODO
-//
-// * Timestamp >= 0
-// * LeaderEpoch in response
-
 func init() { regKey(2, 0, 7) }
 
 func (c *Cluster) handleListOffsets(b *broker, kreq kmsg.Request) (kmsg.Response, error) {
@@ -57,16 +54,17 @@ func (c *Cluster) handleListOffsets(b *broker, kreq kmsg.Request) (kmsg.Response
 				continue
 			}
 			if le := rp.CurrentLeaderEpoch; le != -1 {
-				if le < c.epoch {
+				if le < pd.epoch {
 					donep(rt.Topic, rp.Partition, kerr.FencedLeaderEpoch.Code)
 					continue
-				} else if le > c.epoch {
+				} else if le > pd.epoch {
 					donep(rt.Topic, rp.Partition, kerr.UnknownLeaderEpoch.Code)
 					continue
 				}
 			}
 
 			sp := donep(rt.Topic, rp.Partition, 0)
+			sp.LeaderEpoch = pd.epoch
 			switch rp.Timestamp {
 			case -2:
 				sp.Offset = pd.logStartOffset
@@ -77,7 +75,22 @@ func (c *Cluster) handleListOffsets(b *broker, kreq kmsg.Request) (kmsg.Response
 					sp.Offset = pd.highWatermark
 				}
 			default:
-				sp.ErrorCode = kerr.UnknownServerError.Code
+				// Find the first batch whose maxEarlierTimestamp
+				// is at least the requested timestamp. Per
+				// sort.Find's contract, the comparator must be
+				// positive for batches before the target and
+				// negative for batches at or after it.
+				idx, _ := sort.Find(len(pd.batches), func(idx int) int {
+					maxEarlier := pd.batches[idx].maxEarlierTimestamp
+					switch {
+					case maxEarlier < rp.Timestamp:
+						return 1
+					case maxEarlier == rp.Timestamp:
+						return 0
+					default:
+						return -1
+					}
+				})
+				if idx == len(pd.batches) {
+					sp.Offset = -1
+				} else {
+					sp.Offset = pd.batches[idx].FirstOffset
+				}
 			}
 		}
 	}
diff --git a/pkg/kfake/03_metadata.go b/pkg/kfake/03_metadata.go
index e7b9a994..d76f867b 100644
--- a/pkg/kfake/03_metadata.go
+++ b/pkg/kfake/03_metadata.go
@@ -57,6 +57,22 @@ func (c *Cluster) handleMetadata(kreq kmsg.Request) (kmsg.Response, error) {
 		st.Partitions = append(st.Partitions, sp)
 		return &st.Partitions[len(st.Partitions)-1]
 	}
+	okp := func(t string, id uuid, p int32, pd *partData) {
+		nreplicas := c.data.treplicas[t]
+		if nreplicas > len(c.bs) {
+			nreplicas = len(c.bs)
+		}
+
+		sp := donep(t, id, p, 0)
+		sp.Leader = pd.leader.node
+		sp.LeaderEpoch = pd.epoch
+
+		for i := 0; i < nreplicas; i++ {
+			idx := (pd.leader.bsIdx + i) % len(c.bs)
+			sp.Replicas = append(sp.Replicas, c.bs[idx].node)
+		}
+		sp.ISR = sp.Replicas
+	}
 
 	allowAuto := req.AllowAutoTopicCreation && c.cfg.allowAutoTopic
 	for _, rt := range req.Topics {
@@ -82,28 +98,20 @@ func (c *Cluster) handleMetadata(kreq kmsg.Request) (kmsg.Response, error) {
 				donet(topic, rt.TopicID, kerr.UnknownTopicOrPartition.Code)
 				continue
 			}
-			c.data.mkt(topic, -1)
+			c.data.mkt(topic, -1, -1)
 			ps, _ = c.data.tps.gett(topic)
 		}
 
 		id := c.data.t2id[topic]
 		for p, pd := range ps {
-			sp := donep(topic, id, p, 0)
-			sp.Leader = pd.leader.node
-			sp.LeaderEpoch = c.epoch
-			sp.Replicas = 
[]int32{sp.Leader} - sp.ISR = []int32{sp.Leader} + okp(topic, id, p, pd) } } if req.Topics == nil && c.data.tps != nil { for topic, ps := range c.data.tps { id := c.data.t2id[topic] for p, pd := range ps { - sp := donep(topic, id, p, 0) - sp.Leader = pd.leader.node - sp.LeaderEpoch = c.epoch - sp.Replicas = []int32{sp.Leader} - sp.ISR = []int32{sp.Leader} + okp(topic, id, p, pd) } } } diff --git a/pkg/kfake/19_create_topics.go b/pkg/kfake/19_create_topics.go index a68a14e9..392aaecd 100644 --- a/pkg/kfake/19_create_topics.go +++ b/pkg/kfake/19_create_topics.go @@ -55,11 +55,15 @@ func (c *Cluster) handleCreateTopics(b *broker, kreq kmsg.Request) (kmsg.Respons donet(rt.Topic, kerr.InvalidReplicaAssignment.Code) continue } - c.data.mkt(rt.Topic, int(rt.NumPartitions)) + if int(rt.ReplicationFactor) > len(c.bs) { + donet(rt.Topic, kerr.InvalidReplicationFactor.Code) + continue + } + c.data.mkt(rt.Topic, int(rt.NumPartitions), int(rt.ReplicationFactor)) st := donet(rt.Topic, 0) st.TopicID = c.data.t2id[rt.Topic] st.NumPartitions = int32(len(c.data.tps[rt.Topic])) - st.ReplicationFactor = rt.ReplicationFactor + st.ReplicationFactor = int16(c.data.treplicas[rt.Topic]) } return resp, nil diff --git a/pkg/kfake/cluster.go b/pkg/kfake/cluster.go index 3fc75bdc..4c137996 100644 --- a/pkg/kfake/cluster.go +++ b/pkg/kfake/cluster.go @@ -3,7 +3,9 @@ package kfake import ( "errors" "fmt" + "math/rand" "net" + "strconv" "sync" "sync/atomic" "time" @@ -39,18 +41,18 @@ type ( control map[int16][]controlFn keepCurrentControl atomic.Bool - epoch int32 - data data - pids pids + data data + pids pids die chan struct{} dead atomic.Bool } broker struct { - c *Cluster - ln net.Listener - node int32 + c *Cluster + ln net.Listener + node int32 + bsIdx int } controlFn func(kmsg.Request) (kmsg.Response, error, bool) @@ -88,8 +90,9 @@ func NewCluster(opts ...Opt) (c *Cluster, err error) { watchFetchCh: make(chan *watchFetch, 20), data: data{ - id2t: make(map[uuid]string), - t2id: make(map[string]uuid), + id2t: make(map[uuid]string), + t2id: make(map[string]uuid), + treplicas: make(map[string]int), }, die: make(chan struct{}), @@ -106,10 +109,16 @@ func NewCluster(opts ...Opt) (c *Cluster, err error) { if len(cfg.ports) > 0 { port = cfg.ports[i] } + ln, err := newListener(port) + if err != nil { + c.Close() + return nil, err + } b := &broker{ - c: c, - ln: newListener(port), - node: int32(i + 1), + c: c, + ln: ln, + node: int32(i), + bsIdx: len(c.bs), } c.bs = append(c.bs, b) go b.listen() @@ -122,9 +131,11 @@ func NewCluster(opts ...Opt) (c *Cluster, err error) { // ListenAddrs returns the hostports that the cluster is listening on. func (c *Cluster) ListenAddrs() []string { var addrs []string - for _, b := range c.bs { - addrs = append(addrs, b.ln.Addr().String()) - } + c.admin(func() { + for _, b := range c.bs { + addrs = append(addrs, b.ln.Addr().String()) + } + }) return addrs } @@ -139,14 +150,8 @@ func (c *Cluster) Close() { } } -func newListener(port int) net.Listener { - l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) - if err != nil { - if l, err = net.Listen("tcp6", fmt.Sprintf("[::1]:%d")); err != nil { - panic(fmt.Sprintf("kfake: failed to listen on a port: %v", err)) - } - } - return l +func newListener(port int) (net.Listener, error) { + return net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) } func (b *broker) listen() { @@ -335,33 +340,133 @@ func (c *Cluster) callControl(key int16, req kmsg.Request, fn controlFn) (kresp // that handles client requests. 
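+// The callback is run synchronously; admin does not return until fn has
+// completed.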
 func (c *Cluster) admin(fn func()) {
+	ofn := fn
+	wait := make(chan struct{})
+	fn = func() { ofn(); close(wait) }
 	c.adminCh <- fn
+	<-wait
 }
 
 // MoveTopicPartition simulates the rebalancing of a partition to an alternative
-// broker
-func (c *Cluster) MoveTopicPartition(topic string, partition int32, bid int32) error {
-	var br *broker
-	for _, b := range c.bs {
-		if b.node == bid {
-			br = b
-			break
-		}
-	}
-	if br == nil {
-		return errors.New("no such broker")
-	}
-
-	resp := make(chan error, 1)
+// broker. This returns an error if the topic, partition, or node does not exist.
+func (c *Cluster) MoveTopicPartition(topic string, partition int32, nodeID int32) error {
+	var err error
 	c.admin(func() {
+		var br *broker
+		for _, b := range c.bs {
+			if b.node == nodeID {
+				br = b
+				break
+			}
+		}
+		if br == nil {
+			err = fmt.Errorf("node %d not found", nodeID)
+			return
+		}
 		pd, ok := c.data.tps.getp(topic, partition)
 		if !ok {
-			resp <- errors.New("topic/partition not found")
+			err = errors.New("topic/partition not found")
 			return
 		}
 		pd.leader = br
-		resp <- nil
 	})
+	return err
+}
 
-	return <-resp
+// AddNode adds a node to the cluster. If nodeID is -1, the next node ID is
+// used. If port is 0 or negative, a random port is chosen. This returns the
+// added node ID and the port used, or an error if the node already exists or
+// the port cannot be listened to.
+func (c *Cluster) AddNode(nodeID int32, port int) (int32, int, error) {
+	var err error
+	c.admin(func() {
+		if nodeID >= 0 {
+			for _, b := range c.bs {
+				if b.node == nodeID {
+					err = fmt.Errorf("node %d already exists", nodeID)
+					return
+				}
+			}
+		} else if len(c.bs) > 0 {
+			// We go one higher than the max current node ID. We
+			// need to search all nodes because a person may have
+			// added and removed a bunch, with manual ID overrides.
+			nodeID = c.bs[0].node
+			for _, b := range c.bs[1:] {
+				if b.node > nodeID {
+					nodeID = b.node
+				}
+			}
+			nodeID++
+		} else {
+			nodeID = 0
+		}
+		if port < 0 {
+			port = 0
+		}
+		var ln net.Listener
+		if ln, err = newListener(port); err != nil {
+			return
+		}
+		_, strPort, _ := net.SplitHostPort(ln.Addr().String())
+		port, _ = strconv.Atoi(strPort)
+		b := &broker{
+			c:     c,
+			ln:    ln,
+			node:  nodeID,
+			bsIdx: len(c.bs),
+		}
+		c.bs = append(c.bs, b)
+		c.cfg.nbrokers++
+		c.shufflePartitionsLocked()
+		go b.listen()
+	})
+	return nodeID, port, err
+}
+
+// RemoveNode removes a node from the cluster. This returns an error if the
+// node does not exist.
+func (c *Cluster) RemoveNode(nodeID int32) error {
+	var err error
+	c.admin(func() {
+		for i, b := range c.bs {
+			if b.node == nodeID {
+				if len(c.bs) == 1 {
+					err = errors.New("cannot remove all brokers")
+					return
+				}
+				b.ln.Close()
+				c.cfg.nbrokers--
+				c.bs[i] = c.bs[len(c.bs)-1]
+				c.bs[i].bsIdx = i
+				c.bs = c.bs[:len(c.bs)-1]
+				c.shufflePartitionsLocked()
+				return
+			}
+		}
+		err = fmt.Errorf("node %d not found", nodeID)
+	})
+	return err
+}
+
+// ShufflePartitionLeaders simulates a leader election for all partitions: all
+// partitions have a randomly selected new leader and their internal epochs are
+// bumped.
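+// Clients with stale metadata will then see NOT_LEADER_FOR_PARTITION or
+// FENCED_LEADER_EPOCH errors and are expected to refresh their metadata.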
+func (c *Cluster) ShufflePartitionLeaders() {
+	c.admin(func() {
+		c.shufflePartitionsLocked()
+	})
+}
+
+func (c *Cluster) shufflePartitionsLocked() {
+	c.data.tps.each(func(_ string, _ int32, p *partData) {
+		var leader *broker
+		if len(c.bs) == 0 {
+			leader = c.noLeader()
+		} else {
+			leader = c.bs[rand.Intn(len(c.bs))]
+		}
+		p.leader = leader
+		p.epoch++
+	})
+}
diff --git a/pkg/kfake/data.go b/pkg/kfake/data.go
index 1bcb90a0..7f6f54b0 100644
--- a/pkg/kfake/data.go
+++ b/pkg/kfake/data.go
@@ -3,11 +3,11 @@ package kfake
 import (
 	"crypto/sha256"
 	"math/rand"
+	"sort"
 	"strconv"
 	"time"
 
 	"github.com/twmb/franz-go/pkg/kmsg"
-	"golang.org/x/exp/slices"
 )
 
 // TODO
@@ -24,8 +24,9 @@ type (
 		c   *Cluster
 		tps tps[partData]
 
-		id2t map[uuid]string // topic IDs => topic name
-		t2id map[string]uuid // topic name => topic IDs
+		id2t      map[uuid]string // topic IDs => topic name
+		t2id      map[string]uuid // topic name => topic IDs
+		treplicas map[string]int  // topic name => # replicas
 	}
 
 	partData struct {
@@ -34,6 +35,8 @@ type (
 		highWatermark    int64
 		lastStableOffset int64
 		logStartOffset   int64
+		epoch            int32 // current epoch
+		maxTimestamp     int64 // current max timestamp in all batches
 
 		// abortedTxns
 		rf int8
@@ -47,10 +50,22 @@ type (
 	partBatch struct {
 		kmsg.RecordBatch
 		nbytes int
+		epoch  int32 // epoch when appended
+
+		// For list offsets, we may need to return the first offset
+		// after a given requested timestamp. Client-provided
+		// timestamps can go forwards and backwards. We answer list
+		// offsets with a binary search: even if this batch has a small
+		// timestamp, it was produced _after_ a potentially higher
+		// timestamp, so it comes after that timestamp in the list
+		// offsets response.
+		//
+		// When we drop an earlier timestamp, we update all following
+		// maxEarlierTimestamps that match the dropped timestamp.
+ maxEarlierTimestamp int64 } ) -func (d *data) mkt(t string, nparts int) { +func (d *data) mkt(t string, nparts int, nreplicas int) { if d.tps != nil { if _, exists := d.tps[t]; exists { panic("should have checked existence already") @@ -68,24 +83,42 @@ func (d *data) mkt(t string, nparts int) { if nparts < 0 { nparts = d.c.cfg.defaultNumParts } + if nreplicas < 0 { + nreplicas = 3 // cluster default + } d.id2t[id] = t d.t2id[t] = id + d.treplicas[t] = nreplicas for i := 0; i < nparts; i++ { d.tps.mkp(t, int32(i), d.c.newPartData) } } +func (c *Cluster) noLeader() *broker { + return &broker{ + c: c, + node: -1, + } +} + func (c *Cluster) newPartData() *partData { return &partData{ - leader: c.bs[rand.Uint64()%uint64(len(c.bs))], + leader: c.bs[rand.Intn(len(c.bs))], watch: make(map[*watchFetch]struct{}), createdAt: time.Now(), } } func (pd *partData) pushBatch(nbytes int, b kmsg.RecordBatch) { + maxEarlierTimestamp := b.FirstTimestamp + if maxEarlierTimestamp < pd.maxTimestamp { + maxEarlierTimestamp = pd.maxTimestamp + } else { + pd.maxTimestamp = maxEarlierTimestamp + } b.FirstOffset = pd.highWatermark - pd.batches = append(pd.batches, partBatch{b, nbytes}) + b.PartitionLeaderEpoch = pd.epoch + pd.batches = append(pd.batches, partBatch{b, nbytes, pd.epoch, maxEarlierTimestamp}) pd.highWatermark += int64(b.NumRecords) pd.lastStableOffset += int64(b.NumRecords) // TODO for w := range pd.watch { @@ -105,11 +138,12 @@ func (pd *partData) searchOffset(o int64) (index int, found bool, atEnd bool) { } } - index, found = slices.BinarySearchFunc(pd.batches, o, func(b partBatch, o int64) int { - if b.FirstOffset+int64(b.NumRecords) <= o { + index, found = sort.Find(len(pd.batches), func(idx int) int { + b := &pd.batches[idx] + if o < b.FirstOffset { return -1 } - if b.FirstOffset > o { + if o >= b.FirstOffset+int64(b.NumRecords) { return 1 } return 0 diff --git a/pkg/kfake/go.mod b/pkg/kfake/go.mod index 5828647a..2b80a0a2 100644 --- a/pkg/kfake/go.mod +++ b/pkg/kfake/go.mod @@ -5,5 +5,4 @@ go 1.20 require ( github.com/twmb/franz-go v1.13.0 github.com/twmb/franz-go/pkg/kmsg v1.4.0 - golang.org/x/exp v0.0.0-20230310171629-522b1b587ee0 ) diff --git a/pkg/kfake/go.sum b/pkg/kfake/go.sum index 32def2e1..d3852dcb 100644 --- a/pkg/kfake/go.sum +++ b/pkg/kfake/go.sum @@ -2,5 +2,3 @@ github.com/twmb/franz-go v1.13.0 h1:J4VyTXVlOhiCDCXS56ut2ZRAylaimPXnIqtCq9Wlfbw= github.com/twmb/franz-go v1.13.0/go.mod h1:jm/FtYxmhxDTN0gNSb26XaJY0irdSVcsckLiR5tQNMk= github.com/twmb/franz-go/pkg/kmsg v1.4.0 h1:tbp9hxU6m8qZhQTlpGiaIJOm4BXix5lsuEZ7K00dF0s= github.com/twmb/franz-go/pkg/kmsg v1.4.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= -golang.org/x/exp v0.0.0-20230310171629-522b1b587ee0 h1:LGJsf5LRplCck6jUCH3dBL2dmycNruWNF5xugkSlfXw= -golang.org/x/exp v0.0.0-20230310171629-522b1b587ee0/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= diff --git a/pkg/kfake/topic_partition.go b/pkg/kfake/topic_partition.go index e02b01a8..d7e7255d 100644 --- a/pkg/kfake/topic_partition.go +++ b/pkg/kfake/topic_partition.go @@ -51,3 +51,11 @@ func (tps *tps[V]) mkpDefault(t string, p int32) *V { func (tps *tps[V]) set(t string, p int32, v V) { *tps.mkpDefault(t, p) = v } + +func (tps *tps[V]) each(fn func(t string, p int32, v *V)) { + for t, ps := range *tps { + for p, v := range ps { + fn(t, p, v) + } + } +} From fc05b01d54cd58ac61c3f7ee6eab25b7409a405a Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 27 Mar 2023 14:58:35 -0600 Subject: [PATCH 2/3] kfake: bugfix 
producing without a producer ID
---
 pkg/kfake/pid.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/pkg/kfake/pid.go b/pkg/kfake/pid.go
index b3cc8536..eacf2cf8 100644
--- a/pkg/kfake/pid.go
+++ b/pkg/kfake/pid.go
@@ -72,6 +72,10 @@ func (pids *pids) create(txnalID *string) pid {
 }
 
 func (seqs *pidseqs) pushAndValidate(firstSeq, numRecs int32) (ok, dup bool) {
+	// If there is no pid, we do not do duplicate detection.
+	if seqs == nil {
+		return true, false
+	}
 	var (
 		seq   = firstSeq
 		seq64 = int64(seq)

From 1b13fa592ef3ddce1a65f8f131dacfdd68820728 Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Mon, 27 Mar 2023 14:58:52 -0600
Subject: [PATCH 3/3] kfake: add support for OffsetForLeaderEpoch

---
 pkg/kfake/23_offset_for_leader_epoch.go | 111 ++++++++++++++++++++++++
 pkg/kfake/cluster.go                    |   2 +
 2 files changed, 113 insertions(+)
 create mode 100644 pkg/kfake/23_offset_for_leader_epoch.go

diff --git a/pkg/kfake/23_offset_for_leader_epoch.go b/pkg/kfake/23_offset_for_leader_epoch.go
new file mode 100644
index 00000000..5d9cfc82
--- /dev/null
+++ b/pkg/kfake/23_offset_for_leader_epoch.go
@@ -0,0 +1,111 @@
+package kfake
+
+import (
+	"sort"
+
+	"github.com/twmb/franz-go/pkg/kerr"
+	"github.com/twmb/franz-go/pkg/kmsg"
+)
+
+func init() { regKey(23, 3, 4) }
+
+func (c *Cluster) handleOffsetForLeaderEpoch(b *broker, kreq kmsg.Request) (kmsg.Response, error) {
+	req := kreq.(*kmsg.OffsetForLeaderEpochRequest)
+	resp := req.ResponseKind().(*kmsg.OffsetForLeaderEpochResponse)
+
+	if err := checkReqVersion(req.Key(), req.Version); err != nil {
+		return nil, err
+	}
+
+	tidx := make(map[string]int)
+	donet := func(t string, errCode int16) *kmsg.OffsetForLeaderEpochResponseTopic {
+		if i, ok := tidx[t]; ok {
+			return &resp.Topics[i]
+		}
+		tidx[t] = len(resp.Topics)
+		st := kmsg.NewOffsetForLeaderEpochResponseTopic()
+		st.Topic = t
+		resp.Topics = append(resp.Topics, st)
+		return &resp.Topics[len(resp.Topics)-1]
+	}
+	donep := func(t string, p int32, errCode int16) *kmsg.OffsetForLeaderEpochResponseTopicPartition {
+		sp := kmsg.NewOffsetForLeaderEpochResponseTopicPartition()
+		sp.Partition = p
+		sp.ErrorCode = errCode
+		st := donet(t, 0)
+		st.Partitions = append(st.Partitions, sp)
+		return &st.Partitions[len(st.Partitions)-1]
+	}
+
+	for _, rt := range req.Topics {
+		ps, ok := c.data.tps.gett(rt.Topic)
+		for _, rp := range rt.Partitions {
+			// Clients send ReplicaID -1; we do not serve
+			// replica (follower) fetch requests.
+			if req.ReplicaID != -1 {
+				donep(rt.Topic, rp.Partition, kerr.UnknownServerError.Code)
+				continue
+			}
+			if !ok {
+				donep(rt.Topic, rp.Partition, kerr.UnknownTopicOrPartition.Code)
+				continue
+			}
+			pd, ok := ps[rp.Partition]
+			if !ok {
+				donep(rt.Topic, rp.Partition, kerr.UnknownTopicOrPartition.Code)
+				continue
+			}
+			if pd.leader != b {
+				donep(rt.Topic, rp.Partition, kerr.NotLeaderForPartition.Code)
+				continue
+			}
+			if rp.CurrentLeaderEpoch < pd.epoch {
+				donep(rt.Topic, rp.Partition, kerr.FencedLeaderEpoch.Code)
+				continue
+			} else if rp.CurrentLeaderEpoch > pd.epoch {
+				donep(rt.Topic, rp.Partition, kerr.UnknownLeaderEpoch.Code)
+				continue
+			}
+
+			sp := donep(rt.Topic, rp.Partition, 0)
+
+			// If the user is requesting our current epoch, we return the HWM.
+			if rp.LeaderEpoch == pd.epoch {
+				sp.LeaderEpoch = pd.epoch
+				sp.EndOffset = pd.highWatermark
+				continue
+			}
+
+			// Find the first batch whose epoch is at least the
+			// requested epoch.
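+			// sort.Find returns the smallest index at which the
+			// callback is <= 0, so the callback must be positive
+			// for a leading prefix (epochs below the target) and
+			// negative for the suffix (epochs at or above it).
+			// Batches are appended with non-decreasing epochs.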
+			idx, _ := sort.Find(len(pd.batches), func(idx int) int {
+				batchEpoch := pd.batches[idx].epoch
+				switch {
+				case rp.LeaderEpoch <= batchEpoch:
+					return -1
+				default:
+					return 1
+				}
+			})
+
+			// Requested epoch is not yet known: keep -1 returns.
+			if idx == len(pd.batches) {
+				sp.LeaderEpoch = -1
+				sp.EndOffset = -1
+				continue
+			}
+
+			// Requested epoch is before the LSO: return the requested
+			// epoch and the LSO.
+			if idx == 0 && pd.batches[0].epoch > rp.LeaderEpoch {
+				sp.LeaderEpoch = rp.LeaderEpoch
+				sp.EndOffset = pd.logStartOffset
+				continue
+			}
+
+			// The requested epoch had no batches of its own: return
+			// the closest epoch below it, which ended where the next
+			// epoch began.
+			if pd.batches[idx].epoch > rp.LeaderEpoch {
+				sp.LeaderEpoch = pd.batches[idx-1].epoch
+				sp.EndOffset = pd.batches[idx].FirstOffset
+				continue
+			}
+
+			// The requested epoch exists and is not the latest
+			// epoch: the end offset is the first offset of the
+			// next higher epoch, or the high watermark if no
+			// higher epoch has been produced to yet.
+			sp.LeaderEpoch = pd.batches[idx].epoch
+			for idx < len(pd.batches) && pd.batches[idx].epoch == sp.LeaderEpoch {
+				idx++
+			}
+			if idx == len(pd.batches) {
+				sp.EndOffset = pd.highWatermark
+			} else {
+				sp.EndOffset = pd.batches[idx].FirstOffset
+			}
+		}
+	}
+	return resp, nil
+}
diff --git a/pkg/kfake/cluster.go b/pkg/kfake/cluster.go
index 4c137996..151466bd 100644
--- a/pkg/kfake/cluster.go
+++ b/pkg/kfake/cluster.go
@@ -229,6 +229,8 @@ func (c *Cluster) run() {
 				kresp, err = c.handleDeleteTopics(creq.cc.b, kreq)
 			case kmsg.InitProducerID:
 				kresp, err = c.handleInitProducerID(kreq)
+			case kmsg.OffsetForLeaderEpoch:
+				kresp, err = c.handleOffsetForLeaderEpoch(creq.cc.b, kreq)
 			case kmsg.CreatePartitions:
 				kresp, err = c.handleCreatePartitions(creq.cc.b, kreq)
 			default:
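---

A minimal sketch of driving the new cluster APIs from a test. It uses only
exported functions added or already present in this package; the sequencing
and the panic-based error handling are illustrative, not part of the patch:

	c, err := kfake.NewCluster()
	if err != nil {
		panic(err)
	}
	defer c.Close()

	seeds := c.ListenAddrs() // bootstrap addresses for any client
	_ = seeds

	// -1 picks the next unused node ID; 0 picks a random port.
	nodeID, port, err := c.AddNode(-1, 0)
	if err != nil {
		panic(err)
	}
	_ = port

	// Elect random new leaders and bump every partition's epoch.
	c.ShufflePartitionLeaders()

	// Removing a node fails only if it is unknown or the last broker.
	if err := c.RemoveNode(nodeID); err != nil {
		panic(err)
	}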