Skip to content

Commit

Permalink
[FAB-7580] simplify READY state
Browse files Browse the repository at this point in the history
The chaincode support code in the peer sends a READY message to
the chaincode after receiving notification of registration. The
code path can be shortened (basically avoid a channel notification
followed by call to send READY) where the READY is sent immediately
following receipt of register from chaincode.

This code and codepath reduction further simplifies the chaincode
cleanup.

This was tested with
   . UT
   . ccchecker
   . go CC devmode
   . java Chaincode

Change-Id: I7cfb85d4c05e9073bb628d74c84013ce29ad77bc
Signed-off-by: Srinivasan Muralidharan <[email protected]>
  • Loading branch information
muralisrini committed Mar 13, 2018
1 parent e54cb8a commit d6b80f2
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 168 deletions.
71 changes: 2 additions & 69 deletions core/chaincode/chaincode_support.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,7 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Copyright IBM Corp. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
SPDX-License-Identifier: Apache-2.0
*/

package chaincode
Expand Down Expand Up @@ -311,49 +301,6 @@ func (chaincodeSupport *ChaincodeSupport) deregisterHandler(chaincodehandler *Ha
return nil
}

// send ready to move to ready state
func (chaincodeSupport *ChaincodeSupport) sendReady(context context.Context, cccid *ccprovider.CCContext, timeout time.Duration) error {
canName := cccid.GetCanonicalName()
chaincodeSupport.runningChaincodes.Lock()
//if its in the map, there must be a connected stream...nothing to do
var chrte *chaincodeRTEnv
var ok bool
if chrte, ok = chaincodeSupport.chaincodeHasBeenLaunched(canName); !ok {
chaincodeSupport.runningChaincodes.Unlock()
err := errors.Errorf("handler not found for chaincode %s", canName)
chaincodeLogger.Debugf("%+v", err)
return err
}
chaincodeSupport.runningChaincodes.Unlock()

var notfy chan *pb.ChaincodeMessage
var err error
if notfy, err = chrte.handler.ready(context, cccid.ChainID, cccid.TxID, cccid.SignedProposal, cccid.Proposal); err != nil {
return errors.WithMessage(err, fmt.Sprintf("error sending %s", pb.ChaincodeMessage_READY))
}
if notfy != nil {
select {
case ccMsg := <-notfy:
switch ccMsg.Type {
case pb.ChaincodeMessage_ERROR:
err = errors.Errorf("error initializing container %s: %s", canName, string(ccMsg.Payload))
case pb.ChaincodeMessage_READY:
chaincodeLogger.Infof("chaincode %s ready to accept requests", canName)
default:
//by construction, we cannot (should not) get anything other than ERROR or READY
panic(fmt.Sprintf("Invalid ready message %+v", ccMsg))
}
case <-time.After(timeout):
err = errors.New("timeout expired while executing send init message")
}
}

//if initOrReady succeeded, our responsibility to delete the context
chrte.handler.deleteTxContext(cccid.ChainID, cccid.TxID)

return err
}

// returns a map of file path <-> []byte for all files related to TLS
func (chaincodeSupport *ChaincodeSupport) getTLSFiles(keyPair *accesscontrol.CertAndPrivKeyPair) map[string][]byte {
if keyPair == nil {
Expand Down Expand Up @@ -733,20 +680,6 @@ func (chaincodeSupport *ChaincodeSupport) Launch(context context.Context, cccid
}
}

if err == nil {
//launch will set the chaincode in Ready state
err = chaincodeSupport.sendReady(context, cccid, chaincodeSupport.ccStartupTimeout)
if err != nil {
err = errors.WithMessage(err, "failed to init chaincode")
chaincodeLogger.Errorf("%+v", err)
errIgnore := chaincodeSupport.Stop(context, cccid, cds)
if errIgnore != nil {
chaincodeLogger.Errorf("stop failed: %+v", errIgnore)
}
}
chaincodeLogger.Debug("sending init completed")
}

chaincodeLogger.Debug("LaunchChaincode complete")

return cID, cMsg, err
Expand Down
117 changes: 38 additions & 79 deletions core/chaincode/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@ import (
"golang.org/x/net/context"
)

type state string

const (
created = "created" //start state
established = "established" //in: CREATED, rcv: REGISTER, send: REGISTERED
ready = "ready" //in:ESTABLISHED, rcv:COMPLETED
created state = "created" //start state
established state = "established" //in: CREATED, rcv: REGISTER, send: REGISTERED
ready state = "ready" //in:ESTABLISHED, rcv:COMPLETED

)

var chaincodeLogger = flogging.MustGetLogger("chaincode")
Expand Down Expand Up @@ -64,7 +67,7 @@ type Handler struct {
//peer to shim grpc serializer. User only in serialSend
serialLock sync.Mutex
ChatStream ccintf.ChaincodeStream
state string
state state
ChaincodeID *pb.ChaincodeID
ccInstance *sysccprovider.ChaincodeInstance

Expand All @@ -80,13 +83,9 @@ type Handler struct {

txidMap map[string]bool

// used to serialize state changes
nextStateChan chan *pb.ChaincodeMessage

//handlers for each state of the handler
readyStateHandlers stateHandlers
createStateHandlers stateHandlers
estStateHandlers stateHandlers
}

func shorttxid(txid string) string {
Expand Down Expand Up @@ -267,11 +266,6 @@ func (handler *Handler) deregister() error {
return nil
}

//serialize external state changes such as "ready" (and currently only "ready")
func (handler *Handler) nextState(msg *pb.ChaincodeMessage) {
handler.nextStateChan <- msg
}

func (handler *Handler) waitForKeepaliveTimer() <-chan time.Time {
if handler.chaincodeSupport.keepalive > 0 {
c := time.After(handler.chaincodeSupport.keepalive)
Expand Down Expand Up @@ -354,13 +348,6 @@ func (handler *Handler) processStream() error {
//(maybe it'll work later)
handler.serialSendAsync(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_KEEPALIVE}, false)
continue
case in = <-handler.nextStateChan:
if in == nil {
err = errors.New("next state nil message, ending chaincode support stream")
chaincodeLogger.Debugf("%+v", err)
return err
}
chaincodeLogger.Debugf("[%s]Move state message %s", shorttxid(in.Txid), in.Type)
}

err = handler.handleMessage(in)
Expand All @@ -384,7 +371,6 @@ func newChaincodeSupportHandler(chaincodeSupport *ChaincodeSupport, peerChatStre
v := &Handler{
ChatStream: peerChatStream,
chaincodeSupport: chaincodeSupport,
nextStateChan: make(chan *pb.ChaincodeMessage),
state: created,
errChan: make(chan error, 1),
}
Expand All @@ -411,10 +397,6 @@ func newChaincodeSupportHandler(chaincodeSupport *ChaincodeSupport, peerChatStre
pb.ChaincodeMessage_REGISTER: v.handleRegister,
}

v.estStateHandlers = stateHandlers{
pb.ChaincodeMessage_READY: v.handleReady,
}

return v
}

Expand Down Expand Up @@ -443,10 +425,35 @@ func (handler *Handler) deleteTXIDEntry(channelID, txid string) {
}
}

//sendReady sends READY to chaincode serially (just like REGISTER)
func (handler *Handler) sendReady() error {
chaincodeLogger.Debugf("sending READY for chaincode %+v", handler.ChaincodeID)
ccMsg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_READY}

//if error in sending tear down the handler
if err := handler.serialSend(ccMsg); err != nil {
chaincodeLogger.Errorf("error sending READY (%s) for chaincode %+v", err, handler.ChaincodeID)
return err
}

handler.state = ready

chaincodeLogger.Debugf("Changed to state ready for chaincode %+v", handler.ChaincodeID)

return nil
}

//notifyDuringStartup will send ready on registration
func (handler *Handler) notifyDuringStartup(val bool) {
//if USER_RUNS_CC readyNotify will be nil
if handler.readyNotify != nil {
chaincodeLogger.Debug("Notifying during startup")
if val {
//if send failed, notify failure which will initiate
//tearing down
if err := handler.sendReady(); err != nil {
val = false
}
}
handler.readyNotify <- val
return
}
Expand All @@ -461,15 +468,7 @@ func (handler *Handler) notifyDuringStartup(val bool) {
//chaincode only in dev mode (ie, peer started with --peer-chaincodedev=true)
if handler.chaincodeSupport.userRunsCC {
if val {
chaincodeLogger.Debug("(dev mode) sending READY")
ccMsg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_READY}

//if error in sending tear down the handler
handler.serialSendAsync(ccMsg, true)

handler.state = ready

chaincodeLogger.Debugf("[%s](dev mode) Changed to state ready", shorttxid(ccMsg.Txid))
handler.sendReady()
} else {
chaincodeLogger.Errorf("(dev mode) Error during startup .. not sending READY")
}
Expand All @@ -480,7 +479,7 @@ func (handler *Handler) notifyDuringStartup(val bool) {

// handleRegister is invoked when chaincode tries to register.
func (handler *Handler) handleRegister(msg *pb.ChaincodeMessage) {
chaincodeLogger.Debugf("[%s]Received %s in state %s", shorttxid(msg.Txid), msg.Type, handler.state)
chaincodeLogger.Debugf("Received %s in state %s", msg.Type, handler.state)
chaincodeID := &pb.ChaincodeID{}
err := proto.Unmarshal(msg.Payload, chaincodeID)
if err != nil {
Expand All @@ -500,7 +499,7 @@ func (handler *Handler) handleRegister(msg *pb.ChaincodeMessage) {
//name in keys
handler.decomposeRegisteredName(handler.ChaincodeID)

chaincodeLogger.Debugf("[%s]Got %s for chaincodeID = %s, sending back %s", shorttxid(msg.Txid), pb.ChaincodeMessage_REGISTER, chaincodeID, pb.ChaincodeMessage_REGISTERED)
chaincodeLogger.Debugf("Got %s for chaincodeID = %s, sending back %s", pb.ChaincodeMessage_REGISTER, chaincodeID, pb.ChaincodeMessage_REGISTERED)
if err := handler.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTERED}); err != nil {
chaincodeLogger.Errorf("Error sending %s: %s", pb.ChaincodeMessage_REGISTERED, err)
handler.notifyDuringStartup(false)
Expand All @@ -509,28 +508,12 @@ func (handler *Handler) handleRegister(msg *pb.ChaincodeMessage) {

handler.state = established

chaincodeLogger.Debugf("[%s]Changed state to established", shorttxid(msg.Txid))
chaincodeLogger.Debugf("Changed state to established for %+v", handler.ChaincodeID)

//for dev mode this will also move to ready automatically
handler.notifyDuringStartup(true)
}

//handleReady is invoked when peer (non dev mode) moves to ready
func (handler *Handler) handleReady(msg *pb.ChaincodeMessage) {
//send the ready synchronousley
if err := handler.serialSend(msg); err != nil {
chaincodeLogger.Errorf("[%s]Error sending message(%s), notifying error", shorttxid(msg.Txid), err)
errMsg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Txid: msg.Txid, ChannelId: msg.ChannelId, Payload: []byte(err.Error())}
handler.notify(errMsg)
handler.errChan <- err
return
}

chaincodeLogger.Debugf("[%s]Entered state ready", shorttxid(msg.Txid))
handler.state = ready
handler.notify(msg)
}

func (handler *Handler) notify(msg *pb.ChaincodeMessage) {
handler.Lock()
defer handler.Unlock()
Expand Down Expand Up @@ -1288,27 +1271,6 @@ func (handler *Handler) setChaincodeProposal(signedProp *pb.SignedProposal, prop
return nil
}

//Ready is sent after launching the chaincode to sets the shim's state internally to READY.
func (handler *Handler) ready(ctxt context.Context, chainID string, txid string, signedProp *pb.SignedProposal, prop *pb.Proposal) (chan *pb.ChaincodeMessage, error) {
txctx, funcErr := handler.createTxContext(ctxt, chainID, txid, signedProp, prop)
if funcErr != nil {
return nil, funcErr
}

chaincodeLogger.Debug("sending READY")
ccMsg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_READY, Txid: txid, ChannelId: chainID}

//if security is disabled the context elements will just be nil
if err := handler.setChaincodeProposal(signedProp, prop, ccMsg); err != nil {
return nil, err
}

//move it to ready
handler.nextState(ccMsg)

return txctx.responseNotifier, nil
}

// handleMessage is the entrance method for Peer's handling of Chaincode messages.
func (handler *Handler) handleMessage(msg *pb.ChaincodeMessage) error {
chaincodeLogger.Debugf("[%s]Fabric side Handling ChaincodeMessage of type: %s in state %s", shorttxid(msg.Txid), msg.Type, handler.state)
Expand All @@ -1318,14 +1280,11 @@ func (handler *Handler) handleMessage(msg *pb.ChaincodeMessage) error {
case created:
//chaincode connects and puts into established
hFn = handler.createStateHandlers[msg.Type]
case established:
//peer moves to ready (when not in dev mode. In dev mode peer does not launch the CC)
hFn = handler.estStateHandlers[msg.Type]
case ready:
//chaincode state requests handled in ready
hFn = handler.readyStateHandlers[msg.Type]
default:
return fmt.Errorf("handleMessage-invalid state %s", handler.state)
return fmt.Errorf("[%s]handleMessage-invalid state %s", msg.Txid, handler.state)
}

if hFn == nil {
Expand Down
Loading

0 comments on commit d6b80f2

Please sign in to comment.