Skip to content

Commit

Permalink
Move rpc message type definitions into proto file (#741)
Browse files Browse the repository at this point in the history
  • Loading branch information
zjshen14 committed Mar 14, 2019
1 parent 4d6efae commit a3b6e51
Show file tree
Hide file tree
Showing 8 changed files with 297 additions and 319 deletions.
36 changes: 17 additions & 19 deletions dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ type blockMsg struct {
ctx context.Context
chainID uint32
block *iotextypes.Block
blkType uint32
}

func (m blockMsg) ChainID() uint32 {
Expand Down Expand Up @@ -101,7 +100,7 @@ type IotxDispatcher struct {
started int32
shutdown int32
eventChan chan interface{}
eventAudit map[uint32]int
eventAudit map[iotexrpc.MessageType]int
eventAuditLock sync.RWMutex
wg sync.WaitGroup
quit chan struct{}
Expand All @@ -114,7 +113,7 @@ type IotxDispatcher struct {
func NewDispatcher(cfg config.Config) (Dispatcher, error) {
d := &IotxDispatcher{
eventChan: make(chan interface{}, cfg.Dispatcher.EventChanSize),
eventAudit: make(map[uint32]int),
eventAudit: make(map[iotexrpc.MessageType]int),
quit: make(chan struct{}),
subscribers: make(map[uint32]Subscriber),
}
Expand Down Expand Up @@ -160,10 +159,10 @@ func (d *IotxDispatcher) EventChan() *chan interface{} {
}

// EventAudit returns the event audit map
func (d *IotxDispatcher) EventAudit() map[uint32]int {
func (d *IotxDispatcher) EventAudit() map[iotexrpc.MessageType]int {
d.eventAuditLock.RLock()
defer d.eventAuditLock.RUnlock()
snapshot := make(map[uint32]int)
snapshot := make(map[iotexrpc.MessageType]int)
for k, v := range d.eventAudit {
snapshot[k] = v
}
Expand Down Expand Up @@ -199,7 +198,7 @@ loop:

// handleActionMsg handles actionMsg from all peers.
func (d *IotxDispatcher) handleActionMsg(m *actionMsg) {
d.updateEventAudit(protogen.MsgActionType)
d.updateEventAudit(iotexrpc.MessageType_ACTION)
if subscriber, ok := d.subscribers[m.ChainID()]; ok {
if err := subscriber.HandleAction(m.ctx, m.action); err != nil {
requestMtc.WithLabelValues("AddAction", "false").Inc()
Expand All @@ -215,7 +214,7 @@ func (d *IotxDispatcher) handleBlockMsg(m *blockMsg) {
d.subscribersMU.RLock()
defer d.subscribersMU.RUnlock()
if subscriber, ok := d.subscribers[m.ChainID()]; ok {
d.updateEventAudit(protogen.MsgBlockProtoMsgType)
d.updateEventAudit(iotexrpc.MessageType_BLOCK)
if err := subscriber.HandleBlock(m.ctx, m.block); err != nil {
log.L().Error("Fail to handle the block.", zap.Error(err))
}
Expand All @@ -231,7 +230,7 @@ func (d *IotxDispatcher) handleBlockSyncMsg(m *blockSyncMsg) {
zap.Uint64("start", m.sync.Start),
zap.Uint64("end", m.sync.End))

d.updateEventAudit(protogen.MsgBlockSyncReqType)
d.updateEventAudit(iotexrpc.MessageType_BLOCK_REQUEST)
if subscriber, ok := d.subscribers[m.ChainID()]; ok {
// dispatch to block sync
if err := subscriber.HandleSyncRequest(m.ctx, m.peer, m.sync); err != nil {
Expand Down Expand Up @@ -263,7 +262,6 @@ func (d *IotxDispatcher) dispatchBlockCommit(ctx context.Context, chainID uint32
ctx: ctx,
chainID: chainID,
block: (msg).(*iotextypes.Block),
blkType: protogen.MsgBlockProtoMsgType,
})
}

Expand All @@ -282,7 +280,7 @@ func (d *IotxDispatcher) dispatchBlockSyncReq(ctx context.Context, chainID uint3

// HandleBroadcast handles incoming broadcast message
func (d *IotxDispatcher) HandleBroadcast(ctx context.Context, chainID uint32, message proto.Message) {
msgType, err := protogen.GetTypeFromProtoMsg(message)
msgType, err := protogen.GetTypeFromRPCMsg(message)
if err != nil {
log.L().Warn("Unexpected message handled by HandleBroadcast.", zap.Error(err))
}
Expand All @@ -296,33 +294,33 @@ func (d *IotxDispatcher) HandleBroadcast(ctx context.Context, chainID uint32, me
d.subscribersMU.RUnlock()

switch msgType {
case protogen.MsgConsensusType:
case iotexrpc.MessageType_CONSENSUS:
err := subscriber.HandleConsensusMsg(message.(*iotexrpc.Consensus))
if err != nil {
log.L().Error("Failed to handle block propose.", zap.Error(err))
}
case protogen.MsgActionType:
case iotexrpc.MessageType_ACTION:
d.dispatchAction(ctx, chainID, message)
case protogen.MsgBlockProtoMsgType:
case iotexrpc.MessageType_BLOCK:
d.dispatchBlockCommit(ctx, chainID, message)
default:
log.L().Warn("Unexpected msgType handled by HandleBroadcast.", zap.Uint32("msgType", msgType))
log.L().Warn("Unexpected msgType handled by HandleBroadcast.", zap.Any("msgType", msgType))
}
}

// HandleTell handles incoming unicast message
func (d *IotxDispatcher) HandleTell(ctx context.Context, chainID uint32, peer peerstore.PeerInfo, message proto.Message) {
msgType, err := protogen.GetTypeFromProtoMsg(message)
msgType, err := protogen.GetTypeFromRPCMsg(message)
if err != nil {
log.L().Warn("Unexpected message handled by HandleTell.", zap.Error(err))
}
switch msgType {
case protogen.MsgBlockSyncReqType:
case iotexrpc.MessageType_BLOCK_REQUEST:
d.dispatchBlockSyncReq(ctx, chainID, peer, message)
case protogen.MsgBlockProtoMsgType:
case iotexrpc.MessageType_BLOCK:
d.dispatchBlockCommit(ctx, chainID, message)
default:
log.L().Warn("Unexpected msgType handled by HandleTell.", zap.Uint32("msgType", msgType))
log.L().Warn("Unexpected msgType handled by HandleTell.", zap.Any("msgType", msgType))
}
}

Expand All @@ -336,7 +334,7 @@ func (d *IotxDispatcher) enqueueEvent(event interface{}) {
}()
}

func (d *IotxDispatcher) updateEventAudit(t uint32) {
func (d *IotxDispatcher) updateEventAudit(t iotexrpc.MessageType) {
d.eventAuditLock.Lock()
defer d.eventAuditLock.Unlock()
d.eventAudit[t]++
Expand Down
22 changes: 11 additions & 11 deletions p2p/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"go.uber.org/zap"

"github.com/iotexproject/iotex-core/config"
p2ppb "github.com/iotexproject/iotex-core/p2p/pb"
"github.com/iotexproject/iotex-core/pkg/log"
"github.com/iotexproject/iotex-core/protogen"
"github.com/iotexproject/iotex-core/protogen/iotexrpc"
)

const (
Expand Down Expand Up @@ -115,7 +115,7 @@ func (p *Agent) Start(ctx context.Context) error {
<-ready
var (
peerID string
broadcast p2ppb.BroadcastMsg
broadcast iotexrpc.BroadcastMsg
latency int64
)
skip := false
Expand Down Expand Up @@ -150,7 +150,7 @@ func (p *Agent) Start(ctx context.Context) error {
t, _ := ptypes.Timestamp(broadcast.GetTimestamp())
latency = time.Since(t).Nanoseconds() / time.Millisecond.Nanoseconds()

msg, err := protogen.TypifyProtoMsg(broadcast.MsgType, broadcast.MsgBody)
msg, err := protogen.TypifyRPCMsg(broadcast.MsgType, broadcast.MsgBody)
if err != nil {
err = errors.Wrap(err, "error when typifying broadcast message")
return
Expand All @@ -165,7 +165,7 @@ func (p *Agent) Start(ctx context.Context) error {
// Blocking handling the unicast message until the agent is started
<-ready
var (
unicast p2ppb.UnicastMsg
unicast iotexrpc.UnicastMsg
peerID string
latency int64
)
Expand All @@ -181,7 +181,7 @@ func (p *Agent) Start(ctx context.Context) error {
err = errors.Wrap(err, "error when marshaling unicast message")
return
}
msg, err := protogen.TypifyProtoMsg(unicast.MsgType, unicast.MsgBody)
msg, err := protogen.TypifyRPCMsg(unicast.MsgType, unicast.MsgBody)
if err != nil {
err = errors.Wrap(err, "error when typifying unicast message")
return
Expand Down Expand Up @@ -277,7 +277,7 @@ func (p *Agent) Stop(ctx context.Context) error {

// BroadcastOutbound sends a broadcast message to the whole network
func (p *Agent) BroadcastOutbound(ctx context.Context, msg proto.Message) (err error) {
var msgType uint32
var msgType iotexrpc.MessageType
var msgBody []byte
defer func() {
status := successStr
Expand All @@ -301,7 +301,7 @@ func (p *Agent) BroadcastOutbound(ctx context.Context, msg proto.Message) (err e
err = errors.New("P2P context doesn't exist")
return
}
broadcast := p2ppb.BroadcastMsg{
broadcast := iotexrpc.BroadcastMsg{
ChainId: p2pCtx.ChainID,
PeerId: p.host.HostIdentity(),
MsgType: msgType,
Expand All @@ -322,7 +322,7 @@ func (p *Agent) BroadcastOutbound(ctx context.Context, msg proto.Message) (err e

// UnicastOutbound sends a unicast message to the given address
func (p *Agent) UnicastOutbound(ctx context.Context, peer peerstore.PeerInfo, msg proto.Message) (err error) {
var msgType uint32
var msgType iotexrpc.MessageType
var msgBody []byte
defer func() {
status := successStr
Expand All @@ -340,7 +340,7 @@ func (p *Agent) UnicastOutbound(ctx context.Context, peer peerstore.PeerInfo, ms
err = errors.New("P2P context doesn't exist")
return
}
unicast := p2ppb.UnicastMsg{
unicast := iotexrpc.UnicastMsg{
ChainId: p2pCtx.ChainID,
PeerId: p.host.HostIdentity(),
MsgType: msgType,
Expand Down Expand Up @@ -370,8 +370,8 @@ func (p *Agent) Neighbors(ctx context.Context) ([]peerstore.PeerInfo, error) {
return p.host.Neighbors(ctx)
}

func convertAppMsg(msg proto.Message) (uint32, []byte, error) {
msgType, err := protogen.GetTypeFromProtoMsg(msg)
func convertAppMsg(msg proto.Message) (iotexrpc.MessageType, []byte, error) {
msgType, err := protogen.GetTypeFromRPCMsg(msg)
if err != nil {
return 0, nil, errors.Wrap(err, "error when converting application message to proto")
}
Expand Down
Loading

0 comments on commit a3b6e51

Please sign in to comment.