diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index fc375311..4dcba685 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -92,6 +92,15 @@ type sinkAndSource struct { source *source } +func (cl *Client) allSinksAndSources(fn func(sns sinkAndSource)) { + cl.sinksAndSourcesMu.Lock() + defer cl.sinksAndSourcesMu.Unlock() + + for _, sns := range cl.sinksAndSources { + fn(sns) + } +} + type hostport struct { host string port int32 diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index b1068435..09f729cd 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -425,13 +425,9 @@ func (cl *Client) PauseFetchPartitions(topicPartitions map[string][]int32) map[s // paused. Resuming topics that are not currently paused is a per-topic no-op. // See the documentation on PauseTfetchTopics for more details. func (cl *Client) ResumeFetchTopics(topics ...string) { - defer func() { - cl.sinksAndSourcesMu.Lock() - for _, sns := range cl.sinksAndSources { - sns.source.maybeConsume() - } - cl.sinksAndSourcesMu.Unlock() - }() + defer cl.allSinksAndSources(func(sns sinkAndSource) { + sns.source.maybeConsume() + }) c := &cl.consumer c.pausedMu.Lock() @@ -447,13 +443,9 @@ func (cl *Client) ResumeFetchTopics(topics ...string) { // per-topic no-op. See the documentation on PauseFetchPartitions for more // details. func (cl *Client) ResumeFetchPartitions(topicPartitions map[string][]int32) { - defer func() { - cl.sinksAndSourcesMu.Lock() - for _, sns := range cl.sinksAndSources { - sns.source.maybeConsume() - } - cl.sinksAndSourcesMu.Unlock() - }() + defer cl.allSinksAndSources(func(sns sinkAndSource) { + sns.source.maybeConsume() + }) c := &cl.consumer c.pausedMu.Lock() @@ -1123,11 +1115,9 @@ func (c *consumer) stopSession() (listOrEpochLoads, *topicsPartitions) { // our num-fetches manager without worrying about a source trying to // register itself. - c.cl.sinksAndSourcesMu.Lock() - for _, sns := range c.cl.sinksAndSources { + c.cl.allSinksAndSources(func(sns sinkAndSource) { sns.source.session.reset() - } - c.cl.sinksAndSourcesMu.Unlock() + }) // At this point, if we begin fetching anew, then the sources will not // be using stale fetch sessions. @@ -1166,11 +1156,9 @@ func (c *consumer) startNewSession(tps *topicsPartitions) *consumerSession { c.sessionChangeMu.Unlock() - c.cl.sinksAndSourcesMu.Lock() - for _, sns := range c.cl.sinksAndSources { + c.cl.allSinksAndSources(func(sns sinkAndSource) { sns.source.maybeConsume() - } - c.cl.sinksAndSourcesMu.Unlock() + }) // At this point, any source that was not consuming becauase it saw the // session was stopped has been notified to potentially start consuming diff --git a/pkg/kgo/metadata.go b/pkg/kgo/metadata.go index c80a2007..4da29471 100644 --- a/pkg/kgo/metadata.go +++ b/pkg/kgo/metadata.go @@ -445,6 +445,17 @@ func (cl *Client) fetchTopicMetadata(all bool, reqTopics []string) (map[string]* } cl.sinksAndSources[p.leader] = sns } + for _, replica := range partMeta.Replicas { + if replica < 0 { + continue + } + if _, exists = cl.sinksAndSources[replica]; !exists { + cl.sinksAndSources[replica] = sinkAndSource{ + sink: cl.newSink(replica), + source: cl.newSource(replica), + } + } + } cl.sinksAndSourcesMu.Unlock() p.records.sink = sns.sink p.cursor.source = sns.source