Skip to content

Commit

Permalink
FAB-1172 - Advanced simulation functions for CouchDB
Browse files Browse the repository at this point in the history
Added implementations for the following:

    QueryExecutor.GetStateRangeScanIterator()
    QueryExecutor.GetStateMultipleKeys()
    TxSimulator.DeleteState()
    TxSimulator.SetStateMultipleKeys()

Unit testing was extended for the range query in couchdb.go

Additional unit testing for simulation functions will be added
following a refactoring of unit tests for goleveldb.

Change-Id: Id0907ace75767fe4b296f16c10d18693718d790f
Signed-off-by: Chris Elder <[email protected]>
  • Loading branch information
Chris Elder committed Dec 12, 2016
1 parent 87a0ce8 commit 2ebd342
Show file tree
Hide file tree
Showing 5 changed files with 464 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,27 @@ func (q *CouchDBQueryExecutor) GetState(ns string, key string) ([]byte, error) {

// GetStateMultipleKeys implements method in interface `ledger.QueryExecutor`
func (q *CouchDBQueryExecutor) GetStateMultipleKeys(namespace string, keys []string) ([][]byte, error) {
return nil, errors.New("Not yet implemented")
var results [][]byte
var value []byte
var err error
for _, key := range keys {
value, err = q.GetState(namespace, key)
if err != nil {
return nil, err
}
results = append(results, value)
}
return results, nil
}

// GetStateRangeScanIterator implements method in interface `ledger.QueryExecutor`
func (q *CouchDBQueryExecutor) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (ledger.ResultsIterator, error) {
return nil, errors.New("Not yet implemented")
//q.checkDone()
scanner, err := q.txmgr.getCommittedRangeScanner(namespace, startKey, endKey)
if err != nil {
return nil, err
}
return &qKVItr{scanner}, nil
}

// GetTransactionsForKey - implements method in interface `ledger.QueryExecutor`
Expand All @@ -61,3 +76,25 @@ func (q *CouchDBQueryExecutor) ExecuteQuery(query string) (ledger.ResultsIterato
func (q *CouchDBQueryExecutor) Done() {
//TODO - acquire lock when constructing and release the lock here
}

type qKVItr struct {
s *kvScanner
}

// Next implements Next() method in ledger.ResultsIterator
func (itr *qKVItr) Next() (ledger.QueryResult, error) {
committedKV, err := itr.s.next()
if err != nil {
return nil, err
}
if committedKV == nil {
return nil, nil
}

return &ledger.KV{Key: committedKV.key, Value: committedKV.value}, nil
}

// Close implements Close() method in ledger.ResultsIterator
func (itr *qKVItr) Close() {
itr.s.close()
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"reflect"

"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt"
logging "github.com/op/go-logging"
)
Expand Down Expand Up @@ -108,6 +109,16 @@ func (s *CouchDBTxSimulator) GetState(ns string, key string) ([]byte, error) {
return value, nil
}

// GetStateRangeScanIterator implements method in interface `ledger.QueryExecutor`
func (s *CouchDBTxSimulator) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (ledger.ResultsIterator, error) {
//s.checkDone()
scanner, err := s.txmgr.getCommittedRangeScanner(namespace, startKey, endKey)
if err != nil {
return nil, err
}
return &sKVItr{scanner, s}, nil
}

// SetState implements method in interface `ledger.TxSimulator`
func (s *CouchDBTxSimulator) SetState(ns string, key string, value []byte) error {
logger.Debugf("===COUCHDB=== Entering CouchDBTxSimulator.SetState()")
Expand Down Expand Up @@ -190,7 +201,12 @@ func (s *CouchDBTxSimulator) GetTxSimulationResults() ([]byte, error) {

// SetStateMultipleKeys implements method in interface `ledger.TxSimulator`
func (s *CouchDBTxSimulator) SetStateMultipleKeys(namespace string, kvs map[string][]byte) error {
return errors.New("Not yet implemented")
for k, v := range kvs {
if err := s.SetState(namespace, k, v); err != nil {
return err
}
}
return nil
}

// CopyState implements method in interface `ledger.TxSimulator`
Expand All @@ -202,3 +218,34 @@ func (s *CouchDBTxSimulator) CopyState(sourceNamespace string, targetNamespace s
func (s *CouchDBTxSimulator) ExecuteUpdate(query string) error {
return errors.New("Not supported by KV data model")
}

type sKVItr struct {
scanner *kvScanner
simulator *CouchDBTxSimulator
}

// Next implements Next() method in ledger.ResultsIterator
// Returns the next item in the result set. The `QueryResult` is expected to be nil when
// the iterator gets exhausted
func (itr *sKVItr) Next() (ledger.QueryResult, error) {
committedKV, err := itr.scanner.next()
if err != nil {
return nil, err
}
if committedKV == nil {
return nil, nil
}

// Get existing cache for RW at the namespace of the result set if it exists. If none exists, then create it.
nsRWs := itr.simulator.getOrCreateNsRWHolder(itr.scanner.namespace)
nsRWs.readMap[committedKV.key] = &kvReadCache{
&txmgmt.KVRead{Key: committedKV.key, Version: committedKV.version}, committedKV.value}

return &ledger.KV{Key: committedKV.key, Value: committedKV.value}, nil
}

// Close implements Close() method in ledger.ResultsIterator
// which releases resources occupied by the iterator.
func (itr *sKVItr) Close() {
itr.scanner.close()
}
64 changes: 63 additions & 1 deletion core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package couchdbtxmgmt

import (
"bytes"
"encoding/json"
"errors"
"sync"
Expand All @@ -36,6 +37,8 @@ import (

var logger = logging.MustGetLogger("couchdbtxmgmt")

var compositeKeySep = []byte{0x00}

// Conf - configuration for `CouchDBTxMgr`
type Conf struct {
DBPath string
Expand Down Expand Up @@ -383,6 +386,22 @@ func (txmgr *CouchDBTxMgr) getCommittedValueAndVersion(ns string, key string) ([
return docBytes, ver, nil
}

//getCommittedRangeScanner contructs composite start and end keys based on the namespace then calls the CouchDB range scanner
func (txmgr *CouchDBTxMgr) getCommittedRangeScanner(namespace string, startKey string, endKey string) (*kvScanner, error) {
var compositeStartKey []byte
var compositeEndKey []byte
if startKey != "" {
compositeStartKey = constructCompositeKey(namespace, startKey)
}
if endKey != "" {
compositeEndKey = constructCompositeKey(namespace, endKey)
}

queryResult, _ := txmgr.couchDB.ReadDocRange(string(compositeStartKey), string(compositeEndKey), 1000, 0)

return newKVScanner(namespace, *queryResult), nil
}

func encodeValue(value []byte, version uint64) []byte {
versionBytes := proto.EncodeVarint(version)
deleteMarker := 0
Expand All @@ -409,7 +428,50 @@ func decodeValue(encodedValue []byte) ([]byte, uint64) {

func constructCompositeKey(ns string, key string) []byte {
compositeKey := []byte(ns)
compositeKey = append(compositeKey, byte(0))
compositeKey = append(compositeKey, compositeKeySep...)
compositeKey = append(compositeKey, []byte(key)...)
return compositeKey
}

func splitCompositeKey(compositeKey []byte) (string, string) {
split := bytes.SplitN(compositeKey, compositeKeySep, 2)
return string(split[0]), string(split[1])
}

type kvScanner struct {
cursor int
namespace string
results []couchdb.QueryResult
}

type committedKV struct {
key string
version *version.Height
value []byte
}

func newKVScanner(namespace string, queryResults []couchdb.QueryResult) *kvScanner {
return &kvScanner{-1, namespace, queryResults}
}

func (scanner *kvScanner) next() (*committedKV, error) {

scanner.cursor++

if scanner.cursor >= len(scanner.results) {
return nil, nil
}

selectedValue := scanner.results[scanner.cursor]

_, key := splitCompositeKey([]byte(selectedValue.ID))

//TODO - change hardcoded version when version support is available in CouchDB
return &committedKV{key, version.NewHeight(1, 1), selectedValue.Value}, nil

}

func (scanner *kvScanner) close() {

scanner = nil
}
Loading

0 comments on commit 2ebd342

Please sign in to comment.