Skip to content

Commit

Permalink
Merge "[FAB-10513] Support pvtdata store from v1.1"
Browse files Browse the repository at this point in the history
  • Loading branch information
christo4ferris authored and Gerrit Code Review committed Jun 4, 2018
2 parents b0d5328 + f21ec45 commit 78ae92c
Show file tree
Hide file tree
Showing 10 changed files with 513 additions and 0 deletions.
53 changes: 53 additions & 0 deletions common/ledger/testutil/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
"crypto/rand"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"reflect"
"runtime"
"testing"
Expand Down Expand Up @@ -202,3 +205,53 @@ func CreateTarBytesForTest(testFiles []*TarFileEntry) []byte {
tarWriter.Close()
return buffer.Bytes()
}

// CopyDir creates a copy of a dir
func CopyDir(srcroot, destroot string) error {
_, lastSegment := filepath.Split(srcroot)
destroot = filepath.Join(destroot, lastSegment)

walkFunc := func(srcpath string, info os.FileInfo, err error) error {
srcsubpath, err := filepath.Rel(srcroot, srcpath)
if err != nil {
return err
}
destpath := filepath.Join(destroot, srcsubpath)

if info.IsDir() { // its a dir, make corresponding dir in the dest
if err = os.MkdirAll(destpath, info.Mode()); err != nil {
return err
}
return nil
}

// its a file, copy to corresponding path in the dest
if err = copyFile(srcpath, destpath); err != nil {
return err
}
return nil
}

return filepath.Walk(srcroot, walkFunc)
}

func copyFile(srcpath, destpath string) error {
var srcFile, destFile *os.File
var err error
if srcFile, err = os.Open(srcpath); err != nil {
return err
}
if destFile, err = os.Create(destpath); err != nil {
return err
}
if _, err = io.Copy(destFile, srcFile); err != nil {
return err
}
if err = srcFile.Close(); err != nil {
return err
}
if err = destFile.Close(); err != nil {
return err
}
return nil
}
5 changes: 5 additions & 0 deletions core/ledger/pvtdatastorage/store_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func (p *provider) OpenStore(ledgerid string) (Store, error) {
if err := s.initState(); err != nil {
return nil, err
}
logger.Debugf("Pvtdata store opened. Initial state: isEmpty [%t], lastCommittedBlock [%d], batchPending [%t]",
s.isEmpty, s.lastCommittedBlock, s.batchPending)
return s, nil
}

Expand Down Expand Up @@ -197,6 +199,9 @@ func (s *store) GetPvtDataByBlockNum(blockNum uint64, filter ledger.PvtNsCollFil

for itr.Next() {
dataKeyBytes := itr.Key()
if v11Format(dataKeyBytes) {
return v11RetrievePvtdata(itr, filter)
}
dataValueBytes := itr.Value()
dataKey := decodeDatakey(dataKeyBytes)
expired, err := isExpired(dataKey, s.btlPolicy, s.lastCommittedBlock)
Expand Down
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
MANIFEST-000065
Empty file.

Large diffs are not rendered by default.

Binary file not shown.
92 changes: 92 additions & 0 deletions core/ledger/pvtdatastorage/v11.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package pvtdatastorage

import (
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/protos/ledger/rwset"
)

func v11Format(datakeyBytes []byte) bool {
_, n := version.NewHeightFromBytes(datakeyBytes[1:])
remainingBytes := datakeyBytes[n+1:]
return len(remainingBytes) == 0
}

func v11DecodePK(key blkTranNumKey) (blockNum uint64, tranNum uint64) {
height, _ := version.NewHeightFromBytes(key[1:])
return height.BlockNum, height.TxNum
}

func v11DecodePvtRwSet(encodedBytes []byte) (*rwset.TxPvtReadWriteSet, error) {
writeset := &rwset.TxPvtReadWriteSet{}
return writeset, proto.Unmarshal(encodedBytes, writeset)
}

func v11RetrievePvtdata(itr *leveldbhelper.Iterator, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error) {
var blkPvtData []*ledger.TxPvtData
txPvtData, err := v11DecodeKV(itr.Key(), itr.Value(), filter)
if err != nil {
return nil, err
}
blkPvtData = append(blkPvtData, txPvtData)
for itr.Next() {
pvtDatum, err := v11DecodeKV(itr.Key(), itr.Value(), filter)
if err != nil {
return nil, err
}
blkPvtData = append(blkPvtData, pvtDatum)
}
return blkPvtData, nil
}

func v11DecodeKV(k, v []byte, filter ledger.PvtNsCollFilter) (*ledger.TxPvtData, error) {
bNum, tNum := v11DecodePK(k)
var pvtWSet *rwset.TxPvtReadWriteSet
var err error
if pvtWSet, err = v11DecodePvtRwSet(v); err != nil {
return nil, err
}
logger.Debugf("Retrieved V11 private data write set for block [%d] tran [%d]", bNum, tNum)
filteredWSet := v11TrimPvtWSet(pvtWSet, filter)
return &ledger.TxPvtData{SeqInBlock: tNum, WriteSet: filteredWSet}, nil
}

func v11TrimPvtWSet(pvtWSet *rwset.TxPvtReadWriteSet, filter ledger.PvtNsCollFilter) *rwset.TxPvtReadWriteSet {
if filter == nil {
return pvtWSet
}

var filteredNsRwSet []*rwset.NsPvtReadWriteSet
for _, ns := range pvtWSet.NsPvtRwset {
var filteredCollRwSet []*rwset.CollectionPvtReadWriteSet
for _, coll := range ns.CollectionPvtRwset {
if filter.Has(ns.Namespace, coll.CollectionName) {
filteredCollRwSet = append(filteredCollRwSet, coll)
}
}
if filteredCollRwSet != nil {
filteredNsRwSet = append(filteredNsRwSet,
&rwset.NsPvtReadWriteSet{
Namespace: ns.Namespace,
CollectionPvtRwset: filteredCollRwSet,
},
)
}
}
var filteredTxPvtRwSet *rwset.TxPvtReadWriteSet
if filteredNsRwSet != nil {
filteredTxPvtRwSet = &rwset.TxPvtReadWriteSet{
DataModel: pvtWSet.GetDataModel(),
NsPvtRwset: filteredNsRwSet,
}
}
return filteredTxPvtRwSet
}
71 changes: 71 additions & 0 deletions core/ledger/pvtdatastorage/v11_V12_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package pvtdatastorage

import (
"os"
"testing"

"github.com/davecgh/go-spew/spew"
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
btltestutil "github.com/hyperledger/fabric/core/ledger/pvtdatapolicy/testutil"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
)

// TestV11v12 test that we are able to read the mixed format data (for v11 and v12)
// from pvtdata store. This test used a pvt data store that is produced in one of the
// upgrade tests. The store contains total 15 blocks. Block number one to nine has not
// pvt data because, that time peer code was v1.0 and hence no pvt data. Block 10 contains
// a pvtdata from peer v1.1. Block 11 - 13 has not pvt data. Block 14 has pvt data from peer v1.2
func TestV11v12(t *testing.T) {
testWorkingDir := "test-working-dir"
testutil.CopyDir("testdata/v11_v12/ledgersData", testWorkingDir)
defer os.RemoveAll(testWorkingDir)

viper.Set("peer.fileSystemPath", testWorkingDir)
defer viper.Reset()

ledgerid := "ch1"
cs := btltestutil.NewMockCollectionStore()
cs.SetBTL("marbles_private", "collectionMarbles", 0)
cs.SetBTL("marbles_private", "collectionMarblePrivateDetails", 0)
btlPolicy := pvtdatapolicy.ConstructBTLPolicy(cs)

p := NewProvider()
defer p.Close()
s, err := p.OpenStore(ledgerid)
assert.NoError(t, err)
s.Init(btlPolicy)

for blk := 0; blk < 10; blk++ {
checkDataNotExists(t, s, blk)
}
checkDataExists(t, s, 10)
for blk := 11; blk < 14; blk++ {
checkDataNotExists(t, s, blk)
}
checkDataExists(t, s, 14)

_, err = s.GetPvtDataByBlockNum(uint64(15), nil)
_, ok := err.(*ErrOutOfRange)
assert.True(t, ok)
}

func checkDataNotExists(t *testing.T, s Store, blkNum int) {
data, err := s.GetPvtDataByBlockNum(uint64(blkNum), nil)
assert.NoError(t, err)
assert.Nil(t, data)
}

func checkDataExists(t *testing.T, s Store, blkNum int) {
data, err := s.GetPvtDataByBlockNum(uint64(blkNum), nil)
assert.NoError(t, err)
assert.NotNil(t, data)
t.Logf("pvtdata = %s\n", spew.Sdump(data))
}

0 comments on commit 78ae92c

Please sign in to comment.