Add Kafka-based orderer
Related: http://jira.hyperledger.org/browse/FAB-32

This changeset introduces a simple, solo-like Kafka-based orderer,
complete with a config file that ties into the orderer config mechanism
introduced in an earlier changeset, as well as unit and BDD tests. It
also provides a sample client that broadcasts and delivers counter
values.

For a demo of this work, please watch this video:
https://ibm.box.com/s/kqkk12vn18w3s3in2vkioucl9z32jk2h
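
The sample counter client itself is not shown on this page. As a rough,
hypothetical sketch of its broadcast side, assuming the generated ab
gRPC bindings and the orderer's listen port from the compose files
below (the deliver side would open the corresponding Deliver stream and
read the counter values back as blocks):

package main

import (
    "encoding/binary"
    "fmt"

    ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
)

func main() {
    conn, err := grpc.Dial("127.0.0.1:5005", grpc.WithInsecure())
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    stream, err := ab.NewAtomicBroadcastClient(conn).Broadcast(context.Background())
    if err != nil {
        panic(err)
    }

    // Broadcast increasing counter values, reading back one ACK per message
    for i := uint64(0); i < 10; i++ {
        data := make([]byte, 8)
        binary.BigEndian.PutUint64(data, i)
        if err := stream.Send(&ab.BroadcastMessage{Data: data}); err != nil {
            panic(err)
        }
        reply, err := stream.Recv()
        if err != nil {
            panic(err)
        }
        fmt.Println("Broadcast ACK status:", reply.Status)
    }
}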

This changeset introduces the following abstractions (a sketch of them
as Go interfaces follows the list):

- Broker: Provides info for atomic broadcast seek requests (e.g. the
earliest batch available)
- Producer: Sends batches to Kafka
- Consumer: Reads a stream of batches from Kafka
- Client Deliver: A consumer dedicated to a connected client
- Deliverer: Handles the deliver part of the Kafka-based orderer;
spawns a new Client Deliver instance per connected client
- Broadcaster: Handles the broadcast part of the Kafka-based orderer;
cuts batches and sends them to Kafka
- Orderer: Consists of a Deliverer and Broadcaster and, as the name
suggests, handles all ordering requests (broadcast and deliver RPCs)
issued by the connected clients.
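
To make the division of labor concrete, here is a minimal sketch of
these abstractions as Go interfaces. Broadcaster is quoted verbatim
from orderer/kafka/broadcast.go below, and Producer's signatures are
inferred from their use there; Closeable, Consumer, and Deliverer are
assumptions for illustration only.

package kafka

import ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"

// Closeable is assumed: anything holding a connection can be shut down
type Closeable interface {
    Close() error
}

// Producer sends block payloads to Kafka; Send is inferred from broadcast.go
type Producer interface {
    Send(payload []byte) error
    Closeable
}

// Consumer reads a stream of blocks from Kafka (hypothetical signature)
type Consumer interface {
    Recv() (*ab.Block, error)
    Closeable
}

// Broadcaster allows the caller to submit messages to the orderer
// (verbatim from broadcast.go below)
type Broadcaster interface {
    Broadcast(stream ab.AtomicBroadcast_BroadcastServer) error
    Closeable
}

// Deliverer handles the deliver part, spawning a Client Deliver
// consumer per connected client (hypothetical signature)
type Deliverer interface {
    Deliver(stream ab.AtomicBroadcast_DeliverServer) error
    Closeable
}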

Change-Id: I09a313e9bf681051ee73b35d8d14401fee234f02
Signed-off-by: Kostas Christidis <[email protected]>
kchristidis committed Oct 12, 2016
1 parent 3cedee1 commit f6640f2
Showing 34 changed files with 2,519 additions and 76 deletions.
12 changes: 12 additions & 0 deletions bddtests/docker-compose-orderer-base.yml
@@ -0,0 +1,12 @@
ordererBase:
  image: hyperledger/fabric-orderer
  environment:
    - ORDERER_GENERAL_LEDGERTYPE=ram
    - ORDERER_GENERAL_BATCHTIMEOUT=10s
    - ORDERER_GENERAL_BATCHSIZE=10
    - ORDERER_GENERAL_MAXWINDOWSIZE=1000
    - ORDERER_GENERAL_LISTENADDRESS=0.0.0.0
    - ORDERER_GENERAL_LISTENPORT=5005
    - ORDERER_RAMLEDGER_HISTORY_SIZE=100
  working_dir: /opt/gopath/src/github.com/hyperledger/fabric/orderer
  command: orderer
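
Each ORDERER_* variable above overrides the corresponding key in
orderer.yaml through the orderer config mechanism. A sketch of the
viper wiring assumed to sit in orderer/config/config.go, where the
ORDERER prefix plus a "." to "_" key replacer maps, e.g., the key
General.BatchSize onto ORDERER_GENERAL_BATCHSIZE:

package config

import (
    "strings"

    "github.com/spf13/viper"
)

// newEnvAwareViper sketches the assumed env-override setup: with the
// ORDERER prefix and a "." -> "_" replacer, the config key
// General.BatchSize becomes overridable via ORDERER_GENERAL_BATCHSIZE.
func newEnvAwareViper() *viper.Viper {
    v := viper.New()
    v.SetConfigName("orderer")
    v.SetEnvPrefix("ORDERER")
    v.AutomaticEnv()
    v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
    return v
}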
15 changes: 15 additions & 0 deletions bddtests/docker-compose-orderer-kafka.yml
@@ -0,0 +1,15 @@
kafka0:
  image: kchristidis/kafka
  environment:
    - ADVERTISED_PORT=9092

orderer0:
  extends:
    file: docker-compose-orderer-base.yml
    service: ordererBase
  environment:
    - ORDERER_GENERAL_ORDERERTYPE=kafka
    - ORDERER_KAFKA_BROKERS=[kafka0:9092]
  links:
    - kafka0
  command: orderer -loglevel debug -verbose true
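
Note the bracketed broker list: ORDERER_KAFKA_BROKERS=[kafka0:9092]
must end up as the []string Kafka.Brokers field of the orderer config
(see orderer/config/config.go below). The normalization itself is not
shown in this diff; a hypothetical helper illustrating the expected
format:

package kafka

import "strings"

// parseBrokers is hypothetical, not part of this changeset: it shows
// one way a bracketed value like "[kafka0:9092,kafka1:9092]" could be
// turned into the []string that Kafka.Brokers expects.
func parseBrokers(raw string) []string {
    return strings.Split(strings.Trim(raw, "[]"), ",")
}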
18 changes: 5 additions & 13 deletions bddtests/docker-compose-orderer-solo.yml
@@ -1,14 +1,6 @@
 orderer0:
-  image: hyperledger/fabric-orderer
-  environment:
-    - ORDERER_GENERAL_ORDERERTYPE=solo
-    - ORDERER_GENERAL_LEDGERTYPE=ram
-    - ORDERER_GENERAL_BATCHTIMEOUT=10s
-    - ORDERER_GENERAL_BATCHSIZE=10
-    - ORDERER_GENERAL_MAXWINDOWSIZE=1000
-    - ORDERER_GENERAL_LISTENADDRESS=0.0.0.0
-    - ORDERER_GENERAL_LISTENPORT=5005
-    - ORDERER_RAMLEDGER_HISTORY_SIZE=100
-
-  working_dir: /opt/gopath/src/github.com/hyperledger/fabric/orderer
-  command: orderer
+  extends:
+    file: docker-compose-orderer-base.yml
+    service: ordererBase
+  environment:
+    - ORDERER_GENERAL_ORDERERTYPE=solo
92 changes: 45 additions & 47 deletions bddtests/features/orderer.feature
@@ -1,75 +1,73 @@
 #
 # Test Orderer
 #
 # Tags that can be used and will affect test internals:
-#  @doNotDecompose will NOT decompose the named compose_yaml after scenario ends. Useful for setting up environment and reviewing after scenario.
-#  @chaincodeImagesUpToDate use this if all scenarios chaincode images are up to date, and do NOT require building. BE SURE!!!
+#  @doNotDecompose will NOT decompose the named compose_yaml after scenario ends.
+#  Useful for setting up environment and reviewing after scenario.

 #@chaincodeImagesUpToDate
 @orderer
 Feature: Orderer
   As a Fabric developer
   I want to run and validate an orderer service

   # @doNotDecompose
   Scenario Outline: Basic orderer function

     Given we compose "<ComposeFile>"
-    And I wait ".5" seconds
+    And I wait "<BootTime>" seconds
     And user "binhn" is an authorized user of the ordering service
     When user "binhn" broadcasts "<NumMsgsToBroadcast>" unique messages on "orderer0"
     And user "binhn" connects to deliver function on "orderer0" with Ack of "<SendAck>" and properties:
       | Start     | SpecifiedNumber | WindowSize |
       | SPECIFIED | 1               | 10         |
     Then user "binhn" should get a delivery from "orderer0" of "<ExpectedBlocks>" blocks with "<NumMsgsToBroadcast>" messages within "<BatchTimeout>" seconds

   Examples: Orderer Options
-    | ComposeFile                      | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout |
-    | docker-compose-orderer-solo.yml  | true    | 20                 | 2              | 10           |
-    | docker-compose-orderer-solo.yml  | true    | 40                 | 4              | 10           |
-    | docker-compose-orderer-solo.yml  | true    | 60                 | 6              | 10           |
+    | ComposeFile                      | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime |
+    | docker-compose-orderer-solo.yml  | true    | 20                 | 2              | 10           | .5       |
+    | docker-compose-orderer-solo.yml  | true    | 40                 | 4              | 10           | .5       |
+    | docker-compose-orderer-solo.yml  | true    | 60                 | 6              | 10           | .5       |
+    | docker-compose-orderer-kafka.yml | true    | 20                 | 2              | 10           | 5        |
+    | docker-compose-orderer-kafka.yml | true    | 40                 | 4              | 10           | 5        |
+    | docker-compose-orderer-kafka.yml | true    | 60                 | 6              | 10           | 5        |

   # @doNotDecompose
   Scenario Outline: Basic seek orderer function (Utilizing properties for atomic broadcast)

     Given we compose "<ComposeFile>"
-    And I wait ".5" seconds
+    And I wait "<BootTime>" seconds
     And user "binhn" is an authorized user of the ordering service
     When user "binhn" broadcasts "<NumMsgsToBroadcast>" unique messages on "orderer0"
     And user "binhn" connects to deliver function on "orderer0" with Ack of "<SendAck>" and properties:
       | Start     | SpecifiedNumber | WindowSize |
       | SPECIFIED | 1               | 10         |
     Then user "binhn" should get a delivery from "orderer0" of "<ExpectedBlocks>" blocks with "<NumMsgsToBroadcast>" messages within "<BatchTimeout>" seconds
     When user "binhn" seeks to block "1" on deliver function on "orderer0"
     Then user "binhn" should get a delivery from "orderer0" of "<ExpectedBlocks>" blocks with "<NumMsgsToBroadcast>" messages within "1" seconds

   Examples: Orderer Options
-    | ComposeFile                       | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout |
-    | docker-compose-orderer-solo.yml   | true    | 20                 | 2              | 10           |
-    # | docker-compose-orderer-solo.yml | true    | 40                 | 4              | 10           |
-    # | docker-compose-orderer-solo.yml | true    | 60                 | 6              | 10           |
+    | ComposeFile                      | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime |
+    | docker-compose-orderer-solo.yml  | true    | 20                 | 2              | 10           | .5       |
+    | docker-compose-orderer-solo.yml  | true    | 40                 | 4              | 10           | .5       |
+    | docker-compose-orderer-solo.yml  | true    | 60                 | 6              | 10           | .5       |
+    | docker-compose-orderer-kafka.yml | true    | 20                 | 2              | 10           | 5        |
+    | docker-compose-orderer-kafka.yml | true    | 40                 | 4              | 10           | 5        |
+    | docker-compose-orderer-kafka.yml | true    | 60                 | 6              | 10           | 5        |

   # @doNotDecompose
   Scenario Outline: Basic orderer function varying ACK

     Given we compose "<ComposeFile>"
-    And I wait ".5" seconds
+    And I wait "<BootTime>" seconds
     And user "binhn" is an authorized user of the ordering service
     When user "binhn" broadcasts "<NumMsgsToBroadcast>" unique messages on "orderer0"
     And user "binhn" connects to deliver function on "orderer0" with Ack of "<SendAck>" and properties:
       | Start     | SpecifiedNumber | WindowSize |
       | SPECIFIED | 1               | 1          |
     Then user "binhn" should get a delivery from "orderer0" of "<ExpectedBlocks>" blocks with "<NumMsgsToBroadcast>" messages within "<BatchTimeout>" seconds

   Examples: Orderer Options
-    | ComposeFile                      | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout |
-    | docker-compose-orderer-solo.yml  | false   | 20                 | 1              | 10           |
-    | docker-compose-orderer-solo.yml  | true    | 20                 | 2              | 10           |
+    | ComposeFile                      | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime |
+    | docker-compose-orderer-solo.yml  | false   | 20                 | 1              | 10           | .5       |
+    | docker-compose-orderer-solo.yml  | true    | 20                 | 2              | 10           | .5       |
+    | docker-compose-orderer-kafka.yml | false   | 20                 | 1              | 10           | 5        |
+    | docker-compose-orderer-kafka.yml | true    | 20                 | 2              | 10           | 5        |
43 changes: 43 additions & 0 deletions orderer/config/config.go
@@ -23,6 +23,7 @@ import (
 	"strings"
 	"time"

+	"github.com/Shopify/sarama"
 	"github.com/op/go-logging"
 	"github.com/spf13/viper"
 )
@@ -59,6 +60,21 @@ type FileLedger struct {
 	Prefix string
 }

+// Kafka contains config for the Kafka orderer
+type Kafka struct {
+	Brokers     []string
+	Topic       string
+	PartitionID int32
+	Retry       Retry
+	Version     sarama.KafkaVersion // TODO For now set this in code
+}
+
+// Retry contains config for the reconnection attempts to the Kafka brokers
+type Retry struct {
+	Period time.Duration
+	Stop   time.Duration
+}
+
 // TopLevel directly corresponds to the orderer config yaml
 // Note, for non 1-1 mappings, you may append
 // something like `mapstructure:"weirdFoRMat"` to
@@ -68,6 +84,7 @@ type TopLevel struct {
 	General    General
 	RAMLedger  RAMLedger
 	FileLedger FileLedger
+	Kafka      Kafka
 }

 var defaults = TopLevel{
@@ -88,6 +105,16 @@ var defaults = TopLevel{
 		Location: "",
 		Prefix:   "hyperledger-fabric-rawledger",
 	},
+	Kafka: Kafka{
+		Brokers:     []string{"127.0.0.1:9092"},
+		Topic:       "test",
+		PartitionID: 0,
+		Version:     sarama.V0_9_0_1,
+		Retry: Retry{
+			Period: 3 * time.Second,
+			Stop:   60 * time.Second,
+		},
+	},
 }

 func (c *TopLevel) completeInitialization() {
@@ -122,7 +149,22 @@ func (c *TopLevel) completeInitialization() {
 	case c.FileLedger.Prefix == "":
 		logger.Infof("FileLedger.Prefix unset, setting to %s", defaults.FileLedger.Prefix)
 		c.FileLedger.Prefix = defaults.FileLedger.Prefix
+	case c.Kafka.Brokers == nil:
+		logger.Infof("Kafka.Brokers unset, setting to %v", defaults.Kafka.Brokers)
+		c.Kafka.Brokers = defaults.Kafka.Brokers
+	case c.Kafka.Topic == "":
+		logger.Infof("Kafka.Topic unset, setting to %v", defaults.Kafka.Topic)
+		c.Kafka.Topic = defaults.Kafka.Topic
+	case c.Kafka.Retry.Period == 0*time.Second:
+		logger.Infof("Kafka.Retry.Period unset, setting to %v", defaults.Kafka.Retry.Period)
+		c.Kafka.Retry.Period = defaults.Kafka.Retry.Period
+	case c.Kafka.Retry.Stop == 0*time.Second:
+		logger.Infof("Kafka.Retry.Stop unset, setting to %v", defaults.Kafka.Retry.Stop)
+		c.Kafka.Retry.Stop = defaults.Kafka.Retry.Stop
 	default:
+		// A bit hacky, but its type makes it impossible to test for a nil value.
+		// This may be overwritten by the Kafka orderer upon instantiation.
+		c.Kafka.Version = defaults.Kafka.Version
 		return
 	}
 }
@@ -140,6 +182,7 @@ func Load() *TopLevel {

 	config.SetConfigName("orderer")
 	config.AddConfigPath("./")
+	config.AddConfigPath("../../.")
 	config.AddConfigPath("../orderer/")
 	config.AddConfigPath("../../orderer/")
 	// Path to look for the config file in based on GOPATH
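
The Retry block gives the orderer a reconnection budget for its Kafka
brokers: retry every Period, give up after Stop. A minimal sketch of
how these two settings might be consumed; the helper below is
hypothetical, not part of this changeset:

package kafka

import (
    "time"

    "github.com/hyperledger/fabric/orderer/config"
)

// retry keeps calling dial every Retry.Period until it succeeds or the
// Retry.Stop budget is exhausted, returning the last dial error.
func retry(conf *config.TopLevel, dial func() error) error {
    deadline := time.Now().Add(conf.Kafka.Retry.Stop)
    for {
        err := dial()
        if err == nil {
            return nil
        }
        if time.Now().After(deadline) {
            return err
        }
        time.Sleep(conf.Kafka.Retry.Period)
    }
}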
137 changes: 137 additions & 0 deletions orderer/kafka/broadcast.go
@@ -0,0 +1,137 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kafka

import (
	"fmt"
	"sync"
	"time"

	ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
	"github.com/hyperledger/fabric/orderer/config"
)

// Broadcaster allows the caller to submit messages to the orderer
type Broadcaster interface {
	Broadcast(stream ab.AtomicBroadcast_BroadcastServer) error
	Closeable
}

type broadcasterImpl struct {
	producer Producer
	config   *config.TopLevel
	once     sync.Once

	batchChan  chan *ab.BroadcastMessage
	messages   []*ab.BroadcastMessage
	nextNumber uint64
	prevHash   []byte
}

func newBroadcaster(conf *config.TopLevel) Broadcaster {
	return &broadcasterImpl{
		producer:   newProducer(conf),
		config:     conf,
		batchChan:  make(chan *ab.BroadcastMessage, conf.General.BatchSize),
		messages:   []*ab.BroadcastMessage{&ab.BroadcastMessage{Data: []byte("genesis")}},
		nextNumber: 0,
	}
}

// Broadcast receives ordering requests by clients and sends back an
// acknowledgement for each received message in order, indicating
// success or type of failure
func (b *broadcasterImpl) Broadcast(stream ab.AtomicBroadcast_BroadcastServer) error {
	b.once.Do(func() {
		// Send the genesis block to create the topic
		// otherwise consumers will throw an exception.
		b.sendBlock()
		// Spawn the goroutine that cuts blocks
		go b.cutBlock(b.config.General.BatchTimeout, b.config.General.BatchSize)
	})
	return b.recvRequests(stream)
}

// Close shuts down the broadcast side of the orderer
func (b *broadcasterImpl) Close() error {
	if b.producer != nil {
		return b.producer.Close()
	}
	return nil
}

func (b *broadcasterImpl) sendBlock() error {
	block := &ab.Block{
		Messages: b.messages,
		Number:   b.nextNumber,
		PrevHash: b.prevHash,
	}
	logger.Debugf("Prepared block %d with %d messages (%+v)", block.Number, len(block.Messages), block)

	b.messages = []*ab.BroadcastMessage{}
	b.nextNumber++
	hash, data := hashBlock(block)
	b.prevHash = hash

	return b.producer.Send(data)
}

func (b *broadcasterImpl) cutBlock(period time.Duration, maxSize uint) {
	timer := time.NewTimer(period)

	for {
		select {
		case msg := <-b.batchChan:
			b.messages = append(b.messages, msg)
			if len(b.messages) >= int(maxSize) {
				if err := b.sendBlock(); err != nil {
					panic(fmt.Errorf("Cannot communicate with Kafka broker: %s", err))
				}
				if !timer.Stop() {
					<-timer.C
				}
				timer.Reset(period)
			}
		case <-timer.C:
			if len(b.messages) > 0 {
				if err := b.sendBlock(); err != nil {
					panic(fmt.Errorf("Cannot communicate with Kafka broker: %s", err))
				}
			}
		}
	}
}

func (b *broadcasterImpl) recvRequests(stream ab.AtomicBroadcast_BroadcastServer) error {
	reply := new(ab.BroadcastResponse)
	for {
		msg, err := stream.Recv()
		if err != nil {
			logger.Debug("Can no longer receive requests from client (exited?)")
			return err
		}

		b.batchChan <- msg
		reply.Status = ab.Status_SUCCESS // TODO This shouldn't always be a success

		if err := stream.Send(reply); err != nil {
			logger.Info("Cannot send broadcast reply to client")
			return err
		}
		logger.Debugf("Sent broadcast reply %v to client", reply.Status.String())
	}
}
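
broadcasterImpl delegates the actual Kafka writes to the Producer
created by newProducer, whose implementation (orderer/kafka/producer.go)
is not shown on this page. A plausible sketch on top of sarama's
SyncProducer, using the Brokers, Topic, PartitionID, and Version config
fields; treat it as an illustration, not the changeset's actual code
(note it also returns an error, unlike newProducer above):

package kafka

import (
	"github.com/Shopify/sarama"
	"github.com/hyperledger/fabric/orderer/config"
)

// producerImpl is a hypothetical Producer backed by sarama's SyncProducer
type producerImpl struct {
	producer  sarama.SyncProducer
	topic     string
	partition int32
}

func newProducerSketch(conf *config.TopLevel) (Producer, error) {
	cfg := sarama.NewConfig()
	cfg.Version = conf.Kafka.Version
	// Write every block to the single partition the orderer consumes from
	cfg.Producer.Partitioner = sarama.NewManualPartitioner
	cfg.Producer.Return.Successes = true // required by SyncProducer
	p, err := sarama.NewSyncProducer(conf.Kafka.Brokers, cfg)
	if err != nil {
		return nil, err
	}
	return &producerImpl{
		producer:  p,
		topic:     conf.Kafka.Topic,
		partition: conf.Kafka.PartitionID,
	}, nil
}

// Send publishes one marshaled block to the orderer's topic and partition
func (p *producerImpl) Send(payload []byte) error {
	_, _, err := p.producer.SendMessage(&sarama.ProducerMessage{
		Topic:     p.topic,
		Partition: p.partition,
		Value:     sarama.ByteEncoder(payload),
	})
	return err
}

// Close shuts down the underlying sarama producer
func (p *producerImpl) Close() error {
	return p.producer.Close()
}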