Skip to content

Commit

Permalink
Merge pull request #10 from Arlodotexe/PeerRoomExtensions
Browse files Browse the repository at this point in the history
  • Loading branch information
Arlodotexe authored Jul 22, 2024
2 parents 033ff6b + 4090a9c commit 2a6532d
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 16 deletions.
161 changes: 161 additions & 0 deletions src/Extensions/PeerRoomExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
using System.Collections.Specialized;
using System.Timers;
using CommunityToolkit.Diagnostics;
using Ipfs;
using Timer = System.Timers.Timer;

namespace OwlCore.Kubo.Extensions;

/// <summary>
/// A set of extensions and helpers for <see cref="PeerRoom"/>.
/// </summary>
public static class PeerRoomExtensions
{
/// <summary>
/// Waits for a specific peer to join the peer room.
/// </summary>>
/// <param name="peerRoom">The peer room to observe.</param>
/// <param name="peer">The peer to wait for.</param>
/// <param name="cancellationToken">A token that can be used to cancel the ongoing operation.</param>
/// <returns>A task containing the peer that joined the room.</returns>
public static async Task WaitForJoinAsync(this PeerRoom peerRoom, Peer peer, CancellationToken cancellationToken)
{
var enteredPeerTaskCompletionSource = new TaskCompletionSource<object?>();

void OnConnectedPeersOnCollectionChanged(object? _, NotifyCollectionChangedEventArgs args)
{
if (args.NewItems == null)
return;

if (args.NewItems.Count > 1)
throw new InvalidOperationException("Too many nodes joined the room.");

if (args.NewItems.Cast<Peer>().All(x => x.Id != peer.Id))
return;

enteredPeerTaskCompletionSource.SetResult(null);
}

peerRoom.ConnectedPeers.CollectionChanged += OnConnectedPeersOnCollectionChanged;

#if NET5_0_OR_GREATER
var joinedPeer = await enteredPeerTaskCompletionSource.Task.WaitAsync(cancellationToken);
#elif NETSTANDARD
var joinedPeer = await enteredPeerTaskCompletionSource.Task;
#endif

peerRoom.ConnectedPeers.CollectionChanged -= OnConnectedPeersOnCollectionChanged;
}

/// <summary>
/// Waits for the first peer to enter the room.
/// </summary>
/// <param name="peerRoom">The peer room to observe.</param>
/// <param name="cancellationToken">A token that can be used to cancel the ongoing operation.</param>
/// <returns>A task containing the peer that joined the room.</returns>
/// <exception cref="InvalidOperationException">Too many peers joined the room at once.</exception>
public static async Task<Peer> WaitForJoinAsync(this PeerRoom peerRoom, CancellationToken cancellationToken)
{
var enteredPeerTaskCompletionSource = new TaskCompletionSource<Peer>();

void OnConnectedPeersOnCollectionChanged(object? _, NotifyCollectionChangedEventArgs args)
{
if (args.NewItems == null)
return;

if (args.NewItems.Count > 1)
throw new InvalidOperationException("Too many nodes joined the room.");

var peer = (Peer?)args.NewItems[0];
Guard.IsNotNull(peer);
enteredPeerTaskCompletionSource.SetResult(peer);
}

peerRoom.ConnectedPeers.CollectionChanged += OnConnectedPeersOnCollectionChanged;

#if NET5_0_OR_GREATER
var joinedPeer = await enteredPeerTaskCompletionSource.Task.WaitAsync(cancellationToken);
#elif NETSTANDARD
var joinedPeer = await enteredPeerTaskCompletionSource.Task;
#endif

peerRoom.ConnectedPeers.CollectionChanged -= OnConnectedPeersOnCollectionChanged;

return joinedPeer;
}

/// <summary>
/// Waits for a specific message to be sent to the provided <paramref name="room"/>.
/// </summary>
/// <param name="room">The room to listen for messages in.</param>
/// <param name="expectedBytes">The bytes that are expected for a received message.</param>
/// <param name="cancellationToken">A token that can be used to cancel the ongoing task.</param>
/// <returns>A task containing the received message.</returns>
public static async Task<IPublishedMessage> WaitToReceiveMessageAsync(this PeerRoom room, byte[] expectedBytes, CancellationToken cancellationToken)
{
var taskCompletionSource = new TaskCompletionSource<IPublishedMessage>();

room.MessageReceived += MessageReceived;

void MessageReceived(object? sender, IPublishedMessage e)
{
if (e.DataBytes.SequenceEqual(expectedBytes))
taskCompletionSource.SetResult(e);
}

#if NET5_0_OR_GREATER
var result = await taskCompletionSource.Task.WaitAsync(cancellationToken);
#elif NETSTANDARD
var result = await taskCompletionSource.Task;
#endif

room.MessageReceived -= MessageReceived;

return result;
}

/// <summary>
/// Waits for a specific message to be sent to the provided <paramref name="room"/>.
/// </summary>
/// <param name="messageToPublish">The message to publish at the given interval.</param>
/// <param name="room">The room to listen for messages in.</param>
/// <param name="publishInterval">The interval at which the message is published to the room.</param>
/// <param name="expectedResponse">The bytes that are expected for a received message.</param>
/// <param name="cancellationToken">A token that can be used to cancel the ongoing task.</param>
/// <returns>A task containing the received message.</returns>
public static async Task<IPublishedMessage> PublishUntilMessageReceivedAsync(this PeerRoom room, byte[] messageToPublish, byte[] expectedResponse, TimeSpan publishInterval, CancellationToken cancellationToken)
{
var timer = new Timer(publishInterval.TotalMilliseconds);
var taskCompletionSource = new TaskCompletionSource<IPublishedMessage>();

#if NET5_0_OR_GREATER
var task = taskCompletionSource.Task.WaitAsync(cancellationToken);
#elif NETSTANDARD
var task = taskCompletionSource.Task;
#endif

timer.Elapsed += TimerOnElapsed;
room.MessageReceived += MessageReceived;

timer.Start();

var result = await task;

timer.Elapsed -= TimerOnElapsed;
room.MessageReceived -= MessageReceived;

return result;

void MessageReceived(object? sender, IPublishedMessage e)
{
if (e.DataBytes.SequenceEqual(expectedResponse))
taskCompletionSource.SetResult(e);
}

async void TimerOnElapsed(object? sender, ElapsedEventArgs e)
{
cancellationToken.ThrowIfCancellationRequested();
await room.PublishAsync(messageToPublish, cancellationToken);
}
}
}
14 changes: 7 additions & 7 deletions src/KuboBootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ protected virtual async Task ApplyStartupProfileSettingsAsync(CancellationToken

// Startup profiles
foreach (var profile in StartupProfiles)
RunExecutable(_kuboBinaryFile, $"config --repo-dir {RepoPath} profile apply {profile}", throwOnError: true);
RunExecutable(_kuboBinaryFile, $"config --repo-dir \"{RepoPath}\" profile apply {profile}", throwOnError: true);
}

/// <summary>
Expand All @@ -502,23 +502,23 @@ protected virtual async Task ApplyPortSettingsAsync(CancellationToken cancellati

// Port options
if (ApiUriMode == ConfigMode.OverwriteExisting)
RunExecutable(_kuboBinaryFile, $"config --repo-dir \"{RepoPath}\" Addresses.API /ip4/{ApiUri.Host}/tcp/{ApiUri.Port}", throwOnError: true);
RunExecutable(_kuboBinaryFile, $"config Addresses.API /ip4/{ApiUri.Host}/tcp/{ApiUri.Port} --repo-dir \"{RepoPath}\"", throwOnError: true);

if (GatewayUriMode == ConfigMode.OverwriteExisting)
RunExecutable(_kuboBinaryFile, $"config --repo-dir \"{RepoPath}\" Addresses.Gateway /ip4/{GatewayUri.Host}/tcp/{GatewayUri.Port}", throwOnError: true);
RunExecutable(_kuboBinaryFile, $"config Addresses.Gateway /ip4/{GatewayUri.Host}/tcp/{GatewayUri.Port} --repo-dir \"{RepoPath}\"", throwOnError: true);

if (GatewayUriMode == ConfigMode.UseExisting)
{
var existingGatewayUri = await GetGatewayAsync(cancellationToken);
if (existingGatewayUri is null)
RunExecutable(_kuboBinaryFile, $"config --repo-dir \"{RepoPath}\" Addresses.Gateway /ip4/{GatewayUri.Host}/tcp/{GatewayUri.Port}", throwOnError: true);
RunExecutable(_kuboBinaryFile, $"config Addresses.Gateway /ip4/{GatewayUri.Host}/tcp/{GatewayUri.Port} --repo-dir \"{RepoPath}\"", throwOnError: true);
}

if (ApiUriMode == ConfigMode.UseExisting)
{
var existingApiUri = await GetApiAsync(cancellationToken);
if (existingApiUri is null)
RunExecutable(_kuboBinaryFile, $"config --repo-dir \"{RepoPath}\" Addresses.API /ip4/{ApiUri.Host}/tcp/{ApiUri.Port}", throwOnError: true);
RunExecutable(_kuboBinaryFile, $"config Addresses.API /ip4/{ApiUri.Host}/tcp/{ApiUri.Port} --repo-dir \"{RepoPath}\"", throwOnError: true);
}
}

Expand All @@ -530,9 +530,9 @@ protected virtual async Task ApplyRoutingSettingsAsync(CancellationToken cancell
{
Guard.IsNotNullOrWhiteSpace(_kuboBinaryFile?.Path);

RunExecutable(_kuboBinaryFile, $"config --repo-dir \"{RepoPath}\" Routing.Type {RoutingMode.ToString().ToLowerInvariant()}", throwOnError: true);
RunExecutable(_kuboBinaryFile, $"config Routing.Type {RoutingMode.ToString().ToLowerInvariant()} --repo-dir \"{RepoPath}\"", throwOnError: true);

RunExecutable(_kuboBinaryFile, $"config --repo-dir \"{RepoPath}\" Routing.AcceleratedDHTClient \"{UseAcceleratedDHTClient.ToString().ToLower()}\" --json", throwOnError: true);
RunExecutable(_kuboBinaryFile, $"config Routing.AcceleratedDHTClient \"{UseAcceleratedDHTClient.ToString().ToLower()}\" --json --repo-dir \"{RepoPath}\"", throwOnError: true);
}

/// <summary>
Expand Down
16 changes: 15 additions & 1 deletion src/OwlCore.Kubo.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,27 @@
<AllowedOutputExtensionsInPackageBuildOutputFolder>$(AllowedOutputExtensionsInPackageBuildOutputFolder);.pdb</AllowedOutputExtensionsInPackageBuildOutputFolder>

<Author>Arlo Godfrey</Author>
<Version>0.16.5</Version>
<Version>0.17.0</Version>
<Product>OwlCore</Product>
<Description>
An essential toolkit for Kubo, IPFS and the distributed web.
</Description>
<PackageLicenseFile>LICENSE.txt</PackageLicenseFile>
<PackageReleaseNotes>
--- 0.17.0 ---
[New]
Added PeerRoomExtensions.WaitForJoinAsync extension method
Added PeerRoomExtensions.WaitToReceiveMessageAsync extension method
Added PeerRoomExtensions.PublishUntilMessageReceivedAsync extension methods
Added PeerRoom.HeartbeatMessage to enable changing the heartbeat for the peer room.
Prune stale/outdated heartbeat message, made PruneStalePeersAsync public.

[Breaking]
Allow receiving messages when heartbeat is disabled.

[Fixes]
Fixed inconsistent passing of --repo-dir parameter during cli invocation

--- 0.16.5 ---
[Fixes]
Fixed serialization issues with CachedNameApi.
Expand Down
28 changes: 20 additions & 8 deletions src/PeerRoom.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class PeerRoom : IDisposable
private readonly Timer? _timer;
private readonly TimeSpan _heartbeatExpirationTime;
private readonly CancellationTokenSource _disconnectTokenSource = new();
private readonly Dictionary<MultiHash, (Peer Peer, DateTime LastSeen)> _lastSeenDates = new();
private readonly Dictionary<MultiHash, (Peer Peer, DateTime LastSeen, string HeartbeatMessage)> _lastSeenDates = new();
private readonly SemaphoreSlim _receivedMessageMutex = new(1, 1);

/// <summary>
Expand Down Expand Up @@ -62,7 +62,7 @@ public PeerRoom(Peer thisPeer, IPubSubApi pubSubApi, string topicName, TimeSpan
private async void Timer_Elapsed(object? sender, System.Timers.ElapsedEventArgs e)
{
await BroadcastHeartbeatAsync();
await PruneStalePeersAsync();
await PruneStalePeersAsync(CancellationToken.None);
}

/// <summary>
Expand All @@ -85,6 +85,11 @@ private async void Timer_Elapsed(object? sender, System.Timers.ElapsedEventArgs
/// </summary>
public string TopicName { get; }

/// <summary>
/// The string to publish for the heartbeat.
/// </summary>
public string HeartbeatMessage { get; set; } = "KuboPeerRoomHeartbeat";

/// <summary>
/// Gets or sets a boolean that indicates whether the heartbeat for this peer is enabled.
/// </summary>
Expand All @@ -100,7 +105,7 @@ private async void Timer_Elapsed(object? sender, System.Timers.ElapsedEventArgs
public async Task BroadcastHeartbeatAsync()
{
if (HeartbeatEnabled)
await _pubSubApi.PublishAsync(TopicName, "KuboPeerRoomHeartbeat", _disconnectTokenSource.Token);
await _pubSubApi.PublishAsync(TopicName, HeartbeatMessage, _disconnectTokenSource.Token);
}

/// <summary>
Expand Down Expand Up @@ -140,7 +145,7 @@ private async void ReceiveMessage(IPublishedMessage publishedMessage)

await _receivedMessageMutex.WaitAsync();

if (System.Text.Encoding.UTF8.GetString(publishedMessage.DataBytes) == "KuboPeerRoomHeartbeat" && HeartbeatEnabled)
if (System.Text.Encoding.UTF8.GetString(publishedMessage.DataBytes) == HeartbeatMessage)
{
if (!_lastSeenDates.ContainsKey(publishedMessage.Sender.Id))
{
Expand All @@ -151,7 +156,7 @@ await _syncContext.PostAsync(() =>
});
}

_lastSeenDates[publishedMessage.Sender.Id] = (publishedMessage.Sender, DateTime.Now);
_lastSeenDates[publishedMessage.Sender.Id] = (publishedMessage.Sender, DateTime.Now, HeartbeatMessage);
}
else if (ConnectedPeers.Any(x => x.Id == publishedMessage.Sender.Id))
{
Expand All @@ -161,17 +166,24 @@ await _syncContext.PostAsync(() =>
_receivedMessageMutex.Release();
}

internal async Task PruneStalePeersAsync()
/// <summary>
/// Prunes any stale peers from <see cref="ConnectedPeers"/>, including expired or outdated heartbeats.
/// </summary>
public async Task PruneStalePeersAsync(CancellationToken cancellationToken)
{
await _receivedMessageMutex.WaitAsync();
await _receivedMessageMutex.WaitAsync(cancellationToken);

var now = DateTime.Now;

foreach (var peer in ConnectedPeers.ToArray())
{
cancellationToken.ThrowIfCancellationRequested();
Guard.IsNotNull(peer.Id);

if (now - _heartbeatExpirationTime > _lastSeenDates[peer.Id].LastSeen)
var heartbeatExpired = now - _heartbeatExpirationTime > _lastSeenDates[peer.Id].LastSeen;
var heartbeatOutdated = _lastSeenDates[peer.Id].HeartbeatMessage != HeartbeatMessage;

if (heartbeatExpired || heartbeatOutdated)
{
await _syncContext.PostAsync(() => Task.FromResult(ConnectedPeers.Remove(ConnectedPeers.First(x => x.Id == peer.Id))));
_lastSeenDates.Remove(_lastSeenDates.First(x => x.Key == peer.Id).Key);
Expand Down

0 comments on commit 2a6532d

Please sign in to comment.