Skip to content

Commit

Permalink
[FAB-7887] log hint of Kafka.Version mistach
Browse files Browse the repository at this point in the history
Log a hint that the Kafka.Version property might not be
appropriate for the Kafka broker that orderer is using.

Change-Id: Ib741ccf8b07a1775ddcd921fd654fa8ee8419216
Signed-off-by: Luis Sanchez <[email protected]>
  • Loading branch information
Luis Sanchez committed Jan 25, 2018
1 parent 53264cb commit 4b419a6
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 0 deletions.
16 changes: 16 additions & 0 deletions orderer/consensus/kafka/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,22 @@ func init() {
saramaLogger = saramaEventLogger
}

// init starts a go routine that detects a possible configuration issue
func init() {
listener := saramaLogger.NewListener("insufficient data to decode packet")
go func() {
for {
select {
case <-listener:
logger.Critical("Unable to decode a Kafka packet. Usually, this " +
"indicates that the Kafka.Version specified in the orderer " +
"configuration is incorrectly set to a version which is newer than " +
"the actual Kafka broker version.")
}
}
}()
}

// eventLogger adapts a go-logging Logger to the sarama.Logger interface.
// Additionally, listeners can be registered to be notified when a substring has
// been logged.
Expand Down
62 changes: 62 additions & 0 deletions orderer/consensus/kafka/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ SPDX-License-Identifier: Apache-2.0
package kafka

import (
"bytes"
"fmt"
"os"
"testing"
"time"

Expand Down Expand Up @@ -135,3 +137,63 @@ func TestEventListener(t *testing.T) {
}
}
}

func TestLogPossibleKafkaVersionMismatch(t *testing.T) {

logging.SetLevel(logging.DEBUG, saramaLogID)

topic := channelNameForTest(t)
partition := int32(0)

var buffer bytes.Buffer
logger.SetBackend(logging.AddModuleLevel(
logging.MultiLogger(
logging.NewBackendFormatter(
logging.NewLogBackend(os.Stderr, "", 0),
logging.MustStringFormatter("%{color}%{time:2006-01-02 15:04:05.000 MST} [%{module}] %{shortfunc} -> %{level:.4s} %{id:03x}%{color:reset} %{message}"),
),
logging.NewLogBackend(&buffer, "", 0),
),
))
defer logging.Reset()

broker := sarama.NewMockBroker(t, 500)
defer broker.Close()

config := sarama.NewConfig()
config.ClientID = t.Name()
config.Metadata.Retry.Max = 0
config.Metadata.Retry.Backoff = 250 * time.Millisecond
config.Net.ReadTimeout = 100 * time.Millisecond
config.Version = sarama.V0_10_0_0

broker.SetHandlerByMap(map[string]sarama.MockResponse{
"MetadataRequest": sarama.NewMockMetadataResponse(t).
SetBroker(broker.Addr(), broker.BrokerID()).
SetLeader(topic, partition, broker.BrokerID()),
"OffsetRequest": sarama.NewMockOffsetResponse(t).
SetOffset(topic, partition, sarama.OffsetNewest, 1000).
SetOffset(topic, partition, sarama.OffsetOldest, 0),
"FetchRequest": sarama.NewMockFetchResponse(t, 1).
SetMessage(topic, partition, 0, sarama.StringEncoder("MSG 00")),
})

consumer, err := sarama.NewConsumer([]string{broker.Addr()}, config)
if err != nil {
t.Fatal(err)
}
defer consumer.Close()

partitionConsumer, err := consumer.ConsumePartition(topic, partition, 1)
if err != nil {
t.Fatal(err)
}
defer partitionConsumer.Close()

select {
case <-partitionConsumer.Messages():
t.Fatalf("did not expect to receive message")
case <-time.After(shortTimeout):
assert.Regexp(t, "Kafka.Version specified in the orderer configuration is incorrectly set", buffer.String())
}
}

0 comments on commit 4b419a6

Please sign in to comment.