Skip to content

Commit

Permalink
[FAB-7048] Move deliver from orderer to fabric/common
Browse files Browse the repository at this point in the history
This CR moves deliver from the orderer to fabric/common. This is being
done to support the future implementation of deliver on the peer.

Change-Id: If077f2c05b5a10fdeb4e6ac315111495304e4c5e
Signed-off-by: Will Lahti <[email protected]>
  • Loading branch information
wlahti committed Dec 7, 2017
1 parent 0c5fa00 commit 0dfe4f3
Show file tree
Hide file tree
Showing 32 changed files with 167 additions and 163 deletions.
12 changes: 6 additions & 6 deletions orderer/common/deliver/deliver.go → common/deliver/deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@ package deliver
import (
"io"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/blockledger"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/orderer/common/ledger"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"

"github.com/golang/protobuf/proto"
"github.com/op/go-logging"
)

const pkgLogID = "orderer/common/deliver"
const pkgLogID = "common/deliver"

var logger *logging.Logger

Expand All @@ -58,7 +58,7 @@ type Support interface {
PolicyManager() policies.Manager

// Reader returns the chain Reader for the chain
Reader() ledger.Reader
Reader() blockledger.Reader

// Errored returns a channel which closes when the backing consenter has errored
Errored() <-chan struct{}
Expand Down Expand Up @@ -137,7 +137,7 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env

lastConfigSequence := chain.Sequence()

sf := msgprocessor.NewSigFilter(policies.ChannelReaders, chain)
sf := NewSigFilter(policies.ChannelReaders, chain)
if err := sf.Apply(envelope); err != nil {
logger.Warningf("[channel: %s] Received unauthorized deliver request from %s: %s", chdr.ChannelId, addr, err)
return sendStatusReply(srv, cb.Status_FORBIDDEN)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import (
"time"

"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/blockledger"
ramledger "github.com/hyperledger/fabric/common/ledger/blockledger/ram"
mockpolicies "github.com/hyperledger/fabric/common/mocks/policies"
"github.com/hyperledger/fabric/common/policies"
genesisconfig "github.com/hyperledger/fabric/common/tools/configtxgen/localconfig"
"github.com/hyperledger/fabric/orderer/common/ledger"
ramledger "github.com/hyperledger/fabric/orderer/common/ledger/ram"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"
Expand Down Expand Up @@ -120,7 +120,7 @@ func (mm *mockSupportManager) GetChain(chainID string) (Support, bool) {
}

type mockSupport struct {
ledger ledger.ReadWriter
ledger blockledger.ReadWriter
policyManager *mockpolicies.Manager
erroredChan chan struct{}
configSeq uint64
Expand All @@ -138,11 +138,11 @@ func (mcs *mockSupport) PolicyManager() policies.Manager {
return mcs.policyManager
}

func (mcs *mockSupport) Reader() ledger.Reader {
func (mcs *mockSupport) Reader() blockledger.Reader {
return mcs.ledger
}

func NewRAMLedger() ledger.ReadWriter {
func NewRAMLedger() blockledger.ReadWriter {
rlf := ramledger.New(ledgerSize + 1)
rl, _ := rlf.GetOrCreate(genesisconfig.TestChainID)
rl.Append(genesisBlock)
Expand All @@ -153,7 +153,7 @@ func initializeDeliverHandler() Handler {
mm := newMockMultichainManager()
for i := 1; i < ledgerSize; i++ {
l := mm.chains[systemChainID].ledger
l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}}))
l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}}))
}

return NewHandlerImpl(mm)
Expand Down Expand Up @@ -282,7 +282,7 @@ func TestUnauthorizedSeek(t *testing.T) {
mm := newMockMultichainManager()
for i := 1; i < ledgerSize; i++ {
l := mm.chains[systemChainID].ledger
l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}}))
l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}}))
}
mm.chains[systemChainID].policyManager.Policy.Err = fmt.Errorf("Fail to evaluate policy")

Expand All @@ -308,7 +308,7 @@ func TestRevokedAuthorizationSeek(t *testing.T) {
mm := newMockMultichainManager()
for i := 1; i < ledgerSize; i++ {
l := mm.chains[systemChainID].ledger
l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}}))
l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}}))
}

m := newMockD()
Expand All @@ -329,7 +329,7 @@ func TestRevokedAuthorizationSeek(t *testing.T) {
mm.chains[systemChainID].policyManager.Policy.Err = fmt.Errorf("Fail to evaluate policy")
mm.chains[systemChainID].configSeq++
l := mm.chains[systemChainID].ledger
l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", ledgerSize+1))}}))
l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", ledgerSize+1))}}))

select {
case deliverReply := <-m.sendChan:
Expand Down Expand Up @@ -391,7 +391,7 @@ func TestBlockingSeek(t *testing.T) {
mm := newMockMultichainManager()
for i := 1; i < ledgerSize; i++ {
l := mm.chains[systemChainID].ledger
l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}}))
l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}}))
}

m := newMockD()
Expand All @@ -418,7 +418,7 @@ func TestBlockingSeek(t *testing.T) {
}

l := mm.chains[systemChainID].ledger
l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", ledgerSize+1))}}))
l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", ledgerSize+1))}}))

select {
case deliverReply := <-m.sendChan:
Expand All @@ -445,7 +445,7 @@ func TestErroredSeek(t *testing.T) {
l := ms.ledger
close(ms.erroredChan)
for i := 1; i < ledgerSize; i++ {
l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}}))
l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}}))
}

m := newMockD()
Expand All @@ -469,7 +469,7 @@ func TestErroredBlockingSeek(t *testing.T) {
ms := mm.chains[systemChainID]
l := ms.ledger
for i := 1; i < ledgerSize; i++ {
l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}}))
l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}}))
}

m := newMockD()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package msgprocessor
package deliver

import (
"fmt"
Expand All @@ -15,28 +15,34 @@ import (
"github.com/pkg/errors"
)

// ErrPermissionDenied is returned by errors which are caused by transactions
// which are not permitted due to an authorization failure.
var ErrPermissionDenied = errors.New("permission denied")

// SigFilterSupport provides the resources required for the signature filter
type SigFilterSupport interface {
// PolicyManager returns a reference to the current policy manager
PolicyManager() policies.Manager
}

type sigFilter struct {
// SigFilter stores the name of the policy to apply to deliver requests to
// determine whether a client is authorized
type SigFilter struct {
policyName string
support SigFilterSupport
}

// NewSigFilter creates a new signature filter, at every evaluation, the policy manager is called
// to retrieve the latest version of the policy
func NewSigFilter(policyName string, support SigFilterSupport) Rule {
return &sigFilter{
func NewSigFilter(policyName string, support SigFilterSupport) *SigFilter {
return &SigFilter{
policyName: policyName,
support: support,
}
}

// Apply applies the policy given, resulting in Reject or Forward, never Accept
func (sf *sigFilter) Apply(message *cb.Envelope) error {
func (sf *SigFilter) Apply(message *cb.Envelope) error {
signedData, err := message.AsSignedData()

if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package msgprocessor
package deliver

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package ledger_test
package blockledger_test

import (
"bytes"
"reflect"
"testing"

. "github.com/hyperledger/fabric/orderer/common/ledger"
"github.com/hyperledger/fabric/common/ledger/blockledger"

cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
)
Expand All @@ -32,7 +33,7 @@ type ledgerTestable interface {
}

type ledgerTestFactory interface {
New() (Factory, ReadWriter)
New() (blockledger.Factory, blockledger.ReadWriter)
Destroy() error
Persistent() bool
}
Expand Down Expand Up @@ -69,7 +70,7 @@ func testInitialization(lf ledgerTestFactory, t *testing.T) {
if li.Height() != 1 {
t.Fatalf("Block height should be 1")
}
block := GetBlock(li, 0)
block := blockledger.GetBlock(li, 0)
if block == nil {
t.Fatalf("Error retrieving genesis block")
}
Expand All @@ -86,7 +87,7 @@ func testReinitialization(lf ledgerTestFactory, t *testing.T) {
return
}
olf, oli := lf.New()
aBlock := CreateNextBlock(oli, []*cb.Envelope{{Payload: []byte("My Data")}})
aBlock := blockledger.CreateNextBlock(oli, []*cb.Envelope{{Payload: []byte("My Data")}})
err := oli.Append(aBlock)
if err != nil {
t.Fatalf("Error appending block: %s", err)
Expand All @@ -97,7 +98,7 @@ func testReinitialization(lf ledgerTestFactory, t *testing.T) {
if li.Height() != 2 {
t.Fatalf("Block height should be 2")
}
block := GetBlock(li, 1)
block := blockledger.GetBlock(li, 1)
if block == nil {
t.Fatalf("Error retrieving block 1")
}
Expand All @@ -112,17 +113,17 @@ func TestAddition(t *testing.T) {

func testAddition(lf ledgerTestFactory, t *testing.T) {
_, li := lf.New()
genesis := GetBlock(li, 0)
genesis := blockledger.GetBlock(li, 0)
if genesis == nil {
t.Fatalf("Could not retrieve genesis block")
}
prevHash := genesis.Header.Hash()

li.Append(CreateNextBlock(li, []*cb.Envelope{{Payload: []byte("My Data")}}))
li.Append(blockledger.CreateNextBlock(li, []*cb.Envelope{{Payload: []byte("My Data")}}))
if li.Height() != 2 {
t.Fatalf("Block height should be 2")
}
block := GetBlock(li, 1)
block := blockledger.GetBlock(li, 1)
if block == nil {
t.Fatalf("Error retrieving genesis block")
}
Expand All @@ -137,7 +138,7 @@ func TestRetrieval(t *testing.T) {

func testRetrieval(lf ledgerTestFactory, t *testing.T) {
_, li := lf.New()
li.Append(CreateNextBlock(li, []*cb.Envelope{{Payload: []byte("My Data")}}))
li.Append(blockledger.CreateNextBlock(li, []*cb.Envelope{{Payload: []byte("My Data")}}))
it, num := li.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{}})
defer it.Close()
if num != 0 {
Expand Down Expand Up @@ -188,7 +189,7 @@ func testBlockedRetrieval(lf ledgerTestFactory, t *testing.T) {
t.Fatalf("Should not be ready for block read")
default:
}
li.Append(CreateNextBlock(li, []*cb.Envelope{{Payload: []byte("My Data")}}))
li.Append(blockledger.CreateNextBlock(li, []*cb.Envelope{{Payload: []byte("My Data")}}))
select {
case <-signal:
default:
Expand Down Expand Up @@ -222,8 +223,8 @@ func testMultichain(lf ledgerTestFactory, t *testing.T) {
t.Fatalf("Error creating chain1: %s", err)
}

c1.Append(CreateNextBlock(c1, []*cb.Envelope{{Payload: c1p1}}))
c1b1 := CreateNextBlock(c1, []*cb.Envelope{{Payload: c1p2}})
c1.Append(blockledger.CreateNextBlock(c1, []*cb.Envelope{{Payload: c1p1}}))
c1b1 := blockledger.CreateNextBlock(c1, []*cb.Envelope{{Payload: c1p2}})
c1.Append(c1b1)

if c1.Height() != 2 {
Expand All @@ -234,7 +235,7 @@ func testMultichain(lf ledgerTestFactory, t *testing.T) {
if err != nil {
t.Fatalf("Error creating chain2: %s", err)
}
c2b0 := c2.Append(CreateNextBlock(c2, []*cb.Envelope{{Payload: c2p1}}))
c2b0 := c2.Append(blockledger.CreateNextBlock(c2, []*cb.Envelope{{Payload: c2p1}}))

if c2.Height() != 1 {
t.Fatalf("Block height for c2 should be 1")
Expand All @@ -245,7 +246,7 @@ func testMultichain(lf ledgerTestFactory, t *testing.T) {
t.Fatalf("Error retrieving chain1: %s", err)
}

if b := GetBlock(c1, 1); !reflect.DeepEqual(c1b1, b) {
if b := blockledger.GetBlock(c1, 1); !reflect.DeepEqual(c1b1, b) {
t.Fatalf("Did not properly store block 1 on chain 1:")
}

Expand All @@ -254,7 +255,7 @@ func testMultichain(lf ledgerTestFactory, t *testing.T) {
t.Fatalf("Error retrieving chain2: %s", err)
}

if b := GetBlock(c2, 0); reflect.DeepEqual(c2b0, b) {
if b := blockledger.GetBlock(c2, 0); reflect.DeepEqual(c2b0, b) {
t.Fatalf("Did not properly store block 1 on chain 1")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ import (

"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/blkstorage/fsblkstorage"
"github.com/hyperledger/fabric/orderer/common/ledger"
"github.com/hyperledger/fabric/common/ledger/blockledger"
)

type fileLedgerFactory struct {
blkstorageProvider blkstorage.BlockStoreProvider
ledgers map[string]ledger.ReadWriter
ledgers map[string]blockledger.ReadWriter
mutex sync.Mutex
}

// GetOrCreate gets an existing ledger (if it exists) or creates it if it does not
func (flf *fileLedgerFactory) GetOrCreate(chainID string) (ledger.ReadWriter, error) {
func (flf *fileLedgerFactory) GetOrCreate(chainID string) (blockledger.ReadWriter, error) {
flf.mutex.Lock()
defer flf.mutex.Unlock()

Expand Down Expand Up @@ -66,13 +66,13 @@ func (flf *fileLedgerFactory) Close() {
}

// New creates a new ledger factory
func New(directory string) ledger.Factory {
func New(directory string) blockledger.Factory {
return &fileLedgerFactory{
blkstorageProvider: fsblkstorage.NewProvider(
fsblkstorage.NewConf(directory, -1),
&blkstorage.IndexConfig{
AttrsToIndex: []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockNum}},
),
ledgers: make(map[string]ledger.ReadWriter),
ledgers: make(map[string]blockledger.ReadWriter),
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
"testing"

"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/blockledger"
genesisconfig "github.com/hyperledger/fabric/common/tools/configtxgen/localconfig"
"github.com/hyperledger/fabric/orderer/common/ledger"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -56,7 +56,7 @@ func (mbsp *mockBlockStoreProvider) Close() {
func TestBlockstoreProviderError(t *testing.T) {
flf := &fileLedgerFactory{
blkstorageProvider: &mockBlockStoreProvider{error: fmt.Errorf("blockstorage provider error")},
ledgers: make(map[string]ledger.ReadWriter),
ledgers: make(map[string]blockledger.ReadWriter),
}
assert.Panics(
t,
Expand Down
Loading

0 comments on commit 0dfe4f3

Please sign in to comment.