Skip to content

Commit

Permalink
Rejig the client
Browse files Browse the repository at this point in the history
  • Loading branch information
Mongey committed Jun 3, 2018
1 parent f86ee19 commit 9c529ac
Showing 1 changed file with 36 additions and 27 deletions.
63 changes: 36 additions & 27 deletions kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ type TopicMissingError struct {
func (e TopicMissingError) Error() string { return e.msg }

type Client struct {
client sarama.Client
config *Config
client sarama.Client
kafkaConfig *sarama.Config
config *Config
}

type Config struct {
Expand All @@ -33,17 +34,24 @@ func NewClient(config *Config) (*Client, error) {
if bootstrapServers == nil {
return nil, fmt.Errorf("No bootstrap_servers provided")
}
kafkaConfig := newKafkaConfig()
c, err := sarama.NewClient(bootstrapServers, kafkaConfig)

kc, err := config.newKafkaConfig()
if err != nil {
log.Println("[ERROR] Error creating kafka client")
return nil, err
}

c, err := sarama.NewClient(bootstrapServers, kc)
if err != nil {
fmt.Println("Error connecting to kafka")
log.Println("[ERROR] Error connecting to kafka")
return nil, err
}

return &Client{
client: c,
config: config,
}, kafkaConfig.Validate()
client: c,
config: config,
kafkaConfig: kc,
}, kc.Validate()
}

func (c *Client) DeleteTopic(t string) error {
Expand Down Expand Up @@ -173,23 +181,6 @@ func (c *Client) AddPartitions(t Topic) error {
return err
}

func (c *Client) availableBroker() (*sarama.Broker, error) {
var err error
brokers := *c.config.BootstrapServers

log.Printf("[DEBUG] Looking for Brokers @ %v", brokers)
for _, b := range brokers {
broker := sarama.NewBroker(b)
err = broker.Open(newKafkaConfig())
if err == nil {
return broker, nil
}
log.Printf("[WARN] Broker @ %s cannot be reached\n", b)
}

return nil, fmt.Errorf("No Available Brokers @ %v", brokers)
}

func (client *Client) ReadTopic(name string) (Topic, error) {
c := client.client

Expand Down Expand Up @@ -268,8 +259,26 @@ func (c *Client) topicConfig(topic string) (map[string]*string, error) {
return conf, nil
}

func newKafkaConfig() *sarama.Config {
func (c *Client) availableBroker() (*sarama.Broker, error) {
var err error
brokers := *c.config.BootstrapServers
kc := c.kafkaConfig

log.Printf("[DEBUG] Looking for Brokers @ %v", brokers)
for _, b := range brokers {
broker := sarama.NewBroker(b)
err = broker.Open(kc)
if err == nil {
return broker, nil
}
log.Printf("[WARN] Broker @ %s cannot be reached\n", b)
}

return nil, fmt.Errorf("No Available Brokers @ %v", brokers)
}

func (c *Config) newKafkaConfig() (*sarama.Config, error) {
kafkaConfig := sarama.NewConfig()
kafkaConfig.Version = sarama.V1_0_0_0
return kafkaConfig
return kafkaConfig, nil
}

0 comments on commit 9c529ac

Please sign in to comment.