Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Not seeing all messages when syncProducer called from goroutines #1032

Closed
kenschneider18 opened this issue Feb 2, 2018 · 36 comments
Closed
Labels

Comments

@kenschneider18
Copy link
Contributor

Versions

Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.
Sarama Version: 1.15.0
Kafka Version: 0.11.0
Go Version: 1.9.2 darwin/amd64

Configuration

What configuration values are you using for Sarama and Kafka?

Sarama configuration

config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 10
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
config.Version = sarama.V0_11_0_0

Kafka configuration

broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://localhost:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/Cellar/kafka/kafka-log-1
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
auto.create.topics.enable=false 
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
Logs

When filing an issue please provide logs from Sarama and Kafka if at all
possible. You can set sarama.Logger to a log.Logger to capture Sarama debug
output.

sarama: client.go:115: Initializing new client
sarama: config.go:351: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
sarama: config.go:351: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
sarama: broker.go:146: Connected to broker at localhost:9092 (unregistered)
sarama: client.go:429: client/brokers registered new broker #0 at localhost:9092
sarama: client.go:655: client/metadata found some partitions to be leaderless
sarama: client.go:635: client/metadata retrying after 250ms... (3 attempts remaining)
sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
sarama: client.go:655: client/metadata found some partitions to be leaderless
sarama: client.go:635: client/metadata retrying after 250ms... (2 attempts remaining)
sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
sarama: client.go:655: client/metadata found some partitions to be leaderless
sarama: client.go:635: client/metadata retrying after 250ms... (1 attempts remaining)
sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
sarama: client.go:655: client/metadata found some partitions to be leaderless
sarama: client.go:161: Successfully initialized new client
sarama: config.go:351: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
sarama: async_producer.go:601: producer/broker/0 starting up
sarama: async_producer.go:612: producer/broker/0 state change to [open] on people/0
sarama: broker.go:144: Connected to broker at localhost:9092 (registered as #0)
Problem Description

When calling SendMessage on a single instance of syncProducer from multiple goroutines, some messages seem to fail to be produced to Kafka. I've looked at what ends up on the stream using Apache's kafka-console-consumer and it shows only a fraction of the messages on the stream anywhere from half of the messages down to none. I wrote my own consumer using sarama and it's the same issue, however I get the below error message back from sarama. I want to use syncProducer because I need to guarantee that messages will be published to the stream in the order that they're received by my application. Maybe I've just implemented it wrong, but right now I'm out of ideas and I'm hoping someone on here can help me out.

sarama: consumer.go:755: consumer/broker/0 abandoned subscription to people/0 because kafka: response did not contain all the expected topic/partition blocks
Error: kafka: error while consuming people/0: kafka: response did not contain all the expected topic/partition blocks
sarama: consumer.go:345: consumer/people/0 finding new broker
sarama: client.go:644: client/metadata fetching metadata for [people] from broker localhost:9092
sarama: consumer.go:711: consumer/broker/0 added subscription to people/0

Here's how I created my topic: bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic people

I'm running a single broker on my local machine. I've written a sample program that can reproduce the issue. It's also worth noting that none of the calls to sendMessage() are returning errors when I run the code.

main.go

package main

import (
	"bytes"
	"fmt"
	"log"
	"strconv"
	"sync"
	"syncProducer/streamer"

	"github.com/Shopify/sarama"
	"github.com/linkedin/goavro"
	uuid "github.com/satori/go.uuid"
)

const personSchema = `{
	"type":"record",
	"name":"Person",
	"namespace":"com.example.people",
	"fields":[
		{
			"name":"Name",
			"type":"string"
		},
		{
			"name":"Address",
			"type":"string"
		},{
			"name":"City",
			"type":"string"
		},
		{
			"name":"State",
			"type":"string"
		},
		{
			"name":"ZIP",
			"type":"long"
		}
	]
}`

var (
	personCodec *goavro.Codec
	buf         bytes.Buffer
)

type (
	person struct {
		Name    string
		Address string
		City    string
		State   string
		ZIP     int64
	}
)

func main() {
	var err error
	personCodec, err = goavro.NewCodec(personSchema)
	if err != nil {
		panic(err)
	}

	producer, err := newSyncProducer()
	if err != nil {
		panic(err)
	}
	streamer := streamer.New(producer)

	// Create 10 avro message bodies
	var people [][]byte
	for i := 1; i < 11; i++ {
		aPerson := person{
			Name:    "Bob #" + strconv.Itoa(i),
			Address: strconv.Itoa(i) + " Main St.",
			City:    "SomeTown",
			State:   "CA",
			ZIP:     90210,
		}
		data, err := convertToAvro(aPerson)
		if err != nil {
			panic("Could not convert aPerson " + strconv.Itoa(i) + " to avro.")
		}
		people = append(people, data)
	}

	errc := make(chan error, 10)

	var wg sync.WaitGroup
	// Send messages
	for _, person := range people {
		wg.Add(1)
		go func(person []byte, c chan error, wg *sync.WaitGroup) {
			uuid := uuid.NewV4().String()
			err := streamer.SendActivity("people", uuid, "CreatePerson", person, nil)
			c <- err
			wg.Done()
		}(person, errc, &wg)
	}

	wg.Wait()
	close(errc)
	fmt.Println("Completed!")
	for i := range errc {
		fmt.Println(i)
		if i != nil {
			fmt.Printf("Exit: %v\n", i)
		}
	}

	fmt.Print(&buf)
}

func convertToAvro(aPerson person) ([]byte, error) {
	data, err := personCodec.BinaryFromNative(nil, map[string]interface{}{
		"Name":    aPerson.Name,
		"Address": aPerson.Address,
		"City":    aPerson.City,
		"State":   aPerson.State,
		"ZIP":     aPerson.ZIP,
	})
	if err != nil {
		return nil, err
	}

	return data, nil
}

func newSyncProducer() (sarama.SyncProducer, error) {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
	config.Producer.Retry.Max = 10                   // Retry up to 10 times to produce the message
	config.Producer.Return.Successes = true          // Required when using syncproducer
	config.Producer.Return.Errors = true
	config.Version = sarama.V0_11_0_0

	sarama.Logger = log.New(&buf, "sarama: ", log.Lshortfile)

	return sarama.NewSyncProducer([]string{"localhost:9092"}, config)
}

streamer.go

package streamer

import (
	"github.com/Shopify/sarama"
	"github.com/pkg/errors"
)

const (
	MessageTypeHeaderKey = "message-type"
	MessageIDHeaderKey = "message-id"
)

type (
	// Metadata contains metadata for a given activity.
	Metadata map[string][]string

	// Streamer handles streaming activities to a topic.
	Streamer struct {
		producer sarama.SyncProducer
	}
)

var (
	// ErrNoSubjects denotes that no subjects were provided.
	ErrNoSubjects = errors.New("At least one subject is required")
)

// New creates a new streamer.
func New(producer sarama.SyncProducer) *Streamer {
	return &Streamer{
		producer: producer,
	}
}

// SendActivity encapsulates the provided metadata and data in a message and send it to a topic.
func (s *Streamer) SendActivity(topic string, messageID string, messageHeaderValue string, data []byte, metadata Metadata) error {
	_, _, err := s.producer.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Key:   sarama.StringEncoder(messageID),
		Value: sarama.ByteEncoder(data),
		Headers: []sarama.RecordHeader{
			sarama.RecordHeader{
				Key:   []byte(MessageIDHeaderKey),
				Value: []byte(messageID),
			},
			sarama.RecordHeader{
				Key:   []byte(MessageTypeHeaderKey),
				Value: []byte(messageHeaderValue),
			},
		},
	})
	if err != nil {
		return errors.Wrapf(err, "Error sending message to topic %s for ID %s", topic, messageID)
	}

	return nil
}
@eapache
Copy link
Contributor

eapache commented Feb 2, 2018

Hmm, this is very strange. The symptoms sound a lot like you have multiple partitions for that topic and don't realize it, but it looks like you created it with only one. It may still be worth checking the state of your cluster to ensure it doesn't have multiple for some reason.

Also concerning (but not obviously related to your issue) is the "sarama: client.go:655: client/metadata found some partitions to be leaderless" in the logs - sounds like maybe your cluster isn't entirely healthy.

The code you provided looks correct to me.

@glyn
Copy link

glyn commented Feb 5, 2018

I wonder if the problem is that the main program is being allowed to terminate before sarama has managed to send the messages to kafka? I've noticed a similar problem in one of my tests and if I add a sleep before exiting main, this gives sarama's goroutines a chance to do the necessary.

@kenschneider18
Copy link
Contributor Author

@glyn I tried adding a sleep for 5 seconds to my code after the waitgroup returns and still no dice :(

So I started with a fresh zookeeper and kafka and created the topic with only one partition to be sure there weren't multiple partitions and ensure a healthy cluster. Unfortunately, I still saw the issue.

A colleague of mine suggested that I try running the sarama producer without specifying config.Version. With the producer using the default version of kafka all of the messages are shown on the stream.

I wanted to see what the maximum version of kafka I could use and still have it produce all of the messages and I discovered it to be sarama.V0_10_2_0. For kicks, I also went forward to sarama.V1_0_0_0 and the issue returned.

This is a stab in the dark but could it be that the protocol for producing messages changed with the addition of headers in 0.11 and that's causing the producer to return before the message is fully produced?

@eapache
Copy link
Contributor

eapache commented Feb 6, 2018

The flow of the code is marginally different, but I can't imagine that we wouldn't be putting messages on the wire in the first place. It does seem possible though that we're producing malformed requests and that the broker is silently rejecting them. If you look at the broker-side logs, are there any messages suggesting that it's not accepting some of the messages?

Also worth checking: if you take the Headers out of your produce requests, but leave the version set to 0.11 does the issue still show up?

@eapache
Copy link
Contributor

eapache commented Feb 6, 2018

cc @wladh since this may be a record-batch implementation issue of some sort

@nullne
Copy link

nullne commented Feb 7, 2018

@kenschneider18 I met a similar problem described on #1034

@eapache
Copy link
Contributor

eapache commented Feb 7, 2018

I don't think it's really related, but I'd be curious if #1038 has any effect on this issue.

@kenschneider18
Copy link
Contributor Author

@eapache So I set the broker logs level to debug and tried running my program once as is and once without the version. There wasn't any noteworthy difference in the logs either time. It's worth noting that in both cases even though I'm producing 5 messages the logs only seem to output for updating the high watermark for the topic twice rather than for each message maybe due to batching under the hood?

Here's the log from a run without a specified version (all of the messages are produced):

kafka_1      | [2018-02-07 20:01:42,486] DEBUG Accepted connection from /172.20.0.1:41254 on /172.20.0.3:9092 and assigned it to processor 1, sendBufferSize [actual|requested]: [102400|102400] recvBufferSize [actual|requested]: [102400|102400] (kafka.network.Acceptor)
kafka_1      | [2018-02-07 20:01:42,487] DEBUG Processor 1 listening to new connection from /172.20.0.1:41254 (kafka.network.Processor)
kafka_1      | [2018-02-07 20:01:42,514] DEBUG Accepted connection from /172.20.0.1:41256 on /172.20.0.3:9092 and assigned it to processor 2, sendBufferSize [actual|requested]: [102400|102400] recvBufferSize [actual|requested]: [102400|102400] (kafka.network.Acceptor)
kafka_1      | [2018-02-07 20:01:42,515] DEBUG Processor 2 listening to new connection from /172.20.0.1:41256 (kafka.network.Processor)
kafka_1      | [2018-02-07 20:01:42,586] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: people-0. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
kafka_1      | [2018-02-07 20:01:42,601] DEBUG [ReplicaManager broker=1001] Request key people-0 unblocked 0 fetch requests. (kafka.server.ReplicaManager)
kafka_1      | [2018-02-07 20:01:42,602] DEBUG [Partition people-0 broker=1001] High watermark updated to 2 [0 : 215] (kafka.cluster.Partition)
kafka_1      | [2018-02-07 20:01:42,603] DEBUG [ReplicaManager broker=1001] Request key people-0 unblocked 0 fetch requests. (kafka.server.ReplicaManager)
kafka_1      | [2018-02-07 20:01:42,604] DEBUG [ReplicaManager broker=1001] Request key people-0 unblocked 0 producer requests. (kafka.server.ReplicaManager)
kafka_1      | [2018-02-07 20:01:42,604] DEBUG [ReplicaManager broker=1001] Request key people-0 unblocked 0 DeleteRecordsRequest. (kafka.server.ReplicaManager)
kafka_1      | [2018-02-07 20:01:42,608] DEBUG [ReplicaManager broker=1001] Produce to local log in 82 ms (kafka.server.ReplicaManager)
kafka_1      | [2018-02-07 20:01:42,629] DEBUG [ReplicaManager broker=1001] Request key people-0 unblocked 0 fetch requests. (kafka.server.ReplicaManager)
kafka_1      | [2018-02-07 20:01:42,629] DEBUG [Partition people-0 broker=1001] High watermark updated to 5 [0 : 507] (kafka.cluster.Partition)
kafka_1      | [2018-02-07 20:01:42,630] DEBUG [ReplicaManager broker=1001] Request key people-0 unblocked 0 fetch requests. (kafka.server.ReplicaManager)
kafka_1      | [2018-02-07 20:01:42,631] DEBUG [ReplicaManager broker=1001] Request key people-0 unblocked 0 producer requests. (kafka.server.ReplicaManager)
kafka_1      | [2018-02-07 20:01:42,631] DEBUG [ReplicaManager broker=1001] Request key people-0 unblocked 0 DeleteRecordsRequest. (kafka.server.ReplicaManager)
kafka_1      | [2018-02-07 20:01:42,631] DEBUG [ReplicaManager broker=1001] Produce to local log in 5 ms (kafka.server.ReplicaManager)
kafka_1      | [2018-02-07 20:01:42,636] DEBUG [SocketServer brokerId=1001] Connection with /172.20.0.1 disconnected (org.apache.kafka.common.network.Selector)
kafka_1      | java.io.EOFException
kafka_1      | 	at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124)
kafka_1      | 	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
kafka_1      | 	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:231)
kafka_1      | 	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:192)
kafka_1      | 	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:528)
kafka_1      | 	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:469)
kafka_1      | 	at org.apache.kafka.common.network.Selector.poll(Selector.java:398)
kafka_1      | 	at kafka.network.Processor.poll(SocketServer.scala:535)
kafka_1      | 	at kafka.network.Processor.run(SocketServer.scala:452)
kafka_1      | 	at java.lang.Thread.run(Thread.java:748)
kafka_1      | [2018-02-07 20:01:42,636] DEBUG [SocketServer brokerId=1001] Connection with /172.20.0.1 disconnected (org.apache.kafka.common.network.Selector)
kafka_1      | java.io.EOFException
kafka_1      | 	at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124)
kafka_1      | 	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
kafka_1      | 	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:231)
kafka_1      | 	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:192)
kafka_1      | 	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:528)
kafka_1      | 	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:469)
kafka_1      | 	at org.apache.kafka.common.network.Selector.poll(Selector.java:398)
kafka_1      | 	at kafka.network.Processor.poll(SocketServer.scala:535)
kafka_1      | 	at kafka.network.Processor.run(SocketServer.scala:452)
kafka_1      | 	at java.lang.Thread.run(Thread.java:748)

Here's the log from the run with the version specified:

kafka_1      | [2018-02-07 20:13:22,382] DEBUG Accepted connection from /172.20.0.1:41328 on /172.20.0.3:9092 and assigned it to processor 1, sendBufferSize [actual|requested]: [102400|102400] recvBufferSize [actual|requested]: [102400|102400] (kafka.network.Acceptor)
kafka_1      | [2018-02-07 20:13:22,383] DEBUG Processor 1 listening to new connection from /172.20.0.1:41328 (kafka.network.Processor)
kafka_1      | [2018-02-07 20:13:22,418] DEBUG Accepted connection from /172.20.0.1:41330 on /172.20.0.3:9092 and assigned it to processor 2, sendBufferSize [actual|requested]: [102400|102400] recvBufferSize [actual|requested]: [102400|102400] (kafka.network.Acceptor)
kafka_1      | [2018-02-07 20:13:22,419] DEBUG Processor 2 listening to new connection from /172.20.0.1:41330 (kafka.network.Processor)
kafka_1      | [2018-02-07 20:13:22,471] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: people-0. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
kafka_1      | [2018-02-07 20:13:22,490] DEBUG [ReplicaManager broker=1001] Request key people-0 unblocked 0 fetch requests. (kafka.server.ReplicaManager)
kafka_1      | [2018-02-07 20:13:22,492] DEBUG [Partition people-0 broker=1001] High watermark updated to 1 [0 : 212] (kafka.cluster.Partition)
kafka_1      | [2018-02-07 20:13:22,492] DEBUG [ReplicaManager broker=1001] Request key people-0 unblocked 0 fetch requests. (kafka.server.ReplicaManager)
kafka_1      | [2018-02-07 20:13:22,493] DEBUG [ReplicaManager broker=1001] Request key people-0 unblocked 0 producer requests. (kafka.server.ReplicaManager)
kafka_1      | [2018-02-07 20:13:22,494] DEBUG [ReplicaManager broker=1001] Request key people-0 unblocked 0 DeleteRecordsRequest. (kafka.server.ReplicaManager)
kafka_1      | [2018-02-07 20:13:22,500] DEBUG [ReplicaManager broker=1001] Produce to local log in 70 ms (kafka.server.ReplicaManager)
kafka_1      | [2018-02-07 20:13:22,523] DEBUG [ReplicaManager broker=1001] Request key people-0 unblocked 0 fetch requests. (kafka.server.ReplicaManager)
kafka_1      | [2018-02-07 20:13:22,524] DEBUG [Partition people-0 broker=1001] High watermark updated to 5 [0 : 877] (kafka.cluster.Partition)
kafka_1      | [2018-02-07 20:13:22,524] DEBUG [ReplicaManager broker=1001] Request key people-0 unblocked 0 fetch requests. (kafka.server.ReplicaManager)
kafka_1      | [2018-02-07 20:13:22,525] DEBUG [ReplicaManager broker=1001] Request key people-0 unblocked 0 producer requests. (kafka.server.ReplicaManager)
kafka_1      | [2018-02-07 20:13:22,525] DEBUG [ReplicaManager broker=1001] Request key people-0 unblocked 0 DeleteRecordsRequest. (kafka.server.ReplicaManager)
kafka_1      | [2018-02-07 20:13:22,525] DEBUG [ReplicaManager broker=1001] Produce to local log in 5 ms (kafka.server.ReplicaManager)
kafka_1      | [2018-02-07 20:13:22,530] DEBUG [SocketServer brokerId=1001] Connection with /172.20.0.1 disconnected (org.apache.kafka.common.network.Selector)
kafka_1      | java.io.EOFException
kafka_1      | 	at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124)
kafka_1      | 	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
kafka_1      | 	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:231)
kafka_1      | 	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:192)
kafka_1      | 	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:528)
kafka_1      | 	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:469)
kafka_1      | 	at org.apache.kafka.common.network.Selector.poll(Selector.java:398)
kafka_1      | 	at kafka.network.Processor.poll(SocketServer.scala:535)
kafka_1      | 	at kafka.network.Processor.run(SocketServer.scala:452)
kafka_1      | 	at java.lang.Thread.run(Thread.java:748)
kafka_1      | [2018-02-07 20:13:22,534] DEBUG [SocketServer brokerId=1001] Connection with /172.20.0.1 disconnected (org.apache.kafka.common.network.Selector)
kafka_1      | java.io.EOFException
kafka_1      | 	at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124)
kafka_1      | 	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
kafka_1      | 	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:231)
kafka_1      | 	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:192)
kafka_1      | 	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:528)
kafka_1      | 	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:469)
kafka_1      | 	at org.apache.kafka.common.network.Selector.poll(Selector.java:398)
kafka_1      | 	at kafka.network.Processor.poll(SocketServer.scala:535)
kafka_1      | 	at kafka.network.Processor.run(SocketServer.scala:452)
kafka_1      | 	at java.lang.Thread.run(Thread.java:748)

Tried taking the headers out of the producer requests with the version set to 11 and it didn't resolve the issue.

@eapache
Copy link
Contributor

eapache commented Feb 7, 2018

the logs only seem to output for updating the high watermark for the topic twice rather than for each message maybe due to batching under the hood?

Ya that's normal, Sarama will collect multiple messages into a single request when possible.

Did all five messages make it in the second (version-specified) case? It looks like they did, since the high watermark gets updated to 5 still. What would be helpful would be broker logs from a case where not all the messages make it (though I don't know how hard that is to produce with a small number of messages?)

@kenschneider18
Copy link
Contributor Author

Actually in the second (version-specified) case they don't make it, which is weird.

@eapache
Copy link
Contributor

eapache commented Feb 7, 2018

That's really really weird, since the high watermark is still getting updated and the value after it ([0 : 877]) which I believe is the raw byte count of the log file, looks sane as well.

@kenschneider18
Copy link
Contributor Author

kenschneider18 commented Feb 7, 2018

One thing to note is if I put a lock around my call to producer.SendMessage() my problem is resolved.

I decided to take a look at the actual topic log to see what's there. Obviously it's a lot of gibberish but from what I can see it looks like its all there. I did two runs, both with the version. The first run I did without a mutex around SendMessage() the second run had it.

The log from the second run are longer, but again, from the fields I can read it everything including the headers appears to be there.

First run (no mutex):

^@^@^@^@^@^@^@^A^@^@^A_^@^@^@^@^BÔe]ö^@^@^@^@^@^@^@^@^Aar*<9c><85>^@^@^Aar*<9c><<
85>ÿÿÿÿÿÿÿÿ^@^@^@^@^@^@^@^@^@^Bª^B^@^@^@Hec356eac-f15d-41e0-972b-ebbfb86d6a93B^LL
Bob #1^T1 Main St.^PSomeTown^DCAÄ<81>^K^D^Tmessage-idHec356eac-f15d-41e0-972b-ebb
bfb86d6a93^Xmessage-type^XCreatePersonª^B^@^@^@H97fbd42c-41ff-43d2-824b-3bcbfc666
3db0B^LBob #1^T1 Main St.^PSomeTown^DCAÄ<81>^K^D^Tmessage-idH97fbd42c-41ff-43d2--
824b-3bcbfc663db0^Xmessage-type^XCreatePerson^@^@^@^@^@^@^@^D^@^@^Aö^@^@^@^@^B¢^^
@¤¥^@^@^@^@^@^@^@^@^Aar*<9c><85>^@^@^Aar*<9c><85>ÿÿÿÿÿÿÿÿ^@^@^@^@^@^@^@^@^@^Cª^BB
^@^@^@H206e6c0e-084d-4092-a5a0-e5132eff1246B^LBob #1^T1 Main St.^PSomeTown^DCAÄ<<
81>^K^D^Tmessage-idH206e6c0e-084d-4092-a5a0-e5132eff1246^Xmessage-type^XCreatePee
rsonª^B^@^@^@H88c64fd3-fbd2-4362-aa8e-5ae9d8e03edfB^LBob #1^T1 Main St.^PSomeToww
n^DCAÄ<81>^K^D^Tmessage-idH88c64fd3-fbd2-4362-aa8e-5ae9d8e03edf^Xmessage-type^XCC
reatePersonª^B^@^@^@H896383b1-4612-429e-b8c7-fa9c795a0fd9B^LBob #1^T1 Main St.^PP
SomeTown^DCAÄ<81>^K^D^Tmessage-idH896383b1-4612-429e-b8c7-fa9c795a0fd9^Xmessage--
type^XCreatePerson

Second run (with mutex):

^@^@^@^@^@^@^@^@^@^@^@È^@^@^@^@^BÛ%þy^@^@^@^@^@^@^@^@^Aar>^Z»^@^@^Aar>^Z»ÿÿÿÿÿÿÿ
ÿ^@^@^@^@^@^@^@^@^@^Aª^B^@^@^@Hfbf82306-b462-477e-9f69-119331e7e604B^LBob #1^T1
Main St.^PSomeTown^DCAÄ<81>^K^D^Tmessage-idHfbf82306-b462-477e-9f69-119331e7e6044
^Xmessage-type^XCreatePerson^@^@^@^@^@^@^@^A^@^@^@È^@^@^@^@^BÅbJ<8a>^@^@^@^@^@^@@
^@^@^Aar>^[¡^@^@^Aar>^[¡ÿÿÿÿÿÿÿÿ^@^@^@^@^@^@^@^@^@^Aª^B^@^@^@Hdbf81222-4ae9-46433
-99e1-4f34dc7a068cB^LBob #1^T1 Main St.^PSomeTown^DCAÄ<81>^K^D^Tmessage-idHdbf811
222-4ae9-4643-99e1-4f34dc7a068c^Xmessage-type^XCreatePerson^@^@^@^@^@^@^@^B^@^@^^
@È^@^@^@^@^BÙøP<9b>^@^@^@^@^@^@^@^@^Aar>^[©^@^@^Aar>^[©ÿÿÿÿÿÿÿÿ^@^@^@^@^@^@^@^@^^
@^Aª^B^@^@^@Hfc32da7a-9db9-48d8-a426-7871bb254eb6B^LBob #1^T1 Main St.^PSomeTownn
^DCAÄ<81>^K^D^Tmessage-idHfc32da7a-9db9-48d8-a426-7871bb254eb6^Xmessage-type^XCrr
eatePerson^@^@^@^@^@^@^@^C^@^@^@È^@^@^@^@^BõͬØ^@^@^@^@^@^@^@^@^Aar>^[®^@^@^Aar>>
^[®ÿÿÿÿÿÿÿÿ^@^@^@^@^@^@^@^@^@^Aª^B^@^@^@Hd61c7084-53a0-4e0f-be7b-7f7067adbbceB^LL
Bob #1^T1 Main St.^PSomeTown^DCAÄ<81>^K^D^Tmessage-idHd61c7084-53a0-4e0f-be7b-7ff
7067adbbce^Xmessage-type^XCreatePerson^@^@^@^@^@^@^@^D^@^@^@È^@^@^@^@^BìjBÌ^@^@^^
@^@^@^@^@^@^Aar>^[²^@^@^Aar>^[²ÿÿÿÿÿÿÿÿ^@^@^@^@^@^@^@^@^@^Aª^B^@^@^@H4703bc57-1bb
fc-433e-8c24-87c8dd60f109B^LBob #1^T1 Main St.^PSomeTown^DCAÄ<81>^K^D^Tmessage-ii
dH4703bc57-1bfc-433e-8c24-87c8dd60f109^Xmessage-type^XCreatePerson

EDIT: Another thing worth noting, I made the value for each message the same so all the messages should have Bob #1 at 1 Main St.

@kenschneider18
Copy link
Contributor Author

Tried running with the code from #1038 and no dice.

@superfell
Copy link

I'm seeing the same issue
Sarama: 541689b
Kafka: 2-11-1.0.0
Go: go version go1.9.2 darwin/amd64

kconfig.Version = sarama.V0_11_0_0
kconfig.Producer.RequiredAcks = sarama.WaitForAll
kconfig.Producer.Retry.Max = 10                   
kconfig.Producer.Return.Successes = true
kconfig.Consumer.Return.Errors = true
kconfig.Consumer.Fetch.Min = 1
kconfig.Consumer.Fetch.Default = 10 * 1024 * 1024
kconfig.Consumer.Fetch.Max = 25 * 1024 * 1024

Kafka settings [defaults except]
message.max.bytes=10485760
delete.topic.enable=true
log.retention.hours=87600

Writing messages using the Sync Producer.

Adding log.message.format.version=0.10.2 to the kafka config makes the problem go away.

@kenschneider18
Copy link
Contributor Author

@superfell Unfortunately that only seems to work if you're not sending record headers. If you try to send headers an error is returned to sarama and the broker spits out this error:

kafka_1      | java.lang.IllegalArgumentException: Magic v1 does not support record headers
kafka_1      | 	at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)

Still an interesting observation though.

@superfell
Copy link

I put together a repro case that spins up a number of goroutines calling a producer, writes a known set of data, consumes that range and compares the results of what the producer reported with what the consumer saw. with a producer concurrency of 1 looks ok, as you ramp up concurrency messages seem to end up with different offsets, the consumer see's holes in the offsets.

code and some example runs at
https://gist.github.com/superfell/cff892d075b7585e46b5f1ec40206dcf

@superfell
Copy link

I ran with the go race detector which didn't report anything. then I looked at the actual segments on the broker,

Here's the LogSegment dump from a good run

$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files /tmp/kafka-logs/t1-0/00000000000000000000.log
Dumping /tmp/kafka-logs/t1-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 0 CreateTime: 1518114710642 isvalid: true size: 69 magic: 2 compresscodec: NONE crc: 3449178714
baseOffset: 1 lastOffset: 1 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 69 CreateTime: 1518114710650 isvalid: true size: 69 magic: 2 compresscodec: NONE crc: 801140006
baseOffset: 2 lastOffset: 2 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 138 CreateTime: 1518114710655 isvalid: true size: 69 magic: 2 compresscodec: NONE crc: 2664670657
baseOffset: 3 lastOffset: 3 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 207 CreateTime: 1518114710655 isvalid: true size: 69 magic: 2 compresscodec: NONE crc: 2373008822
baseOffset: 4 lastOffset: 4 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 276 CreateTime: 1518114710655 isvalid: true size: 69 magic: 2 compresscodec: NONE crc: 4145870067
baseOffset: 5 lastOffset: 5 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 345 CreateTime: 1518114710656 isvalid: true size: 69 magic: 2 compresscodec: NONE crc: 4280314447
baseOffset: 6 lastOffset: 6 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 414 CreateTime: 1518114710656 isvalid: true size: 69 magic: 2 compresscodec: NONE crc: 3418866390
baseOffset: 7 lastOffset: 7 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 483 CreateTime: 1518114710656 isvalid: true size: 69 magic: 2 compresscodec: NONE crc: 3630520993
baseOffset: 8 lastOffset: 8 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 552 CreateTime: 1518114710656 isvalid: true size: 69 magic: 2 compresscodec: NONE crc: 1058820188
baseOffset: 9 lastOffset: 9 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 621 CreateTime: 1518114710657 isvalid: true size: 69 magic: 2 compresscodec: NONE crc: 1397148840
baseOffset: 10 lastOffset: 10 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 690 CreateTime: 1518114710657 isvalid: true size: 70 magic: 2 compresscodec: NONE crc: 952156122
baseOffset: 11 lastOffset: 11 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 760 CreateTime: 1518114710657 isvalid: true size: 70 magic: 2 compresscodec: NONE crc: 727852973

And one from a bad run, perhaps the encoding for multiple records has an issue somewhere, notice the difference in sizes.

$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files /tmp/kafka-logs/t2-0/00000000000000000000.log
Dumping /tmp/kafka-logs/t2-0/00000000000000000000.log
Starting offset: 0
baseOffset: 1 lastOffset: 1 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 0 CreateTime: 1518114733011 isvalid: true size: 77 magic: 2 compresscodec: NONE crc: 1667525040
baseOffset: 7 lastOffset: 7 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 77 CreateTime: 1518114733011 isvalid: true size: 109 magic: 2 compresscodec: NONE crc: 1401372392
baseOffset: 10 lastOffset: 10 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 186 CreateTime: 1518114733018 isvalid: true size: 86 magic: 2 compresscodec: NONE crc: 2500353729
baseOffset: 15 lastOffset: 15 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 272 CreateTime: 1518114733019 isvalid: true size: 106 magic: 2 compresscodec: NONE crc: 3164609018
baseOffset: 19 lastOffset: 19 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 378 CreateTime: 1518114733019 isvalid: true size: 97 magic: 2 compresscodec: NONE crc: 1461695846
baseOffset: 24 lastOffset: 24 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 475 CreateTime: 1518114733019 isvalid: true size: 106 magic: 2 compresscodec: NONE crc: 3712113495
baseOffset: 27 lastOffset: 27 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 581 CreateTime: 1518114733019 isvalid: true size: 88 magic: 2 compresscodec: NONE crc: 3006626779
baseOffset: 33 lastOffset: 33 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 669 CreateTime: 1518114733020 isvalid: true size: 115 magic: 2 compresscodec: NONE crc: 2178903687
baseOffset: 35 lastOffset: 35 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 784 CreateTime: 1518114733020 isvalid: true size: 79 magic: 2 compresscodec: NONE crc: 1571663991
baseOffset: 41 lastOffset: 41 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 863 CreateTime: 1518114733020 isvalid: true size: 115 magic: 2 compresscodec: NONE crc: 73640343
baseOffset: 43 lastOffset: 43 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 978 CreateTime: 1518114733020 isvalid: true size: 79 magic: 2 compresscodec: NONE crc: 2882994729
baseOffset: 49 lastOffset: 49 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 1057 CreateTime: 1518114733020 isvalid: true size: 115 magic: 2 compresscodec: NONE crc: 1096029845

@eapache
Copy link
Contributor

eapache commented Feb 8, 2018

Also the fact that the offsets are no longer consecutive. Unless this is only dumping record-batches and not records? But that would explain the size differences too, sort-of.

@superfell
Copy link

I was able to capture this produce request message with wireshark (which unfortuantly doesn't seem to update the v3 API format), am still picking it apart.

0000   00 00 00 7c 00 00 00 03 00 00 00 01 00 06 73 61   ...|..........sa
0010   72 61 6d 61 ff ff ff ff 00 00 27 10 00 00 00 01   ramaÿÿÿÿ..'.....
0020   00 05 74 65 73 74 32 00 00 00 01 00 00 00 00 00   ..test2.........
0030   00 00 4d 00 00 00 00 00 00 00 00 00 00 00 41 00   ..M...........A.
0040   00 00 00 02 52 c2 68 a4 00 00 00 00 00 00 00 00   ....RÂh¤........
0050   01 61 76 86 25 35 ff ff ff ff ff ff ff ff ff ff   .av.%5ÿÿÿÿÿÿÿÿÿÿ
0060   ff ff ff ff ff ff 00 00 00 00 00 00 00 00 00 02   ÿÿÿÿÿÿ..........
0070   0e 00 00 00 01 02 30 00 0e 00 00 02 01 02 31 00   ......0.......1.

@superfell
Copy link

I decoded the produce message by hand, the only thing that looks possibly suspect to me is that Last Offset Delta isn't set to anything.

	00 00 00 7c 			Length
	00 00 				Produce
 	00 03 				API Version
 	00 00 00 01 			CorelationID
	00 06 
	73 61 72 61 6d 61 		ClientID 'sarama'  
	ff ff					Null Transaction ID
	ff ff					Acks [-1 wait for all]
	00 00 27 10			Timeout
	00 00 00 01			Topic Data : 1
	00 05
	74 65 73 74 32		Topic: test2
	00 00 00 01			Topic Data: 1
	00 00 00 00			Partition: 0
	
	00 00 00 4d 			MessageSet Size
	00 00 00 00 00 00 00 00		FirstOffset 
	00 00 00 41					Length
	00 00 00 00					Partition Leader Epoch
	02							Magic
	52 c2 68 a4					CRC
	00 00						Attributes
	00 00 00 00					Last Offset Delta
	00 00 01 61 76 86 25 35		First Timestamp
	ff ff ff ff ff ff ff ff				Last Timestamp
	ff ff ff ff ff ff ff ff				Producer ID
	00 00						Producer Epoch
	00 00 00 00					First Sequence
	
	00 00 00 02					Records: 2
	
	Record
        0e 		Length				7
	00		Attributes
	00		Timestamp Delta
	00		Offset Delta
	01		Key Len				-1 No key
	02		Value Len			1
	30 							Value: '0'
	00 		Header Count			0
	
	0e		Length				7
	00 		Attributes
	00		Timestamp Delta
	02 		Offset Delta			1
	01 		Key Len				-1 No Key
	02 		Value Len			1
	31 							Value: '1'
	00   	Header Count				0

@superfell
Copy link

I couldn't see anywhere that was setting LastOffsetDelta, I added a change to produceSet::buildRequest to set it at the same time its assigning offsetDelta's to records and that resolved the issue in my test app.

@kenschneider18
Copy link
Contributor Author

kenschneider18 commented Feb 9, 2018

So after looking into the code and the kafka protocol changes for 0.11 a bit and I understand now that they were pretty significant. I decided to test out my sample program and remove the concurrency but use the producer.SendMessages() method. With only 5 messages in the slice I pass in, I see the same issue. I think it's something in the batching code.

@kenschneider18
Copy link
Contributor Author

kenschneider18 commented Feb 9, 2018

Looks like @superfell found it, still need to test it myself.

@buyology
Copy link

buyology commented Feb 9, 2018

I think @superfell is correct, looking at my WIP-transaction code I needed to add the LastOffsetDelta to get the correct behaviour in my tests.

Also from the librdkafka source

@pkedy
Copy link
Contributor

pkedy commented Feb 9, 2018

I tried the tweak below and it seems to be working. The question is whether or not its a sane fix.

In record_batch.go in RecordBatch::encode I commented out the putInt32 for LastOffsetDelta (which @superfell pointed out was not being computed) and use the number of records to calculate it. I also added setting OffsetDelta on each record (which were all 0s too).

	//pe.putInt32(b.LastOffsetDelta)

	pe.putInt32(int32(len(b.Records) - 1)) // Is it possible to get here with empty b.Records?
	for i, r := range b.Records {
		r.OffsetDelta = int64(i)
	}

If these offsets can be computed on the fly in the encode methods, are the RecordBatch.LastOffsetDelta and Record.OffsetDelta fields necessary? I only just started looking at this problem and learning the code.

@pkedy
Copy link
Contributor

pkedy commented Feb 9, 2018

Scratch that thought - I looked at the impact of removing those fields. I like @superfell 's approach of adding the fix in produceSet::buildRequest better.

if req.Version >= 3 {
	rb := set.recordsToSend.recordBatch
	rb.LastOffsetDelta = int32(len(rb.Records) - 1) // Is it possible to get here with empty b.Records?
	for i, record := range rb.Records {
		record.OffsetDelta = int64(i)
	}

	req.AddBatch(topic, partition, rb)
	continue
}

@kenschneider18
Copy link
Contributor Author

@pkedy 's solution works in my code and I agree that it should be set in buildRequest.

@kenschneider18
Copy link
Contributor Author

kenschneider18 commented Feb 9, 2018

As the code exists in master right now, there will also need to be a change to RecordBatch::encode. It was changed at some point to use this for LastOffsetDelta:

pe.putInt32(int32(len(b.Records)))

https://github.com/Shopify/sarama/blob/master/record_batch.go#L67

It will need to be changed back to this for the change in produce_set::buildRequest to be picked up:

pe.putInt32(b.LastOffsetDelta)

@eapache
Copy link
Contributor

eapache commented Feb 9, 2018

cc @bobrik since you changed LastOffsetDelta originally. It sounds like this error existed before that change though.

Actually, I assume somebody has already reproduced this issue with the code currently in master (including Ivan's tweak to that line) but if not then maybe that already fixed it? It looks like it would be encoding the same value as if you set it in buildRequest and then encoded LastOffsetDelta directly again?

@pkedy
Copy link
Contributor

pkedy commented Feb 9, 2018

@eapache - @kenschneider18 and I suspect the LastOffsetDelta is off by 1 considering the C implementation.

https://github.com/edenhill/librdkafka/blob/8fed971043e642c195860cd77006622d23616c22/src/rdkafka_msgset_writer.c#L1098-L1100

I'm about to submit a PR assuming this is correct.

@kenschneider18
Copy link
Contributor Author

@eapache I can confirm that the issue still existed in master. The key is the use of len(b.records) - 1 rather than len(b.records) for LastOffsetDelta.

@bobrik
Copy link
Contributor

bobrik commented Feb 9, 2018

Change LGTM. I don't see how this triggers missing messages, though.

@pkedy
Copy link
Contributor

pkedy commented Feb 9, 2018

@bobrik - Good point. @kenschneider18 and I were not on the latest master. Along the journey of discovering the issue for ourselves, we noticed the off by 1. After the PR is merged, we can close this ticket.

@eapache
Copy link
Contributor

eapache commented Feb 9, 2018

Thank you all for your hard work digging into this and finding a solution! I'll let it soak over the weekend, and likely push a new release with Phil's fix on Monday.

@horkhe
Copy link
Contributor

horkhe commented Feb 12, 2018

Just for the record, this was the root cause of the issue I was trying to fix with #1037

@superfell
Copy link

superfell commented Feb 12, 2018

I also logged a bug in Kafka as ideally the broker shouldn't accept the bad request to start with
https://issues.apache.org/jira/browse/KAFKA-6554

eapache added a commit that referenced this issue Feb 16, 2018
…nges

Document recordbatch offset changes from #1032
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

9 participants