Skip to content

Commit

Permalink
[FAB-1573] Make deliver API signable
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-1573

This changeset modifies the Deliver API to accept an Envelope of type
DELIVER_SEEK_INFO instead of a standard SeekInfo message.  The Envelope
embeds a SeekInfo as well as the bits necessary to construct a
non-replayable signature.

This unfortunately breaks the API, but it seems better to fix it now
than to wait until there are more dependencies.

The alternative was to invent yet another signing scheme, which seemed
like it would be equally as much work on the consumer side with ongoing
messiness.

Change-Id: I3e8a759d33f68cb6a69f5242440136da730f0396
Signed-off-by: Jason Yellick <[email protected]>
Jason Yellick committed Jan 11, 2017

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 149ae0d commit 1bf6190
Showing 14 changed files with 291 additions and 195 deletions.
45 changes: 19 additions & 26 deletions bddtests/orderer/ab_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions bddtests/orderer/ab_pb2_grpc.py
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@

import common.common_pb2 as common_dot_common__pb2
import orderer.ab_pb2 as orderer_dot_ab__pb2
import orderer.ab_pb2 as orderer_dot_ab__pb2
import common.common_pb2 as common_dot_common__pb2
import orderer.ab_pb2 as orderer_dot_ab__pb2


@@ -23,7 +23,7 @@ def __init__(self, channel):
)
self.Deliver = channel.stream_stream(
'/orderer.AtomicBroadcast/Deliver',
request_serializer=orderer_dot_ab__pb2.SeekInfo.SerializeToString,
request_serializer=common_dot_common__pb2.Envelope.SerializeToString,
response_deserializer=orderer_dot_ab__pb2.DeliverResponse.FromString,
)

@@ -38,7 +38,7 @@ def Broadcast(self, request_iterator, context):
raise NotImplementedError('Method not implemented!')

def Deliver(self, request_iterator, context):
"""deliver first requires an update containing a seek message, then a stream of block replies is received.
"""deliver first requires an Envelope of type DELIVER_SEEK_INFO with Payload data as a mashaled SeekInfo message, then a stream of block replies is received.
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
@@ -54,7 +54,7 @@ def add_AtomicBroadcastServicer_to_server(servicer, server):
),
'Deliver': grpc.stream_stream_rpc_method_handler(
servicer.Deliver,
request_deserializer=orderer_dot_ab__pb2.SeekInfo.FromString,
request_deserializer=common_dot_common__pb2.Envelope.FromString,
response_serializer=orderer_dot_ab__pb2.DeliverResponse.SerializeToString,
),
}
17 changes: 12 additions & 5 deletions bddtests/steps/orderer_util.py
Original file line number Diff line number Diff line change
@@ -210,11 +210,18 @@ def seekPosition(position):
return ab_pb2.SeekPosition(specified = ab_pb2.SeekSpecified(number = position))

def createSeekInfo(chainID = TEST_CHAIN_ID, start = 'Oldest', end = 'Newest', behavior = 'FAIL_IF_NOT_READY'):
return ab_pb2.SeekInfo(
chainID = chainID,
start = seekPosition(start),
stop = seekPosition(end),
behavior = ab_pb2.SeekInfo.SeekBehavior.Value(behavior),
return common_pb2.Envelope(
payload = common_pb2.Payload(
header = common_pb2.Header(
chainHeader = common_pb2.ChainHeader( chainID = chainID ),
signatureHeader = common_pb2.SignatureHeader(),
),
data = ab_pb2.SeekInfo(
start = seekPosition(start),
stop = seekPosition(end),
behavior = ab_pb2.SeekInfo.SeekBehavior.Value(behavior),
).SerializeToString(),
).SerializeToString(),
)

def generateBroadcastMessages(chainID = TEST_CHAIN_ID, numToGenerate = 1, timeToHoldOpen = 1):
39 changes: 29 additions & 10 deletions core/deliverservice/client.go
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@ import (
"github.com/hyperledger/fabric/gossip/service"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/op/go-logging"
"github.com/spf13/viper"
"golang.org/x/net/context"
@@ -155,20 +156,38 @@ func (d *DeliverService) checkLeaderAndRunDeliver(committer committer.Committer)
}

func (d *DeliverService) seekOldest() error {
return d.client.Send(&orderer.SeekInfo{
ChainID: d.chainID,
Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Oldest{Oldest: &orderer.SeekOldest{}}},
Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}},
Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
return d.client.Send(&common.Envelope{
Payload: utils.MarshalOrPanic(&common.Payload{
Header: &common.Header{
ChainHeader: &common.ChainHeader{
ChainID: d.chainID,
},
SignatureHeader: &common.SignatureHeader{},
},
Data: utils.MarshalOrPanic(&orderer.SeekInfo{
Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Oldest{Oldest: &orderer.SeekOldest{}}},
Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}},
Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
}),
}),
})
}

func (d *DeliverService) seekLatestFromCommitter(height uint64) error {
return d.client.Send(&orderer.SeekInfo{
ChainID: d.chainID,
Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: height}}},
Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}},
Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
return d.client.Send(&common.Envelope{
Payload: utils.MarshalOrPanic(&common.Payload{
Header: &common.Header{
ChainHeader: &common.ChainHeader{
ChainID: d.chainID,
},
SignatureHeader: &common.SignatureHeader{},
},
Data: utils.MarshalOrPanic(&orderer.SeekInfo{
Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: height}}},
Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}},
Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
}),
}),
})
}

28 changes: 25 additions & 3 deletions orderer/common/deliver/deliver.go
Original file line number Diff line number Diff line change
@@ -17,11 +17,15 @@ limitations under the License.
package deliver

import (
"fmt"

"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/orderer/rawledger"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/op/go-logging"

"github.com/golang/protobuf/proto"
)

var logger = logging.MustGetLogger("orderer/common/deliver")
@@ -60,20 +64,38 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
logger.Debugf("Starting new deliver loop")
for {
logger.Debugf("Attempting to read seek info message")
seekInfo, err := srv.Recv()
envelope, err := srv.Recv()
if err != nil {
logger.Errorf("Error reading from stream: %s", err)
return err
}
logger.Debugf("Received message %v", seekInfo)
payload := &cb.Payload{}
if err = proto.Unmarshal(envelope.Payload, payload); err != nil {
logger.Errorf("Received an envelope with no payload: %s", err)
return err
}

chain, ok := ds.sm.GetChain(seekInfo.ChainID)
if payload.Header == nil || payload.Header.ChainHeader == nil {
err := fmt.Errorf("Malformed envelope recieved with bad header")
logger.Error(err)
return err
}

chain, ok := ds.sm.GetChain(payload.Header.ChainHeader.ChainID)
if !ok {
return sendStatusReply(srv, cb.Status_NOT_FOUND)
}

// XXX add deliver authorization checking

seekInfo := &ab.SeekInfo{}
if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
logger.Errorf("Received a signed deliver request with malformed seekInfo payload: %s", err)
return err
}

logger.Debugf("Received seekInfo %v", seekInfo)

cursor, number := chain.Reader().Iterator(seekInfo.Start)
var stopNum uint64
switch stop := seekInfo.Stop.Type.(type) {
Loading

0 comments on commit 1bf6190

Please sign in to comment.