Skip to content

Commit

Permalink
[FAB-9691] Extract PendingQueryResult
Browse files Browse the repository at this point in the history
Extract and explicitly unit test this object.

Change-Id: I2585b2582ab5bfbd6a6f6c5791323e7de05835be
Signed-off-by: Matthew Sykes <[email protected]>
  • Loading branch information
sykesm committed Apr 25, 2018
1 parent aa2cc94 commit b5ced55
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 34 deletions.
37 changes: 7 additions & 30 deletions core/chaincode/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,12 @@ type TransactionContext struct {

// tracks open iterators used for range queries
queryIteratorMap map[string]commonledger.ResultsIterator
pendingQueryResults map[string]*pendingQueryResult
pendingQueryResults map[string]*PendingQueryResult

txsimulator ledger.TxSimulator
historyQueryExecutor ledger.HistoryQueryExecutor
}

type pendingQueryResult struct {
batch []*pb.QueryResultBytes
count int
}

type stateHandlers map[pb.ChaincodeMessage_Type]func(*pb.ChaincodeMessage)

// internal interface to scope dependencies on ChaincodeSupport
Expand Down Expand Up @@ -206,7 +201,7 @@ func (h *Handler) initializeQueryContext(txContext *TransactionContext, queryID
h.Lock()
defer h.Unlock()
txContext.queryIteratorMap[queryID] = queryIterator
txContext.pendingQueryResults[queryID] = &pendingQueryResult{batch: make([]*pb.QueryResultBytes, 0)}
txContext.pendingQueryResults[queryID] = &PendingQueryResult{batch: make([]*pb.QueryResultBytes, 0)}
}

func (h *Handler) getQueryIterator(txContext *TransactionContext, queryID string) commonledger.ResultsIterator {
Expand Down Expand Up @@ -668,44 +663,26 @@ func getQueryResponse(h *Handler, txContext *TransactionContext, iter commonledg
return nil, err
case queryResult == nil:
// nil response from iterator indicates end of query results
batch := pendingQueryResults.cut()
batch := pendingQueryResults.Cut()
h.cleanupQueryContext(txContext, iterID)
return &pb.QueryResponse{Results: batch, HasMore: false, Id: iterID}, nil
case pendingQueryResults.count == maxResultLimit:
case pendingQueryResults.Size() == maxResultLimit:
// max number of results queued up, cut batch, then add current result to pending batch
batch := pendingQueryResults.cut()
if err := pendingQueryResults.add(queryResult); err != nil {
batch := pendingQueryResults.Cut()
if err := pendingQueryResults.Add(queryResult); err != nil {
h.cleanupQueryContext(txContext, iterID)
return nil, err
}
return &pb.QueryResponse{Results: batch, HasMore: true, Id: iterID}, nil
default:
if err := pendingQueryResults.add(queryResult); err != nil {
if err := pendingQueryResults.Add(queryResult); err != nil {
h.cleanupQueryContext(txContext, iterID)
return nil, err
}
}
}
}

func (p *pendingQueryResult) cut() []*pb.QueryResultBytes {
batch := p.batch
p.batch = nil
p.count = 0
return batch
}

func (p *pendingQueryResult) add(queryResult commonledger.QueryResult) error {
queryResultBytes, err := proto.Marshal(queryResult.(proto.Message))
if err != nil {
chaincodeLogger.Errorf("Failed to get encode query result as bytes")
return err
}
p.batch = append(p.batch, &pb.QueryResultBytes{ResultBytes: queryResultBytes})
p.count = len(p.batch)
return nil
}

// Handles query to ledger for query state next
func (h *Handler) handleQueryStateNext(msg *pb.ChaincodeMessage) {
go func() {
Expand Down
3 changes: 1 addition & 2 deletions core/chaincode/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestGetQueryResponse(t *testing.T) {
handler := &Handler{}
TransactionContext := &TransactionContext{
queryIteratorMap: make(map[string]ledger.ResultsIterator),
pendingQueryResults: make(map[string]*pendingQueryResult),
pendingQueryResults: make(map[string]*PendingQueryResult),
}
queryID := "test"
t.Run(fmt.Sprintf("%d", tc.expectedResultCount), func(t *testing.T) {
Expand All @@ -65,7 +65,6 @@ func TestGetQueryResponse(t *testing.T) {
totalResultCount := 0
for hasMoreCount := 0; hasMoreCount <= tc.expectedHasMoreCount; hasMoreCount++ {
queryResponse, _ := getQueryResponse(handler, TransactionContext, resultsIterator, queryID)
assert.NotNil(t, queryResponse.GetResults())
if queryResponse.GetHasMore() {
t.Logf("Got %d results and more are expected.", len(queryResponse.GetResults()))
} else {
Expand Down
37 changes: 37 additions & 0 deletions core/chaincode/pending_query_result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package chaincode

import (
"github.com/golang/protobuf/proto"
commonledger "github.com/hyperledger/fabric/common/ledger"
pb "github.com/hyperledger/fabric/protos/peer"
)

type PendingQueryResult struct {
batch []*pb.QueryResultBytes
}

func (p *PendingQueryResult) Cut() []*pb.QueryResultBytes {
batch := p.batch
p.batch = nil
return batch
}

func (p *PendingQueryResult) Add(queryResult commonledger.QueryResult) error {
queryResultBytes, err := proto.Marshal(queryResult.(proto.Message))
if err != nil {
chaincodeLogger.Errorf("failed to marshal query result: %s", err)
return err
}
p.batch = append(p.batch, &pb.QueryResultBytes{ResultBytes: queryResultBytes})
return nil
}

func (p *PendingQueryResult) Size() int {
return len(p.batch)
}
54 changes: 54 additions & 0 deletions core/chaincode/pending_query_result_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package chaincode_test

import (
"fmt"
"testing"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/chaincode"
"github.com/hyperledger/fabric/protos/ledger/queryresult"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)

func TestPendingQueryResult(t *testing.T) {
pqr := chaincode.PendingQueryResult{}
assert.Equal(t, 0, pqr.Size())
assert.Nil(t, pqr.Cut())

for i := 1; i <= 5; i++ {
kv := &queryresult.KV{Key: fmt.Sprintf("key-%d", i)}
err := pqr.Add(kv)
assert.NoError(t, err)
assert.Equal(t, i, pqr.Size())
}

results := pqr.Cut()
assert.Len(t, results, 5)
assert.Equal(t, 0, pqr.Size())
for i, bytes := range results {
var kv queryresult.KV
err := proto.Unmarshal(bytes.ResultBytes, &kv)
assert.NoError(t, err)
assert.Equal(t, kv.Key, fmt.Sprintf("key-%d", i+1))
}
}

func TestPendingQueryResultBadQueryResult(t *testing.T) {
pqr := chaincode.PendingQueryResult{}
err := pqr.Add(brokenProto{})
assert.EqualError(t, err, "marshal-failed")
}

type brokenProto struct{}

func (brokenProto) Reset() {}
func (brokenProto) String() string { return "" }
func (brokenProto) ProtoMessage() {}
func (brokenProto) Marshal() ([]byte, error) { return nil, errors.New("marshal-failed") }
2 changes: 1 addition & 1 deletion core/chaincode/transaction_contexts.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (c *TransactionContexts) Create(ctx context.Context, chainID, txID string,
proposal: prop,
responseNotifier: make(chan *pb.ChaincodeMessage, 1),
queryIteratorMap: map[string]commonledger.ResultsIterator{},
pendingQueryResults: map[string]*pendingQueryResult{},
pendingQueryResults: map[string]*PendingQueryResult{},
txsimulator: getTxSimulator(ctx),
historyQueryExecutor: getHistoryQueryExecutor(ctx),
}
Expand Down
2 changes: 1 addition & 1 deletion core/chaincode/transaction_contexts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var _ = Describe("TransactionContexts", func() {
Expect(txContext.responseNotifier).NotTo(BeNil())
Expect(txContext.responseNotifier).NotTo(BeClosed())
Expect(txContext.queryIteratorMap).To(Equal(map[string]commonledger.ResultsIterator{}))
Expect(txContext.pendingQueryResults).To(Equal(map[string]*pendingQueryResult{}))
Expect(txContext.pendingQueryResults).To(Equal(map[string]*PendingQueryResult{}))
Expect(txContext.txsimulator).To(Equal(fakeTxSimulator))
Expect(txContext.historyQueryExecutor).To(Equal(fakeHistoryQueryExecutor))
})
Expand Down

0 comments on commit b5ced55

Please sign in to comment.