From 2459f93b57ae2e7622cb408538d15265863b32b2 Mon Sep 17 00:00:00 2001 From: yacovm Date: Tue, 25 Jul 2017 14:15:19 +0300 Subject: [PATCH] [FAB-5406] Mutual TLS in chaincode service-P2 This commit adds an authentication layer to the chaincode service that inspects the first REGISTER message from the shim, and based on the passed chaincode name - authorizes the stream or rejects it. The full details of the flow can be seen in the JIRA item. Change-Id: I4a1b3c9b3b078d96906f3c9618520b9fe319eeb6 Signed-off-by: yacovm --- core/chaincode/accesscontrol/access.go | 121 +++++++ core/chaincode/accesscontrol/access_test.go | 361 ++++++++++++++++++++ core/chaincode/accesscontrol/ca_test.go | 3 +- core/chaincode/accesscontrol/interceptor.go | 79 +++++ 4 files changed, 562 insertions(+), 2 deletions(-) create mode 100644 core/chaincode/accesscontrol/access.go create mode 100644 core/chaincode/accesscontrol/access_test.go create mode 100644 core/chaincode/accesscontrol/interceptor.go diff --git a/core/chaincode/accesscontrol/access.go b/core/chaincode/accesscontrol/access.go new file mode 100644 index 00000000000..267df137dde --- /dev/null +++ b/core/chaincode/accesscontrol/access.go @@ -0,0 +1,121 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package accesscontrol + +import ( + "errors" + "fmt" + + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/common/flogging" + pb "github.com/hyperledger/fabric/protos/peer" + "google.golang.org/grpc" +) + +var logger = flogging.MustGetLogger("accessControl") + +// Authenticator wraps a chaincode service and authenticates +// chaincode shims (containers) +type Authenticator interface { + // DisableAccessCheck disables the access control + // enforcement of the Authenticator + DisableAccessCheck() + + // Generate returns a pair of certificate and private key, + // and associates the hash of the certificate with the given + // chaincode name + Generate(ccName string) (*CertAndPrivKeyPair, error) + + // ChaincodeSupportServer - The Authenticator is registered + // as a chaincode service + pb.ChaincodeSupportServer +} + +// CertAndPrivKeyPair contains a certificate +// and its corresponding private key in base64 format +type CertAndPrivKeyPair struct { + // Cert - an x509 certificate encoded in base64 + Cert string + // Key - a private key of the corresponding certificate + Key string +} + +type authenticator struct { + bypass bool + mapper *certMapper + pb.ChaincodeSupportServer +} + +// NewAuthenticator returns a new authenticator that would wrap the given chaincode service +func NewAuthenticator(srv pb.ChaincodeSupportServer, ca CA) Authenticator { + auth := &authenticator{ + mapper: newCertMapper(ca.newCertKeyPair), + } + auth.ChaincodeSupportServer = newInterceptor(srv, auth.authenticate) + return auth +} + +// DisableAccessCheck disables the access control +// enforcement of the Authenticator +func (ac *authenticator) DisableAccessCheck() { + ac.bypass = true +} + +// Generate returns a pair of certificate and private key, +// and associates the hash of the certificate with the given +// chaincode name +func (ac *authenticator) Generate(ccName string) (*CertAndPrivKeyPair, error) { + cert, err := ac.mapper.genCert(ccName) + if err != nil { + return nil, err + } + return &CertAndPrivKeyPair{ + Key: cert.privKeyString(), + Cert: cert.pubKeyString(), + }, nil +} + +func (ac *authenticator) authenticate(msg *pb.ChaincodeMessage, stream grpc.ServerStream) error { + if ac.bypass { + return nil + } + + if msg.Type != pb.ChaincodeMessage_REGISTER { + logger.Warning("Got message", msg, "but expected a ChaincodeMessage_REGISTER message") + return errors.New("First message needs to be a register") + } + + chaincodeID := &pb.ChaincodeID{} + err := proto.Unmarshal(msg.Payload, chaincodeID) + if err != nil { + logger.Warning("Failed unmarshaling message:", err) + return err + } + ccName := chaincodeID.Name + // Obtain certificate from stream + hash := extractCertificateHashFromContext(stream.Context()) + if len(hash) == 0 { + errMsg := fmt.Sprintf("TLS is active but chaincode %s didn't send certificate", ccName) + logger.Warning(errMsg) + return errors.New(errMsg) + } + // Look it up in the mapper + registeredName := ac.mapper.lookup(certHash(hash)) + if registeredName == "" { + errMsg := fmt.Sprintf("Chaincode %s with given certificate hash %v not found in registry", ccName, hash) + logger.Warning(errMsg) + return errors.New(errMsg) + } + if registeredName != ccName { + errMsg := fmt.Sprintf("Chaincode %s with given certificate hash %v belongs to a different chaincode", ccName, hash) + logger.Warning(errMsg) + return fmt.Errorf(errMsg) + } + + logger.Debug("Chaincode", ccName, "'s authentication is authorized") + return nil +} diff --git a/core/chaincode/accesscontrol/access_test.go b/core/chaincode/accesscontrol/access_test.go new file mode 100644 index 00000000000..c74ff8a9e4d --- /dev/null +++ b/core/chaincode/accesscontrol/access_test.go @@ -0,0 +1,361 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package accesscontrol + +import ( + "crypto/tls" + "encoding/base64" + "fmt" + "net" + "testing" + "time" + + "github.com/golang/protobuf/proto" + flogging "github.com/hyperledger/fabric/common/flogging" + pb "github.com/hyperledger/fabric/protos/peer" + "github.com/op/go-logging" + "github.com/stretchr/testify/assert" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +type ccSrv struct { + l net.Listener + grpcSrv *grpc.Server + t *testing.T + cert []byte + expectedCCname string +} + +func (cs *ccSrv) Register(stream pb.ChaincodeSupport_RegisterServer) error { + msg, err := stream.Recv() + if err != nil { + return err + } + + // First message is a register message + assert.Equal(cs.t, pb.ChaincodeMessage_REGISTER.String(), msg.Type.String()) + // And its chaincode name is the expected one + chaincodeID := &pb.ChaincodeID{} + err = proto.Unmarshal(msg.Payload, chaincodeID) + if err != nil { + return err + } + assert.Equal(cs.t, cs.expectedCCname, chaincodeID.Name) + // Subsequent messages are just echoed back + for { + msg, _ = stream.Recv() + if err != nil { + return err + } + err = stream.Send(msg) + if err != nil { + return err + } + } +} + +func (cs *ccSrv) stop() { + cs.grpcSrv.Stop() + cs.l.Close() +} + +func newCCServer(t *testing.T, port int, expectedCCname string, withTLS bool, clientCAcert []byte) *ccSrv { + var s *grpc.Server + if withTLS { + s = createTLSService(t, clientCAcert) + } else { + s = grpc.NewServer() + } + + l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", "", port)) + assert.NoError(t, err, "%v", err) + return &ccSrv{ + expectedCCname: expectedCCname, + l: l, + grpcSrv: s, + } +} + +type ccClient struct { + conn *grpc.ClientConn + stream pb.ChaincodeSupport_RegisterClient +} + +func newClient(t *testing.T, port int, cert *tls.Certificate) (*ccClient, error) { + tlsCfg := &tls.Config{ + InsecureSkipVerify: true, + } + if cert != nil { + tlsCfg.Certificates = []tls.Certificate{*cert} + } + tlsOpts := grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg)) + conn, err := grpc.Dial(fmt.Sprintf("localhost:%d", port), tlsOpts, grpc.WithBlock(), grpc.WithTimeout(time.Second)) + if err != nil { + return nil, err + } + chaincodeSupportClient := pb.NewChaincodeSupportClient(conn) + stream, err := chaincodeSupportClient.Register(context.Background()) + assert.NoError(t, err) + return &ccClient{ + conn: conn, + stream: stream, + }, nil +} + +func (c *ccClient) close() { + c.conn.Close() +} + +func (c *ccClient) sendMsg(msg *pb.ChaincodeMessage) { + c.stream.Send(msg) +} + +func (c *ccClient) recv() *pb.ChaincodeMessage { + msgs := make(chan *pb.ChaincodeMessage, 1) + go func() { + msg, _ := c.stream.Recv() + if msg != nil { + msgs <- msg + } + }() + select { + case <-time.After(time.Second): + return nil + case msg := <-msgs: + return msg + } +} + +func TestAccessControl(t *testing.T) { + backupTTL := ttl + defer func() { + ttl = backupTTL + }() + ttl = time.Second * 3 + + logAsserter := &logBackend{ + logEntries: make(chan string, 1), + } + logger.SetBackend(logAsserter) + defer func() { + logger = flogging.MustGetLogger("accessControl") + }() + + chaincodeID := &pb.ChaincodeID{Name: "example02"} + payload, err := proto.Marshal(chaincodeID) + registerMsg := &pb.ChaincodeMessage{ + Type: pb.ChaincodeMessage_REGISTER, + Payload: payload, + } + putStateMsg := &pb.ChaincodeMessage{ + Type: pb.ChaincodeMessage_PUT_STATE, + } + + ca, _ := NewCA() + srv := newCCServer(t, 7052, "example02", true, ca.CertBytes()) + auth := NewAuthenticator(srv, ca) + pb.RegisterChaincodeSupportServer(srv.grpcSrv, auth) + go srv.grpcSrv.Serve(srv.l) + defer srv.stop() + + // Create an attacker without a TLS certificate + _, err = newClient(t, 7052, nil) + assert.Error(t, err) + assert.Contains(t, err.Error(), "tls: bad certificate") + + // Create an attacker with its own TLS certificate + maliciousCA, _ := NewCA() + keyPair, err := maliciousCA.newCertKeyPair() + cert, err := tls.X509KeyPair(keyPair.certBytes, keyPair.keyBytes) + assert.NoError(t, err) + _, err = newClient(t, 7052, &cert) + assert.Error(t, err) + assert.Contains(t, err.Error(), "tls: bad certificate") + + // Create a chaincode for example01 that tries to impersonate example02 + kp, err := auth.Generate("example01") + assert.NoError(t, err) + keyBytes, err := base64.StdEncoding.DecodeString(kp.Key) + assert.NoError(t, err) + certBytes, err := base64.StdEncoding.DecodeString(kp.Cert) + assert.NoError(t, err) + cert, err = tls.X509KeyPair(certBytes, keyBytes) + assert.NoError(t, err) + mismatchedShim, err := newClient(t, 7052, &cert) + assert.NoError(t, err) + defer mismatchedShim.close() + mismatchedShim.sendMsg(registerMsg) + mismatchedShim.sendMsg(putStateMsg) + // Mismatched chaincode didn't get back anything + assert.Nil(t, mismatchedShim.recv()) + logAsserter.assertLastLogContains(t, "with given certificate hash", "belongs to a different chaincode") + + // Create the real chaincode that its cert is generated by us that should pass the security checks + kp, err = auth.Generate("example02") + assert.NoError(t, err) + keyBytes, err = base64.StdEncoding.DecodeString(kp.Key) + assert.NoError(t, err) + certBytes, err = base64.StdEncoding.DecodeString(kp.Cert) + assert.NoError(t, err) + cert, err = tls.X509KeyPair(certBytes, keyBytes) + assert.NoError(t, err) + realCC, err := newClient(t, 7052, &cert) + assert.NoError(t, err) + defer realCC.close() + realCC.sendMsg(registerMsg) + realCC.sendMsg(putStateMsg) + echoMsg := realCC.recv() + // The real chaincode should be echoed back its message + assert.NotNil(t, echoMsg) + assert.Equal(t, pb.ChaincodeMessage_PUT_STATE, echoMsg.Type) + // Log should not complain about anything + assert.Empty(t, logAsserter.logEntries) + + // Create the real chaincode that its cert is generated by us + // but one that the first message sent by it isn't a register message. + // The second message that is sent is a register message but it's "too late" + // and the stream is already denied. + kp, err = auth.Generate("example02") + assert.NoError(t, err) + keyBytes, err = base64.StdEncoding.DecodeString(kp.Key) + assert.NoError(t, err) + certBytes, err = base64.StdEncoding.DecodeString(kp.Cert) + assert.NoError(t, err) + cert, err = tls.X509KeyPair(certBytes, keyBytes) + assert.NoError(t, err) + confusedCC, err := newClient(t, 7052, &cert) + assert.NoError(t, err) + defer confusedCC.close() + confusedCC.sendMsg(putStateMsg) + confusedCC.sendMsg(registerMsg) + confusedCC.sendMsg(putStateMsg) + assert.Nil(t, confusedCC.recv()) + logAsserter.assertLastLogContains(t, "expected a ChaincodeMessage_REGISTER message") + + // Create a real chaincode, that its cert was generated by us + // but it sends a malformed first message + kp, err = auth.Generate("example02") + assert.NoError(t, err) + keyBytes, err = base64.StdEncoding.DecodeString(kp.Key) + assert.NoError(t, err) + certBytes, err = base64.StdEncoding.DecodeString(kp.Cert) + assert.NoError(t, err) + cert, err = tls.X509KeyPair(certBytes, keyBytes) + assert.NoError(t, err) + malformedMessageCC, err := newClient(t, 7052, &cert) + assert.NoError(t, err) + defer malformedMessageCC.close() + // Save old payload + originalPayload := registerMsg.Payload + registerMsg.Payload = append(registerMsg.Payload, 0) + malformedMessageCC.sendMsg(registerMsg) + malformedMessageCC.sendMsg(putStateMsg) + assert.Nil(t, malformedMessageCC.recv()) + logAsserter.assertLastLogContains(t, "Failed unmarshaling message") + // Recover old payload + registerMsg.Payload = originalPayload + + // Create a real chaincode, that its cert was generated by us + // but have it reconnect only after too much time. + // This tests a use case where the CC's cert has been expired + // and the CC has been compromized. We don't want it to be able + // to reconnect to us. + kp, err = auth.Generate("example02") + assert.NoError(t, err) + keyBytes, err = base64.StdEncoding.DecodeString(kp.Key) + assert.NoError(t, err) + certBytes, err = base64.StdEncoding.DecodeString(kp.Cert) + assert.NoError(t, err) + cert, err = tls.X509KeyPair(certBytes, keyBytes) + assert.NoError(t, err) + lateCC, err := newClient(t, 7052, &cert) + assert.NoError(t, err) + defer realCC.close() + time.Sleep(ttl + time.Second*2) + lateCC.sendMsg(registerMsg) + lateCC.sendMsg(putStateMsg) + echoMsg = lateCC.recv() + assert.Nil(t, echoMsg) + logAsserter.assertLastLogContains(t, "with given certificate hash", "not found in registry") +} + +func TestAccessControlNoTLS(t *testing.T) { + chaincodeID := &pb.ChaincodeID{Name: "example02"} + payload, err := proto.Marshal(chaincodeID) + registerMsg := &pb.ChaincodeMessage{ + Type: pb.ChaincodeMessage_REGISTER, + Payload: payload, + } + putStateMsg := &pb.ChaincodeMessage{ + Type: pb.ChaincodeMessage_PUT_STATE, + } + + ca, _ := NewCA() + s := newCCServer(t, 8052, "example02", false, ca.CertBytes()) + auth := NewAuthenticator(s, ca) + pb.RegisterChaincodeSupportServer(s.grpcSrv, auth) + go s.grpcSrv.Serve(s.l) + defer s.stop() + conn, err := grpc.Dial(fmt.Sprintf("localhost:%d", 8052), grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(time.Second)) + assert.NoError(t, err) + chaincodeSupportClient := pb.NewChaincodeSupportClient(conn) + stream, err := chaincodeSupportClient.Register(context.Background()) + stream.Send(registerMsg) + stream.Send(putStateMsg) + // Should fail because we haven't disabled security yet + echoMsg, err := stream.Recv() + assert.Error(t, err) + assert.Contains(t, err.Error(), "TLS is active but chaincode") + assert.Nil(t, echoMsg) + conn.Close() + + auth.DisableAccessCheck() + // Now it should work + conn, err = grpc.Dial(fmt.Sprintf("localhost:%d", 8052), grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(time.Second)) + assert.NoError(t, err) + defer conn.Close() + chaincodeSupportClient = pb.NewChaincodeSupportClient(conn) + stream, err = chaincodeSupportClient.Register(context.Background()) + stream.Send(registerMsg) + stream.Send(putStateMsg) + echoMsg, err = stream.Recv() + assert.NotNil(t, echoMsg) + assert.NoError(t, err) +} + +type logBackend struct { + logEntries chan string +} + +func (l *logBackend) assertLastLogContains(t *testing.T, ss ...string) { + lastLogMsg := <-l.logEntries + for _, s := range ss { + assert.Contains(t, lastLogMsg, s) + } +} + +func (l *logBackend) Log(lvl logging.Level, n int, r *logging.Record) error { + if lvl.String() != logging.WARNING.String() { + return nil + } + l.logEntries <- fmt.Sprint(r.Args) + return nil +} + +func (*logBackend) GetLevel(string) logging.Level { + return logging.DEBUG +} + +func (*logBackend) SetLevel(logging.Level, string) { + panic("implement me") +} + +func (*logBackend) IsEnabledFor(logging.Level, string) bool { + return true +} diff --git a/core/chaincode/accesscontrol/ca_test.go b/core/chaincode/accesscontrol/ca_test.go index 75af681f147..f768a48d924 100644 --- a/core/chaincode/accesscontrol/ca_test.go +++ b/core/chaincode/accesscontrol/ca_test.go @@ -8,6 +8,7 @@ package accesscontrol import ( "crypto/tls" + "crypto/x509" "encoding/base64" "fmt" "math/rand" @@ -15,8 +16,6 @@ import ( "testing" "time" - "crypto/x509" - "github.com/stretchr/testify/assert" "google.golang.org/grpc" "google.golang.org/grpc/credentials" diff --git a/core/chaincode/accesscontrol/interceptor.go b/core/chaincode/accesscontrol/interceptor.go new file mode 100644 index 00000000000..fdda95631f6 --- /dev/null +++ b/core/chaincode/accesscontrol/interceptor.go @@ -0,0 +1,79 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package accesscontrol + +import ( + "fmt" + + pb "github.com/hyperledger/fabric/protos/peer" + "google.golang.org/grpc" +) + +type interceptor struct { + next pb.ChaincodeSupportServer + auth authorization +} + +// ChaincodeStream defines a gRPC stream for sending +// and receiving chaincode messages +type ChaincodeStream interface { + // Send sends a chaincode message + Send(*pb.ChaincodeMessage) error + // Recv receives a chaincode message + Recv() (*pb.ChaincodeMessage, error) +} + +type authorization func(message *pb.ChaincodeMessage, stream grpc.ServerStream) error + +func newInterceptor(srv pb.ChaincodeSupportServer, auth authorization) pb.ChaincodeSupportServer { + return &interceptor{ + next: srv, + auth: auth, + } +} + +// Register makes the interceptor implement ChaincodeSupportServer +func (i *interceptor) Register(stream pb.ChaincodeSupport_RegisterServer) error { + is := &interceptedStream{ + incMessages: make(chan *pb.ChaincodeMessage, 1), + stream: stream, + ServerStream: stream, + auth: i.auth, + } + msg, err := stream.Recv() + if err != nil { + return fmt.Errorf("Recv() error: %v, closing connection", err) + } + err = is.auth(msg, is.ServerStream) + if err != nil { + return err + } + is.incMessages <- msg + close(is.incMessages) + return i.next.Register(is) +} + +type interceptedStream struct { + incMessages chan *pb.ChaincodeMessage + stream ChaincodeStream + grpc.ServerStream + auth authorization +} + +// Send sends a chaincode message +func (is *interceptedStream) Send(msg *pb.ChaincodeMessage) error { + return is.stream.Send(msg) +} + +// Recv receives a chaincode message +func (is *interceptedStream) Recv() (*pb.ChaincodeMessage, error) { + msg, ok := <-is.incMessages + if !ok { + return is.stream.Recv() + } + return msg, nil +}