Skip to content

Commit

Permalink
[FAB-1164] Create broker connection to partion leader
Browse files Browse the repository at this point in the history
Subtask of FAB-890

Changed newBroker to:
  - bootstrap from any of the bootstrap servers specifed.
  - return the leader for the topic/partition specified by
    config.Kafka.Topic and config.Kafka.PartitionID

Added TestNewBrokerReturnsPartitionLeader unit test.

Change-Id: Ib8b5de11c6822307aef6c127c5bd7074e18329ab
Signed-off-by: Luis Sanchez <[email protected]>
  • Loading branch information
Luis Sanchez committed Nov 29, 2016
1 parent af0cd3e commit 4084688
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 2 deletions.
53 changes: 51 additions & 2 deletions orderer/kafka/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,59 @@ type brokerImpl struct {
}

func newBroker(conf *config.TopLevel) Broker {
broker := sarama.NewBroker(conf.Kafka.Brokers[0])

// connect to one of the bootstrap servers
var bootstrapServer *sarama.Broker
for _, hostPort := range conf.Kafka.Brokers {
broker := sarama.NewBroker(hostPort)
if err := broker.Open(nil); err != nil {
logger.Warningf("Failed to connect to bootstrap server at %s: %v.", hostPort, err)
continue
}
if connected, err := broker.Connected(); !connected {
logger.Warningf("Failed to connect to bootstrap server at %s: %v.", hostPort, err)
continue
}
bootstrapServer = broker
break
}
if bootstrapServer == nil {
panic(fmt.Errorf("Failed to connect to any of the bootstrap servers (%v) for metadata request.", conf.Kafka.Brokers))
}
logger.Debugf("Connected to bootstrap server at %s.", bootstrapServer.Addr())

// get metadata for topic
topic := conf.Kafka.Topic
metadata, err := bootstrapServer.GetMetadata(&sarama.MetadataRequest{Topics: []string{topic}})
if err != nil {
panic(fmt.Errorf("GetMetadata failed for topic %s: %v", topic, err))
}

// get leader broker for given topic/partition
var broker *sarama.Broker
partitionID := conf.Kafka.PartitionID
if (partitionID >= 0) && (partitionID < int32(len(metadata.Topics[0].Partitions))) {
leader := metadata.Topics[0].Partitions[partitionID].Leader
logger.Debugf("Leading broker for topic %s/partition %d is broker ID %d", topic, partitionID, leader)
for _, b := range metadata.Brokers {
if b.ID() == leader {
broker = b
break
}
}
}
if broker == nil {
panic(fmt.Errorf("Can't find leader for topic %s/partition %d", topic, partitionID))
}

// connect to broker
if err := broker.Open(nil); err != nil {
panic(fmt.Errorf("Failed to create Kafka broker: %v", err))
panic(fmt.Errorf("Failed to open Kafka broker: %v", err))
}
if connected, err := broker.Connected(); !connected {
panic(fmt.Errorf("Failed to open Kafka broker: %v", err))
}

return &brokerImpl{
broker: broker,
config: conf,
Expand Down
58 changes: 58 additions & 0 deletions orderer/kafka/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,61 @@ func testBrokerGetOffsetFunc(given, expected int64) func(t *testing.T) {
}
}
}

func TestNewBrokerReturnsPartitionLeader(t *testing.T) {

// sarama.Logger = log.New(os.Stdout, "[sarama] ", log.Lshortfile)
// SetLogLevel("debug")

broker1 := sarama.NewMockBroker(t, 1001)
broker2 := sarama.NewMockBroker(t, 1002)
broker3 := sarama.NewMockBroker(t, 1003)

// shutdown broker1
broker1.Close()

// update list of bootstrap brokers in config
originalKafkaBrokers := testConf.Kafka.Brokers
defer func() {
testConf.Kafka.Brokers = originalKafkaBrokers
}()
// add broker1, and broker2 to list of bootstrap brokers
// broker1 is 'down'
// broker3 will be discovered via a metadata request
testConf.Kafka.Brokers = []string{broker1.Addr(), broker2.Addr()}

// handy references
topic := testConf.Kafka.Topic
partition := testConf.Kafka.PartitionID

// add expectation that broker2 will return a metadata response that
// identifies broker3 as the topic partition leader
broker2.SetHandlerByMap(map[string]sarama.MockResponse{
"MetadataRequest": sarama.NewMockMetadataResponse(t).
SetBroker(broker1.Addr(), broker1.BrokerID()).
SetBroker(broker2.Addr(), broker2.BrokerID()).
SetBroker(broker3.Addr(), broker3.BrokerID()).
SetLeader(topic, partition, broker3.BrokerID()),
})

// add expectation that broker3 respond to an offset request
broker3.SetHandlerByMap(map[string]sarama.MockResponse{
"OffsetRequest": sarama.NewMockOffsetResponse(t).
SetOffset(topic, partition, sarama.OffsetOldest, 0).
SetOffset(topic, partition, sarama.OffsetNewest, 42),
})

// get leader for topic partition
broker := newBroker(testConf)

// only broker3 will respond successfully to an offset request
offsetRequest := new(sarama.OffsetRequest)
offsetRequest.AddBlock(topic, partition, -1, 1)
if _, err := broker.GetOffset(offsetRequest); err != nil {
t.Fatal(err)
}

broker2.Close()
broker3.Close()

}

0 comments on commit 4084688

Please sign in to comment.