diff --git a/core/chaincode/chaincode_support.go b/core/chaincode/chaincode_support.go index a34865a1590..21ebfd7ade2 100644 --- a/core/chaincode/chaincode_support.go +++ b/core/chaincode/chaincode_support.go @@ -397,22 +397,21 @@ func (chaincodeSupport *ChaincodeSupport) getArgsAndEnv(cccid *CCContext, cLang } // launchAndWaitForRegister will launch container if not already running. Use the targz to create the image if not found -func (chaincodeSupport *ChaincodeSupport) launchAndWaitForRegister(ctxt context.Context, cccid *CCContext, cds *pb.ChaincodeDeploymentSpec, cLang pb.ChaincodeSpec_Type, targz io.Reader) (bool, error) { +func (chaincodeSupport *ChaincodeSupport) launchAndWaitForRegister(ctxt context.Context, cccid *CCContext, cds *pb.ChaincodeDeploymentSpec, cLang pb.ChaincodeSpec_Type, targz io.Reader) error { canName := cccid.GetCanonicalName() if canName == "" { - return false, fmt.Errorf("chaincode name not set") + return fmt.Errorf("chaincode name not set") } chaincodeSupport.runningChaincodes.Lock() - var ok bool - //if its in the map, there must be a connected stream...nothing to do - if _, ok = chaincodeSupport.chaincodeHasBeenLaunched(canName); ok { - chaincodeLogger.Debugf("chaincode is running and ready: %s", canName) + //if its in the map, its either up or being launched. Either case break the + //multiple launch by failing + if _, hasBeenLaunched := chaincodeSupport.chaincodeHasBeenLaunched(canName); hasBeenLaunched { chaincodeSupport.runningChaincodes.Unlock() - return true, nil + return fmt.Errorf("Error chaincode is being launched: %s", canName) } - alreadyRunning := false + //chaincodeHasBeenLaunch false... its not in the map, add it and proceed to launch notfy := chaincodeSupport.preLaunchSetup(canName) chaincodeSupport.runningChaincodes.Unlock() @@ -420,7 +419,7 @@ func (chaincodeSupport *ChaincodeSupport) launchAndWaitForRegister(ctxt context. args, env, err := chaincodeSupport.getArgsAndEnv(cccid, cLang) if err != nil { - return alreadyRunning, err + return err } chaincodeLogger.Debugf("start container: %s(networkid:%s,peerid:%s)", canName, chaincodeSupport.peerNetworkID, chaincodeSupport.peerID) @@ -440,7 +439,7 @@ func (chaincodeSupport *ChaincodeSupport) launchAndWaitForRegister(ctxt context. chaincodeSupport.runningChaincodes.Lock() delete(chaincodeSupport.runningChaincodes.chaincodeMap, canName) chaincodeSupport.runningChaincodes.Unlock() - return alreadyRunning, err + return err } //wait for REGISTER state @@ -459,7 +458,7 @@ func (chaincodeSupport *ChaincodeSupport) launchAndWaitForRegister(ctxt context. chaincodeLogger.Debugf("error on stop %s(%s)", errIgnore, err) } } - return alreadyRunning, err + return err } //Stop stops a chaincode if running @@ -577,7 +576,7 @@ func (chaincodeSupport *ChaincodeSupport) Launch(context context.Context, cccid //launch container if it is a System container or not in dev mode if (!chaincodeSupport.userRunsCC || cds.ExecEnv == pb.ChaincodeDeploymentSpec_SYSTEM) && (chrte == nil || chrte.handler == nil) { var targz io.Reader = bytes.NewBuffer(cds.CodePackage) - _, err = chaincodeSupport.launchAndWaitForRegister(context, cccid, cds, cLang, targz) + err = chaincodeSupport.launchAndWaitForRegister(context, cccid, cds, cLang, targz) if err != nil { chaincodeLogger.Errorf("launchAndWaitForRegister failed %s", err) return cID, cMsg, err diff --git a/core/chaincode/concurrency_test.go b/core/chaincode/concurrency_test.go new file mode 100644 index 00000000000..371641e959c --- /dev/null +++ b/core/chaincode/concurrency_test.go @@ -0,0 +1,141 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package chaincode + +import ( + "fmt" + "sync" + "testing" + + "github.com/hyperledger/fabric/core/util" + pb "github.com/hyperledger/fabric/protos/peer" + + "golang.org/x/net/context" +) + +//TestExecuteConcurrentInvokes deploys newkeyperinvoke and runs 100 concurrent invokes +//followed by concurrent 100 queries to validate +func TestExecuteConcurrentInvokes(t *testing.T) { + chainID := util.GetTestChainID() + + lis, err := initPeer(chainID) + if err != nil { + t.Fail() + t.Logf("Error creating peer: %s", err) + } + + defer finitPeer(lis, chainID) + + var ctxt = context.Background() + + url := "github.com/hyperledger/fabric/examples/ccchecker/chaincodes/newkeyperinvoke" + + chaincodeID := &pb.ChaincodeID{Name: "nkpi", Path: url} + + args := util.ToChaincodeArgs("init", "") + + spec := &pb.ChaincodeSpec{Type: 1, ChaincodeID: chaincodeID, CtorMsg: &pb.ChaincodeInput{Args: args}} + + cccid := NewCCContext(chainID, "nkpi", "0", "", false, nil) + + defer theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + + _, err = deploy(ctxt, cccid, spec) + if err != nil { + t.Fail() + t.Logf("Error initializing chaincode %s(%s)", chaincodeID, err) + return + } + + var wg sync.WaitGroup + + //run 100 invokes in parallel + numTrans := 100 + + results := make([][]byte, numTrans) + errs := make([]error, numTrans) + + e := func(inv bool, qnum int) { + defer wg.Done() + + newkey := fmt.Sprintf("%d", qnum) + + var args [][]byte + if inv { + args = util.ToChaincodeArgs("put", newkey, newkey) + } else { + args = util.ToChaincodeArgs("get", newkey) + } + + spec = &pb.ChaincodeSpec{Type: 1, ChaincodeID: chaincodeID, CtorMsg: &pb.ChaincodeInput{Args: args}} + + //start with a new background + _, _, results[qnum], err = invoke(context.Background(), chainID, spec) + + if err != nil { + errs[qnum] = fmt.Errorf("Error executing <%s>: %s", chaincodeID.Name, err) + return + } + } + + wg.Add(numTrans) + + //execute transactions concurrently. + for i := 0; i < numTrans; i++ { + go e(true, i) + } + + wg.Wait() + + for i := 0; i < numTrans; i++ { + if errs[i] != nil { + t.Fail() + t.Logf("Error invoking chaincode iter %d %s(%s)", i, chaincodeID.Name, errs[i]) + } + if results[i] == nil || string(results[i]) != "OK" { + t.Fail() + t.Logf("Error concurrent invoke %d %s", i, chaincodeID.Name) + return + } + } + + wg.Add(numTrans) + + //execute queries concurrently. + for i := 0; i < numTrans; i++ { + go e(false, i) + } + + wg.Wait() + + for i := 0; i < numTrans; i++ { + if errs[i] != nil { + t.Fail() + t.Logf("Error querying chaincode iter %d %s(%s)", i, chaincodeID.Name, errs[i]) + return + } + if results[i] == nil || string(results[i]) != fmt.Sprintf("%d", i) { + t.Fail() + if results[i] == nil { + t.Logf("Error concurrent query %d(%s)", i, chaincodeID.Name) + } else { + t.Logf("Error concurrent query %d(%s, %s, %v)", i, chaincodeID.Name, string(results[i]), results[i]) + } + return + } + } +} diff --git a/core/chaincode/exectransaction_test.go b/core/chaincode/exectransaction_test.go index cac219c35c8..5315e321063 100644 --- a/core/chaincode/exectransaction_test.go +++ b/core/chaincode/exectransaction_test.go @@ -161,6 +161,16 @@ func endTxSimulationCIS(chainID string, txid string, txsim ledger.TxSimulator, p return endTxSimulation(chainID, txsim, payload, commit, prop) } +//getting a crash from ledger.Commit when doing concurrent invokes +//It is likely intentional that ledger.Commit is serial (ie, the real +//Committer will invoke this serially on each block). Mimic that here +//by forcing serialization of the ledger.Commit call. +// +//NOTE-this should NOT have any effect on the older serial tests. +//This affects only the tests in concurrent_test.go which call these +//concurrently (100 concurrent invokes followed by 100 concurrent queries) +var _commitLock_ sync.Mutex + func endTxSimulation(chainID string, txsim ledger.TxSimulator, payload []byte, commit bool, prop *pb.Proposal) error { txsim.Done() if lgr := peer.GetLedger(chainID); lgr != nil { @@ -194,6 +204,10 @@ func endTxSimulation(chainID string, txsim ledger.TxSimulator, payload []byte, c block := common.NewBlock(1, []byte{}) block.Data.Data = [][]byte{envBytes} //commit the block + + //see comment on _commitLock_ + _commitLock_.Lock() + defer _commitLock_.Unlock() if err := lgr.Commit(block); err != nil { return err } @@ -601,38 +615,6 @@ func TestExecuteInvokeTransaction(t *testing.T) { theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: &pb.ChaincodeSpec{ChaincodeID: chaincodeID}}) } -// Execute multiple transactions and queries. -func exec(ctxt context.Context, chainID string, chaincodeID string, numTrans int, numQueries int) []error { - var wg sync.WaitGroup - errs := make([]error, numTrans+numQueries) - - e := func(qnum int) { - defer wg.Done() - var spec *pb.ChaincodeSpec - args := util.ToChaincodeArgs("invoke", "a", "b", "10") - - spec = &pb.ChaincodeSpec{Type: 1, ChaincodeID: &pb.ChaincodeID{Name: chaincodeID}, CtorMsg: &pb.ChaincodeInput{Args: args}} - - _, _, _, err := invoke(ctxt, chainID, spec) - - if err != nil { - errs[qnum] = fmt.Errorf("Error executing <%s>: %s", chaincodeID, err) - return - } - } - wg.Add(numTrans + numQueries) - - //execute transactions sequentially.. - go func() { - for i := 0; i < numTrans; i++ { - e(i) - } - }() - - wg.Wait() - return errs -} - // Test the execution of an invalid transaction. func TestExecuteInvokeInvalidTransaction(t *testing.T) { chainID := util.GetTestChainID() diff --git a/core/chaincode/handler.go b/core/chaincode/handler.go index fb51b898206..21356b1707c 100644 --- a/core/chaincode/handler.go +++ b/core/chaincode/handler.go @@ -38,9 +38,6 @@ const ( establishedstate = "established" //in: CREATED, rcv: REGISTER, send: REGISTERED, INIT initstate = "init" //in:ESTABLISHED, rcv:-, send: INIT readystate = "ready" //in:ESTABLISHED,TRANSACTION, rcv:COMPLETED - transactionstate = "transaction" //in:READY, rcv: xact from consensus, send: TRANSACTION - busyinitstate = "busyinit" //in:INIT, rcv: PUT_STATE, DEL_STATE, INVOKE_CHAINCODE - busyxactstate = "busyxact" //in:TRANSACION, rcv: PUT_STATE, DEL_STATE, INVOKE_CHAINCODE endstate = "end" //in:INIT,ESTABLISHED, rcv: error, terminate container ) @@ -145,14 +142,31 @@ func (handler *Handler) getCCRootName() string { return handler.ccCompParts.name } +//serialSend serializes msgs so gRPC will be happy func (handler *Handler) serialSend(msg *pb.ChaincodeMessage) error { handler.serialLock.Lock() defer handler.serialLock.Unlock() - if err := handler.ChatStream.Send(msg); err != nil { - chaincodeLogger.Errorf("Error sending %s: %s", msg.Type.String(), err) - return fmt.Errorf("Error sending %s: %s", msg.Type.String(), err) + + var err error + if err = handler.ChatStream.Send(msg); err != nil { + err = fmt.Errorf("[%s]Error sending %s: %s", shorttxid(msg.Txid), msg.Type.String(), err) + chaincodeLogger.Errorf("%s", err) } - return nil + return err +} + +//serialSendAsync serves the same purpose as serialSend (serializ msgs so gRPC will +//be happy). In addition, it is also asynchronous so send-remoterecv--localrecv loop +//can be nonblocking. Only errors need to be handled and these are handled by +//communication on supplied error channel. A typical use will be a non-blocking or +//nil channel +func (handler *Handler) serialSendAsync(msg *pb.ChaincodeMessage, errc chan error) { + go func() { + err := handler.serialSend(msg) + if errc != nil { + errc <- err + } + }() } func (handler *Handler) createTxContext(ctxt context.Context, chainID string, txid string, prop *pb.Proposal) (*transactionContext, error) { @@ -253,6 +267,9 @@ func (handler *Handler) processStream() error { //recv is used to spin Recv routine after previous received msg //has been processed recv := true + + //catch send errors and bail now that sends aren't synchronous + errc := make(chan error, 1) for { in = nil err = nil @@ -266,6 +283,12 @@ func (handler *Handler) processStream() error { }() } select { + case sendErr := <-errc: + if sendErr != nil { + return sendErr + } + //send was successful, just continue + continue case in = <-msgAvail: // Defer the deregistering of the this handler. if err == io.EOF { @@ -307,14 +330,9 @@ func (handler *Handler) processStream() error { continue } - //TODO we could use this to hook into container lifecycle (kill the chaincode if not in use, etc) - kaerr := handler.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_KEEPALIVE}) - if kaerr != nil { - chaincodeLogger.Errorf("Error sending keepalive, err=%s", kaerr) - } else { - chaincodeLogger.Debug("Sent KEEPALIVE request") - } - //keepalive message kicked in. just continue + //if no error message from serialSend, KEEPALIVE happy, and don't care about error + //(maybe it'll work later) + handler.serialSendAsync(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_KEEPALIVE}, nil) continue } @@ -326,10 +344,8 @@ func (handler *Handler) processStream() error { if nsInfo != nil && nsInfo.sendToCC { chaincodeLogger.Debugf("[%s]sending state message %s", shorttxid(in.Txid), in.Type.String()) - if err = handler.serialSend(in); err != nil { - chaincodeLogger.Debugf("[%s]serial sending received error %s", shorttxid(in.Txid), err) - return fmt.Errorf("[%s]serial sending received error %s", shorttxid(in.Txid), err) - } + //if error bail in select + handler.serialSendAsync(in, errc) } } } @@ -357,40 +373,26 @@ func newChaincodeSupportHandler(chaincodeSupport *ChaincodeSupport, peerChatStre {Name: pb.ChaincodeMessage_REGISTER.String(), Src: []string{createdstate}, Dst: establishedstate}, {Name: pb.ChaincodeMessage_INIT.String(), Src: []string{establishedstate}, Dst: initstate}, {Name: pb.ChaincodeMessage_READY.String(), Src: []string{establishedstate}, Dst: readystate}, - {Name: pb.ChaincodeMessage_TRANSACTION.String(), Src: []string{readystate}, Dst: transactionstate}, - {Name: pb.ChaincodeMessage_PUT_STATE.String(), Src: []string{transactionstate}, Dst: busyxactstate}, - {Name: pb.ChaincodeMessage_DEL_STATE.String(), Src: []string{transactionstate}, Dst: busyxactstate}, - {Name: pb.ChaincodeMessage_INVOKE_CHAINCODE.String(), Src: []string{transactionstate}, Dst: busyxactstate}, - {Name: pb.ChaincodeMessage_PUT_STATE.String(), Src: []string{initstate}, Dst: busyinitstate}, - {Name: pb.ChaincodeMessage_DEL_STATE.String(), Src: []string{initstate}, Dst: busyinitstate}, - {Name: pb.ChaincodeMessage_INVOKE_CHAINCODE.String(), Src: []string{initstate}, Dst: busyinitstate}, - {Name: pb.ChaincodeMessage_COMPLETED.String(), Src: []string{initstate, readystate, transactionstate}, Dst: readystate}, + {Name: pb.ChaincodeMessage_PUT_STATE.String(), Src: []string{initstate}, Dst: initstate}, + {Name: pb.ChaincodeMessage_PUT_STATE.String(), Src: []string{readystate}, Dst: readystate}, + {Name: pb.ChaincodeMessage_DEL_STATE.String(), Src: []string{initstate}, Dst: initstate}, + {Name: pb.ChaincodeMessage_DEL_STATE.String(), Src: []string{readystate}, Dst: readystate}, + {Name: pb.ChaincodeMessage_INVOKE_CHAINCODE.String(), Src: []string{initstate}, Dst: initstate}, + {Name: pb.ChaincodeMessage_INVOKE_CHAINCODE.String(), Src: []string{readystate}, Dst: readystate}, + {Name: pb.ChaincodeMessage_COMPLETED.String(), Src: []string{initstate, readystate}, Dst: readystate}, {Name: pb.ChaincodeMessage_GET_STATE.String(), Src: []string{readystate}, Dst: readystate}, {Name: pb.ChaincodeMessage_GET_STATE.String(), Src: []string{initstate}, Dst: initstate}, - {Name: pb.ChaincodeMessage_GET_STATE.String(), Src: []string{busyinitstate}, Dst: busyinitstate}, - {Name: pb.ChaincodeMessage_GET_STATE.String(), Src: []string{transactionstate}, Dst: transactionstate}, - {Name: pb.ChaincodeMessage_GET_STATE.String(), Src: []string{busyxactstate}, Dst: busyxactstate}, {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE.String(), Src: []string{readystate}, Dst: readystate}, {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE.String(), Src: []string{initstate}, Dst: initstate}, - {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE.String(), Src: []string{busyinitstate}, Dst: busyinitstate}, - {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE.String(), Src: []string{transactionstate}, Dst: transactionstate}, - {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE.String(), Src: []string{busyxactstate}, Dst: busyxactstate}, {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT.String(), Src: []string{readystate}, Dst: readystate}, {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT.String(), Src: []string{initstate}, Dst: initstate}, - {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT.String(), Src: []string{busyinitstate}, Dst: busyinitstate}, - {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT.String(), Src: []string{transactionstate}, Dst: transactionstate}, - {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT.String(), Src: []string{busyxactstate}, Dst: busyxactstate}, {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE.String(), Src: []string{readystate}, Dst: readystate}, {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE.String(), Src: []string{initstate}, Dst: initstate}, - {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE.String(), Src: []string{busyinitstate}, Dst: busyinitstate}, - {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE.String(), Src: []string{transactionstate}, Dst: transactionstate}, - {Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE.String(), Src: []string{busyxactstate}, Dst: busyxactstate}, {Name: pb.ChaincodeMessage_ERROR.String(), Src: []string{initstate}, Dst: endstate}, - {Name: pb.ChaincodeMessage_ERROR.String(), Src: []string{transactionstate}, Dst: readystate}, - {Name: pb.ChaincodeMessage_ERROR.String(), Src: []string{busyinitstate}, Dst: initstate}, - {Name: pb.ChaincodeMessage_ERROR.String(), Src: []string{busyxactstate}, Dst: transactionstate}, - {Name: pb.ChaincodeMessage_RESPONSE.String(), Src: []string{busyinitstate}, Dst: initstate}, - {Name: pb.ChaincodeMessage_RESPONSE.String(), Src: []string{busyxactstate}, Dst: transactionstate}, + {Name: pb.ChaincodeMessage_ERROR.String(), Src: []string{readystate}, Dst: readystate}, + {Name: pb.ChaincodeMessage_RESPONSE.String(), Src: []string{initstate}, Dst: initstate}, + {Name: pb.ChaincodeMessage_RESPONSE.String(), Src: []string{readystate}, Dst: readystate}, + {Name: pb.ChaincodeMessage_TRANSACTION.String(), Src: []string{readystate}, Dst: readystate}, }, fsm.Callbacks{ "before_" + pb.ChaincodeMessage_REGISTER.String(): func(e *fsm.Event) { v.beforeRegisterEvent(e, v.FSM.Current()) }, @@ -400,14 +402,12 @@ func newChaincodeSupportHandler(chaincodeSupport *ChaincodeSupport, peerChatStre "after_" + pb.ChaincodeMessage_RANGE_QUERY_STATE.String(): func(e *fsm.Event) { v.afterRangeQueryState(e, v.FSM.Current()) }, "after_" + pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT.String(): func(e *fsm.Event) { v.afterRangeQueryStateNext(e, v.FSM.Current()) }, "after_" + pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE.String(): func(e *fsm.Event) { v.afterRangeQueryStateClose(e, v.FSM.Current()) }, - "after_" + pb.ChaincodeMessage_PUT_STATE.String(): func(e *fsm.Event) { v.afterPutState(e, v.FSM.Current()) }, - "after_" + pb.ChaincodeMessage_DEL_STATE.String(): func(e *fsm.Event) { v.afterDelState(e, v.FSM.Current()) }, - "after_" + pb.ChaincodeMessage_INVOKE_CHAINCODE.String(): func(e *fsm.Event) { v.afterInvokeChaincode(e, v.FSM.Current()) }, + "after_" + pb.ChaincodeMessage_PUT_STATE.String(): func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) }, + "after_" + pb.ChaincodeMessage_DEL_STATE.String(): func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) }, + "after_" + pb.ChaincodeMessage_INVOKE_CHAINCODE.String(): func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) }, "enter_" + establishedstate: func(e *fsm.Event) { v.enterEstablishedState(e, v.FSM.Current()) }, "enter_" + initstate: func(e *fsm.Event) { v.enterInitState(e, v.FSM.Current()) }, "enter_" + readystate: func(e *fsm.Event) { v.enterReadyState(e, v.FSM.Current()) }, - "enter_" + busyinitstate: func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) }, - "enter_" + busyxactstate: func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) }, "enter_" + endstate: func(e *fsm.Event) { v.enterEndState(e, v.FSM.Current()) }, }, ) @@ -563,7 +563,7 @@ func (handler *Handler) handleGetState(msg *pb.ChaincodeMessage) { defer func() { handler.deleteTXIDEntry(msg.Txid) chaincodeLogger.Debugf("[%s]handleGetState serial send %s", shorttxid(serialSendMsg.Txid), serialSendMsg.Type) - handler.serialSend(serialSendMsg) + handler.serialSendAsync(serialSendMsg, nil) }() key := string(msg.Payload) @@ -635,7 +635,7 @@ func (handler *Handler) handleRangeQueryState(msg *pb.ChaincodeMessage) { defer func() { handler.deleteTXIDEntry(msg.Txid) chaincodeLogger.Debugf("[%s]handleRangeQueryState serial send %s", shorttxid(serialSendMsg.Txid), serialSendMsg.Type) - handler.serialSend(serialSendMsg) + handler.serialSendAsync(serialSendMsg, nil) }() rangeQueryState := &pb.RangeQueryState{} @@ -742,7 +742,7 @@ func (handler *Handler) handleRangeQueryStateNext(msg *pb.ChaincodeMessage) { defer func() { handler.deleteTXIDEntry(msg.Txid) chaincodeLogger.Debugf("[%s]handleRangeQueryState serial send %s", shorttxid(serialSendMsg.Txid), serialSendMsg.Type) - handler.serialSend(serialSendMsg) + handler.serialSendAsync(serialSendMsg, nil) }() rangeQueryStateNext := &pb.RangeQueryStateNext{} @@ -840,7 +840,7 @@ func (handler *Handler) handleRangeQueryStateClose(msg *pb.ChaincodeMessage) { defer func() { handler.deleteTXIDEntry(msg.Txid) chaincodeLogger.Debugf("[%s]handleRangeQueryState serial send %s", shorttxid(serialSendMsg.Txid), serialSendMsg.Type) - handler.serialSend(serialSendMsg) + handler.serialSendAsync(serialSendMsg, nil) }() rangeQueryStateClose := &pb.RangeQueryStateClose{} @@ -1132,6 +1132,11 @@ func (handler *Handler) initOrReady(ctxt context.Context, chainID string, txid s func (handler *Handler) HandleMessage(msg *pb.ChaincodeMessage) error { chaincodeLogger.Debugf("[%s]Handling ChaincodeMessage of type: %s in state %s", shorttxid(msg.Txid), msg.Type, handler.FSM.Current()) + if (msg.Type == pb.ChaincodeMessage_COMPLETED || msg.Type == pb.ChaincodeMessage_ERROR) && handler.FSM.Current() == "ready" { + chaincodeLogger.Debugf("[%s]HandleMessage- COMPLETED. Notify", msg.Txid) + handler.notify(msg) + return nil + } if handler.FSM.Cannot(msg.Type.String()) { // Other errors return fmt.Errorf("[%s]Chaincode handler validator FSM cannot handle message (%s) with payload size (%d) while in state: %s", msg.Txid, msg.Type.String(), len(msg.Payload), handler.FSM.Current()) @@ -1207,27 +1212,3 @@ func (handler *Handler) isRunning() bool { return true } } - -/**************** -func (handler *Handler) initEvent() (chan *pb.ChaincodeMessage, error) { - if handler.responseNotifiers == nil { - return nil,fmt.Errorf("SendMessage called before registration for Txid:%s", msg.Txid) - } - var notfy chan *pb.ChaincodeMessage - handler.Lock() - if handler.responseNotifiers[msg.Txid] != nil { - handler.Unlock() - return nil, fmt.Errorf("SendMessage Txid:%s exists", msg.Txid) - } - //note the explicit use of buffer 1. We won't block if the receiver times outi and does not wait - //for our response - handler.responseNotifiers[msg.Txid] = make(chan *pb.ChaincodeMessage, 1) - handler.Unlock() - - if err := c.serialSend(msg); err != nil { - deleteNotifier(msg.Txid) - return nil, fmt.Errorf("SendMessage error sending %s(%s)", msg.Txid, err) - } - return notfy, nil -} -*******************/ diff --git a/core/chaincode/shim/chaincode.go b/core/chaincode/shim/chaincode.go index a125b409b46..6050e50125c 100644 --- a/core/chaincode/shim/chaincode.go +++ b/core/chaincode/shim/chaincode.go @@ -187,8 +187,11 @@ func chatWithPeer(chaincodename string, stream PeerChaincodeStream, cc Chaincode } // Register on the stream chaincodeLogger.Debugf("Registering.. sending %s", pb.ChaincodeMessage_REGISTER) - handler.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTER, Payload: payload}) + if err = handler.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTER, Payload: payload}); err != nil { + return fmt.Errorf("Error sending chaincode REGISTER: %s", err) + } waitc := make(chan struct{}) + errc := make(chan error) go func() { defer close(waitc) msgAvail := make(chan *pb.ChaincodeMessage) @@ -208,6 +211,14 @@ func chatWithPeer(chaincodename string, stream PeerChaincodeStream, cc Chaincode }() } select { + case sendErr := <-errc: + //serialSendAsync successful? + if sendErr == nil { + continue + } + //no, bail + err = fmt.Errorf("Error sending %s: %s", in.Type.String(), sendErr) + return case in = <-msgAvail: if err == io.EOF { chaincodeLogger.Debugf("Received EOF, ending chaincode stream, %s", err) @@ -241,12 +252,11 @@ func chatWithPeer(chaincodename string, stream PeerChaincodeStream, cc Chaincode if (nsInfo != nil && nsInfo.sendToCC) || (in.Type == pb.ChaincodeMessage_KEEPALIVE) { if in.Type == pb.ChaincodeMessage_KEEPALIVE { chaincodeLogger.Debug("Sending KEEPALIVE response") + //ignore any errors, maybe next KEEPALIVE will work + handler.serialSendAsync(in, nil) } else { chaincodeLogger.Debugf("[%s]send state message %s", shorttxid(in.Txid), in.Type.String()) - } - if err = handler.serialSend(in); err != nil { - err = fmt.Errorf("Error sending %s: %s", in.Type.String(), err) - return + handler.serialSendAsync(in, errc) } } } diff --git a/core/chaincode/shim/handler.go b/core/chaincode/shim/handler.go index 09d819bf4e1..59c6d1061d7 100644 --- a/core/chaincode/shim/handler.go +++ b/core/chaincode/shim/handler.go @@ -64,14 +64,28 @@ func shorttxid(txid string) string { return txid[0:8] } +//serialSend serializes msgs so gRPC will be happy func (handler *Handler) serialSend(msg *pb.ChaincodeMessage) error { handler.serialLock.Lock() defer handler.serialLock.Unlock() - if err := handler.ChatStream.Send(msg); err != nil { - chaincodeLogger.Errorf("[%s]Error sending %s: %s", shorttxid(msg.Txid), msg.Type.String(), err) - return fmt.Errorf("Error sending %s: %s", msg.Type.String(), err) - } - return nil + + err := handler.ChatStream.Send(msg) + + return err +} + +//serialSendAsync serves the same purpose as serialSend (serializ msgs so gRPC will +//be happy). In addition, it is also asynchronous so send-remoterecv--localrecv loop +//can be nonblocking. Only errors need to be handled and these are handled by +//communication on supplied error channel. A typical use will be a non-blocking or +//nil channel +func (handler *Handler) serialSendAsync(msg *pb.ChaincodeMessage, errc chan error) { + go func() { + err := handler.serialSend(msg) + if errc != nil { + errc <- err + } + }() } func (handler *Handler) createChannel(txid string) (chan pb.ChaincodeMessage, error) { @@ -105,9 +119,32 @@ func (handler *Handler) sendChannel(msg *pb.ChaincodeMessage) error { return nil } -func (handler *Handler) receiveChannel(c chan pb.ChaincodeMessage) (pb.ChaincodeMessage, bool) { - msg, val := <-c - return msg, val +//sends a message and selects +func (handler *Handler) sendReceive(msg *pb.ChaincodeMessage, c chan pb.ChaincodeMessage) (pb.ChaincodeMessage, error) { + errc := make(chan error, 1) + handler.serialSendAsync(msg, errc) + + //the serialsend above will send an err or nil + //the select filters that first error(or nil) + //and continues to wait for the response + //it is possible that the response triggers first + //in which case the errc obviously worked and is + //ignored + for { + select { + case err := <-errc: + if err == nil { + continue + } + //would have been logged, return false + return pb.ChaincodeMessage{}, err + case outmsg, val := <-c: + if !val { + return pb.ChaincodeMessage{}, fmt.Errorf("unexpected failure on receive") + } + return outmsg, nil + } + } } func (handler *Handler) deleteChannel(txid string) { @@ -136,19 +173,18 @@ func newChaincodeHandler(peerChatStream PeerChaincodeStream, chaincode Chaincode {Name: pb.ChaincodeMessage_READY.String(), Src: []string{"established"}, Dst: "ready"}, {Name: pb.ChaincodeMessage_ERROR.String(), Src: []string{"init"}, Dst: "established"}, {Name: pb.ChaincodeMessage_RESPONSE.String(), Src: []string{"init"}, Dst: "init"}, - {Name: pb.ChaincodeMessage_COMPLETED.String(), Src: []string{"init"}, Dst: "ready"}, - {Name: pb.ChaincodeMessage_TRANSACTION.String(), Src: []string{"ready"}, Dst: "transaction"}, - {Name: pb.ChaincodeMessage_COMPLETED.String(), Src: []string{"transaction"}, Dst: "ready"}, - {Name: pb.ChaincodeMessage_ERROR.String(), Src: []string{"transaction"}, Dst: "ready"}, - {Name: pb.ChaincodeMessage_RESPONSE.String(), Src: []string{"transaction"}, Dst: "transaction"}, + {Name: pb.ChaincodeMessage_TRANSACTION.String(), Src: []string{"ready"}, Dst: "ready"}, {Name: pb.ChaincodeMessage_RESPONSE.String(), Src: []string{"ready"}, Dst: "ready"}, + {Name: pb.ChaincodeMessage_ERROR.String(), Src: []string{"ready"}, Dst: "ready"}, + {Name: pb.ChaincodeMessage_COMPLETED.String(), Src: []string{"init"}, Dst: "ready"}, + {Name: pb.ChaincodeMessage_COMPLETED.String(), Src: []string{"ready"}, Dst: "ready"}, }, fsm.Callbacks{ "before_" + pb.ChaincodeMessage_REGISTERED.String(): func(e *fsm.Event) { v.beforeRegistered(e) }, "after_" + pb.ChaincodeMessage_RESPONSE.String(): func(e *fsm.Event) { v.afterResponse(e) }, "after_" + pb.ChaincodeMessage_ERROR.String(): func(e *fsm.Event) { v.afterError(e) }, "enter_init": func(e *fsm.Event) { v.enterInitState(e) }, - "enter_transaction": func(e *fsm.Event) { v.enterTransactionState(e) }, + "before_" + pb.ChaincodeMessage_TRANSACTION.String(): func(e *fsm.Event) { v.enterTransactionState(e) }, }, ) return v @@ -283,9 +319,6 @@ func (handler *Handler) enterTransactionState(e *fsm.Event) { } } -// enterReadyState will need to handle COMPLETED event by sending message to the peer -//func (handler *Handler) enterReadyState(e *fsm.Event) { - // afterCompleted will need to handle COMPLETED event by sending message to the peer func (handler *Handler) afterCompleted(e *fsm.Event) { msg, ok := e.Args[0].(*pb.ChaincodeMessage) @@ -347,18 +380,12 @@ func (handler *Handler) handleGetState(key string, txid string) ([]byte, error) payload := []byte(key) msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_GET_STATE, Payload: payload, Txid: txid} chaincodeLogger.Debugf("[%s]Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_GET_STATE) - if err := handler.serialSend(msg); err != nil { + responseMsg, err := handler.sendReceive(msg, respChan) + if err != nil { chaincodeLogger.Errorf("[%s]error sending GET_STATE %s", shorttxid(txid), err) return nil, errors.New("could not send msg") } - // Wait on responseChannel for response - responseMsg, ok := handler.receiveChannel(respChan) - if !ok { - chaincodeLogger.Errorf("[%s]Received unexpected message type", shorttxid(responseMsg.Txid)) - return nil, errors.New("Received unexpected message type") - } - if responseMsg.Type.String() == pb.ChaincodeMessage_RESPONSE.String() { // Success response chaincodeLogger.Debugf("[%s]GetState received payload %s", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_RESPONSE) @@ -397,18 +424,12 @@ func (handler *Handler) handlePutState(key string, value []byte, txid string) er // Send PUT_STATE message to validator chaincode support msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_PUT_STATE, Payload: payloadBytes, Txid: txid} chaincodeLogger.Debugf("[%s]Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_PUT_STATE) - if err = handler.serialSend(msg); err != nil { + responseMsg, err := handler.sendReceive(msg, respChan) + if err != nil { chaincodeLogger.Errorf("[%s]error sending PUT_STATE %s", msg.Txid, err) return errors.New("could not send msg") } - // Wait on responseChannel for response - responseMsg, ok := handler.receiveChannel(respChan) - if !ok { - chaincodeLogger.Errorf("[%s]Received unexpected message type", msg.Txid) - return errors.New("Received unexpected message type") - } - if responseMsg.Type.String() == pb.ChaincodeMessage_RESPONSE.String() { // Success response chaincodeLogger.Debugf("[%s]Received %s. Successfully updated state", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_RESPONSE) @@ -441,18 +462,12 @@ func (handler *Handler) handleDelState(key string, txid string) error { payload := []byte(key) msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_DEL_STATE, Payload: payload, Txid: txid} chaincodeLogger.Debugf("[%s]Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_DEL_STATE) - if err := handler.serialSend(msg); err != nil { + responseMsg, err := handler.sendReceive(msg, respChan) + if err != nil { chaincodeLogger.Errorf("[%s]error sending DEL_STATE %s", shorttxid(msg.Txid), pb.ChaincodeMessage_DEL_STATE) return errors.New("could not send msg") } - // Wait on responseChannel for response - responseMsg, ok := handler.receiveChannel(respChan) - if !ok { - chaincodeLogger.Errorf("[%s]Received unexpected message type", shorttxid(msg.Txid)) - return errors.New("Received unexpected message type") - } - if responseMsg.Type.String() == pb.ChaincodeMessage_RESPONSE.String() { // Success response chaincodeLogger.Debugf("[%s]Received %s. Successfully deleted state", msg.Txid, pb.ChaincodeMessage_RESPONSE) @@ -487,18 +502,12 @@ func (handler *Handler) handleRangeQueryState(startKey, endKey string, txid stri } msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RANGE_QUERY_STATE, Payload: payloadBytes, Txid: txid} chaincodeLogger.Debugf("[%s]Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_RANGE_QUERY_STATE) - if err = handler.serialSend(msg); err != nil { + responseMsg, err := handler.sendReceive(msg, respChan) + if err != nil { chaincodeLogger.Errorf("[%s]error sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_RANGE_QUERY_STATE) return nil, errors.New("could not send msg") } - // Wait on responseChannel for response - responseMsg, ok := handler.receiveChannel(respChan) - if !ok { - chaincodeLogger.Errorf("[%s]Received unexpected message type", txid) - return nil, errors.New("Received unexpected message type") - } - if responseMsg.Type.String() == pb.ChaincodeMessage_RESPONSE.String() { // Success response chaincodeLogger.Debugf("[%s]Received %s. Successfully got range", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_RESPONSE) @@ -541,18 +550,12 @@ func (handler *Handler) handleRangeQueryStateNext(id, txid string) (*pb.RangeQue } msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT, Payload: payloadBytes, Txid: txid} chaincodeLogger.Debugf("[%s]Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT) - if err = handler.serialSend(msg); err != nil { + responseMsg, err := handler.sendReceive(msg, respChan) + if err != nil { chaincodeLogger.Errorf("[%s]error sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT) return nil, errors.New("could not send msg") } - // Wait on responseChannel for response - responseMsg, ok := handler.receiveChannel(respChan) - if !ok { - chaincodeLogger.Errorf("[%s]Received unexpected message type", txid) - return nil, errors.New("Received unexpected message type") - } - if responseMsg.Type.String() == pb.ChaincodeMessage_RESPONSE.String() { // Success response chaincodeLogger.Debugf("[%s]Received %s. Successfully got range", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_RESPONSE) @@ -595,18 +598,12 @@ func (handler *Handler) handleRangeQueryStateClose(id, txid string) (*pb.RangeQu } msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE, Payload: payloadBytes, Txid: txid} chaincodeLogger.Debugf("[%s]Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE) - if err = handler.serialSend(msg); err != nil { + responseMsg, err := handler.sendReceive(msg, respChan) + if err != nil { chaincodeLogger.Errorf("[%s]error sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE) return nil, errors.New("could not send msg") } - // Wait on responseChannel for response - responseMsg, ok := handler.receiveChannel(respChan) - if !ok { - chaincodeLogger.Errorf("[%s]Received unexpected message type", txid) - return nil, errors.New("Received unexpected message type") - } - if responseMsg.Type.String() == pb.ChaincodeMessage_RESPONSE.String() { // Success response chaincodeLogger.Debugf("[%s]Received %s. Successfully got range", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_RESPONSE) @@ -653,18 +650,12 @@ func (handler *Handler) handleInvokeChaincode(chaincodeName string, args [][]byt // Send INVOKE_CHAINCODE message to validator chaincode support msg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_INVOKE_CHAINCODE, Payload: payloadBytes, Txid: txid} chaincodeLogger.Debugf("[%s]Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_INVOKE_CHAINCODE) - if err = handler.serialSend(msg); err != nil { + responseMsg, err := handler.sendReceive(msg, respChan) + if err != nil { chaincodeLogger.Errorf("[%s]error sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_INVOKE_CHAINCODE) return nil, errors.New("could not send msg") } - // Wait on responseChannel for response - responseMsg, ok := handler.receiveChannel(respChan) - if !ok { - chaincodeLogger.Errorf("[%s]Received unexpected message type", shorttxid(msg.Txid)) - return nil, errors.New("Received unexpected message type") - } - if responseMsg.Type.String() == pb.ChaincodeMessage_RESPONSE.String() { // Success response chaincodeLogger.Debugf("[%s]Received %s. Successfully invoked chaincode", shorttxid(responseMsg.Txid), pb.ChaincodeMessage_RESPONSE) diff --git a/core/committer/txvalidator/validator.go b/core/committer/txvalidator/validator.go index 849960284f7..19636551378 100644 --- a/core/committer/txvalidator/validator.go +++ b/core/committer/txvalidator/validator.go @@ -88,7 +88,7 @@ func (v *txValidator) Validate(block *common.Block) { // TODO: this code needs to receive a bit more attention and discussion: // it's not clear what it means if a transaction which causes a failure // in validation is just dropped on the floor - logger.Errorf("Invalid transaction with index %s, error %s", tIdx, err) + logger.Errorf("Invalid transaction with index %d, error %s", tIdx, err) txsfltr.Set(uint(tIdx)) } else { //the payload is used to get headers @@ -153,9 +153,13 @@ func (v *vsccValidatorImpl) VSCCValidateTx(payload *common.Payload, envBytes []b defer txsim.Done() ctxt := context.WithValue(context.Background(), chaincode.TXSimulatorKey, txsim) + //generate an internal txid for executing system chaincode calls below on behalf + //of original txid + vscctxid := coreUtil.GenerateUUID() + // Extracting vscc from lccc /* - data, err := chaincode.GetChaincodeDataFromLCCC(ctxt, txid, nil, chainID, "vscc") + data, err := chaincode.GetChaincodeDataFromLCCC(ctxt, vscctxid, nil, chainID, "vscc") if err != nil { logger.Errorf("Unable to get chaincode data from LCCC for txid %s, due to %s", txid, err) return err @@ -164,7 +168,7 @@ func (v *vsccValidatorImpl) VSCCValidateTx(payload *common.Payload, envBytes []b // Get chaincode version version := coreUtil.GetSysCCVersion() - cccid := chaincode.NewCCContext(chainID, "vscc", version, txid, true, nil) + cccid := chaincode.NewCCContext(chainID, "vscc", version, vscctxid, true, nil) // invoke VSCC _, _, err = chaincode.ExecuteChaincode(ctxt, cccid, args) diff --git a/core/util/utils.go b/core/util/utils.go index 056263ad49b..fa93b69b4b1 100644 --- a/core/util/utils.go +++ b/core/util/utils.go @@ -25,7 +25,7 @@ import ( "strings" "time" - "github.com/hyperledger/fabric/metadata" + "github.com/hyperledger/fabric/common/metadata" "github.com/golang/protobuf/ptypes/timestamp" "golang.org/x/crypto/sha3" diff --git a/examples/ccchecker/ccchecker.go b/examples/ccchecker/ccchecker.go new file mode 100644 index 00000000000..77c5f11c228 --- /dev/null +++ b/examples/ccchecker/ccchecker.go @@ -0,0 +1,179 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "sync" + "time" + + "golang.org/x/net/context" + + "github.com/hyperledger/fabric/examples/ccchecker/chaincodes" + "github.com/hyperledger/fabric/peer/common" +) + +//global ccchecker params +var ccchecker *CCChecker + +//CCChecker encapsulates ccchecker properties and runtime +type CCChecker struct { + //Chaincodes to do ccchecker over (see ccchecker.json for defaults) + Chaincodes []*chaincodes.CC + //TimeoutToAbortSecs abort deadline + TimeoutToAbortSecs int + //ChainName name of the chain + ChainName string +} + +//LoadCCCheckerParams read the ccchecker params from a file +func LoadCCCheckerParams(file string) error { + var b []byte + var err error + if b, err = ioutil.ReadFile(file); err != nil { + return fmt.Errorf("Cannot read config file %s\n", err) + } + sp := &CCChecker{} + err = json.Unmarshal(b, &sp) + if err != nil { + return fmt.Errorf("error unmarshalling ccchecker: %s\n", err) + } + + ccchecker = &CCChecker{} + id := 0 + for _, scc := range sp.Chaincodes { + //concurrency <=0 will be dropped + if scc.Concurrency > 0 { + for i := 0; i < scc.Concurrency; i++ { + tmp := &chaincodes.CC{} + *tmp = *scc + tmp.ID = id + id = id + 1 + ccchecker.Chaincodes = append(ccchecker.Chaincodes, tmp) + } + } + } + + ccchecker.TimeoutToAbortSecs = sp.TimeoutToAbortSecs + ccchecker.ChainName = sp.ChainName + + return nil +} + +//CCCheckerInit assigns shadow chaincode to each of the CC from registered shadow chaincodes +func CCCheckerInit() { + if ccchecker == nil { + fmt.Printf("LoadCCCheckerParams needs to be called before init\n") + os.Exit(1) + } + + if err := chaincodes.RegisterCCs(ccchecker.Chaincodes); err != nil { + panic(fmt.Sprintf("%s", err)) + } +} + +//CCCheckerRun main loops that will run the tests and cleanup +func CCCheckerRun(report bool, verbose bool) error { + //connect with Broadcast client + bc, err := common.GetBroadcastClient() + if err != nil { + return err + } + defer bc.Close() + + ec, err := common.GetEndorserClient() + if err != nil { + return err + } + + signer, err := common.GetDefaultSigner() + if err != nil { + return err + } + + //when the wait's timeout and get out of ccchecker, we + //cancel and release all goroutines + ctxt, cancel := context.WithCancel(context.Background()) + defer cancel() + + var ccsWG sync.WaitGroup + ccsWG.Add(len(ccchecker.Chaincodes)) + + //an anonymous struct to hold failures + var failures struct { + sync.Mutex + failedCCs int + } + + //run the invokes + ccerrs := make([]error, len(ccchecker.Chaincodes)) + for _, cc := range ccchecker.Chaincodes { + go func(cc2 *chaincodes.CC) { + if ccerrs[cc2.ID] = cc2.Run(ctxt, ccchecker.ChainName, bc, ec, signer, &ccsWG); ccerrs[cc2.ID] != nil { + failures.Lock() + failures.failedCCs = failures.failedCCs + 1 + failures.Unlock() + } + }(cc) + } + + //wait or timeout + err = ccchecker.wait(&ccsWG) + + //verify results + if err == nil && failures.failedCCs < len(ccchecker.Chaincodes) { + ccsWG = sync.WaitGroup{} + ccsWG.Add(len(ccchecker.Chaincodes) - failures.failedCCs) + for _, cc := range ccchecker.Chaincodes { + go func(cc2 *chaincodes.CC) { + if ccerrs[cc2.ID] == nil { + ccerrs[cc2.ID] = cc2.Validate(ctxt, ccchecker.ChainName, bc, ec, signer, &ccsWG) + } else { + fmt.Printf("Ignoring [%v] for validation as it returned err %s\n", cc2, ccerrs[cc2.ID]) + } + }(cc) + } + + //wait or timeout + err = ccchecker.wait(&ccsWG) + } + + if report { + for _, cc := range ccchecker.Chaincodes { + cc.Report(verbose, ccchecker.ChainName) + } + } + + return err +} + +func (s *CCChecker) wait(ccsWG *sync.WaitGroup) error { + done := make(chan struct{}) + go func() { + ccsWG.Wait() + done <- struct{}{} + }() + select { + case <-done: + return nil + case <-time.After(time.Duration(s.TimeoutToAbortSecs) * time.Second): + return fmt.Errorf("Aborting due to timeoutout!!") + } +} diff --git a/examples/ccchecker/ccchecker.json b/examples/ccchecker/ccchecker.json new file mode 100644 index 00000000000..9c2aebe2d70 --- /dev/null +++ b/examples/ccchecker/ccchecker.json @@ -0,0 +1,17 @@ +{"Chaincodes": + [ + {"Name": "mycc", + "Path": "github.com/hyperledger/fabric/examples/ccchecker/chaincodes/newkeyperinvoke", + "NumFinalQueryAttempts": 10, + "NumberOfIterations": 10, + "DelayBetweenInvokeMs": 1, + "DelayBetweenQueryMs": 10, + "TimeoutToAbortSecs": 60, + "Lang": "GOLANG", + "WaitAfterInvokeMs": 10000, + "Concurrency": 10 + } + ], + "TimeoutToAbortSecs": 60, + "ChainName": "**TEST_CHAINID**" +} diff --git a/examples/ccchecker/chaincodes/chaincodes.go b/examples/ccchecker/chaincodes/chaincodes.go new file mode 100644 index 00000000000..21725726606 --- /dev/null +++ b/examples/ccchecker/chaincodes/chaincodes.go @@ -0,0 +1,339 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package chaincodes + +import ( + "fmt" + "sync" + "time" + + "github.com/hyperledger/fabric/msp" + "github.com/hyperledger/fabric/peer/chaincode" + "github.com/hyperledger/fabric/peer/common" + pb "github.com/hyperledger/fabric/protos/peer" + + "golang.org/x/net/context" +) + +//ShadowCCIntf interfaces to be implemented by shadow chaincodes +type ShadowCCIntf interface { + //InitShadowCC initializes the shadow chaincode (will be called once for each chaincode) + InitShadowCC() + + //GetInvokeArgs gets invoke arguments from shadow + GetInvokeArgs(ccnum int, iter int) [][]byte + + //PostInvoke passes the retvalue from the invoke to the shadow for post-processing + PostInvoke(args [][]byte, retval []byte) error + + //GetQueryArgs mimics the Invoke and gets the query for an invoke + GetQueryArgs(ccnum int, iter int) [][]byte + + //Validate the results against the query arguments + Validate(args [][]byte, value []byte) error +} + +//CC chaincode properties, config and runtime +type CC struct { + //-------------config properties ------------ + //Name of the chaincode + Name string + + //Path to the chaincode + Path string + + //NumFinalQueryAttempts number of times to try final query before giving up + NumFinalQueryAttempts int + + //NumberOfInvokeIterations number of iterations to do invoke on + NumberOfIterations int + + //DelayBetweenInvokeMs delay between each invoke + DelayBetweenInvokeMs int + + //DelayBetweenQueryMs delay between each query + DelayBetweenQueryMs int + + //TimeoutToAbortSecs timeout for aborting this chaincode processing + TimeoutToAbortSecs int + + //Lang of chaincode + Lang string + + //WaitAfterInvokeMs wait time before validating invokes for this chaincode + WaitAfterInvokeMs int + + //Concurrency number of goroutines to spin + Concurrency int + + //-------------runtime properties ------------ + //Unique number assigned to this CC by CCChecker + ID int + + //shadow CC where the chaincode stats is maintained + shadowCC ShadowCCIntf + + //current iteration of invoke + currentInvokeIter int + + //start of invokes in epoch seconds + invokeStartTime int + + //end of invokes in epoch seconds + invokeEndTime int + + //error that stopped invoke iterations + invokeErr error + + //current iteration of query + currQueryIter []int + + //did the query work ? + queryWorked []bool + + //error on a query in an iteration + queryErrs []error +} + +func (cc *CC) getChaincodeSpec(args [][]byte) *pb.ChaincodeSpec { + return &pb.ChaincodeSpec{ + Type: pb.ChaincodeSpec_Type(pb.ChaincodeSpec_Type_value[cc.Lang]), + ChaincodeID: &pb.ChaincodeID{Path: cc.Path, Name: cc.Name}, + CtorMsg: &pb.ChaincodeInput{Args: args}, + } +} + +//doInvokes calls invoke for each iteration for the chaincode +//Stops at the first invoke with error +//currentInvokeIter contains the number of successful iterations +func (cc *CC) doInvokes(ctxt context.Context, chainID string, + bc common.BroadcastClient, ec pb.EndorserClient, signer msp.SigningIdentity, + wg *sync.WaitGroup, quit func() bool) error { + + var err error + for cc.currentInvokeIter = 0; cc.currentInvokeIter < cc.NumberOfIterations; cc.currentInvokeIter++ { + if quit() { + break + } + args := cc.shadowCC.GetInvokeArgs(cc.ID, cc.currentInvokeIter) + + spec := cc.getChaincodeSpec(args) + + if quit() { + break + } + + var pResp *pb.ProposalResponse + if pResp, err = chaincode.ChaincodeInvokeOrQuery(spec, chainID, true, signer, ec, bc); err != nil { + cc.invokeErr = err + break + } + + resp := pResp.Response.Payload + if err = cc.shadowCC.PostInvoke(args, resp); err != nil { + cc.invokeErr = err + break + } + + if quit() { + break + } + + //don't sleep for the last iter + if cc.DelayBetweenInvokeMs > 0 && cc.currentInvokeIter < (cc.NumberOfIterations-1) { + time.Sleep(time.Duration(cc.DelayBetweenInvokeMs) * time.Millisecond) + } + } + + return err +} + +//Run test over given number of iterations +// i will be unique across chaincodes and can be used as a key +// this is useful if chaincode occurs multiple times in the array of chaincodes +func (cc *CC) Run(ctxt context.Context, chainID string, bc common.BroadcastClient, ec pb.EndorserClient, signer msp.SigningIdentity, wg *sync.WaitGroup) error { + defer wg.Done() + + var ( + quit bool + err error + ) + + done := make(chan struct{}) + go func() { + defer func() { done <- struct{}{} }() + + //return the quit closure for validation within validateIter + quitF := func() bool { return quit } + + //start of invokes + cc.invokeStartTime = time.Now().Second() + + err = cc.doInvokes(ctxt, chainID, bc, ec, signer, wg, quitF) + + //end of invokes + cc.invokeEndTime = time.Now().Second() + }() + + //we could be done or cancelled or timedout + select { + case <-ctxt.Done(): + quit = true + return nil + case <-done: + return err + case <-time.After(time.Duration(cc.TimeoutToAbortSecs) * time.Second): + quit = true + return fmt.Errorf("Aborting due to timeoutout!!") + } +} + +//validates the invoke iteration for this chaincode +func (cc *CC) validateIter(ctxt context.Context, iter int, chainID string, bc common.BroadcastClient, ec pb.EndorserClient, signer msp.SigningIdentity, wg *sync.WaitGroup, quit func() bool) { + defer wg.Done() + args := cc.shadowCC.GetQueryArgs(cc.ID, iter) + + spec := cc.getChaincodeSpec(args) + + //lets try a few times + for cc.currQueryIter[iter] = 0; cc.currQueryIter[iter] < cc.NumFinalQueryAttempts; cc.currQueryIter[iter]++ { + if quit() { + break + } + + var pResp *pb.ProposalResponse + var err error + if pResp, err = chaincode.ChaincodeInvokeOrQuery(spec, chainID, false, signer, ec, bc); err != nil { + cc.queryErrs[iter] = err + break + } + + resp := pResp.Response.Payload + + if quit() { + break + } + + //if it fails, we try again + if err = cc.shadowCC.Validate(args, resp); err == nil { + //appears to have worked + cc.queryWorked[iter] = true + cc.queryErrs[iter] = nil + break + } + + //save query error + cc.queryErrs[iter] = err + + if quit() { + break + } + + //try again + if cc.DelayBetweenQueryMs > 0 { + time.Sleep(time.Duration(cc.DelayBetweenQueryMs) * time.Millisecond) + } + } + + return +} + +//Validate test that was Run. Each successful iteration in the run is validated against +func (cc *CC) Validate(ctxt context.Context, chainID string, bc common.BroadcastClient, ec pb.EndorserClient, signer msp.SigningIdentity, wg *sync.WaitGroup) error { + defer wg.Done() + + //this will signal inner validators to get out via + //closure + var quit bool + + //use 1 so sender doesn't block (he doesn't care if is was receivd. + //makes sure goroutine exits) + done := make(chan struct{}, 1) + go func() { + defer func() { done <- struct{}{} }() + + var innerwg sync.WaitGroup + innerwg.Add(cc.currentInvokeIter) + + //initialize for querying + cc.currQueryIter = make([]int, cc.currentInvokeIter) + cc.queryWorked = make([]bool, cc.currentInvokeIter) + cc.queryErrs = make([]error, cc.currentInvokeIter) + + //give some time for the invokes to commit for this cc + time.Sleep(time.Duration(cc.WaitAfterInvokeMs) * time.Millisecond) + + //return the quit closure for validation within validateIter + quitF := func() bool { return quit } + + //try only till successful invoke iterations + for i := 0; i < cc.currentInvokeIter; i++ { + go func(iter int) { + cc.validateIter(ctxt, iter, chainID, bc, ec, signer, &innerwg, quitF) + }(i) + } + + //shouldn't block the sender go routine on cleanup + qDone := make(chan struct{}, 1) + + //wait for the above queries to be done + go func() { innerwg.Wait(); qDone <- struct{}{} }() + + //we could be done or cancelled + select { + case <-qDone: + case <-ctxt.Done(): + } + }() + + //we could be done or cancelled or timedout + select { + case <-ctxt.Done(): + //we don't know why it was cancelled but it was cancelled + quit = true + return nil + case <-done: + //for done does not return an err. The query validation stores chaincode errors + //Only error that's left to handle is timeout error for this chaincode below + return nil + case <-time.After(time.Duration(cc.TimeoutToAbortSecs) * time.Second): + quit = true + return fmt.Errorf("Aborting due to timeoutout!!") + } +} + +//Report reports chaincode test execution, iter by iter +func (cc *CC) Report(verbose bool, chainID string) { + fmt.Printf("%s/%s(%d)\n", cc.Name, chainID, cc.ID) + fmt.Printf("\tNum successful invokes: %d(%d)\n", cc.currentInvokeIter, cc.NumberOfIterations) + if cc.invokeErr != nil { + fmt.Printf("\tError on invoke: %s\n", cc.invokeErr) + } + //test to see if validate was called (validate alloc the arrays, one of which is queryWorked) + if cc.queryWorked != nil { + for i := 0; i < cc.currentInvokeIter; i++ { + fmt.Printf("\tQuery(%d) : succeeded-%t, num trials-%d(%d), error if any(%s)\n", i, cc.queryWorked[i], cc.currQueryIter[i], cc.NumFinalQueryAttempts, cc.queryErrs[i]) + } + } else { + fmt.Printf("\tQuery validation appears not have been performed(#invokes-%d). timed out ?\n", cc.currentInvokeIter) + } + //total actual time for cc.currentInvokeIter + invokeTime := (cc.invokeEndTime-cc.invokeStartTime)*1000 - (cc.DelayBetweenInvokeMs * (cc.currentInvokeIter - 1)) + fmt.Printf("\tTime for invokes(ms): %d\n", invokeTime) + + fmt.Printf("\tFinal query worked ? %t\n", cc.queryWorked) +} diff --git a/examples/ccchecker/chaincodes/newkeyperinvoke/newkeyperinvoke.go b/examples/ccchecker/chaincodes/newkeyperinvoke/newkeyperinvoke.go new file mode 100644 index 00000000000..c36ef974b5e --- /dev/null +++ b/examples/ccchecker/chaincodes/newkeyperinvoke/newkeyperinvoke.go @@ -0,0 +1,64 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "fmt" + + "github.com/hyperledger/fabric/core/chaincode/shim" +) + +// NewKeyPerInvoke is allows the following transactions +// "put", "key", val - returns "OK" on success +// "get", "key" - returns val stored previously +type NewKeyPerInvoke struct { +} + +//Init implements chaincode's Init interface +func (t *NewKeyPerInvoke) Init(stub shim.ChaincodeStubInterface) ([]byte, error) { + return nil, nil +} + +//Invoke implements chaincode's Invoke interface +func (t *NewKeyPerInvoke) Invoke(stub shim.ChaincodeStubInterface) ([]byte, error) { + args := stub.GetArgs() + if len(args) < 2 { + return nil, fmt.Errorf("invalid number of args %d", len(args)) + } + f := string(args[0]) + if f == "put" { + if len(args) < 3 { + return nil, fmt.Errorf("invalid number of args for put %d", len(args)) + } + err := stub.PutState(string(args[1]), args[2]) + if err != nil { + return nil, err + } + return []byte("OK"), nil + } else if f == "get" { + // Get the state from the ledger + return stub.GetState(string(args[1])) + } + return nil, fmt.Errorf("unknown function %s", f) +} + +func main() { + err := shim.Start(new(NewKeyPerInvoke)) + if err != nil { + fmt.Printf("Error starting New key per invoke: %s", err) + } +} diff --git a/examples/ccchecker/chaincodes/newkeyperinvoke/shadow/newkeyperinvoke.go b/examples/ccchecker/chaincodes/newkeyperinvoke/shadow/newkeyperinvoke.go new file mode 100644 index 00000000000..74ff80155a5 --- /dev/null +++ b/examples/ccchecker/chaincodes/newkeyperinvoke/shadow/newkeyperinvoke.go @@ -0,0 +1,110 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package shadow + +import ( + "bytes" + "fmt" + "sync" +) + +// NewKeyPerInvoke is the shadow implementation for NewKeyPerInvoke in the parent package +// The shadow provides invoke arguments that are guaranteed to be result in unique ledger +// entries as long as the parameters to GetInvokeArgs are unique +type NewKeyPerInvoke struct { + sync.Mutex + state map[string][]byte +} + +//---------- implements ShadowCCIntf functions ------- + +//InitShadowCC initializes CC +func (t *NewKeyPerInvoke) InitShadowCC() { + t.state = make(map[string][]byte) +} + +//setState sets the state +func (t *NewKeyPerInvoke) setState(key []byte, val []byte) { + t.Lock() + t.state[string(key)] = val + t.Unlock() +} + +//getState gets the state +func (t *NewKeyPerInvoke) getState(key []byte) ([]byte, bool) { + t.Lock() + defer t.Unlock() + v, ok := t.state[string(key)] + return v, ok +} + +//GetInvokeArgs get args for invoke based on chaincode ID and iteration num +func (t *NewKeyPerInvoke) GetInvokeArgs(ccnum int, iter int) [][]byte { + args := make([][]byte, 3) + args[0] = []byte("put") + args[1] = []byte(fmt.Sprintf("%d_%d", ccnum, iter)) + args[2] = []byte(fmt.Sprintf("%d", ccnum)) + + return args +} + +//PostInvoke store the the key/val for later verification +func (t *NewKeyPerInvoke) PostInvoke(args [][]byte, resp []byte) error { + if len(args) < 3 { + return fmt.Errorf("invalid number of args posted %d", len(args)) + } + + if string(args[0]) != "put" { + return fmt.Errorf("invalid args posted %s", args[0]) + } + + //the actual CC should have returned OK for success + if string(resp) != "OK" { + return fmt.Errorf("invalid response %s", string(resp)) + } + + t.setState(args[1], args[2]) + + return nil +} + +//Validate the key/val with mem storage +func (t *NewKeyPerInvoke) Validate(args [][]byte, value []byte) error { + if len(args) < 2 { + return fmt.Errorf("invalid number of args for validate %d", len(args)) + } + + if string(args[0]) != "get" { + return fmt.Errorf("invalid validate function %s", args[0]) + } + + if v, ok := t.getState(args[1]); !ok { + return fmt.Errorf("key not found %s", args[1]) + } else if !bytes.Equal(v, value) { + return fmt.Errorf("expected(%s) but found (%s)", string(v), string(value)) + } + + return nil +} + +//GetQueryArgs returns the query for the iter to test against +func (t *NewKeyPerInvoke) GetQueryArgs(ccnum int, iter int) [][]byte { + args := make([][]byte, 2) + args[0] = []byte("get") + args[1] = []byte(fmt.Sprintf("%d_%d", ccnum, iter)) + return args +} diff --git a/examples/ccchecker/chaincodes/registershadow.go b/examples/ccchecker/chaincodes/registershadow.go new file mode 100644 index 00000000000..b57d3d9a264 --- /dev/null +++ b/examples/ccchecker/chaincodes/registershadow.go @@ -0,0 +1,60 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package chaincodes + +import ( + "fmt" + + //shadow chaincodes to be registered + nkpi "github.com/hyperledger/fabric/examples/ccchecker/chaincodes/newkeyperinvoke/shadow" +) + +//all the statically registered shadow chaincodes that can be used +var shadowCCs = map[string]ShadowCCIntf{ + "github.com/hyperledger/fabric/examples/ccchecker/chaincodes/newkeyperinvoke": &nkpi.NewKeyPerInvoke{}, +} + +//RegisterCCs registers all possible chaincodes that can be used in test +func RegisterCCs(ccs []*CC) error { + inUse := make(map[string]ShadowCCIntf) + for _, cc := range ccs { + scc, ok := shadowCCs[cc.Path] + if !ok || scc == nil { + return fmt.Errorf("%s not a registered chaincode", cc.Path) + } + if _, ok := inUse[cc.Path]; !ok { + inUse[cc.Path] = scc + } + //setup the shadow chaincode to plug into the ccchecker framework + cc.shadowCC = scc + } + + //initialize a shadow chaincode just once. A chaincode may be used + //multiple times in test run + for _, cc := range inUse { + cc.InitShadowCC() + } + + return nil +} + +//ListShadowCCs lists all registered shadow ccs in the library +func ListShadowCCs() { + for key := range shadowCCs { + fmt.Printf("\t%s\n", key) + } +} diff --git a/examples/ccchecker/init.go b/examples/ccchecker/init.go new file mode 100644 index 00000000000..85ca1eccc8d --- /dev/null +++ b/examples/ccchecker/init.go @@ -0,0 +1,79 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "fmt" + "os" + "strings" + + "github.com/spf13/pflag" + "github.com/spf13/viper" + + "github.com/hyperledger/fabric/peer/common" +) + +//This is where all initializations take place. These closley follow CLI +//initializations. + +//read CC checker configuration from -s . Defaults to ccchecker.json +func initCCCheckerParams(mainFlags *pflag.FlagSet) { + configFile := "" + mainFlags.StringVarP(&configFile, "config", "s", "ccchecker.json", "CC Checker config file ") + + err := LoadCCCheckerParams(configFile) + if err != nil { + fmt.Printf("error unmarshalling ccchecker: %s\n", err) + os.Exit(1) + } +} + +//read yaml file from -y . Defaults to ../../peer +func initYaml(mainFlags *pflag.FlagSet) { + // For environment variables. + viper.SetEnvPrefix(cmdRoot) + viper.AutomaticEnv() + replacer := strings.NewReplacer(".", "_") + viper.SetEnvKeyReplacer(replacer) + + pathToYaml := "" + mainFlags.StringVarP(&pathToYaml, "yamlfile", "y", "../../peer", "Path to core.yaml defined for peer") + + err := common.InitConfig(cmdRoot) + if err != nil { // Handle errors reading the config file + fmt.Printf("Fatal error when reading %s config file: %s\n", cmdRoot, err) + os.Exit(2) + } +} + +//initialize MSP from -m . Defaults to ../../msp/sampleconfig +func initMSP(mainFlags *pflag.FlagSet) { + mspMgrConfigDir := "" + mainFlags.StringVarP(&mspMgrConfigDir, "mspcfgdir", "m", "../../msp/sampleconfig/", "Path to MSP dir") + + err := common.InitCrypto(mspMgrConfigDir) + if err != nil { + panic(err.Error()) + } +} + +//InitCCCheckerEnv initialize the CCChecker environment +func InitCCCheckerEnv(mainFlags *pflag.FlagSet) { + initCCCheckerParams(mainFlags) + initYaml(mainFlags) + initMSP(mainFlags) +} diff --git a/examples/ccchecker/main.go b/examples/ccchecker/main.go new file mode 100644 index 00000000000..138d5f3b75f --- /dev/null +++ b/examples/ccchecker/main.go @@ -0,0 +1,62 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "fmt" + "os" + + "github.com/op/go-logging" + "github.com/spf13/cobra" + + _ "net/http/pprof" +) + +var logger = logging.MustGetLogger("main") + +// Constants go here. +const cmdRoot = "core" + +// The main command describes the service and +// defaults to printing the help message. +var mainCmd = &cobra.Command{ + Use: "", + Run: func(cmd *cobra.Command, args []string) { + run(args) + }, +} + +func main() { + mainFlags := mainCmd.PersistentFlags() + + //initialize the env + InitCCCheckerEnv(mainFlags) + + // On failure Cobra prints the usage message and error string, so we only + // need to exit with a non-0 status + if mainCmd.Execute() != nil { + os.Exit(1) + } +} + +func run(args []string) { + CCCheckerInit() + //TODO make parameters out of report and verbose + CCCheckerRun(true, true) + fmt.Printf("Test complete\n") + return +} diff --git a/peer/chaincode/common.go b/peer/chaincode/common.go index 7517b814108..aaffa1b235c 100755 --- a/peer/chaincode/common.go +++ b/peer/chaincode/common.go @@ -145,65 +145,18 @@ func getChaincodeSpecification(cmd *cobra.Command) (*pb.ChaincodeSpec, error) { return spec, nil } -// chaincodeInvokeOrQuery invokes or queries the chaincode. If successful, the -// INVOKE form prints the ProposalResponse to STDOUT, and the QUERY form prints -// the query result on STDOUT. A command-line flag (-r, --raw) determines -// whether the query result is output as raw bytes, or as a printable string. -// The printable form is optionally (-x, --hex) a hexadecimal representation -// of the query response. If the query response is NIL, nothing is output. -// -// NOTE - Query will likely go away as all interactions with the endorser are -// Proposal and ProposalResponses func chaincodeInvokeOrQuery(cmd *cobra.Command, args []string, invoke bool, cf *ChaincodeCmdFactory) (err error) { spec, err := getChaincodeSpecification(cmd) if err != nil { return err } - // Build the ChaincodeInvocationSpec message - invocation := &pb.ChaincodeInvocationSpec{ChaincodeSpec: spec} - if customIDGenAlg != common.UndefinedParamValue { - invocation.IdGenerationAlg = customIDGenAlg - } - - creator, err := cf.Signer.Serialize() + proposalResp, err := ChaincodeInvokeOrQuery(spec, chainID, invoke, cf.Signer, cf.EndorserClient, cf.BroadcastClient) if err != nil { - return fmt.Errorf("Error serializing identity for %s: %s\n", cf.Signer.GetIdentifier(), err) - } - - uuid := cutil.GenerateUUID() - - var prop *pb.Proposal - prop, err = putils.CreateProposalFromCIS(uuid, chainID, invocation, creator) - if err != nil { - return fmt.Errorf("Error creating proposal %s: %s\n", chainFuncName, err) - } - - var signedProp *pb.SignedProposal - signedProp, err = putils.GetSignedProposal(prop, cf.Signer) - if err != nil { - return fmt.Errorf("Error creating signed proposal %s: %s\n", chainFuncName, err) - } - - var proposalResp *pb.ProposalResponse - proposalResp, err = cf.EndorserClient.ProcessProposal(context.Background(), signedProp) - if err != nil { - return fmt.Errorf("Error endorsing %s: %s\n", chainFuncName, err) + return err } if invoke { - if proposalResp != nil { - // assemble a signed transaction (it's an Envelope message) - env, err := putils.CreateSignedTx(prop, cf.Signer, proposalResp) - if err != nil { - return fmt.Errorf("Could not assemble transaction, err %s", err) - } - - // send the envelope for ordering - if err = cf.BroadcastClient.Send(env); err != nil { - return fmt.Errorf("Error sending transaction %s: %s\n", chainFuncName, err) - } - } logger.Infof("Invoke result: %v", proposalResp) } else { if proposalResp == nil { @@ -225,8 +178,7 @@ func chaincodeInvokeOrQuery(cmd *cobra.Command, args []string, invoke bool, cf * } } } - - return nil + return err } func checkChaincodeCmdParams(cmd *cobra.Command) error { @@ -301,3 +253,67 @@ func InitCmdFactory() (*ChaincodeCmdFactory, error) { BroadcastClient: broadcastClient, }, nil } + +// ChaincodeInvokeOrQuery invokes or queries the chaincode. If successful, the +// INVOKE form prints the ProposalResponse to STDOUT, and the QUERY form prints +// the query result on STDOUT. A command-line flag (-r, --raw) determines +// whether the query result is output as raw bytes, or as a printable string. +// The printable form is optionally (-x, --hex) a hexadecimal representation +// of the query response. If the query response is NIL, nothing is output. +// +// NOTE - Query will likely go away as all interactions with the endorser are +// Proposal and ProposalResponses +func ChaincodeInvokeOrQuery(spec *pb.ChaincodeSpec, cID string, invoke bool, signer msp.SigningIdentity, endorserClient pb.EndorserClient, bc common.BroadcastClient) (*pb.ProposalResponse, error) { + // Build the ChaincodeInvocationSpec message + invocation := &pb.ChaincodeInvocationSpec{ChaincodeSpec: spec} + if customIDGenAlg != common.UndefinedParamValue { + invocation.IdGenerationAlg = customIDGenAlg + } + + creator, err := signer.Serialize() + if err != nil { + return nil, fmt.Errorf("Error serializing identity for %s: %s", signer.GetIdentifier(), err) + } + + uuid := cutil.GenerateUUID() + + funcName := "invoke" + if !invoke { + funcName = "query" + } + + var prop *pb.Proposal + prop, err = putils.CreateProposalFromCIS(uuid, cID, invocation, creator) + if err != nil { + return nil, fmt.Errorf("Error creating proposal %s: %s", funcName, err) + } + + var signedProp *pb.SignedProposal + signedProp, err = putils.GetSignedProposal(prop, signer) + if err != nil { + return nil, fmt.Errorf("Error creating signed proposal %s: %s", funcName, err) + } + + var proposalResp *pb.ProposalResponse + proposalResp, err = endorserClient.ProcessProposal(context.Background(), signedProp) + if err != nil { + return nil, fmt.Errorf("Error endorsing %s: %s", funcName, err) + } + + if invoke { + if proposalResp != nil { + // assemble a signed transaction (it's an Envelope message) + env, err := putils.CreateSignedTx(prop, signer, proposalResp) + if err != nil { + return proposalResp, fmt.Errorf("Could not assemble transaction, err %s", err) + } + + // send the envelope for ordering + if err = bc.Send(env); err != nil { + return proposalResp, fmt.Errorf("Error sending transaction %s: %s", funcName, err) + } + } + } + + return proposalResp, nil +} diff --git a/peer/common/common.go b/peer/common/common.go index 1f693bf923b..b18f6747dc8 100755 --- a/peer/common/common.go +++ b/peer/common/common.go @@ -18,7 +18,10 @@ package common import ( "fmt" + "os" + "path/filepath" + "github.com/hyperledger/fabric/core/crypto/primitives" "github.com/hyperledger/fabric/core/errors" "github.com/hyperledger/fabric/core/flogging" "github.com/hyperledger/fabric/core/peer" @@ -31,6 +34,52 @@ import ( // UndefinedParamValue defines what undefined parameters in the command line will initialise to const UndefinedParamValue = "" +//InitConfig initializes viper config +func InitConfig(cmdRoot string) error { + var alternativeCfgPath = os.Getenv("PEER_CFG_PATH") + if alternativeCfgPath != "" { + viper.AddConfigPath(alternativeCfgPath) // Path to look for the config file in + } else { + viper.AddConfigPath("./") // Path to look for the config file in + // Path to look for the config file in based on GOPATH + gopath := os.Getenv("GOPATH") + for _, p := range filepath.SplitList(gopath) { + peerpath := filepath.Join(p, "src/github.com/hyperledger/fabric/peer") + viper.AddConfigPath(peerpath) + } + } + + // Now set the configuration file. + viper.SetConfigName(cmdRoot) // Name of config file (without extension) + + err := viper.ReadInConfig() // Find and read the config file + if err != nil { // Handle errors reading the config file + return fmt.Errorf("Fatal error when reading %s config file: %s\n", cmdRoot, err) + } + + return nil +} + +//InitCrypto initializes crypto for this peer +func InitCrypto(mspMgrConfigDir string) error { + // Init the crypto layer + //TODO: integrate new crypto / idp code + primitives.SetSecurityLevel("SHA2", 256) + + // FIXME: when this peer joins a chain, it should get the + // config for that chain with the list of MSPs that the + // chain uses; however this is not yet implemented. + // Additionally, we might always want to have an MSP for + // the local test chain so that we can run tests with the + // peer CLI. This is why we create this fake setup here for now + err := mspmgmt.LoadFakeSetupWithLocalMspAndTestChainMsp(mspMgrConfigDir) + if err != nil { + return fmt.Errorf("Fatal error when setting up MSP from directory %s: err %s\n", mspMgrConfigDir, err) + } + + return nil +} + // GetEndorserClient returns a new endorser client connection for this peer func GetEndorserClient() (pb.EndorserClient, error) { clientConn, err := peer.NewPeerClientConnection() diff --git a/peer/main.go b/peer/main.go index ae5c283c46f..10aa660d613 100644 --- a/peer/main.go +++ b/peer/main.go @@ -19,7 +19,6 @@ package main import ( "fmt" "os" - "path/filepath" "runtime" "strings" @@ -30,11 +29,10 @@ import ( _ "net/http/pprof" "github.com/hyperledger/fabric/core" - "github.com/hyperledger/fabric/core/crypto/primitives" "github.com/hyperledger/fabric/core/flogging" - "github.com/hyperledger/fabric/core/peer/msp" "github.com/hyperledger/fabric/peer/chaincode" "github.com/hyperledger/fabric/peer/clilogging" + "github.com/hyperledger/fabric/peer/common" "github.com/hyperledger/fabric/peer/node" "github.com/hyperledger/fabric/peer/version" ) @@ -83,26 +81,9 @@ func main() { testCoverProfile := "" mainFlags.StringVarP(&testCoverProfile, "test.coverprofile", "", "coverage.cov", "Done") - var alternativeCfgPath = os.Getenv("PEER_CFG_PATH") - if alternativeCfgPath != "" { - logger.Infof("User defined config file path: %s", alternativeCfgPath) - viper.AddConfigPath(alternativeCfgPath) // Path to look for the config file in - } else { - viper.AddConfigPath("./") // Path to look for the config file in - // Path to look for the config file in based on GOPATH - gopath := os.Getenv("GOPATH") - for _, p := range filepath.SplitList(gopath) { - peerpath := filepath.Join(p, "src/github.com/hyperledger/fabric/peer") - viper.AddConfigPath(peerpath) - } - } - - // Now set the configuration file. - viper.SetConfigName(cmdRoot) // Name of config file (without extension) - - err := viper.ReadInConfig() // Find and read the config file - if err != nil { // Handle errors reading the config file - panic(fmt.Errorf("Fatal error when reading %s config file: %s\n", cmdRoot, err)) + err := common.InitConfig(cmdRoot) + if err != nil { // Handle errors reading the config file + panic(fmt.Errorf("Fatal error when initializing %s config : %s\n", cmdRoot, err)) } mainCmd.AddCommand(version.Cmd()) @@ -112,13 +93,10 @@ func main() { runtime.GOMAXPROCS(viper.GetInt("peer.gomaxprocs")) - // Init the crypto layer - //TODO: integrate new crypto / idp code - primitives.SetSecurityLevel("SHA2", 256) - // Init the MSP // TODO: determine the location of this config file var mspMgrConfigDir string + var alternativeCfgPath = os.Getenv("PEER_CFG_PATH") if alternativeCfgPath != "" { mspMgrConfigDir = alternativeCfgPath + "/msp/sampleconfig/" } else if _, err := os.Stat("./msp/sampleconfig/"); err == nil { @@ -127,17 +105,10 @@ func main() { mspMgrConfigDir = os.Getenv("GOPATH") + "/src/github.com/hyperledger/fabric/msp/sampleconfig/" } - // FIXME: when this peer joins a chain, it should get the - // config for that chain with the list of MSPs that the - // chain uses; however this is not yet implemented. - // Additionally, we might always want to have an MSP for - // the local test chain so that we can run tests with the - // peer CLI. This is why we create this fake setup here for now - err = mspmgmt.LoadFakeSetupWithLocalMspAndTestChainMsp(mspMgrConfigDir) - if err != nil { - panic(fmt.Errorf("Fatal error when setting up MSP from directory %s: err %s\n", mspMgrConfigDir, err)) + err = common.InitCrypto(mspMgrConfigDir) + if err != nil { // Handle errors reading the config file + panic(err.Error()) } - // On failure Cobra prints the usage message and error string, so we only // need to exit with a non-0 status if mainCmd.Execute() != nil {