Skip to content

Commit

Permalink
[FAB-9587] keepalive and userRunsCC on handler
Browse files Browse the repository at this point in the history
- Explicitly propagate configuration from chaincode support to handler.
- Create internal interface to handler dependencies on chaincode
  support. This should be temporary.

Change-Id: I27c231a9d7e29fed4e128745ba3ede1e8a59c98b
Signed-off-by: Matthew Sykes <[email protected]>
  • Loading branch information
sykesm committed Apr 20, 2018
1 parent 8c15a34 commit 1c4ff73
Showing 1 changed file with 33 additions and 17 deletions.
50 changes: 33 additions & 17 deletions core/chaincode/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/flogging"
commonledger "github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/common/resourcesconfig"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/aclmgmt"
"github.com/hyperledger/fabric/core/aclmgmt/resources"
Expand Down Expand Up @@ -61,6 +62,16 @@ type pendingQueryResult struct {

type stateHandlers map[pb.ChaincodeMessage_Type]func(*pb.ChaincodeMessage)

// internal interface to scope dependencies on ChaincodeSupport
type handlerSupport interface {
deregisterHandler(*Handler) error
registerHandler(*Handler) error

GetChaincodeDefinition(ctxt context.Context, txid string, signedProp *pb.SignedProposal, prop *pb.Proposal, chainID string, chaincodeID string) (resourcesconfig.ChaincodeDefinition, error)
Launch(context context.Context, cccid *ccprovider.CCContext, spec interface{}) (*pb.ChaincodeID, *pb.ChaincodeInput, error)
Execute(ctxt context.Context, cccid *ccprovider.CCContext, msg *pb.ChaincodeMessage, timeout time.Duration) (*pb.ChaincodeMessage, error)
}

// Handler responsible for management of Peer's side of chaincode stream
type Handler struct {
sync.Mutex
Expand All @@ -71,9 +82,9 @@ type Handler struct {
ChaincodeID *pb.ChaincodeID
ccInstance *sysccprovider.ChaincodeInstance

chaincodeSupport *ChaincodeSupport
registered bool
readyNotify chan bool
handlerSupport handlerSupport
registered bool
readyNotify chan bool

//chan to pass error in sync and nonsync mode
errChan chan error
Expand All @@ -86,6 +97,9 @@ type Handler struct {
//handlers for each state of the handler
readyStateHandlers stateHandlers
createStateHandlers stateHandlers

keepalive time.Duration
userRunsCC bool
}

func shorttxid(txid string) string {
Expand Down Expand Up @@ -261,14 +275,14 @@ func (handler *Handler) checkACL(signedProp *pb.SignedProposal, proposal *pb.Pro

func (handler *Handler) deregister() error {
if handler.registered {
handler.chaincodeSupport.deregisterHandler(handler)
handler.handlerSupport.deregisterHandler(handler)
}
return nil
}

func (handler *Handler) waitForKeepaliveTimer() <-chan time.Time {
if handler.chaincodeSupport.keepalive > 0 {
c := time.After(handler.chaincodeSupport.keepalive)
if handler.keepalive > 0 {
c := time.After(handler.keepalive)
return c
}
//no one will signal this channel, listener blocks forever
Expand Down Expand Up @@ -339,8 +353,8 @@ func (handler *Handler) processStream() error {
chaincodeLogger.Errorf("%s", err)
return err
case <-handler.waitForKeepaliveTimer():
if handler.chaincodeSupport.keepalive <= 0 {
chaincodeLogger.Errorf("Invalid select: keepalive not on (keepalive=%d)", handler.chaincodeSupport.keepalive)
if handler.keepalive <= 0 {
chaincodeLogger.Errorf("Invalid select: keepalive not on (keepalive=%d)", handler.keepalive)
continue
}

Expand Down Expand Up @@ -369,10 +383,12 @@ func HandleChaincodeStream(chaincodeSupport *ChaincodeSupport, ctxt context.Cont

func newChaincodeSupportHandler(chaincodeSupport *ChaincodeSupport, peerChatStream ccintf.ChaincodeStream) *Handler {
v := &Handler{
ChatStream: peerChatStream,
chaincodeSupport: chaincodeSupport,
state: created,
errChan: make(chan error, 1),
ChatStream: peerChatStream,
handlerSupport: chaincodeSupport,
state: created,
errChan: make(chan error, 1),
keepalive: chaincodeSupport.keepalive,
userRunsCC: chaincodeSupport.userRunsCC,
}

v.readyStateHandlers = stateHandlers{
Expand Down Expand Up @@ -466,7 +482,7 @@ func (handler *Handler) notifyDuringStartup(val bool) {
//environment where we can attach a chaincode manually. This could be
//useful .... but for now lets just be conservative and allow manual
//chaincode only in dev mode (ie, peer started with --peer-chaincodedev=true)
if handler.chaincodeSupport.userRunsCC {
if handler.userRunsCC {
if val {
handler.sendReady()
} else {
Expand All @@ -489,7 +505,7 @@ func (handler *Handler) handleRegister(msg *pb.ChaincodeMessage) {

// Now register with the chaincodeSupport
handler.ChaincodeID = chaincodeID
err = handler.chaincodeSupport.registerHandler(handler)
err = handler.handlerSupport.registerHandler(handler)
if err != nil {
handler.notifyDuringStartup(false)
return
Expand Down Expand Up @@ -1191,7 +1207,7 @@ func (handler *Handler) handleModState(msg *pb.ChaincodeMessage) {
var version string
if !isscc {
//if its a user chaincode, get the details
cd, err := handler.chaincodeSupport.GetChaincodeDefinition(ctxt, msg.Txid, txContext.signedProp, txContext.proposal, calledCcIns.ChainID, calledCcIns.ChaincodeName)
cd, err := handler.handlerSupport.GetChaincodeDefinition(ctxt, msg.Txid, txContext.signedProp, txContext.proposal, calledCcIns.ChainID, calledCcIns.ChaincodeName)
if err != nil {
errHandler([]byte(err.Error()), "[%s]Failed to get chaincode data (%s) for invoked chaincode. Sending %s", shorttxid(msg.Txid), err, pb.ChaincodeMessage_ERROR)
return
Expand All @@ -1215,7 +1231,7 @@ func (handler *Handler) handleModState(msg *pb.ChaincodeMessage) {
chaincodeLogger.Debugf("[%s] launching chaincode %s on channel %s",
shorttxid(msg.Txid), calledCcIns.ChaincodeName, calledCcIns.ChainID)
cciSpec := &pb.ChaincodeInvocationSpec{ChaincodeSpec: chaincodeSpec}
_, chaincodeInput, launchErr := handler.chaincodeSupport.Launch(ctxt, cccid, cciSpec)
_, chaincodeInput, launchErr := handler.handlerSupport.Launch(ctxt, cccid, cciSpec)
if launchErr != nil {
payload := []byte(launchErr.Error())
chaincodeLogger.Debugf("[%s]Failed to launch invoked chaincode. Sending %s",
Expand All @@ -1230,7 +1246,7 @@ func (handler *Handler) handleModState(msg *pb.ChaincodeMessage) {
ccMsg, _ := createCCMessage(pb.ChaincodeMessage_TRANSACTION, calledCcIns.ChainID, msg.Txid, chaincodeInput)

// Execute the chaincode... this CANNOT be an init at least for now
response, execErr := handler.chaincodeSupport.Execute(ctxt, cccid, ccMsg, timeout)
response, execErr := handler.handlerSupport.Execute(ctxt, cccid, ccMsg, timeout)

//payload is marshalled and send to the calling chaincode's shim which unmarshals and
//sends it to chaincode
Expand Down

0 comments on commit 1c4ff73

Please sign in to comment.