Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TCP P2P optimization #710

Merged
merged 55 commits into from
May 6, 2019
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
8d48da7
First draft of P2P protocol
shargon Apr 16, 2019
4865963
Unit test
shargon Apr 16, 2019
cde68d4
VersionServices
shargon Apr 22, 2019
a115abb
Remove string
shargon Apr 22, 2019
5eb0592
Optimization
shargon Apr 22, 2019
89ac447
Fix test
shargon Apr 22, 2019
964bd05
Rename compression flag
shargon Apr 22, 2019
23a53a3
Fix test
shargon Apr 22, 2019
ee23666
Merge branch 'master' into p2p-optimization
vncoelho Apr 23, 2019
eec51a8
Merge branch 'master' into p2p-optimization
vncoelho Apr 25, 2019
2e8fa8f
Move magic to VersionPayload
shargon Apr 28, 2019
f518596
faster check
shargon Apr 28, 2019
96f510b
Reduce changes
shargon Apr 28, 2019
a9302dd
Capitalize enum
shargon Apr 28, 2019
2b8f13f
Relay inside flag and FullNode flag
shargon Apr 29, 2019
a8ce6dd
Update neo/Network/P2P/Payloads/VersionServices.cs
erikzhang Apr 29, 2019
e258103
Fix
shargon Apr 29, 2019
935f068
Update neo/Network/P2P/Message.cs
erikzhang Apr 29, 2019
7d55288
Allow Checksum
shargon Apr 29, 2019
fa4536b
Clean code
shargon Apr 29, 2019
055e3f9
Clean code
shargon Apr 29, 2019
7c7cd93
Fix tests
shargon Apr 29, 2019
eb18c9a
Fix and clean
shargon Apr 29, 2019
0c8cb42
Fix ut
shargon Apr 29, 2019
9d89b6f
Move checksum
shargon Apr 29, 2019
5d18814
Fix checksum and ut
shargon Apr 30, 2019
63c1c16
More fix :P
shargon Apr 30, 2019
6cf9395
Remove Checksum
shargon Apr 30, 2019
eb15554
Update Message.cs
shargon Apr 30, 2019
d08a059
Simplify `Message.Deserialize()`
erikzhang May 1, 2019
f4de5fb
Add `InventoryType.ToMessageCommand()`
erikzhang May 1, 2019
1caa825
Reorder the members of `MessageCommand`.
erikzhang May 1, 2019
dcdabd8
Lz4
shargon May 1, 2019
f0910e1
Merge remote-tracking branch 'shargon/p2p-optimization' into p2p-opti…
shargon May 1, 2019
6df5c67
Fix lz4
shargon May 1, 2019
985a5ea
Clean
shargon May 1, 2019
d072a96
Clean usings
shargon May 1, 2019
222eb2e
Simplify Lz4 compression/decompression
erikzhang May 1, 2019
3fdbf2e
Deserialize the payloads when deserializing `Message`s
erikzhang May 1, 2019
f22fa21
Speed up relay process
shargon May 2, 2019
1e14d79
Merge pull request #9 from neo-project/p2p/payload
shargon May 2, 2019
23044da
Revert "Speed up relay process"
shargon May 2, 2019
2c06bdc
Use blocks compression
erikzhang May 2, 2019
e3d3a93
Merge pull request #10 from neo-project/p2p/block-compression
shargon May 3, 2019
b46ff8a
format
erikzhang May 3, 2019
5328c33
fix connection issues
belane May 3, 2019
70abc9c
More ut
shargon May 3, 2019
6151590
Merge branch 'master' into p2p-optimization
erikzhang May 4, 2019
0682cce
Merge branch 'master' into p2p-optimization
erikzhang May 5, 2019
4b8f6c4
Merge branch 'master' into p2p-optimization
erikzhang May 5, 2019
d8ec9e5
Simplify GetBlocksPayload
shargon May 5, 2019
d0f82a1
Update GetBlocksPayload.cs
shargon May 5, 2019
982be38
Merge branch 'master' into p2p-optimization
shargon May 6, 2019
cdef64f
Merge pull request #11 from shargon/simplyfy-getblockspayload
shargon May 6, 2019
164e49a
Allow `GetBlocksPayload.Count == -1`
erikzhang May 6, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions neo.UnitTests/UT_P2PMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
using Akka.IO;
using FluentAssertions;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Neo.IO;
using Neo.Network.P2P;
using Neo.Network.P2P.Payloads;

namespace Neo.UnitTests
{
[TestClass]
public class UT_P2PMessage
{
[TestMethod]
public void Serialize_Deserialize()
{
var payload = PingPayload.Create(uint.MaxValue);
var msg = Message.Create(MessageCommand.ping, payload);
var buffer = msg.ToArray();
var copy = buffer.AsSerializable<Message>();
var payloadCopy = copy.GetPayload<PingPayload>();

copy.Command.Should().Be(MessageCommand.ping);
copy.Flags.Should().Be(MessageFlags.None);

payloadCopy.LastBlockIndex.Should().Be(payload.LastBlockIndex);
payloadCopy.Nonce.Should().Be(payload.Nonce);
payloadCopy.Timestamp.Should().Be(payload.Timestamp);
}

[TestMethod]
public void Serialize_Deserialize_ByteString()
{
var payload = PingPayload.Create(uint.MaxValue);
var msg = Message.Create(MessageCommand.ping, payload);
var buffer = ByteString.CopyFrom(msg.ToArray());
var length = Message.TryDeserialize(buffer, out var copy);

length.Should().Be(buffer.Count);

var payloadCopy = copy.GetPayload<PingPayload>();

copy.Command.Should().Be(MessageCommand.ping);
copy.Flags.Should().Be(MessageFlags.None);

payloadCopy.LastBlockIndex.Should().Be(payload.LastBlockIndex);
payloadCopy.Nonce.Should().Be(payload.Nonce);
payloadCopy.Timestamp.Should().Be(payload.Timestamp);
}

[TestMethod]
public void Compression()
{
var payload = new VersionPayload()
{
Relay = true,
UserAgent = "".PadLeft(1024, '0'),
Nonce = 1,
Port = 2,
Services = VersionServices.NodeNetwork,
StartHeight = 4,
Timestamp = 5,
Version = 6
};
var msg = Message.Create(MessageCommand.version, payload);
var buffer = msg.ToArray();

buffer.Length.Should().BeLessThan(80);

var copy = buffer.AsSerializable<Message>();
var payloadCopy = copy.GetPayload<VersionPayload>();

copy.Command.Should().Be(MessageCommand.version);
copy.Flags.Should().Be(MessageFlags.CompressedGzip);

payloadCopy.Relay.Should().Be(payload.Relay);
payloadCopy.UserAgent.Should().Be(payload.UserAgent);
payloadCopy.Nonce.Should().Be(payload.Nonce);
payloadCopy.Port.Should().Be(payload.Port);
payloadCopy.Services.Should().Be(payload.Services);
payloadCopy.StartHeight.Should().Be(payload.StartHeight);
payloadCopy.Timestamp.Should().Be(payload.Timestamp);
payloadCopy.Version.Should().Be(payload.Version);
}
}
}
2 changes: 1 addition & 1 deletion neo/Consensus/ConsensusService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ private void SendPrepareRequest()
if (context.TransactionHashes.Length > 1)
{
foreach (InvPayload payload in InvPayload.CreateGroup(InventoryType.TX, context.TransactionHashes.Skip(1).ToArray()))
localNode.Tell(Message.Create("inv", payload));
localNode.Tell(Message.Create(MessageCommand.inv, payload));
}
ChangeTimer(TimeSpan.FromSeconds((Blockchain.SecondsPerBlock << (context.ViewNumber + 1)) - (context.ViewNumber == 0 ? Blockchain.SecondsPerBlock : 0)));
}
Expand Down
33 changes: 33 additions & 0 deletions neo/Network/P2P/Helper.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,43 @@
using Neo.Network.P2P.Payloads;
using System.IO;
using System.IO.Compression;

namespace Neo.Network.P2P
{
public static class Helper
{
public static byte[] UncompressGzip(this byte[] data)
{
using (var output = new MemoryStream())
using (var input = new MemoryStream(data))
using (var gzip = new GZipStream(input, CompressionMode.Decompress))
{
int nRead;
byte[] buffer = new byte[1024];

while ((nRead = gzip.Read(buffer, 0, buffer.Length)) > 0)
{
output.Write(buffer, 0, nRead);
}

return output.ToArray();
}
}

public static byte[] CompressGzip(this byte[] data)
{
using (var stream = new MemoryStream())
{
using (var gzip = new GZipStream(stream, CompressionLevel.Optimal, true))
{
gzip.Write(data, 0, data.Length);
gzip.Flush();
}

return stream.ToArray();
}
}

public static byte[] GetHashData(this IVerifiable verifiable)
{
using (MemoryStream ms = new MemoryStream())
Expand Down
4 changes: 2 additions & 2 deletions neo/Network/P2P/LocalNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public LocalNode(NeoSystem system)
}
}

private void BroadcastMessage(string command, ISerializable payload = null)
private void BroadcastMessage(MessageCommand command, ISerializable payload = null)
{
BroadcastMessage(Message.Create(command, payload));
}
Expand Down Expand Up @@ -126,7 +126,7 @@ protected override void NeedMorePeers(int count)
count = Math.Max(count, 5);
if (ConnectedPeers.Count > 0)
{
BroadcastMessage("getaddr");
BroadcastMessage(MessageCommand.getaddr);
}
else
{
Expand Down
134 changes: 97 additions & 37 deletions neo/Network/P2P/Message.cs
Original file line number Diff line number Diff line change
@@ -1,79 +1,139 @@
using Neo.Cryptography;
using System;
using System.IO;
using Akka.IO;
using Neo.Cryptography;
using Neo.IO;
using Neo.Network.P2P.Payloads;
using System;
using System.IO;

namespace Neo.Network.P2P
{
public class Message : ISerializable
{
public const int HeaderSize = sizeof(uint) + 12 + sizeof(int) + sizeof(uint);
public const int PayloadMaxSize = 0x02000000;
public const int CompressionMinSize = 180;
public const int CompressionThreshold = 100;

public static readonly uint Magic = ProtocolSettings.Default.Magic;
public string Command;
public uint Checksum;
public MessageFlags Flags;
public MessageCommand Command;
public byte[] Payload;

public int Size => HeaderSize + Payload.Length;
private ISerializable _payload_deserialized = null;

public static Message Create(string command, ISerializable payload = null)
public int Size => 2 + IO.Helper.GetVarSize(Payload.Length) + Payload.Length;

public static Message Create(MessageCommand command, ISerializable payload = null)
{
return Create(command, payload == null ? new byte[0] : payload.ToArray());
var ret = Create(command, payload == null ? new byte[0] : payload.ToArray());
ret._payload_deserialized = payload;

return ret;
}

public static Message Create(string command, byte[] payload)
public static Message Create(MessageCommand command, byte[] payload)
{
var flags = MessageFlags.None;

// Try compression

if (payload.Length > CompressionMinSize)
{
var compressed = payload.CompressGzip();

if (compressed.Length < payload.Length - CompressionThreshold)
{
payload = compressed;
flags |= MessageFlags.CompressedGzip;
}
}

return new Message
{
Flags = flags,
Command = command,
Checksum = GetChecksum(payload),
Payload = payload
};
}

void ISerializable.Serialize(BinaryWriter writer)
{
writer.Write((byte)Flags);
writer.Write((byte)Command);
writer.WriteVarBytes(Payload);
}

void ISerializable.Deserialize(BinaryReader reader)
{
if (reader.ReadUInt32() != Magic)
throw new FormatException();
this.Command = reader.ReadFixedString(12);
uint length = reader.ReadUInt32();
if (length > PayloadMaxSize)
throw new FormatException();
this.Checksum = reader.ReadUInt32();
this.Payload = reader.ReadBytes((int)length);
if (GetChecksum(Payload) != Checksum)
throw new FormatException();
this.Flags = (MessageFlags)reader.ReadByte();
this.Command = (MessageCommand)reader.ReadByte();
var length = (int)reader.ReadVarInt(int.MaxValue);

if (length > PayloadMaxSize) throw new FormatException();
this.Payload = reader.ReadBytes(length);
}

private static uint GetChecksum(byte[] value)
public static int TryDeserialize(ByteString data, out Message msg)
{
return Crypto.Default.Hash256(value).ToUInt32(0);
msg = null;
if (data.Count < 3) return 0;

var header = data.Slice(0, 3).ToArray();
ulong length = header[2];
int payloadIndex = 3;

if (length == 0xFD)
{
if (data.Count < 5) return 0;
length = data.Slice(payloadIndex, 2).ToArray().ToUInt16(0);
payloadIndex += 2;
}
else if (length == 0xFE)
{
if (data.Count < 7) return 0;
length = data.Slice(payloadIndex, 4).ToArray().ToUInt32(0);
payloadIndex += 4;
}
else if (length == 0xFF)
{
if (data.Count < 11) return 0;
length = data.Slice(payloadIndex, 8).ToArray().ToUInt64(0);
payloadIndex += 8;
}

if (length > PayloadMaxSize) throw new FormatException();
if (data.Count < (int)length) return 0;

msg = new Message()
{
Flags = (MessageFlags)header[0],
Command = (MessageCommand)header[1],
Payload = data.Slice(payloadIndex, (int)length).ToArray()
};

return payloadIndex + (int)length;
}

public byte[] GetPayload()
{
if (this.Flags.HasFlag(MessageFlags.CompressedGzip))
{
return this.Payload.UncompressGzip();
}

return this.Payload;
}

private ISerializable _payload_deserialized = null;
public T GetPayload<T>() where T : ISerializable, new()
{
if (_payload_deserialized is null)
_payload_deserialized = Payload.AsSerializable<T>();
_payload_deserialized = GetPayload().AsSerializable<T>();
return (T)_payload_deserialized;
}

public Transaction GetTransaction()
{
if (_payload_deserialized is null)
_payload_deserialized = Transaction.DeserializeFrom(Payload);
_payload_deserialized = Transaction.DeserializeFrom(GetPayload());
return (Transaction)_payload_deserialized;
}

void ISerializable.Serialize(BinaryWriter writer)
{
writer.Write(Magic);
writer.WriteFixedString(Command, 12);
writer.Write(Payload.Length);
writer.Write(Checksum);
writer.Write(Payload);
}
}
}
}
36 changes: 36 additions & 0 deletions neo/Network/P2P/MessageCommand.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
namespace Neo.Network.P2P
{
public enum MessageCommand : byte
{
// Same value as InventoryType
tx = 0x01,
// Same value as InventoryType
block = 0x02,
mempool = 0x03,
addr = 0x04,
inv = 0x05,
headers = 0x06,
merkleblock = 0x07,
version = 0x08,
verack = 0x09,
alert = 0x0A,
reject = 0x0B,

ping = 0x10,
pong = 0x11,

getaddr = 0x20,
getblocks = 0x21,
getdata = 0x22,
getheaders = 0x23,

filteradd = 0x30,
filterclear = 0x31,
filterload = 0x32,

// Same value as InventoryType
consensus = 0xE0,

notfound = 0xFF,
}
}
11 changes: 11 additions & 0 deletions neo/Network/P2P/MessageFlags.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;

namespace Neo.Network.P2P
{
[Flags]
public enum MessageFlags : byte
{
None = 0,
CompressedGzip = 1 << 0
}
}
Loading