From 20f39defabd7ffc38fb310ed591c640bfe23260b Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Tue, 22 Sep 2020 10:17:39 +0200 Subject: [PATCH] p2p: move rlpx into separate package (#21464) This change moves the RLPx protocol implementation into a separate package, p2p/rlpx. The new package can be used to establish RLPx connections for protocol testing purposes. Co-authored-by: Felix Lange --- cmd/devp2p/main.go | 1 + cmd/devp2p/rlpxcmd.go | 94 +++++ p2p/message_test.go | 11 - p2p/peer_test.go | 15 +- p2p/{ => rlpx}/rlpx.go | 724 +++++++++++++++++------------------- p2p/{ => rlpx}/rlpx_test.go | 400 +++++--------------- p2p/server.go | 18 +- p2p/server_test.go | 55 ++- p2p/transport.go | 177 +++++++++ p2p/transport_test.go | 148 ++++++++ 10 files changed, 901 insertions(+), 742 deletions(-) create mode 100644 cmd/devp2p/rlpxcmd.go rename p2p/{ => rlpx}/rlpx.go (63%) rename p2p/{ => rlpx}/rlpx_test.go (58%) create mode 100644 p2p/transport.go create mode 100644 p2p/transport_test.go diff --git a/cmd/devp2p/main.go b/cmd/devp2p/main.go index 6064d17c7070..9eebd9b1375b 100644 --- a/cmd/devp2p/main.go +++ b/cmd/devp2p/main.go @@ -63,6 +63,7 @@ func init() { discv5Command, dnsCommand, nodesetCommand, + rlpxCommand, } } diff --git a/cmd/devp2p/rlpxcmd.go b/cmd/devp2p/rlpxcmd.go new file mode 100644 index 000000000000..14eb5989d1e3 --- /dev/null +++ b/cmd/devp2p/rlpxcmd.go @@ -0,0 +1,94 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see . + +package main + +import ( + "fmt" + "net" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/rlpx" + "github.com/ethereum/go-ethereum/rlp" + "gopkg.in/urfave/cli.v1" +) + +var ( + rlpxCommand = cli.Command{ + Name: "rlpx", + Usage: "RLPx Commands", + Subcommands: []cli.Command{ + rlpxPingCommand, + }, + } + rlpxPingCommand = cli.Command{ + Name: "ping", + Usage: "Perform a RLPx handshake", + ArgsUsage: "", + Action: rlpxPing, + } +) + +func rlpxPing(ctx *cli.Context) error { + n := getNodeArg(ctx) + + fd, err := net.Dial("tcp", fmt.Sprintf("%v:%d", n.IP(), n.TCP())) + if err != nil { + return err + } + conn := rlpx.NewConn(fd, n.Pubkey()) + + ourKey, _ := crypto.GenerateKey() + _, err = conn.Handshake(ourKey) + if err != nil { + return err + } + + code, data, _, err := conn.Read() + if err != nil { + return err + } + switch code { + case 0: + var h devp2pHandshake + if err := rlp.DecodeBytes(data, &h); err != nil { + return fmt.Errorf("invalid handshake: %v", err) + } + fmt.Printf("%+v\n", h) + case 1: + var msg []p2p.DiscReason + if rlp.DecodeBytes(data, &msg); len(msg) == 0 { + return fmt.Errorf("invalid disconnect message") + } + return fmt.Errorf("received disconnect message: %v", msg[0]) + default: + return fmt.Errorf("invalid message code %d, expected handshake (code zero)", code) + } + return nil +} + +// devp2pHandshake is the RLP structure of the devp2p protocol handshake. +type devp2pHandshake struct { + Version uint64 + Name string + Caps []p2p.Cap + ListenPort uint64 + ID hexutil.Bytes // secp256k1 public key + // Ignore additional fields (for forward compatibility). + Rest []rlp.RawValue `rlp:"tail"` +} diff --git a/p2p/message_test.go b/p2p/message_test.go index a01f75556162..e575c5d96e66 100644 --- a/p2p/message_test.go +++ b/p2p/message_test.go @@ -18,11 +18,9 @@ package p2p import ( "bytes" - "encoding/hex" "fmt" "io" "runtime" - "strings" "testing" "time" ) @@ -141,12 +139,3 @@ func TestEOFSignal(t *testing.T) { default: } } - -func unhex(str string) []byte { - r := strings.NewReplacer("\t", "", " ", "", "\n", "") - b, err := hex.DecodeString(r.Replace(str)) - if err != nil { - panic(fmt.Sprintf("invalid hex string: %q", str)) - } - return b -} diff --git a/p2p/peer_test.go b/p2p/peer_test.go index e40deb98f018..4308bbd2eb4b 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -86,9 +86,15 @@ func newNode(id enode.ID, addr string) *enode.Node { } func testPeer(protos []Protocol) (func(), *conn, *Peer, <-chan error) { - fd1, fd2 := net.Pipe() - c1 := &conn{fd: fd1, node: newNode(randomID(), ""), transport: newTestTransport(&newkey().PublicKey, fd1)} - c2 := &conn{fd: fd2, node: newNode(randomID(), ""), transport: newTestTransport(&newkey().PublicKey, fd2)} + var ( + fd1, fd2 = net.Pipe() + key1, key2 = newkey(), newkey() + t1 = newTestTransport(&key2.PublicKey, fd1, nil) + t2 = newTestTransport(&key1.PublicKey, fd2, &key1.PublicKey) + ) + + c1 := &conn{fd: fd1, node: newNode(uintID(1), ""), transport: t1} + c2 := &conn{fd: fd2, node: newNode(uintID(2), ""), transport: t2} for _, p := range protos { c1.caps = append(c1.caps, p.cap()) c2.caps = append(c2.caps, p.cap()) @@ -173,9 +179,12 @@ func TestPeerPing(t *testing.T) { } } +// This test checks that a disconnect message sent by a peer is returned +// as the error from Peer.run. func TestPeerDisconnect(t *testing.T) { closer, rw, _, disc := testPeer(nil) defer closer() + if err := SendItems(rw, discMsg, DiscQuitting); err != nil { t.Fatal(err) } diff --git a/p2p/rlpx.go b/p2p/rlpx/rlpx.go similarity index 63% rename from p2p/rlpx.go rename to p2p/rlpx/rlpx.go index 4d903a08a0f6..2021bf08be11 100644 --- a/p2p/rlpx.go +++ b/p2p/rlpx/rlpx.go @@ -14,7 +14,8 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package p2p +// Package rlpx implements the RLPx transport protocol. +package rlpx import ( "bytes" @@ -29,169 +30,312 @@ import ( "fmt" "hash" "io" - "io/ioutil" mrand "math/rand" "net" - "sync" "time" - "github.com/ethereum/go-ethereum/common/bitutil" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/ecies" - "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rlp" "github.com/golang/snappy" "golang.org/x/crypto/sha3" ) -const ( - maxUint24 = ^uint32(0) >> 8 - - sskLen = 16 // ecies.MaxSharedKeyLength(pubKey) / 2 - sigLen = crypto.SignatureLength // elliptic S256 - pubLen = 64 // 512 bit pubkey in uncompressed representation without format byte - shaLen = 32 // hash length (for nonce etc) - - authMsgLen = sigLen + shaLen + pubLen + shaLen + 1 - authRespLen = pubLen + shaLen + 1 - - eciesOverhead = 65 /* pubkey */ + 16 /* IV */ + 32 /* MAC */ - - encAuthMsgLen = authMsgLen + eciesOverhead // size of encrypted pre-EIP-8 initiator handshake - encAuthRespLen = authRespLen + eciesOverhead // size of encrypted pre-EIP-8 handshake reply +// Conn is an RLPx network connection. It wraps a low-level network connection. The +// underlying connection should not be used for other activity when it is wrapped by Conn. +// +// Before sending messages, a handshake must be performed by calling the Handshake method. +// This type is not generally safe for concurrent use, but reading and writing of messages +// may happen concurrently after the handshake. +type Conn struct { + dialDest *ecdsa.PublicKey + conn net.Conn + handshake *handshakeState + snappy bool +} - // total timeout for encryption handshake and protocol - // handshake in both directions. - handshakeTimeout = 5 * time.Second +type handshakeState struct { + enc cipher.Stream + dec cipher.Stream - // This is the timeout for sending the disconnect reason. - // This is shorter than the usual timeout because we don't want - // to wait if the connection is known to be bad anyway. - discWriteTimeout = 1 * time.Second -) - -// errPlainMessageTooLarge is returned if a decompressed message length exceeds -// the allowed 24 bits (i.e. length >= 16MB). -var errPlainMessageTooLarge = errors.New("message length >= 16MB") + macCipher cipher.Block + egressMAC hash.Hash + ingressMAC hash.Hash +} -// rlpx is the transport protocol used by actual (non-test) connections. -// It wraps the frame encoder with locks and read/write deadlines. -type rlpx struct { - fd net.Conn +// NewConn wraps the given network connection. If dialDest is non-nil, the connection +// behaves as the initiator during the handshake. +func NewConn(conn net.Conn, dialDest *ecdsa.PublicKey) *Conn { + return &Conn{ + dialDest: dialDest, + conn: conn, + } +} - rmu, wmu sync.Mutex - rw *rlpxFrameRW +// SetSnappy enables or disables snappy compression of messages. This is usually called +// after the devp2p Hello message exchange when the negotiated version indicates that +// compression is available on both ends of the connection. +func (c *Conn) SetSnappy(snappy bool) { + c.snappy = snappy } -func newRLPX(fd net.Conn) transport { - fd.SetDeadline(time.Now().Add(handshakeTimeout)) - return &rlpx{fd: fd} +// SetReadDeadline sets the deadline for all future read operations. +func (c *Conn) SetReadDeadline(time time.Time) error { + return c.conn.SetReadDeadline(time) } -func (t *rlpx) ReadMsg() (Msg, error) { - t.rmu.Lock() - defer t.rmu.Unlock() - t.fd.SetReadDeadline(time.Now().Add(frameReadTimeout)) - return t.rw.ReadMsg() +// SetWriteDeadline sets the deadline for all future write operations. +func (c *Conn) SetWriteDeadline(time time.Time) error { + return c.conn.SetWriteDeadline(time) } -func (t *rlpx) WriteMsg(msg Msg) error { - t.wmu.Lock() - defer t.wmu.Unlock() - t.fd.SetWriteDeadline(time.Now().Add(frameWriteTimeout)) - return t.rw.WriteMsg(msg) +// SetDeadline sets the deadline for all future read and write operations. +func (c *Conn) SetDeadline(time time.Time) error { + return c.conn.SetDeadline(time) } -func (t *rlpx) close(err error) { - t.wmu.Lock() - defer t.wmu.Unlock() - // Tell the remote end why we're disconnecting if possible. - if t.rw != nil { - if r, ok := err.(DiscReason); ok && r != DiscNetworkError { - // rlpx tries to send DiscReason to disconnected peer - // if the connection is net.Pipe (in-memory simulation) - // it hangs forever, since net.Pipe does not implement - // a write deadline. Because of this only try to send - // the disconnect reason message if there is no error. - if err := t.fd.SetWriteDeadline(time.Now().Add(discWriteTimeout)); err == nil { - SendItems(t.rw, discMsg, r) - } +// Read reads a message from the connection. +func (c *Conn) Read() (code uint64, data []byte, wireSize int, err error) { + if c.handshake == nil { + panic("can't ReadMsg before handshake") + } + + frame, err := c.handshake.readFrame(c.conn) + if err != nil { + return 0, nil, 0, err + } + code, data, err = rlp.SplitUint64(frame) + if err != nil { + return 0, nil, 0, fmt.Errorf("invalid message code: %v", err) + } + wireSize = len(data) + + // If snappy is enabled, verify and decompress message. + if c.snappy { + var actualSize int + actualSize, err = snappy.DecodedLen(data) + if err != nil { + return code, nil, 0, err + } + if actualSize > maxUint24 { + return code, nil, 0, errPlainMessageTooLarge } + data, err = snappy.Decode(nil, data) } - t.fd.Close() + return code, data, wireSize, err } -func (t *rlpx) doProtoHandshake(our *protoHandshake) (their *protoHandshake, err error) { - // Writing our handshake happens concurrently, we prefer - // returning the handshake read error. If the remote side - // disconnects us early with a valid reason, we should return it - // as the error so it can be tracked elsewhere. - werr := make(chan error, 1) - go func() { werr <- Send(t.rw, handshakeMsg, our) }() - if their, err = readProtocolHandshake(t.rw); err != nil { - <-werr // make sure the write terminates too +func (h *handshakeState) readFrame(conn io.Reader) ([]byte, error) { + // read the header + headbuf := make([]byte, 32) + if _, err := io.ReadFull(conn, headbuf); err != nil { return nil, err } - if err := <-werr; err != nil { - return nil, fmt.Errorf("write error: %v", err) + + // verify header mac + shouldMAC := updateMAC(h.ingressMAC, h.macCipher, headbuf[:16]) + if !hmac.Equal(shouldMAC, headbuf[16:]) { + return nil, errors.New("bad header MAC") } - // If the protocol version supports Snappy encoding, upgrade immediately - t.rw.snappy = their.Version >= snappyProtocolVersion + h.dec.XORKeyStream(headbuf[:16], headbuf[:16]) // first half is now decrypted + fsize := readInt24(headbuf) + // ignore protocol type for now - return their, nil -} + // read the frame content + var rsize = fsize // frame size rounded up to 16 byte boundary + if padding := fsize % 16; padding > 0 { + rsize += 16 - padding + } + framebuf := make([]byte, rsize) + if _, err := io.ReadFull(conn, framebuf); err != nil { + return nil, err + } -func readProtocolHandshake(rw MsgReader) (*protoHandshake, error) { - msg, err := rw.ReadMsg() - if err != nil { + // read and validate frame MAC. we can re-use headbuf for that. + h.ingressMAC.Write(framebuf) + fmacseed := h.ingressMAC.Sum(nil) + if _, err := io.ReadFull(conn, headbuf[:16]); err != nil { return nil, err } - if msg.Size > baseProtocolMaxMsgSize { - return nil, fmt.Errorf("message too big") + shouldMAC = updateMAC(h.ingressMAC, h.macCipher, fmacseed) + if !hmac.Equal(shouldMAC, headbuf[:16]) { + return nil, errors.New("bad frame MAC") + } + + // decrypt frame content + h.dec.XORKeyStream(framebuf, framebuf) + return framebuf[:fsize], nil +} + +// Write writes a message to the connection. +// +// Write returns the written size of the message data. This may be less than or equal to +// len(data) depending on whether snappy compression is enabled. +func (c *Conn) Write(code uint64, data []byte) (uint32, error) { + if c.handshake == nil { + panic("can't WriteMsg before handshake") } - if msg.Code == discMsg { - // Disconnect before protocol handshake is valid according to the - // spec and we send it ourself if the post-handshake checks fail. - // We can't return the reason directly, though, because it is echoed - // back otherwise. Wrap it in a string instead. - var reason [1]DiscReason - rlp.Decode(msg.Payload, &reason) - return nil, reason[0] + if len(data) > maxUint24 { + return 0, errPlainMessageTooLarge } - if msg.Code != handshakeMsg { - return nil, fmt.Errorf("expected handshake, got %x", msg.Code) + if c.snappy { + data = snappy.Encode(nil, data) } - var hs protoHandshake - if err := msg.Decode(&hs); err != nil { - return nil, err + + wireSize := uint32(len(data)) + err := c.handshake.writeFrame(c.conn, code, data) + return wireSize, err +} + +func (h *handshakeState) writeFrame(conn io.Writer, code uint64, data []byte) error { + ptype, _ := rlp.EncodeToBytes(code) + + // write header + headbuf := make([]byte, 32) + fsize := len(ptype) + len(data) + if fsize > maxUint24 { + return errPlainMessageTooLarge + } + putInt24(uint32(fsize), headbuf) + copy(headbuf[3:], zeroHeader) + h.enc.XORKeyStream(headbuf[:16], headbuf[:16]) // first half is now encrypted + + // write header MAC + copy(headbuf[16:], updateMAC(h.egressMAC, h.macCipher, headbuf[:16])) + if _, err := conn.Write(headbuf); err != nil { + return err + } + + // write encrypted frame, updating the egress MAC hash with + // the data written to conn. + tee := cipher.StreamWriter{S: h.enc, W: io.MultiWriter(conn, h.egressMAC)} + if _, err := tee.Write(ptype); err != nil { + return err + } + if _, err := tee.Write(data); err != nil { + return err } - if len(hs.ID) != 64 || !bitutil.TestBytes(hs.ID) { - return nil, DiscInvalidIdentity + if padding := fsize % 16; padding > 0 { + if _, err := tee.Write(zero16[:16-padding]); err != nil { + return err + } } - return &hs, nil + + // write frame MAC. egress MAC hash is up to date because + // frame content was written to it as well. + fmacseed := h.egressMAC.Sum(nil) + mac := updateMAC(h.egressMAC, h.macCipher, fmacseed) + _, err := conn.Write(mac) + return err +} + +func readInt24(b []byte) uint32 { + return uint32(b[2]) | uint32(b[1])<<8 | uint32(b[0])<<16 } -// doEncHandshake runs the protocol handshake using authenticated -// messages. the protocol handshake is the first authenticated message -// and also verifies whether the encryption handshake 'worked' and the -// remote side actually provided the right public key. -func (t *rlpx) doEncHandshake(prv *ecdsa.PrivateKey, dial *ecdsa.PublicKey) (*ecdsa.PublicKey, error) { +func putInt24(v uint32, b []byte) { + b[0] = byte(v >> 16) + b[1] = byte(v >> 8) + b[2] = byte(v) +} + +// updateMAC reseeds the given hash with encrypted seed. +// it returns the first 16 bytes of the hash sum after seeding. +func updateMAC(mac hash.Hash, block cipher.Block, seed []byte) []byte { + aesbuf := make([]byte, aes.BlockSize) + block.Encrypt(aesbuf, mac.Sum(nil)) + for i := range aesbuf { + aesbuf[i] ^= seed[i] + } + mac.Write(aesbuf) + return mac.Sum(nil)[:16] +} + +// Handshake performs the handshake. This must be called before any data is written +// or read from the connection. +func (c *Conn) Handshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) { var ( - sec secrets + sec Secrets err error ) - if dial == nil { - sec, err = receiverEncHandshake(t.fd, prv) + if c.dialDest != nil { + sec, err = initiatorEncHandshake(c.conn, prv, c.dialDest) } else { - sec, err = initiatorEncHandshake(t.fd, prv, dial) + sec, err = receiverEncHandshake(c.conn, prv) } if err != nil { return nil, err } - t.wmu.Lock() - t.rw = newRLPXFrameRW(t.fd, sec) - t.wmu.Unlock() - return sec.Remote.ExportECDSA(), nil + c.InitWithSecrets(sec) + return sec.remote, err +} + +// InitWithSecrets injects connection secrets as if a handshake had +// been performed. This cannot be called after the handshake. +func (c *Conn) InitWithSecrets(sec Secrets) { + if c.handshake != nil { + panic("can't handshake twice") + } + macc, err := aes.NewCipher(sec.MAC) + if err != nil { + panic("invalid MAC secret: " + err.Error()) + } + encc, err := aes.NewCipher(sec.AES) + if err != nil { + panic("invalid AES secret: " + err.Error()) + } + // we use an all-zeroes IV for AES because the key used + // for encryption is ephemeral. + iv := make([]byte, encc.BlockSize()) + c.handshake = &handshakeState{ + enc: cipher.NewCTR(encc, iv), + dec: cipher.NewCTR(encc, iv), + macCipher: macc, + egressMAC: sec.EgressMAC, + ingressMAC: sec.IngressMAC, + } +} + +// Close closes the underlying network connection. +func (c *Conn) Close() error { + return c.conn.Close() +} + +// Constants for the handshake. +const ( + maxUint24 = int(^uint32(0) >> 8) + + sskLen = 16 // ecies.MaxSharedKeyLength(pubKey) / 2 + sigLen = crypto.SignatureLength // elliptic S256 + pubLen = 64 // 512 bit pubkey in uncompressed representation without format byte + shaLen = 32 // hash length (for nonce etc) + + authMsgLen = sigLen + shaLen + pubLen + shaLen + 1 + authRespLen = pubLen + shaLen + 1 + + eciesOverhead = 65 /* pubkey */ + 16 /* IV */ + 32 /* MAC */ + + encAuthMsgLen = authMsgLen + eciesOverhead // size of encrypted pre-EIP-8 initiator handshake + encAuthRespLen = authRespLen + eciesOverhead // size of encrypted pre-EIP-8 handshake reply +) + +var ( + // this is used in place of actual frame header data. + // TODO: replace this when Msg contains the protocol type code. + zeroHeader = []byte{0xC2, 0x80, 0x80} + // sixteen zero bytes + zero16 = make([]byte, 16) + + // errPlainMessageTooLarge is returned if a decompressed message length exceeds + // the allowed 24 bits (i.e. length >= 16MB). + errPlainMessageTooLarge = errors.New("message length >= 16MB") +) + +// Secrets represents the connection secrets which are negotiated during the handshake. +type Secrets struct { + AES, MAC []byte + EgressMAC, IngressMAC hash.Hash + remote *ecdsa.PublicKey } // encHandshake contains the state of the encryption handshake. @@ -203,15 +347,6 @@ type encHandshake struct { remoteRandomPub *ecies.PublicKey // ecdhe-random-pubk } -// secrets represents the connection secrets -// which are negotiated during the encryption handshake. -type secrets struct { - Remote *ecies.PublicKey - AES, MAC []byte - EgressMAC, IngressMAC hash.Hash - Token []byte -} - // RLPx v4 handshake auth (defined in EIP-8). type authMsgV4 struct { gotPlain bool // whether read packet had plain format. @@ -235,19 +370,85 @@ type authRespV4 struct { Rest []rlp.RawValue `rlp:"tail"` } +// receiverEncHandshake negotiates a session token on conn. +// it should be called on the listening side of the connection. +// +// prv is the local client's private key. +func receiverEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey) (s Secrets, err error) { + authMsg := new(authMsgV4) + authPacket, err := readHandshakeMsg(authMsg, encAuthMsgLen, prv, conn) + if err != nil { + return s, err + } + h := new(encHandshake) + if err := h.handleAuthMsg(authMsg, prv); err != nil { + return s, err + } + + authRespMsg, err := h.makeAuthResp() + if err != nil { + return s, err + } + var authRespPacket []byte + if authMsg.gotPlain { + authRespPacket, err = authRespMsg.sealPlain(h) + } else { + authRespPacket, err = sealEIP8(authRespMsg, h) + } + if err != nil { + return s, err + } + if _, err = conn.Write(authRespPacket); err != nil { + return s, err + } + return h.secrets(authPacket, authRespPacket) +} + +func (h *encHandshake) handleAuthMsg(msg *authMsgV4, prv *ecdsa.PrivateKey) error { + // Import the remote identity. + rpub, err := importPublicKey(msg.InitiatorPubkey[:]) + if err != nil { + return err + } + h.initNonce = msg.Nonce[:] + h.remote = rpub + + // Generate random keypair for ECDH. + // If a private key is already set, use it instead of generating one (for testing). + if h.randomPrivKey == nil { + h.randomPrivKey, err = ecies.GenerateKey(rand.Reader, crypto.S256(), nil) + if err != nil { + return err + } + } + + // Check the signature. + token, err := h.staticSharedSecret(prv) + if err != nil { + return err + } + signedMsg := xor(token, h.initNonce) + remoteRandomPub, err := crypto.Ecrecover(signedMsg, msg.Signature[:]) + if err != nil { + return err + } + h.remoteRandomPub, _ = importPublicKey(remoteRandomPub) + return nil +} + // secrets is called after the handshake is completed. // It extracts the connection secrets from the handshake values. -func (h *encHandshake) secrets(auth, authResp []byte) (secrets, error) { +func (h *encHandshake) secrets(auth, authResp []byte) (Secrets, error) { ecdheSecret, err := h.randomPrivKey.GenerateShared(h.remoteRandomPub, sskLen, sskLen) if err != nil { - return secrets{}, err + return Secrets{}, err } // derive base secrets from ephemeral key agreement sharedSecret := crypto.Keccak256(ecdheSecret, crypto.Keccak256(h.respNonce, h.initNonce)) aesSecret := crypto.Keccak256(ecdheSecret, sharedSecret) - s := secrets{ - Remote: h.remote, + s := Secrets{ + remote: h.remote.ExportECDSA(), AES: aesSecret, MAC: crypto.Keccak256(ecdheSecret, aesSecret), } @@ -278,7 +479,7 @@ func (h *encHandshake) staticSharedSecret(prv *ecdsa.PrivateKey) ([]byte, error) // it should be called on the dialing side of the connection. // // prv is the local client's private key. -func initiatorEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey, remote *ecdsa.PublicKey) (s secrets, err error) { +func initiatorEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey, remote *ecdsa.PublicKey) (s Secrets, err error) { h := &encHandshake{initiator: true, remote: ecies.ImportECDSAPublic(remote)} authMsg, err := h.makeAuthMsg(prv) if err != nil { @@ -288,6 +489,7 @@ func initiatorEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey, remote *ec if err != nil { return s, err } + if _, err = conn.Write(authPacket); err != nil { return s, err } @@ -342,72 +544,6 @@ func (h *encHandshake) handleAuthResp(msg *authRespV4) (err error) { return err } -// receiverEncHandshake negotiates a session token on conn. -// it should be called on the listening side of the connection. -// -// prv is the local client's private key. -func receiverEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey) (s secrets, err error) { - authMsg := new(authMsgV4) - authPacket, err := readHandshakeMsg(authMsg, encAuthMsgLen, prv, conn) - if err != nil { - return s, err - } - h := new(encHandshake) - if err := h.handleAuthMsg(authMsg, prv); err != nil { - return s, err - } - - authRespMsg, err := h.makeAuthResp() - if err != nil { - return s, err - } - var authRespPacket []byte - if authMsg.gotPlain { - authRespPacket, err = authRespMsg.sealPlain(h) - } else { - authRespPacket, err = sealEIP8(authRespMsg, h) - } - if err != nil { - return s, err - } - if _, err = conn.Write(authRespPacket); err != nil { - return s, err - } - return h.secrets(authPacket, authRespPacket) -} - -func (h *encHandshake) handleAuthMsg(msg *authMsgV4, prv *ecdsa.PrivateKey) error { - // Import the remote identity. - rpub, err := importPublicKey(msg.InitiatorPubkey[:]) - if err != nil { - return err - } - h.initNonce = msg.Nonce[:] - h.remote = rpub - - // Generate random keypair for ECDH. - // If a private key is already set, use it instead of generating one (for testing). - if h.randomPrivKey == nil { - h.randomPrivKey, err = ecies.GenerateKey(rand.Reader, crypto.S256(), nil) - if err != nil { - return err - } - } - - // Check the signature. - token, err := h.staticSharedSecret(prv) - if err != nil { - return err - } - signedMsg := xor(token, h.initNonce) - remoteRandomPub, err := crypto.Ecrecover(signedMsg, msg.Signature[:]) - if err != nil { - return err - } - h.remoteRandomPub, _ = importPublicKey(remoteRandomPub) - return nil -} - func (h *encHandshake) makeAuthResp() (msg *authRespV4, err error) { // Generate random nonce. h.respNonce = make([]byte, shaLen) @@ -531,201 +667,3 @@ func xor(one, other []byte) (xor []byte) { } return xor } - -var ( - // this is used in place of actual frame header data. - // TODO: replace this when Msg contains the protocol type code. - zeroHeader = []byte{0xC2, 0x80, 0x80} - // sixteen zero bytes - zero16 = make([]byte, 16) -) - -// rlpxFrameRW implements a simplified version of RLPx framing. -// chunked messages are not supported and all headers are equal to -// zeroHeader. -// -// rlpxFrameRW is not safe for concurrent use from multiple goroutines. -type rlpxFrameRW struct { - conn io.ReadWriter - enc cipher.Stream - dec cipher.Stream - - macCipher cipher.Block - egressMAC hash.Hash - ingressMAC hash.Hash - - snappy bool -} - -func newRLPXFrameRW(conn io.ReadWriter, s secrets) *rlpxFrameRW { - macc, err := aes.NewCipher(s.MAC) - if err != nil { - panic("invalid MAC secret: " + err.Error()) - } - encc, err := aes.NewCipher(s.AES) - if err != nil { - panic("invalid AES secret: " + err.Error()) - } - // we use an all-zeroes IV for AES because the key used - // for encryption is ephemeral. - iv := make([]byte, encc.BlockSize()) - return &rlpxFrameRW{ - conn: conn, - enc: cipher.NewCTR(encc, iv), - dec: cipher.NewCTR(encc, iv), - macCipher: macc, - egressMAC: s.EgressMAC, - ingressMAC: s.IngressMAC, - } -} - -func (rw *rlpxFrameRW) WriteMsg(msg Msg) error { - ptype, _ := rlp.EncodeToBytes(msg.Code) - - // if snappy is enabled, compress message now - if rw.snappy { - if msg.Size > maxUint24 { - return errPlainMessageTooLarge - } - payload, _ := ioutil.ReadAll(msg.Payload) - payload = snappy.Encode(nil, payload) - - msg.Payload = bytes.NewReader(payload) - msg.Size = uint32(len(payload)) - } - msg.meterSize = msg.Size - if metrics.Enabled && msg.meterCap.Name != "" { // don't meter non-subprotocol messages - m := fmt.Sprintf("%s/%s/%d/%#02x", egressMeterName, msg.meterCap.Name, msg.meterCap.Version, msg.meterCode) - metrics.GetOrRegisterMeter(m, nil).Mark(int64(msg.meterSize)) - metrics.GetOrRegisterMeter(m+"/packets", nil).Mark(1) - } - // write header - headbuf := make([]byte, 32) - fsize := uint32(len(ptype)) + msg.Size - if fsize > maxUint24 { - return errors.New("message size overflows uint24") - } - putInt24(fsize, headbuf) // TODO: check overflow - copy(headbuf[3:], zeroHeader) - rw.enc.XORKeyStream(headbuf[:16], headbuf[:16]) // first half is now encrypted - - // write header MAC - copy(headbuf[16:], updateMAC(rw.egressMAC, rw.macCipher, headbuf[:16])) - if _, err := rw.conn.Write(headbuf); err != nil { - return err - } - - // write encrypted frame, updating the egress MAC hash with - // the data written to conn. - tee := cipher.StreamWriter{S: rw.enc, W: io.MultiWriter(rw.conn, rw.egressMAC)} - if _, err := tee.Write(ptype); err != nil { - return err - } - if _, err := io.Copy(tee, msg.Payload); err != nil { - return err - } - if padding := fsize % 16; padding > 0 { - if _, err := tee.Write(zero16[:16-padding]); err != nil { - return err - } - } - - // write frame MAC. egress MAC hash is up to date because - // frame content was written to it as well. - fmacseed := rw.egressMAC.Sum(nil) - mac := updateMAC(rw.egressMAC, rw.macCipher, fmacseed) - _, err := rw.conn.Write(mac) - return err -} - -func (rw *rlpxFrameRW) ReadMsg() (msg Msg, err error) { - // read the header - headbuf := make([]byte, 32) - if _, err := io.ReadFull(rw.conn, headbuf); err != nil { - return msg, err - } - // verify header mac - shouldMAC := updateMAC(rw.ingressMAC, rw.macCipher, headbuf[:16]) - if !hmac.Equal(shouldMAC, headbuf[16:]) { - return msg, errors.New("bad header MAC") - } - rw.dec.XORKeyStream(headbuf[:16], headbuf[:16]) // first half is now decrypted - fsize := readInt24(headbuf) - // ignore protocol type for now - - // read the frame content - var rsize = fsize // frame size rounded up to 16 byte boundary - if padding := fsize % 16; padding > 0 { - rsize += 16 - padding - } - framebuf := make([]byte, rsize) - if _, err := io.ReadFull(rw.conn, framebuf); err != nil { - return msg, err - } - - // read and validate frame MAC. we can re-use headbuf for that. - rw.ingressMAC.Write(framebuf) - fmacseed := rw.ingressMAC.Sum(nil) - if _, err := io.ReadFull(rw.conn, headbuf[:16]); err != nil { - return msg, err - } - shouldMAC = updateMAC(rw.ingressMAC, rw.macCipher, fmacseed) - if !hmac.Equal(shouldMAC, headbuf[:16]) { - return msg, errors.New("bad frame MAC") - } - - // decrypt frame content - rw.dec.XORKeyStream(framebuf, framebuf) - - // decode message code - content := bytes.NewReader(framebuf[:fsize]) - if err := rlp.Decode(content, &msg.Code); err != nil { - return msg, err - } - msg.Size = uint32(content.Len()) - msg.meterSize = msg.Size - msg.Payload = content - - // if snappy is enabled, verify and decompress message - if rw.snappy { - payload, err := ioutil.ReadAll(msg.Payload) - if err != nil { - return msg, err - } - size, err := snappy.DecodedLen(payload) - if err != nil { - return msg, err - } - if size > int(maxUint24) { - return msg, errPlainMessageTooLarge - } - payload, err = snappy.Decode(nil, payload) - if err != nil { - return msg, err - } - msg.Size, msg.Payload = uint32(size), bytes.NewReader(payload) - } - return msg, nil -} - -// updateMAC reseeds the given hash with encrypted seed. -// it returns the first 16 bytes of the hash sum after seeding. -func updateMAC(mac hash.Hash, block cipher.Block, seed []byte) []byte { - aesbuf := make([]byte, aes.BlockSize) - block.Encrypt(aesbuf, mac.Sum(nil)) - for i := range aesbuf { - aesbuf[i] ^= seed[i] - } - mac.Write(aesbuf) - return mac.Sum(nil)[:16] -} - -func readInt24(b []byte) uint32 { - return uint32(b[2]) | uint32(b[1])<<8 | uint32(b[0])<<16 -} - -func putInt24(v uint32, b []byte) { - b[0] = byte(v >> 16) - b[1] = byte(v >> 8) - b[2] = byte(v) -} diff --git a/p2p/rlpx_test.go b/p2p/rlpx/rlpx_test.go similarity index 58% rename from p2p/rlpx_test.go rename to p2p/rlpx/rlpx_test.go index 3f686fe09f46..127a01816454 100644 --- a/p2p/rlpx_test.go +++ b/p2p/rlpx/rlpx_test.go @@ -1,4 +1,4 @@ -// Copyright 2015 The go-ethereum Authors +// Copyright 2020 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify @@ -14,298 +14,145 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package p2p +package rlpx import ( "bytes" "crypto/ecdsa" - "crypto/rand" - "errors" + "encoding/hex" "fmt" "io" - "io/ioutil" "net" "reflect" "strings" - "sync" "testing" - "time" "github.com/davecgh/go-spew/spew" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/ecies" - "github.com/ethereum/go-ethereum/p2p/simulations/pipes" "github.com/ethereum/go-ethereum/rlp" - "golang.org/x/crypto/sha3" + "github.com/stretchr/testify/assert" ) -func TestSharedSecret(t *testing.T) { - prv0, _ := crypto.GenerateKey() // = ecdsa.GenerateKey(crypto.S256(), rand.Reader) - pub0 := &prv0.PublicKey - prv1, _ := crypto.GenerateKey() - pub1 := &prv1.PublicKey - - ss0, err := ecies.ImportECDSA(prv0).GenerateShared(ecies.ImportECDSAPublic(pub1), sskLen, sskLen) - if err != nil { - return - } - ss1, err := ecies.ImportECDSA(prv1).GenerateShared(ecies.ImportECDSAPublic(pub0), sskLen, sskLen) - if err != nil { - return - } - t.Logf("Secret:\n%v %x\n%v %x", len(ss0), ss0, len(ss0), ss1) - if !bytes.Equal(ss0, ss1) { - t.Errorf("don't match :(") - } +type message struct { + code uint64 + data []byte + err error } -func TestEncHandshake(t *testing.T) { - for i := 0; i < 10; i++ { - start := time.Now() - if err := testEncHandshake(nil); err != nil { - t.Fatalf("i=%d %v", i, err) - } - t.Logf("(without token) %d %v\n", i+1, time.Since(start)) - } - for i := 0; i < 10; i++ { - tok := make([]byte, shaLen) - rand.Reader.Read(tok) - start := time.Now() - if err := testEncHandshake(tok); err != nil { - t.Fatalf("i=%d %v", i, err) - } - t.Logf("(with token) %d %v\n", i+1, time.Since(start)) - } +func TestHandshake(t *testing.T) { + p1, p2 := createPeers(t) + p1.Close() + p2.Close() } -func testEncHandshake(token []byte) error { - type result struct { - side string - pubkey *ecdsa.PublicKey - err error - } - var ( - prv0, _ = crypto.GenerateKey() - prv1, _ = crypto.GenerateKey() - fd0, fd1 = net.Pipe() - c0, c1 = newRLPX(fd0).(*rlpx), newRLPX(fd1).(*rlpx) - output = make(chan result) - ) - - go func() { - r := result{side: "initiator"} - defer func() { output <- r }() - defer fd0.Close() +// This test checks that messages can be sent and received through WriteMsg/ReadMsg. +func TestReadWriteMsg(t *testing.T) { + peer1, peer2 := createPeers(t) + defer peer1.Close() + defer peer2.Close() - r.pubkey, r.err = c0.doEncHandshake(prv0, &prv1.PublicKey) - if r.err != nil { - return - } - if !reflect.DeepEqual(r.pubkey, &prv1.PublicKey) { - r.err = fmt.Errorf("remote pubkey mismatch: got %v, want: %v", r.pubkey, &prv1.PublicKey) - } - }() - go func() { - r := result{side: "receiver"} - defer func() { output <- r }() - defer fd1.Close() - - r.pubkey, r.err = c1.doEncHandshake(prv1, nil) - if r.err != nil { - return - } - if !reflect.DeepEqual(r.pubkey, &prv0.PublicKey) { - r.err = fmt.Errorf("remote ID mismatch: got %v, want: %v", r.pubkey, &prv0.PublicKey) - } - }() + testCode := uint64(23) + testData := []byte("test") + checkMsgReadWrite(t, peer1, peer2, testCode, testData) - // wait for results from both sides - r1, r2 := <-output, <-output - if r1.err != nil { - return fmt.Errorf("%s side error: %v", r1.side, r1.err) - } - if r2.err != nil { - return fmt.Errorf("%s side error: %v", r2.side, r2.err) - } - - // compare derived secrets - if !reflect.DeepEqual(c0.rw.egressMAC, c1.rw.ingressMAC) { - return fmt.Errorf("egress mac mismatch:\n c0.rw: %#v\n c1.rw: %#v", c0.rw.egressMAC, c1.rw.ingressMAC) - } - if !reflect.DeepEqual(c0.rw.ingressMAC, c1.rw.egressMAC) { - return fmt.Errorf("ingress mac mismatch:\n c0.rw: %#v\n c1.rw: %#v", c0.rw.ingressMAC, c1.rw.egressMAC) - } - if !reflect.DeepEqual(c0.rw.enc, c1.rw.enc) { - return fmt.Errorf("enc cipher mismatch:\n c0.rw: %#v\n c1.rw: %#v", c0.rw.enc, c1.rw.enc) - } - if !reflect.DeepEqual(c0.rw.dec, c1.rw.dec) { - return fmt.Errorf("dec cipher mismatch:\n c0.rw: %#v\n c1.rw: %#v", c0.rw.dec, c1.rw.dec) - } - return nil + t.Log("enabling snappy") + peer1.SetSnappy(true) + peer2.SetSnappy(true) + checkMsgReadWrite(t, peer1, peer2, testCode, testData) } -func TestProtocolHandshake(t *testing.T) { - var ( - prv0, _ = crypto.GenerateKey() - pub0 = crypto.FromECDSAPub(&prv0.PublicKey)[1:] - hs0 = &protoHandshake{Version: 3, ID: pub0, Caps: []Cap{{"a", 0}, {"b", 2}}} - - prv1, _ = crypto.GenerateKey() - pub1 = crypto.FromECDSAPub(&prv1.PublicKey)[1:] - hs1 = &protoHandshake{Version: 3, ID: pub1, Caps: []Cap{{"c", 1}, {"d", 3}}} - - wg sync.WaitGroup - ) +func checkMsgReadWrite(t *testing.T, p1, p2 *Conn, msgCode uint64, msgData []byte) { + // Set up the reader. + ch := make(chan message, 1) + go func() { + var msg message + msg.code, msg.data, _, msg.err = p1.Read() + ch <- msg + }() - fd0, fd1, err := pipes.TCPPipe() + // Write the message. + _, err := p2.Write(msgCode, msgData) if err != nil { t.Fatal(err) } - wg.Add(2) - go func() { - defer wg.Done() - defer fd0.Close() - rlpx := newRLPX(fd0) - rpubkey, err := rlpx.doEncHandshake(prv0, &prv1.PublicKey) - if err != nil { - t.Errorf("dial side enc handshake failed: %v", err) - return - } - if !reflect.DeepEqual(rpubkey, &prv1.PublicKey) { - t.Errorf("dial side remote pubkey mismatch: got %v, want %v", rpubkey, &prv1.PublicKey) - return - } + // Check it was received correctly. + msg := <-ch + assert.Equal(t, msgCode, msg.code, "wrong message code returned from ReadMsg") + assert.Equal(t, msgData, msg.data, "wrong message data returned from ReadMsg") +} - phs, err := rlpx.doProtoHandshake(hs0) - if err != nil { - t.Errorf("dial side proto handshake error: %v", err) - return - } - phs.Rest = nil - if !reflect.DeepEqual(phs, hs1) { - t.Errorf("dial side proto handshake mismatch:\ngot: %s\nwant: %s\n", spew.Sdump(phs), spew.Sdump(hs1)) - return - } - rlpx.close(DiscQuitting) - }() - go func() { - defer wg.Done() - defer fd1.Close() - rlpx := newRLPX(fd1) - rpubkey, err := rlpx.doEncHandshake(prv1, nil) - if err != nil { - t.Errorf("listen side enc handshake failed: %v", err) - return - } - if !reflect.DeepEqual(rpubkey, &prv0.PublicKey) { - t.Errorf("listen side remote pubkey mismatch: got %v, want %v", rpubkey, &prv0.PublicKey) - return - } +func createPeers(t *testing.T) (peer1, peer2 *Conn) { + conn1, conn2 := net.Pipe() + key1, key2 := newkey(), newkey() + peer1 = NewConn(conn1, &key2.PublicKey) // dialer + peer2 = NewConn(conn2, nil) // listener + doHandshake(t, peer1, peer2, key1, key2) + return peer1, peer2 +} - phs, err := rlpx.doProtoHandshake(hs1) +func doHandshake(t *testing.T, peer1, peer2 *Conn, key1, key2 *ecdsa.PrivateKey) { + keyChan := make(chan *ecdsa.PublicKey, 1) + go func() { + pubKey, err := peer2.Handshake(key2) if err != nil { - t.Errorf("listen side proto handshake error: %v", err) - return - } - phs.Rest = nil - if !reflect.DeepEqual(phs, hs0) { - t.Errorf("listen side proto handshake mismatch:\ngot: %s\nwant: %s\n", spew.Sdump(phs), spew.Sdump(hs0)) - return - } - - if err := ExpectMsg(rlpx, discMsg, []DiscReason{DiscQuitting}); err != nil { - t.Errorf("error receiving disconnect: %v", err) + t.Errorf("peer2 could not do handshake: %v", err) } + keyChan <- pubKey }() - wg.Wait() -} -func TestProtocolHandshakeErrors(t *testing.T) { - tests := []struct { - code uint64 - msg interface{} - err error - }{ - { - code: discMsg, - msg: []DiscReason{DiscQuitting}, - err: DiscQuitting, - }, - { - code: 0x989898, - msg: []byte{1}, - err: errors.New("expected handshake, got 989898"), - }, - { - code: handshakeMsg, - msg: make([]byte, baseProtocolMaxMsgSize+2), - err: errors.New("message too big"), - }, - { - code: handshakeMsg, - msg: []byte{1, 2, 3}, - err: newPeerError(errInvalidMsg, "(code 0) (size 4) rlp: expected input list for p2p.protoHandshake"), - }, - { - code: handshakeMsg, - msg: &protoHandshake{Version: 3}, - err: DiscInvalidIdentity, - }, + pubKey2, err := peer1.Handshake(key1) + if err != nil { + t.Errorf("peer1 could not do handshake: %v", err) } + pubKey1 := <-keyChan - for i, test := range tests { - p1, p2 := MsgPipe() - go Send(p1, test.code, test.msg) - _, err := readProtocolHandshake(p2) - if !reflect.DeepEqual(err, test.err) { - t.Errorf("test %d: error mismatch: got %q, want %q", i, err, test.err) - } + // Confirm the handshake was successful. + if !reflect.DeepEqual(pubKey1, &key1.PublicKey) || !reflect.DeepEqual(pubKey2, &key2.PublicKey) { + t.Fatal("unsuccessful handshake") } } -func TestRLPXFrameFake(t *testing.T) { - buf := new(bytes.Buffer) +// This test checks the frame data of written messages. +func TestFrameReadWrite(t *testing.T) { + conn := NewConn(nil, nil) hash := fakeHash([]byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}) - rw := newRLPXFrameRW(buf, secrets{ + conn.InitWithSecrets(Secrets{ AES: crypto.Keccak256(), MAC: crypto.Keccak256(), IngressMAC: hash, EgressMAC: hash, }) + h := conn.handshake golden := unhex(` -00828ddae471818bb0bfa6b551d1cb42 -01010101010101010101010101010101 -ba628a4ba590cb43f7848f41c4382885 -01010101010101010101010101010101 -`) - - // Check WriteMsg. This puts a message into the buffer. - if err := Send(rw, 8, []uint{1, 2, 3, 4}); err != nil { + 00828ddae471818bb0bfa6b551d1cb42 + 01010101010101010101010101010101 + ba628a4ba590cb43f7848f41c4382885 + 01010101010101010101010101010101 + `) + msgCode := uint64(8) + msg := []uint{1, 2, 3, 4} + msgEnc, _ := rlp.EncodeToBytes(msg) + + // Check writeFrame. The frame that's written should be equal to the test vector. + buf := new(bytes.Buffer) + if err := h.writeFrame(buf, msgCode, msgEnc); err != nil { t.Fatalf("WriteMsg error: %v", err) } - written := buf.Bytes() - if !bytes.Equal(written, golden) { - t.Fatalf("output mismatch:\n got: %x\n want: %x", written, golden) + if !bytes.Equal(buf.Bytes(), golden) { + t.Fatalf("output mismatch:\n got: %x\n want: %x", buf.Bytes(), golden) } - // Check ReadMsg. It reads the message encoded by WriteMsg, which - // is equivalent to the golden message above. - msg, err := rw.ReadMsg() + // Check readFrame on the test vector. + content, err := h.readFrame(bytes.NewReader(golden)) if err != nil { t.Fatalf("ReadMsg error: %v", err) } - if msg.Size != 5 { - t.Errorf("msg size mismatch: got %d, want %d", msg.Size, 5) - } - if msg.Code != 8 { - t.Errorf("msg code mismatch: got %d, want %d", msg.Code, 8) - } - payload, _ := ioutil.ReadAll(msg.Payload) - wantPayload := unhex("C401020304") - if !bytes.Equal(payload, wantPayload) { - t.Errorf("msg payload mismatch:\ngot %x\nwant %x", payload, wantPayload) + wantContent := unhex("08C401020304") + if !bytes.Equal(content, wantContent) { + t.Errorf("frame content mismatch:\ngot %x\nwant %x", content, wantContent) } } @@ -314,66 +161,8 @@ type fakeHash []byte func (fakeHash) Write(p []byte) (int, error) { return len(p), nil } func (fakeHash) Reset() {} func (fakeHash) BlockSize() int { return 0 } - -func (h fakeHash) Size() int { return len(h) } -func (h fakeHash) Sum(b []byte) []byte { return append(b, h...) } - -func TestRLPXFrameRW(t *testing.T) { - var ( - aesSecret = make([]byte, 16) - macSecret = make([]byte, 16) - egressMACinit = make([]byte, 32) - ingressMACinit = make([]byte, 32) - ) - for _, s := range [][]byte{aesSecret, macSecret, egressMACinit, ingressMACinit} { - rand.Read(s) - } - conn := new(bytes.Buffer) - - s1 := secrets{ - AES: aesSecret, - MAC: macSecret, - EgressMAC: sha3.NewLegacyKeccak256(), - IngressMAC: sha3.NewLegacyKeccak256(), - } - s1.EgressMAC.Write(egressMACinit) - s1.IngressMAC.Write(ingressMACinit) - rw1 := newRLPXFrameRW(conn, s1) - - s2 := secrets{ - AES: aesSecret, - MAC: macSecret, - EgressMAC: sha3.NewLegacyKeccak256(), - IngressMAC: sha3.NewLegacyKeccak256(), - } - s2.EgressMAC.Write(ingressMACinit) - s2.IngressMAC.Write(egressMACinit) - rw2 := newRLPXFrameRW(conn, s2) - - // send some messages - for i := 0; i < 10; i++ { - // write message into conn buffer - wmsg := []interface{}{"foo", "bar", strings.Repeat("test", i)} - err := Send(rw1, uint64(i), wmsg) - if err != nil { - t.Fatalf("WriteMsg error (i=%d): %v", i, err) - } - - // read message that rw1 just wrote - msg, err := rw2.ReadMsg() - if err != nil { - t.Fatalf("ReadMsg error (i=%d): %v", i, err) - } - if msg.Code != uint64(i) { - t.Fatalf("msg code mismatch: got %d, want %d", msg.Code, i) - } - payload, _ := ioutil.ReadAll(msg.Payload) - wantPayload, _ := rlp.EncodeToBytes(wmsg) - if !bytes.Equal(payload, wantPayload) { - t.Fatalf("msg payload mismatch:\ngot %x\nwant %x", payload, wantPayload) - } - } -} +func (h fakeHash) Size() int { return len(h) } +func (h fakeHash) Sum(b []byte) []byte { return append(b, h...) } type handshakeAuthTest struct { input string @@ -598,3 +387,20 @@ func TestHandshakeForwardCompatibility(t *testing.T) { t.Errorf("ingress-mac('foo') mismatch:\ngot %x\nwant %x", fooIngressHash, wantFooIngressHash) } } + +func unhex(str string) []byte { + r := strings.NewReplacer("\t", "", " ", "", "\n", "") + b, err := hex.DecodeString(r.Replace(str)) + if err != nil { + panic(fmt.Sprintf("invalid hex string: %q", str)) + } + return b +} + +func newkey() *ecdsa.PrivateKey { + key, err := crypto.GenerateKey() + if err != nil { + panic("couldn't generate key: " + err.Error()) + } + return key +} diff --git a/p2p/server.go b/p2p/server.go index 1fe5f3978923..a343f4320aca 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -166,7 +166,7 @@ type Server struct { // Hooks for testing. These are useful because we can inhibit // the whole protocol stack. - newTransport func(net.Conn) transport + newTransport func(net.Conn, *ecdsa.PublicKey) transport newPeerHook func(*Peer) listenFunc func(network, addr string) (net.Listener, error) @@ -231,7 +231,7 @@ type conn struct { type transport interface { // The two handshakes. - doEncHandshake(prv *ecdsa.PrivateKey, dialDest *ecdsa.PublicKey) (*ecdsa.PublicKey, error) + doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) doProtoHandshake(our *protoHandshake) (*protoHandshake, error) // The MsgReadWriter can only be used after the encryption // handshake has completed. The code uses conn.id to track this @@ -914,7 +914,13 @@ func (srv *Server) checkInboundConn(fd net.Conn, remoteIP net.IP) error { // as a peer. It returns when the connection has been added as a peer // or the handshakes have failed. func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error { - c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)} + c := &conn{fd: fd, flags: flags, cont: make(chan error)} + if dialDest == nil { + c.transport = srv.newTransport(fd, nil) + } else { + c.transport = srv.newTransport(fd, dialDest.Pubkey()) + } + err := srv.setupConn(c, flags, dialDest) if err != nil { c.close(err) @@ -943,16 +949,12 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro } // Run the RLPx handshake. - remotePubkey, err := c.doEncHandshake(srv.PrivateKey, dialPubkey) + remotePubkey, err := c.doEncHandshake(srv.PrivateKey) if err != nil { srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) return err } if dialDest != nil { - // For dialed connections, check that the remote public key matches. - if dialPubkey.X.Cmp(remotePubkey.X) != 0 || dialPubkey.Y.Cmp(remotePubkey.Y) != 0 { - return DiscUnexpectedIdentity - } c.node = dialDest } else { c.node = nodeFromConn(remotePubkey, c.fd) diff --git a/p2p/server_test.go b/p2p/server_test.go index 7dc344a67df5..a5b3190aede2 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -18,6 +18,7 @@ package p2p import ( "crypto/ecdsa" + "crypto/sha256" "errors" "io" "math/rand" @@ -31,28 +32,27 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" - "golang.org/x/crypto/sha3" + "github.com/ethereum/go-ethereum/p2p/rlpx" ) type testTransport struct { - rpub *ecdsa.PublicKey - *rlpx - + *rlpxTransport + rpub *ecdsa.PublicKey closeErr error } -func newTestTransport(rpub *ecdsa.PublicKey, fd net.Conn) transport { - wrapped := newRLPX(fd).(*rlpx) - wrapped.rw = newRLPXFrameRW(fd, secrets{ - MAC: zero16, - AES: zero16, - IngressMAC: sha3.NewLegacyKeccak256(), - EgressMAC: sha3.NewLegacyKeccak256(), +func newTestTransport(rpub *ecdsa.PublicKey, fd net.Conn, dialDest *ecdsa.PublicKey) transport { + wrapped := newRLPX(fd, dialDest).(*rlpxTransport) + wrapped.conn.InitWithSecrets(rlpx.Secrets{ + AES: make([]byte, 16), + MAC: make([]byte, 16), + EgressMAC: sha256.New(), + IngressMAC: sha256.New(), }) - return &testTransport{rpub: rpub, rlpx: wrapped} + return &testTransport{rpub: rpub, rlpxTransport: wrapped} } -func (c *testTransport) doEncHandshake(prv *ecdsa.PrivateKey, dialDest *ecdsa.PublicKey) (*ecdsa.PublicKey, error) { +func (c *testTransport) doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) { return c.rpub, nil } @@ -62,7 +62,7 @@ func (c *testTransport) doProtoHandshake(our *protoHandshake) (*protoHandshake, } func (c *testTransport) close(err error) { - c.rlpx.fd.Close() + c.conn.Close() c.closeErr = err } @@ -76,9 +76,11 @@ func startTestServer(t *testing.T, remoteKey *ecdsa.PublicKey, pf func(*Peer)) * Logger: testlog.Logger(t, log.LvlTrace), } server := &Server{ - Config: config, - newPeerHook: pf, - newTransport: func(fd net.Conn) transport { return newTestTransport(remoteKey, fd) }, + Config: config, + newPeerHook: pf, + newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport { + return newTestTransport(remoteKey, fd, dialDest) + }, } if err := server.Start(); err != nil { t.Fatalf("Could not start server: %v", err) @@ -253,7 +255,7 @@ func TestServerAtCap(t *testing.T) { newconn := func(id enode.ID) *conn { fd, _ := net.Pipe() - tx := newTestTransport(&trustedNode.PublicKey, fd) + tx := newTestTransport(&trustedNode.PublicKey, fd, nil) node := enode.SignNull(new(enr.Record), id) return &conn{fd: fd, transport: tx, flags: inboundConn, node: node, cont: make(chan error)} } @@ -321,7 +323,7 @@ func TestServerPeerLimits(t *testing.T) { Protocols: []Protocol{discard}, Logger: testlog.Logger(t, log.LvlTrace), }, - newTransport: func(fd net.Conn) transport { return tp }, + newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport { return tp }, } if err := srv.Start(); err != nil { t.Fatalf("couldn't start server: %v", err) @@ -390,13 +392,6 @@ func TestServerSetupConn(t *testing.T) { wantCalls: "doEncHandshake,close,", wantCloseErr: errors.New("read error"), }, - { - tt: &setupTransport{pubkey: clientpub}, - dialDest: enode.NewV4(&newkey().PublicKey, nil, 0, 0), - flags: dynDialedConn, - wantCalls: "doEncHandshake,close,", - wantCloseErr: DiscUnexpectedIdentity, - }, { tt: &setupTransport{pubkey: clientpub, phs: protoHandshake{ID: randomID().Bytes()}}, dialDest: enode.NewV4(clientpub, nil, 0, 0), @@ -437,7 +432,7 @@ func TestServerSetupConn(t *testing.T) { } srv := &Server{ Config: cfg, - newTransport: func(fd net.Conn) transport { return test.tt }, + newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport { return test.tt }, log: cfg.Logger, } if !test.dontstart { @@ -468,7 +463,7 @@ type setupTransport struct { closeErr error } -func (c *setupTransport) doEncHandshake(prv *ecdsa.PrivateKey, dialDest *ecdsa.PublicKey) (*ecdsa.PublicKey, error) { +func (c *setupTransport) doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) { c.calls += "doEncHandshake," return c.pubkey, c.encHandshakeErr } @@ -522,9 +517,9 @@ func TestServerInboundThrottle(t *testing.T) { Protocols: []Protocol{discard}, Logger: testlog.Logger(t, log.LvlTrace), }, - newTransport: func(fd net.Conn) transport { + newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport { newTransportCalled <- struct{}{} - return newRLPX(fd) + return newRLPX(fd, dialDest) }, listenFunc: func(network, laddr string) (net.Listener, error) { fakeAddr := &net.TCPAddr{IP: net.IP{95, 33, 21, 2}, Port: 4444} diff --git a/p2p/transport.go b/p2p/transport.go new file mode 100644 index 000000000000..3f1cd7d64f20 --- /dev/null +++ b/p2p/transport.go @@ -0,0 +1,177 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package p2p + +import ( + "bytes" + "crypto/ecdsa" + "fmt" + "io" + "net" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common/bitutil" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/p2p/rlpx" + "github.com/ethereum/go-ethereum/rlp" +) + +const ( + // total timeout for encryption handshake and protocol + // handshake in both directions. + handshakeTimeout = 5 * time.Second + + // This is the timeout for sending the disconnect reason. + // This is shorter than the usual timeout because we don't want + // to wait if the connection is known to be bad anyway. + discWriteTimeout = 1 * time.Second +) + +// rlpxTransport is the transport used by actual (non-test) connections. +// It wraps an RLPx connection with locks and read/write deadlines. +type rlpxTransport struct { + rmu, wmu sync.Mutex + wbuf bytes.Buffer + conn *rlpx.Conn +} + +func newRLPX(conn net.Conn, dialDest *ecdsa.PublicKey) transport { + return &rlpxTransport{conn: rlpx.NewConn(conn, dialDest)} +} + +func (t *rlpxTransport) ReadMsg() (Msg, error) { + t.rmu.Lock() + defer t.rmu.Unlock() + + var msg Msg + t.conn.SetReadDeadline(time.Now().Add(frameReadTimeout)) + code, data, wireSize, err := t.conn.Read() + if err == nil { + msg = Msg{ + ReceivedAt: time.Now(), + Code: code, + Size: uint32(len(data)), + meterSize: uint32(wireSize), + Payload: bytes.NewReader(data), + } + } + return msg, err +} + +func (t *rlpxTransport) WriteMsg(msg Msg) error { + t.wmu.Lock() + defer t.wmu.Unlock() + + // Copy message data to write buffer. + t.wbuf.Reset() + if _, err := io.CopyN(&t.wbuf, msg.Payload, int64(msg.Size)); err != nil { + return err + } + + // Write the message. + t.conn.SetWriteDeadline(time.Now().Add(frameWriteTimeout)) + size, err := t.conn.Write(msg.Code, t.wbuf.Bytes()) + if err != nil { + return err + } + + // Set metrics. + msg.meterSize = size + if metrics.Enabled && msg.meterCap.Name != "" { // don't meter non-subprotocol messages + m := fmt.Sprintf("%s/%s/%d/%#02x", egressMeterName, msg.meterCap.Name, msg.meterCap.Version, msg.meterCode) + metrics.GetOrRegisterMeter(m, nil).Mark(int64(msg.meterSize)) + metrics.GetOrRegisterMeter(m+"/packets", nil).Mark(1) + } + return nil +} + +func (t *rlpxTransport) close(err error) { + t.wmu.Lock() + defer t.wmu.Unlock() + + // Tell the remote end why we're disconnecting if possible. + // We only bother doing this if the underlying connection supports + // setting a timeout tough. + if t.conn != nil { + if r, ok := err.(DiscReason); ok && r != DiscNetworkError { + deadline := time.Now().Add(discWriteTimeout) + if err := t.conn.SetWriteDeadline(deadline); err == nil { + // Connection supports write deadline. + t.wbuf.Reset() + rlp.Encode(&t.wbuf, []DiscReason{r}) + t.conn.Write(discMsg, t.wbuf.Bytes()) + } + } + } + t.conn.Close() +} + +func (t *rlpxTransport) doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) { + t.conn.SetDeadline(time.Now().Add(handshakeTimeout)) + return t.conn.Handshake(prv) +} + +func (t *rlpxTransport) doProtoHandshake(our *protoHandshake) (their *protoHandshake, err error) { + // Writing our handshake happens concurrently, we prefer + // returning the handshake read error. If the remote side + // disconnects us early with a valid reason, we should return it + // as the error so it can be tracked elsewhere. + werr := make(chan error, 1) + go func() { werr <- Send(t, handshakeMsg, our) }() + if their, err = readProtocolHandshake(t); err != nil { + <-werr // make sure the write terminates too + return nil, err + } + if err := <-werr; err != nil { + return nil, fmt.Errorf("write error: %v", err) + } + // If the protocol version supports Snappy encoding, upgrade immediately + t.conn.SetSnappy(their.Version >= snappyProtocolVersion) + + return their, nil +} + +func readProtocolHandshake(rw MsgReader) (*protoHandshake, error) { + msg, err := rw.ReadMsg() + if err != nil { + return nil, err + } + if msg.Size > baseProtocolMaxMsgSize { + return nil, fmt.Errorf("message too big") + } + if msg.Code == discMsg { + // Disconnect before protocol handshake is valid according to the + // spec and we send it ourself if the post-handshake checks fail. + // We can't return the reason directly, though, because it is echoed + // back otherwise. Wrap it in a string instead. + var reason [1]DiscReason + rlp.Decode(msg.Payload, &reason) + return nil, reason[0] + } + if msg.Code != handshakeMsg { + return nil, fmt.Errorf("expected handshake, got %x", msg.Code) + } + var hs protoHandshake + if err := msg.Decode(&hs); err != nil { + return nil, err + } + if len(hs.ID) != 64 || !bitutil.TestBytes(hs.ID) { + return nil, DiscInvalidIdentity + } + return &hs, nil +} diff --git a/p2p/transport_test.go b/p2p/transport_test.go new file mode 100644 index 000000000000..753ea30bf196 --- /dev/null +++ b/p2p/transport_test.go @@ -0,0 +1,148 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package p2p + +import ( + "errors" + "reflect" + "sync" + "testing" + + "github.com/davecgh/go-spew/spew" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/p2p/simulations/pipes" +) + +func TestProtocolHandshake(t *testing.T) { + var ( + prv0, _ = crypto.GenerateKey() + pub0 = crypto.FromECDSAPub(&prv0.PublicKey)[1:] + hs0 = &protoHandshake{Version: 3, ID: pub0, Caps: []Cap{{"a", 0}, {"b", 2}}} + + prv1, _ = crypto.GenerateKey() + pub1 = crypto.FromECDSAPub(&prv1.PublicKey)[1:] + hs1 = &protoHandshake{Version: 3, ID: pub1, Caps: []Cap{{"c", 1}, {"d", 3}}} + + wg sync.WaitGroup + ) + + fd0, fd1, err := pipes.TCPPipe() + if err != nil { + t.Fatal(err) + } + + wg.Add(2) + go func() { + defer wg.Done() + defer fd0.Close() + frame := newRLPX(fd0, &prv1.PublicKey) + rpubkey, err := frame.doEncHandshake(prv0) + if err != nil { + t.Errorf("dial side enc handshake failed: %v", err) + return + } + if !reflect.DeepEqual(rpubkey, &prv1.PublicKey) { + t.Errorf("dial side remote pubkey mismatch: got %v, want %v", rpubkey, &prv1.PublicKey) + return + } + + phs, err := frame.doProtoHandshake(hs0) + if err != nil { + t.Errorf("dial side proto handshake error: %v", err) + return + } + phs.Rest = nil + if !reflect.DeepEqual(phs, hs1) { + t.Errorf("dial side proto handshake mismatch:\ngot: %s\nwant: %s\n", spew.Sdump(phs), spew.Sdump(hs1)) + return + } + frame.close(DiscQuitting) + }() + go func() { + defer wg.Done() + defer fd1.Close() + rlpx := newRLPX(fd1, nil) + rpubkey, err := rlpx.doEncHandshake(prv1) + if err != nil { + t.Errorf("listen side enc handshake failed: %v", err) + return + } + if !reflect.DeepEqual(rpubkey, &prv0.PublicKey) { + t.Errorf("listen side remote pubkey mismatch: got %v, want %v", rpubkey, &prv0.PublicKey) + return + } + + phs, err := rlpx.doProtoHandshake(hs1) + if err != nil { + t.Errorf("listen side proto handshake error: %v", err) + return + } + phs.Rest = nil + if !reflect.DeepEqual(phs, hs0) { + t.Errorf("listen side proto handshake mismatch:\ngot: %s\nwant: %s\n", spew.Sdump(phs), spew.Sdump(hs0)) + return + } + + if err := ExpectMsg(rlpx, discMsg, []DiscReason{DiscQuitting}); err != nil { + t.Errorf("error receiving disconnect: %v", err) + } + }() + wg.Wait() +} + +func TestProtocolHandshakeErrors(t *testing.T) { + tests := []struct { + code uint64 + msg interface{} + err error + }{ + { + code: discMsg, + msg: []DiscReason{DiscQuitting}, + err: DiscQuitting, + }, + { + code: 0x989898, + msg: []byte{1}, + err: errors.New("expected handshake, got 989898"), + }, + { + code: handshakeMsg, + msg: make([]byte, baseProtocolMaxMsgSize+2), + err: errors.New("message too big"), + }, + { + code: handshakeMsg, + msg: []byte{1, 2, 3}, + err: newPeerError(errInvalidMsg, "(code 0) (size 4) rlp: expected input list for p2p.protoHandshake"), + }, + { + code: handshakeMsg, + msg: &protoHandshake{Version: 3}, + err: DiscInvalidIdentity, + }, + } + + for i, test := range tests { + p1, p2 := MsgPipe() + go Send(p1, test.code, test.msg) + _, err := readProtocolHandshake(p2) + if !reflect.DeepEqual(err, test.err) { + t.Errorf("test %d: error mismatch: got %q, want %q", i, err, test.err) + } + } +}