Skip to content

Commit

Permalink
Merge changes from topic 'fab-2439-b'
Browse files Browse the repository at this point in the history
* changes:
  [FAB-2484] Prevent unclean leader election
  [FAB-2483] Improve configtx.yaml formatting & text
  [FAB-2480] Improve orderer.yaml formatting & text
  [FAB-2479] Log consumer errors
  • Loading branch information
yacovm authored and Gerrit Code Review committed Feb 26, 2017
2 parents 2509b2b + 2c9fd1b commit 7399fda
Show file tree
Hide file tree
Showing 10 changed files with 119 additions and 90 deletions.
1 change: 1 addition & 0 deletions bddtests/environments/kafka/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ services:
kafka:
image: hyperledger/fabric-kafka
environment:
KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE: "false"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
depends_on:
- zookeeper
1 change: 1 addition & 0 deletions bddtests/environments/orderer-1-kafka-1/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ services:
image: hyperledger/fabric-kafka
environment:
KAFKA_BROKER_ID: 0
KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE: "false"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
depends_on:
- zookeeper
9 changes: 6 additions & 3 deletions bddtests/environments/orderer-1-kafka-3/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,28 @@ services:
image: hyperledger/fabric-kafka
environment:
KAFKA_BROKER_ID: 0
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE: "false"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
depends_on:
- zookeeper

kafka1:
image: hyperledger/fabric-kafka
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE: "false"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
depends_on:
- zookeeper

kafka2:
image: hyperledger/fabric-kafka
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE: "false"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
depends_on:
- zookeeper
1 change: 1 addition & 0 deletions bddtests/environments/orderer-n-kafka-n/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ services:
kafka:
image: hyperledger/fabric-kafka
environment:
KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE: "false"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
depends_on:
- zookeeper
53 changes: 27 additions & 26 deletions common/configtx/tool/configtx.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
# Profile
#
# - Different configuration profiles may be encoded here to be specified
# as parameters to the configtxgen tool
# as parameters to the configtxgen tool.
#
################################################################################
Profiles:

# SampleInsecureSolo defines a configuration which uses the Solo orderer,
# contains no MSP definitions, and allows all transactions and channel
# creation requests
# creation requests.
SampleInsecureSolo:
Orderer:
<<: *OrdererDefaults
Expand Down Expand Up @@ -49,35 +49,35 @@ Profiles:
################################################################################
Organizations:

# SampleOrg defines an MSP using the sampleconfig. It should never be used
# in production but may be used as a template for other definitions
# SampleOrg defines an MSP using the sampleconfig. It should never be used
# in production but may be used as a template for other definitions.
- &SampleOrg
# DefaultOrg defines the organization which is used in the sampleconfig
# of the fabric.git development environment
# of the fabric.git development environment.
Name: SampleOrg

# ID to load the MSP definition as
# ID to load the MSP definition as.
ID: DEFAULT

# MSPDir is the filesystem path which contains the MSP configuration
# MSPDir is the filesystem path which contains the MSP configuration.
MSPDir: msp/sampleconfig

# BCCSP (Blockchain crypto provider): Select which crypto implementation or
# library to use
# BCCSP: Select which crypto implementation or library to use for the
# blockchain crypto service provider.
BCCSP:
Default: SW
SW:
Hash: SHA3
Security: 256
# Location of Key Store. If this is unset, a location will
# be chosen using 'MSPDir'/keystore
FileKeyStore:
KeyStore:
# Location of key store. If this is unset, a location will
# be chosen using: 'MSPDir'/keystore
FileKeyStore:
KeyStore:

AnchorPeers:
# AnchorPeers defines the location of peers which can be used
# for cross org gossip communication. Note, this value is only
# encoded in the genesis block in the Application section context
# for cross org gossip communication. Note, this value is only
# encoded in the genesis block in the Application section context.
- Host: 127.0.0.1
Port: 7051

Expand All @@ -86,25 +86,26 @@ Organizations:
# SECTION: Orderer
#
# - This section defines the values to encode into a config transaction or
# genesis block for orderer related parameters
# genesis block for orderer related parameters.
#
################################################################################
Orderer: &OrdererDefaults

# Orderer Type: The orderer implementation to start
# Available types are "solo" and "kafka"
# Orderer Type: The orderer implementation to start.
# Available types are "solo" and "kafka".
OrdererType: solo

Addresses:
- 127.0.0.1:7050

# Batch Timeout: The amount of time to wait before creating a batch
# Batch Timeout: The amount of time to wait before creating a batch.
BatchTimeout: 10s

# Batch Size: Controls the number of messages batched into a block
# Batch Size: Controls the number of messages batched into a block.
BatchSize:

# Max Message Count: The maximum number of messages to permit in a batch
# Max Message Count: The maximum number of messages to permit in a
# batch.
MaxMessageCount: 10

# Absolute Max Bytes: The absolute maximum number of bytes allowed for
Expand All @@ -118,25 +119,25 @@ Orderer: &OrdererDefaults
PreferredMaxBytes: 512 KB

Kafka:
# Brokers: A list of Kafka brokers to which the orderer connects
# Brokers: A list of Kafka brokers to which the orderer connects.
# NOTE: Use IP:port notation
Brokers:
- 127.0.0.1:9092

# Organizations is the list of orgs which are defined as participants on
# the orderer side of the network
# the orderer side of the network.
Organizations:

################################################################################
#
# SECTION: Application
#
# - This section defines the values to encode into a config transaction or
# genesis block for application related parameters
# genesis block for application related parameters.
#
################################################################################
Application: &ApplicationDefaults

# Organizations is the list of orgs which are defined as participants on
# the application side of the network
# the application side of the network.
Organizations:
13 changes: 11 additions & 2 deletions orderer/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"github.com/hyperledger/fabric/orderer/localconfig"
)

// Consumer allows the caller to receive a stream of blobs from the Kafka cluster for a specific partition.
// Consumer allows the caller to receive a stream of blobs
// from the Kafka cluster for a specific partition.
type Consumer interface {
Recv() <-chan *sarama.ConsumerMessage
Errors() <-chan *sarama.ConsumerError
Closeable
}

Expand All @@ -49,11 +51,18 @@ func newConsumer(brokers []string, kafkaVersion sarama.KafkaVersion, tls config.
return c, nil
}

// Recv returns a channel with blobs received from the Kafka cluster for a partition.
// Recv returns a channel with blobs received
// from the Kafka cluster for a partition.
func (c *consumerImpl) Recv() <-chan *sarama.ConsumerMessage {
return c.partition.Messages()
}

// Errors returns a channel with errors occuring during
// the consumption of a partition from the Kafka cluster.
func (c *consumerImpl) Errors() <-chan *sarama.ConsumerError {
return c.partition.Errors()
}

// Close shuts down the partition consumer.
// Invoked by the session deliverer's Close method, which is itself called
// during the processSeek function, between disabling and enabling the push.
Expand Down
4 changes: 4 additions & 0 deletions orderer/kafka/consumer_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func (mc *mockConsumerImpl) Recv() <-chan *sarama.ConsumerMessage {
return nil
}

func (mc *mockConsumerImpl) Errors() <-chan *sarama.ConsumerError {
return nil
}

func (mc *mockConsumerImpl) Close() error {
if err := mc.chainPartitionManager.Close(); err != nil {
return err
Expand Down
10 changes: 10 additions & 0 deletions orderer/kafka/orderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,21 @@ func (ch *chainImpl) Start() {
}
ch.consumer = consumer
close(ch.setupChan)
go ch.listenForErrors()

// 3. Set the loop the keep up to date with the chain.
go ch.loop()
}

func (ch *chainImpl) listenForErrors() {
select {
case <-ch.exitChan:
return
case err := <-ch.consumer.Errors():
logger.Error(err)
}
}

// Halt frees the resources which were allocated for this Chain.
// Implements the multichain.Chain interface.
func (ch *chainImpl) Halt() {
Expand Down
30 changes: 14 additions & 16 deletions orderer/kafka/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,40 +27,25 @@ import (
ab "github.com/hyperledger/fabric/protos/orderer"
)

// TODO Set the returned config file to more appropriate
// defaults as we're getting closer to a stable release
func newBrokerConfig(kafkaVersion sarama.KafkaVersion, chosenStaticPartition int32, tlsConfig config.TLS) *sarama.Config {
brokerConfig := sarama.NewConfig()

brokerConfig.Version = kafkaVersion
// Set the level of acknowledgement reliability needed from the broker.
// WaitForAll means that the partition leader will wait till all ISRs
// got the message before sending back an ACK to the sender.
brokerConfig.Producer.RequiredAcks = sarama.WaitForAll
// A partitioner is actually not needed the way we do things now,
// but we're adding it now to allow for flexibility in the future.
brokerConfig.Producer.Partitioner = newStaticPartitioner(chosenStaticPartition)
// Set equivalent of kafka producer config max.request.bytes to the deafult
// value of a Kafka broker's socket.request.max.bytes property (100 MiB).
brokerConfig.Producer.MaxMessageBytes = int(sarama.MaxRequestSize)
brokerConfig.Consumer.Return.Errors = true

brokerConfig.Net.TLS.Enable = tlsConfig.Enabled

if brokerConfig.Net.TLS.Enable {
// create public/private key pair structure
keyPair, err := tls.X509KeyPair([]byte(tlsConfig.Certificate), []byte(tlsConfig.PrivateKey))
if err != nil {
panic(fmt.Errorf("Unable to decode public/private key pair. Error: %v", err))
}

// create root CA pool
rootCAs := x509.NewCertPool()
for _, certificate := range tlsConfig.RootCAs {
if !rootCAs.AppendCertsFromPEM([]byte(certificate)) {
panic(fmt.Errorf("Unable to decode certificate. Error: %v", err))
}
}

brokerConfig.Net.TLS.Config = &tls.Config{
Certificates: []tls.Certificate{keyPair},
RootCAs: rootCAs,
Expand All @@ -69,6 +54,19 @@ func newBrokerConfig(kafkaVersion sarama.KafkaVersion, chosenStaticPartition int
}
}

// Set the level of acknowledgement reliability needed from the broker.
// WaitForAll means that the partition leader will wait till all ISRs
// got the message before sending back an ACK to the sender.
brokerConfig.Producer.RequiredAcks = sarama.WaitForAll
// A partitioner is actually not needed the way we do things now,
// but we're adding it now to allow for flexibility in the future.
brokerConfig.Producer.Partitioner = newStaticPartitioner(chosenStaticPartition)
// Set equivalent of Kafka producer config max.request.bytes to the default
// value of a Kafka broker's socket.request.max.bytes property (100 MiB).
brokerConfig.Producer.MaxMessageBytes = int(sarama.MaxRequestSize)

brokerConfig.Version = kafkaVersion

return brokerConfig
}

Expand Down
Loading

0 comments on commit 7399fda

Please sign in to comment.