Skip to content

Commit

Permalink
minor cleanup of pvtdatastore
Browse files Browse the repository at this point in the history
Signed-off-by: senthil <[email protected]>
  • Loading branch information
cendhu authored and ale-linux committed Sep 8, 2020
1 parent 20f0697 commit cc6dc99
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 176 deletions.
2 changes: 0 additions & 2 deletions core/ledger/pvtdatastorage/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func prepareMissingDataEntries(committingBlk uint64, missingPvtData ledger.TxMis
// and missing private.
func prepareExpiryEntries(committingBlk uint64, dataEntries []*dataEntry, missingDataEntries map[missingDataKey]*bitset.BitSet,
btlPolicy pvtdatapolicy.BTLPolicy) ([]*expiryEntry, error) {

var expiryEntries []*expiryEntry
mapByExpiringBlk := make(map[uint64]*ExpiryData)

Expand Down Expand Up @@ -152,7 +151,6 @@ func deriveKeys(expiryEntry *expiryEntry) (dataKeys []*dataKey, missingDataKeys
&missingDataKey{nsCollBlk{ns, coll, expiryEntry.key.committingBlk}, true})
missingDataKeys = append(missingDataKeys,
&missingDataKey{nsCollBlk{ns, coll, expiryEntry.key.committingBlk}, false})

}
}
return
Expand Down
38 changes: 13 additions & 25 deletions core/ledger/pvtdatastorage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,24 +276,18 @@ func (s *Store) CommitPvtDataOfOldBlocks(blocksPvtData map[uint64][]*ledger.TxPv
stateDB may not be in sync with the pvtStore`}
}

// (1) construct dataEntries for all pvtData
dataEntries := constructDataEntriesFromBlocksPvtData(blocksPvtData)

// (2) construct update entries (i.e., dataEntries, expiryEntries, missingDataEntries) from the above created data entries
logger.Debugf("Constructing pvtdatastore entries for pvtData of [%d] old blocks", len(blocksPvtData))
updateEntries, err := s.constructUpdateEntriesFromDataEntries(dataEntries)
updateEntries, err := s.constructUpdateEntries(blocksPvtData)
if err != nil {
return err
}

// (3) create a db update batch from the update entries
logger.Debug("Constructing update batch from pvtdatastore entries")
batch, err := s.constructUpdateBatchFromUpdateEntries(updateEntries)
batch, err := s.constructUpdateBatch(updateEntries)
if err != nil {
return err
}

// (4) commit the update batch to the pvtStore
logger.Debug("Committing the update batch to pvtdatastore")
if err := s.commitBatch(batch); err != nil {
return err
Expand All @@ -302,17 +296,12 @@ func (s *Store) CommitPvtDataOfOldBlocks(blocksPvtData map[uint64][]*ledger.TxPv
return nil
}

func constructDataEntriesFromBlocksPvtData(blocksPvtData map[uint64][]*ledger.TxPvtData) []*dataEntry {
// construct dataEntries for all pvtData
func (s *Store) constructUpdateEntries(blocksPvtData map[uint64][]*ledger.TxPvtData) (*entriesForPvtDataOfOldBlocks, error) {
var dataEntries []*dataEntry
for blkNum, pvtData := range blocksPvtData {
// prepare the dataEntries for the pvtData
dataEntries = append(dataEntries, prepareDataEntries(blkNum, pvtData)...)
}
return dataEntries
}

func (s *Store) constructUpdateEntriesFromDataEntries(dataEntries []*dataEntry) (*entriesForPvtDataOfOldBlocks, error) {
updateEntries := &entriesForPvtDataOfOldBlocks{
dataEntries: make(map[dataKey]*rwset.CollectionPvtReadWriteSet),
expiryEntries: make(map[expiryKey]*ExpiryData),
Expand Down Expand Up @@ -418,7 +407,6 @@ func (updateEntries *entriesForPvtDataOfOldBlocks) updateAndAddExpiryEntry(expir
}

func (updateEntries *entriesForPvtDataOfOldBlocks) updateAndAddMissingDataEntry(missingData *bitset.BitSet, dataKey *dataKey) {

txNum := dataKey.txNum
nsCollBlk := dataKey.nsCollBlk
// update
Expand All @@ -427,7 +415,7 @@ func (updateEntries *entriesForPvtDataOfOldBlocks) updateAndAddMissingDataEntry(
updateEntries.missingDataEntries[nsCollBlk] = missingData
}

func (s *Store) constructUpdateBatchFromUpdateEntries(updateEntries *entriesForPvtDataOfOldBlocks) (*leveldbhelper.UpdateBatch, error) {
func (s *Store) constructUpdateBatch(updateEntries *entriesForPvtDataOfOldBlocks) (*leveldbhelper.UpdateBatch, error) {
batch := s.db.NewUpdateBatch()

// add the following four types of entries to the update batch: (1) new data entries
Expand Down Expand Up @@ -505,12 +493,12 @@ func (s *Store) commitBatch(batch *leveldbhelper.UpdateBatch) error {
return nil
}

// GetLastUpdatedOldBlocksPvtData returns the pvtdata of blocks listed in `lastUpdatedOldBlocksList`
// TODO FAB-16293 -- GetLastUpdatedOldBlocksPvtData() can be removed either in v2.0 or in v2.1.
// If we decide to rebuild stateDB in v2.0, by default, the rebuild logic would take
// care of synching stateDB with pvtdataStore without calling GetLastUpdatedOldBlocksPvtData().
// Hence, it can be safely removed. Suppose if we decide not to rebuild stateDB in v2.0,
// we can remove this function in v2.1.
// GetLastUpdatedOldBlocksPvtData returns the pvtdata of blocks listed in `lastUpdatedOldBlocksList`
func (s *Store) GetLastUpdatedOldBlocksPvtData() (map[uint64][]*ledger.TxPvtData, error) {
if !s.isLastUpdatedOldBlocksSet {
return nil, nil
Expand Down Expand Up @@ -934,28 +922,28 @@ type collElgProcSync struct {
notification, procComplete chan bool
}

func (sync *collElgProcSync) notify() {
func (c *collElgProcSync) notify() {
select {
case sync.notification <- true:
case c.notification <- true:
logger.Debugf("Signaled to collection eligibility processing routine")
default: //noop
logger.Debugf("Previous signal still pending. Skipping new signal")
}
}

func (sync *collElgProcSync) waitForNotification() {
<-sync.notification
func (c *collElgProcSync) waitForNotification() {
<-c.notification
}

func (sync *collElgProcSync) done() {
func (c *collElgProcSync) done() {
select {
case sync.procComplete <- true:
case c.procComplete <- true:
default:
}
}

func (sync *collElgProcSync) waitForDone() {
<-sync.procComplete
func (c *collElgProcSync) waitForDone() {
<-c.procComplete
}

func (s *Store) getBitmapOfMissingDataKey(missingDataKey *missingDataKey) (*bitset.BitSet, error) {
Expand Down
Loading

0 comments on commit cc6dc99

Please sign in to comment.