Skip to content

Commit

Permalink
[[Feat]] support multiple address format(33cn#1181)
Browse files Browse the repository at this point in the history
  • Loading branch information
bysomeone committed Mar 21, 2022
2 parents 03b290b + 841cee2 commit 94c4141
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 13 deletions.
8 changes: 5 additions & 3 deletions system/p2p/dht/protocol/broadcast/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ func (p *broadcastProtocol) handleBroadcastReceive(msg subscribeMsg) {
} else if topic == psBlockTopic {
block := msg.value.(*types.Block)
hash = hex.EncodeToString(block.Hash(p.ChainCfg))
log.Debug("recvBlkPs", "height", block.GetHeight(), "hash", hash)
log.Debug("recvBlk", "height", block.GetHeight(), "hash", hash,
"size(KB)", float32(block.Size())/1024, "from", msg.publisher.String())
err = p.postBlockChain(hash, msg.receiveFrom.String(), block, msg.publisher)

} else if topic == psLtBlockTopic {
Expand All @@ -216,14 +217,15 @@ func (p *broadcastProtocol) handleBroadcastReceive(msg subscribeMsg) {
return
}
p.ltB.addLtBlock(lb, msg.receiveFrom, msg.publisher)
log.Debug("recvLtBlk", "height", lb.GetHeader().GetHeight(), "hash", hash)
log.Debug("recvLtBlk", "height", lb.GetHeader().GetHeight(), "hash", hash,
"size(KB)", float32(lb.GetSize())/1024, "from", msg.publisher.String())

} else if strings.HasPrefix(topic, psPeerMsgTopicPrefix) {
err = p.handlePeerMsg(msg.value.(*types.PeerPubSubMsg), msg.receiveFrom, msg.publisher)
}

if err != nil {
log.Error("receivePs", "topic", topic, "hash", hash, "post msg err", err)
log.Error("handleBroadcastReceive", "topic", topic, "hash", hash, "post msg err", err)
}
}

Expand Down
6 changes: 5 additions & 1 deletion system/p2p/dht/protocol/broadcast/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (p *pubSub) init() {
if !p.cfg.DisableValidation {
p.val = initValidator(p)
p.Pubsub.RegisterTopicValidator(psBlockTopic, p.val.validateBlock, pubsub.WithValidatorInline(true))
p.Pubsub.RegisterTopicValidator(psTxTopic, p.val.validatePeer, pubsub.WithValidatorInline(true))
p.Pubsub.RegisterTopicValidator(psTxTopic, p.val.validateTx, pubsub.WithValidatorInline(true))
p.Pubsub.RegisterTopicValidator(psLtBlockTopic, p.val.validatePeer, pubsub.WithValidatorInline(true))
}

Expand Down Expand Up @@ -117,6 +117,10 @@ func (p *pubSub) handleSubMsg(in chan net.SubMsg) {
return
}
topic := *data.Topic
// 交易在pubsub内部验证时已经发送至mempool, 此处直接忽略
if topic == psTxTopic {
break
}
msg = p.newMsg(topic)
err = p.decodeMsg(data.Data, &buf, msg)
if err != nil {
Expand Down
34 changes: 32 additions & 2 deletions system/p2p/dht/protocol/broadcast/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,9 @@ func (v *validator) addBlockHeader(header *types.Header) {
}
}

func (v *validator) validateBlock(ctx context.Context, id peer.ID, msg *ps.Message) ps.ValidationResult {
func (v *validator) validateBlock(ctx context.Context, _ peer.ID, msg *ps.Message) ps.ValidationResult {

id := msg.GetFrom()
if id == v.Host.ID() {
return ps.ValidationAccept
}
Expand Down Expand Up @@ -249,10 +250,39 @@ func (v *validator) validateBlock(ctx context.Context, id peer.ID, msg *ps.Messa
}

//
func (v *validator) validatePeer(ctx context.Context, id peer.ID, msg *ps.Message) ps.ValidationResult {
func (v *validator) validatePeer(ctx context.Context, _ peer.ID, msg *ps.Message) ps.ValidationResult {
id := msg.GetFrom()
if v.isDeniedPeer(id) {
log.Debug("validatePeer", "topic", *msg.Topic, "denied peer", id.Pretty())
return ps.ValidationReject
}
return ps.ValidationAccept
}

func (v *validator) validateTx(ctx context.Context, _ peer.ID, msg *ps.Message) ps.ValidationResult {

from := msg.GetFrom()
if from == v.Host.ID() {
return ps.ValidationAccept
}

tx := &types.Transaction{}
err := v.decodeMsg(msg.Data, nil, tx)
if err != nil {
log.Error("validateTx", "decodeMsg err", err)
return ps.ValidationReject
}

//重复检测
if v.txFilter.AddWithCheckAtomic(hex.EncodeToString(tx.Hash()), struct{}{}) {
return ps.ValidationIgnore
}

_, err = v.API.SendTx(tx)
if err != nil {
log.Debug("validateTx", "hash", hex.EncodeToString(tx.Hash()), "err", err)
return ps.ValidationIgnore
}

return ps.ValidationAccept
}
16 changes: 9 additions & 7 deletions system/p2p/dht/protocol/broadcast/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,12 @@ func Test_validateBlock(t *testing.T) {
proto, cancel := newTestProtocol()
defer cancel()
val.broadcastProtocol = proto

require.Equal(t, ps.ValidationAccept, val.validateBlock(val.Ctx, val.Host.ID(), nil))
msg := &ps.Message{Message: &pubsub_pb.Message{From: []byte(val.Host.ID())}}
require.Equal(t, ps.ValidationAccept, val.validateBlock(val.Ctx, val.Host.ID(), msg))
val.addDeniedPeer("errpid", 10)
require.Equal(t, ps.ValidationReject, val.validateBlock(val.Ctx, "errpid", nil))
msg := &ps.Message{Message: &pubsub_pb.Message{Data: []byte("errmsg")}}
msg = &ps.Message{Message: &pubsub_pb.Message{From: []byte("errpid")}}
require.Equal(t, ps.ValidationReject, val.validateBlock(val.Ctx, "errpid", msg))
msg = &ps.Message{Message: &pubsub_pb.Message{Data: []byte("errmsg")}}
require.Equal(t, ps.ValidationReject, val.validateBlock(val.Ctx, "testpid", msg))

testBlock := &types.Block{Height: 1}
Expand All @@ -113,7 +114,8 @@ func Test_validatePeer(t *testing.T) {
val := newValidator(newTestPubSub())
val.addDeniedPeer("errpid", 10)
topic := "tx"
msg := &ps.Message{Message: &pubsub_pb.Message{Topic: &topic}}
require.Equal(t, ps.ValidationReject, val.validatePeer(val.Ctx, "errpid", msg))
require.Equal(t, ps.ValidationAccept, val.validatePeer(val.Ctx, "normalpid", msg))
msg := &ps.Message{Message: &pubsub_pb.Message{Topic: &topic, From: []byte("errpid")}}
require.Equal(t, ps.ValidationReject, val.validatePeer(val.Ctx, "", msg))
msg.From = []byte("normalPid")
require.Equal(t, ps.ValidationAccept, val.validatePeer(val.Ctx, "", msg))
}

0 comments on commit 94c4141

Please sign in to comment.