Skip to content

Commit

Permalink
feat: manage dead peers
Browse files Browse the repository at this point in the history
  • Loading branch information
richardschneider committed Aug 20, 2019
1 parent d1a1bff commit 58b8f14
Show file tree
Hide file tree
Showing 6 changed files with 380 additions and 4 deletions.
2 changes: 1 addition & 1 deletion src/MultiAddressBlackList.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace PeerTalk
/// <remarks>
/// Only targets that do not match a filter will pass.
/// </remarks>
public class MultiAddressBlackList : ConcurrentBag<MultiAddress>, IPolicy<MultiAddress>
public class MultiAddressBlackList : List<MultiAddress>, IPolicy<MultiAddress>
{
/// <inheritdoc />
public Task<bool> IsAllowedAsync(MultiAddress target, CancellationToken cancel = default(CancellationToken))
Expand Down
201 changes: 201 additions & 0 deletions src/PeerManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
using Common.Logging;
using Ipfs;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace PeerTalk
{
/// <summary>
///   Manages the peers.
/// </summary>
/// <remarks>
///   Listens to the <see cref="Swarm"/> events to determine the state
///   of a peer. A peer that can not be connected to is recorded in
///   <see cref="DeadPeers"/> and added to the swarm's black list. A background
///   loop periodically retries dead peers, doubling the wait after each
///   failure until <see cref="MaxBackoff"/> is exceeded.
/// </remarks>
public class PeerManager : IService
{
    static ILog log = LogManager.GetLogger(typeof(PeerManager));
    Thread thread;
    CancellationTokenSource cancel;

    /// <summary>
    ///   Initial time to wait before attempting a reconnection
    ///   to a dead peer.
    /// </summary>
    /// <value>
    ///   Defaults to 1 minute.
    /// </value>
    /// <remarks>
    ///   Also used as the polling interval of the background reconnect loop.
    /// </remarks>
    public TimeSpan InitialBackoff = TimeSpan.FromMinutes(1);

    /// <summary>
    ///   When exceeded, the peer is considered permanently dead.
    /// </summary>
    /// <value>
    ///   Defaults to 64 minutes.
    /// </value>
    public TimeSpan MaxBackoff = TimeSpan.FromMinutes(64);

    /// <summary>
    ///   Provides access to other peers.
    /// </summary>
    public Swarm Swarm { get; set; }

    /// <summary>
    ///   The peers that are not reachable.
    /// </summary>
    public ConcurrentDictionary<Peer, DeadPeer> DeadPeers = new ConcurrentDictionary<Peer, DeadPeer>();

    /// <summary>
    ///   Subscribes to the <see cref="Swarm"/> events and starts the
    ///   background reconnect loop.
    /// </summary>
    /// <returns>A completed task.</returns>
    public Task StartAsync()
    {
        Swarm.ConnectionEstablished += Swarm_ConnectionEstablished;
        Swarm.PeerNotReachable += Swarm_PeerNotReachable;

        // Hand the loop its own token so that it never touches the
        // 'cancel' field, which is disposed by StopAsync and replaced
        // if the service is started again.
        cancel = new CancellationTokenSource();
        var token = cancel.Token;

        // Assign the field; a local declared with 'var' would shadow it.
        thread = new Thread(() => Phoenix(token))
        {
            IsBackground = true
        };
        thread.Start();

        log.Debug("started");
        return Task.CompletedTask;
    }

    /// <summary>
    ///   Unsubscribes from the <see cref="Swarm"/> events, forgets all dead
    ///   peers and stops the background reconnect loop.
    /// </summary>
    /// <returns>A completed task.</returns>
    /// <remarks>
    ///   Safe to call when the service was never started or is already stopped.
    /// </remarks>
    public Task StopAsync()
    {
        Swarm.ConnectionEstablished -= Swarm_ConnectionEstablished;
        Swarm.PeerNotReachable -= Swarm_PeerNotReachable;
        DeadPeers.Clear();

        var source = cancel;
        cancel = null;
        if (source != null)
        {
            source.Cancel();
            source.Dispose();
        }

        log.Debug("stopped");
        return Task.CompletedTask;
    }

    /// <summary>
    ///   Indicates that the peer can not be connected to.
    /// </summary>
    /// <param name="peer">The peer that failed to respond.</param>
    /// <remarks>
    ///   The first failure schedules a retry after <see cref="InitialBackoff"/>.
    ///   Each further failure doubles the backoff. Once the backoff exceeds
    ///   <see cref="MaxBackoff"/> the peer is never retried and is
    ///   deregistered from the <see cref="Swarm"/>.
    /// </remarks>
    public void SetNotReachable(Peer peer)
    {
        var dead = DeadPeers.AddOrUpdate(peer,
            new DeadPeer
            {
                Peer = peer,
                Backoff = InitialBackoff,
                NextAttempt = DateTime.Now + InitialBackoff
            },
            (key, existing) =>
            {
                existing.Backoff += existing.Backoff;
                // DateTime.MaxValue marks a peer that is never retried.
                existing.NextAttempt = existing.Backoff <= MaxBackoff
                    ? DateTime.Now + existing.Backoff
                    : DateTime.MaxValue;
                return existing;
            });

        Swarm.BlackList.Add($"/p2p/{peer.Id}");
        if (dead.NextAttempt != DateTime.MaxValue)
        {
            // Temporarily dead; the reconnect loop will try again later.
            log.DebugFormat("Dead '{0}' for {1} minutes.", dead.Peer, dead.Backoff.TotalMinutes);
        }
        else
        {
            Swarm.DeregisterPeer(dead.Peer);
            log.DebugFormat("Permanently dead '{0}'.", dead.Peer);
        }
    }

    /// <summary>
    ///   Indicates that the peer can be connected to.
    /// </summary>
    /// <param name="peer">The peer that responded.</param>
    /// <remarks>
    ///   Removes the peer from <see cref="DeadPeers"/> and from the
    ///   swarm's black list.
    /// </remarks>
    public void SetReachable(Peer peer)
    {
        log.DebugFormat("Alive '{0}'.", peer);

        DeadPeers.TryRemove(peer, out DeadPeer _);
        Swarm.BlackList.Remove($"/p2p/{peer.Id}");
    }

    /// <summary>
    ///   Is invoked by the <see cref="Swarm"/> when a peer can not be connected to.
    /// </summary>
    void Swarm_PeerNotReachable(object sender, Peer peer)
    {
        SetNotReachable(peer);
    }

    /// <summary>
    ///   Is invoked by the <see cref="Swarm"/> when a peer is connected to.
    /// </summary>
    void Swarm_ConnectionEstablished(object sender, PeerConnection connection)
    {
        SetReachable(connection.RemotePeer);
    }

    /// <summary>
    ///   Background process to try reconnecting to a dead peer.
    /// </summary>
    /// <param name="token">
    ///   Cancelled by <see cref="StopAsync"/> to end the loop.
    /// </param>
    /// <remarks>
    ///   Every <see cref="InitialBackoff"/> the dead peers whose
    ///   <see cref="DeadPeer.NextAttempt"/> has passed are taken off the
    ///   black list and a connection is attempted, at most 10 at a time.
    ///   <para>
    ///   NOTE(review): this is an <c>async void</c> method, so the thread that
    ///   starts it only runs up to the first <c>await</c>. All exceptions
    ///   must therefore be handled inside the loop.
    ///   </para>
    /// </remarks>
    async void Phoenix(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            try
            {
                // Passing the token lets StopAsync end the wait immediately.
                await Task.Delay(InitialBackoff, token);
                var now = DateTime.Now;
                await DeadPeers.Values
                    .Where(p => p.NextAttempt < now)
                    .ParallelForEachAsync(async dead =>
                    {
                        log.DebugFormat("Attempt reconnect to {0}", dead.Peer);
                        Swarm.BlackList.Remove($"/p2p/{dead.Peer.Id}");
                        try
                        {
                            await Swarm.ConnectAsync(dead.Peer, token);
                        }
                        catch
                        {
                            // A failed connect is expected for a dead peer.
                            // Presumably the swarm then raises PeerNotReachable,
                            // which black lists the peer again - TODO confirm.
                        }
                    }, maxDoP: 10);
            }
            catch (OperationCanceledException)
            {
                // The service is stopping; the loop condition ends the loop.
            }
            catch (Exception e)
            {
                // Best effort; keep the loop alive but leave a trace.
                log.Debug("reconnect pass failed", e);
            }
        }
    }
}

/// <summary>
/// Information on a peer that is not reachable.
/// </summary>
/// <remarks>
/// Holds the retry state for one peer: who it is, how long the current
/// backoff is and when the next connection attempt is due.
/// </remarks>
public class DeadPeer
{
/// <summary>
/// The peer that does not respond.
/// </summary>
public Peer Peer { get; set; }

/// <summary>
/// How long to wait before attempting another connect.
/// </summary>
public TimeSpan Backoff { get; set; }

/// <summary>
/// When another connect should be tried.
/// </summary>
/// <value>
/// A local time; <see cref="DateTime.MaxValue"/> when no further
/// attempt should be made.
/// </value>
public DateTime NextAttempt { get; set; }
}
}
2 changes: 1 addition & 1 deletion src/PeerTalk.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net461;netstandard14;netstandard2</TargetFrameworks>
<TargetFrameworks>net461;netstandard2</TargetFrameworks>
<AssemblyName>PeerTalk</AssemblyName>
<RootNamespace>PeerTalk</RootNamespace>
<DocumentationFile>bin\$(Configuration)\$(TargetFramework)\$(AssemblyName).xml</DocumentationFile>
Expand Down
33 changes: 32 additions & 1 deletion src/TaskWhenAnyResult.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
Expand All @@ -10,7 +11,7 @@ namespace PeerTalk
/// <summary>
/// Some helpers for tasks.
/// </summary>
public class TaskHelper
public static class TaskHelper
{
/// <summary>
/// Gets the first result from a set of tasks.
Expand Down Expand Up @@ -62,5 +63,35 @@ public static async Task<T> WhenAnyResult<T>(
cancel.ThrowIfCancellationRequested();
throw new AggregateException("No task(s) returned a result.", exceptions);
}

/// <summary>
///   Runs an asynchronous action on every element of a sequence, with
///   the sequence split into a fixed number of partitions.
/// </summary>
/// <typeparam name="T">The type of the elements.</typeparam>
/// <param name="source">The elements to process.</param>
/// <param name="funcBody">
///   The asynchronous action to run on each element.
/// </param>
/// <param name="maxDoP">
///   The number of partitions that <paramref name="source"/> is split into.
///   Defaults to 4.
/// </param>
/// <returns>
///   A task that completes when every partition has been processed.
/// </returns>
/// <remarks>
///   Within a partition the elements are processed one after the other; the
///   next element is only taken once <paramref name="funcBody"/> has
///   completed for the current one.
///   <para>
///   Copied from https://houseofcat.io/tutorials/csharp/async/parallelforeachasync
///   </para>
/// </remarks>
public static Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> funcBody, int maxDoP = 4)
{
    // Processes one partition sequentially and releases it when done.
    async Task DrainAsync(IEnumerator<T> items)
    {
        try
        {
            while (items.MoveNext())
            {
                await funcBody(items.Current);
            }
        }
        finally
        {
            items?.Dispose();
        }
    }

    var partitions = Partitioner.Create(source).GetPartitions(maxDoP);
    var workers = partitions
        .AsParallel()
        .Select(items => DrainAsync(items));
    return Task.WhenAll(workers);
}
}
}
Loading

0 comments on commit 58b8f14

Please sign in to comment.