Raw ledger implementation
Raw ledger provides basic functionality for storing and
retrieving blocks. This is intended to be used by an orderer service
https://jira.hyperledger.org/browse/FAB-56

Change-Id: I3fb733f5be53b6f630c20554ba4e362540b8f55a
Signed-off-by: manish <[email protected]>
manish-sethi committed Oct 28, 2016
1 parent bb413ce commit c7badc3
Showing 18 changed files with 531 additions and 134 deletions.
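As an orientation for the diffs below, here is a minimal, hypothetical sketch of how an orderer-side component might drive the raw ledger through the blkstorage.BlockStore interface introduced in this commit. The way the store is constructed is not part of the files shown here, the helper name appendAndReplay is invented, and the nil-result check borrows the exhaustion convention from the range-query iterator changes in core/chaincode/handler.go.

```go
package sketch

import (
	"github.com/hyperledger/fabric/core/ledger/blkstorage"
	"github.com/hyperledger/fabric/protos"
)

// appendAndReplay is an illustrative helper, not part of this commit; it
// assumes a concrete BlockStore implementation is obtained elsewhere.
func appendAndReplay(store blkstorage.BlockStore, newBlock *protos.Block2) error {
	// Persist the incoming block through the raw-ledger interface.
	if err := store.AddBlock(newBlock); err != nil {
		return err
	}

	// Replay from block 0 onwards; RetrieveBlocks now takes only a start
	// block number (the end-number parameter is dropped in this commit).
	itr, err := store.RetrieveBlocks(0)
	if err != nil {
		return err
	}
	defer itr.Close()

	for {
		res, err := itr.Next()
		if err != nil {
			return err
		}
		// Assumption: a nil result means the iterator is exhausted, mirroring
		// the range-query iterator convention in core/chaincode/handler.go.
		if res == nil {
			break
		}
		_ = res // a real caller would type-assert and process the block here
	}
	return nil
}
```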
33 changes: 18 additions & 15 deletions core/chaincode/handler.go
@@ -716,16 +716,18 @@ func (handler *Handler) handleRangeQueryState(msg *pb.ChaincodeMessage) {

handler.putRangeQueryIterator(txContext, iterID, rangeIter)

hasNext := rangeIter.Next()

var keysAndValues []*pb.RangeQueryStateKeyValue
var i = uint32(0)
for ; hasNext && i < maxRangeQueryStateLimit; i++ {
qresult, err := rangeIter.Get()
var qresult ledger.QueryResult
for ; i < maxRangeQueryStateLimit; i++ {
qresult, err := rangeIter.Next()
if err != nil {
chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR)
return
}
if qresult == nil {
break
}
//PDMP - let it panic if not KV
kv := qresult.(ledger.KV)
// Decrypt the data if the confidential is enabled
@@ -742,16 +744,14 @@ func (handler *Handler) handleRangeQueryState(msg *pb.ChaincodeMessage) {
}
keyAndValue := pb.RangeQueryStateKeyValue{Key: kv.Key, Value: decryptedValue}
keysAndValues = append(keysAndValues, &keyAndValue)

hasNext = rangeIter.Next()
}

if !hasNext {
if qresult != nil {
rangeIter.Close()
handler.deleteRangeQueryIterator(txContext, iterID)
}

payload := &pb.RangeQueryStateResponse{KeysAndValues: keysAndValues, HasMore: hasNext, ID: iterID}
payload := &pb.RangeQueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, ID: iterID}
payloadBytes, err := proto.Marshal(payload)
if err != nil {
rangeIter.Close()
@@ -827,13 +827,18 @@ func (handler *Handler) handleRangeQueryStateNext(msg *pb.ChaincodeMessage) {

var keysAndValues []*pb.RangeQueryStateKeyValue
var i = uint32(0)
hasNext := true
for ; hasNext && i < maxRangeQueryStateLimit; i++ {
qresult, err := rangeIter.Get()

var qresult ledger.QueryResult
var err error
for ; i < maxRangeQueryStateLimit; i++ {
qresult, err = rangeIter.Next()
if err != nil {
chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR)
return
}
if qresult != nil {
break
}
//PDMP - let it panic if not KV
kv := qresult.(ledger.KV)
// Decrypt the data if the confidential is enabled
@@ -850,16 +855,14 @@ func (handler *Handler) handleRangeQueryStateNext(msg *pb.ChaincodeMessage) {
}
keyAndValue := pb.RangeQueryStateKeyValue{Key: kv.Key, Value: decryptedValue}
keysAndValues = append(keysAndValues, &keyAndValue)

hasNext = rangeIter.Next()
}

if !hasNext {
if qresult != nil {
rangeIter.Close()
handler.deleteRangeQueryIterator(txContext, rangeQueryStateNext.ID)
}

payload := &pb.RangeQueryStateResponse{KeysAndValues: keysAndValues, HasMore: hasNext, ID: rangeQueryStateNext.ID}
payload := &pb.RangeQueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, ID: rangeQueryStateNext.ID}
payloadBytes, err := proto.Marshal(payload)
if err != nil {
rangeIter.Close()
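The handler changes above drop the old hasNext/Get pairing: Next() now both advances the iterator and returns the result, a nil result signals exhaustion, and HasMore in the response is simply qresult != nil. A condensed, illustrative helper capturing that pattern as used in handleRangeQueryState (the name drainUpTo and its packaging are hypothetical):

```go
package sketch

import "github.com/hyperledger/fabric/core/ledger"

// drainUpTo is an illustrative helper, not part of this commit; it condenses
// the pattern handleRangeQueryState now follows: Next() advances the iterator
// and returns the result, a nil result means exhaustion, and the iterator is
// closed only once it has been drained.
func drainUpTo(itr ledger.ResultsIterator, limit uint32) ([]ledger.QueryResult, bool, error) {
	var results []ledger.QueryResult
	var qresult ledger.QueryResult
	var err error
	for i := uint32(0); i < limit; i++ {
		if qresult, err = itr.Next(); err != nil {
			return nil, false, err
		}
		if qresult == nil { // iterator exhausted
			break
		}
		results = append(results, qresult)
	}
	if qresult == nil {
		itr.Close()
	}
	// hasMore mirrors the handler's `HasMore: qresult != nil` response field.
	return results, qresult != nil, nil
}
```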
28 changes: 27 additions & 1 deletion core/ledger/blkstorage/blockstorage.go
@@ -17,15 +17,41 @@ limitations under the License.
package blkstorage

import (
"errors"

"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/protos"
)

// IndexableAttr represents an indexable attribute
type IndexableAttr string

// constants for indexable attributes
const (
IndexableAttrBlockNum = IndexableAttr("BlockNum")
IndexableAttrBlockHash = IndexableAttr("BlockHash")
IndexableAttrTxID = IndexableAttr("TxID")
)

// IndexConfig - a configuration that includes a list of attributes that should be indexed
type IndexConfig struct {
AttrsToIndex []IndexableAttr
}

var (
// ErrNotFoundInIndex is used to indicate missing entry in the index
ErrNotFoundInIndex = errors.New("Entry not found in index")
// ErrAttrNotIndexed is used to indicate that an attribute is not indexed
ErrAttrNotIndexed = errors.New("Attribute not indexed")
)

// BlockStore - an interface for persisting and retrieving blocks
// An implementation of this interface is expected to take an argument
// of type `IndexConfig` which configures the block store on what items should be indexed
type BlockStore interface {
AddBlock(block *protos.Block2) error
GetBlockchainInfo() (*protos.BlockchainInfo, error)
RetrieveBlocks(startNum uint64, endNum uint64) (ledger.ResultsIterator, error)
RetrieveBlocks(startNum uint64) (ledger.ResultsIterator, error)
RetrieveBlockByHash(blockHash []byte) (*protos.Block2, error)
RetrieveBlockByNumber(blockNum uint64) (*protos.Block2, error)
RetrieveTxByID(txID string) (*protos.Transaction2, error)
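The new blkstorage definitions above let a caller declare which block attributes a store should index and give it typed sentinel errors to react to. A hedged caller-side sketch follows; the store value, its construction, and the exact error behaviour of un-indexed lookups are assumptions rather than something this diff shows:

```go
package sketch

import "github.com/hyperledger/fabric/core/ledger/blkstorage"

// configureAndQuery is a hypothetical caller-side sketch, not part of this
// commit; how an IndexConfig reaches a concrete BlockStore implementation is
// not shown in the files above.
func configureAndQuery(store blkstorage.BlockStore) {
	// Index blocks by number and hash only; TxID is left out on purpose to
	// illustrate the sentinel error below.
	indexConfig := &blkstorage.IndexConfig{
		AttrsToIndex: []blkstorage.IndexableAttr{
			blkstorage.IndexableAttrBlockNum,
			blkstorage.IndexableAttrBlockHash,
		},
	}
	_ = indexConfig

	// Assumption: querying on an attribute that was not indexed surfaces
	// ErrAttrNotIndexed rather than a generic failure.
	if _, err := store.RetrieveTxByID("tx-1"); err == blkstorage.ErrAttrNotIndexed {
		// fall back to scanning blocks, or report the misconfiguration
	}
}
```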
2 changes: 1 addition & 1 deletion core/ledger/blkstorage/fsblkstorage/block_stream.go
@@ -166,7 +166,7 @@ func (s *blockStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementI
return nil, nil, err
}
logger.Debugf("blockbytes [%d] read from file [%d]", len(blockBytes), s.currentFileNum)
if blockBytes == nil && s.currentFileNum < s.endFileNum {
if blockBytes == nil && (s.currentFileNum < s.endFileNum || s.endFileNum < 0) {
logger.Debugf("current file [%d] exhausted. Moving to next file", s.currentFileNum)
if err = s.moveToNextBlockfileStream(); err != nil {
return nil, nil, err
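The one-line change above turns a negative endFileNum into an open-ended sentinel, so the stream keeps rolling over to newer block files instead of stopping at a fixed one. A hedged, package-internal sketch of a caller relying on that follows; the newBlockStream signature is taken from the pre-change retrieveBlocks code in blockfile_mgr.go, and the helper itself is invented:

```go
package fsblkstorage

// streamAllBlocks is an illustrative, package-internal sketch, not part of
// this commit. Passing a negative endFileNum is the new "no end file"
// sentinel, so the stream keeps advancing across block files until it runs
// out of data.
func streamAllBlocks(rootDir string) error {
	stream, err := newBlockStream(rootDir, 0, 0, -1)
	if err != nil {
		return err
	}
	for {
		blockBytes, _, err := stream.nextBlockBytesAndPlacementInfo()
		if err != nil {
			return err
		}
		if blockBytes == nil { // ran out of block data on disk
			break
		}
		// a real caller would deserialize and process blockBytes here
	}
	return nil
}
```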
65 changes: 36 additions & 29 deletions core/ledger/blkstorage/fsblkstorage/blockfile_mgr.go
@@ -18,9 +18,11 @@ package fsblkstorage

import (
"fmt"
"sync"
"sync/atomic"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/ledger/blkstorage"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/core/ledger/util/db"
"github.com/hyperledger/fabric/protos"
@@ -46,11 +48,12 @@ type blockfileMgr struct {
defaultCF *gorocksdb.ColumnFamilyHandle
index index
cpInfo *checkpointInfo
cpInfoCond *sync.Cond
currentFileWriter *blockfileWriter
bcInfo atomic.Value
}

func newBlockfileMgr(conf *Conf) *blockfileMgr {
func newBlockfileMgr(conf *Conf, indexConfig *blkstorage.IndexConfig) *blockfileMgr {
rootDir := conf.blockfilesDir
_, err := util.CreateDirIfMissing(rootDir)
if err != nil {
@@ -69,7 +72,7 @@ func newBlockfileMgr(conf *Conf) *blockfileMgr {
panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
}
}
updateCPInfo(conf, cpInfo)
syncCPInfoFromFS(conf, cpInfo)
currentFileWriter, err := newBlockfileWriter(deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum))
if err != nil {
panic(fmt.Sprintf("Could not open writer to current file: %s", err))
@@ -79,9 +82,10 @@ func newBlockfileMgr(conf *Conf) *blockfileMgr {
panic(fmt.Sprintf("Could not truncate current file to known size in db: %s", err))
}

mgr.index = newBlockIndex(db, db.GetCFHandle(blockIndexCF))
mgr.index = newBlockIndex(indexConfig, db, db.GetCFHandle(blockIndexCF))
mgr.cpInfo = cpInfo
mgr.currentFileWriter = currentFileWriter
mgr.cpInfoCond = sync.NewCond(&sync.Mutex{})
mgr.syncIndex()

// init BlockchainInfo
@@ -119,7 +123,7 @@ func initDB(conf *Conf) *db.DB {
return dbInst
}

func updateCPInfo(conf *Conf, cpInfo *checkpointInfo) {
func syncCPInfoFromFS(conf *Conf, cpInfo *checkpointInfo) {
logger.Debugf("Starting checkpoint=%s", cpInfo)
rootDir := conf.blockfilesDir
filePath := deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum)
@@ -156,23 +160,24 @@ func (mgr *blockfileMgr) close() {
}

func (mgr *blockfileMgr) moveToNextFile() {
nextFileInfo := &checkpointInfo{
cpInfo := &checkpointInfo{
latestFileChunkSuffixNum: mgr.cpInfo.latestFileChunkSuffixNum + 1,
latestFileChunksize: 0}
latestFileChunksize: 0,
lastBlockNumber: mgr.cpInfo.lastBlockNumber}

nextFileWriter, err := newBlockfileWriter(
deriveBlockfilePath(mgr.rootDir, nextFileInfo.latestFileChunkSuffixNum))
deriveBlockfilePath(mgr.rootDir, cpInfo.latestFileChunkSuffixNum))

if err != nil {
panic(fmt.Sprintf("Could not open writer to next file: %s", err))
}
mgr.currentFileWriter.close()
err = mgr.saveCurrentInfo(nextFileInfo, true)
err = mgr.saveCurrentInfo(cpInfo, true)
if err != nil {
panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
}
mgr.cpInfo = nextFileInfo
mgr.currentFileWriter = nextFileWriter
mgr.updateCheckpoint(cpInfo)
}

func (mgr *blockfileMgr) addBlock(block *protos.Block2) error {
@@ -207,26 +212,30 @@ func (mgr *blockfileMgr) addBlock(block *protos.Block2) error {
return fmt.Errorf("Error while appending block to file: %s", err)
}

mgr.cpInfo.latestFileChunksize += totalBytesToAppend
mgr.cpInfo.lastBlockNumber++
err = mgr.saveCurrentInfo(mgr.cpInfo, false)
if err != nil {
mgr.cpInfo.latestFileChunksize -= totalBytesToAppend
truncateErr := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize)
currentCPInfo := mgr.cpInfo
newCPInfo := &checkpointInfo{
latestFileChunkSuffixNum: currentCPInfo.latestFileChunkSuffixNum,
latestFileChunksize: currentCPInfo.latestFileChunksize + totalBytesToAppend,
lastBlockNumber: currentCPInfo.lastBlockNumber + 1}
if err = mgr.saveCurrentInfo(newCPInfo, false); err != nil {
truncateErr := mgr.currentFileWriter.truncateFile(currentCPInfo.latestFileChunksize)
if truncateErr != nil {
panic(fmt.Sprintf("Error in truncating current file to known size after an error in saving checkpoint info: %s", err))
}
return fmt.Errorf("Error while saving current file info to db: %s", err)
}
blockFLP := &fileLocPointer{fileSuffixNum: mgr.cpInfo.latestFileChunkSuffixNum}

blockFLP := &fileLocPointer{fileSuffixNum: newCPInfo.latestFileChunkSuffixNum}
blockFLP.offset = currentOffset
// shift the txoffset because we prepend length of bytes before block bytes
for i := 0; i < len(txOffsets); i++ {
txOffsets[i] += len(blockBytesEncodedLen)
}
mgr.index.indexBlock(&blockIdxInfo{
blockNum: mgr.cpInfo.lastBlockNumber, blockHash: blockHash,
blockNum: newCPInfo.lastBlockNumber, blockHash: blockHash,
flp: blockFLP, txOffsets: txOffsets})

mgr.updateCheckpoint(newCPInfo)
mgr.updateBlockchainInfo(blockHash, block)
return nil
}
@@ -291,6 +300,14 @@ func (mgr *blockfileMgr) getBlockchainInfo() *protos.BlockchainInfo {
return mgr.bcInfo.Load().(*protos.BlockchainInfo)
}

func (mgr *blockfileMgr) updateCheckpoint(cpInfo *checkpointInfo) {
mgr.cpInfoCond.L.Lock()
defer mgr.cpInfoCond.L.Unlock()
mgr.cpInfo = cpInfo
logger.Debugf("Broadcasting about update checkpointInfo: %s", cpInfo)
mgr.cpInfoCond.Broadcast()
}

func (mgr *blockfileMgr) updateBlockchainInfo(latestBlockHash []byte, latestBlock *protos.Block2) {
currentBCInfo := mgr.getBlockchainInfo()
newBCInfo := &protos.BlockchainInfo{
@@ -328,18 +345,8 @@ func (mgr *blockfileMgr) retrieveSerBlockByNumber(blockNum uint64) (*protos.SerB
return mgr.fetchSerBlock(loc)
}

func (mgr *blockfileMgr) retrieveBlocks(startNum uint64, endNum uint64) (*BlocksItr, error) {
var lp *fileLocPointer
var err error
if lp, err = mgr.index.getBlockLocByBlockNum(startNum); err != nil {
return nil, err
}
var stream *blockStream
if stream, err = newBlockStream(mgr.rootDir, lp.fileSuffixNum,
int64(lp.offset), mgr.cpInfo.latestFileChunkSuffixNum); err != nil {
return nil, err
}
return newBlockItr(stream, int(endNum-startNum)+1), nil
func (mgr *blockfileMgr) retrieveBlocks(startNum uint64) (*BlocksItr, error) {
return newBlockItr(mgr, startNum), nil
}

func (mgr *blockfileMgr) retrieveTransactionByID(txID string) (*protos.Transaction2, error) {
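The new cpInfoCond together with the Broadcast in updateCheckpoint gives readers a way to block until the checkpoint advances past the block they need; the iterator code that consumes it is not among the files shown, so the following is only a sketch of the waiting side under that assumption:

```go
package fsblkstorage

// waitForBlockSketch is a hedged sketch, not part of the files shown above:
// a reader that needs block `blockNum` parks on cpInfoCond until addBlock or
// moveToNextFile publish a checkpoint whose lastBlockNumber has caught up,
// relying on the Broadcast in updateCheckpoint to wake it.
func (mgr *blockfileMgr) waitForBlockSketch(blockNum uint64) uint64 {
	mgr.cpInfoCond.L.Lock()
	defer mgr.cpInfoCond.L.Unlock()
	for mgr.cpInfo.lastBlockNumber < blockNum {
		mgr.cpInfoCond.Wait()
	}
	return mgr.cpInfo.lastBlockNumber
}
```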
10 changes: 7 additions & 3 deletions core/ledger/blkstorage/fsblkstorage/blockfile_mgr_test.go
@@ -103,14 +103,18 @@ func TestBlockfileMgrBlockIterator(t *testing.T) {

func testBlockfileMgrBlockIterator(t *testing.T, blockfileMgr *blockfileMgr,
firstBlockNum int, lastBlockNum int, expectedBlocks []*protos.Block2) {
itr, err := blockfileMgr.retrieveBlocks(uint64(firstBlockNum), uint64(lastBlockNum))
itr, err := blockfileMgr.retrieveBlocks(uint64(firstBlockNum))
defer itr.Close()
testutil.AssertNoError(t, err, "Error while getting blocks iterator")
numBlocksItrated := 0
for ; itr.Next(); numBlocksItrated++ {
block, err := itr.Get()
for {
block, err := itr.Next()
testutil.AssertNoError(t, err, fmt.Sprintf("Error while getting block number [%d] from iterator", numBlocksItrated))
testutil.AssertEquals(t, block.(*BlockHolder).GetBlock(), expectedBlocks[numBlocksItrated])
numBlocksItrated++
if numBlocksItrated == lastBlockNum-firstBlockNum+1 {
break
}
}
testutil.AssertEquals(t, numBlocksItrated, lastBlockNum-firstBlockNum+1)
}
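For reference, the consuming pattern the updated test exercises, pulled out into a hedged standalone helper; the helper name is invented, and the iterator and BlockHolder types are used exactly as the test above uses them:

```go
package fsblkstorage

import "github.com/hyperledger/fabric/protos"

// readFirstN is an illustrative helper mirroring the updated test above, not
// part of this commit: it drains the first n blocks from the iterator that
// retrieveBlocks returns and unwraps each result via BlockHolder.GetBlock().
func readFirstN(mgr *blockfileMgr, startNum uint64, n int) ([]*protos.Block2, error) {
	itr, err := mgr.retrieveBlocks(startNum)
	if err != nil {
		return nil, err
	}
	defer itr.Close()

	blocks := make([]*protos.Block2, 0, n)
	for len(blocks) < n {
		res, err := itr.Next()
		if err != nil {
			return nil, err
		}
		blocks = append(blocks, res.(*BlockHolder).GetBlock())
	}
	return blocks, nil
}
```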
