Merge pull request #415 from twmb/kfake
kfake
twmb authored Mar 27, 2023
2 parents 91271e0 + 1b13fa5 commit e66bd29
Showing 10 changed files with 358 additions and 72 deletions.
29 changes: 21 additions & 8 deletions pkg/kfake/02_list_offsets.go
@@ -1,15 +1,12 @@
package kfake

import (
"sort"

"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
)

// TODO
//
// * Timestamp >= 0
// * LeaderEpoch in response

func init() { regKey(2, 0, 7) }

func (c *Cluster) handleListOffsets(b *broker, kreq kmsg.Request) (kmsg.Response, error) {
@@ -57,16 +54,17 @@ func (c *Cluster) handleListOffsets(b *broker, kreq kmsg.Request) (kmsg.Response
continue
}
if le := rp.CurrentLeaderEpoch; le != -1 {
if le < c.epoch {
if le < pd.epoch {
donep(rt.Topic, rp.Partition, kerr.FencedLeaderEpoch.Code)
continue
} else if le > c.epoch {
} else if le > pd.epoch {
donep(rt.Topic, rp.Partition, kerr.UnknownLeaderEpoch.Code)
continue
}
}

sp := donep(rt.Topic, rp.Partition, 0)
sp.LeaderEpoch = pd.epoch
switch rp.Timestamp {
case -2:
sp.Offset = pd.logStartOffset
@@ -77,7 +75,22 @@ func (c *Cluster) handleListOffsets(b *broker, kreq kmsg.Request) (kmsg.Response
sp.Offset = pd.highWatermark
}
default:
sp.ErrorCode = kerr.UnknownServerError.Code
idx, _ := sort.Find(len(pd.batches), func(idx int) int {
maxEarlier := pd.batches[idx].maxEarlierTimestamp
switch {
case maxEarlier > rp.Timestamp:
return -1
case maxEarlier == rp.Timestamp:
return 0
default:
return 1
}
})
if idx == len(pd.batches) {
sp.Offset = -1
} else {
sp.Offset = pd.batches[idx].FirstOffset
}
}
}
}
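
A note on the new timestamp lookup: assuming pd.batches is kept in offset order and maxEarlierTimestamp is non-decreasing across batches, sort.Find binary-searches for the first batch whose max timestamp is at or after the requested timestamp, and the handler returns that batch's first offset (or -1 if no batch qualifies). The standalone sketch below mirrors that lookup with illustrative types; batch and offsetForTimestamp are not kfake names.

package main

import (
    "fmt"
    "sort"
)

// batch is a stand-in for the per-batch bookkeeping kfake keeps on
// pd.batches; the field names here are illustrative.
type batch struct {
    firstOffset         int64
    maxEarlierTimestamp int64 // assumed non-decreasing across batches
}

// offsetForTimestamp mirrors the handler's lookup: the first offset of the
// first batch whose max timestamp is at or after ts, or -1 if none is.
func offsetForTimestamp(batches []batch, ts int64) int64 {
    idx, _ := sort.Find(len(batches), func(i int) int {
        maxTS := batches[i].maxEarlierTimestamp
        switch {
        case maxTS > ts:
            return -1 // target sorts before this batch
        case maxTS == ts:
            return 0
        default:
            return 1 // target sorts after this batch
        }
    })
    if idx == len(batches) {
        return -1 // no batch at or after ts
    }
    return batches[idx].firstOffset
}

func main() {
    batches := []batch{{0, 10}, {5, 20}, {9, 30}}
    fmt.Println(offsetForTimestamp(batches, 15)) // 5
    fmt.Println(offsetForTimestamp(batches, 40)) // -1
}
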
30 changes: 19 additions & 11 deletions pkg/kfake/03_metadata.go
@@ -57,6 +57,22 @@ func (c *Cluster) handleMetadata(kreq kmsg.Request) (kmsg.Response, error) {
st.Partitions = append(st.Partitions, sp)
return &st.Partitions[len(st.Partitions)-1]
}
okp := func(t string, id uuid, p int32, pd *partData) {
nreplicas := c.data.treplicas[t]
if nreplicas > len(c.bs) {
nreplicas = len(c.bs)
}

sp := donep(t, id, p, 0)
sp.Leader = pd.leader.node
sp.LeaderEpoch = pd.epoch

for i := 0; i < nreplicas; i++ {
idx := (pd.leader.bsIdx + i) % len(c.bs)
sp.Replicas = append(sp.Replicas, c.bs[idx].node)
}
sp.ISR = sp.Replicas
}

allowAuto := req.AllowAutoTopicCreation && c.cfg.allowAutoTopic
for _, rt := range req.Topics {
@@ -82,28 +98,20 @@ func (c *Cluster) handleMetadata(kreq kmsg.Request) (kmsg.Response, error) {
donet(topic, rt.TopicID, kerr.UnknownTopicOrPartition.Code)
continue
}
c.data.mkt(topic, -1)
c.data.mkt(topic, -1, -1)
ps, _ = c.data.tps.gett(topic)
}

id := c.data.t2id[topic]
for p, pd := range ps {
sp := donep(topic, id, p, 0)
sp.Leader = pd.leader.node
sp.LeaderEpoch = c.epoch
sp.Replicas = []int32{sp.Leader}
sp.ISR = []int32{sp.Leader}
okp(topic, id, p, pd)
}
}
if req.Topics == nil && c.data.tps != nil {
for topic, ps := range c.data.tps {
id := c.data.t2id[topic]
for p, pd := range ps {
sp := donep(topic, id, p, 0)
sp.Leader = pd.leader.node
sp.LeaderEpoch = c.epoch
sp.Replicas = []int32{sp.Leader}
sp.ISR = []int32{sp.Leader}
okp(topic, id, p, pd)
}
}
}
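
The new okp helper is what replaces the old single-replica metadata: each partition now reports up to the topic's stored replication factor of replicas, starting at the leader's slot in the broker list and wrapping around it, capped at the broker count, with ISR reported equal to the replica set. A minimal sketch of that assignment, with illustrative names (replicaList, brokerNodes, leaderIdx) standing in for c.bs and pd.leader:

package main

import "fmt"

// replicaList sketches okp's assignment: up to rf replicas, starting at the
// leader's index in the broker list and wrapping around it.
func replicaList(brokerNodes []int32, leaderIdx, rf int) []int32 {
    if rf > len(brokerNodes) {
        rf = len(brokerNodes) // never more replicas than brokers
    }
    replicas := make([]int32, 0, rf)
    for i := 0; i < rf; i++ {
        replicas = append(replicas, brokerNodes[(leaderIdx+i)%len(brokerNodes)])
    }
    return replicas // the handler reports ISR = Replicas
}

func main() {
    // Three brokers, leader at index 2, replication factor 2.
    fmt.Println(replicaList([]int32{0, 1, 2}, 2, 2)) // [2 0]
}
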
8 changes: 6 additions & 2 deletions pkg/kfake/19_create_topics.go
@@ -55,11 +55,15 @@ func (c *Cluster) handleCreateTopics(b *broker, kreq kmsg.Request) (kmsg.Respons
donet(rt.Topic, kerr.InvalidReplicaAssignment.Code)
continue
}
c.data.mkt(rt.Topic, int(rt.NumPartitions))
if int(rt.ReplicationFactor) > len(c.bs) {
donet(rt.Topic, kerr.InvalidReplicationFactor.Code)
continue
}
c.data.mkt(rt.Topic, int(rt.NumPartitions), int(rt.ReplicationFactor))
st := donet(rt.Topic, 0)
st.TopicID = c.data.t2id[rt.Topic]
st.NumPartitions = int32(len(c.data.tps[rt.Topic]))
st.ReplicationFactor = rt.ReplicationFactor
st.ReplicationFactor = int16(c.data.treplicas[rt.Topic])
}

return resp, nil
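
The CreateTopics change adds an up-front guard and stores the replication factor: a request whose ReplicationFactor exceeds the broker count is rejected with INVALID_REPLICATION_FACTOR, and the response now echoes the stored values (partition count and c.data.treplicas) rather than the raw request fields. A tiny sketch of just that guard; createTopicCode is an illustrative helper, not a kfake function.

package main

import (
    "fmt"

    "github.com/twmb/franz-go/pkg/kerr"
)

// createTopicCode sketches the new validation order: the replication factor
// is checked against the live broker count before any topic state is made.
func createTopicCode(replicationFactor int16, brokers int) int16 {
    if int(replicationFactor) > brokers {
        return kerr.InvalidReplicationFactor.Code
    }
    return 0 // topic would be created
}

func main() {
    fmt.Println(createTopicCode(3, 1)) // non-zero: INVALID_REPLICATION_FACTOR
    fmt.Println(createTopicCode(1, 1)) // 0
}
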
111 changes: 111 additions & 0 deletions pkg/kfake/23_offset_for_leader_epoch.go
@@ -0,0 +1,111 @@
package kfake

import (
"sort"

"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
)

func init() { regKey(23, 3, 4) }

func (c *Cluster) handleOffsetForLeaderEpoch(b *broker, kreq kmsg.Request) (kmsg.Response, error) {
req := kreq.(*kmsg.OffsetForLeaderEpochRequest)
resp := req.ResponseKind().(*kmsg.OffsetForLeaderEpochResponse)

if err := checkReqVersion(req.Key(), req.Version); err != nil {
return nil, err
}

tidx := make(map[string]int)
donet := func(t string, errCode int16) *kmsg.OffsetForLeaderEpochResponseTopic {
if i, ok := tidx[t]; ok {
return &resp.Topics[i]
}
tidx[t] = len(resp.Topics)
st := kmsg.NewOffsetForLeaderEpochResponseTopic()
st.Topic = t
resp.Topics = append(resp.Topics, st)
return &resp.Topics[len(resp.Topics)-1]
}
donep := func(t string, p int32, errCode int16) *kmsg.OffsetForLeaderEpochResponseTopicPartition {
sp := kmsg.NewOffsetForLeaderEpochResponseTopicPartition()
sp.Partition = p
sp.ErrorCode = errCode
st := donet(t, 0)
st.Partitions = append(st.Partitions, sp)
return &st.Partitions[len(st.Partitions)-1]
}

for _, rt := range req.Topics {
ps, ok := c.data.tps.gett(rt.Topic)
for _, rp := range rt.Partitions {
if req.ReplicaID != -1 {
donep(rt.Topic, rp.Partition, kerr.UnknownServerError.Code)
continue
}
if !ok {
donep(rt.Topic, rp.Partition, kerr.UnknownTopicOrPartition.Code)
continue
}
pd, ok := ps[rp.Partition]
if !ok {
donep(rt.Topic, rp.Partition, kerr.UnknownTopicOrPartition.Code)
continue
}
if pd.leader != b {
donep(rt.Topic, rp.Partition, kerr.NotLeaderForPartition.Code)
continue
}
if rp.CurrentLeaderEpoch < pd.epoch {
donep(rt.Topic, rp.Partition, kerr.FencedLeaderEpoch.Code)
continue
} else if rp.CurrentLeaderEpoch > pd.epoch {
donep(rt.Topic, rp.Partition, kerr.UnknownLeaderEpoch.Code)
continue
}

sp := donep(rt.Topic, rp.Partition, 0)

// If the user is requesting our current epoch, we return the HWM.
if rp.LeaderEpoch == pd.epoch {
sp.LeaderEpoch = pd.epoch
sp.EndOffset = pd.highWatermark
continue
}

// Find the first batch whose epoch is at or after the requested epoch.
idx, _ := sort.Find(len(pd.batches), func(idx int) int {
batchEpoch := pd.batches[idx].epoch
switch {
case rp.LeaderEpoch <= batchEpoch:
return -1
default:
return 1
}
})

// Requested epoch is not yet known: return -1 for both fields.
if idx == len(pd.batches) {
sp.LeaderEpoch = -1
sp.EndOffset = -1
continue
}

// Requested epoch is before the log start offset: return the
// requested epoch and the log start offset.
if idx == 0 && pd.batches[0].epoch > rp.LeaderEpoch {
sp.LeaderEpoch = rp.LeaderEpoch
sp.EndOffset = pd.logStartOffset
continue
}

// The requested epoch exists and is not the latest
// epoch, so the end offset is the first offset of
// the next epoch.
sp.LeaderEpoch = pd.batches[idx].epoch
sp.EndOffset = pd.batches[idx+1].FirstOffset
}
}
return resp, nil
}
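
Pulling the new handler's branches together: an epoch equal to the current epoch returns the high watermark; an epoch newer than any batch returns -1/-1; an epoch older than the oldest retained batch returns the requested epoch and the log start offset; otherwise the end offset is the first offset of the next epoch. The sketch below condenses that decision over illustrative types (epochBatch, partition, endOffsetFor are not kfake names); the last-batch fallback to the high watermark is my assumption, not something the handler spells out.

package main

import (
    "fmt"
    "sort"
)

// epochBatch and partition are illustrative stand-ins for kfake's
// per-partition state: batches are in offset order with non-decreasing epochs.
type epochBatch struct {
    epoch       int32
    firstOffset int64
}

type partition struct {
    epoch          int32 // current leader epoch
    logStartOffset int64
    highWatermark  int64
    batches        []epochBatch
}

// endOffsetFor condenses the handler's branches into one lookup, returning
// (leaderEpoch, endOffset) for a requested epoch.
func endOffsetFor(p partition, requested int32) (int32, int64) {
    if requested == p.epoch {
        return p.epoch, p.highWatermark // current epoch: end offset is the HWM
    }
    // First batch whose epoch is at or after the requested epoch.
    idx, _ := sort.Find(len(p.batches), func(i int) int {
        if requested <= p.batches[i].epoch {
            return -1
        }
        return 1
    })
    switch {
    case idx == len(p.batches):
        return -1, -1 // epoch not known yet
    case idx == 0 && p.batches[0].epoch > requested:
        return requested, p.logStartOffset // epoch already truncated away
    case idx+1 < len(p.batches):
        return p.batches[idx].epoch, p.batches[idx+1].firstOffset // next epoch's start
    default:
        return p.batches[idx].epoch, p.highWatermark // assumed fallback for the last batch
    }
}

func main() {
    p := partition{epoch: 3, logStartOffset: 0, highWatermark: 9,
        batches: []epochBatch{{1, 0}, {2, 4}, {3, 7}}}
    fmt.Println(endOffsetFor(p, 3)) // 3 9   (current epoch -> HWM)
    fmt.Println(endOffsetFor(p, 2)) // 2 7   (first offset of epoch 3)
    fmt.Println(endOffsetFor(p, 0)) // 0 0   (before the log start)
    fmt.Println(endOffsetFor(p, 5)) // -1 -1 (unknown epoch)
}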