
Block when consuming exactly_once topic #1106

Closed
FrancoisPoinsot opened this issue May 25, 2018 · 0 comments · Fixed by #1115

FrancoisPoinsot commented May 25, 2018

Versions

Sarama Version: 3c763ff
Kafka Version: Client V1_0_0_0 , Server 0.11.0.0
Go Version: 1.10

Configuration
Config.Group.Return.Notifications = true
Group.PartitionStrategy = StrategyRange
Group.Offsets.Retry.Max = 3
Group.Session.Timeout = 30 * time.Second
Group.Heartbeat.Interval = 3 * time.Second

Net.MaxOpenRequests = 5
Net.DialTimeout = 30 * time.Second
Net.ReadTimeout = 30 * time.Second
Net.WriteTimeout = 30 * time.Second
Net.SASL.Handshake = true

Metadata.Retry.Max = 3
Metadata.Retry.Backoff = 250 * time.Millisecond
Metadata.RefreshFrequency = 10 * time.Minute
Metadata.Full = true
    
Consumer.Fetch.Min = 1
Consumer.Fetch.Default = 1024 * 1024
Consumer.Retry.Backoff = 2 * time.Second
Consumer.MaxWaitTime = 250 * time.Millisecond
Consumer.MaxProcessingTime = 100 * time.Millisecond
Consumer.Offsets.CommitInterval = 1 * time.Second
Consumer.Return.Errors = true
Consumer.Offsets.Initial = sarama.OffsetOldest
Consumer.Offsets.Retention = 192 * time.Hour 

ClientID = defaultClientID
ChannelBufferSize = 256
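For reference, the non-default values above map onto the sarama / sarama-cluster config structs roughly like this (a sketch of my setup, not the literal code from my app; field names are the real ones from both libraries):

```go
import (
	"time"

	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
)

func newConsumerConfig() *cluster.Config {
	cfg := cluster.NewConfig() // embeds sarama.Config and adds the Group section
	cfg.Version = sarama.V1_0_0_0
	cfg.Group.Return.Notifications = true
	cfg.Group.PartitionStrategy = cluster.StrategyRange
	cfg.Consumer.Return.Errors = true
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
	cfg.Consumer.Offsets.Retention = 192 * time.Hour
	return cfg
}
```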
Logs

No logs came out of the client when the problem appeared, even with sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags), so I added spew to see what response I get from the broker. For the sake of simplicity I only watch partition 7.

When the problem happens I get a loop of this:

fetch offset 46508 
(*sarama.FetchResponseBlock)(0xc04275e2a0)({
 Err: (sarama.KError) kafka server: Not an error, why are you printing me?,
 HighWaterMarkOffset: (int64) 49334,
 LastStableOffset: (int64) -1,
 AbortedTransactions: ([]*sarama.AbortedTransaction) <nil>,
 Records: (*sarama.Records)(0xc0427622e0)({
  recordsType: (int) 2,
  msgSet: (*sarama.MessageSet)(<nil>),
  recordBatch: (*sarama.RecordBatch)(0xc0427603c0)({
   FirstOffset: (int64) 46508,
   PartitionLeaderEpoch: (int32) 23,
   Version: (int8) 2,
   Codec: (sarama.CompressionCodec) 0,
   Control: (bool) true,
   LastOffsetDelta: (int32) 0,
   FirstTimestamp: (time.Time) 2018-05-18 03:34:19.073 +0200 CEST,
   MaxTimestamp: (time.Time) 2018-05-18 03:34:19.073 +0200 CEST,
   ProducerID: (int64) 103032,
   ProducerEpoch: (int16) 357,
   FirstSequence: (int32) -1,
   Records: ([]*sarama.Record) (len=1 cap=1) {
    (*sarama.Record)(0xc04275a620)({
     Attributes: (int8) 0,
     TimestampDelta: (time.Duration) 0s,
     OffsetDelta: (int64) 0,
     Key: ([]uint8) (len=4 cap=12) {
      00000000  00 00 00 01                                       |....|
     },
     Value: ([]uint8) (len=6 cap=7) {
      00000000  00 00 00 00 00 0d                                 |......|
     },
     Headers: ([]*sarama.RecordHeader) {
     },
     length: (sarama.varintLengthField) {
      startOffset: (int) 0,
      length: (int64) 16
     }
    })
   },
   PartialTrailingRecord: (bool) false,
   compressedRecords: ([]uint8) <nil>,
   recordsLen: (int) 17
  })
 }),
 RecordsSet: ([]*sarama.Records) (len=1 cap=1) {
  (*sarama.Records)(0xc0427622e0)(<already shown>)
 },
 Partial: (bool) false
})
Problem Description

I am consuming a topic whose producers are Java Streams API applications using exactly_once semantics.
At some point my consumer (using sarama-cluster) blocks on an offset. There is no log in the console, and no message comes from the channel returned by Messages().
Basically nothing happens from the client library user's point of view.

It blocks on very different offsets for each partition.
Re-consuming from the start ends up blocking on the exact same offsets.

I looked a bit inside the library and added a custom log in func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) to see what is happening.
For the sake of simplicity I focus on only one partition.
It turns out my client constantly requests offset 46508 and receives a response containing a control record.

A bit further, in func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error),
I see that when a batch is recognized as a control record, the child offset is not incremented, so the same fetch is issued again.

Not intended as a PR, just to show what I am talking about: here is a quick fix that solves my current situation:
ricardo-ch@525bef5
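The gist of the fix is that the consumer's next fetch offset must still advance past a control batch, even though it yields no consumer-visible message. A minimal, self-contained sketch of that logic (the struct here is a simplified stand-in, not sarama's actual RecordBatch):

```go
package main

import "fmt"

// recordBatch is a simplified stand-in for sarama's RecordBatch.
type recordBatch struct {
	FirstOffset     int64
	LastOffsetDelta int32
	Control         bool // true for transaction markers (commit/abort)
}

// nextOffset returns the offset the consumer should fetch next.
// A control batch carries no message to deliver, but the offset must
// still move past it; otherwise the same batch is fetched forever,
// which is exactly the loop shown in the logs above.
func nextOffset(current int64, batch recordBatch) int64 {
	next := batch.FirstOffset + int64(batch.LastOffsetDelta) + 1
	if next > current {
		return next
	}
	return current
}

func main() {
	// Control batch at offset 46508, as in the dump above: without the
	// fix the consumer would keep asking for 46508.
	fmt.Println(nextOffset(46508, recordBatch{FirstOffset: 46508, LastOffsetDelta: 0, Control: true}))
}
```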
