msg.Timestamp is null in a consumer #750

Closed
AlbinOS opened this issue Sep 27, 2016 · 5 comments

AlbinOS commented Sep 27, 2016

Versions

Sarama Version: master
Kafka Version: 0.10.0.1
Go Version: 1.7

Configuration

Default for Sarama and Kafka

Logs

[sarama] 2016/09/27 21:09:53 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[sarama] 2016/09/27 21:09:53 Connected to broker at kafka.castle:9092 (registered as #1001)
[sarama] 2016/09/27 21:09:53 consumer/broker/1001 added subscription to raw_status/0
2016/09/27 21:09:53 Consumed message time:topic:partition:offset:message 0001-01-01 00:00:00 +0000 UTC:raw_status:0:0:[123 34 116 105 109 101 115 116 97 109 112 34 58 32 49 ...

Problem Description

Why is msg.Timestamp null (the zero time) on my consumed messages? Do I need to enable this somewhere, in the Sarama configuration or in the Kafka configuration?

I tried sending messages via the http_server.go provided in the examples directory, as well as via a Python producer where I manually set the timestamp in the send method.

Any clue?

Thanks a lot,
Albin.

PS - Here is my consumer, directly copy-pasted from your examples:

package main

import (
    "log"
    "os"
    "os/signal"

    "github.com/Shopify/sarama"
)

func main() {
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
    if err != nil {
        panic(err)
    }
    sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)

    defer func() {
        if localErr := consumer.Close(); localErr != nil {
            log.Fatalln(localErr)
        }
    }()

    partitionConsumer, err := consumer.ConsumePartition("raw_status", 0, sarama.OffsetOldest)
    if err != nil {
        panic(err)
    }

    defer func() {
        if err := partitionConsumer.Close(); err != nil {
            log.Fatalln(err)
        }
    }()

    // Trap SIGINT to trigger a shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    consumed := 0
ConsumerLoop:
    for {
        select {
        case msg := <-partitionConsumer.Messages():
            log.Printf("Consumed message time:topic:partition:offset:message %s:%s:%d:%d:%v:%d\n", msg.Timestamp, msg.Topic, msg.Partition, msg.Offset, msg.Value, len(msg.Value))
            consumed++
        case <-signals:
            break ConsumerLoop
        }
    }

    log.Printf("Consumed: %d\n", consumed)
}
eapache (Contributor) commented Oct 1, 2016

Off the top of my head, you need to create a config struct (NewConfig()) and set the Version value appropriately to tell Sarama it is safe to use the more recent API versions, then pass this configuration into the NewConsumer call.
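
For reference, a minimal sketch of that change against the consumer above (same placeholder broker and topic as the example; everything else stays the same):

    config := sarama.NewConfig()
    // Setting Version to V0_10_0_0 tells Sarama the broker speaks the 0.10
    // protocol, so it requests the v1 message format that carries timestamps.
    config.Version = sarama.V0_10_0_0

    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(err)
    }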

eapache closed this as completed Oct 1, 2016
AlbinOS (Author) commented Oct 1, 2016

Here it goes: 2016-09-30 10:41:17.909 +0200 CEST!

Thanks a lot @eapache, and thanks as well for the awesome work on this client. Have a nice weekend!

AlbinOS (Author) commented Oct 4, 2016

Hey @eapache ,

After my successful test with a Python producer, I tried to feed my consumer using the Sarama async producer, but the timestamp is not set.

I've retried my consumer with the Python producer to be 100% certain that the issue comes from the producer, and I still have the Timestamp set in the message received in the consumer.

Any idea why my timestamp is not set? I've also tried to set the Timestamp manually via time.Now(), but then Kafka emits errors like this:

[2016-10-04 13:38:29,113] ERROR [Replica Manager on Broker 1001]: Error processing append operation on partition topic-8 (kafka.server.ReplicaManager)
java.lang.IllegalStateException: Compressed message has magic value 0 but inner message has magic value 1
    at kafka.message.ByteBufferMessageSet$$anon$1.readMessageFromStream(ByteBufferMessageSet.scala:143)
    at kafka.message.ByteBufferMessageSet$$anon$1.liftedTree2$1(ByteBufferMessageSet.scala:111)
    ...
    at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:405)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:76)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
    at java.lang.Thread.run(Thread.java:745)

Here is my complete producer code:

package scheduler

import (
    "time"

    "github.com/Shopify/sarama"
    redis "gopkg.in/redis.v4"

    "strings"

    log "github.com/Sirupsen/logrus"
)

func Schedule(brokerList []string, redisHost string, topic string) {

    // Configure sarama logger
    sarama.Logger = log.New()

    // Create new producer
    producer, err := newAsyncProducer(brokerList)
    if err != nil {
        panic(err)
    }

    defer func() {
        if err = producer.Close(); err != nil {
            log.Fatal("Failed to close producer:", err)
        }
    }()

    // Init Redis connection
    client := redis.NewClient(&redis.Options{
        Addr:     redisHost,
        Password: "", // no password set
        DB:       0,  // use default DB
    })

    // Get all the things
    things, err := client.Keys("keys:*").Result()
    if err != nil {
        panic(err)
    }

    // Send them all
    for _, thing := range things {
        // We are not setting a message key, which means that all messages will
        // be distributed randomly over the different partitions.
        producer.Input() <- &sarama.ProducerMessage{
            Topic: topic,
            Value: sarama.StringEncoder(thing),
        }
        log.Infof("Send %s on %s", thing, topic)
    }
}

// newAsyncProducer creates a new asynchronous Kafka producer
func newAsyncProducer(brokerList []string) (sarama.AsyncProducer, error) {

    // For the access log, we are looking for AP semantics, with high throughput.
    // By creating batches of compressed messages, we reduce network I/O at a cost of more latency.
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForLocal       // Only wait for the leader to ack
    config.Producer.Compression = sarama.CompressionSnappy   // Compress messages
    config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
    config.Version = sarama.V0_10_0_0

    producer, err := sarama.NewAsyncProducer(brokerList, config)
    if err != nil {
        return nil, err
    }

    // We will just log to STDOUT if we're not able to produce messages.
    // Note: messages will only be returned here after all retry attempts are exhausted.
    go func() {
        for err := range producer.Errors() {
            log.Error("Failed to write entry:", err)
        }
    }()

    return producer, nil
}

Thanks a lot for your support !

eapache (Contributor) commented Oct 4, 2016

I've also tried to set the Timestamp manually via time.Now()

This is the correct method. We should probably add support for auto-setting the timestamp to time.Now but we leave it blank by default for now.
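
For example, a sketch of setting it by hand in the Schedule loop above (using the same topic and thing variables from that code):

    producer.Input() <- &sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.StringEncoder(thing),
        // With config.Version set to V0_10_0_0, this timestamp is written
        // into the v1 message format; left at the zero value it shows up
        // as 0001-01-01 on the consumer side.
        Timestamp: time.Now(),
    }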

java.lang.IllegalStateException: Compressed message has magic value 0 but inner message has magic value 1

I think this is a Sarama bug when using both timestamps and compression. Can you try with CompressionNone? If that works, please file a separate issue and I'll take a look at fixing it.
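
A sketch of that test against newAsyncProducer above (the only change is the compression setting):

    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForLocal
    // Temporarily disable compression to check whether the magic-value
    // error only occurs for compressed message sets.
    config.Producer.Compression = sarama.CompressionNone
    config.Producer.Flush.Frequency = 500 * time.Millisecond
    config.Version = sarama.V0_10_0_0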

AlbinOS (Author) commented Oct 4, 2016

It works, time:topic:partition:offset:message 2016-10-04 20:30:39.092 +0200 CEST, opening another issue!

Thanks,
Albin.
