package main import ( "github.com/Shopify/sarama" "github.com/golang/glog" ) func main() { kafkaVersion, err := sarama.ParseKafkaVersion("1.0.0") if err != nil { glog.Errorf("Fail to parse kafkaVersion %v", err) } // init (custom) config, enable errors and notifications glog.Infof("Kafka consumer version is %v", kafkaVersion) config := sarama.NewConfig() config.Consumer.Offsets.Initial = sarama.OffsetOldest config.Version = kafkaVersion brokers := []string{"localhost:9092"} master, err := sarama.NewConsumer(brokers, config) if err != nil { glog.Errorf("Fail to create master %v", err) } // read message from offset 0 consumer, err := master.ConsumePartition("send", 0, 0) if err != nil { glog.Errorf("Fail to create consumer %v", err) } for i := 0; i < 10; i++ { select { case msg := <-consumer.Messages(): glog.Infof("Message topic: %s, partition: %d, offset: %d, key: %s, value: %s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, string(msg.Value)) glog.Infof("Consumer message header size %d\n", len(msg.Headers)) for _, header := range msg.Headers { if string(header.Key) != "" { glog.Infof("consumer message header key %s, value %s\n", string(header.Key), string(header.Value)) } } } } }