Skip to content

Commit

Permalink
[FAB-3199] Committer to call CSCC on config update
Browse files Browse the repository at this point in the history
Once new configuration block reaches the committer it need to update
current configuration cache of the peer cache such that get current
configuration block of CSCC could provide correct information as it
reads from that cache.

Change-Id: Ifc931097aa8db45d12f8a047564e0e1534bc159c
Signed-off-by: Artem Barger <[email protected]>
  • Loading branch information
C0rWin committed Jun 1, 2017
1 parent a01b2f9 commit a9b3a61
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 103 deletions.
27 changes: 26 additions & 1 deletion core/committer/committer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ limitations under the License.
package committer

import (
"fmt"

"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/committer/txvalidator"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/events/producer"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
"github.com/op/go-logging"
)

Expand All @@ -42,22 +45,44 @@ func init() {
type LedgerCommitter struct {
ledger ledger.PeerLedger
validator txvalidator.Validator
eventer ConfigBlockEventer
}

// ConfigBlockEventer callback function proto type to define action
// upon arrival on new configuaration update block
type ConfigBlockEventer func(block *common.Block) error

// NewLedgerCommitter is a factory function to create an instance of the committer
// which passes incoming blocks via validation and commits them into the ledger.
func NewLedgerCommitter(ledger ledger.PeerLedger, validator txvalidator.Validator) *LedgerCommitter {
return &LedgerCommitter{ledger: ledger, validator: validator}
return NewLedgerCommitterReactive(ledger, validator, func(_ *common.Block) error { return nil })
}

// NewLedgerCommitterReactive is a factory function to create an instance of the committer
// same as way as NewLedgerCommitter, while also provides an option to specify callback to
// be called upon new configuration block arrival and commit event
func NewLedgerCommitterReactive(ledger ledger.PeerLedger, validator txvalidator.Validator, eventer ConfigBlockEventer) *LedgerCommitter {
return &LedgerCommitter{ledger: ledger, validator: validator, eventer: eventer}
}

// Commit commits block to into the ledger
// Note, it is important that this always be called serially
func (lc *LedgerCommitter) Commit(block *common.Block) error {

// Validate and mark invalid transactions
logger.Debug("Validating block")
if err := lc.validator.Validate(block); err != nil {
return err
}

// Updating CSCC with new configuration block
if utils.IsConfigBlock(block) {
logger.Debug("Received configuration update, calling CSCC ConfigUpdate")
if err := lc.eventer(block); err != nil {
return fmt.Errorf("Could not update CSCC with new configuration update due to %s", err)
}
}

if err := lc.ledger.Commit(block); err != nil {
return err
}
Expand Down
37 changes: 34 additions & 3 deletions core/committer/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@ limitations under the License.
package committer

import (
"sync/atomic"
"testing"

"github.com/hyperledger/fabric/common/configtx/test"
"github.com/hyperledger/fabric/common/configtx/tool/localconfig"
"github.com/hyperledger/fabric/common/configtx/tool/provisional"
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"

"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
"github.com/hyperledger/fabric/core/mocks/validator"
"github.com/hyperledger/fabric/protos/common"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
)

func TestKVLedgerBlockStorage(t *testing.T) {
Expand Down Expand Up @@ -73,3 +75,32 @@ func TestKVLedgerBlockStorage(t *testing.T) {
testutil.AssertEquals(t, bcInfo, &common.BlockchainInfo{
Height: 2, CurrentBlockHash: block1Hash, PreviousBlockHash: gbHash})
}

func TestNewLedgerCommitterReactive(t *testing.T) {
viper.Set("peer.fileSystemPath", "/tmp/fabric/committertest")
chainID := "TestLedger"

ledgermgmt.InitializeTestEnv()
defer ledgermgmt.CleanupTestEnv()
gb, _ := test.MakeGenesisBlock(chainID)

ledger, err := ledgermgmt.CreateLedger(gb)
assert.NoError(t, err, "Error while creating ledger: %s", err)
defer ledger.Close()

var configArrived int32
committer := NewLedgerCommitterReactive(ledger, &validator.MockValidator{}, func(_ *common.Block) error {
atomic.AddInt32(&configArrived, 1)
return nil
})

height, err := committer.LedgerHeight()
assert.Equal(t, uint64(1), height)
assert.NoError(t, err)

profile := localconfig.Load(localconfig.SampleSingleMSPSoloProfile)
block := provisional.New(profile).GenesisBlockForChannel(chainID)

committer.Commit(block)
assert.Equal(t, int32(1), atomic.LoadInt32(&configArrived))
}
15 changes: 9 additions & 6 deletions core/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,17 @@ func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block) error {
ledger: ledger,
}

c := committer.NewLedgerCommitter(ledger, txvalidator.NewTxValidator(cs))
c := committer.NewLedgerCommitterReactive(ledger, txvalidator.NewTxValidator(cs), func(block *common.Block) error {
chainID, err := utils.GetChainIDFromBlock(block)
if err != nil {
return err
}
return SetCurrConfigBlock(block, chainID)
})

ordererAddresses := configtxManager.ChannelConfig().OrdererAddresses()
if len(ordererAddresses) == 0 {
return errors.New("No orderering service endpoint provided in configuration block")
return errors.New("No ordering service endpoint provided in configuration block")
}
service.GetGossipService().InitializeChannel(cs.ChainID(), c, ordererAddresses)

Expand Down Expand Up @@ -470,10 +477,6 @@ func SetCurrConfigBlock(block *common.Block, cid string) error {
defer chains.Unlock()
if c, ok := chains.list[cid]; ok {
c.cb = block
// TODO: Change MSP config
// c.mspmgr.Reconfig(block)

// TODO: Change gossip configs
return nil
}
return fmt.Errorf("Chain %s doesn't exist on the peer", cid)
Expand Down
32 changes: 3 additions & 29 deletions core/scc/cscc/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,9 @@ var cnflogger = flogging.MustGetLogger("cscc")

// These are function names from Invoke first parameter
const (
JoinChain string = "JoinChain"
UpdateConfigBlock string = "UpdateConfigBlock"
GetConfigBlock string = "GetConfigBlock"
GetChannels string = "GetChannels"
JoinChain string = "JoinChain"
GetConfigBlock string = "GetConfigBlock"
GetChannels string = "GetChannels"
)

// Init is called once per chain when the chain is created.
Expand Down Expand Up @@ -124,13 +123,6 @@ func (e *PeerConfiger) Invoke(stub shim.ChaincodeStubInterface) pb.Response {
return shim.Error(fmt.Sprintf("\"GetConfigBlock\" request failed authorization check for channel [%s]: [%s]", args[1], err))
}
return getConfigBlock(args[1])
case UpdateConfigBlock:
// TODO: It needs to be clarified if this is a function invoked by a proposal or not.
// The issue is the following: ChannelApplicationAdmins might require multiple signatures
// but currently a proposal can be signed by a signle entity only. Therefore, the ChannelApplicationAdmins policy
// will be never satisfied.

return updateConfigBlock(args[1])
case GetChannels:
// 2. check local MSP Members policy
if err = e.policyChecker.CheckPolicyNoChannel(mgmt.Members, sp); err != nil {
Expand Down Expand Up @@ -170,24 +162,6 @@ func joinChain(blockBytes []byte) pb.Response {
return shim.Success(nil)
}

func updateConfigBlock(blockBytes []byte) pb.Response {
block, err := extractBlock(blockBytes)
if err != nil {
return shim.Error(fmt.Sprintf("Failed to reconstruct the configuration block, %s", err))
}
chainID, err := utils.GetChainIDFromBlock(block)
if err != nil {
return shim.Error(fmt.Sprintf("Failed to get the chain ID from the configuration block, %s", err))
}

if err := peer.SetCurrConfigBlock(block, chainID); err != nil {

return shim.Error(err.Error())
}

return shim.Success(nil)
}

func extractBlock(bytes []byte) (*common.Block, error) {
if bytes == nil {
return nil, errors.New("Genesis block must not be nil.")
Expand Down
59 changes: 0 additions & 59 deletions core/scc/cscc/configure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,65 +259,6 @@ func TestConfigerInvokeJoinChainCorrectParams(t *testing.T) {
}
}

func TestConfigerInvokeUpdateConfigBlock(t *testing.T) {
e := new(PeerConfiger)
stub := shim.NewMockStub("PeerConfiger", e)

// Init the policy checker
policyManagerGetter := &policymocks.MockChannelPolicyManagerGetter{
Managers: map[string]policies.Manager{
"mytestchainid": &policymocks.MockChannelPolicyManager{MockPolicy: &policymocks.MockPolicy{Deserializer: &policymocks.MockIdentityDeserializer{[]byte("Alice"), []byte("msg1")}}},
},
}

identityDeserializer := &policymocks.MockIdentityDeserializer{[]byte("Alice"), []byte("msg1")}

e.policyChecker = policy.NewPolicyChecker(
policyManagerGetter,
identityDeserializer,
&policymocks.MockMSPPrincipalGetter{Principal: []byte("Alice")},
)

sProp, _ := utils.MockSignedEndorserProposalOrPanic("", &pb.ChaincodeSpec{}, []byte("Alice"), []byte("msg1"))
identityDeserializer.Msg = sProp.ProposalBytes
sProp.Signature = sProp.ProposalBytes
policyManagerGetter.Managers["mytestchainid"].(*policymocks.MockChannelPolicyManager).MockPolicy.(*policymocks.MockPolicy).Deserializer.(*policymocks.MockIdentityDeserializer).Msg = sProp.ProposalBytes

// Failed path: Not enough parameters
args := [][]byte{[]byte("UpdateConfigBlock")}
if res := stub.MockInvokeWithSignedProposal("2", args, sProp); res.Status == shim.OK {
t.Fatalf("cscc invoke UpdateConfigBlock should have failed with invalid number of args: %v", args)
}

// Failed path: wrong parameter type
args = [][]byte{[]byte("UpdateConfigBlock"), []byte("action")}
if res := stub.MockInvokeWithSignedProposal("2", args, sProp); res.Status == shim.OK {
t.Fatalf("cscc invoke UpdateConfigBlock should have failed with null genesis block - args: %v", args)
}

// Successful path for UpdateConfigBlock
blockBytes := mockConfigBlock()
if blockBytes == nil {
t.Fatalf("cscc invoke UpdateConfigBlock failed because invalid block")
}
args = [][]byte{[]byte("UpdateConfigBlock"), blockBytes}
if res := stub.MockInvokeWithSignedProposal("2", args, sProp); res.Status != shim.OK {
t.Fatalf("cscc invoke UpdateConfigBlock failed with: %v", res.Message)
}

// Query the configuration block
//chainID := []byte{143, 222, 22, 192, 73, 145, 76, 110, 167, 154, 118, 66, 132, 204, 113, 168}
chainID, err := utils.GetChainIDFromBlockBytes(blockBytes)
if err != nil {
t.Fatalf("cscc invoke UpdateConfigBlock failed with: %v", err)
}
args = [][]byte{[]byte("GetConfigBlock"), []byte(chainID)}
if res := stub.MockInvokeWithSignedProposal("2", args, sProp); res.Status != shim.OK {
t.Fatalf("cscc invoke GetConfigBlock failed with: %v", err)
}

}

func mockConfigBlock() []byte {
var blockBytes []byte = nil
block, err := configtxtest.MakeGenesisBlock("mytestchainid")
Expand Down
29 changes: 24 additions & 5 deletions protos/utils/commonutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,15 @@ limitations under the License.
package utils

import (
"errors"
"fmt"
"time"

cb "github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric/protos/peer"

"errors"

"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/hyperledger/fabric/common/crypto"
cb "github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric/protos/peer"
)

// MarshalOrPanic serializes a protobuf message and panics if this operation fails.
Expand Down Expand Up @@ -259,3 +257,24 @@ func UnmarshalChaincodeID(bytes []byte) (*pb.ChaincodeID, error) {

return ccid, nil
}

// IsConfigBlock validates whenever given block contains configuration
// update transaction
func IsConfigBlock(block *cb.Block) bool {
envelope, err := ExtractEnvelope(block, 0)
if err != nil {
return false
}

payload, err := GetPayload(envelope)
if err != nil {
return false
}

hdr, err := UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return false
}

return cb.HeaderType(hdr.Type) == cb.HeaderType_CONFIG
}

0 comments on commit a9b3a61

Please sign in to comment.