From f21ec451b2690802cef490f5a2eba0c4f297322d Mon Sep 17 00:00:00 2001 From: manish Date: Thu, 31 May 2018 09:34:10 -0400 Subject: [PATCH] [FAB-10513] Support pvtdata store from v1.1 This CR adds support for a pvtdata store that has pvt data from both v1.1 and v1.2. See jira for more details Change-Id: Icecc8ee085a097c439e205736759785b17818348 Signed-off-by: manish --- common/ledger/testutil/test_util.go | 53 ++++ core/ledger/pvtdatastorage/store_impl.go | 5 + .../ledgersData/pvtdataStore/000002.ldb | Bin 0 -> 442 bytes .../ledgersData/pvtdataStore/000005.ldb | Bin 0 -> 360 bytes .../v11_v12/ledgersData/pvtdataStore/CURRENT | 1 + .../v11_v12/ledgersData/pvtdataStore/LOCK | 0 .../v11_v12/ledgersData/pvtdataStore/LOG | 291 ++++++++++++++++++ .../ledgersData/pvtdataStore/MANIFEST-000065 | Bin 0 -> 146 bytes core/ledger/pvtdatastorage/v11.go | 92 ++++++ core/ledger/pvtdatastorage/v11_V12_test.go | 71 +++++ 10 files changed, 513 insertions(+) create mode 100644 core/ledger/pvtdatastorage/testdata/v11_v12/ledgersData/pvtdataStore/000002.ldb create mode 100644 core/ledger/pvtdatastorage/testdata/v11_v12/ledgersData/pvtdataStore/000005.ldb create mode 100644 core/ledger/pvtdatastorage/testdata/v11_v12/ledgersData/pvtdataStore/CURRENT create mode 100644 core/ledger/pvtdatastorage/testdata/v11_v12/ledgersData/pvtdataStore/LOCK create mode 100644 core/ledger/pvtdatastorage/testdata/v11_v12/ledgersData/pvtdataStore/LOG create mode 100644 core/ledger/pvtdatastorage/testdata/v11_v12/ledgersData/pvtdataStore/MANIFEST-000065 create mode 100644 core/ledger/pvtdatastorage/v11.go create mode 100644 core/ledger/pvtdatastorage/v11_V12_test.go diff --git a/common/ledger/testutil/test_util.go b/common/ledger/testutil/test_util.go index 5cf0d41653b..c6e129c6a9e 100644 --- a/common/ledger/testutil/test_util.go +++ b/common/ledger/testutil/test_util.go @@ -22,6 +22,9 @@ import ( "crypto/rand" "encoding/json" "fmt" + "io" + "os" + "path/filepath" "reflect" "runtime" "testing" @@ -202,3 +205,53 @@ func CreateTarBytesForTest(testFiles []*TarFileEntry) []byte { tarWriter.Close() return buffer.Bytes() } + +// CopyDir creates a copy of a dir +func CopyDir(srcroot, destroot string) error { + _, lastSegment := filepath.Split(srcroot) + destroot = filepath.Join(destroot, lastSegment) + + walkFunc := func(srcpath string, info os.FileInfo, err error) error { + srcsubpath, err := filepath.Rel(srcroot, srcpath) + if err != nil { + return err + } + destpath := filepath.Join(destroot, srcsubpath) + + if info.IsDir() { // its a dir, make corresponding dir in the dest + if err = os.MkdirAll(destpath, info.Mode()); err != nil { + return err + } + return nil + } + + // its a file, copy to corresponding path in the dest + if err = copyFile(srcpath, destpath); err != nil { + return err + } + return nil + } + + return filepath.Walk(srcroot, walkFunc) +} + +func copyFile(srcpath, destpath string) error { + var srcFile, destFile *os.File + var err error + if srcFile, err = os.Open(srcpath); err != nil { + return err + } + if destFile, err = os.Create(destpath); err != nil { + return err + } + if _, err = io.Copy(destFile, srcFile); err != nil { + return err + } + if err = srcFile.Close(); err != nil { + return err + } + if err = destFile.Close(); err != nil { + return err + } + return nil +} diff --git a/core/ledger/pvtdatastorage/store_impl.go b/core/ledger/pvtdatastorage/store_impl.go index b2b2945ac1d..71ea389f6df 100644 --- a/core/ledger/pvtdatastorage/store_impl.go +++ b/core/ledger/pvtdatastorage/store_impl.go @@ -75,6 +75,8 @@ func (p *provider) OpenStore(ledgerid string) (Store, error) { if err := s.initState(); err != nil { return nil, err } + logger.Debugf("Pvtdata store opened. Initial state: isEmpty [%t], lastCommittedBlock [%d], batchPending [%t]", + s.isEmpty, s.lastCommittedBlock, s.batchPending) return s, nil } @@ -197,6 +199,9 @@ func (s *store) GetPvtDataByBlockNum(blockNum uint64, filter ledger.PvtNsCollFil for itr.Next() { dataKeyBytes := itr.Key() + if v11Format(dataKeyBytes) { + return v11RetrievePvtdata(itr, filter) + } dataValueBytes := itr.Value() dataKey := decodeDatakey(dataKeyBytes) expired, err := isExpired(dataKey, s.btlPolicy, s.lastCommittedBlock) diff --git a/core/ledger/pvtdatastorage/testdata/v11_v12/ledgersData/pvtdataStore/000002.ldb b/core/ledger/pvtdatastorage/testdata/v11_v12/ledgersData/pvtdataStore/000002.ldb new file mode 100644 index 0000000000000000000000000000000000000000..f020745b16dfd0c368e7f8aaeb722a1cf733fd49 GIT binary patch literal 442 zcmaKoO-sW-5Qb;cq|I(xo5p_7SVe+|9tt-2i3b%C6tUt#QM^c+1}vmWZBs={!K2`% z|Gq zI8i1lL}Xf7mXlv7AqUK`%%d?uo^Cmvy6<%_+nwf(6M06!=iJcuJvVBGK~0&3V>VOW z^&+R~ca1~yfG616x@~Uttw!jc+_gPx!(zGBMaWteD{xxHxqX|Au5+nnuT%#{$TOR5 z(7VQkd0uqoT7Sodqf;o;J7I^8*Zm&OtKH^3x}|EhVuiPXhvs$^wyXjDB8(^!1@U9@ us+L3vMdHiP!$KBVtbsoa%P!o*ze@b~#`hT5$KRhk;_Qt=bBSoA5 literal 0 HcmV?d00001 diff --git a/core/ledger/pvtdatastorage/testdata/v11_v12/ledgersData/pvtdataStore/000005.ldb b/core/ledger/pvtdatastorage/testdata/v11_v12/ledgersData/pvtdataStore/000005.ldb new file mode 100644 index 0000000000000000000000000000000000000000..624b33248da9aee1eabf6b0e2fd36c6afb7f3b7d GIT binary patch literal 360 zcmbQvtiiy`kep%2z`&ryz{)7V%E7>>$jZgY&A}iGny`pgoS$2eSd>_jU&Q3V?8KtN&cOPDi8ndJ zkb!|g35bEf5X56*fEdEXz$ge2004z&A$|Y= literal 0 HcmV?d00001 diff --git a/core/ledger/pvtdatastorage/v11.go b/core/ledger/pvtdatastorage/v11.go new file mode 100644 index 00000000000..8b3c9b82778 --- /dev/null +++ b/core/ledger/pvtdatastorage/v11.go @@ -0,0 +1,92 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package pvtdatastorage + +import ( + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/common/ledger/util/leveldbhelper" + "github.com/hyperledger/fabric/core/ledger" + "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version" + "github.com/hyperledger/fabric/protos/ledger/rwset" +) + +func v11Format(datakeyBytes []byte) bool { + _, n := version.NewHeightFromBytes(datakeyBytes[1:]) + remainingBytes := datakeyBytes[n+1:] + return len(remainingBytes) == 0 +} + +func v11DecodePK(key blkTranNumKey) (blockNum uint64, tranNum uint64) { + height, _ := version.NewHeightFromBytes(key[1:]) + return height.BlockNum, height.TxNum +} + +func v11DecodePvtRwSet(encodedBytes []byte) (*rwset.TxPvtReadWriteSet, error) { + writeset := &rwset.TxPvtReadWriteSet{} + return writeset, proto.Unmarshal(encodedBytes, writeset) +} + +func v11RetrievePvtdata(itr *leveldbhelper.Iterator, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error) { + var blkPvtData []*ledger.TxPvtData + txPvtData, err := v11DecodeKV(itr.Key(), itr.Value(), filter) + if err != nil { + return nil, err + } + blkPvtData = append(blkPvtData, txPvtData) + for itr.Next() { + pvtDatum, err := v11DecodeKV(itr.Key(), itr.Value(), filter) + if err != nil { + return nil, err + } + blkPvtData = append(blkPvtData, pvtDatum) + } + return blkPvtData, nil +} + +func v11DecodeKV(k, v []byte, filter ledger.PvtNsCollFilter) (*ledger.TxPvtData, error) { + bNum, tNum := v11DecodePK(k) + var pvtWSet *rwset.TxPvtReadWriteSet + var err error + if pvtWSet, err = v11DecodePvtRwSet(v); err != nil { + return nil, err + } + logger.Debugf("Retrieved V11 private data write set for block [%d] tran [%d]", bNum, tNum) + filteredWSet := v11TrimPvtWSet(pvtWSet, filter) + return &ledger.TxPvtData{SeqInBlock: tNum, WriteSet: filteredWSet}, nil +} + +func v11TrimPvtWSet(pvtWSet *rwset.TxPvtReadWriteSet, filter ledger.PvtNsCollFilter) *rwset.TxPvtReadWriteSet { + if filter == nil { + return pvtWSet + } + + var filteredNsRwSet []*rwset.NsPvtReadWriteSet + for _, ns := range pvtWSet.NsPvtRwset { + var filteredCollRwSet []*rwset.CollectionPvtReadWriteSet + for _, coll := range ns.CollectionPvtRwset { + if filter.Has(ns.Namespace, coll.CollectionName) { + filteredCollRwSet = append(filteredCollRwSet, coll) + } + } + if filteredCollRwSet != nil { + filteredNsRwSet = append(filteredNsRwSet, + &rwset.NsPvtReadWriteSet{ + Namespace: ns.Namespace, + CollectionPvtRwset: filteredCollRwSet, + }, + ) + } + } + var filteredTxPvtRwSet *rwset.TxPvtReadWriteSet + if filteredNsRwSet != nil { + filteredTxPvtRwSet = &rwset.TxPvtReadWriteSet{ + DataModel: pvtWSet.GetDataModel(), + NsPvtRwset: filteredNsRwSet, + } + } + return filteredTxPvtRwSet +} diff --git a/core/ledger/pvtdatastorage/v11_V12_test.go b/core/ledger/pvtdatastorage/v11_V12_test.go new file mode 100644 index 00000000000..5a07fa1bd14 --- /dev/null +++ b/core/ledger/pvtdatastorage/v11_V12_test.go @@ -0,0 +1,71 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package pvtdatastorage + +import ( + "os" + "testing" + + "github.com/davecgh/go-spew/spew" + "github.com/hyperledger/fabric/common/ledger/testutil" + "github.com/hyperledger/fabric/core/ledger/pvtdatapolicy" + btltestutil "github.com/hyperledger/fabric/core/ledger/pvtdatapolicy/testutil" + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" +) + +// TestV11v12 test that we are able to read the mixed format data (for v11 and v12) +// from pvtdata store. This test used a pvt data store that is produced in one of the +// upgrade tests. The store contains total 15 blocks. Block number one to nine has not +// pvt data because, that time peer code was v1.0 and hence no pvt data. Block 10 contains +// a pvtdata from peer v1.1. Block 11 - 13 has not pvt data. Block 14 has pvt data from peer v1.2 +func TestV11v12(t *testing.T) { + testWorkingDir := "test-working-dir" + testutil.CopyDir("testdata/v11_v12/ledgersData", testWorkingDir) + defer os.RemoveAll(testWorkingDir) + + viper.Set("peer.fileSystemPath", testWorkingDir) + defer viper.Reset() + + ledgerid := "ch1" + cs := btltestutil.NewMockCollectionStore() + cs.SetBTL("marbles_private", "collectionMarbles", 0) + cs.SetBTL("marbles_private", "collectionMarblePrivateDetails", 0) + btlPolicy := pvtdatapolicy.ConstructBTLPolicy(cs) + + p := NewProvider() + defer p.Close() + s, err := p.OpenStore(ledgerid) + assert.NoError(t, err) + s.Init(btlPolicy) + + for blk := 0; blk < 10; blk++ { + checkDataNotExists(t, s, blk) + } + checkDataExists(t, s, 10) + for blk := 11; blk < 14; blk++ { + checkDataNotExists(t, s, blk) + } + checkDataExists(t, s, 14) + + _, err = s.GetPvtDataByBlockNum(uint64(15), nil) + _, ok := err.(*ErrOutOfRange) + assert.True(t, ok) +} + +func checkDataNotExists(t *testing.T, s Store, blkNum int) { + data, err := s.GetPvtDataByBlockNum(uint64(blkNum), nil) + assert.NoError(t, err) + assert.Nil(t, data) +} + +func checkDataExists(t *testing.T, s Store, blkNum int) { + data, err := s.GetPvtDataByBlockNum(uint64(blkNum), nil) + assert.NoError(t, err) + assert.NotNil(t, data) + t.Logf("pvtdata = %s\n", spew.Sdump(data)) +}