Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removes WebSockets from the network layer #3039

Merged
merged 7 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/Neo.CLI/CLI/MainService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,6 @@ public async void Start(string[] args)
NeoSystem.StartNode(new ChannelsConfig
{
Tcp = new IPEndPoint(IPAddress.Any, Settings.Default.P2P.Port),
WebSocket = new IPEndPoint(IPAddress.Any, Settings.Default.P2P.WsPort),
MinDesiredConnections = Settings.Default.P2P.MinDesiredConnections,
MaxConnections = Settings.Default.P2P.MaxConnections,
MaxConnectionsPerAddress = Settings.Default.P2P.MaxConnectionsPerAddress
Expand Down
4 changes: 1 addition & 3 deletions src/Neo.CLI/Settings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,13 @@ public StorageSettings(IConfigurationSection section)
public class P2PSettings
{
public ushort Port { get; }
public ushort WsPort { get; }
public int MinDesiredConnections { get; }
public int MaxConnections { get; }
public int MaxConnectionsPerAddress { get; }

public P2PSettings(IConfigurationSection section)
{
this.Port = ushort.Parse(section.GetValue("Port", "10333")!);
this.WsPort = ushort.Parse(section.GetValue("WsPort", "10334")!);
this.Port = ushort.Parse(section.GetValue("Port", "10333"));
this.MinDesiredConnections = section.GetValue("MinDesiredConnections", Peer.DefaultMinDesiredConnections);
this.MaxConnections = section.GetValue("MaxConnections", Peer.DefaultMaxConnections);
this.MaxConnectionsPerAddress = section.GetValue("MaxConnectionsPerAddress", 3);
Expand Down
1 change: 0 additions & 1 deletion src/Neo.CLI/config.fs.mainnet.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
},
"P2P": {
"Port": 40333,
"WsPort": 40334,
"MinDesiredConnections": 10,
"MaxConnections": 40,
"MaxConnectionsPerAddress": 3
Expand Down
1 change: 0 additions & 1 deletion src/Neo.CLI/config.fs.testnet.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
},
"P2P": {
"Port": 50333,
"WsPort": 50334,
"MinDesiredConnections": 10,
"MaxConnections": 40,
"MaxConnectionsPerAddress": 3
Expand Down
1 change: 0 additions & 1 deletion src/Neo.CLI/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
},
"P2P": {
"Port": 10333,
"WsPort": 10334,
"MinDesiredConnections": 10,
"MaxConnections": 40,
"MaxConnectionsPerAddress": 3
Expand Down
1 change: 0 additions & 1 deletion src/Neo.CLI/config.mainnet.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
},
"P2P": {
"Port": 10333,
"WsPort": 10334,
"MinDesiredConnections": 10,
"MaxConnections": 40,
"MaxConnectionsPerAddress": 3
Expand Down
1 change: 0 additions & 1 deletion src/Neo.CLI/config.testnet.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
},
"P2P": {
"Port": 20333,
"WsPort": 20334,
"MinDesiredConnections": 10,
"MaxConnections": 40,
"MaxConnectionsPerAddress": 3
Expand Down
2 changes: 2 additions & 0 deletions src/Neo/Network/P2P/Capabilities/NodeCapability.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ public static NodeCapability DeserializeFrom(ref MemoryReader reader)
NodeCapabilityType type = (NodeCapabilityType)reader.ReadByte();
NodeCapability capability = type switch
{
#pragma warning disable CS0612 // Type or member is obsolete
NodeCapabilityType.TcpServer or NodeCapabilityType.WsServer => new ServerCapability(type),
#pragma warning restore CS0612 // Type or member is obsolete
NodeCapabilityType.FullNode => new FullNodeCapability(),
_ => throw new FormatException(),
};
Expand Down
3 changes: 3 additions & 0 deletions src/Neo/Network/P2P/Capabilities/NodeCapabilityType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
// Redistribution and use in source and binary forms with or without
// modifications are permitted.

using System;

namespace Neo.Network.P2P.Capabilities
{
/// <summary>
Expand All @@ -26,6 +28,7 @@ public enum NodeCapabilityType : byte
/// <summary>
/// Indicates that the node is listening on a WebSocket port.
/// </summary>
[Obsolete]
WsServer = 0x02,

#endregion
Expand Down
2 changes: 2 additions & 0 deletions src/Neo/Network/P2P/Capabilities/ServerCapability.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ public class ServerCapability : NodeCapability
/// <param name="port">The port that the node is listening on.</param>
public ServerCapability(NodeCapabilityType type, ushort port = 0) : base(type)
{
#pragma warning disable CS0612 // Type or member is obsolete
if (type != NodeCapabilityType.TcpServer && type != NodeCapabilityType.WsServer)
#pragma warning restore CS0612 // Type or member is obsolete
{
throw new ArgumentException(nameof(type));
}
Expand Down
5 changes: 0 additions & 5 deletions src/Neo/Network/P2P/ChannelsConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@ public class ChannelsConfig
/// </summary>
public IPEndPoint Tcp { get; set; }

/// <summary>
/// Web socket configuration.
/// </summary>
public IPEndPoint WebSocket { get; set; }

/// <summary>
/// Minimum desired connections.
/// </summary>
Expand Down
39 changes: 0 additions & 39 deletions src/Neo/Network/P2P/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
using Akka.IO;
using System;
using System.Net;
using System.Net.WebSockets;
using System.Threading;

namespace Neo.Network.P2P
{
Expand Down Expand Up @@ -48,7 +46,6 @@ internal class Close { public bool Abort; }

private ICancelable timer;
private readonly IActorRef tcp;
private readonly WebSocket ws;
private bool disconnected = false;

/// <summary>
Expand All @@ -67,33 +64,9 @@ protected Connection(object connection, IPEndPoint remote, IPEndPoint local)
case IActorRef tcp:
this.tcp = tcp;
break;
case WebSocket ws:
this.ws = ws;
WsReceive();
break;
}
}

private void WsReceive()
{
byte[] buffer = new byte[512];
ws.ReceiveAsync(buffer, CancellationToken.None).PipeTo(Self,
success: p =>
{
switch (p.MessageType)
{
case WebSocketMessageType.Binary:
return new Tcp.Received(ByteString.FromBytes(buffer, 0, p.Count));
case WebSocketMessageType.Close:
return Tcp.PeerClosed.Instance;
default:
ws.Abort();
return Tcp.Aborted.Instance;
}
},
failure: ex => new Tcp.ErrorClosed(ex.Message));
}

/// <summary>
/// Disconnect from the remote node.
/// </summary>
Expand All @@ -105,10 +78,6 @@ public void Disconnect(bool abort = false)
{
tcp.Tell(abort ? Tcp.Abort.Instance : Tcp.Close.Instance);
}
else
{
ws.Abort();
}
Context.Stop(Self);
}

Expand Down Expand Up @@ -163,7 +132,6 @@ protected override void PostStop()
if (!disconnected)
tcp?.Tell(Tcp.Close.Instance);
timer.CancelIfNotNull();
ws?.Dispose();
base.PostStop();
}

Expand All @@ -177,13 +145,6 @@ protected void SendData(ByteString data)
{
tcp.Tell(Tcp.Write.Create(data, Ack.Instance));
}
else
{
ArraySegment<byte> segment = new(data.ToArray());
ws.SendAsync(segment, WebSocketMessageType.Binary, true, CancellationToken.None).PipeTo(Self,
success: () => Ack.Instance,
failure: ex => new Tcp.ErrorClosed(ex.Message));
}
}
}
}
59 changes: 1 addition & 58 deletions src/Neo/Network/P2P/Peer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@

using Akka.Actor;
using Akka.IO;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Neo.IO;
using System;
using System.Buffers.Binary;
Expand All @@ -24,8 +21,6 @@
using System.Net;
using System.Net.NetworkInformation;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Threading.Tasks;

namespace Neo.Network.P2P
{
Expand Down Expand Up @@ -62,7 +57,6 @@ public class Connect
}

private class Timer { }
private class WsConnected { public WebSocket Socket; public IPEndPoint Remote; public IPEndPoint Local; }

/// <summary>
/// The default minimum number of desired connections.
Expand All @@ -76,7 +70,6 @@ private class WsConnected { public WebSocket Socket; public IPEndPoint Remote; p

private static readonly IActorRef tcp_manager = Context.System.Tcp();
private IActorRef tcp_listener;
private IWebHost ws_host;
private ICancelable timer;

private static readonly HashSet<IPAddress> localAddresses = new();
Expand Down Expand Up @@ -109,11 +102,6 @@ private class WsConnected { public WebSocket Socket; public IPEndPoint Remote; p
/// </summary>
public int ListenerTcpPort { get; private set; }

/// <summary>
/// The port listened by the local WebSocket server.
/// </summary>
public int ListenerWsPort { get; private set; }

/// <summary>
/// Indicates the maximum number of connections with the same address.
/// </summary>
Expand Down Expand Up @@ -221,9 +209,6 @@ protected override void OnReceive(object message)
case Connect connect:
ConnectToPeer(connect.EndPoint, connect.IsTrusted);
break;
case WsConnected ws:
OnWsConnected(ws.Socket, ws.Remote, ws.Local);
break;
case Tcp.Connected connected:
OnTcpConnected(((IPEndPoint)connected.RemoteAddress).Unmap(), ((IPEndPoint)connected.LocalAddress).Unmap());
break;
Expand All @@ -242,15 +227,14 @@ protected override void OnReceive(object message)
private void OnStart(ChannelsConfig config)
{
ListenerTcpPort = config.Tcp?.Port ?? 0;
ListenerWsPort = config.WebSocket?.Port ?? 0;

MinDesiredConnections = config.MinDesiredConnections;
MaxConnections = config.MaxConnections;
MaxConnectionsPerAddress = config.MaxConnectionsPerAddress;

// schedule time to trigger `OnTimer` event every TimerMillisecondsInterval ms
timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(0, 5000, Context.Self, new Timer(), ActorRefs.NoSender);
if ((ListenerTcpPort > 0 || ListenerWsPort > 0)
if ((ListenerTcpPort > 0)
&& localAddresses.All(p => !p.IsIPv4MappedToIPv6 || IsIntranetAddress(p))
&& UPnP.Discover())
{
Expand All @@ -259,27 +243,13 @@ private void OnStart(ChannelsConfig config)
localAddresses.Add(UPnP.GetExternalIP());

if (ListenerTcpPort > 0) UPnP.ForwardPort(ListenerTcpPort, ProtocolType.Tcp, "NEO Tcp");
if (ListenerWsPort > 0) UPnP.ForwardPort(ListenerWsPort, ProtocolType.Tcp, "NEO WebSocket");
}
catch { }
}
if (ListenerTcpPort > 0)
{
tcp_manager.Tell(new Tcp.Bind(Self, config.Tcp, options: new[] { new Inet.SO.ReuseAddress(true) }));
}
if (ListenerWsPort > 0)
{
var host = "*";

if (!config.WebSocket.Address.GetAddressBytes().SequenceEqual(IPAddress.Any.GetAddressBytes()))
{
// Is not for all interfaces
host = config.WebSocket.Address.ToString();
}

ws_host = new WebHostBuilder().UseKestrel().UseUrls($"http://{host}:{ListenerWsPort}").Configure(app => app.UseWebSockets().Run(ProcessWebSocketAsync)).Build();
ws_host.Start();
}
}

/// <summary>
Expand Down Expand Up @@ -368,40 +338,13 @@ private void OnTimer()
}
}

private void OnWsConnected(WebSocket ws, IPEndPoint remote, IPEndPoint local)
{
ConnectedAddresses.TryGetValue(remote.Address, out int count);
if (count >= MaxConnectionsPerAddress)
{
ws.Abort();
}
else
{
ConnectedAddresses[remote.Address] = count + 1;
Context.ActorOf(ProtocolProps(ws, remote, local), $"connection_{Guid.NewGuid()}");
}
}

protected override void PostStop()
{
timer.CancelIfNotNull();
ws_host?.Dispose();
tcp_listener?.Tell(Tcp.Unbind.Instance);
base.PostStop();
}

private async Task ProcessWebSocketAsync(HttpContext context)
{
if (!context.WebSockets.IsWebSocketRequest) return;
WebSocket ws = await context.WebSockets.AcceptWebSocketAsync();
Self.Tell(new WsConnected
{
Socket = ws,
Remote = new IPEndPoint(context.Connection.RemoteIpAddress, context.Connection.RemotePort),
Local = new IPEndPoint(context.Connection.LocalIpAddress, context.Connection.LocalPort)
});
}

/// <summary>
/// Gets a <see cref="Akka.Actor.Props"/> object used for creating the protocol actor.
/// </summary>
Expand Down
1 change: 0 additions & 1 deletion src/Neo/Network/P2P/RemoteNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ private void OnStartProtocol()
};

if (localNode.ListenerTcpPort > 0) capabilities.Add(new ServerCapability(NodeCapabilityType.TcpServer, (ushort)localNode.ListenerTcpPort));
if (localNode.ListenerWsPort > 0) capabilities.Add(new ServerCapability(NodeCapabilityType.WsServer, (ushort)localNode.ListenerWsPort));

SendMessage(Message.Create(MessageCommand.Version, VersionPayload.Create(system.Settings.Network, LocalNode.Nonce, LocalNode.UserAgent, capabilities.ToArray())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@ public void Size_Get()
var test = new ServerCapability(NodeCapabilityType.TcpServer) { Port = 1 };
test.Size.Should().Be(3);

#pragma warning disable CS0612 // Type or member is obsolete
test = new ServerCapability(NodeCapabilityType.WsServer) { Port = 2 };
#pragma warning restore CS0612 // Type or member is obsolete
test.Size.Should().Be(3);
}

[TestMethod]
public void DeserializeAndSerialize()
{
#pragma warning disable CS0612 // Type or member is obsolete
var test = new ServerCapability(NodeCapabilityType.WsServer) { Port = 2 };
var buffer = test.ToArray();

Expand All @@ -43,6 +46,7 @@ public void DeserializeAndSerialize()
Assert.AreEqual(test.Type, clone.Type);

clone = new ServerCapability(NodeCapabilityType.WsServer, 123);
#pragma warning restore CS0612 // Type or member is obsolete
br = new MemoryReader(buffer);
((ISerializable)clone).Deserialize(ref br);

Expand All @@ -61,7 +65,7 @@ public void DeserializeAndSerialize()
_ = new ServerCapability(NodeCapabilityType.FullNode);
});

// Wrog type
// Wrong type
buffer[0] = 0xFF;
Assert.ThrowsException<FormatException>(() =>
{
Expand Down
Loading