Skip to content

Commit

Permalink
[FAB-1809] Enable tls config for Kafka connection
Browse files Browse the repository at this point in the history
The docker related items in previous patch sets will
be submitted under a separate change set.

- Added the ability to specify certs and keys via
  both direct insertion in config or via a file.

Change-Id: Ia34ed4d316ddb7177e78ee4c6c347ba5ee82e116
Signed-off-by: Tuan Dang <[email protected]>
Signed-off-by: Luis Sanchez <[email protected]>
  • Loading branch information
tuand27613 authored and Luis Sanchez committed Feb 2, 2017
1 parent 159d098 commit 4c9bec7
Show file tree
Hide file tree
Showing 13 changed files with 554 additions and 43 deletions.
1 change: 1 addition & 0 deletions images/kafka/Dockerfile.in
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ ADD payload/kafka-run-class.sh /opt/kafka/bin/kafka-run-class.sh
ADD payload/docker-entrypoint.sh /docker-entrypoint.sh

EXPOSE 9092
EXPOSE 9093

ENTRYPOINT ["/docker-entrypoint.sh"]
CMD ["/opt/kafka/bin/kafka-server-start.sh"]
9 changes: 6 additions & 3 deletions orderer/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ limitations under the License.

package kafka

import "github.com/Shopify/sarama"
import (
"github.com/Shopify/sarama"
"github.com/hyperledger/fabric/orderer/localconfig"
)

// Consumer allows the caller to receive a stream of blobs from the Kafka cluster for a specific partition.
type Consumer interface {
Expand All @@ -29,8 +32,8 @@ type consumerImpl struct {
partition sarama.PartitionConsumer
}

func newConsumer(brokers []string, kafkaVersion sarama.KafkaVersion, cp ChainPartition, offset int64) (Consumer, error) {
parent, err := sarama.NewConsumer(brokers, newBrokerConfig(kafkaVersion, rawPartition))
func newConsumer(brokers []string, kafkaVersion sarama.KafkaVersion, tls config.TLS, cp ChainPartition, offset int64) (Consumer, error) {
parent, err := sarama.NewConsumer(brokers, newBrokerConfig(kafkaVersion, rawPartition, tls))
if err != nil {
return nil, err
}
Expand Down
37 changes: 20 additions & 17 deletions orderer/kafka/orderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,50 +29,51 @@ import (
)

// New creates a Kafka-backed consenter. Called by orderer's main.go.
func New(kv sarama.KafkaVersion, ro config.Retry) multichain.Consenter {
return newConsenter(kv, ro, bfValue, pfValue, cfValue)
func New(kv sarama.KafkaVersion, ro config.Retry, tls config.TLS) multichain.Consenter {
return newConsenter(kv, ro, tls, bfValue, pfValue, cfValue)
}

// New calls here because we need to pass additional arguments to
// the constructor and New() should only read from the config file.
func newConsenter(kv sarama.KafkaVersion, ro config.Retry, bf bfType, pf pfType, cf cfType) multichain.Consenter {
return &consenterImpl{kv, ro, bf, pf, cf}
func newConsenter(kv sarama.KafkaVersion, ro config.Retry, tls config.TLS, bf bfType, pf pfType, cf cfType) multichain.Consenter {
return &consenterImpl{kv, ro, tls, bf, pf, cf}
}

// bfType defines the signature of the broker constructor.
type bfType func([]string, ChainPartition) (Broker, error)

// pfType defines the signature of the producer constructor.
type pfType func([]string, sarama.KafkaVersion, config.Retry) Producer
type pfType func([]string, sarama.KafkaVersion, config.Retry, config.TLS) Producer

// cfType defines the signature of the consumer constructor.
type cfType func([]string, sarama.KafkaVersion, ChainPartition, int64) (Consumer, error)
type cfType func([]string, sarama.KafkaVersion, config.TLS, ChainPartition, int64) (Consumer, error)

// bfValue holds the value for the broker constructor that's used in the non-test case.
var bfValue = func(brokers []string, cp ChainPartition) (Broker, error) {
return newBroker(brokers, cp)
}

// pfValue holds the value for the producer constructor that's used in the non-test case.
var pfValue = func(brokers []string, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry) Producer {
return newProducer(brokers, kafkaVersion, retryOptions)
var pfValue = func(brokers []string, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry, tls config.TLS) Producer {
return newProducer(brokers, kafkaVersion, retryOptions, tls)
}

// cfValue holds the value for the consumer constructor that's used in the non-test case.
var cfValue = func(brokers []string, kafkaVersion sarama.KafkaVersion, cp ChainPartition, offset int64) (Consumer, error) {
return newConsumer(brokers, kafkaVersion, cp, offset)
var cfValue = func(brokers []string, kafkaVersion sarama.KafkaVersion, tls config.TLS, cp ChainPartition, offset int64) (Consumer, error) {
return newConsumer(brokers, kafkaVersion, tls, cp, offset)
}

// consenterImpl holds the implementation of type that satisfies the
// multichain.Consenter and testableConsenter interfaces. The former
// is needed because that is what the HandleChain contract requires.
// The latter is needed for testing.
type consenterImpl struct {
kv sarama.KafkaVersion
ro config.Retry
bf bfType
pf pfType
cf cfType
kv sarama.KafkaVersion
ro config.Retry
tls config.TLS
bf bfType
pf pfType
cf cfType
}

// HandleChain creates/returns a reference to a Chain for the given set of support resources.
Expand Down Expand Up @@ -110,7 +111,7 @@ func newChain(consenter testableConsenter, support multichain.ConsenterSupport,
partition: newChainPartition(support.ChainID(), rawPartition),
batchTimeout: support.SharedConfig().BatchTimeout(),
lastOffsetPersisted: lastOffsetPersisted,
producer: consenter.prodFunc()(support.SharedConfig().KafkaBrokers(), consenter.kafkaVersion(), consenter.retryOptions()),
producer: consenter.prodFunc()(support.SharedConfig().KafkaBrokers(), consenter.kafkaVersion(), consenter.retryOptions(), consenter.tlsConfig()),
halted: false, // Redundant as the default value for booleans is false but added for readability
exitChan: make(chan struct{}),
haltedChan: make(chan struct{}),
Expand All @@ -123,13 +124,15 @@ func newChain(consenter testableConsenter, support multichain.ConsenterSupport,
type testableConsenter interface {
kafkaVersion() sarama.KafkaVersion
retryOptions() config.Retry
tlsConfig() config.TLS
brokFunc() bfType
prodFunc() pfType
consFunc() cfType
}

func (co *consenterImpl) kafkaVersion() sarama.KafkaVersion { return co.kv }
func (co *consenterImpl) retryOptions() config.Retry { return co.ro }
func (co *consenterImpl) tlsConfig() config.TLS { return co.tls }
func (co *consenterImpl) brokFunc() bfType { return co.bf }
func (co *consenterImpl) prodFunc() pfType { return co.pf }
func (co *consenterImpl) consFunc() cfType { return co.cf }
Expand Down Expand Up @@ -169,7 +172,7 @@ func (ch *chainImpl) Start() {
}

// 2. Set up the listener/consumer for this partition.
consumer, err := ch.consenter.consFunc()(ch.support.SharedConfig().KafkaBrokers(), ch.consenter.kafkaVersion(), ch.partition, ch.lastOffsetPersisted+1)
consumer, err := ch.consenter.consFunc()(ch.support.SharedConfig().KafkaBrokers(), ch.consenter.kafkaVersion(), ch.consenter.tlsConfig(), ch.partition, ch.lastOffsetPersisted+1)
if err != nil {
logger.Criticalf("Cannot retrieve required offset from Kafka cluster for chain %s: %s", ch.partition, err)
close(ch.exitChan)
Expand Down
17 changes: 10 additions & 7 deletions orderer/kafka/orderer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,16 @@ func mockNewConsenter(t *testing.T, kafkaVersion sarama.KafkaVersion, retryOptio
prodDisk := make(chan *ab.KafkaMessage)
consDisk := make(chan *ab.KafkaMessage)

mockTLS := config.TLS{Enabled: false}

mockBfValue := func(brokers []string, cp ChainPartition) (Broker, error) {
return mockNewBroker(t, cp)
}
mockPfValue := func(brokers []string, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry) Producer {
mockPfValue := func(brokers []string, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry, tls config.TLS) Producer {
// The first Send on this producer will return a blob with offset #nextProducedOffset
return mockNewProducer(t, cp, nextProducedOffset, prodDisk)
}
mockCfValue := func(brokers []string, kafkaVersion sarama.KafkaVersion, cp ChainPartition, lastPersistedOffset int64) (Consumer, error) {
mockCfValue := func(brokers []string, kafkaVersion sarama.KafkaVersion, tls config.TLS, cp ChainPartition, lastPersistedOffset int64) (Consumer, error) {
if lastPersistedOffset != nextProducedOffset {
panic(fmt.Errorf("Mock objects about to be set up incorrectly (consumer to seek to %d, producer to post %d)", lastPersistedOffset, nextProducedOffset))
}
Expand All @@ -67,11 +69,12 @@ func mockNewConsenter(t *testing.T, kafkaVersion sarama.KafkaVersion, retryOptio

return &mockConsenterImpl{
consenterImpl: consenterImpl{
kv: kafkaVersion,
ro: retryOptions,
bf: mockBfValue,
pf: mockPfValue,
cf: mockCfValue,
kv: kafkaVersion,
ro: retryOptions,
tls: mockTLS,
bf: mockBfValue,
pf: mockPfValue,
cf: mockCfValue,
},
prodDisk: prodDisk,
consDisk: consDisk,
Expand Down
4 changes: 2 additions & 2 deletions orderer/kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ type producerImpl struct {
producer sarama.SyncProducer
}

func newProducer(brokers []string, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry) Producer {
func newProducer(brokers []string, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry, tls config.TLS) Producer {
var p sarama.SyncProducer
var err error
brokerConfig := newBrokerConfig(kafkaVersion, rawPartition)
brokerConfig := newBrokerConfig(kafkaVersion, rawPartition, tls)

repeatTick := time.NewTicker(retryOptions.Period)
panicTick := time.NewTicker(retryOptions.Stop)
Expand Down
31 changes: 30 additions & 1 deletion orderer/kafka/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@ limitations under the License.
package kafka

import (
"crypto/tls"
"crypto/x509"
"fmt"
"strconv"

"github.com/Shopify/sarama"
"github.com/hyperledger/fabric/orderer/localconfig"
ab "github.com/hyperledger/fabric/protos/orderer"
)

// TODO Set the returned config file to more appropriate
// defaults as we're getting closer to a stable release
func newBrokerConfig(kafkaVersion sarama.KafkaVersion, chosenStaticPartition int32) *sarama.Config {
func newBrokerConfig(kafkaVersion sarama.KafkaVersion, chosenStaticPartition int32, tlsConfig config.TLS) *sarama.Config {
brokerConfig := sarama.NewConfig()

brokerConfig.Version = kafkaVersion
Expand All @@ -40,6 +44,31 @@ func newBrokerConfig(kafkaVersion sarama.KafkaVersion, chosenStaticPartition int
// value of a Kafka broker's socket.request.max.bytes property (100 MiB).
brokerConfig.Producer.MaxMessageBytes = int(sarama.MaxRequestSize)

brokerConfig.Net.TLS.Enable = tlsConfig.Enabled

if brokerConfig.Net.TLS.Enable {
// create public/private key pair structure
keyPair, err := tls.X509KeyPair([]byte(tlsConfig.Certificate), []byte(tlsConfig.PrivateKey))
if err != nil {
panic(fmt.Errorf("Unable to decode public/private key pair. Error: %v", err))
}

// create root CA pool
rootCAs := x509.NewCertPool()
for _, certificate := range tlsConfig.RootCAs {
if !rootCAs.AppendCertsFromPEM([]byte(certificate)) {
panic(fmt.Errorf("Unable to decode certificate. Error: %v", err))
}
}

brokerConfig.Net.TLS.Config = &tls.Config{
Certificates: []tls.Certificate{keyPair},
RootCAs: rootCAs,
MinVersion: 0, // TLS 1.0 (no SSL support)
MaxVersion: 0, // Latest supported TLS version
}
}

return brokerConfig
}

Expand Down
97 changes: 95 additions & 2 deletions orderer/kafka/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (

"github.com/Shopify/sarama"
"github.com/hyperledger/fabric/orderer/common/bootstrap/provisional"
"github.com/hyperledger/fabric/orderer/localconfig"
"github.com/hyperledger/fabric/orderer/mocks/util"
"github.com/stretchr/testify/assert"
)

func TestProducerConfigMessageMaxBytes(t *testing.T) {
Expand All @@ -35,7 +38,8 @@ func TestProducerConfigMessageMaxBytes(t *testing.T) {
"ProduceRequest": sarama.NewMockProduceResponse(t),
})

config := newBrokerConfig(testConf.Kafka.Version, rawPartition)
mockTLS := config.TLS{Enabled: false}
config := newBrokerConfig(testConf.Kafka.Version, rawPartition, mockTLS)
producer, err := sarama.NewSyncProducer([]string{broker.Addr()}, config)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -86,7 +90,7 @@ func TestNewBrokerConfig(t *testing.T) {
"ProduceRequest": sarama.NewMockProduceResponse(t),
})

config := newBrokerConfig(testConf.Kafka.Version, differentPartition)
config := newBrokerConfig(testConf.Kafka.Version, differentPartition, config.TLS{Enabled: false})
producer, err := sarama.NewSyncProducer([]string{broker.Addr()}, config)
if err != nil {
t.Fatal("Failed to create producer:", err)
Expand All @@ -105,3 +109,92 @@ func TestNewBrokerConfig(t *testing.T) {
}
}
}

func TestTLSConfigEnabled(t *testing.T) {
publicKey, privateKey, err := util.GenerateMockPublicPrivateKeyPairPEM(false)
if err != nil {
t.Fatalf("Enable to generate a public/private key pair: %v", err)
}
caPublicKey, _, err := util.GenerateMockPublicPrivateKeyPairPEM(true)
if err != nil {
t.Fatalf("Enable to generate a signer certificate: %v", err)
}

config := newBrokerConfig(testConf.Kafka.Version, 0, config.TLS{
Enabled: true,
PrivateKey: privateKey,
Certificate: publicKey,
RootCAs: []string{caPublicKey},
})

assert.True(t, config.Net.TLS.Enable)
assert.NotNil(t, config.Net.TLS.Config)
assert.Len(t, config.Net.TLS.Config.Certificates, 1)
assert.Len(t, config.Net.TLS.Config.RootCAs.Subjects(), 1)
assert.Equal(t, uint16(0), config.Net.TLS.Config.MaxVersion)
assert.Equal(t, uint16(0), config.Net.TLS.Config.MinVersion)
}

func TestTLSConfigDisabled(t *testing.T) {
publicKey, privateKey, err := util.GenerateMockPublicPrivateKeyPairPEM(false)
if err != nil {
t.Fatalf("Enable to generate a public/private key pair: %v", err)
}
caPublicKey, _, err := util.GenerateMockPublicPrivateKeyPairPEM(true)
if err != nil {
t.Fatalf("Enable to generate a signer certificate: %v", err)
}

config := newBrokerConfig(testConf.Kafka.Version, 0, config.TLS{
Enabled: false,
PrivateKey: privateKey,
Certificate: publicKey,
RootCAs: []string{caPublicKey},
})

assert.False(t, config.Net.TLS.Enable)
assert.Zero(t, config.Net.TLS.Config)

}

func TestTLSConfigBadCert(t *testing.T) {
publicKey, privateKey, err := util.GenerateMockPublicPrivateKeyPairPEM(false)
if err != nil {
t.Fatalf("Enable to generate a public/private key pair: %v", err)
}
caPublicKey, _, err := util.GenerateMockPublicPrivateKeyPairPEM(true)
if err != nil {
t.Fatalf("Enable to generate a signer certificate: %v", err)
}

t.Run("BadPrivateKey", func(t *testing.T) {
assert.Panics(t, func() {
newBrokerConfig(testConf.Kafka.Version, 0, config.TLS{
Enabled: true,
PrivateKey: privateKey,
Certificate: "TRASH",
RootCAs: []string{caPublicKey},
})
})
})
t.Run("BadPublicKey", func(t *testing.T) {
assert.Panics(t, func() {
newBrokerConfig(testConf.Kafka.Version, 0, config.TLS{
Enabled: true,
PrivateKey: "TRASH",
Certificate: publicKey,
RootCAs: []string{caPublicKey},
})
})
})
t.Run("BadRootCAs", func(t *testing.T) {
assert.Panics(t, func() {
newBrokerConfig(testConf.Kafka.Version, 0, config.TLS{
Enabled: true,
PrivateKey: privateKey,
Certificate: publicKey,
RootCAs: []string{"TRASH"},
})
})
})
}
Loading

0 comments on commit 4c9bec7

Please sign in to comment.