diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index c27f2db9350..64bfa4ec37d 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -110,17 +110,19 @@ func (c *Consumer) Close() error { func (c *Consumer) handleMessages(pc sc.PartitionConsumer) { c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition())) + c.partitionMapLock.Lock() c.partitionsHeld++ c.partitionsHeldGauge.Update(c.partitionsHeld) + wg := &c.partitionIDToState[pc.Partition()].wg + c.partitionMapLock.Unlock() defer func() { + c.closePartition(pc) + wg.Done() + c.partitionMapLock.Lock() c.partitionsHeld-- c.partitionsHeldGauge.Update(c.partitionsHeld) + c.partitionMapLock.Unlock() }() - c.partitionMapLock.Lock() - wg := &c.partitionIDToState[pc.Partition()].wg - c.partitionMapLock.Unlock() - defer wg.Done() - defer c.closePartition(pc) msgMetrics := c.newMsgMetrics(pc.Partition())