Struggling to Implement producer in Application #800

Closed
Menster317 opened this issue Dec 19, 2016 · 4 comments

@Menster317

Versions

Sarama Version: 1.10.1 (2016-08-30)
Kafka Version: 0.10.1.0
Go Version: 1.6.2 linux/amd64

Configuration

Kafka: vanilla Apache Kafka Vagrant cluster - 1 ZooKeeper node, 3 brokers, initialised using the Vagrant setup included with the Kafka source download
Sarama: See code below

Logs

With sarama.Logger enabled, my server outputs the following logs:
[sarama] 2016/12/19 14:43:07 Initializing new client
[sarama] 2016/12/19 14:43:07 client/metadata fetching metadata for all topics from broker broker1:9092
[sarama] 2016/12/19 14:43:07 Connected to broker at broker1:9092 (unregistered)
[sarama] 2016/12/19 14:43:07 client/brokers registered new broker #2 at broker2:9092
[sarama] 2016/12/19 14:43:07 client/brokers registered new broker #1 at broker1:9092
[sarama] 2016/12/19 14:43:07 client/brokers registered new broker #3 at broker3:9092
[sarama] 2016/12/19 14:43:07 Successfully initialized new client
[sarama] 2016/12/19 14:43:07 producer/broker/1 starting up
[sarama] 2016/12/19 14:43:07 producer/broker/1 state change to [open] on myTopic/0
[sarama] 2016/12/19 14:43:07 Connected to broker at broker1:9092 (registered as #1)

These logs appear when I first instantiate the AsyncProducer; no further logs are printed on subsequent calls to it.

Problem Description

I'm attempting to put together a singleton that encapsulates the sarama AsyncProducer, so that I can achieve the following:

  • Publish messages to my Kafka cluster from anywhere in my application (rather than just having it woven in to publish access logs or similar), as I will be publishing to multiple topics depending on when and where in the application I use it.
  • The above, without the overhead of re-creating a producer every time I want to publish a message.

I've managed to get my application to publish test messages to the cluster by sticking the following snippet in my "main.go" file:

package main

import (
	"fmt"
	"log"
	"net/http"

	"NewProfilePackages/routers"

	sarama "github.com/Shopify/sarama"
)

func main() {
	router := routers.NewRouter()

	/// ---- From here -----
	config := sarama.NewConfig()
	config.ClientID = "NewProfile"

	log.Println("Producer Object Requested for the first time")

	brokers := []string{"broker1:9092"}

	config.Producer.Flush.Messages = 10
	config.Producer.Return.Successes = true
	config.Producer.Retry.Max = 3
	config.Producer.Retry.Backoff = 0
	log.Println("Producer config defined")
	producer, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		panic(err)
	}
	for i := 0; i < 10; i++ {
		producer.Input() <- &sarama.ProducerMessage{Topic: "myTopic", Value: sarama.StringEncoder(fmt.Sprintf("A%d", i))}
	}
	log.Println("Producer Test Completed")
	/// ---- To here -----

	log.Fatal(http.ListenAndServe(":8080", router))
}

But, when I refactor the relevant code out into a singleton as follows:

package kafka_producer

import (
	"log"
	"os"
	"sync"

	sarama "github.com/Shopify/sarama"
)

var (
	producer sarama.AsyncProducer
	once     sync.Once
)

/*
* Zookeeper will be running on 192.168.50.11 (and `zk1` if you used enable_dns)
* Broker 1 on 192.168.50.51 (and `broker1` if you used enable_dns)
* Broker 2 on 192.168.50.52 (and `broker2` if you used enable_dns)
* Broker 3 on 192.168.50.53 (and `broker3` if you used enable_dns)
 */

func MyProducer() sarama.AsyncProducer {
	log.Println("Producer Object Requested")
	once.Do(func() {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)

		config := sarama.NewConfig()
		config.ClientID = "NewProfile"

		log.Println("Producer Object Requested for the first time")

		brokers := []string{"broker1:9092"}
		var err error
		config.Producer.Flush.Messages = 10
		config.Producer.Return.Successes = true
		config.Producer.Retry.Max = 3
		config.Producer.Retry.Backoff = 0
		log.Println("Producer config defined")
		producer, err = sarama.NewAsyncProducer(brokers, config)
		if err != nil {
			panic(err)
		}
		log.Println("Producer initialised, logging start up message")
		producer.Input() <- &sarama.ProducerMessage{Topic: "myTopic", Value: sarama.StringEncoder("NewProfile service initiated Kafka Producer")}
	})
	return producer
}

func ProduceMessageForKafka(topic string, message string) {
	log.Println("Producing Message for Kafka")
	MyProducer().Input() <- &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(message)}
	log.Println("Message for Kafka Successfully produced")
}

I cannot get the messages to publish at all, although the logging on either side of the Input() statements does get printed out by my server.

From what I can tell, the producer object is getting set up correctly, and the reference returned on later calls looks correct. Do you know if there is something about the Sarama library that would prevent me from doing what I'm trying to do, or is there something wrong with my implementation? I'm fairly new to Go, so I'm guessing it's the latter, but I've been having very little luck so far, so I figured it would be worth trying to pick your brains :)

Thanks for any and all responses.

@eapache
Contributor

eapache commented Dec 19, 2016

The code in question looks correct, from what I can see of it. Is there really nothing being logged by Sarama at all after startup? Is there anything being returned on the Errors channel?

I do note that you set Return.Successes to true but didn't include code to read from that channel; I assume it's running elsewhere?
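
For reference, a drain loop for those channels might look something like the sketch below, started as a goroutine right after NewAsyncProducer returns (the drainProducer name and log messages are illustrative, not from the code above):

// drainProducer reads from both channels for the lifetime of the producer so
// that neither one fills up and blocks it. Start it with: go drainProducer(producer)
// (illustrative sketch; drainProducer is not part of the code posted above)
func drainProducer(producer sarama.AsyncProducer) {
	for {
		select {
		case msg, ok := <-producer.Successes():
			if !ok {
				return // channel closed, producer has shut down
			}
			log.Println("produced message to topic", msg.Topic)
		case err, ok := <-producer.Errors():
			if !ok {
				return
			}
			log.Println("failed to produce message:", err)
		}
	}
}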

@Menster317
Author

Menster317 commented Dec 20, 2016

Hi eapache,

Thank you for the response. I'm fairly sure now that my error has something to do with mismanaging those channels. I've amended my code to attempt to select any data out of either the Successes or Errors channel after pushing the message into the Input channel, but nothing seems to be coming back on those channels:

func MyProducer() sarama.AsyncProducer {
	log.Println("Producer Object Requested")
	once.Do(func() {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
		config := sarama.NewConfig()
		config.ClientID = "NewProfile"
		log.Println("Producer Object Requested for the first time")
		brokers := []string{"broker1:9092"}
		var err error
		config.Producer.Flush.Messages = 10
		config.Producer.Return.Successes = true
		config.Producer.Retry.Max = 3
		config.Producer.Retry.Backoff = 0
		log.Println("Producer config defined")
		producer, err = sarama.NewAsyncProducer(brokers, config)
		if err != nil {
			panic(err)
		}
		log.Println("Producer initialised, logging start up message")
		producer.Input() <- &sarama.ProducerMessage{Topic: "myTopic", Value: sarama.StringEncoder("NewProfile service initiated Kafka Producer")}

		log.Println("Attempting to read any successes -or- errors:")
		select {
		case err_msg := <-producer.Errors():
			log.Println("Received from Error channel: ", err_msg)
		case success_msg := <-producer.Successes():
			log.Println("Received from Success channel: ", success_msg)
		default:
			log.Println("Not much going on, eh.")
		}
		log.Println("Successes read & logged, finishing up initialisation")
	})
	return producer
}

Without the "default" case, the code simply hangs, as nothing ever seems to come through the channels; and whether I have the default case there or not, nothing ever gets published to my Kafka topic.

Either way, I'm really appreciative of the advice so far, but it seems that the issue definitely lies with my implementation rather than the library. So I'm more than happy for you to close this issue, and I'll take this over to Stack Overflow.


@Menster317
Author

Hi eapache,

After a lot of slogging away and doing crazy things like attempting to use GDB to inspect the contents of channels while the application was running, I finally realised that the reason this was not working the way I had it in my brain comes down to the following line:

config.Producer.Flush.Messages = 10

This means that I should only expect messages to be published to my cluster after attempting to publish at least 10 of them, and furthermore, I should only expect anything on the Errors() and Successes() channels after messages actually get published, which is why my code would hang without the "default" case.
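
One minimal way to see this behaviour, as a sketch assuming the MyProducer singleton and the sarama/fmt/log imports from the snippets above (Close flushes whatever is still buffered, so even fewer than 10 messages get delivered once the producer shuts down):

// Sketch only: with Producer.Flush.Messages = 10, these three messages sit in
// the producer's buffer and are only delivered when Close() flushes it.
p := MyProducer()
for i := 0; i < 3; i++ {
	p.Input() <- &sarama.ProducerMessage{Topic: "myTopic", Value: sarama.StringEncoder(fmt.Sprintf("buffered %d", i))}
}
if err := p.Close(); err != nil { // Close drains the buffer before returning
	log.Println("close error:", err)
}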

@eapache
Copy link
Contributor

eapache commented Dec 20, 2016

Oh, yes, it wasn't obvious from the snippets you posted how many times you were producing messages, so I had just assumed lots, but that would do it :)

If you're setting a batch size, it's usually a good idea to also set a time limit (config.Producer.Flush.Frequency) so messages don't sit in the queue forever.
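
For illustration, a config sketch along those lines; the 500ms value is just an example, and it needs the "time" package imported:

config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Flush.Messages = 10                       // flush once 10 messages are buffered...
config.Producer.Flush.Frequency = 500 * time.Millisecond // ...or every 500ms, whichever comes first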
