forked from nolash/ethereum-samples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
A5_Reply.go
196 lines (171 loc) · 4.82 KB
/
A5_Reply.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
// send, receive, get notified about a message
package main
import (
"crypto/ecdsa"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
demo "github.com/nolash/go-ethereum-p2p-demo/common"
"sync"
"time"
)
// Wait groups shared between main and the protocol's Run function.
var (
	// protoW is incremented by main once per server and waited on by main;
	// Run calls Done on it when it finishes.
	protoW = new(sync.WaitGroup)
	// pingW is incremented by every Run invocation, which also waits on it
	// before returning.
	pingW = new(sync.WaitGroup)
)
// FooPingMsg is the message exchanged over the "foo" protocol.
// The same structure is used for both the ping and the reply to it.
type FooPingMsg struct {
// Pong is false for a ping and true for the reply to a ping.
Pong bool
// Created is set to time.Now() by the sender when the message is built.
Created time.Time
}
// proto is the "foo" protocol that takes care of the message exchange.
// The Run function is invoked upon connection and gets passed:
// * an instance of p2p.Peer, which represents the remote peer
// * an instance of p2p.MsgReadWriter, which is the io between the node and its peer
//
// Each side sends a ping, answers the peer's ping with a pong, and keeps
// reading until the pong for its own ping has arrived. Run returns a non-nil
// error if sending, receiving or decoding a message fails.
var (
	proto = p2p.Protocol{
		Name:    "foo",
		Version: 42,
		Length:  1,
		Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
			pingW.Add(1)
			ponged := false

			// Release both wait groups on every exit path, exactly once each.
			// Without this an error return would leave the counters raised:
			// the peer's Run would block forever in pingW.Wait() and main
			// would block forever in protoW.Wait().
			defer protoW.Done()
			defer func() {
				// on the success path pingW.Done() has already been called
				if !ponged {
					pingW.Done()
				}
			}()

			// create the ping and send it
			ping := FooPingMsg{
				Pong:    false,
				Created: time.Now(),
			}
			if err := p2p.Send(rw, 0, ping); err != nil {
				return fmt.Errorf("Send p2p message fail: %v", err)
			}
			demo.Log.Info("sending ping", "peer", p)

			for !ponged {
				// wait for the message to come in from the other side
				// note that receive message event doesn't get emitted until we ReadMsg()
				incoming, err := rw.ReadMsg()
				if err != nil {
					return fmt.Errorf("Receive p2p message fail: %v", err)
				}

				// decode the message and check the contents
				var decoded FooPingMsg
				if err := incoming.Decode(&decoded); err != nil {
					return fmt.Errorf("Decode p2p message fail: %v", err)
				}

				if decoded.Pong {
					// our own ping was answered; this ends the loop
					demo.Log.Info("received pong", "peer", p)
					ponged = true
					pingW.Done()
					continue
				}

				// the peer pinged us: answer with a pong
				demo.Log.Info("received ping", "peer", p)
				pong := FooPingMsg{
					Pong:    true,
					Created: time.Now(),
				}
				if err := p2p.Send(rw, 0, pong); err != nil {
					return fmt.Errorf("Send p2p message fail: %v", err)
				}
				demo.Log.Info("sent pong", "peer", p)
			}

			// terminate the protocol after all involved have completed the cycle
			pingW.Wait()
			return nil
		},
	}
)
// newServer builds (but does not start) a p2p.Server that speaks the demo
// protocol.
//
// privkey is the node's private key; name and version are combined into the
// node name via common.MakeName. When port is greater than zero the server
// listens on ":<port>", otherwise ListenAddr is left unset.
func newServer(privkey *ecdsa.PrivateKey, name string, version string, port int) *p2p.Server {
	srv := &p2p.Server{
		Config: p2p.Config{
			PrivateKey: privkey,
			Name:       common.MakeName(name, version),
			// at least one peer must be explicitly allowed,
			// otherwise the connection attempt will be refused
			MaxPeers:  1,
			Protocols: []p2p.Protocol{proto},
			// the server must be told explicitly to generate events for messages
			EnableMsgEvents: true,
		},
	}

	if port > 0 {
		srv.Config.ListenAddr = fmt.Sprintf(":%d", port)
	}

	return srv
}
func main() {
// we need private keys for both servers
privkey_one, err := crypto.GenerateKey()
if err != nil {
demo.Log.Crit("Generate private key #1 failed", "err", err)
}
privkey_two, err := crypto.GenerateKey()
if err != nil {
demo.Log.Crit("Generate private key #2 failed", "err", err)
}
// set up the two servers
srv_one := newServer(privkey_one, "foo", "42", 0)
err = srv_one.Start()
if err != nil {
demo.Log.Crit("Start p2p.Server #1 failed", "err", err)
}
srv_two := newServer(privkey_two, "bar", "666", 31234)
err = srv_two.Start()
if err != nil {
demo.Log.Crit("Start p2p.Server #2 failed", "err", err)
}
// set up the event subscriptions on both servers
// the Err() on the Subscription object returns when subscription is closed
eventOneC := make(chan *p2p.PeerEvent)
sub_one := srv_one.SubscribeEvents(eventOneC)
protoW.Add(1)
go func() {
for {
select {
case peerevent := <-eventOneC:
if peerevent.Type == "add" {
demo.Log.Debug("Received peer add notification on node #1", "peer", peerevent.Peer)
} else if peerevent.Type == "msgrecv" {
demo.Log.Info("Received message nofification on node #1", "event", peerevent)
}
case <-sub_one.Err():
return
}
}
}()
eventTwoC := make(chan *p2p.PeerEvent)
sub_two := srv_two.SubscribeEvents(eventTwoC)
protoW.Add(1)
go func() {
for {
select {
case peerevent := <-eventTwoC:
if peerevent.Type == "add" {
demo.Log.Debug("Received peer add notification on node #2", "peer", peerevent.Peer)
} else if peerevent.Type == "msgrecv" {
demo.Log.Info("Received message nofification on node #2", "event", peerevent)
}
case <-sub_two.Err():
return
}
}
}()
// get the node instance of the second server
node_two := srv_two.Self()
// add it as a peer to the first node
// the connection and crypto handshake will be performed automatically
srv_one.AddPeer(node_two)
// wait for each respective message to be delivered on both sides
protoW.Wait()
// terminate subscription loops and unsubscribe
sub_one.Unsubscribe()
sub_two.Unsubscribe()
// stop the servers
srv_one.Stop()
srv_two.Stop()
}