Skip to content

Commit

Permalink
fab-1475 make CC fmk allow concurrent invokes
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-1475

Summary
=======
With pre-consensus simulation, multiple chains and relaxation by the ledger
to simulate versions of chaincode state concurrently, we can now allow
chaincode framework to execute invokes concurrently. This CR enables this.

This CR enables concurrency basically by removing the FSM states that
enforced serialization (so basically all the FSM changes in chaincode/hander.go
and chaincode/shim/handler.go).

The CR also has a "Chaincode Checker" program which has the potential for
much bigger things
   . the tooling test their chaincodes for consistency
   . the tooling for stressing the fabric

The concurrency enablement was tested with the "ccchecker".

Details
=======
The submit will basically have 4 things
  . changes to 3 chaincode framework files handler.go files
    to enable concurrency
  . concurrency_test.go to run 100 concurrent invokes followed
    by 100 concurrent queries
  . a complete "ccchecker" example framework for testing and validating
    chaincodes
  . exports some functions under fabric/peer/chaincode CLI for use by
    the above ccchecker example framework

"ccchecker" comes with a sample "newkeyperinvoke" chaincode that should
NEVER fail ledger consistency checks. To test simply follow these steps
  . vagrant window 1 - start orderer
    ./orderer

  . vagrant window 2 - start peer
    peer node start

  . vagrant window 3 - bring up chaincode for test
    cd peer

    //deploy the chaincode used by ccchecker out of the box
    peer chaincode deploy -n mycc -p github.com/hyperledger/fabric/examples/ccchecker/chaincodes/newkeyperinvoke -c '{"Args":[""]}'
    //wait for commit say for about 10 secs and then issue a query to bring the CC up
    peer chaincode query -n mycc -c '{"Args":["get","a"]}'

    //verify the chaincode is up
    docker ps

  . vagrant window 4 - run test
    cd examples/ccchecker
    go build
    ./ccchecker

The above reads from ccchecker.json and executes tests concurrently.

Change-Id: I5267b19f03ed10003eb28facf87693525f0dcd1a
Signed-off-by: Srinivasan Muralidharan <[email protected]>
  • Loading branch information
Srinivasan Muralidharan committed Dec 29, 2016
1 parent defb65b commit 5bdca86
Show file tree
Hide file tree
Showing 19 changed files with 1,345 additions and 291 deletions.
23 changes: 11 additions & 12 deletions core/chaincode/chaincode_support.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,30 +397,29 @@ func (chaincodeSupport *ChaincodeSupport) getArgsAndEnv(cccid *CCContext, cLang
}

// launchAndWaitForRegister will launch container if not already running. Use the targz to create the image if not found
func (chaincodeSupport *ChaincodeSupport) launchAndWaitForRegister(ctxt context.Context, cccid *CCContext, cds *pb.ChaincodeDeploymentSpec, cLang pb.ChaincodeSpec_Type, targz io.Reader) (bool, error) {
func (chaincodeSupport *ChaincodeSupport) launchAndWaitForRegister(ctxt context.Context, cccid *CCContext, cds *pb.ChaincodeDeploymentSpec, cLang pb.ChaincodeSpec_Type, targz io.Reader) error {
canName := cccid.GetCanonicalName()
if canName == "" {
return false, fmt.Errorf("chaincode name not set")
return fmt.Errorf("chaincode name not set")
}

chaincodeSupport.runningChaincodes.Lock()
var ok bool
//if its in the map, there must be a connected stream...nothing to do
if _, ok = chaincodeSupport.chaincodeHasBeenLaunched(canName); ok {
chaincodeLogger.Debugf("chaincode is running and ready: %s", canName)
//if its in the map, its either up or being launched. Either case break the
//multiple launch by failing
if _, hasBeenLaunched := chaincodeSupport.chaincodeHasBeenLaunched(canName); hasBeenLaunched {
chaincodeSupport.runningChaincodes.Unlock()
return true, nil
return fmt.Errorf("Error chaincode is being launched: %s", canName)
}
alreadyRunning := false

//chaincodeHasBeenLaunch false... its not in the map, add it and proceed to launch
notfy := chaincodeSupport.preLaunchSetup(canName)
chaincodeSupport.runningChaincodes.Unlock()

//launch the chaincode

args, env, err := chaincodeSupport.getArgsAndEnv(cccid, cLang)
if err != nil {
return alreadyRunning, err
return err
}

chaincodeLogger.Debugf("start container: %s(networkid:%s,peerid:%s)", canName, chaincodeSupport.peerNetworkID, chaincodeSupport.peerID)
Expand All @@ -440,7 +439,7 @@ func (chaincodeSupport *ChaincodeSupport) launchAndWaitForRegister(ctxt context.
chaincodeSupport.runningChaincodes.Lock()
delete(chaincodeSupport.runningChaincodes.chaincodeMap, canName)
chaincodeSupport.runningChaincodes.Unlock()
return alreadyRunning, err
return err
}

//wait for REGISTER state
Expand All @@ -459,7 +458,7 @@ func (chaincodeSupport *ChaincodeSupport) launchAndWaitForRegister(ctxt context.
chaincodeLogger.Debugf("error on stop %s(%s)", errIgnore, err)
}
}
return alreadyRunning, err
return err
}

//Stop stops a chaincode if running
Expand Down Expand Up @@ -577,7 +576,7 @@ func (chaincodeSupport *ChaincodeSupport) Launch(context context.Context, cccid
//launch container if it is a System container or not in dev mode
if (!chaincodeSupport.userRunsCC || cds.ExecEnv == pb.ChaincodeDeploymentSpec_SYSTEM) && (chrte == nil || chrte.handler == nil) {
var targz io.Reader = bytes.NewBuffer(cds.CodePackage)
_, err = chaincodeSupport.launchAndWaitForRegister(context, cccid, cds, cLang, targz)
err = chaincodeSupport.launchAndWaitForRegister(context, cccid, cds, cLang, targz)
if err != nil {
chaincodeLogger.Errorf("launchAndWaitForRegister failed %s", err)
return cID, cMsg, err
Expand Down
141 changes: 141 additions & 0 deletions core/chaincode/concurrency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package chaincode

import (
"fmt"
"sync"
"testing"

"github.com/hyperledger/fabric/core/util"
pb "github.com/hyperledger/fabric/protos/peer"

"golang.org/x/net/context"
)

//TestExecuteConcurrentInvokes deploys newkeyperinvoke and runs 100 concurrent invokes
//followed by concurrent 100 queries to validate
func TestExecuteConcurrentInvokes(t *testing.T) {
chainID := util.GetTestChainID()

lis, err := initPeer(chainID)
if err != nil {
t.Fail()
t.Logf("Error creating peer: %s", err)
}

defer finitPeer(lis, chainID)

var ctxt = context.Background()

url := "github.com/hyperledger/fabric/examples/ccchecker/chaincodes/newkeyperinvoke"

chaincodeID := &pb.ChaincodeID{Name: "nkpi", Path: url}

args := util.ToChaincodeArgs("init", "")

spec := &pb.ChaincodeSpec{Type: 1, ChaincodeID: chaincodeID, CtorMsg: &pb.ChaincodeInput{Args: args}}

cccid := NewCCContext(chainID, "nkpi", "0", "", false, nil)

defer theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})

_, err = deploy(ctxt, cccid, spec)
if err != nil {
t.Fail()
t.Logf("Error initializing chaincode %s(%s)", chaincodeID, err)
return
}

var wg sync.WaitGroup

//run 100 invokes in parallel
numTrans := 100

results := make([][]byte, numTrans)
errs := make([]error, numTrans)

e := func(inv bool, qnum int) {
defer wg.Done()

newkey := fmt.Sprintf("%d", qnum)

var args [][]byte
if inv {
args = util.ToChaincodeArgs("put", newkey, newkey)
} else {
args = util.ToChaincodeArgs("get", newkey)
}

spec = &pb.ChaincodeSpec{Type: 1, ChaincodeID: chaincodeID, CtorMsg: &pb.ChaincodeInput{Args: args}}

//start with a new background
_, _, results[qnum], err = invoke(context.Background(), chainID, spec)

if err != nil {
errs[qnum] = fmt.Errorf("Error executing <%s>: %s", chaincodeID.Name, err)
return
}
}

wg.Add(numTrans)

//execute transactions concurrently.
for i := 0; i < numTrans; i++ {
go e(true, i)
}

wg.Wait()

for i := 0; i < numTrans; i++ {
if errs[i] != nil {
t.Fail()
t.Logf("Error invoking chaincode iter %d %s(%s)", i, chaincodeID.Name, errs[i])
}
if results[i] == nil || string(results[i]) != "OK" {
t.Fail()
t.Logf("Error concurrent invoke %d %s", i, chaincodeID.Name)
return
}
}

wg.Add(numTrans)

//execute queries concurrently.
for i := 0; i < numTrans; i++ {
go e(false, i)
}

wg.Wait()

for i := 0; i < numTrans; i++ {
if errs[i] != nil {
t.Fail()
t.Logf("Error querying chaincode iter %d %s(%s)", i, chaincodeID.Name, errs[i])
return
}
if results[i] == nil || string(results[i]) != fmt.Sprintf("%d", i) {
t.Fail()
if results[i] == nil {
t.Logf("Error concurrent query %d(%s)", i, chaincodeID.Name)
} else {
t.Logf("Error concurrent query %d(%s, %s, %v)", i, chaincodeID.Name, string(results[i]), results[i])
}
return
}
}
}
46 changes: 14 additions & 32 deletions core/chaincode/exectransaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,16 @@ func endTxSimulationCIS(chainID string, txid string, txsim ledger.TxSimulator, p
return endTxSimulation(chainID, txsim, payload, commit, prop)
}

//getting a crash from ledger.Commit when doing concurrent invokes
//It is likely intentional that ledger.Commit is serial (ie, the real
//Committer will invoke this serially on each block). Mimic that here
//by forcing serialization of the ledger.Commit call.
//
//NOTE-this should NOT have any effect on the older serial tests.
//This affects only the tests in concurrent_test.go which call these
//concurrently (100 concurrent invokes followed by 100 concurrent queries)
var _commitLock_ sync.Mutex

func endTxSimulation(chainID string, txsim ledger.TxSimulator, payload []byte, commit bool, prop *pb.Proposal) error {
txsim.Done()
if lgr := peer.GetLedger(chainID); lgr != nil {
Expand Down Expand Up @@ -194,6 +204,10 @@ func endTxSimulation(chainID string, txsim ledger.TxSimulator, payload []byte, c
block := common.NewBlock(1, []byte{})
block.Data.Data = [][]byte{envBytes}
//commit the block

//see comment on _commitLock_
_commitLock_.Lock()
defer _commitLock_.Unlock()
if err := lgr.Commit(block); err != nil {
return err
}
Expand Down Expand Up @@ -601,38 +615,6 @@ func TestExecuteInvokeTransaction(t *testing.T) {
theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: &pb.ChaincodeSpec{ChaincodeID: chaincodeID}})
}

// Execute multiple transactions and queries.
func exec(ctxt context.Context, chainID string, chaincodeID string, numTrans int, numQueries int) []error {
var wg sync.WaitGroup
errs := make([]error, numTrans+numQueries)

e := func(qnum int) {
defer wg.Done()
var spec *pb.ChaincodeSpec
args := util.ToChaincodeArgs("invoke", "a", "b", "10")

spec = &pb.ChaincodeSpec{Type: 1, ChaincodeID: &pb.ChaincodeID{Name: chaincodeID}, CtorMsg: &pb.ChaincodeInput{Args: args}}

_, _, _, err := invoke(ctxt, chainID, spec)

if err != nil {
errs[qnum] = fmt.Errorf("Error executing <%s>: %s", chaincodeID, err)
return
}
}
wg.Add(numTrans + numQueries)

//execute transactions sequentially..
go func() {
for i := 0; i < numTrans; i++ {
e(i)
}
}()

wg.Wait()
return errs
}

// Test the execution of an invalid transaction.
func TestExecuteInvokeInvalidTransaction(t *testing.T) {
chainID := util.GetTestChainID()
Expand Down
Loading

0 comments on commit 5bdca86

Please sign in to comment.