diff --git a/core/chaincode/configer_test.go b/core/chaincode/configer_test.go index 871758130be..e769724e6e5 100644 --- a/core/chaincode/configer_test.go +++ b/core/chaincode/configer_test.go @@ -28,6 +28,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/core/chaincode/shim" + "github.com/hyperledger/fabric/core/ledger/ledgermgmt" "github.com/hyperledger/fabric/core/peer" "github.com/hyperledger/fabric/gossip/service" "github.com/hyperledger/fabric/protos/common" @@ -98,6 +99,8 @@ func TestConfigerInvokeJoinChainWrongParams(t *testing.T) { func TestConfigerInvokeJoinChainCorrectParams(t *testing.T) { //t.Skip("Test CI build") viper.Set("peer.fileSystemPath", "/var/hyperledger/test/") + ledgermgmt.InitializeTestEnv() + defer ledgermgmt.CleanupTestEnv() defer os.RemoveAll("/var/hyperledger/test/") e := new(PeerConfiger) diff --git a/core/chaincode/exectransaction_test.go b/core/chaincode/exectransaction_test.go index bbaf534f295..cb09d55b9d0 100644 --- a/core/chaincode/exectransaction_test.go +++ b/core/chaincode/exectransaction_test.go @@ -38,6 +38,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/core/crypto/primitives" + "github.com/hyperledger/fabric/core/ledger/ledgermgmt" "github.com/hyperledger/fabric/core/peer/msp" "github.com/hyperledger/fabric/msp" "github.com/hyperledger/fabric/protos/common" @@ -114,7 +115,7 @@ func finitPeer(lis net.Listener, chainIDs ...string) { } closeListenerAndSleep(lis) } - + ledgermgmt.CleanupTestEnv() ledgerPath := viper.GetString("peer.fileSystemPath") os.RemoveAll(ledgerPath) os.RemoveAll(filepath.Join(os.TempDir(), "hyperledger")) diff --git a/core/committer/committer_test.go b/core/committer/committer_test.go index c11e1a31a86..3fd3d6c6c5e 100644 --- a/core/committer/committer_test.go +++ b/core/committer/committer_test.go @@ -17,25 +17,25 @@ limitations under the License. package committer import ( - "os" "testing" - "github.com/hyperledger/fabric/core/ledger/kvledger" "github.com/hyperledger/fabric/core/ledger/testutil" + "github.com/spf13/viper" "github.com/stretchr/testify/assert" + "github.com/hyperledger/fabric/core/ledger/ledgermgmt" pb "github.com/hyperledger/fabric/protos/peer" ) func TestKVLedgerBlockStorage(t *testing.T) { - conf := kvledger.NewConf("/tmp/tests/ledger/", 0) - defer os.RemoveAll("/tmp/tests/ledger/") - - ledger, _ := kvledger.NewKVLedger(conf) + viper.Set("peer.fileSystemPath", "/tmp/fabric/committertest") + ledgermgmt.InitializeTestEnv() + defer ledgermgmt.CleanupTestEnv() + ledger, err := ledgermgmt.CreateLedger("TestLedger") + assert.NoError(t, err, "Error while creating ledger: %s", err) defer ledger.Close() committer := NewLedgerCommitter(ledger) - height, err := committer.LedgerHeight() assert.Equal(t, uint64(0), height) assert.NoError(t, err) diff --git a/core/committer/txvalidator/txvalidator_test.go b/core/committer/txvalidator/txvalidator_test.go index 0990d4ae6fb..40829c858bf 100644 --- a/core/committer/txvalidator/txvalidator_test.go +++ b/core/committer/txvalidator/txvalidator_test.go @@ -17,14 +17,14 @@ limitations under the License. package txvalidator import ( - "os" "testing" - "github.com/hyperledger/fabric/core/ledger/kvledger" + "github.com/hyperledger/fabric/core/ledger/ledgermgmt" "github.com/hyperledger/fabric/core/ledger/testutil" "github.com/hyperledger/fabric/core/ledger/util" "github.com/hyperledger/fabric/protos/common" pb "github.com/hyperledger/fabric/protos/peer" + "github.com/spf13/viper" "github.com/stretchr/testify/assert" ) @@ -36,10 +36,10 @@ func (v *mockVsccValidator) VSCCValidateTx(payload *common.Payload, envBytes []b } func TestKVLedgerBlockStorage(t *testing.T) { - conf := kvledger.NewConf("/tmp/tests/ledger/", 0) - defer os.RemoveAll("/tmp/tests/ledger/") - - ledger, _ := kvledger.NewKVLedger(conf) + viper.Set("peer.fileSystemPath", "/tmp/fabric/txvalidatortest") + ledgermgmt.InitializeTestEnv() + defer ledgermgmt.CleanupTestEnv() + ledger, _ := ledgermgmt.CreateLedger("TestLedger") defer ledger.Close() validator := &txValidator{ledger, &mockVsccValidator{}} diff --git a/core/ledger/kvledger/example/main/example.go b/core/ledger/kvledger/example/main/example.go index 3be5a1381e1..c1619129e28 100644 --- a/core/ledger/kvledger/example/main/example.go +++ b/core/ledger/kvledger/example/main/example.go @@ -21,15 +21,16 @@ import ( "os" "github.com/hyperledger/fabric/core/ledger" - "github.com/hyperledger/fabric/core/ledger/kvledger" "github.com/hyperledger/fabric/core/ledger/kvledger/example" + "github.com/hyperledger/fabric/core/ledger/ledgerconfig" + "github.com/hyperledger/fabric/core/ledger/ledgermgmt" "github.com/hyperledger/fabric/core/ledger/testutil" "github.com/hyperledger/fabric/core/ledger/util" "github.com/hyperledger/fabric/protos/common" ) const ( - ledgerPath = "/tmp/test/ledger/kvledger/example" + ledgerID = "Default" ) var finalLedger ledger.ValidatedLedger @@ -47,10 +48,10 @@ func init() { // Initialization will get a handle to the ledger at the specified path // Note, if subledgers are supported in the future, // the various ledgers could be created/managed at this level - os.RemoveAll(ledgerPath) - ledgerConf := kvledger.NewConf(ledgerPath, 0) + cleanup() + ledgermgmt.Initialize() var err error - finalLedger, err = kvledger.NewKVLedger(ledgerConf) + finalLedger, err = ledgermgmt.CreateLedger(ledgerID) if err != nil { panic(fmt.Errorf("Error in NewKVLedger(): %s", err)) } @@ -60,7 +61,7 @@ func init() { } func main() { - defer finalLedger.Close() + defer ledgermgmt.Close() // Each of the functions here will emulate endorser, orderer, // and committer by calling ledger APIs to similate the proposal, @@ -164,3 +165,8 @@ func handleError(err error, quit bool) { } } } + +func cleanup() { + ledgerRootPath := ledgerconfig.GetRootPath() + os.RemoveAll(ledgerRootPath) +} diff --git a/core/ledger/kvledger/kv_ledger.go b/core/ledger/kvledger/kv_ledger.go index 34490a88464..425fe853f38 100644 --- a/core/ledger/kvledger/kv_ledger.go +++ b/core/ledger/kvledger/kv_ledger.go @@ -19,14 +19,13 @@ package kvledger import ( "errors" "fmt" - "strings" "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/ledger/blkstorage" "github.com/hyperledger/fabric/core/ledger/blkstorage/fsblkstorage" "github.com/hyperledger/fabric/core/ledger/history" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/couchdbtxmgmt" - "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/stateleveldb" + "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr" "github.com/hyperledger/fabric/core/ledger/ledgerconfig" @@ -39,37 +38,18 @@ import ( var logger = logging.MustGetLogger("kvledger") -// Conf captures `KVLedger` configurations -type Conf struct { - blockStorageDir string - maxBlockfileSize int - txMgrDBPath string -} - -// NewConf constructs new `Conf`. -// filesystemPath is the top level directory under which `KVLedger` manages its data -func NewConf(filesystemPath string, maxBlockfileSize int) *Conf { - if !strings.HasSuffix(filesystemPath, "/") { - filesystemPath = filesystemPath + "/" - } - blocksStorageDir := filesystemPath + "blocks" - txMgrDBPath := filesystemPath + "txMgmgt/db" - return &Conf{blocksStorageDir, maxBlockfileSize, txMgrDBPath} -} - // KVLedger provides an implementation of `ledger.ValidatedLedger`. // This implementation provides a key-value based data model type KVLedger struct { + ledgerID string blockStore blkstorage.BlockStore txtmgmt txmgr.TxMgr historymgmt history.HistMgr } // NewKVLedger constructs new `KVLedger` -func NewKVLedger(conf *Conf) (*KVLedger, error) { - - logger.Debugf("Creating KVLedger using config: ", conf) - +func NewKVLedger(versionedDBProvider statedb.VersionedDBProvider, ledgerID string) (*KVLedger, error) { + logger.Debugf("Creating KVLedger ledgerID=%s: ", ledgerID) attrsToIndex := []blkstorage.IndexableAttr{ blkstorage.IndexableAttrBlockHash, blkstorage.IndexableAttrBlockNum, @@ -77,7 +57,9 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) { blkstorage.IndexableAttrBlockNumTranNum, } indexConfig := &blkstorage.IndexConfig{AttrsToIndex: attrsToIndex} - blockStorageConf := fsblkstorage.NewConf(conf.blockStorageDir, conf.maxBlockfileSize) + + blockStorageDir := ledgerconfig.GetBlockStoragePath(ledgerID) + blockStorageConf := fsblkstorage.NewConf(blockStorageDir, ledgerconfig.GetMaxBlockfileSize()) blockStore := fsblkstorage.NewFsBlockStore(blockStorageConf, indexConfig) //State and History database managers @@ -91,14 +73,14 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) { couchDBDef := ledgerconfig.GetCouchDBDefinition() //create new transaction manager based on couchDB - txmgmt = couchdbtxmgmt.NewCouchDBTxMgr(&couchdbtxmgmt.Conf{DBPath: conf.txMgrDBPath}, + txmgmt = couchdbtxmgmt.NewCouchDBTxMgr(&couchdbtxmgmt.Conf{DBPath: ""}, couchDBDef.URL, //couchDB connection URL "system", //couchDB db name matches ledger name, TODO for now use system ledger, eventually allow passing in subledger name couchDBDef.Username, //enter couchDB id here couchDBDef.Password) //enter couchDB pw here } else { // Fall back to using goleveldb lockbased transaction manager - db := stateleveldb.NewVersionedDBProvider(&stateleveldb.Conf{DBPath: conf.txMgrDBPath}).GetDBHandle("Default") + db := versionedDBProvider.GetDBHandle(ledgerID) txmgmt = lockbasedtxmgr.NewLockBasedTxMgr(db) } @@ -114,7 +96,7 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) { couchDBDef.Password) //enter couchDB pw here } - l := &KVLedger{blockStore, txmgmt, historymgmt} + l := &KVLedger{ledgerID, blockStore, txmgmt, historymgmt} if err := recoverStateDB(l); err != nil { panic(fmt.Errorf(`Error during state DB recovery:%s`, err)) diff --git a/core/ledger/kvledger/kv_ledger_provider.go b/core/ledger/kvledger/kv_ledger_provider.go new file mode 100644 index 00000000000..675edaefabc --- /dev/null +++ b/core/ledger/kvledger/kv_ledger_provider.go @@ -0,0 +1,157 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kvledger + +import ( + "errors" + + "github.com/hyperledger/fabric/core/ledger" + "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb" + "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/stateleveldb" + "github.com/hyperledger/fabric/core/ledger/ledgerconfig" + "github.com/hyperledger/fabric/core/ledger/util/db" +) + +var ( + // ErrLedgerIDExists is thrown by a CreateLedger call if a ledger with the given id already exists + ErrLedgerIDExists = errors.New("LedgerID already exists") + // ErrNonExistingLedgerID is thrown by a OpenLedger call if a ledger with the given id does not exist + ErrNonExistingLedgerID = errors.New("LedgerID does not exist") + // ErrLedgerNotOpened is thrown by a CloseLedger call if a ledger with the given id has not been opened + ErrLedgerNotOpened = errors.New("Ledger is not opened yet") +) + +// Provider implements interface ledger.ValidatedLedgerProvider +type Provider struct { + idStore *idStore + vdbProvider statedb.VersionedDBProvider +} + +// NewProvider instantiates a new Provider. +// This is not thread-safe and assumed to be synchronized be the caller +func NewProvider() (ledger.ValidatedLedgerProvider, error) { + logger.Info("Initializing ledger provider") + var vdbProvider statedb.VersionedDBProvider + if !ledgerconfig.IsCouchDBEnabled() { + logger.Debugf("Constructing leveldb VersionedDBProvider") + vdbProvider = stateleveldb.NewVersionedDBProvider() + } else { + //TODO same for couchDB after refactoring of couchdb code + } + ledgerMgmtPath := ledgerconfig.GetLedgerProviderPath() + idStore := openIDStore(ledgerMgmtPath) + logger.Info("ledger provider Initialized") + return &Provider{idStore, vdbProvider}, nil +} + +// Create implements the corresponding method from interface ledger.ValidatedLedgerProvider +func (provider *Provider) Create(ledgerID string) (ledger.ValidatedLedger, error) { + exists, err := provider.idStore.ledgerIDExists(ledgerID) + if err != nil { + return nil, err + } + if exists { + return nil, ErrLedgerIDExists + } + provider.idStore.createLedgerID(ledgerID) + l, err := NewKVLedger(provider.vdbProvider, ledgerID) + if err != nil { + return nil, err + } + return l, nil +} + +// Open implements the corresponding method from interface ledger.ValidatedLedgerProvider +func (provider *Provider) Open(ledgerID string) (ledger.ValidatedLedger, error) { + exists, err := provider.idStore.ledgerIDExists(ledgerID) + if err != nil { + return nil, err + } + if !exists { + return nil, ErrNonExistingLedgerID + } + l, err := NewKVLedger(provider.vdbProvider, ledgerID) + if err != nil { + return nil, err + } + return l, nil +} + +// Exists implements the corresponding method from interface ledger.ValidatedLedgerProvider +func (provider *Provider) Exists(ledgerID string) (bool, error) { + return provider.idStore.ledgerIDExists(ledgerID) +} + +// List implements the corresponding method from interface ledger.ValidatedLedgerProvider +func (provider *Provider) List() ([]string, error) { + return provider.idStore.getAllLedgerIds() +} + +// Close implements the corresponding method from interface ledger.ValidatedLedgerProvider +func (provider *Provider) Close() { + provider.vdbProvider.Close() + provider.idStore.close() +} + +type idStore struct { + db *db.DB +} + +func openIDStore(path string) *idStore { + db := db.CreateDB(&db.Conf{DBPath: path}) + db.Open() + return &idStore{db} +} + +func (s *idStore) createLedgerID(ledgerID string) error { + key := []byte(ledgerID) + val := []byte{} + err := error(nil) + if val, err = s.db.Get(key); err != nil { + return err + } + if val != nil { + return ErrLedgerIDExists + } + return s.db.Put(key, val, true) +} + +func (s *idStore) ledgerIDExists(ledgerID string) (bool, error) { + key := []byte(ledgerID) + val := []byte{} + err := error(nil) + if val, err = s.db.Get(key); err != nil { + return false, err + } + return val != nil, nil +} + +func (s *idStore) getAllLedgerIds() ([]string, error) { + var ids []string + itr := s.db.GetIterator(nil, nil) + itr.First() + for itr.Valid() { + key := string(itr.Key()) + ids = append(ids, key) + itr.Next() + } + return ids, nil +} + +func (s *idStore) close() { + s.db.Close() +} diff --git a/core/ledger/kvledger/kv_ledger_provider_test.go b/core/ledger/kvledger/kv_ledger_provider_test.go new file mode 100644 index 00000000000..0b618d1dee5 --- /dev/null +++ b/core/ledger/kvledger/kv_ledger_provider_test.go @@ -0,0 +1,107 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kvledger + +import ( + "fmt" + "testing" + + "github.com/hyperledger/fabric/core/ledger" + "github.com/hyperledger/fabric/core/ledger/testutil" +) + +func TestLedgerProvider(t *testing.T) { + env := newTestEnv(t) + defer env.cleanup() + numLedgers := 10 + provider, _ := NewProvider() + existingLedgerIDs, err := provider.List() + testutil.AssertNoError(t, err, "") + testutil.AssertEquals(t, len(existingLedgerIDs), 0) + for i := 0; i < numLedgers; i++ { + provider.Create(constructTestLedgerID(i)) + } + existingLedgerIDs, err = provider.List() + testutil.AssertNoError(t, err, "") + testutil.AssertEquals(t, len(existingLedgerIDs), numLedgers) + + provider.Close() + + provider, _ = NewProvider() + defer provider.Close() + ledgerIds, _ := provider.List() + testutil.AssertEquals(t, len(ledgerIds), numLedgers) + t.Logf("ledgerIDs=%#v", ledgerIds) + for i := 0; i < numLedgers; i++ { + testutil.AssertEquals(t, ledgerIds[i], constructTestLedgerID(i)) + } + _, err = provider.Create(constructTestLedgerID(2)) + testutil.AssertEquals(t, err, ErrLedgerIDExists) + + _, err = provider.Open(constructTestLedgerID(numLedgers)) + testutil.AssertEquals(t, err, ErrNonExistingLedgerID) +} + +func TestMultipleLedgerBasicRW(t *testing.T) { + env := newTestEnv(t) + defer env.cleanup() + numLedgers := 10 + provider, _ := NewProvider() + ledgers := make([]ledger.ValidatedLedger, numLedgers) + for i := 0; i < numLedgers; i++ { + l, err := provider.Create(constructTestLedgerID(i)) + testutil.AssertNoError(t, err, "") + ledgers[i] = l + } + + for i, l := range ledgers { + s, _ := l.NewTxSimulator() + err := s.SetState("ns", "testKey", []byte(fmt.Sprintf("testValue_%d", i))) + s.Done() + testutil.AssertNoError(t, err, "") + res, err := s.GetTxSimulationResults() + testutil.AssertNoError(t, err, "") + b := testutil.ConstructBlock(t, [][]byte{res}, false) + err = l.Commit(b) + l.Close() + testutil.AssertNoError(t, err, "") + } + + provider.Close() + + provider, _ = NewProvider() + defer provider.Close() + ledgers = make([]ledger.ValidatedLedger, numLedgers) + for i := 0; i < numLedgers; i++ { + l, err := provider.Open(constructTestLedgerID(i)) + testutil.AssertNoError(t, err, "") + ledgers[i] = l + } + + for i, l := range ledgers { + q, _ := l.NewQueryExecutor() + val, err := q.GetState("ns", "testKey") + q.Done() + testutil.AssertNoError(t, err, "") + testutil.AssertEquals(t, val, []byte(fmt.Sprintf("testValue_%d", i))) + l.Close() + } +} + +func constructTestLedgerID(i int) string { + return fmt.Sprintf("ledger_%06d", i) +} diff --git a/core/ledger/kvledger/kv_ledger_test.go b/core/ledger/kvledger/kv_ledger_test.go index fd7c2a40ce4..e797247d612 100644 --- a/core/ledger/kvledger/kv_ledger_test.go +++ b/core/ledger/kvledger/kv_ledger_test.go @@ -29,8 +29,11 @@ import ( func TestKVLedgerBlockStorage(t *testing.T) { env := newTestEnv(t) defer env.cleanup() - ledger, _ := NewKVLedger(env.conf) + provider, _ := NewProvider() + defer provider.Close() + ledger, _ := provider.Create("testLedger") defer ledger.Close() + bcInfo, _ := ledger.GetBlockchainInfo() testutil.AssertEquals(t, bcInfo, &pb.BlockchainInfo{ Height: 0, CurrentBlockHash: nil, PreviousBlockHash: nil}) @@ -80,7 +83,9 @@ func TestKVLedgerBlockStorage(t *testing.T) { func TestKVLedgerStateDBRecovery(t *testing.T) { env := newTestEnv(t) defer env.cleanup() - ledger, _ := NewKVLedger(env.conf) + provider, _ := NewProvider() + defer provider.Close() + ledger, _ := provider.Create("testLedger") defer ledger.Close() bcInfo, _ := ledger.GetBlockchainInfo() @@ -117,9 +122,9 @@ func TestKVLedgerStateDBRecovery(t *testing.T) { //generating a block based on the simulation result block2 := bg.NextBlock([][]byte{simRes}, false) //performing validation of read and write set to find valid transactions - ledger.txtmgmt.ValidateAndPrepare(block2, true) + ledger.(*KVLedger).txtmgmt.ValidateAndPrepare(block2, true) //writing the validated block to block storage but not committing the transaction to state DB - err := ledger.blockStore.AddBlock(block2) + err := ledger.(*KVLedger).blockStore.AddBlock(block2) //assume that peer fails here before committing the transaction assert.NoError(t, err) @@ -140,10 +145,12 @@ func TestKVLedgerStateDBRecovery(t *testing.T) { testutil.AssertEquals(t, value, []byte("value3")) simulator.Done() ledger.Close() + provider.Close() //we assume here that the peer comes online and calls NewKVLedger to get a handler for the ledger //State DB should be recovered before returning from NewKVLedger call - ledger, _ = NewKVLedger(env.conf) + provider, _ = NewProvider() + ledger, _ = provider.Open("testLedger") simulator, _ = ledger.NewTxSimulator() value, _ = simulator.GetState("ns1", "key1") //value for 'key1' should be 'value4' after recovery @@ -155,7 +162,6 @@ func TestKVLedgerStateDBRecovery(t *testing.T) { //value for 'key3' should be 'value6' after recovery testutil.AssertEquals(t, value, []byte("value6")) simulator.Done() - ledger.Close() } func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) { @@ -168,7 +174,9 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) { env := newTestEnv(t) defer env.cleanup() - ledger, _ := NewKVLedger(env.conf) + provider, _ := NewProvider() + defer provider.Close() + ledger, _ := provider.Create("testLedger") defer ledger.Close() //testNs := "ns1" diff --git a/core/ledger/kvledger/marble_example/main/marble_example.go b/core/ledger/kvledger/marble_example/main/marble_example.go index 9932b1dbb4e..2efd611d442 100644 --- a/core/ledger/kvledger/marble_example/main/marble_example.go +++ b/core/ledger/kvledger/marble_example/main/marble_example.go @@ -18,23 +18,25 @@ package main import ( "fmt" + "os" "github.com/hyperledger/fabric/core/ledger" - "github.com/hyperledger/fabric/core/ledger/kvledger" "github.com/hyperledger/fabric/core/ledger/kvledger/example" + "github.com/hyperledger/fabric/core/ledger/ledgerconfig" + "github.com/hyperledger/fabric/core/ledger/ledgermgmt" "github.com/hyperledger/fabric/core/ledger/testutil" "github.com/hyperledger/fabric/core/ledger/util" "github.com/hyperledger/fabric/protos/common" logging "github.com/op/go-logging" ) -var logger = logging.MustGetLogger("main") - const ( - ledgerPath = "/tmp/test/ledgernext/kvledger/example" + ledgerID = "Default" ) +var logger = logging.MustGetLogger("main") + var finalLedger ledger.ValidatedLedger var marbleApp *example.MarbleApp var committer *example.Committer @@ -50,10 +52,10 @@ func init() { //call a helper method to load the core.yaml testutil.SetupCoreYAMLConfig("./../../../../../peer") - os.RemoveAll(ledgerPath) - ledgerConf := kvledger.NewConf(ledgerPath, 0) + cleanup() + ledgermgmt.Initialize() var err error - finalLedger, err = kvledger.NewKVLedger(ledgerConf) + finalLedger, err = ledgermgmt.CreateLedger(ledgerID) if err != nil { panic(fmt.Errorf("Error in NewKVLedger(): %s", err)) } @@ -63,8 +65,7 @@ func init() { } func main() { - defer finalLedger.Close() - + defer ledgermgmt.Close() // Each of the functions here will emulate endorser, orderer, // and committer by calling ledger APIs to similate the proposal, // get simulation results, create a transaction, add it to a block, @@ -119,3 +120,8 @@ func handleError(err error, quit bool) { } } } + +func cleanup() { + ledgerRootPath := ledgerconfig.GetRootPath() + os.RemoveAll(ledgerRootPath) +} diff --git a/core/ledger/kvledger/pkg_test.go b/core/ledger/kvledger/pkg_test.go index d3091c453a8..dfcdea579b5 100644 --- a/core/ledger/kvledger/pkg_test.go +++ b/core/ledger/kvledger/pkg_test.go @@ -19,26 +19,23 @@ package kvledger import ( "os" "testing" + + "github.com/hyperledger/fabric/core/ledger/ledgerconfig" + "github.com/spf13/viper" ) type testEnv struct { - conf *Conf - t testing.TB + t testing.TB } func newTestEnv(t testing.TB) *testEnv { - conf := NewConf("/tmp/tests/ledger/", 0) - os.RemoveAll(conf.blockStorageDir) - os.RemoveAll(conf.txMgrDBPath) - return &testEnv{conf, t} + viper.Set("peer.fileSystemPath", "/tmp/fabric/ledgertests") + env := &testEnv{t} + env.cleanup() + return env } func (env *testEnv) cleanup() { - os.RemoveAll(env.conf.blockStorageDir) - os.RemoveAll(env.conf.txMgrDBPath) -} - -type testLedgerWrapper struct { - ledger *KVLedger - t *testing.T + path := ledgerconfig.GetRootPath() + os.RemoveAll(path) } diff --git a/core/ledger/kvledger/txmgmt/statedb/commontests/test_common.go b/core/ledger/kvledger/txmgmt/statedb/commontests/test_common.go index 2de339f91d6..47d035ccb5c 100644 --- a/core/ledger/kvledger/txmgmt/statedb/commontests/test_common.go +++ b/core/ledger/kvledger/txmgmt/statedb/commontests/test_common.go @@ -25,9 +25,8 @@ import ( ) // TestBasicRW tests basic read-write -func TestBasicRW(t *testing.T, db statedb.VersionedDB) { - db.Open() - defer db.Close() +func TestBasicRW(t *testing.T, dbProvider statedb.VersionedDBProvider) { + db := dbProvider.GetDBHandle("TestDB") val, err := db.GetState("ns", "key1") testutil.AssertNoError(t, err, "") testutil.AssertNil(t, val) @@ -55,10 +54,46 @@ func TestBasicRW(t *testing.T, db statedb.VersionedDB) { testutil.AssertEquals(t, sp, savePoint) } +// TestMultiDBBasicRW tests basic read-write on multiple dbs +func TestMultiDBBasicRW(t *testing.T, dbProvider statedb.VersionedDBProvider) { + db1 := dbProvider.GetDBHandle("TestDB1") + db2 := dbProvider.GetDBHandle("TestDB2") + + batch1 := statedb.NewUpdateBatch() + vv1 := statedb.VersionedValue{Value: []byte("value1_db1"), Version: version.NewHeight(1, 1)} + vv2 := statedb.VersionedValue{Value: []byte("value2_db1"), Version: version.NewHeight(1, 2)} + batch1.Put("ns1", "key1", vv1.Value, vv1.Version) + batch1.Put("ns1", "key2", vv2.Value, vv2.Version) + savePoint1 := version.NewHeight(1, 2) + db1.ApplyUpdates(batch1, savePoint1) + + batch2 := statedb.NewUpdateBatch() + vv3 := statedb.VersionedValue{Value: []byte("value1_db2"), Version: version.NewHeight(1, 4)} + vv4 := statedb.VersionedValue{Value: []byte("value2_db2"), Version: version.NewHeight(1, 5)} + batch2.Put("ns1", "key1", vv3.Value, vv3.Version) + batch2.Put("ns1", "key2", vv4.Value, vv4.Version) + savePoint2 := version.NewHeight(1, 5) + db2.ApplyUpdates(batch2, savePoint2) + + vv, _ := db1.GetState("ns1", "key1") + testutil.AssertEquals(t, vv, &vv1) + + sp, err := db1.GetLatestSavePoint() + testutil.AssertNoError(t, err, "") + testutil.AssertEquals(t, sp, savePoint1) + + vv, _ = db2.GetState("ns1", "key1") + testutil.AssertEquals(t, vv, &vv3) + + sp, err = db2.GetLatestSavePoint() + testutil.AssertNoError(t, err, "") + testutil.AssertEquals(t, sp, savePoint2) +} + // TestDeletes tests deteles -func TestDeletes(t *testing.T, db statedb.VersionedDB) { - db.Open() - defer db.Close() +func TestDeletes(t *testing.T, dbProvider statedb.VersionedDBProvider) { + db := dbProvider.GetDBHandle("TestDB") + batch := statedb.NewUpdateBatch() vv1 := statedb.VersionedValue{Value: []byte("value1"), Version: version.NewHeight(1, 1)} vv2 := statedb.VersionedValue{Value: []byte("value2"), Version: version.NewHeight(1, 2)} @@ -90,7 +125,8 @@ func TestDeletes(t *testing.T, db statedb.VersionedDB) { } // TestIterator tests the iterator -func TestIterator(t *testing.T, db statedb.VersionedDB) { +func TestIterator(t *testing.T, dbProvider statedb.VersionedDBProvider) { + db := dbProvider.GetDBHandle("TestDB") db.Open() defer db.Close() batch := statedb.NewUpdateBatch() diff --git a/core/ledger/kvledger/txmgmt/statedb/statedb.go b/core/ledger/kvledger/txmgmt/statedb/statedb.go index 5cfe650bfc5..ce594a90862 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statedb.go +++ b/core/ledger/kvledger/txmgmt/statedb/statedb.go @@ -20,7 +20,10 @@ import "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version" // VersionedDBProvider provides an instance of an versioned DB type VersionedDBProvider interface { + // GetDBHandle returns a handle to a VersionedDB GetDBHandle(id string) VersionedDB + // Close closes all the VersionedDB instances and releases any resources held by VersionedDBProvider + Close() } // VersionedDB lists methods that a db is supposed to implement diff --git a/core/ledger/kvledger/txmgmt/statedb/stateleveldb/config.go b/core/ledger/kvledger/txmgmt/statedb/stateleveldb/config.go new file mode 100644 index 00000000000..5649f99dd52 --- /dev/null +++ b/core/ledger/kvledger/txmgmt/statedb/stateleveldb/config.go @@ -0,0 +1,27 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package stateleveldb + +import ( + "path/filepath" + + "github.com/hyperledger/fabric/core/ledger/ledgerconfig" +) + +func getDBPath() string { + return filepath.Join(ledgerconfig.GetRootPath(), "stateleveldb") +} diff --git a/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb.go b/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb.go index 35cb818d45a..45044ce0f73 100644 --- a/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb.go +++ b/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb.go @@ -19,6 +19,8 @@ package stateleveldb import ( "bytes" + "sync" + "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version" "github.com/hyperledger/fabric/core/ledger/util/db" @@ -27,59 +29,73 @@ import ( "github.com/syndtr/goleveldb/leveldb/iterator" ) -var logger = logging.MustGetLogger("store.level") +var logger = logging.MustGetLogger("stateleveldb") var compositeKeySep = []byte{0x00} var lastKeyIndicator = byte(0x01) var savePointKey = []byte{0x00} -// Conf for leveldb backed VersionedDB implementation -type Conf struct { - DBPath string -} - // VersionedDBProvider implements interface VersionedDBProvider type VersionedDBProvider struct { - conf *Conf + db *db.DB + databases map[string]*VersionedDB + mux sync.Mutex + openCounts uint64 } // NewVersionedDBProvider instantiates VersionedDBProvider -func NewVersionedDBProvider(conf *Conf) *VersionedDBProvider { - return &VersionedDBProvider{conf} +func NewVersionedDBProvider() *VersionedDBProvider { + dbPath := getDBPath() + logger.Debugf("constructing VersionedDBProvider dbPath=%s", dbPath) + db := db.CreateDB(&db.Conf{DBPath: dbPath}) + db.Open() + logger.Debugf("Opened db dbPath=%s", dbPath) + return &VersionedDBProvider{db, make(map[string]*VersionedDB), sync.Mutex{}, 0} } // GetDBHandle gets the handle to a named database -func (provider *VersionedDBProvider) GetDBHandle(dbName string) *VersionedDB { - return newVersionedDB(provider.conf, dbName) +func (provider *VersionedDBProvider) GetDBHandle(dbName string) statedb.VersionedDB { + provider.mux.Lock() + defer provider.mux.Unlock() + vdb := provider.databases[dbName] + if vdb == nil { + vdb = newVersionedDB(provider.db, dbName) + provider.databases[dbName] = vdb + } + return vdb +} + +// Close closes the underlying db +func (provider *VersionedDBProvider) Close() { + provider.db.Close() } // VersionedDB implements VersionedDB interface type VersionedDB struct { - conf *Conf - db *db.DB + db *db.DB + dbName string } // newVersionedDB constructs an instance of VersionedDB -func newVersionedDB(conf *Conf, dbName string) *VersionedDB { - db := db.CreateDB(&db.Conf{DBPath: conf.DBPath + "/" + dbName}) - return &VersionedDB{conf, db} +func newVersionedDB(db *db.DB, dbName string) *VersionedDB { + return &VersionedDB{db, dbName} } // Open implements method in VersionedDB interface func (vdb *VersionedDB) Open() error { - vdb.db.Open() + // do nothing because shared db is used return nil } // Close implements method in VersionedDB interface func (vdb *VersionedDB) Close() { - vdb.db.Close() + // do nothing because shared db is used } // GetState implements method in VersionedDB interface func (vdb *VersionedDB) GetState(namespace string, key string) (*statedb.VersionedValue, error) { logger.Debugf("GetState(). ns=%s, key=%s", namespace, key) - compositeKey := constructCompositeKey(namespace, key) + compositeKey := constructCompositeKey(vdb.dbName, namespace, key) dbVal, err := vdb.db.Get(compositeKey) if err != nil { return nil, err @@ -106,8 +122,8 @@ func (vdb *VersionedDB) GetStateMultipleKeys(namespace string, keys []string) ([ // GetStateRangeScanIterator implements method in VersionedDB interface func (vdb *VersionedDB) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (statedb.ResultsIterator, error) { - compositeStartKey := constructCompositeKey(namespace, startKey) - compositeEndKey := constructCompositeKey(namespace, endKey) + compositeStartKey := constructCompositeKey(vdb.dbName, namespace, startKey) + compositeEndKey := constructCompositeKey(vdb.dbName, namespace, endKey) if endKey == "" { compositeEndKey[len(compositeEndKey)-1] = lastKeyIndicator } @@ -124,7 +140,7 @@ func (vdb *VersionedDB) ExecuteQuery(query string) (statedb.ResultsIterator, err func (vdb *VersionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version.Height) error { levelBatch := &leveldb.Batch{} for ck, vv := range batch.KVs { - compositeKey := constructCompositeKey(ck.Namespace, ck.Key) + compositeKey := constructCompositeKey(vdb.dbName, ck.Namespace, ck.Key) logger.Debugf("processing key=%#v, versionedValue=%#v", ck, vv) if vv.Value == nil { levelBatch.Delete(compositeKey) @@ -132,7 +148,7 @@ func (vdb *VersionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version levelBatch.Put(compositeKey, encodeValue(vv.Value, vv.Version)) } } - levelBatch.Put(savePointKey, height.ToBytes()) + levelBatch.Put(constructSavepointKey(vdb.dbName), height.ToBytes()) if err := vdb.db.WriteBatch(levelBatch, false); err != nil { return err } @@ -141,7 +157,7 @@ func (vdb *VersionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version // GetLatestSavePoint implements method in VersionedDB interface func (vdb *VersionedDB) GetLatestSavePoint() (*version.Height, error) { - versionBytes, err := vdb.db.Get(savePointKey) + versionBytes, err := vdb.db.Get(constructSavepointKey(vdb.dbName)) if err != nil { return nil, err } @@ -163,16 +179,23 @@ func decodeValue(encodedValue []byte) ([]byte, *version.Height) { return value, version } -func constructCompositeKey(ns string, key string) []byte { - compositeKey := []byte(ns) +func constructCompositeKey(dbName string, ns string, key string) []byte { + compositeKey := []byte(dbName) + compositeKey = append(compositeKey, compositeKeySep...) + compositeKey = append(compositeKey, []byte(ns)...) compositeKey = append(compositeKey, compositeKeySep...) compositeKey = append(compositeKey, []byte(key)...) return compositeKey } -func splitCompositeKey(compositeKey []byte) (string, string) { - split := bytes.SplitN(compositeKey, compositeKeySep, 2) - return string(split[0]), string(split[1]) +func splitCompositeKey(compositeKey []byte) (string, string, string) { + split := bytes.SplitN(compositeKey, compositeKeySep, 3) + return string(split[0]), string(split[1]), string(split[2]) +} + +func constructSavepointKey(dbName string) []byte { + key := savePointKey + return append(key, []byte(dbName)...) } type kvScanner struct { @@ -188,7 +211,7 @@ func (scanner *kvScanner) Next() (*statedb.VersionedKV, error) { if !scanner.dbItr.Next() { return nil, nil } - _, key := splitCompositeKey(scanner.dbItr.Key()) + _, _, key := splitCompositeKey(scanner.dbItr.Key()) value, version := decodeValue(scanner.dbItr.Value()) return &statedb.VersionedKV{ CompositeKey: statedb.CompositeKey{Namespace: scanner.namespace, Key: key}, diff --git a/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb_test.go b/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb_test.go index b628b457dc7..d7a4fdd4fae 100644 --- a/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb_test.go +++ b/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb_test.go @@ -17,31 +17,42 @@ limitations under the License. package stateleveldb import ( + "os" "testing" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/commontests" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version" "github.com/hyperledger/fabric/core/ledger/testutil" + "github.com/spf13/viper" ) -var testDBPath = "/tmp/fabric/core/ledger/versioneddb/levelimpl" +func TestMain(m *testing.M) { + viper.Set("peer.fileSystemPath", "/tmp/fabric/ledgertests") + os.Exit(m.Run()) +} func TestBasicRW(t *testing.T) { - env := NewTestVDBEnv(t, testDBPath) + env := NewTestVDBEnv(t) + defer env.Cleanup() + commontests.TestBasicRW(t, env.DBProvider) +} + +func TestMultiDBBasicRW(t *testing.T) { + env := NewTestVDBEnv(t) defer env.Cleanup() - commontests.TestBasicRW(t, env.DB) + commontests.TestMultiDBBasicRW(t, env.DBProvider) } func TestDeletes(t *testing.T) { - env := NewTestVDBEnv(t, testDBPath) + env := NewTestVDBEnv(t) defer env.Cleanup() - commontests.TestDeletes(t, env.DB) + commontests.TestDeletes(t, env.DBProvider) } func TestIterator(t *testing.T) { - env := NewTestVDBEnv(t, testDBPath) + env := NewTestVDBEnv(t) defer env.Cleanup() - commontests.TestIterator(t, env.DB) + commontests.TestIterator(t, env.DBProvider) } func TestEncodeDecodeValueAndVersion(t *testing.T) { @@ -57,14 +68,15 @@ func testValueAndVersionEncodeing(t *testing.T, value []byte, version *version.H } func TestCompositeKey(t *testing.T) { - testCompositeKey(t, "ns", "key") - testCompositeKey(t, "ns", "") + testCompositeKey(t, "ledger1", "ns", "key") + testCompositeKey(t, "ledger2", "ns", "") } -func testCompositeKey(t *testing.T, ns string, key string) { - compositeKey := constructCompositeKey(ns, key) +func testCompositeKey(t *testing.T, dbName string, ns string, key string) { + compositeKey := constructCompositeKey(dbName, ns, key) t.Logf("compositeKey=%#v", compositeKey) - ns1, key1 := splitCompositeKey(compositeKey) + dbName1, ns1, key1 := splitCompositeKey(compositeKey) + testutil.AssertEquals(t, dbName1, dbName) testutil.AssertEquals(t, ns1, ns) testutil.AssertEquals(t, key1, key) } diff --git a/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb_test_export.go b/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb_test_export.go index d102a0d75e6..aeb5dd750f0 100644 --- a/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb_test_export.go +++ b/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb_test_export.go @@ -25,20 +25,30 @@ import ( // TestVDBEnv provides a level db backed versioned db for testing type TestVDBEnv struct { - DBPath string - DB statedb.VersionedDB + t testing.TB + DBProvider statedb.VersionedDBProvider } // NewTestVDBEnv instantiates and new level db backed TestVDB -func NewTestVDBEnv(t testing.TB, dbPath string) *TestVDBEnv { - os.RemoveAll(dbPath) - db := NewVersionedDBProvider(&Conf{DBPath: dbPath}).GetDBHandle("testDB") - db.Open() - return &TestVDBEnv{dbPath, db} +func NewTestVDBEnv(t testing.TB) *TestVDBEnv { + t.Logf("Creating new TestVDBEnv") + removeDBPath(t, "NewTestVDBEnv") + dbProvider := NewVersionedDBProvider() + return &TestVDBEnv{t, dbProvider} } // Cleanup closes the db and removes the db folder func (env *TestVDBEnv) Cleanup() { - env.DB.Close() - os.RemoveAll(env.DBPath) + env.t.Logf("Cleaningup TestVDBEnv") + env.DBProvider.Close() + removeDBPath(env.t, "Cleanup") +} + +func removeDBPath(t testing.TB, caller string) { + dbPath := getDBPath() + if err := os.RemoveAll(dbPath); err != nil { + t.Fatalf("Err: %s", err) + t.FailNow() + } + logger.Debugf("Removed folder [%s] for test environment for %s", dbPath, caller) } diff --git a/core/ledger/kvledger/txmgmt/txmgr/commontests/pkg_test.go b/core/ledger/kvledger/txmgmt/txmgr/commontests/pkg_test.go index 58baac7755b..dfec7f6f77f 100644 --- a/core/ledger/kvledger/txmgmt/txmgr/commontests/pkg_test.go +++ b/core/ledger/kvledger/txmgmt/txmgr/commontests/pkg_test.go @@ -26,6 +26,7 @@ import ( "github.com/hyperledger/fabric/core/ledger/testutil" "github.com/hyperledger/fabric/core/ledger/util" "github.com/hyperledger/fabric/protos/common" + "github.com/spf13/viper" ) type testEnv interface { @@ -40,6 +41,7 @@ var testEnvs = []testEnv{&levelDBLockBasedEnv{}} type levelDBLockBasedEnv struct { testDBEnv *stateleveldb.TestVDBEnv + testDB statedb.VersionedDB txmgr txmgr.TxMgr } @@ -48,10 +50,12 @@ func (env *levelDBLockBasedEnv) getName() string { } func (env *levelDBLockBasedEnv) init(t *testing.T) { - testDBPath := "/tmp/fabric/core/ledger/kvledger/txmgmt/lockbasedtxmgmt" - testDBEnv := stateleveldb.NewTestVDBEnv(t, testDBPath) - txMgr := lockbasedtxmgr.NewLockBasedTxMgr(testDBEnv.DB) + viper.Set("peer.fileSystemPath", "/tmp/fabric/ledgertests") + testDBEnv := stateleveldb.NewTestVDBEnv(t) + testDB := testDBEnv.DBProvider.GetDBHandle("TestDB") + txMgr := lockbasedtxmgr.NewLockBasedTxMgr(testDB) env.testDBEnv = testDBEnv + env.testDB = testDB env.txmgr = txMgr } @@ -60,7 +64,7 @@ func (env *levelDBLockBasedEnv) getTxMgr() txmgr.TxMgr { } func (env *levelDBLockBasedEnv) getVDB() statedb.VersionedDB { - return env.testDBEnv.DB + return env.testDB } func (env *levelDBLockBasedEnv) cleanup() { diff --git a/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator_test.go b/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator_test.go index a97cac222db..bbcb66d2c87 100644 --- a/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator_test.go +++ b/core/ledger/kvledger/txmgmt/validator/statebasedval/state_based_validator_test.go @@ -17,6 +17,7 @@ limitations under the License. package statebasedval import ( + "os" "testing" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwset" @@ -26,15 +27,19 @@ import ( "github.com/hyperledger/fabric/core/ledger/testutil" "github.com/hyperledger/fabric/core/ledger/util" "github.com/hyperledger/fabric/protos/common" + "github.com/spf13/viper" ) -var testDBPath = "/tmp/fabric/core/ledger/kvledger/txmgmt/validator/statebasedval" +func TestMain(m *testing.M) { + viper.Set("peer.fileSystemPath", "/tmp/fabric/ledgertests") + os.Exit(m.Run()) +} func TestValidator(t *testing.T) { - testDBEnv := stateleveldb.NewTestVDBEnv(t, testDBPath) + testDBEnv := stateleveldb.NewTestVDBEnv(t) defer testDBEnv.Cleanup() - db := testDBEnv.DB + db := testDBEnv.DBProvider.GetDBHandle("TestDB") //populate db with initial data batch := statedb.NewUpdateBatch() batch.Put("ns1", "key1", []byte("value1"), version.NewHeight(1, 1)) diff --git a/core/ledger/ledger_interface.go b/core/ledger/ledger_interface.go index 3871a9ae6c7..2d8cac0824f 100644 --- a/core/ledger/ledger_interface.go +++ b/core/ledger/ledger_interface.go @@ -45,6 +45,20 @@ type RawLedger interface { CommitBlock(block *common.Block) error } +// ValidatedLedgerProvider provides handle to ledger instances +type ValidatedLedgerProvider interface { + // CreateLedger creates a new ledger with a given unique id + Create(ledgerID string) (ValidatedLedger, error) + // OpenLedger opens an already created ledger + Open(ledgerID string) (ValidatedLedger, error) + // Exists tells whether the ledger with given id exits + Exists(ledgerID string) (bool, error) + // List lists the ids of the existing ledgers + List() ([]string, error) + // Close closes the ValidatedLedgerProvider + Close() +} + // ValidatedLedger represents the 'final ledger'. In addition to implement the methods inherited from the Ledger, // it provides the handle to objects for querying the state and executing transactions. type ValidatedLedger interface { diff --git a/core/ledger/ledgerconfig/ledger_config.go b/core/ledger/ledgerconfig/ledger_config.go index 6ed15d60e55..c600326ee50 100644 --- a/core/ledger/ledgerconfig/ledger_config.go +++ b/core/ledger/ledgerconfig/ledger_config.go @@ -16,7 +16,11 @@ limitations under the License. package ledgerconfig -import "github.com/spf13/viper" +import ( + "path/filepath" + + "github.com/spf13/viper" +) var stateDatabase = "goleveldb" var couchDBAddress = "127.0.0.1:5984" @@ -40,6 +44,39 @@ func IsCouchDBEnabled() bool { return false } +// GetRootPath returns the filesystem path. +// All ledger related contents are expected to be stored under this path +func GetRootPath() string { + sysPath := viper.GetString("peer.fileSystemPath") + return filepath.Join(sysPath, "ledgersData") +} + +// GetLedgersPath returns the filesystem path that further contains sub-directories. +// Each sub-directory for each specific ledger and the name of the sub-directory is the ledgerid +func GetLedgersPath() string { + return filepath.Join(GetRootPath(), "ledgers") +} + +// GetLedgerPath returns the filesystem path for stroing ledger specific contents +func GetLedgerPath(ledgerID string) string { + return filepath.Join(GetLedgersPath(), ledgerID) +} + +// GetBlockStoragePath returns the path for storing blocks for a specific ledger +func GetBlockStoragePath(ledgerID string) string { + return filepath.Join(GetLedgerPath(ledgerID), "blocks") +} + +// GetLedgerProviderPath returns the filesystem path for stroing ledger ledgerProvider contents +func GetLedgerProviderPath() string { + return filepath.Join(GetRootPath(), "ledgerProvider") +} + +// GetMaxBlockfileSize returns the maximum size of the block file +func GetMaxBlockfileSize() int { + return 0 +} + //GetCouchDBDefinition exposes the useCouchDB variable func GetCouchDBDefinition() *CouchDBDef { diff --git a/core/ledger/ledgermgmt/ledger_mgmt.go b/core/ledger/ledgermgmt/ledger_mgmt.go new file mode 100644 index 00000000000..6ec895615fc --- /dev/null +++ b/core/ledger/ledgermgmt/ledger_mgmt.go @@ -0,0 +1,151 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ledgermgmt + +import ( + "errors" + "sync" + + "fmt" + + "github.com/hyperledger/fabric/core/ledger" + "github.com/hyperledger/fabric/core/ledger/kvledger" + logging "github.com/op/go-logging" +) + +var logger = logging.MustGetLogger("ledgermgmt") + +// ErrLedgerAlreadyOpened is thrown by a CreateLedger call if a ledger with the given id is already opened +var ErrLedgerAlreadyOpened = errors.New("Ledger already opened") + +// ErrLedgerMgmtNotInitialized is thrown when ledger mgmt is used before initializing this +var ErrLedgerMgmtNotInitialized = errors.New("ledger mgmt should be initialized before using") + +var openedLedgers map[string]ledger.ValidatedLedger +var ledgerProvider ledger.ValidatedLedgerProvider +var lock sync.Mutex +var initialized bool +var once sync.Once + +// Initialize initializes ledgermgmt +func Initialize() { + once.Do(func() { + initialize() + }) +} + +func initialize() { + logger.Info("Initializing ledger mgmt") + lock.Lock() + defer lock.Unlock() + initialized = true + openedLedgers = make(map[string]ledger.ValidatedLedger) + provider, err := kvledger.NewProvider() + if err != nil { + panic(fmt.Errorf("Error in instantiating ledger provider: %s", err)) + } + ledgerProvider = provider + logger.Info("ledger mgmt initialized") +} + +// CreateLedger creates a new ledger with the given id +func CreateLedger(id string) (ledger.ValidatedLedger, error) { + logger.Infof("Creating leadger with id = %s", id) + lock.Lock() + defer lock.Unlock() + if !initialized { + return nil, ErrLedgerMgmtNotInitialized + } + l, err := ledgerProvider.Create(id) + if err != nil { + return nil, err + } + l = wrapLedger(id, l) + openedLedgers[id] = l + logger.Infof("Created leadger with id = %s", id) + return l, nil +} + +// OpenLedger returns a ledger for the given id +func OpenLedger(id string) (ledger.ValidatedLedger, error) { + logger.Infof("Opening leadger with id = %s", id) + lock.Lock() + defer lock.Unlock() + if !initialized { + return nil, ErrLedgerMgmtNotInitialized + } + l, ok := openedLedgers[id] + if ok { + return nil, ErrLedgerAlreadyOpened + } + l, err := ledgerProvider.Open(id) + if err != nil { + return nil, err + } + l = wrapLedger(id, l) + openedLedgers[id] = l + logger.Infof("Opened leadger with id = %s", id) + return l, nil +} + +// GetLedgerIDs returns the ids of the ledgers created +func GetLedgerIDs() ([]string, error) { + lock.Lock() + defer lock.Unlock() + if !initialized { + return nil, ErrLedgerMgmtNotInitialized + } + return ledgerProvider.List() +} + +// Close closes all the opened ledgers and any resources held for ledger management +func Close() { + logger.Infof("Closing ledger mgmt") + lock.Lock() + defer lock.Unlock() + if !initialized { + return + } + for _, l := range openedLedgers { + l.(*ClosableLedger).closeWithoutLock() + } + ledgerProvider.Close() + openedLedgers = nil + logger.Infof("ledger mgmt closed") +} + +func wrapLedger(id string, l ledger.ValidatedLedger) ledger.ValidatedLedger { + return &ClosableLedger{id, l} +} + +// ClosableLedger extends from actual validated ledger and overwrites the Close method +type ClosableLedger struct { + id string + ledger.ValidatedLedger +} + +// Close closes the actual ledger and removes the entries from opened ledgers map +func (l *ClosableLedger) Close() { + lock.Lock() + defer lock.Unlock() + l.closeWithoutLock() +} + +func (l *ClosableLedger) closeWithoutLock() { + l.ValidatedLedger.Close() + delete(openedLedgers, l.id) +} diff --git a/core/ledger/ledgermgmt/ledger_mgmt_test.go b/core/ledger/ledgermgmt/ledger_mgmt_test.go new file mode 100644 index 00000000000..d9f9df3299d --- /dev/null +++ b/core/ledger/ledgermgmt/ledger_mgmt_test.go @@ -0,0 +1,62 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ledgermgmt + +import ( + "fmt" + "testing" + + "github.com/hyperledger/fabric/core/ledger" + "github.com/hyperledger/fabric/core/ledger/testutil" +) + +func TestLedgerMgmt(t *testing.T) { + InitializeTestEnv() + defer CleanupTestEnv() + + numLedgers := 10 + ledgers := make([]ledger.ValidatedLedger, 10) + for i := 0; i < numLedgers; i++ { + l, _ := CreateLedger(constructTestLedgerID(i)) + ledgers[i] = l + } + + ledgerID := constructTestLedgerID(2) + t.Logf("Ledger selected for test = %s", ledgerID) + _, err := OpenLedger(ledgerID) + testutil.AssertEquals(t, err, ErrLedgerAlreadyOpened) + + l := ledgers[2] + l.Close() + l, err = OpenLedger(ledgerID) + testutil.AssertNoError(t, err, "") + + l, err = OpenLedger(ledgerID) + testutil.AssertEquals(t, err, ErrLedgerAlreadyOpened) + + // close all opened ledgers and ledger mgmt + Close() + // Restart ledger mgmt with existing ledgers + initialize() + l, err = OpenLedger(ledgerID) + testutil.AssertNoError(t, err, "") + Close() +} + +func constructTestLedgerID(i int) string { + return fmt.Sprintf("ledger_%06d", i) +} diff --git a/core/ledger/ledgermgmt/ledger_mgmt_test_exports.go b/core/ledger/ledgermgmt/ledger_mgmt_test_exports.go new file mode 100644 index 00000000000..7fdacbcb692 --- /dev/null +++ b/core/ledger/ledgermgmt/ledger_mgmt_test_exports.go @@ -0,0 +1,43 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ledgermgmt + +import ( + "os" + + "github.com/hyperledger/fabric/core/ledger/ledgerconfig" +) + +// InitializeTestEnv initializes ledgermgmt for tests +func InitializeTestEnv() { + remove() + initialize() +} + +// CleanupTestEnv closes the ledgermagmt and removes the store directory +func CleanupTestEnv() { + Close() + remove() +} + +func remove() { + path := ledgerconfig.GetRootPath() + err := os.RemoveAll(path) + if err != nil { + logger.Errorf("Error: %s", err) + } +} diff --git a/core/ledger/util/db/db.go b/core/ledger/util/db/db.go index e832594300b..f9919adb5a4 100644 --- a/core/ledger/util/db/db.go +++ b/core/ledger/util/db/db.go @@ -81,7 +81,7 @@ func (dbInst *DB) Open() { var err error var dirEmpty bool if dirEmpty, err = util.CreateDirIfMissing(dbPath); err != nil { - panic(fmt.Sprintf("Error while trying to open DB: %s", err)) + panic(fmt.Sprintf("Error while trying to create dir if missing: %s", err)) } dbOpts.ErrorIfMissing = !dirEmpty if dbInst.db, err = leveldb.OpenFile(dbPath, dbOpts); err != nil { @@ -97,7 +97,9 @@ func (dbInst *DB) Close() { if dbInst.dbState == closed { return } - dbInst.db.Close() + if err := dbInst.db.Close(); err != nil { + logger.Errorf("Error while closing DB: %s", err) + } dbInst.dbState = closed } diff --git a/core/ledger/util/ioutil.go b/core/ledger/util/ioutil.go index 07a52ca201d..91ed26228c1 100644 --- a/core/ledger/util/ioutil.go +++ b/core/ledger/util/ioutil.go @@ -34,11 +34,13 @@ func CreateDirIfMissing(dirPath string) (bool, error) { dirPath = dirPath + "/" } logger.Debugf("CreateDirIfMissing [%s]", dirPath) + logDirStatus("Before creating dir", dirPath) err := os.MkdirAll(path.Dir(dirPath), 0755) if err != nil { logger.Debugf("Error while creating dir [%s]", dirPath) return false, err } + logDirStatus("After creating dir", dirPath) return DirEmpty(dirPath) } @@ -67,3 +69,15 @@ func FileExists(filePath string) (bool, int64, error) { } return true, fileInfo.Size(), err } + +func logDirStatus(msg string, dirPath string) { + exists, _, err := FileExists(dirPath) + if err != nil { + logger.Errorf("Error while checking for dir existance") + } + if exists { + logger.Debugf("%s - [%s] exists", msg, dirPath) + } else { + logger.Debugf("%s - [%s] does not exist", msg, dirPath) + } +} diff --git a/core/peer/peer.go b/core/peer/peer.go index 2834f66c8ba..2514f9372c0 100644 --- a/core/peer/peer.go +++ b/core/peer/peer.go @@ -18,10 +18,8 @@ package peer import ( "fmt" - "io/ioutil" "math" "net" - "path/filepath" "sync" "google.golang.org/grpc" @@ -31,7 +29,8 @@ import ( "github.com/hyperledger/fabric/core/comm" "github.com/hyperledger/fabric/core/committer" - "github.com/hyperledger/fabric/core/ledger/kvledger" + "github.com/hyperledger/fabric/core/ledger" + "github.com/hyperledger/fabric/core/ledger/ledgermgmt" "github.com/hyperledger/fabric/core/peer/msp" "github.com/hyperledger/fabric/gossip/service" "github.com/hyperledger/fabric/msp" @@ -44,7 +43,7 @@ var peerLogger = logging.MustGetLogger("peer") // chain is a local struct to manage objects in a chain type chain struct { cb *common.Block - ledger *kvledger.KVLedger + ledger ledger.ValidatedLedger committer committer.Committer mspmgr msp.MSPManager } @@ -57,6 +56,7 @@ var chains = struct { //MockInitialize resets chains for test env func MockInitialize() { + ledgermgmt.InitializeTestEnv() chains.list = nil chains.list = make(map[string]*chain) } @@ -66,25 +66,16 @@ func MockInitialize() { // ready func Initialize() { //Till JoinChain works, we continue to use default chain - path := getLedgerPath("") - - peerLogger.Infof("Init peer by loading chains from %s", path) - files, err := ioutil.ReadDir(path) + var cb *common.Block + var ledger ledger.ValidatedLedger + ledgermgmt.Initialize() + ledgerIds, err := ledgermgmt.GetLedgerIDs() if err != nil { - peerLogger.Debug("Must be no chains created yet, err %s", err) - - // We just continue. The ledger will create the directory later - return + panic(fmt.Errorf("Error in initializing ledgermgmt: %s", err)) } - - // File name is the name of the chain that we will use to initialize - var cb *common.Block - var cid string - var ledger *kvledger.KVLedger - for _, file := range files { - cid = file.Name() + for _, cid := range ledgerIds { peerLogger.Infof("Loading chain %s", cid) - if ledger, err = createLedger(cid); err != nil { + if ledger, err = ledgermgmt.OpenLedger(cid); err != nil { peerLogger.Warning("Failed to load ledger %s", cid) peerLogger.Debug("Error while loading ledger %s with message %s. We continue to the next ledger rather than abort.", cid, err) continue @@ -102,7 +93,7 @@ func Initialize() { } } -func getCurrConfigBlockFromLedger(ledger *kvledger.KVLedger) (*common.Block, error) { +func getCurrConfigBlockFromLedger(ledger ledger.ValidatedLedger) (*common.Block, error) { // Configuration blocks contain only 1 transaction, so we look for 1-tx // blocks and check the transaction type var envelope *common.Envelope @@ -135,7 +126,7 @@ func getCurrConfigBlockFromLedger(ledger *kvledger.KVLedger) (*common.Block, err } // createChain creates a new chain object and insert it into the chains -func createChain(cid string, ledger *kvledger.KVLedger, cb *common.Block) error { +func createChain(cid string, ledger ledger.ValidatedLedger, cb *common.Block) error { c := committer.NewLedgerCommitter(ledger) mgr, err := mspmgmt.GetMSPManagerFromBlock(cb) @@ -159,7 +150,7 @@ func CreateChainFromBlock(cb *common.Block) error { if err != nil { return err } - var ledger *kvledger.KVLedger + var ledger ledger.ValidatedLedger if ledger, err = createLedger(cid); err != nil { return err } @@ -170,7 +161,7 @@ func CreateChainFromBlock(cb *common.Block) error { // MockCreateChain used for creating a ledger for a chain for tests // without havin to join func MockCreateChain(cid string) error { - var ledger *kvledger.KVLedger + var ledger ledger.ValidatedLedger var err error if ledger, err = createLedger(cid); err != nil { return err @@ -185,7 +176,7 @@ func MockCreateChain(cid string) error { // GetLedger returns the ledger of the chain with chain ID. Note that this // call returns nil if chain cid has not been created. -func GetLedger(cid string) *kvledger.KVLedger { +func GetLedger(cid string) ledger.ValidatedLedger { chains.RLock() defer chains.RUnlock() if c, ok := chains.list[cid]; ok { @@ -232,27 +223,12 @@ func SetCurrConfigBlock(block *common.Block, cid string) error { } // All ledgers are located under `peer.fileSystemPath` -func createLedger(cid string) (*kvledger.KVLedger, error) { - var ledger *kvledger.KVLedger - var err error +func createLedger(cid string) (ledger.ValidatedLedger, error) { + var ledger ledger.ValidatedLedger if ledger = GetLedger(cid); ledger != nil { return ledger, nil } - loc := getLedgerPath(cid) - if ledger, err = kvledger.NewKVLedger(kvledger.NewConf(loc, 0)); err != nil { - // hmm let's try 1 more time - if ledger, err = kvledger.NewKVLedger(kvledger.NewConf(loc, 0)); err != nil { - // this peer is no longer reliable, so we exit with error - return nil, fmt.Errorf("Failed to create ledger for chain %s with error %s", cid, err) - } - } - return ledger, nil -} - -// Ledger path is system path with "ledgers/" appended -func getLedgerPath(cid string) string { - sysPath := viper.GetString("peer.fileSystemPath") - return filepath.Join(sysPath, "ledgers", cid) + return ledgermgmt.CreateLedger(cid) } // NewPeerClientConnection Returns a new grpc.ClientConn to the configured local PEER. diff --git a/gossip/state/state_test.go b/gossip/state/state_test.go index 94d4fb8aa29..9f0b61f38a2 100644 --- a/gossip/state/state_test.go +++ b/gossip/state/state_test.go @@ -19,20 +19,20 @@ package state import ( "bytes" "fmt" - "os" "strconv" "testing" "time" pb "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/core/committer" - "github.com/hyperledger/fabric/core/ledger/kvledger" + "github.com/hyperledger/fabric/core/ledger/ledgermgmt" "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/gossip" "github.com/hyperledger/fabric/gossip/proto" pcomm "github.com/hyperledger/fabric/protos/common" "github.com/op/go-logging" + "github.com/spf13/viper" ) var ( @@ -123,9 +123,8 @@ func newGossipInstance(config *gossip.Config) gossip.Gossip { } // Create new instance of KVLedger to be used for testing -func newCommitter(id int, basePath string) committer.Committer { - conf := kvledger.NewConf(basePath+strconv.Itoa(id), 0) - ledger, _ := kvledger.NewKVLedger(conf) +func newCommitter(id int) committer.Committer { + ledger, _ := ledgermgmt.CreateLedger(strconv.Itoa(id)) return committer.NewLedgerCommitter(ledger) } @@ -225,14 +224,15 @@ func TestNewGossipStateProvider_RepeatGossipingOneMessage(t *testing.T) { }*/ func TestNewGossipStateProvider_SendingManyMessages(t *testing.T) { - ledgerPath := "/tmp/tests/ledger/" - defer os.RemoveAll(ledgerPath) + viper.Set("peer.fileSystemPath", "/tmp/tests/ledger/node") + ledgermgmt.InitializeTestEnv() + defer ledgermgmt.CleanupTestEnv() bootstrapSetSize := 5 bootstrapSet := make([]*peerNode, 0) for i := 0; i < bootstrapSetSize; i++ { - committer := newCommitter(i, ledgerPath+"node/") + committer := newCommitter(i) bootstrapSet = append(bootstrapSet, newPeerNode(newGossipConfig(i, 100), committer)) } @@ -258,7 +258,7 @@ func TestNewGossipStateProvider_SendingManyMessages(t *testing.T) { peersSet := make([]*peerNode, 0) for i := 0; i < standartPeersSize; i++ { - committer := newCommitter(standartPeersSize+i, ledgerPath+"node/") + committer := newCommitter(standartPeersSize + i) peersSet = append(peersSet, newPeerNode(newGossipConfig(standartPeersSize+i, 100, 0, 1, 2, 3, 4), committer)) } diff --git a/peer/node/start.go b/peer/node/start.go index df145e4fae2..1018c2750f8 100755 --- a/peer/node/start.go +++ b/peer/node/start.go @@ -32,6 +32,7 @@ import ( "github.com/hyperledger/fabric/core/comm" "github.com/hyperledger/fabric/core/committer/noopssinglechain" "github.com/hyperledger/fabric/core/endorser" + "github.com/hyperledger/fabric/core/ledger/ledgermgmt" "github.com/hyperledger/fabric/core/peer" "github.com/hyperledger/fabric/core/util" "github.com/hyperledger/fabric/events/producer" @@ -79,6 +80,7 @@ func initChainless() { } func serve(args []string) error { + ledgermgmt.Initialize() // Parameter overrides must be processed before any paramaters are // cached. Failures to cache cause the server to terminate immediately. if chaincodeDevMode {