Skip to content

Commit

Permalink
Merge pull request #43 from richardschneider/42-peer-addresses
Browse files Browse the repository at this point in the history
fix(Identify1): multiaddress ends with peer ID
  • Loading branch information
richardschneider authored Jul 31, 2019
2 parents e3812d0 + a964264 commit 9e40a37
Show file tree
Hide file tree
Showing 5 changed files with 235 additions and 38 deletions.
2 changes: 1 addition & 1 deletion src/PeerTalk.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Ipfs.Core" Version="0.52.1" />
<PackageReference Include="Ipfs.Core" Version="0.52.2" />
<PackageReference Include="Makaretu.Dns.Multicast" Version="0.24.0" />
<PackageReference Include="Makaretu.KBucket" Version="0.5.0" />
<PackageReference Include="Nito.AsyncEx" Version="5.0.0" />
Expand Down
86 changes: 56 additions & 30 deletions src/Protocols/Identify1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,49 +70,75 @@ public async Task<Peer> GetRemotePeer(PeerConnection connection, CancellationTok
var muxer = await connection.MuxerEstablished.Task.ConfigureAwait(false);
log.Debug("Get remote identity");
Peer remote = connection.RemotePeer;
if (remote == null)
{
remote = new Peer();
connection.RemotePeer = remote;
}

// Read the remote peer identify info.
using (var stream = await muxer.CreateStreamAsync("id", cancel).ConfigureAwait(false))
{
await connection.EstablishProtocolAsync("/multistream/", stream, cancel).ConfigureAwait(false);
await connection.EstablishProtocolAsync("/ipfs/id/", stream, cancel).ConfigureAwait(false);

var info = await ProtoBufHelper.ReadMessageAsync<Identify>(stream, cancel).ConfigureAwait(false);
if (remote == null)
{
remote = new Peer();
connection.RemotePeer = remote;
}

remote.AgentVersion = info.AgentVersion;
remote.ProtocolVersion = info.ProtocolVersion;
if (info.PublicKey == null || info.PublicKey.Length == 0)
{
throw new InvalidDataException("Public key is missing.");
}
remote.PublicKey = Convert.ToBase64String(info.PublicKey);
if (remote.Id == null)
{
remote.Id = MultiHash.ComputeHash(info.PublicKey);
}

if (info.ListenAddresses != null)
{
remote.Addresses = info.ListenAddresses
.Select(b => MultiAddress.TryCreate(b))
.Where(a => a != null)
.ToList();
}
if (remote.Addresses.Count() == 0)
log.Warn($"No listen address for {remote}");
await UpdateRemotePeerAsync(remote, stream, cancel).ConfigureAwait(false);
}

// TODO: Verify the Peer ID
// It should always contain the address we used for connections, so
// that NAT translations are maintained.
if (connection.RemoteAddress != null && !remote.Addresses.Contains(connection.RemoteAddress))
{
var addrs = remote.Addresses.ToList();
addrs.Add(connection.RemoteAddress);
remote.Addresses = addrs;
}

connection.IdentityEstablished.TrySetResult(remote);

log.Debug($"Peer id '{remote}' of {connection.RemoteAddress}");
return remote;
}

/// <summary>
/// Read the identify message and update the peer information.
/// </summary>
/// <param name="remote"></param>
/// <param name="stream"></param>
/// <param name="cancel"></param>
/// <returns></returns>
public async Task UpdateRemotePeerAsync(Peer remote, Stream stream, CancellationToken cancel)
{
var info = await ProtoBufHelper.ReadMessageAsync<Identify>(stream, cancel).ConfigureAwait(false);

remote.AgentVersion = info.AgentVersion;
remote.ProtocolVersion = info.ProtocolVersion;
if (info.PublicKey == null || info.PublicKey.Length == 0)
{
throw new InvalidDataException("Public key is missing.");
}
remote.PublicKey = Convert.ToBase64String(info.PublicKey);
if (remote.Id == null)
{
remote.Id = MultiHash.ComputeHash(info.PublicKey);
}

if (info.ListenAddresses != null)
{
remote.Addresses = info.ListenAddresses
.Select(b => MultiAddress.TryCreate(b))
.Where(a => a != null)
.Select(a => a.WithPeerId(remote.Id))
.ToList();
}
if (remote.Addresses.Count() == 0)
log.Warn($"No listen address for {remote}");

if (!remote.IsValid())
{
throw new InvalidDataException($"Invalid peer {remote}.");
}
}

[ProtoContract]
class Identify
{
Expand Down
10 changes: 7 additions & 3 deletions src/Swarm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -545,16 +545,20 @@ async Task<PeerConnection> Dial(Peer remote, IEnumerable<MultiAddress> addrs, Ca
remote.Addresses = addrs;
}

// Get the addresses we can use to dial the remote.
// Get the addresses we can use to dial the remote. Filter
// out any addresses (ip and port) we are listening on.
var blackList = listeners.Keys
.Select(a => a.WithoutPeerId())
.ToArray();
var possibleAddresses = (await Task.WhenAll(addrs.Select(a => a.ResolveAsync(cancel))).ConfigureAwait(false))
.SelectMany(a => a)
.Where(a => !blackList.Contains(a.WithoutPeerId()))
.Select(a => a.WithPeerId(remote.Id))
.Distinct()
.ToArray();
// TODO: filter out self addresses and others.
if (possibleAddresses.Length == 0)
{
throw new Exception($"{remote} has no known address.");
throw new Exception($"{remote} has no known or reachable address.");
}

// Try the various addresses in parallel. The first one to complete wins.
Expand Down
84 changes: 80 additions & 4 deletions test/Protocols/Identify1Test.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ namespace PeerTalk.Protocols
public class Identitfy1Test
{
[TestMethod]
[Ignore("Not ready")]
public async Task RoundTrip()
{
var peerA = new Peer
Expand All @@ -36,18 +35,95 @@ public async Task RoundTrip()
Stream = ms
};

// Generate identify msg.
var identify = new Identify1();
await identify.ProcessMessageAsync(connection, ms);

// Process identify msg.
ms.Position = 0;
await identify.ProcessMessageAsync(connection, ms);
await identify.UpdateRemotePeerAsync(peerB, ms, CancellationToken.None);

Assert.AreEqual(peerA.AgentVersion, peerB.AgentVersion);
Assert.AreEqual(peerA.Id, peerB.Id);
Assert.AreEqual(peerA.ProtocolVersion, peerB.ProtocolVersion);
Assert.AreEqual(peerA.PublicKey, peerB.PublicKey);
Assert.AreEqual(peerA.Addresses.Count(), peerB.Addresses.Count());
Assert.AreEqual(peerA.Addresses.First(), peerB.Addresses.First());
CollectionAssert.AreEqual(peerA.Addresses.ToArray(), peerB.Addresses.ToArray());
}

[TestMethod]
public async Task InvalidPublicKey()
{
var peerA = new Peer
{
Addresses = new MultiAddress[]
{
"/ip4/127.0.0.1/tcp/4002/ipfs/QmXFX2P5ammdmXQgfqGkfswtEVFsZUJ5KeHRXQYCTdiTAb"
},
AgentVersion = "agent/1",
Id = "QmXFX2P5ammdmXQgfqGkfswtEVFsZUJ5KeHRXQYCTdiTAb",
ProtocolVersion = "protocol/1",
PublicKey = "BADSpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCfBYU9c0n28u02N/XCJY8yIsRqRVO5Zw+6kDHCremt2flHT4AaWnwGLAG9YyQJbRTvWN9nW2LK7Pv3uoIlvUSTnZEP0SXB5oZeqtxUdi6tuvcyqTIfsUSanLQucYITq8Qw3IMBzk+KpWNm98g9A/Xy30MkUS8mrBIO9pHmIZa55fvclDkTvLxjnGWA2avaBfJvHgMSTu0D2CQcmJrvwyKMhLCSIbQewZd2V7vc6gtxbRovKlrIwDTmDBXbfjbLljOuzg2yBLyYxXlozO9blpttbnOpU4kTspUVJXglmjsv7YSIJS3UKt3544l/srHbqlwC5CgOgjlwNfYPadO8kmBfAgMBAAE="
};
var peerB = new Peer
{
Id = peerA.Id
};
var ms = new MemoryStream();
var connection = new PeerConnection
{
LocalPeer = peerA,
RemotePeer = peerB,
Stream = ms
};

// Generate identify msg.
var identify = new Identify1();
await identify.ProcessMessageAsync(connection, ms);

// Process identify msg.
ms.Position = 0;
ExceptionAssert.Throws<InvalidDataException>(() =>
{
identify.UpdateRemotePeerAsync(peerB, ms, CancellationToken.None).Wait();
});
}

[TestMethod]
public async Task MustHavePublicKey()
{
var peerA = new Peer
{
Addresses = new MultiAddress[]
{
"/ip4/127.0.0.1/tcp/4002/ipfs/QmXFX2P5ammdmXQgfqGkfswtEVFsZUJ5KeHRXQYCTdiTAb"
},
AgentVersion = "agent/1",
Id = "QmXFX2P5ammdmXQgfqGkfswtEVFsZUJ5KeHRXQYCTdiTAb",
ProtocolVersion = "protocol/1",
PublicKey = ""
};
var peerB = new Peer
{
Id = peerA.Id
};
var ms = new MemoryStream();
var connection = new PeerConnection
{
LocalPeer = peerA,
RemotePeer = peerB,
Stream = ms
};

// Generate identify msg.
var identify = new Identify1();
await identify.ProcessMessageAsync(connection, ms);

// Process identify msg.
ms.Position = 0;
ExceptionAssert.Throws<InvalidDataException>(() =>
{
identify.UpdateRemotePeerAsync(peerB, ms, CancellationToken.None).Wait();
});
}
}
}
91 changes: 91 additions & 0 deletions test/SwarmTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -224,6 +225,7 @@ public async Task Connect_Disconnect_Reconnect()

var swarm = new Swarm { LocalPeer = self };
await swarm.StartAsync();
await swarm.StartListeningAsync("/ip4/127.0.0.1/tcp/0");
try
{
var remotePeer = (await swarm.ConnectAsync(peerBAddress)).RemotePeer;
Expand Down Expand Up @@ -278,6 +280,74 @@ public async Task Connect_Disconnect_Reconnect()
}
}

[TestMethod]
public async Task RemotePeer_Contains_ConnectedAddress1()
{
var peerB = new Peer
{
AgentVersion = "peerB",
Id = "QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h",
PublicKey = "CAASXjBcMA0GCSqGSIb3DQEBAQUAA0sAMEgCQQDlTSgVLprWaXfmxDr92DJE1FP0wOexhulPqXSTsNh5ot6j+UiuMgwb0shSPKzLx9AuTolCGhnwpTBYHVhFoBErAgMBAAE="
};
var swarmB = new Swarm { LocalPeer = peerB };
await swarmB.StartAsync();
var peerBAddress = await swarmB.StartListeningAsync("/ip4/0.0.0.0/tcp/0");

var swarm = new Swarm { LocalPeer = self };
await swarm.StartAsync();
try
{
var connection = await swarm.ConnectAsync(peerBAddress);
var remote = connection.RemotePeer;
Assert.AreEqual(remote.ConnectedAddress, peerBAddress);
CollectionAssert.Contains(remote.Addresses.ToArray(), peerBAddress);
}
finally
{
await swarm.StopAsync();
await swarmB.StopAsync();
}
}

[TestMethod]
public async Task RemotePeer_Contains_ConnectedAddress2()
{
// Only works on Windows because connecting to 127.0.0.100 is allowed
// when listening on 0.0.0.0
if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
return;
}

var peerB = new Peer
{
AgentVersion = "peerB",
Id = "QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h",
PublicKey = "CAASXjBcMA0GCSqGSIb3DQEBAQUAA0sAMEgCQQDlTSgVLprWaXfmxDr92DJE1FP0wOexhulPqXSTsNh5ot6j+UiuMgwb0shSPKzLx9AuTolCGhnwpTBYHVhFoBErAgMBAAE="
};
var swarmB = new Swarm { LocalPeer = peerB };
await swarmB.StartAsync();
var peerBAddress = await swarmB.StartListeningAsync("/ip4/0.0.0.0/tcp/0");
var peerBPort = peerBAddress.Protocols[1].Value;
Assert.IsTrue(peerB.Addresses.Count() > 0);

var swarm = new Swarm { LocalPeer = self };
await swarm.StartAsync();
try
{
MultiAddress ma = $"/ip4/127.0.0.100/tcp/{peerBPort}/ipfs/{peerB.Id}";
var connection = await swarm.ConnectAsync(ma);
var remote = connection.RemotePeer;
Assert.AreEqual(remote.ConnectedAddress, ma);
CollectionAssert.Contains(remote.Addresses.ToArray(), ma);
}
finally
{
await swarm.StopAsync();
await swarmB.StopAsync();
}
}

[TestMethod]
public async Task Connect_CancelsOnStop()
{
Expand Down Expand Up @@ -490,6 +560,27 @@ public void Connecting_To_Self()
});
}

[TestMethod]
public async Task Connecting_To_Self_Indirect()
{
var swarm = new Swarm { LocalPeer = self };
await swarm.StartAsync();
try
{
var listen = await swarm.StartListeningAsync("/ip4/127.0.0.1/tcp/0");
var bad = listen.Clone();
bad.Protocols[2].Value = "QmXFX2P5ammdmXQgfqGkfswtEVFsZUJ5KeHRXQYCTdiTAb";
ExceptionAssert.Throws<Exception>(() =>
{
swarm.ConnectAsync(bad).Wait();
});
}
finally
{
await swarm.StopAsync();
}
}

[TestMethod]
public async Task PeerDisconnected()
{
Expand Down

0 comments on commit 9e40a37

Please sign in to comment.