From fcfd27e3377dcf4c64fc200191f562bf33e39971 Mon Sep 17 00:00:00 2001 From: "Mark S. Lewis" Date: Tue, 31 Aug 2021 17:59:04 +0100 Subject: [PATCH] Refactor ChaincodeEvents to use ledger iterator This implementation supports the ability to replay events. It also avoids any possibility of ledger commit processing being blocked by slow client event consumers. Signed-off-by: Mark S. Lewis --- integration/gateway/gateway_suite_test.go | 44 --- integration/gateway/gateway_test.go | 43 +++ internal/pkg/gateway/api.go | 44 ++- internal/pkg/gateway/api_test.go | 336 ++++++++++++++---- .../gateway/commit/chaincodeeventnotifier.go | 166 --------- internal/pkg/gateway/commit/eventer.go | 23 -- internal/pkg/gateway/commit/eventer_test.go | 259 -------------- internal/pkg/gateway/commit/notifier.go | 23 +- internal/pkg/gateway/event/block.go | 100 ++++++ internal/pkg/gateway/event/blockiterator.go | 40 +++ internal/pkg/gateway/event/chaincode.go | 35 ++ .../pkg/gateway/event/chaincodeiterator.go | 83 +++++ internal/pkg/gateway/event/iterator_test.go | 294 +++++++++++++++ .../gateway/event/mocks/resultsiterator.go | 135 +++++++ internal/pkg/gateway/event/transaction.go | 99 ++++++ internal/pkg/gateway/gateway.go | 39 +- internal/pkg/gateway/mocks/discovery.go | 8 +- internal/pkg/gateway/mocks/eventer.go | 119 ------- internal/pkg/gateway/mocks/ledger.go | 294 +++++++++++++++ internal/pkg/gateway/mocks/ledgerprovider.go | 114 ++++++ internal/pkg/gateway/mocks/resultsiterator.go | 135 +++++++ internal/pkg/gateway/peeradapter.go | 20 +- internal/pkg/gateway/peeradapter_test.go | 12 + 23 files changed, 1727 insertions(+), 738 deletions(-) delete mode 100644 internal/pkg/gateway/commit/chaincodeeventnotifier.go delete mode 100644 internal/pkg/gateway/commit/eventer.go delete mode 100644 internal/pkg/gateway/commit/eventer_test.go create mode 100644 internal/pkg/gateway/event/block.go create mode 100644 internal/pkg/gateway/event/blockiterator.go create mode 100644 internal/pkg/gateway/event/chaincode.go create mode 100644 internal/pkg/gateway/event/chaincodeiterator.go create mode 100644 internal/pkg/gateway/event/iterator_test.go create mode 100644 internal/pkg/gateway/event/mocks/resultsiterator.go create mode 100644 internal/pkg/gateway/event/transaction.go delete mode 100644 internal/pkg/gateway/mocks/eventer.go create mode 100644 internal/pkg/gateway/mocks/ledger.go create mode 100644 internal/pkg/gateway/mocks/ledgerprovider.go create mode 100644 internal/pkg/gateway/mocks/resultsiterator.go diff --git a/integration/gateway/gateway_suite_test.go b/integration/gateway/gateway_suite_test.go index b13ef3494e5..c589868d6ba 100644 --- a/integration/gateway/gateway_suite_test.go +++ b/integration/gateway/gateway_suite_test.go @@ -10,12 +10,9 @@ import ( "encoding/json" "testing" - "github.com/hyperledger/fabric-protos-go/common" - "github.com/hyperledger/fabric-protos-go/peer" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/integration" "github.com/hyperledger/fabric/integration/nwo" - "github.com/hyperledger/fabric/protoutil" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) @@ -54,44 +51,3 @@ var _ = SynchronizedAfterSuite(func() { func StartPort() int { return integration.GatewayBasePort.StartPortForNode() } - -func NewProposedTransaction(signingIdentity *nwo.SigningIdentity, channelName, chaincodeName, transactionName string, transientData map[string][]byte, args ...[]byte) (*peer.SignedProposal, string) { - proposal, transactionID := newProposalProto(signingIdentity, channelName, chaincodeName, transactionName, transientData, args...) - signedProposal, err := protoutil.GetSignedProposal(proposal, signingIdentity) - Expect(err).NotTo(HaveOccurred()) - - return signedProposal, transactionID -} - -func newProposalProto(signingIdentity *nwo.SigningIdentity, channelName, chaincodeName, transactionName string, transientData map[string][]byte, args ...[]byte) (*peer.Proposal, string) { - creator, err := signingIdentity.Serialize() - Expect(err).NotTo(HaveOccurred()) - - invocationSpec := &peer.ChaincodeInvocationSpec{ - ChaincodeSpec: &peer.ChaincodeSpec{ - Type: peer.ChaincodeSpec_NODE, - ChaincodeId: &peer.ChaincodeID{Name: chaincodeName}, - Input: &peer.ChaincodeInput{Args: chaincodeArgs(transactionName, args...)}, - }, - } - - result, transactionID, err := protoutil.CreateChaincodeProposalWithTransient( - common.HeaderType_ENDORSER_TRANSACTION, - channelName, - invocationSpec, - creator, - transientData, - ) - Expect(err).NotTo(HaveOccurred()) - - return result, transactionID -} - -func chaincodeArgs(transactionName string, args ...[]byte) [][]byte { - result := make([][]byte, len(args)+1) - - result[0] = []byte(transactionName) - copy(result[1:], args) - - return result -} diff --git a/integration/gateway/gateway_test.go b/integration/gateway/gateway_test.go index 4d8ba290b1f..a48c0548051 100644 --- a/integration/gateway/gateway_test.go +++ b/integration/gateway/gateway_test.go @@ -16,9 +16,11 @@ import ( docker "github.com/fsouza/go-dockerclient" "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric-protos-go/common" "github.com/hyperledger/fabric-protos-go/gateway" "github.com/hyperledger/fabric-protos-go/peer" "github.com/hyperledger/fabric/integration/nwo" + "github.com/hyperledger/fabric/protoutil" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/tedsuo/ifrit" @@ -27,6 +29,47 @@ import ( "google.golang.org/grpc/status" ) +func NewProposedTransaction(signingIdentity *nwo.SigningIdentity, channelName, chaincodeName, transactionName string, transientData map[string][]byte, args ...[]byte) (*peer.SignedProposal, string) { + proposal, transactionID := newProposalProto(signingIdentity, channelName, chaincodeName, transactionName, transientData, args...) + signedProposal, err := protoutil.GetSignedProposal(proposal, signingIdentity) + Expect(err).NotTo(HaveOccurred()) + + return signedProposal, transactionID +} + +func newProposalProto(signingIdentity *nwo.SigningIdentity, channelName, chaincodeName, transactionName string, transientData map[string][]byte, args ...[]byte) (*peer.Proposal, string) { + creator, err := signingIdentity.Serialize() + Expect(err).NotTo(HaveOccurred()) + + invocationSpec := &peer.ChaincodeInvocationSpec{ + ChaincodeSpec: &peer.ChaincodeSpec{ + Type: peer.ChaincodeSpec_NODE, + ChaincodeId: &peer.ChaincodeID{Name: chaincodeName}, + Input: &peer.ChaincodeInput{Args: chaincodeArgs(transactionName, args...)}, + }, + } + + result, transactionID, err := protoutil.CreateChaincodeProposalWithTransient( + common.HeaderType_ENDORSER_TRANSACTION, + channelName, + invocationSpec, + creator, + transientData, + ) + Expect(err).NotTo(HaveOccurred()) + + return result, transactionID +} + +func chaincodeArgs(transactionName string, args ...[]byte) [][]byte { + result := make([][]byte, len(args)+1) + + result[0] = []byte(transactionName) + copy(result[1:], args) + + return result +} + var _ = Describe("GatewayService", func() { var ( testDir string diff --git a/internal/pkg/gateway/api.go b/internal/pkg/gateway/api.go index 86ad6ffb294..9942498cfd5 100644 --- a/internal/pkg/gateway/api.go +++ b/internal/pkg/gateway/api.go @@ -16,6 +16,7 @@ import ( gp "github.com/hyperledger/fabric-protos-go/gateway" "github.com/hyperledger/fabric-protos-go/peer" "github.com/hyperledger/fabric/core/aclmgmt/resources" + "github.com/hyperledger/fabric/internal/pkg/gateway/event" "github.com/hyperledger/fabric/protoutil" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -355,21 +356,46 @@ func (gs *Server) ChaincodeEvents(signedRequest *gp.SignedChaincodeEventsRequest return status.Error(codes.PermissionDenied, err.Error()) } - events, err := gs.eventer.ChaincodeEvents(stream.Context(), request.ChannelId, request.ChaincodeId) + ledger, err := gs.ledgerProvider.Ledger(request.ChannelId) if err != nil { - return status.Error(codes.FailedPrecondition, err.Error()) + return status.Error(codes.InvalidArgument, err.Error()) } - for event := range events { - response := &gp.ChaincodeEventsResponse{ - BlockNumber: event.BlockNumber, - Events: event.Events, + ledgerInfo, err := ledger.GetBlockchainInfo() + if err != nil { + return status.Error(codes.Unavailable, err.Error()) + } + + ledgerIter, err := ledger.GetBlocksIterator(ledgerInfo.Height) + if err != nil { + return status.Error(codes.Unavailable, err.Error()) + } + + eventsIter := event.NewChaincodeEventsIterator(ledgerIter) + defer eventsIter.Close() + + for { + response, err := eventsIter.Next() + if err != nil { + return status.Error(codes.Unavailable, err.Error()) } + + var matchingEvents []*peer.ChaincodeEvent + + for _, event := range response.Events { + if event.GetChaincodeId() == request.ChaincodeId { + matchingEvents = append(matchingEvents, event) + } + } + + if len(matchingEvents) == 0 { + continue + } + + response.Events = matchingEvents + if err := stream.Send(response); err != nil { return err // Likely stream closed by the client } } - - // If stream is still open, this was a server-side error; otherwise client won't see it anyway - return status.Error(codes.Unavailable, "failed to read events") } diff --git a/internal/pkg/gateway/api_test.go b/internal/pkg/gateway/api_test.go index 0bf96f515fd..a61ac0c751f 100644 --- a/internal/pkg/gateway/api_test.go +++ b/internal/pkg/gateway/api_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes/timestamp" cp "github.com/hyperledger/fabric-protos-go/common" dp "github.com/hyperledger/fabric-protos-go/discovery" pb "github.com/hyperledger/fabric-protos-go/gateway" @@ -21,6 +22,7 @@ import ( ab "github.com/hyperledger/fabric-protos-go/orderer" "github.com/hyperledger/fabric-protos-go/peer" "github.com/hyperledger/fabric/common/crypto/tlsgen" + commonledger "github.com/hyperledger/fabric/common/ledger" "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/common" gdiscovery "github.com/hyperledger/fabric/gossip/discovery" @@ -63,11 +65,6 @@ type commitFinder interface { CommitFinder } -//go:generate counterfeiter -o mocks/eventer.go --fake-name Eventer . eventer -type eventer interface { - Eventer -} - //go:generate counterfeiter -o mocks/chaincodeeventsserver.go --fake-name ChaincodeEventsServer github.com/hyperledger/fabric-protos-go/gateway.Gateway_ChaincodeEventsServer //go:generate counterfeiter -o mocks/aclchecker.go --fake-name ACLChecker . aclChecker @@ -75,6 +72,21 @@ type aclChecker interface { ACLChecker } +//go:generate counterfeiter -o mocks/ledgerprovider.go --fake-name LedgerProvider . ledgerProvider +type ledgerProvider interface { + LedgerProvider +} + +//go:generate counterfeiter -o mocks/ledger.go --fake-name Ledger . mockLedger +type mockLedger interface { + commonledger.Ledger +} + +//go:generate counterfeiter -o mocks/resultsiterator.go --fake-name ResultsIterator . resultsIterator +type mockResultsIterator interface { + commonledger.ResultsIterator +} + type ( endorsementPlan map[string][]endorserState endorsementLayout map[string]uint32 @@ -124,16 +136,17 @@ type testDef struct { endpointDefinition *endpointDef endorsingOrgs []string postSetup func(t *testing.T, def *preparedTest) + postTest func(t *testing.T, def *preparedTest) expectedEndorsers []string finderStatus *commit.Status finderErr error - chaincodeEvents []*commit.BlockChaincodeEvents eventErr error policyErr error expectedResponse proto.Message expectedResponses []proto.Message transientData map[string][]byte interest *peer.ChaincodeInterest + blocks []*cp.Block } type preparedTest struct { @@ -144,9 +157,11 @@ type preparedTest struct { discovery *mocks.Discovery dialer *mocks.Dialer finder *mocks.CommitFinder - eventer *mocks.Eventer eventsServer *mocks.ChaincodeEventsServer policy *mocks.ACLChecker + ledgerProvider *mocks.LedgerProvider + ledger *mocks.Ledger + blockIterator *mocks.ResultsIterator } type contextKey string @@ -962,76 +977,234 @@ func TestCommitStatus(t *testing.T) { } func TestChaincodeEvents(t *testing.T) { - closedEventsChannel := make(chan *commit.BlockChaincodeEvents) - close(closedEventsChannel) + now := time.Now() + transactionId := "TRANSACTION_ID" + + matchChaincodeEvent := &peer.ChaincodeEvent{ + ChaincodeId: testChaincode, + TxId: transactionId, + EventName: "EVENT_NAME", + Payload: []byte("PAYLOAD"), + } + + mismatchChaincodeEvent := &peer.ChaincodeEvent{ + ChaincodeId: "WRONG_CHAINCODE_ID", + TxId: transactionId, + EventName: "EVENT_NAME", + Payload: []byte("PAYLOAD"), + } + + txHeader := &cp.Header{ + ChannelHeader: protoutil.MarshalOrPanic(&cp.ChannelHeader{ + Type: int32(cp.HeaderType_ENDORSER_TRANSACTION), + Timestamp: ×tamp.Timestamp{ + Seconds: now.Unix(), + Nanos: int32(now.Nanosecond()), + }, + TxId: transactionId, + }), + } + + matchTxEnvelope := &cp.Envelope{ + Payload: protoutil.MarshalOrPanic(&cp.Payload{ + Header: txHeader, + Data: protoutil.MarshalOrPanic(&peer.Transaction{ + Actions: []*peer.TransactionAction{ + { + Payload: protoutil.MarshalOrPanic(&peer.ChaincodeActionPayload{ + Action: &peer.ChaincodeEndorsedAction{ + ProposalResponsePayload: protoutil.MarshalOrPanic(&peer.ProposalResponsePayload{ + Extension: protoutil.MarshalOrPanic(&peer.ChaincodeAction{ + Events: protoutil.MarshalOrPanic(matchChaincodeEvent), + }), + }), + }, + }), + }, + }, + }), + }), + } + + mismatchTxEnvelope := &cp.Envelope{ + Payload: protoutil.MarshalOrPanic(&cp.Payload{ + Header: txHeader, + Data: protoutil.MarshalOrPanic(&peer.Transaction{ + Actions: []*peer.TransactionAction{ + { + Payload: protoutil.MarshalOrPanic(&peer.ChaincodeActionPayload{ + Action: &peer.ChaincodeEndorsedAction{ + ProposalResponsePayload: protoutil.MarshalOrPanic(&peer.ProposalResponsePayload{ + Extension: protoutil.MarshalOrPanic(&peer.ChaincodeAction{ + Events: protoutil.MarshalOrPanic(mismatchChaincodeEvent), + }), + }), + }, + }), + }, + }, + }), + }), + } + + block100Proto := &cp.Block{ + Header: &cp.BlockHeader{ + Number: 100, + }, + Metadata: &cp.BlockMetadata{ + Metadata: [][]byte{ + nil, + nil, + { + byte(peer.TxValidationCode_VALID), + }, + nil, + nil, + }, + }, + Data: &cp.BlockData{ + Data: [][]byte{ + protoutil.MarshalOrPanic(mismatchTxEnvelope), + }, + }, + } + + block101Proto := &cp.Block{ + Header: &cp.BlockHeader{ + Number: 101, + }, + Metadata: &cp.BlockMetadata{ + Metadata: [][]byte{ + nil, + nil, + { + byte(peer.TxValidationCode_VALID), + byte(peer.TxValidationCode_VALID), + byte(peer.TxValidationCode_VALID), + }, + nil, + nil, + }, + }, + Data: &cp.BlockData{ + Data: [][]byte{ + protoutil.MarshalOrPanic(&cp.Envelope{ + Payload: protoutil.MarshalOrPanic(&cp.Payload{ + Header: &cp.Header{ + ChannelHeader: protoutil.MarshalOrPanic(&cp.ChannelHeader{ + Type: int32(cp.HeaderType_CONFIG_UPDATE), + }), + }, + }), + }), + protoutil.MarshalOrPanic(mismatchTxEnvelope), + protoutil.MarshalOrPanic(matchTxEnvelope), + }, + }, + } tests := []testDef{ { - name: "error establishing event reading", + name: "error reading events", eventErr: errors.New("EVENT_ERROR"), - errString: "rpc error: code = FailedPrecondition desc = EVENT_ERROR", + errString: "rpc error: code = Unavailable desc = EVENT_ERROR", }, { name: "returns chaincode events", - chaincodeEvents: []*commit.BlockChaincodeEvents{ - { - BlockNumber: 101, + blocks: []*cp.Block{ + block101Proto, + }, + expectedResponses: []proto.Message{ + &pb.ChaincodeEventsResponse{ + BlockNumber: block101Proto.GetHeader().GetNumber(), Events: []*peer.ChaincodeEvent{ { ChaincodeId: testChaincode, - TxId: "TX_ID", - EventName: "EVENT_NAME", - Payload: []byte("PAYLOAD"), + TxId: matchChaincodeEvent.GetTxId(), + EventName: matchChaincodeEvent.GetEventName(), + Payload: matchChaincodeEvent.GetPayload(), }, }, }, }, + }, + { + name: "skips blocks containing only non-matching chaincode events", + blocks: []*cp.Block{ + block100Proto, + block101Proto, + }, expectedResponses: []proto.Message{ &pb.ChaincodeEventsResponse{ - BlockNumber: 101, + BlockNumber: block101Proto.GetHeader().GetNumber(), Events: []*peer.ChaincodeEvent{ { ChaincodeId: testChaincode, - TxId: "TX_ID", - EventName: "EVENT_NAME", - Payload: []byte("PAYLOAD"), + TxId: matchChaincodeEvent.GetTxId(), + EventName: matchChaincodeEvent.GetEventName(), + Payload: matchChaincodeEvent.GetPayload(), }, }, }, }, }, { - name: "passes channel name to eventer", + name: "passes channel name to ledger provider", + postTest: func(t *testing.T, test *preparedTest) { + require.Equal(t, 1, test.ledgerProvider.LedgerCallCount()) + require.Equal(t, testChannel, test.ledgerProvider.LedgerArgsForCall(0)) + }, + }, + { + name: "returns error obtaining ledger", + blocks: []*cp.Block{ + block101Proto, + }, + errString: "rpc error: code = InvalidArgument desc = LEDGER_PROVIDER_ERROR", + postSetup: func(t *testing.T, test *preparedTest) { + test.ledgerProvider.LedgerReturns(nil, errors.New("LEDGER_PROVIDER_ERROR")) + }, + }, + { + name: "returns error obtaining ledger height", + blocks: []*cp.Block{ + block101Proto, + }, + errString: "rpc error: code = Unavailable desc = LEDGER_INFO_ERROR", postSetup: func(t *testing.T, test *preparedTest) { - test.eventer.ChaincodeEventsCalls(func(ctx context.Context, channelName string, chaincodeName string) (<-chan *commit.BlockChaincodeEvents, error) { - require.Equal(t, testChannel, channelName) - return closedEventsChannel, nil - }) + test.ledger.GetBlockchainInfoReturns(nil, errors.New("LEDGER_INFO_ERROR")) }, }, { - name: "passes chaincode ID to eventer", + name: "uses block height as default start block", + blocks: []*cp.Block{ + block101Proto, + }, postSetup: func(t *testing.T, test *preparedTest) { - test.eventer.ChaincodeEventsCalls(func(ctx context.Context, channelName string, chaincodeName string) (<-chan *commit.BlockChaincodeEvents, error) { - require.Equal(t, testChaincode, chaincodeName) - return closedEventsChannel, nil - }) + ledgerInfo := &cp.BlockchainInfo{ + Height: 101, + } + test.ledger.GetBlockchainInfoReturns(ledgerInfo, nil) + }, + postTest: func(t *testing.T, test *preparedTest) { + require.Equal(t, 1, test.ledger.GetBlocksIteratorCallCount()) + require.EqualValues(t, 101, test.ledger.GetBlocksIteratorArgsForCall(0)) + }, + }, + { + name: "returns error obtaining ledger iterator", + blocks: []*cp.Block{ + block101Proto, + }, + errString: "rpc error: code = Unavailable desc = LEDGER_ITERATOR_ERROR", + postSetup: func(t *testing.T, test *preparedTest) { + test.ledger.GetBlocksIteratorReturns(nil, errors.New("LEDGER_ITERATOR_ERROR")) }, }, { name: "returns error from send to client", - chaincodeEvents: []*commit.BlockChaincodeEvents{ - { - BlockNumber: 101, - Events: []*peer.ChaincodeEvent{ - { - ChaincodeId: testChaincode, - TxId: "TX_ID", - EventName: "EVENT_NAME", - Payload: []byte("PAYLOAD"), - }, - }, - }, + blocks: []*cp.Block{ + block101Proto, }, errString: "rpc error: code = Aborted desc = SEND_ERROR", postSetup: func(t *testing.T, test *preparedTest) { @@ -1045,29 +1218,23 @@ func TestChaincodeEvents(t *testing.T) { }, { name: "passes channel name to policy checker", - postSetup: func(t *testing.T, test *preparedTest) { - test.policy.CheckACLCalls(func(policyName string, channelName string, data interface{}) error { - require.Equal(t, testChannel, channelName) - return nil - }) + postTest: func(t *testing.T, test *preparedTest) { + require.Equal(t, 1, test.policy.CheckACLCallCount()) + _, channelName, _ := test.policy.CheckACLArgsForCall(0) + require.Equal(t, testChannel, channelName) }, }, { name: "passes identity to policy checker", identity: []byte("IDENTITY"), - postSetup: func(t *testing.T, test *preparedTest) { - test.policy.CheckACLCalls(func(policyName string, channelName string, data interface{}) error { - require.IsType(t, &protoutil.SignedData{}, data) - signedData := data.(*protoutil.SignedData) - require.Equal(t, []byte("IDENTITY"), signedData.Identity) - return nil - }) + postTest: func(t *testing.T, test *preparedTest) { + require.Equal(t, 1, test.policy.CheckACLCallCount()) + _, _, data := test.policy.CheckACLArgsForCall(0) + require.IsType(t, &protoutil.SignedData{}, data) + signedData := data.(*protoutil.SignedData) + require.Equal(t, []byte("IDENTITY"), signedData.Identity) }, }, - { - name: "error when no more events can be read", - errString: "rpc error: code = Unavailable desc = failed to read events", - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -1095,7 +1262,11 @@ func TestChaincodeEvents(t *testing.T) { for i, expectedResponse := range tt.expectedResponses { actualResponse := test.eventsServer.SendArgsForCall(i) - require.True(t, proto.Equal(expectedResponse, actualResponse)) + require.True(t, proto.Equal(expectedResponse, actualResponse), "response[%d] mismatch: %v", i, actualResponse) + } + + if tt.postTest != nil { + tt.postTest(t, test) } }) } @@ -1106,8 +1277,8 @@ func TestNilArgs(t *testing.T) { &mocks.EndorserClient{}, &mocks.Discovery{}, &mocks.CommitFinder{}, - &mocks.Eventer{}, &mocks.ACLChecker{}, + &mocks.LedgerProvider{}, common.PKIidType("id1"), "localhost:7051", "msp1", @@ -1164,17 +1335,38 @@ func prepareTest(t *testing.T, tt *testDef) *preparedTest { mockFinder := &mocks.CommitFinder{} mockFinder.TransactionStatusReturns(tt.finderStatus, tt.finderErr) - eventChannel := make(chan *commit.BlockChaincodeEvents, len(tt.chaincodeEvents)) - for _, event := range tt.chaincodeEvents { - eventChannel <- event - } - close(eventChannel) - mockEventer := &mocks.Eventer{} - mockEventer.ChaincodeEventsReturns(eventChannel, tt.eventErr) - mockPolicy := &mocks.ACLChecker{} mockPolicy.CheckACLReturns(tt.policyErr) + mockBlockIterator := &mocks.ResultsIterator{} + blockChannel := make(chan *cp.Block, len(tt.blocks)) + for _, block := range tt.blocks { + blockChannel <- block + } + close(blockChannel) + mockBlockIterator.NextCalls(func() (commonledger.QueryResult, error) { + if tt.eventErr != nil { + return nil, tt.eventErr + } + + block := <-blockChannel + if block == nil { + return nil, errors.New("NO_MORE_BLOCKS") + } + + return block, nil + }) + + mockLedger := &mocks.Ledger{} + ledgerInfo := &cp.BlockchainInfo{ + Height: 1, + } + mockLedger.GetBlockchainInfoReturns(ledgerInfo, nil) + mockLedger.GetBlocksIteratorReturns(mockBlockIterator, nil) + + mockLedgerProvider := &mocks.LedgerProvider{} + mockLedgerProvider.LedgerReturns(mockLedger, nil) + validProposal := createProposal(t, testChannel, testChaincode, tt.transientData) validSignedProposal, err := protoutil.GetSignedProposal(validProposal, mockSigner) require.NoError(t, err) @@ -1215,7 +1407,7 @@ func prepareTest(t *testing.T, tt *testDef) *preparedTest { EndorsementTimeout: endorsementTimeout, } - server := newServer(localEndorser, disc, mockFinder, mockEventer, mockPolicy, common.PKIidType("id1"), "localhost:7051", "msp1", options) + server := newServer(localEndorser, disc, mockFinder, mockPolicy, mockLedgerProvider, common.PKIidType("id1"), "localhost:7051", "msp1", options) dialer := &mocks.Dialer{} dialer.Returns(nil, nil) @@ -1232,9 +1424,11 @@ func prepareTest(t *testing.T, tt *testDef) *preparedTest { discovery: disc, dialer: dialer, finder: mockFinder, - eventer: mockEventer, eventsServer: &mocks.ChaincodeEventsServer{}, policy: mockPolicy, + ledgerProvider: mockLedgerProvider, + ledger: mockLedger, + blockIterator: mockBlockIterator, } if tt.postSetup != nil { tt.postSetup(t, pt) diff --git a/internal/pkg/gateway/commit/chaincodeeventnotifier.go b/internal/pkg/gateway/commit/chaincodeeventnotifier.go deleted file mode 100644 index 1b3800aa208..00000000000 --- a/internal/pkg/gateway/commit/chaincodeeventnotifier.go +++ /dev/null @@ -1,166 +0,0 @@ -/* -Copyright IBM Corp. All Rights Reserved. - -SPDX-License-Identifier: Apache-2.0 -*/ - -package commit - -import ( - "sync" - - "github.com/golang/protobuf/proto" - "github.com/hyperledger/fabric-protos-go/peer" - "github.com/hyperledger/fabric/core/ledger" -) - -type BlockChaincodeEvents struct { - BlockNumber uint64 - Events []*peer.ChaincodeEvent -} - -type chaincodeEventListenerSet map[*chaincodeEventListener]struct{} - -type chaincodeEventNotifier struct { - lock sync.Mutex - listenersByChaincodeName map[string]chaincodeEventListenerSet - closed bool -} - -func newChaincodeEventNotifier() *chaincodeEventNotifier { - return &chaincodeEventNotifier{ - listenersByChaincodeName: make(map[string]chaincodeEventListenerSet), - } -} - -func (notifier *chaincodeEventNotifier) ReceiveBlock(blockEvent *ledger.CommitNotification) { - notifier.removeCompletedListeners() - - chaincodeEvents := getEventsByChaincodeName(blockEvent) - notifier.notify(chaincodeEvents) -} - -func (notifier *chaincodeEventNotifier) removeCompletedListeners() { - notifier.lock.Lock() - defer notifier.lock.Unlock() - - for chaincodeName, listeners := range notifier.listenersByChaincodeName { - for listener := range listeners { - if listener.isDone() { - notifier.removeListener(chaincodeName, listener) - } - } - } -} - -func (notifier *chaincodeEventNotifier) removeListener(chaincodeName string, listener *chaincodeEventListener) { - listener.close() - - listeners := notifier.listenersByChaincodeName[chaincodeName] - delete(listeners, listener) - - if len(listeners) == 0 { - delete(notifier.listenersByChaincodeName, chaincodeName) - } -} - -func getEventsByChaincodeName(blockEvent *ledger.CommitNotification) map[string]*BlockChaincodeEvents { - results := make(map[string]*BlockChaincodeEvents) - - for _, txInfo := range blockEvent.TxsInfo { - if txInfo.ChaincodeEventData != nil { - event := &peer.ChaincodeEvent{} - if err := proto.Unmarshal(txInfo.ChaincodeEventData, event); err != nil { - continue - } - - events := results[event.ChaincodeId] - if events == nil { - events = &BlockChaincodeEvents{ - BlockNumber: blockEvent.BlockNumber, - } - results[event.ChaincodeId] = events - } - - events.Events = append(events.Events, event) - } - } - - return results -} - -func (notifier *chaincodeEventNotifier) notify(eventsByChaincodeName map[string]*BlockChaincodeEvents) { - notifier.lock.Lock() - defer notifier.lock.Unlock() - - for chaincodeName, events := range eventsByChaincodeName { - for listener := range notifier.listenersByChaincodeName[chaincodeName] { - listener.receive(events) - } - } -} - -func (notifier *chaincodeEventNotifier) registerListener(done <-chan struct{}, chaincodeName string) <-chan *BlockChaincodeEvents { - notifyChannel := make(chan *BlockChaincodeEvents, 100) // Avoid blocking by buffering a number of blocks - - notifier.lock.Lock() - defer notifier.lock.Unlock() - - if notifier.closed { - close(notifyChannel) - } else { - listener := &chaincodeEventListener{ - done: done, - notifyChannel: notifyChannel, - } - notifier.listenersForChaincodeName(chaincodeName)[listener] = struct{}{} - } - - return notifyChannel -} - -func (notifier *chaincodeEventNotifier) listenersForChaincodeName(chaincodeName string) chaincodeEventListenerSet { - listeners, exists := notifier.listenersByChaincodeName[chaincodeName] - if !exists { - listeners = make(chaincodeEventListenerSet) - notifier.listenersByChaincodeName[chaincodeName] = listeners - } - - return listeners -} - -func (notifier *chaincodeEventNotifier) Close() { - notifier.lock.Lock() - defer notifier.lock.Unlock() - - for _, listeners := range notifier.listenersByChaincodeName { - for listener := range listeners { - listener.close() - } - } - - notifier.listenersByChaincodeName = nil - notifier.closed = true -} - -type chaincodeEventListener struct { - done <-chan struct{} - notifyChannel chan<- *BlockChaincodeEvents -} - -func (listener *chaincodeEventListener) isDone() bool { - select { - case <-listener.done: - return true - default: - return false - } -} - -func (listener *chaincodeEventListener) close() { - close(listener.notifyChannel) -} - -func (listener *chaincodeEventListener) receive(events *BlockChaincodeEvents) { - listener.notifyChannel <- events -} diff --git a/internal/pkg/gateway/commit/eventer.go b/internal/pkg/gateway/commit/eventer.go deleted file mode 100644 index 8f484d8be7f..00000000000 --- a/internal/pkg/gateway/commit/eventer.go +++ /dev/null @@ -1,23 +0,0 @@ -/* -Copyright IBM Corp. All Rights Reserved. - -SPDX-License-Identifier: Apache-2.0 -*/ - -package commit - -import "context" - -type Eventer struct { - notifier *Notifier -} - -func NewEventer(notifier *Notifier) *Eventer { - return &Eventer{ - notifier: notifier, - } -} - -func (e *Eventer) ChaincodeEvents(ctx context.Context, channelName string, chaincodeName string) (<-chan *BlockChaincodeEvents, error) { - return e.notifier.notifyChaincodeEvents(ctx.Done(), channelName, chaincodeName) -} diff --git a/internal/pkg/gateway/commit/eventer_test.go b/internal/pkg/gateway/commit/eventer_test.go deleted file mode 100644 index b9a14a5ba37..00000000000 --- a/internal/pkg/gateway/commit/eventer_test.go +++ /dev/null @@ -1,259 +0,0 @@ -/* -Copyright IBM Corp. All Rights Reserved. - -SPDX-License-Identifier: Apache-2.0 -*/ - -package commit - -import ( - "context" - "testing" - - "github.com/hyperledger/fabric-protos-go/peer" - "github.com/hyperledger/fabric/core/ledger" - "github.com/hyperledger/fabric/internal/pkg/gateway/commit/mocks" - "github.com/pkg/errors" - "github.com/stretchr/testify/require" -) - -func TestEventer(t *testing.T) { - t.Run("realtime events", func(t *testing.T) { - t.Run("returns error from notification supplier", func(t *testing.T) { - supplier := &mocks.NotificationSupplier{} - supplier.CommitNotificationsReturns(nil, errors.New("MY_ERROR")) - notifier := NewNotifier(supplier) - defer notifier.close() - eventer := NewEventer(notifier) - - _, err := eventer.ChaincodeEvents(context.Background(), "CHANNEL_NAME", "CHAINCODE_NAME") - - require.ErrorContains(t, err, "MY_ERROR") - }) - - t.Run("delivers events for matching chaincode", func(t *testing.T) { - commitSend := make(chan *ledger.CommitNotification, 1) - notifier := newTestNotifier(commitSend) - defer notifier.close() - eventer := NewEventer(notifier) - - eventReceive, err := eventer.ChaincodeEvents(context.Background(), "CHANNEL_NAME", "CHAINCODE_NAME") - require.NoError(t, err) - - chaincodeEvent := newTestChaincodeEvent("CHAINCODE_NAME") - commitSend <- &ledger.CommitNotification{ - BlockNumber: 1, - TxsInfo: []*ledger.CommitNotificationTxInfo{ - { - ChaincodeEventData: assertMarshallProto(t, chaincodeEvent), - }, - }, - } - actual := <-eventReceive - - expectedEvents := []*peer.ChaincodeEvent{ - chaincodeEvent, - } - require.Equal(t, uint64(1), actual.BlockNumber, "block number") - assertEqualChaincodeEvents(t, expectedEvents, actual.Events) - }) - - t.Run("ignores events for non-matching chaincode", func(t *testing.T) { - commitSend := make(chan *ledger.CommitNotification, 1) - notifier := newTestNotifier(commitSend) - defer notifier.close() - eventer := NewEventer(notifier) - - eventReceive, err := eventer.ChaincodeEvents(context.Background(), "CHANNEL_NAME", "CHAINCODE_NAME") - require.NoError(t, err) - - chaincodeEvent := newTestChaincodeEvent("CHAINCODE_NAME") - commitSend <- &ledger.CommitNotification{ - BlockNumber: 1, - TxsInfo: []*ledger.CommitNotificationTxInfo{ - { - ChaincodeEventData: assertMarshallProto(t, chaincodeEvent), - }, - { - ChaincodeEventData: assertMarshallProto(t, newTestChaincodeEvent("WRONG")), - }, - { - ChaincodeEventData: assertMarshallProto(t, chaincodeEvent), - }, - }, - } - actual := <-eventReceive - - expectedEvents := []*peer.ChaincodeEvent{ - chaincodeEvent, - chaincodeEvent, - } - require.Equal(t, uint64(1), actual.BlockNumber, "block number") - assertEqualChaincodeEvents(t, expectedEvents, actual.Events) - }) - - t.Run("delivers events for multiple blocks", func(t *testing.T) { - commitSend := make(chan *ledger.CommitNotification, 1) - notifier := newTestNotifier(commitSend) - defer notifier.close() - eventer := NewEventer(notifier) - - eventReceive, err := eventer.ChaincodeEvents(context.Background(), "CHANNEL_NAME", "CHAINCODE_NAME") - require.NoError(t, err) - - chaincodeEvent := newTestChaincodeEvent("CHAINCODE_NAME") - commitSend <- &ledger.CommitNotification{ - BlockNumber: 1, - TxsInfo: []*ledger.CommitNotificationTxInfo{ - { - ChaincodeEventData: assertMarshallProto(t, chaincodeEvent), - }, - }, - } - - expectedEvents := []*peer.ChaincodeEvent{ - chaincodeEvent, - } - - actual1 := <-eventReceive - require.Equal(t, uint64(1), actual1.BlockNumber, "block number") - assertEqualChaincodeEvents(t, expectedEvents, actual1.Events) - - commitSend <- &ledger.CommitNotification{ - BlockNumber: 2, - TxsInfo: []*ledger.CommitNotificationTxInfo{ - { - ChaincodeEventData: assertMarshallProto(t, chaincodeEvent), - }, - }, - } - actual2 := <-eventReceive - - require.Equal(t, uint64(2), actual2.BlockNumber, "block number") - assertEqualChaincodeEvents(t, expectedEvents, actual2.Events) - }) - - t.Run("ignores blocks with no events", func(t *testing.T) { - commitSend := make(chan *ledger.CommitNotification, 2) - notifier := newTestNotifier(commitSend) - defer notifier.close() - eventer := NewEventer(notifier) - - eventReceive, err := eventer.ChaincodeEvents(context.Background(), "CHANNEL_NAME", "CHAINCODE_NAME") - require.NoError(t, err) - - chaincodeEvent := newTestChaincodeEvent("CHAINCODE_NAME") - commitSend <- &ledger.CommitNotification{ - BlockNumber: 1, - } - commitSend <- &ledger.CommitNotification{ - BlockNumber: 2, - TxsInfo: []*ledger.CommitNotificationTxInfo{ - { - ChaincodeEventData: assertMarshallProto(t, chaincodeEvent), - }, - }, - } - - actual := <-eventReceive - - expectedEvents := []*peer.ChaincodeEvent{ - chaincodeEvent, - } - require.Equal(t, uint64(2), actual.BlockNumber, "block number") - assertEqualChaincodeEvents(t, expectedEvents, actual.Events) - }) - - t.Run("ignores blocks with no events matching chaincode name", func(t *testing.T) { - commitSend := make(chan *ledger.CommitNotification, 2) - notifier := newTestNotifier(commitSend) - defer notifier.close() - eventer := NewEventer(notifier) - - eventReceive, err := eventer.ChaincodeEvents(context.Background(), "CHANNEL_NAME", "CHAINCODE_NAME") - require.NoError(t, err) - - chaincodeEvent := newTestChaincodeEvent("CHAINCODE_NAME") - commitSend <- &ledger.CommitNotification{ - BlockNumber: 1, - TxsInfo: []*ledger.CommitNotificationTxInfo{ - { - ChaincodeEventData: assertMarshallProto(t, newTestChaincodeEvent("WRONG")), - }, - }, - } - commitSend <- &ledger.CommitNotification{ - BlockNumber: 2, - TxsInfo: []*ledger.CommitNotificationTxInfo{ - { - ChaincodeEventData: assertMarshallProto(t, chaincodeEvent), - }, - }, - } - - actual := <-eventReceive - - expectedEvents := []*peer.ChaincodeEvent{ - chaincodeEvent, - } - require.Equal(t, uint64(2), actual.BlockNumber, "block number") - assertEqualChaincodeEvents(t, expectedEvents, actual.Events) - }) - - t.Run("delivers events to multiple listeners", func(t *testing.T) { - commitSend := make(chan *ledger.CommitNotification, 1) - notifier := newTestNotifier(commitSend) - defer notifier.close() - eventer := NewEventer(notifier) - - eventReceive1, err := eventer.ChaincodeEvents(context.Background(), "CHANNEL_NAME", "CHAINCODE_NAME") - require.NoError(t, err) - eventReceive2, err := eventer.ChaincodeEvents(context.Background(), "CHANNEL_NAME", "CHAINCODE_NAME") - require.NoError(t, err) - - chaincodeEvent := newTestChaincodeEvent("CHAINCODE_NAME") - commitSend <- &ledger.CommitNotification{ - BlockNumber: 1, - TxsInfo: []*ledger.CommitNotificationTxInfo{ - { - ChaincodeEventData: assertMarshallProto(t, chaincodeEvent), - }, - }, - } - actual1 := <-eventReceive1 - actual2 := <-eventReceive2 - - expectedEvents := []*peer.ChaincodeEvent{ - chaincodeEvent, - } - require.Equal(t, uint64(1), actual1.BlockNumber, "block number") - assertEqualChaincodeEvents(t, expectedEvents, actual1.Events) - require.Equal(t, uint64(1), actual2.BlockNumber, "block number") - assertEqualChaincodeEvents(t, expectedEvents, actual2.Events) - }) - - t.Run("stops listening when done channel is closed", func(t *testing.T) { - commitSend := make(chan *ledger.CommitNotification, 1) - notifier := newTestNotifier(commitSend) - defer notifier.close() - eventer := NewEventer(notifier) - - ctx, cancel := context.WithCancel(context.Background()) - eventReceive, err := eventer.ChaincodeEvents(ctx, "CHANNEL_NAME", "CHAINCODE_NAME") - require.NoError(t, err) - - cancel() - commitSend <- &ledger.CommitNotification{ - BlockNumber: 1, - TxsInfo: []*ledger.CommitNotificationTxInfo{ - { - ChaincodeEventData: assertMarshallProto(t, newTestChaincodeEvent("CHAINCODE_NAME")), - }, - }, - } - _, ok := <-eventReceive - - require.False(t, ok, "Expected notification channel to be closed but receive was successful") - }) - }) -} diff --git a/internal/pkg/gateway/commit/notifier.go b/internal/pkg/gateway/commit/notifier.go index dbcb01a1734..7d9e3280c52 100644 --- a/internal/pkg/gateway/commit/notifier.go +++ b/internal/pkg/gateway/commit/notifier.go @@ -19,9 +19,8 @@ type NotificationSupplier interface { } type notifiers struct { - block *blockNotifier - status *statusNotifier - chaincodeEvents *chaincodeEventNotifier + block *blockNotifier + status *statusNotifier } // Notifier provides notification of transaction commits. @@ -53,16 +52,6 @@ func (n *Notifier) notifyStatus(done <-chan struct{}, channelName string, transa return notifyChannel, nil } -func (n *Notifier) notifyChaincodeEvents(done <-chan struct{}, channelName string, chaincodeName string) (<-chan *BlockChaincodeEvents, error) { - notifiers, err := n.notifiersForChannel(channelName) - if err != nil { - return nil, err - } - - notifyChannel := notifiers.chaincodeEvents.registerListener(done, chaincodeName) - return notifyChannel, nil -} - // close the notifier. This closes all notification channels obtained from this notifier. Behavior is undefined after // closing and the notifier should not be used. func (n *Notifier) close() { @@ -86,12 +75,10 @@ func (n *Notifier) notifiersForChannel(channelName string) (*notifiers, error) { } statusNotifier := newStatusNotifier() - chaincodeEventNotifier := newChaincodeEventNotifier() - blockNotifier := newBlockNotifier(n.cancel, commitChannel, statusNotifier, chaincodeEventNotifier) + blockNotifier := newBlockNotifier(n.cancel, commitChannel, statusNotifier) result = ¬ifiers{ - block: blockNotifier, - status: statusNotifier, - chaincodeEvents: chaincodeEventNotifier, + block: blockNotifier, + status: statusNotifier, } n.notifiersByChannel[channelName] = result diff --git a/internal/pkg/gateway/event/block.go b/internal/pkg/gateway/event/block.go new file mode 100644 index 00000000000..e65c5c2fb8d --- /dev/null +++ b/internal/pkg/gateway/event/block.go @@ -0,0 +1,100 @@ +/* +Copyright IBM Corp. All Rights Reserved. +SPDX-License-Identifier: Apache-2.0 +*/ + +package event + +import ( + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric-protos-go/peer" +) + +type Block struct { + block *common.Block + transactions []*Transaction +} + +func NewBlock(block *common.Block) *Block { + return &Block{ + block: block, + } +} + +func (b *Block) Number() uint64 { + return b.block.GetHeader().GetNumber() +} + +func (b *Block) Transactions() ([]*Transaction, error) { + var err error + + if b.transactions == nil { + b.transactions, err = b.readTransactions() + } + + return b.transactions, err +} + +func (b *Block) readTransactions() ([]*Transaction, error) { + transactions := make([]*Transaction, 0) + + txPayloads, err := b.payloads() + if err != nil { + return nil, err + } + + for i, payload := range txPayloads { + header := &common.ChannelHeader{} + if err := proto.Unmarshal(payload.GetHeader().GetChannelHeader(), header); err != nil { + return nil, err + } + + if header.GetType() == int32(common.HeaderType_ENDORSER_TRANSACTION) { + transaction := &Transaction{ + parent: b, + payload: payload, + id: header.GetTxId(), + timestamp: header.GetTimestamp(), + status: b.statusCode(i), + } + transactions = append(transactions, transaction) + } + } + + return transactions, nil +} + +func (b *Block) payloads() ([]*common.Payload, error) { + var payloads []*common.Payload + + for _, envelopeBytes := range b.block.GetData().GetData() { + envelope := &common.Envelope{} + if err := proto.Unmarshal(envelopeBytes, envelope); err != nil { + return nil, err + } + + payload := &common.Payload{} + if err := proto.Unmarshal(envelope.Payload, payload); err != nil { + return nil, err + } + + payloads = append(payloads, payload) + } + + return payloads, nil +} + +func (b *Block) statusCode(txIndex int) peer.TxValidationCode { + metadata := b.block.GetMetadata().GetMetadata() + if int(common.BlockMetadataIndex_TRANSACTIONS_FILTER) >= len(metadata) { + return peer.TxValidationCode_INVALID_OTHER_REASON + } + + statusCodes := metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] + if txIndex >= len(statusCodes) { + return peer.TxValidationCode_INVALID_OTHER_REASON + } + + return peer.TxValidationCode(statusCodes[txIndex]) +} diff --git a/internal/pkg/gateway/event/blockiterator.go b/internal/pkg/gateway/event/blockiterator.go new file mode 100644 index 00000000000..56c372c2b02 --- /dev/null +++ b/internal/pkg/gateway/event/blockiterator.go @@ -0,0 +1,40 @@ +/* +Copyright IBM Corp. All Rights Reserved. +SPDX-License-Identifier: Apache-2.0 +*/ + +package event + +import ( + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric/common/ledger" + "github.com/pkg/errors" +) + +type BlockIterator struct { + ledgerIter ledger.ResultsIterator +} + +func NewBlockIterator(iterator ledger.ResultsIterator) *BlockIterator { + return &BlockIterator{ + ledgerIter: iterator, + } +} + +func (iter *BlockIterator) Next() (*Block, error) { + result, err := iter.ledgerIter.Next() + if err != nil { + return nil, err + } + + switch block := result.(type) { + case *common.Block: + return NewBlock(block), nil + default: + return nil, errors.Errorf("unexpected block type: %T", result) + } +} + +func (iter *BlockIterator) Close() { + iter.ledgerIter.Close() +} diff --git a/internal/pkg/gateway/event/chaincode.go b/internal/pkg/gateway/event/chaincode.go new file mode 100644 index 00000000000..264a26de718 --- /dev/null +++ b/internal/pkg/gateway/event/chaincode.go @@ -0,0 +1,35 @@ +/* +Copyright IBM Corp. All Rights Reserved. +SPDX-License-Identifier: Apache-2.0 +*/ + +package event + +import ( + "github.com/hyperledger/fabric-protos-go/peer" +) + +type ChaincodeEvent struct { + parent *Transaction + message *peer.ChaincodeEvent +} + +func (event *ChaincodeEvent) Transaction() *Transaction { + return event.parent +} + +func (event *ChaincodeEvent) ChaincodeID() string { + return event.message.ChaincodeId +} + +func (event *ChaincodeEvent) EventName() string { + return event.message.EventName +} + +func (event *ChaincodeEvent) Payload() []byte { + return event.message.Payload +} + +func (event *ChaincodeEvent) ProtoMessage() *peer.ChaincodeEvent { + return event.message +} diff --git a/internal/pkg/gateway/event/chaincodeiterator.go b/internal/pkg/gateway/event/chaincodeiterator.go new file mode 100644 index 00000000000..42ff1700876 --- /dev/null +++ b/internal/pkg/gateway/event/chaincodeiterator.go @@ -0,0 +1,83 @@ +/* +Copyright IBM Corp. All Rights Reserved. +SPDX-License-Identifier: Apache-2.0 +*/ + +package event + +import ( + "github.com/hyperledger/fabric-protos-go/gateway" + "github.com/hyperledger/fabric-protos-go/peer" + "github.com/hyperledger/fabric/common/ledger" +) + +type ChaincodeEventsIterator struct { + blockIter *BlockIterator +} + +func NewChaincodeEventsIterator(iterator ledger.ResultsIterator) *ChaincodeEventsIterator { + return &ChaincodeEventsIterator{ + blockIter: NewBlockIterator(iterator), + } +} + +func (iter *ChaincodeEventsIterator) Next() (*gateway.ChaincodeEventsResponse, error) { + for { + result, err := iter.nextBlock() + if err != nil { + return nil, err + } + + if len(result.Events) > 0 { + return result, nil + } + } +} + +func (iter *ChaincodeEventsIterator) nextBlock() (*gateway.ChaincodeEventsResponse, error) { + block, err := iter.blockIter.Next() + if err != nil { + return nil, err + } + + events, err := chaincodeEventsFromBlock(block) + if err != nil { + return nil, err + } + + result := &gateway.ChaincodeEventsResponse{ + BlockNumber: block.Number(), + Events: events, + } + return result, nil +} + +func chaincodeEventsFromBlock(block *Block) ([]*peer.ChaincodeEvent, error) { + transactions, err := block.Transactions() + if err != nil { + return nil, err + } + + var results []*peer.ChaincodeEvent + + for _, transaction := range transactions { + if !transaction.Valid() { + continue + } + + events, err := transaction.ChaincodeEvents() + if err != nil { + return nil, err + } + + for _, event := range events { + results = append(results, event.ProtoMessage()) + } + } + + return results, nil +} + +func (iter *ChaincodeEventsIterator) Close() { + iter.blockIter.Close() +} diff --git a/internal/pkg/gateway/event/iterator_test.go b/internal/pkg/gateway/event/iterator_test.go new file mode 100644 index 00000000000..0a842daf710 --- /dev/null +++ b/internal/pkg/gateway/event/iterator_test.go @@ -0,0 +1,294 @@ +/* +Copyright IBM Corp. All Rights Reserved. +SPDX-License-Identifier: Apache-2.0 +*/ + +package event_test + +import ( + "fmt" + "testing" + "time" + + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes/timestamp" + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric-protos-go/gateway" + "github.com/hyperledger/fabric-protos-go/peer" + "github.com/hyperledger/fabric/common/ledger" + "github.com/hyperledger/fabric/internal/pkg/gateway/event" + "github.com/hyperledger/fabric/internal/pkg/gateway/event/mocks" + "github.com/hyperledger/fabric/protoutil" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" +) + +//go:generate counterfeiter -o mocks/resultsiterator.go --fake-name ResultsIterator . mockResultsIterator +type mockResultsIterator interface { + ledger.ResultsIterator +} + +func TestIterators(t *testing.T) { + now := time.Now() + transactionId := "TRANSACTION_ID" + + chaincodeEvent := &peer.ChaincodeEvent{ + ChaincodeId: "CHAINCODE_ID", + TxId: transactionId, + EventName: "EVENT_NAME", + Payload: []byte("PAYLOAD"), + } + + txEnvelope := &common.Envelope{ + Payload: protoutil.MarshalOrPanic(&common.Payload{ + Header: &common.Header{ + ChannelHeader: protoutil.MarshalOrPanic(&common.ChannelHeader{ + Type: int32(common.HeaderType_ENDORSER_TRANSACTION), + Timestamp: ×tamp.Timestamp{ + Seconds: now.Unix(), + Nanos: int32(now.Nanosecond()), + }, + TxId: transactionId, + }), + }, + Data: protoutil.MarshalOrPanic(&peer.Transaction{ + Actions: []*peer.TransactionAction{ + { + Payload: protoutil.MarshalOrPanic(&peer.ChaincodeActionPayload{ + Action: &peer.ChaincodeEndorsedAction{ + ProposalResponsePayload: protoutil.MarshalOrPanic(&peer.ProposalResponsePayload{ + Extension: protoutil.MarshalOrPanic(&peer.ChaincodeAction{ + Events: protoutil.MarshalOrPanic(chaincodeEvent), + }), + }), + }, + }), + }, + }, + }), + }), + } + + blockProto := &common.Block{ + Header: &common.BlockHeader{ + Number: 1337, + }, + Metadata: &common.BlockMetadata{ + Metadata: [][]byte{ + nil, + nil, + { + byte(peer.TxValidationCode_MVCC_READ_CONFLICT), + byte(peer.TxValidationCode_VALID), + byte(peer.TxValidationCode_VALID), + }, + nil, + nil, + }, + }, + Data: &common.BlockData{ + Data: [][]byte{ + protoutil.MarshalOrPanic(txEnvelope), + protoutil.MarshalOrPanic(txEnvelope), + protoutil.MarshalOrPanic(&common.Envelope{ + Payload: protoutil.MarshalOrPanic(&common.Payload{ + Header: &common.Header{ + ChannelHeader: protoutil.MarshalOrPanic(&common.ChannelHeader{ + Type: int32(common.HeaderType_CONFIG_UPDATE), + }), + }, + }), + }), + }, + }, + } + + const invalidTxIndex = 0 + const validTxIndex = 1 + + assertExpectedBlock := func(t *testing.T, block *event.Block) { + require.NotNil(t, block, "block") + require.EqualValues(t, blockProto.GetHeader().GetNumber(), block.Number(), "block.Number()") + + transactions, err := block.Transactions() + require.NoError(t, err, "Transactions()") + require.Len(t, transactions, 2, "transactions") + + for txIndex, transaction := range transactions { + require.Equal(t, block, transaction.Block(), "transaction[%d].Block()", txIndex) + require.Equal(t, transactionId, transaction.ID(), "transaction[%d].ID()", txIndex) + require.EqualValues(t, now.Unix(), transaction.Timestamp().Seconds, "transaction[%d].Timestamp.Seconds", txIndex) + require.EqualValues(t, now.Nanosecond(), transaction.Timestamp().Nanos, "transaction[%d].Tomestamp.Nanos", txIndex) + + events, err := transaction.ChaincodeEvents() + require.NoError(t, err, "ChaincodeEvents()") + require.Len(t, events, 1, "chaincodeEvents") + + for eventIndex, event := range events { + require.Equal(t, transaction, event.Transaction(), "transaction[%d].ChaincodeEvents()[%d].Transaction()", txIndex, eventIndex) + require.Equal(t, chaincodeEvent.GetChaincodeId(), event.ChaincodeID(), "transaction[%d].ChaincodeEvents()[%d].ChaincodeID()", txIndex, eventIndex) + require.Equal(t, chaincodeEvent.GetEventName(), event.EventName(), "transaction[%d].ChaincodeEvents()[%d].EventName()", txIndex, eventIndex) + require.EqualValues(t, chaincodeEvent.GetPayload(), event.Payload(), "transaction[%d].ChaincodeEvents()[%d].Payload()", txIndex, eventIndex) + require.True(t, proto.Equal(chaincodeEvent, event.ProtoMessage()), "transaction[%d].ChaincodeEvents()[%d].ProtoMessage(): %v", txIndex, eventIndex, event.ProtoMessage()) + } + } + } + + t.Run("BlockIterator", func(t *testing.T) { + t.Run("Next", func(t *testing.T) { + t.Run("returns error from wrapped iterator", func(t *testing.T) { + resultIter := &mocks.ResultsIterator{} + resultIter.NextReturns(nil, errors.New("MY_ERROR")) + + blockIter := event.NewBlockIterator(resultIter) + _, err := blockIter.Next() + + require.ErrorContains(t, err, "MY_ERROR") + }) + + t.Run("returns error if wrapped iterator returns unexpected type", func(t *testing.T) { + resultIter := &mocks.ResultsIterator{} + result := &common.Envelope{} + resultIter.NextReturns(result, nil) + + blockIter := event.NewBlockIterator(resultIter) + _, err := blockIter.Next() + + require.ErrorContains(t, err, fmt.Sprintf("%T", result)) + }) + + t.Run("returns a block with no transactions", func(t *testing.T) { + resultIter := &mocks.ResultsIterator{} + result := &common.Block{ + Header: &common.BlockHeader{ + Number: 418, + }, + } + resultIter.NextReturns(result, nil) + + blockIter := event.NewBlockIterator(resultIter) + block, err := blockIter.Next() + + require.NoError(t, err, "Next()") + require.NotNil(t, block, "block") + require.EqualValues(t, result.GetHeader().GetNumber(), block.Number(), "Number()") + + transactions, err := block.Transactions() + require.NoError(t, err, "Transactions()") + require.Len(t, transactions, 0, "transactions") + }) + + t.Run("returns a block with invalid transaction", func(t *testing.T) { + resultIter := &mocks.ResultsIterator{} + resultIter.NextReturns(blockProto, nil) + + blockIter := event.NewBlockIterator(resultIter) + block, err := blockIter.Next() + + require.NoError(t, err, "Next()") + assertExpectedBlock(t, block) + + transactions, _ := block.Transactions() + transaction := transactions[invalidTxIndex] + require.Equal(t, peer.TxValidationCode_MVCC_READ_CONFLICT, transaction.Status()) + require.False(t, transaction.Valid(), "Valid()") + }) + + t.Run("returns a block with valid transaction", func(t *testing.T) { + resultIter := &mocks.ResultsIterator{} + resultIter.NextReturns(blockProto, nil) + + blockIter := event.NewBlockIterator(resultIter) + block, err := blockIter.Next() + + require.NoError(t, err, "Next()") + assertExpectedBlock(t, block) + + transactions, _ := block.Transactions() + transaction := transactions[validTxIndex] + require.Equal(t, peer.TxValidationCode_VALID, transaction.Status()) + require.True(t, transaction.Valid(), "Valid()") + }) + }) + + t.Run("Close", func(t *testing.T) { + t.Run("closes wrapped iterator", func(t *testing.T) { + resultIter := &mocks.ResultsIterator{} + + blockIter := event.NewBlockIterator(resultIter) + blockIter.Close() + + require.Equal(t, 1, resultIter.CloseCallCount()) + }) + }) + }) + + t.Run("ChaincodeEventsIterator", func(t *testing.T) { + t.Run("Next", func(t *testing.T) { + t.Run("returns error from wrapped iterator", func(t *testing.T) { + resultIter := &mocks.ResultsIterator{} + resultIter.NextReturns(nil, errors.New("MY_ERROR")) + + eventsIter := event.NewChaincodeEventsIterator(resultIter) + _, err := eventsIter.Next() + + require.ErrorContains(t, err, "MY_ERROR") + }) + }) + + t.Run("only returns events for valid transactions", func(t *testing.T) { + resultIter := &mocks.ResultsIterator{} + resultIter.NextReturns(blockProto, nil) + + eventsIter := event.NewChaincodeEventsIterator(resultIter) + actual, err := eventsIter.Next() + + require.NoError(t, err, "Next()") + require.NotNil(t, actual, "events") + + expected := &gateway.ChaincodeEventsResponse{ + BlockNumber: blockProto.GetHeader().GetNumber(), + Events: []*peer.ChaincodeEvent{ + chaincodeEvent, + }, + } + require.True(t, proto.Equal(expected, actual), "ChaincodeEventsResponse: %v", actual) + }) + + t.Run("skips blocks with no valid chaincode events", func(t *testing.T) { + emptyBlock := &common.Block{ + Header: &common.BlockHeader{ + Number: 418, + }, + } + resultIter := &mocks.ResultsIterator{} + resultIter.NextReturnsOnCall(0, emptyBlock, nil) + resultIter.NextReturnsOnCall(1, blockProto, nil) + + eventsIter := event.NewChaincodeEventsIterator(resultIter) + actual, err := eventsIter.Next() + + require.NoError(t, err, "Next()") + require.NotNil(t, actual, "events") + + expected := &gateway.ChaincodeEventsResponse{ + BlockNumber: blockProto.GetHeader().GetNumber(), + Events: []*peer.ChaincodeEvent{ + chaincodeEvent, + }, + } + require.True(t, proto.Equal(expected, actual), "ChaincodeEventsResponse: %v", actual) + }) + + t.Run("Close", func(t *testing.T) { + t.Run("closes wrapped iterator", func(t *testing.T) { + resultIter := &mocks.ResultsIterator{} + + eventsIter := event.NewChaincodeEventsIterator(resultIter) + eventsIter.Close() + + require.Equal(t, 1, resultIter.CloseCallCount()) + }) + }) + }) +} diff --git a/internal/pkg/gateway/event/mocks/resultsiterator.go b/internal/pkg/gateway/event/mocks/resultsiterator.go new file mode 100644 index 00000000000..a8b0007f4ff --- /dev/null +++ b/internal/pkg/gateway/event/mocks/resultsiterator.go @@ -0,0 +1,135 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mocks + +import ( + "sync" + + "github.com/hyperledger/fabric/common/ledger" +) + +type ResultsIterator struct { + CloseStub func() + closeMutex sync.RWMutex + closeArgsForCall []struct { + } + NextStub func() (ledger.QueryResult, error) + nextMutex sync.RWMutex + nextArgsForCall []struct { + } + nextReturns struct { + result1 ledger.QueryResult + result2 error + } + nextReturnsOnCall map[int]struct { + result1 ledger.QueryResult + result2 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *ResultsIterator) Close() { + fake.closeMutex.Lock() + fake.closeArgsForCall = append(fake.closeArgsForCall, struct { + }{}) + stub := fake.CloseStub + fake.recordInvocation("Close", []interface{}{}) + fake.closeMutex.Unlock() + if stub != nil { + fake.CloseStub() + } +} + +func (fake *ResultsIterator) CloseCallCount() int { + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() + return len(fake.closeArgsForCall) +} + +func (fake *ResultsIterator) CloseCalls(stub func()) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = stub +} + +func (fake *ResultsIterator) Next() (ledger.QueryResult, error) { + fake.nextMutex.Lock() + ret, specificReturn := fake.nextReturnsOnCall[len(fake.nextArgsForCall)] + fake.nextArgsForCall = append(fake.nextArgsForCall, struct { + }{}) + stub := fake.NextStub + fakeReturns := fake.nextReturns + fake.recordInvocation("Next", []interface{}{}) + fake.nextMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *ResultsIterator) NextCallCount() int { + fake.nextMutex.RLock() + defer fake.nextMutex.RUnlock() + return len(fake.nextArgsForCall) +} + +func (fake *ResultsIterator) NextCalls(stub func() (ledger.QueryResult, error)) { + fake.nextMutex.Lock() + defer fake.nextMutex.Unlock() + fake.NextStub = stub +} + +func (fake *ResultsIterator) NextReturns(result1 ledger.QueryResult, result2 error) { + fake.nextMutex.Lock() + defer fake.nextMutex.Unlock() + fake.NextStub = nil + fake.nextReturns = struct { + result1 ledger.QueryResult + result2 error + }{result1, result2} +} + +func (fake *ResultsIterator) NextReturnsOnCall(i int, result1 ledger.QueryResult, result2 error) { + fake.nextMutex.Lock() + defer fake.nextMutex.Unlock() + fake.NextStub = nil + if fake.nextReturnsOnCall == nil { + fake.nextReturnsOnCall = make(map[int]struct { + result1 ledger.QueryResult + result2 error + }) + } + fake.nextReturnsOnCall[i] = struct { + result1 ledger.QueryResult + result2 error + }{result1, result2} +} + +func (fake *ResultsIterator) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() + fake.nextMutex.RLock() + defer fake.nextMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *ResultsIterator) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} diff --git a/internal/pkg/gateway/event/transaction.go b/internal/pkg/gateway/event/transaction.go new file mode 100644 index 00000000000..f128508a20a --- /dev/null +++ b/internal/pkg/gateway/event/transaction.go @@ -0,0 +1,99 @@ +/* +Copyright IBM Corp. All Rights Reserved. +SPDX-License-Identifier: Apache-2.0 +*/ + +package event + +import ( + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes/timestamp" + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric-protos-go/peer" +) + +type Transaction struct { + parent *Block + payload *common.Payload + id string + timestamp *timestamp.Timestamp + status peer.TxValidationCode + chaincodeEvents []*ChaincodeEvent +} + +func (tx *Transaction) Block() *Block { + return tx.parent +} + +func (tx *Transaction) ID() string { + return tx.id +} + +func (tx *Transaction) Timestamp() *timestamp.Timestamp { + return tx.timestamp +} + +func (tx *Transaction) Status() peer.TxValidationCode { + return tx.status +} + +func (tx *Transaction) Valid() bool { + return tx.status == peer.TxValidationCode_VALID +} + +func (tx *Transaction) ChaincodeEvents() ([]*ChaincodeEvent, error) { + var err error + + if tx.chaincodeEvents == nil { + tx.chaincodeEvents, err = tx.readChaincodeEvents() + } + + return tx.chaincodeEvents, err +} + +func (tx *Transaction) readChaincodeEvents() ([]*ChaincodeEvent, error) { + transaction := &peer.Transaction{} + if err := proto.Unmarshal(tx.payload.GetData(), transaction); err != nil { + return nil, err + } + + chaincodeEvents := make([]*ChaincodeEvent, 0) + + for _, action := range transaction.GetActions() { + actionPayload := &peer.ChaincodeActionPayload{} + if err := proto.Unmarshal(action.GetPayload(), actionPayload); err != nil { + continue + } + + responsePayload := &peer.ProposalResponsePayload{} + if err := proto.Unmarshal(actionPayload.GetAction().GetProposalResponsePayload(), responsePayload); err != nil { + continue + } + + action := &peer.ChaincodeAction{} + if err := proto.Unmarshal(responsePayload.GetExtension(), action); err != nil { + continue + } + + event := &peer.ChaincodeEvent{} + if err := proto.Unmarshal(action.GetEvents(), event); err != nil { + continue + } + + if !validChaincodeEvent(event) { + continue + } + + chaincodeEvent := &ChaincodeEvent{ + parent: tx, + message: event, + } + chaincodeEvents = append(chaincodeEvents, chaincodeEvent) + } + + return chaincodeEvents, nil +} + +func validChaincodeEvent(event *peer.ChaincodeEvent) bool { + return len(event.GetChaincodeId()) > 0 && len(event.GetEventName()) > 0 && len(event.GetTxId()) > 0 +} diff --git a/internal/pkg/gateway/gateway.go b/internal/pkg/gateway/gateway.go index be957321241..fa6e4bbdebf 100644 --- a/internal/pkg/gateway/gateway.go +++ b/internal/pkg/gateway/gateway.go @@ -10,6 +10,7 @@ import ( peerproto "github.com/hyperledger/fabric-protos-go/peer" "github.com/hyperledger/fabric/common/flogging" + "github.com/hyperledger/fabric/common/ledger" "github.com/hyperledger/fabric/core/peer" "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/internal/pkg/gateway/commit" @@ -21,12 +22,12 @@ var logger = flogging.MustGetLogger("gateway") // Server represents the GRPC server for the Gateway. type Server struct { - registry *registry - commitFinder CommitFinder - eventer Eventer - policy ACLChecker - options config.Options - logger *flogging.FabricLogger + registry *registry + commitFinder CommitFinder + policy ACLChecker + options config.Options + logger *flogging.FabricLogger + ledgerProvider LedgerProvider } type EndorserServerAdapter struct { @@ -41,14 +42,14 @@ type CommitFinder interface { TransactionStatus(ctx context.Context, channelName string, transactionID string) (*commit.Status, error) } -type Eventer interface { - ChaincodeEvents(ctx context.Context, channelName string, chaincodeName string) (<-chan *commit.BlockChaincodeEvents, error) -} - type ACLChecker interface { CheckACL(policyName string, channelName string, data interface{}) error } +type LedgerProvider interface { + Ledger(channelName string) (ledger.Ledger, error) +} + // CreateServer creates an embedded instance of the Gateway. func CreateServer(localEndorser peerproto.EndorserServer, discovery Discovery, peerInstance *peer.Peer, policy ACLChecker, localMSPID string, options config.Options) *Server { adapter := &peerAdapter{ @@ -62,8 +63,8 @@ func CreateServer(localEndorser peerproto.EndorserServer, discovery Discovery, p }, discovery, commit.NewFinder(adapter, notifier), - commit.NewEventer(notifier), policy, + adapter, peerInstance.GossipService.SelfMembershipInfo().PKIid, peerInstance.GossipService.SelfMembershipInfo().Endpoint, localMSPID, @@ -71,8 +72,8 @@ func CreateServer(localEndorser peerproto.EndorserServer, discovery Discovery, p ) } -func newServer(localEndorser peerproto.EndorserClient, discovery Discovery, finder CommitFinder, eventer Eventer, policy ACLChecker, localPKIID common.PKIidType, localEndpoint, localMSPID string, options config.Options) *Server { - gwServer := &Server{ +func newServer(localEndorser peerproto.EndorserClient, discovery Discovery, finder CommitFinder, policy ACLChecker, ledgerProvider LedgerProvider, localPKIID common.PKIidType, localEndpoint, localMSPID string, options config.Options) *Server { + return &Server{ registry: ®istry{ localEndorser: &endorser{client: localEndorser, endpointConfig: &endpointConfig{pkiid: localPKIID, address: localEndpoint, mspid: localMSPID}}, discovery: discovery, @@ -83,12 +84,10 @@ func newServer(localEndorser peerproto.EndorserClient, discovery Discovery, find tlsRootCerts: map[string][][]byte{}, channelsInitialized: map[string]bool{}, }, - commitFinder: finder, - eventer: eventer, - policy: policy, - options: options, - logger: logger, + commitFinder: finder, + policy: policy, + options: options, + logger: logger, + ledgerProvider: ledgerProvider, } - - return gwServer } diff --git a/internal/pkg/gateway/mocks/discovery.go b/internal/pkg/gateway/mocks/discovery.go index 77d25ce34c2..782c43162c4 100644 --- a/internal/pkg/gateway/mocks/discovery.go +++ b/internal/pkg/gateway/mocks/discovery.go @@ -27,8 +27,9 @@ type Discovery struct { } IdentityInfoStub func() api.PeerIdentitySet identityInfoMutex sync.RWMutex - identityInfoArgsForCall []struct{} - identityInfoReturns struct { + identityInfoArgsForCall []struct { + } + identityInfoReturns struct { result1 api.PeerIdentitySet } identityInfoReturnsOnCall map[int]struct { @@ -130,7 +131,8 @@ func (fake *Discovery) ConfigReturnsOnCall(i int, result1 *discovery.ConfigResul func (fake *Discovery) IdentityInfo() api.PeerIdentitySet { fake.identityInfoMutex.Lock() ret, specificReturn := fake.identityInfoReturnsOnCall[len(fake.identityInfoArgsForCall)] - fake.identityInfoArgsForCall = append(fake.identityInfoArgsForCall, struct{}{}) + fake.identityInfoArgsForCall = append(fake.identityInfoArgsForCall, struct { + }{}) stub := fake.IdentityInfoStub fakeReturns := fake.identityInfoReturns fake.recordInvocation("IdentityInfo", []interface{}{}) diff --git a/internal/pkg/gateway/mocks/eventer.go b/internal/pkg/gateway/mocks/eventer.go deleted file mode 100644 index 3396f14218d..00000000000 --- a/internal/pkg/gateway/mocks/eventer.go +++ /dev/null @@ -1,119 +0,0 @@ -// Code generated by counterfeiter. DO NOT EDIT. -package mocks - -import ( - "context" - "sync" - - "github.com/hyperledger/fabric/internal/pkg/gateway/commit" -) - -type Eventer struct { - ChaincodeEventsStub func(context.Context, string, string) (<-chan *commit.BlockChaincodeEvents, error) - chaincodeEventsMutex sync.RWMutex - chaincodeEventsArgsForCall []struct { - arg1 context.Context - arg2 string - arg3 string - } - chaincodeEventsReturns struct { - result1 <-chan *commit.BlockChaincodeEvents - result2 error - } - chaincodeEventsReturnsOnCall map[int]struct { - result1 <-chan *commit.BlockChaincodeEvents - result2 error - } - invocations map[string][][]interface{} - invocationsMutex sync.RWMutex -} - -func (fake *Eventer) ChaincodeEvents(arg1 context.Context, arg2 string, arg3 string) (<-chan *commit.BlockChaincodeEvents, error) { - fake.chaincodeEventsMutex.Lock() - ret, specificReturn := fake.chaincodeEventsReturnsOnCall[len(fake.chaincodeEventsArgsForCall)] - fake.chaincodeEventsArgsForCall = append(fake.chaincodeEventsArgsForCall, struct { - arg1 context.Context - arg2 string - arg3 string - }{arg1, arg2, arg3}) - stub := fake.ChaincodeEventsStub - fakeReturns := fake.chaincodeEventsReturns - fake.recordInvocation("ChaincodeEvents", []interface{}{arg1, arg2, arg3}) - fake.chaincodeEventsMutex.Unlock() - if stub != nil { - return stub(arg1, arg2, arg3) - } - if specificReturn { - return ret.result1, ret.result2 - } - return fakeReturns.result1, fakeReturns.result2 -} - -func (fake *Eventer) ChaincodeEventsCallCount() int { - fake.chaincodeEventsMutex.RLock() - defer fake.chaincodeEventsMutex.RUnlock() - return len(fake.chaincodeEventsArgsForCall) -} - -func (fake *Eventer) ChaincodeEventsCalls(stub func(context.Context, string, string) (<-chan *commit.BlockChaincodeEvents, error)) { - fake.chaincodeEventsMutex.Lock() - defer fake.chaincodeEventsMutex.Unlock() - fake.ChaincodeEventsStub = stub -} - -func (fake *Eventer) ChaincodeEventsArgsForCall(i int) (context.Context, string, string) { - fake.chaincodeEventsMutex.RLock() - defer fake.chaincodeEventsMutex.RUnlock() - argsForCall := fake.chaincodeEventsArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 -} - -func (fake *Eventer) ChaincodeEventsReturns(result1 <-chan *commit.BlockChaincodeEvents, result2 error) { - fake.chaincodeEventsMutex.Lock() - defer fake.chaincodeEventsMutex.Unlock() - fake.ChaincodeEventsStub = nil - fake.chaincodeEventsReturns = struct { - result1 <-chan *commit.BlockChaincodeEvents - result2 error - }{result1, result2} -} - -func (fake *Eventer) ChaincodeEventsReturnsOnCall(i int, result1 <-chan *commit.BlockChaincodeEvents, result2 error) { - fake.chaincodeEventsMutex.Lock() - defer fake.chaincodeEventsMutex.Unlock() - fake.ChaincodeEventsStub = nil - if fake.chaincodeEventsReturnsOnCall == nil { - fake.chaincodeEventsReturnsOnCall = make(map[int]struct { - result1 <-chan *commit.BlockChaincodeEvents - result2 error - }) - } - fake.chaincodeEventsReturnsOnCall[i] = struct { - result1 <-chan *commit.BlockChaincodeEvents - result2 error - }{result1, result2} -} - -func (fake *Eventer) Invocations() map[string][][]interface{} { - fake.invocationsMutex.RLock() - defer fake.invocationsMutex.RUnlock() - fake.chaincodeEventsMutex.RLock() - defer fake.chaincodeEventsMutex.RUnlock() - copiedInvocations := map[string][][]interface{}{} - for key, value := range fake.invocations { - copiedInvocations[key] = value - } - return copiedInvocations -} - -func (fake *Eventer) recordInvocation(key string, args []interface{}) { - fake.invocationsMutex.Lock() - defer fake.invocationsMutex.Unlock() - if fake.invocations == nil { - fake.invocations = map[string][][]interface{}{} - } - if fake.invocations[key] == nil { - fake.invocations[key] = [][]interface{}{} - } - fake.invocations[key] = append(fake.invocations[key], args) -} diff --git a/internal/pkg/gateway/mocks/ledger.go b/internal/pkg/gateway/mocks/ledger.go new file mode 100644 index 00000000000..e6dc41e00e0 --- /dev/null +++ b/internal/pkg/gateway/mocks/ledger.go @@ -0,0 +1,294 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mocks + +import ( + "sync" + + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric/common/ledger" +) + +type Ledger struct { + CloseStub func() + closeMutex sync.RWMutex + closeArgsForCall []struct { + } + GetBlockByNumberStub func(uint64) (*common.Block, error) + getBlockByNumberMutex sync.RWMutex + getBlockByNumberArgsForCall []struct { + arg1 uint64 + } + getBlockByNumberReturns struct { + result1 *common.Block + result2 error + } + getBlockByNumberReturnsOnCall map[int]struct { + result1 *common.Block + result2 error + } + GetBlockchainInfoStub func() (*common.BlockchainInfo, error) + getBlockchainInfoMutex sync.RWMutex + getBlockchainInfoArgsForCall []struct { + } + getBlockchainInfoReturns struct { + result1 *common.BlockchainInfo + result2 error + } + getBlockchainInfoReturnsOnCall map[int]struct { + result1 *common.BlockchainInfo + result2 error + } + GetBlocksIteratorStub func(uint64) (ledger.ResultsIterator, error) + getBlocksIteratorMutex sync.RWMutex + getBlocksIteratorArgsForCall []struct { + arg1 uint64 + } + getBlocksIteratorReturns struct { + result1 ledger.ResultsIterator + result2 error + } + getBlocksIteratorReturnsOnCall map[int]struct { + result1 ledger.ResultsIterator + result2 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *Ledger) Close() { + fake.closeMutex.Lock() + fake.closeArgsForCall = append(fake.closeArgsForCall, struct { + }{}) + stub := fake.CloseStub + fake.recordInvocation("Close", []interface{}{}) + fake.closeMutex.Unlock() + if stub != nil { + fake.CloseStub() + } +} + +func (fake *Ledger) CloseCallCount() int { + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() + return len(fake.closeArgsForCall) +} + +func (fake *Ledger) CloseCalls(stub func()) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = stub +} + +func (fake *Ledger) GetBlockByNumber(arg1 uint64) (*common.Block, error) { + fake.getBlockByNumberMutex.Lock() + ret, specificReturn := fake.getBlockByNumberReturnsOnCall[len(fake.getBlockByNumberArgsForCall)] + fake.getBlockByNumberArgsForCall = append(fake.getBlockByNumberArgsForCall, struct { + arg1 uint64 + }{arg1}) + stub := fake.GetBlockByNumberStub + fakeReturns := fake.getBlockByNumberReturns + fake.recordInvocation("GetBlockByNumber", []interface{}{arg1}) + fake.getBlockByNumberMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *Ledger) GetBlockByNumberCallCount() int { + fake.getBlockByNumberMutex.RLock() + defer fake.getBlockByNumberMutex.RUnlock() + return len(fake.getBlockByNumberArgsForCall) +} + +func (fake *Ledger) GetBlockByNumberCalls(stub func(uint64) (*common.Block, error)) { + fake.getBlockByNumberMutex.Lock() + defer fake.getBlockByNumberMutex.Unlock() + fake.GetBlockByNumberStub = stub +} + +func (fake *Ledger) GetBlockByNumberArgsForCall(i int) uint64 { + fake.getBlockByNumberMutex.RLock() + defer fake.getBlockByNumberMutex.RUnlock() + argsForCall := fake.getBlockByNumberArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *Ledger) GetBlockByNumberReturns(result1 *common.Block, result2 error) { + fake.getBlockByNumberMutex.Lock() + defer fake.getBlockByNumberMutex.Unlock() + fake.GetBlockByNumberStub = nil + fake.getBlockByNumberReturns = struct { + result1 *common.Block + result2 error + }{result1, result2} +} + +func (fake *Ledger) GetBlockByNumberReturnsOnCall(i int, result1 *common.Block, result2 error) { + fake.getBlockByNumberMutex.Lock() + defer fake.getBlockByNumberMutex.Unlock() + fake.GetBlockByNumberStub = nil + if fake.getBlockByNumberReturnsOnCall == nil { + fake.getBlockByNumberReturnsOnCall = make(map[int]struct { + result1 *common.Block + result2 error + }) + } + fake.getBlockByNumberReturnsOnCall[i] = struct { + result1 *common.Block + result2 error + }{result1, result2} +} + +func (fake *Ledger) GetBlockchainInfo() (*common.BlockchainInfo, error) { + fake.getBlockchainInfoMutex.Lock() + ret, specificReturn := fake.getBlockchainInfoReturnsOnCall[len(fake.getBlockchainInfoArgsForCall)] + fake.getBlockchainInfoArgsForCall = append(fake.getBlockchainInfoArgsForCall, struct { + }{}) + stub := fake.GetBlockchainInfoStub + fakeReturns := fake.getBlockchainInfoReturns + fake.recordInvocation("GetBlockchainInfo", []interface{}{}) + fake.getBlockchainInfoMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *Ledger) GetBlockchainInfoCallCount() int { + fake.getBlockchainInfoMutex.RLock() + defer fake.getBlockchainInfoMutex.RUnlock() + return len(fake.getBlockchainInfoArgsForCall) +} + +func (fake *Ledger) GetBlockchainInfoCalls(stub func() (*common.BlockchainInfo, error)) { + fake.getBlockchainInfoMutex.Lock() + defer fake.getBlockchainInfoMutex.Unlock() + fake.GetBlockchainInfoStub = stub +} + +func (fake *Ledger) GetBlockchainInfoReturns(result1 *common.BlockchainInfo, result2 error) { + fake.getBlockchainInfoMutex.Lock() + defer fake.getBlockchainInfoMutex.Unlock() + fake.GetBlockchainInfoStub = nil + fake.getBlockchainInfoReturns = struct { + result1 *common.BlockchainInfo + result2 error + }{result1, result2} +} + +func (fake *Ledger) GetBlockchainInfoReturnsOnCall(i int, result1 *common.BlockchainInfo, result2 error) { + fake.getBlockchainInfoMutex.Lock() + defer fake.getBlockchainInfoMutex.Unlock() + fake.GetBlockchainInfoStub = nil + if fake.getBlockchainInfoReturnsOnCall == nil { + fake.getBlockchainInfoReturnsOnCall = make(map[int]struct { + result1 *common.BlockchainInfo + result2 error + }) + } + fake.getBlockchainInfoReturnsOnCall[i] = struct { + result1 *common.BlockchainInfo + result2 error + }{result1, result2} +} + +func (fake *Ledger) GetBlocksIterator(arg1 uint64) (ledger.ResultsIterator, error) { + fake.getBlocksIteratorMutex.Lock() + ret, specificReturn := fake.getBlocksIteratorReturnsOnCall[len(fake.getBlocksIteratorArgsForCall)] + fake.getBlocksIteratorArgsForCall = append(fake.getBlocksIteratorArgsForCall, struct { + arg1 uint64 + }{arg1}) + stub := fake.GetBlocksIteratorStub + fakeReturns := fake.getBlocksIteratorReturns + fake.recordInvocation("GetBlocksIterator", []interface{}{arg1}) + fake.getBlocksIteratorMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *Ledger) GetBlocksIteratorCallCount() int { + fake.getBlocksIteratorMutex.RLock() + defer fake.getBlocksIteratorMutex.RUnlock() + return len(fake.getBlocksIteratorArgsForCall) +} + +func (fake *Ledger) GetBlocksIteratorCalls(stub func(uint64) (ledger.ResultsIterator, error)) { + fake.getBlocksIteratorMutex.Lock() + defer fake.getBlocksIteratorMutex.Unlock() + fake.GetBlocksIteratorStub = stub +} + +func (fake *Ledger) GetBlocksIteratorArgsForCall(i int) uint64 { + fake.getBlocksIteratorMutex.RLock() + defer fake.getBlocksIteratorMutex.RUnlock() + argsForCall := fake.getBlocksIteratorArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *Ledger) GetBlocksIteratorReturns(result1 ledger.ResultsIterator, result2 error) { + fake.getBlocksIteratorMutex.Lock() + defer fake.getBlocksIteratorMutex.Unlock() + fake.GetBlocksIteratorStub = nil + fake.getBlocksIteratorReturns = struct { + result1 ledger.ResultsIterator + result2 error + }{result1, result2} +} + +func (fake *Ledger) GetBlocksIteratorReturnsOnCall(i int, result1 ledger.ResultsIterator, result2 error) { + fake.getBlocksIteratorMutex.Lock() + defer fake.getBlocksIteratorMutex.Unlock() + fake.GetBlocksIteratorStub = nil + if fake.getBlocksIteratorReturnsOnCall == nil { + fake.getBlocksIteratorReturnsOnCall = make(map[int]struct { + result1 ledger.ResultsIterator + result2 error + }) + } + fake.getBlocksIteratorReturnsOnCall[i] = struct { + result1 ledger.ResultsIterator + result2 error + }{result1, result2} +} + +func (fake *Ledger) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() + fake.getBlockByNumberMutex.RLock() + defer fake.getBlockByNumberMutex.RUnlock() + fake.getBlockchainInfoMutex.RLock() + defer fake.getBlockchainInfoMutex.RUnlock() + fake.getBlocksIteratorMutex.RLock() + defer fake.getBlocksIteratorMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *Ledger) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} diff --git a/internal/pkg/gateway/mocks/ledgerprovider.go b/internal/pkg/gateway/mocks/ledgerprovider.go new file mode 100644 index 00000000000..bfbaf33ee08 --- /dev/null +++ b/internal/pkg/gateway/mocks/ledgerprovider.go @@ -0,0 +1,114 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mocks + +import ( + "sync" + + "github.com/hyperledger/fabric/common/ledger" +) + +type LedgerProvider struct { + LedgerStub func(string) (ledger.Ledger, error) + ledgerMutex sync.RWMutex + ledgerArgsForCall []struct { + arg1 string + } + ledgerReturns struct { + result1 ledger.Ledger + result2 error + } + ledgerReturnsOnCall map[int]struct { + result1 ledger.Ledger + result2 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *LedgerProvider) Ledger(arg1 string) (ledger.Ledger, error) { + fake.ledgerMutex.Lock() + ret, specificReturn := fake.ledgerReturnsOnCall[len(fake.ledgerArgsForCall)] + fake.ledgerArgsForCall = append(fake.ledgerArgsForCall, struct { + arg1 string + }{arg1}) + stub := fake.LedgerStub + fakeReturns := fake.ledgerReturns + fake.recordInvocation("Ledger", []interface{}{arg1}) + fake.ledgerMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *LedgerProvider) LedgerCallCount() int { + fake.ledgerMutex.RLock() + defer fake.ledgerMutex.RUnlock() + return len(fake.ledgerArgsForCall) +} + +func (fake *LedgerProvider) LedgerCalls(stub func(string) (ledger.Ledger, error)) { + fake.ledgerMutex.Lock() + defer fake.ledgerMutex.Unlock() + fake.LedgerStub = stub +} + +func (fake *LedgerProvider) LedgerArgsForCall(i int) string { + fake.ledgerMutex.RLock() + defer fake.ledgerMutex.RUnlock() + argsForCall := fake.ledgerArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *LedgerProvider) LedgerReturns(result1 ledger.Ledger, result2 error) { + fake.ledgerMutex.Lock() + defer fake.ledgerMutex.Unlock() + fake.LedgerStub = nil + fake.ledgerReturns = struct { + result1 ledger.Ledger + result2 error + }{result1, result2} +} + +func (fake *LedgerProvider) LedgerReturnsOnCall(i int, result1 ledger.Ledger, result2 error) { + fake.ledgerMutex.Lock() + defer fake.ledgerMutex.Unlock() + fake.LedgerStub = nil + if fake.ledgerReturnsOnCall == nil { + fake.ledgerReturnsOnCall = make(map[int]struct { + result1 ledger.Ledger + result2 error + }) + } + fake.ledgerReturnsOnCall[i] = struct { + result1 ledger.Ledger + result2 error + }{result1, result2} +} + +func (fake *LedgerProvider) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.ledgerMutex.RLock() + defer fake.ledgerMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *LedgerProvider) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} diff --git a/internal/pkg/gateway/mocks/resultsiterator.go b/internal/pkg/gateway/mocks/resultsiterator.go new file mode 100644 index 00000000000..a8b0007f4ff --- /dev/null +++ b/internal/pkg/gateway/mocks/resultsiterator.go @@ -0,0 +1,135 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mocks + +import ( + "sync" + + "github.com/hyperledger/fabric/common/ledger" +) + +type ResultsIterator struct { + CloseStub func() + closeMutex sync.RWMutex + closeArgsForCall []struct { + } + NextStub func() (ledger.QueryResult, error) + nextMutex sync.RWMutex + nextArgsForCall []struct { + } + nextReturns struct { + result1 ledger.QueryResult + result2 error + } + nextReturnsOnCall map[int]struct { + result1 ledger.QueryResult + result2 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *ResultsIterator) Close() { + fake.closeMutex.Lock() + fake.closeArgsForCall = append(fake.closeArgsForCall, struct { + }{}) + stub := fake.CloseStub + fake.recordInvocation("Close", []interface{}{}) + fake.closeMutex.Unlock() + if stub != nil { + fake.CloseStub() + } +} + +func (fake *ResultsIterator) CloseCallCount() int { + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() + return len(fake.closeArgsForCall) +} + +func (fake *ResultsIterator) CloseCalls(stub func()) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = stub +} + +func (fake *ResultsIterator) Next() (ledger.QueryResult, error) { + fake.nextMutex.Lock() + ret, specificReturn := fake.nextReturnsOnCall[len(fake.nextArgsForCall)] + fake.nextArgsForCall = append(fake.nextArgsForCall, struct { + }{}) + stub := fake.NextStub + fakeReturns := fake.nextReturns + fake.recordInvocation("Next", []interface{}{}) + fake.nextMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *ResultsIterator) NextCallCount() int { + fake.nextMutex.RLock() + defer fake.nextMutex.RUnlock() + return len(fake.nextArgsForCall) +} + +func (fake *ResultsIterator) NextCalls(stub func() (ledger.QueryResult, error)) { + fake.nextMutex.Lock() + defer fake.nextMutex.Unlock() + fake.NextStub = stub +} + +func (fake *ResultsIterator) NextReturns(result1 ledger.QueryResult, result2 error) { + fake.nextMutex.Lock() + defer fake.nextMutex.Unlock() + fake.NextStub = nil + fake.nextReturns = struct { + result1 ledger.QueryResult + result2 error + }{result1, result2} +} + +func (fake *ResultsIterator) NextReturnsOnCall(i int, result1 ledger.QueryResult, result2 error) { + fake.nextMutex.Lock() + defer fake.nextMutex.Unlock() + fake.NextStub = nil + if fake.nextReturnsOnCall == nil { + fake.nextReturnsOnCall = make(map[int]struct { + result1 ledger.QueryResult + result2 error + }) + } + fake.nextReturnsOnCall[i] = struct { + result1 ledger.QueryResult + result2 error + }{result1, result2} +} + +func (fake *ResultsIterator) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() + fake.nextMutex.RLock() + defer fake.nextMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *ResultsIterator) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} diff --git a/internal/pkg/gateway/peeradapter.go b/internal/pkg/gateway/peeradapter.go index 8ba43d50cb4..ad75b17ab19 100644 --- a/internal/pkg/gateway/peeradapter.go +++ b/internal/pkg/gateway/peeradapter.go @@ -8,18 +8,19 @@ package gateway import ( peerproto "github.com/hyperledger/fabric-protos-go/peer" - "github.com/hyperledger/fabric/core/ledger" + commonledger "github.com/hyperledger/fabric/common/ledger" + coreledger "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/peer" "github.com/pkg/errors" ) // peerAdapter presents a small piece of the Peer in a form that can be easily used (and mocked) by the gateway's -// transaction status checking. +// transaction status checking and eventing. type peerAdapter struct { Peer *peer.Peer } -func (adapter *peerAdapter) CommitNotifications(done <-chan struct{}, channelName string) (<-chan *ledger.CommitNotification, error) { +func (adapter *peerAdapter) CommitNotifications(done <-chan struct{}, channelName string) (<-chan *coreledger.CommitNotification, error) { channel, err := adapter.channel(channelName) if err != nil { return nil, err @@ -34,9 +35,7 @@ func (adapter *peerAdapter) TransactionStatus(channelName string, transactionID return 0, 0, err } - ledger := channel.Ledger() - - status, blockNumber, err := ledger.GetTxValidationCodeByTxID(transactionID) + status, blockNumber, err := channel.Ledger().GetTxValidationCodeByTxID(transactionID) if err != nil { return 0, 0, err } @@ -44,6 +43,15 @@ func (adapter *peerAdapter) TransactionStatus(channelName string, transactionID return status, blockNumber, nil } +func (adapter *peerAdapter) Ledger(channelName string) (commonledger.Ledger, error) { + channel, err := adapter.channel(channelName) + if err != nil { + return nil, err + } + + return channel.Ledger(), nil +} + func (adapter *peerAdapter) channel(name string) (*peer.Channel, error) { channel := adapter.Peer.Channel(name) if channel == nil { diff --git a/internal/pkg/gateway/peeradapter_test.go b/internal/pkg/gateway/peeradapter_test.go index 9286ffa1f3d..d028cd9f327 100644 --- a/internal/pkg/gateway/peeradapter_test.go +++ b/internal/pkg/gateway/peeradapter_test.go @@ -37,4 +37,16 @@ func TestPeerAdapter(t *testing.T) { require.ErrorContains(t, err, "CHANNEL") }) }) + + t.Run("Ledger", func(t *testing.T) { + t.Run("returns error when channel does not exist", func(t *testing.T) { + adapter := &peerAdapter{ + Peer: &peer.Peer{}, + } + + _, err := adapter.Ledger("CHANNEL") + + require.ErrorContains(t, err, "CHANNEL") + }) + }) }