Skip to content

Commit

Permalink
Merge pull request #67 from NicolasConstant/develop
Browse files Browse the repository at this point in the history
0.11.0 PR
  • Loading branch information
NicolasConstant authored Jan 24, 2021
2 parents 4bfb0c2 + c048786 commit cb4883c
Show file tree
Hide file tree
Showing 37 changed files with 1,620 additions and 143 deletions.
3 changes: 2 additions & 1 deletion VARIABLES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ You can configure some of BirdsiteLIVE's settings via environment variables (tho
## Instance customization

* `Instance:Name` (default: BirdsiteLIVE) the name of the instance
* `Instance:ResolveMentionsInProfiles` (default: true) to enable or disable mentions parsing in profile's description. Resolving it will consume more User's API calls since newly discovered account can also contain references to others accounts as well. On a big instance it is recommended to disable it.
* `Instance:ResolveMentionsInProfiles` (default: true) to enable or disable mentions parsing in profile's description. Resolving it will consume more User's API calls since newly discovered account can also contain references to others accounts as well. On a big instance it is recommended to disable it.
* `Instance:PublishReplies` (default: false) to enable or disable replies publishing.
2 changes: 2 additions & 0 deletions src/BirdsiteLive.Common/Settings/InstanceSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,7 @@ public class InstanceSettings
public string Domain { get; set; }
public string AdminEmail { get; set; }
public bool ResolveMentionsInProfiles { get; set; }
public bool PublishReplies { get; set; }
public int MaxUsersCapacity { get; set; }
}
}
4 changes: 4 additions & 0 deletions src/BirdsiteLive.Pipeline/BirdsiteLive.Pipeline.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,8 @@
<ProjectReference Include="..\DataAccessLayers\BirdsiteLive.DAL\BirdsiteLive.DAL.csproj" />
</ItemGroup>

<ItemGroup>
<Folder Include="Tools\" />
</ItemGroup>

</Project>
11 changes: 9 additions & 2 deletions src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -45,7 +46,13 @@ public async Task<UserWithTweetsToSync[]> ProcessAsync(SyncTwitterUser[] syncTwi
else if (tweets.Length > 0 && user.LastTweetPostedId == -1)
{
var tweetId = tweets.Last().Id;
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId);
var now = DateTime.UtcNow;
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, now);
}
else
{
var now = DateTime.UtcNow;
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, now);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,28 @@
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using BirdsiteLive.Common.Extensions;
using BirdsiteLive.Common.Settings;
using BirdsiteLive.DAL.Contracts;
using BirdsiteLive.DAL.Models;
using BirdsiteLive.Pipeline.Contracts;
using BirdsiteLive.Pipeline.Tools;
using Microsoft.Extensions.Logging;

namespace BirdsiteLive.Pipeline.Processors
{
public class RetrieveTwitterUsersProcessor : IRetrieveTwitterUsersProcessor
{
private readonly ITwitterUserDal _twitterUserDal;
private readonly IMaxUsersNumberProvider _maxUsersNumberProvider;
private readonly ILogger<RetrieveTwitterUsersProcessor> _logger;

public int WaitFactor = 1000 * 60; //1 min

#region Ctor
public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, ILogger<RetrieveTwitterUsersProcessor> logger)
public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, IMaxUsersNumberProvider maxUsersNumberProvider, ILogger<RetrieveTwitterUsersProcessor> logger)
{
_twitterUserDal = twitterUserDal;
_maxUsersNumberProvider = maxUsersNumberProvider;
_logger = logger;
}
#endregion
Expand All @@ -33,7 +38,8 @@ public async Task GetTwitterUsersAsync(BufferBlock<SyncTwitterUser[]> twitterUse

try
{
var users = await _twitterUserDal.GetAllTwitterUsersAsync();
var maxUsersNumber = await _maxUsersNumberProvider.GetMaxUsersNumberAsync();
var users = await _twitterUserDal.GetAllTwitterUsersAsync(maxUsersNumber);

var userCount = users.Any() ? users.Length : 1;
var splitNumber = (int) Math.Ceiling(userCount / 15d);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Linq;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BirdsiteLive.DAL.Contracts;
Expand All @@ -23,7 +24,8 @@ public async Task ProcessAsync(UserWithTweetsToSync userWithTweetsToSync, Cancel
var userId = userWithTweetsToSync.User.Id;
var lastPostedTweet = userWithTweetsToSync.Tweets.Select(x => x.Id).Max();
var minimumSync = userWithTweetsToSync.Followers.Select(x => x.FollowingsSyncStatus[userId]).Min();
await _twitterUserDal.UpdateTwitterUserAsync(userId, lastPostedTweet, minimumSync);
var now = DateTime.UtcNow;
await _twitterUserDal.UpdateTwitterUserAsync(userId, lastPostedTweet, minimumSync, now);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using BirdsiteLive.Common.Settings;
using BirdsiteLive.DAL.Contracts;
using BirdsiteLive.DAL.Models;
using BirdsiteLive.Domain;
Expand All @@ -21,15 +22,17 @@ public class SendTweetsToInboxTask : ISendTweetsToInboxTask
private readonly IActivityPubService _activityPubService;
private readonly IStatusService _statusService;
private readonly IFollowersDal _followersDal;
private readonly InstanceSettings _settings;
private readonly ILogger<SendTweetsToInboxTask> _logger;


#region Ctor
public SendTweetsToInboxTask(IActivityPubService activityPubService, IStatusService statusService, IFollowersDal followersDal, ILogger<SendTweetsToInboxTask> logger)
public SendTweetsToInboxTask(IActivityPubService activityPubService, IStatusService statusService, IFollowersDal followersDal, InstanceSettings settings, ILogger<SendTweetsToInboxTask> logger)
{
_activityPubService = activityPubService;
_statusService = statusService;
_followersDal = followersDal;
_settings = settings;
_logger = logger;
}
#endregion
Expand All @@ -52,8 +55,13 @@ public async Task ExecuteAsync(IEnumerable<ExtractedTweet> tweets, Follower foll
{
try
{
var note = _statusService.GetStatus(user.Acct, tweet);
await _activityPubService.PostNewNoteActivity(note, user.Acct, tweet.Id.ToString(), follower.Host, inbox);
if (!tweet.IsReply ||
tweet.IsReply && tweet.IsThread ||
_settings.PublishReplies)
{
var note = _statusService.GetStatus(user.Acct, tweet);
await _activityPubService.PostNewNoteActivity(note, user.Acct, tweet.Id.ToString(), follower.Host, inbox);
}
}
catch (ArgumentException e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using BirdsiteLive.Common.Settings;
using BirdsiteLive.DAL.Contracts;
using BirdsiteLive.DAL.Models;
using BirdsiteLive.Domain;
Expand All @@ -20,14 +21,16 @@ public class SendTweetsToSharedInboxTask : ISendTweetsToSharedInboxTask
private readonly IStatusService _statusService;
private readonly IActivityPubService _activityPubService;
private readonly IFollowersDal _followersDal;
private readonly InstanceSettings _settings;
private readonly ILogger<SendTweetsToSharedInboxTask> _logger;

#region Ctor
public SendTweetsToSharedInboxTask(IActivityPubService activityPubService, IStatusService statusService, IFollowersDal followersDal, ILogger<SendTweetsToSharedInboxTask> logger)
public SendTweetsToSharedInboxTask(IActivityPubService activityPubService, IStatusService statusService, IFollowersDal followersDal, InstanceSettings settings, ILogger<SendTweetsToSharedInboxTask> logger)
{
_activityPubService = activityPubService;
_statusService = statusService;
_followersDal = followersDal;
_settings = settings;
_logger = logger;
}
#endregion
Expand All @@ -52,8 +55,13 @@ public async Task ExecuteAsync(ExtractedTweet[] tweets, SyncTwitterUser user, st
{
try
{
var note = _statusService.GetStatus(user.Acct, tweet);
await _activityPubService.PostNewNoteActivity(note, user.Acct, tweet.Id.ToString(), host, inbox);
if (!tweet.IsReply ||
tweet.IsReply && tweet.IsThread ||
_settings.PublishReplies)
{
var note = _statusService.GetStatus(user.Acct, tweet);
await _activityPubService.PostNewNoteActivity(note, user.Acct, tweet.Id.ToString(), host, inbox);
}
}
catch (ArgumentException e)
{
Expand All @@ -66,7 +74,7 @@ public async Task ExecuteAsync(ExtractedTweet[] tweets, SyncTwitterUser user, st
throw;
}
}

syncStatus = tweet.Id;
}
}
Expand Down
15 changes: 11 additions & 4 deletions src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@ public class StatusPublicationPipeline : IStatusPublicationPipeline
private readonly IRetrieveTweetsProcessor _retrieveTweetsProcessor;
private readonly IRetrieveFollowersProcessor _retrieveFollowersProcessor;
private readonly ISendTweetsToFollowersProcessor _sendTweetsToFollowersProcessor;
private readonly ISaveProgressionProcessor _saveProgressionProcessor;
private readonly ILogger<StatusPublicationPipeline> _logger;

#region Ctor
public StatusPublicationPipeline(IRetrieveTweetsProcessor retrieveTweetsProcessor, IRetrieveTwitterUsersProcessor retrieveTwitterAccountsProcessor, IRetrieveFollowersProcessor retrieveFollowersProcessor, ISendTweetsToFollowersProcessor sendTweetsToFollowersProcessor, ILogger<StatusPublicationPipeline> logger)
public StatusPublicationPipeline(IRetrieveTweetsProcessor retrieveTweetsProcessor, IRetrieveTwitterUsersProcessor retrieveTwitterAccountsProcessor, IRetrieveFollowersProcessor retrieveFollowersProcessor, ISendTweetsToFollowersProcessor sendTweetsToFollowersProcessor, ISaveProgressionProcessor saveProgressionProcessor, ILogger<StatusPublicationPipeline> logger)
{
_retrieveTweetsProcessor = retrieveTweetsProcessor;
_retrieveTwitterAccountsProcessor = retrieveTwitterAccountsProcessor;
_retrieveFollowersProcessor = retrieveFollowersProcessor;
_sendTweetsToFollowersProcessor = sendTweetsToFollowersProcessor;
_saveProgressionProcessor = saveProgressionProcessor;

_logger = logger;
}
#endregion
Expand All @@ -41,22 +44,26 @@ public async Task ExecuteAsync(CancellationToken ct)
var retrieveTweetsBufferBlock = new BufferBlock<UserWithTweetsToSync[]>(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct });
var retrieveFollowersBlock = new TransformManyBlock<UserWithTweetsToSync[], UserWithTweetsToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct));
var retrieveFollowersBufferBlock = new BufferBlock<UserWithTweetsToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
var sendTweetsToFollowersBlock = new ActionBlock<UserWithTweetsToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct });
var sendTweetsToFollowersBlock = new TransformBlock<UserWithTweetsToSync, UserWithTweetsToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct });
var sendTweetsToFollowersBufferBlock = new BufferBlock<UserWithTweetsToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
var saveProgressionBlock = new ActionBlock<UserWithTweetsToSync>(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct });

// Link pipeline
twitterUsersBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveTweetsBlock.LinkTo(retrieveTweetsBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveTweetsBufferBlock.LinkTo(retrieveFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveFollowersBlock.LinkTo(retrieveFollowersBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveFollowersBufferBlock.LinkTo(sendTweetsToFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true });
sendTweetsToFollowersBlock.LinkTo(sendTweetsToFollowersBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
sendTweetsToFollowersBufferBlock.LinkTo(saveProgressionBlock, new DataflowLinkOptions { PropagateCompletion = true });

// Launch twitter user retriever
var retrieveTwitterAccountsTask = _retrieveTwitterAccountsProcessor.GetTwitterUsersAsync(twitterUsersBufferBlock, ct);

// Wait
await Task.WhenAny(new[] { retrieveTwitterAccountsTask, sendTweetsToFollowersBlock.Completion });
await Task.WhenAny(new[] { retrieveTwitterAccountsTask, saveProgressionBlock.Completion });

var ex = retrieveTwitterAccountsTask.IsFaulted ? retrieveTwitterAccountsTask.Exception : sendTweetsToFollowersBlock.Completion.Exception;
var ex = retrieveTwitterAccountsTask.IsFaulted ? retrieveTwitterAccountsTask.Exception : saveProgressionBlock.Completion.Exception;
_logger.LogCritical(ex, "An error occurred, pipeline stopped");
}
}
Expand Down
49 changes: 49 additions & 0 deletions src/BirdsiteLive.Pipeline/Tools/MaxUsersNumberProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using System.Threading.Tasks;
using BirdsiteLive.Common.Settings;
using BirdsiteLive.DAL.Contracts;

namespace BirdsiteLive.Pipeline.Tools
{
public interface IMaxUsersNumberProvider
{
Task<int> GetMaxUsersNumberAsync();
}

public class MaxUsersNumberProvider : IMaxUsersNumberProvider
{
private readonly InstanceSettings _instanceSettings;
private readonly ITwitterUserDal _twitterUserDal;

private int _totalUsersCount = -1;
private int _warmUpIterations;

#region Ctor
public MaxUsersNumberProvider(InstanceSettings instanceSettings, ITwitterUserDal twitterUserDal)
{
_instanceSettings = instanceSettings;
_twitterUserDal = twitterUserDal;
}
#endregion

public async Task<int> GetMaxUsersNumberAsync()
{
// Init data
if (_totalUsersCount == -1)
{
_totalUsersCount = await _twitterUserDal.GetTwitterUsersCountAsync();
var warmUpMaxCapacity = _instanceSettings.MaxUsersCapacity / 4;
_warmUpIterations = warmUpMaxCapacity == 0 ? 0 : (int)(_totalUsersCount / (float)warmUpMaxCapacity);
}

// Return if warm up ended
if (_warmUpIterations <= 0) return _instanceSettings.MaxUsersCapacity;

// Calculate warm up value
var maxUsers = _warmUpIterations > 0
? _instanceSettings.MaxUsersCapacity / 4
: _instanceSettings.MaxUsersCapacity;
_warmUpIterations--;
return maxUsers;
}
}
}
4 changes: 3 additions & 1 deletion src/BirdsiteLive.Twitter/Extractors/TweetExtractor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ public ExtractedTweet Extract(ITweet tweet)
InReplyToAccount = tweet.InReplyToScreenName,
MessageContent = ExtractMessage(tweet),
Media = ExtractMedia(tweet.Media),
CreatedAt = tweet.CreatedAt.ToUniversalTime()
CreatedAt = tweet.CreatedAt.ToUniversalTime(),
IsReply = tweet.InReplyToUserId != null,
IsThread = tweet.InReplyToUserId != null && tweet.InReplyToUserId == tweet.CreatedBy.Id
};
return extractedTweet;
}
Expand Down
2 changes: 2 additions & 0 deletions src/BirdsiteLive.Twitter/Models/ExtractedTweet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@ public class ExtractedTweet
public ExtractedMedia[] Media { get; set; }
public DateTime CreatedAt { get; set; }
public string InReplyToAccount { get; set; }
public bool IsReply { get; set; }
public bool IsThread { get; set; }
}
}
2 changes: 1 addition & 1 deletion src/BirdsiteLive.Twitter/TwitterTweetsService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public ExtractedTweet GetTweet(long statusId)
var tweet = Tweet.GetTweet(statusId);
_statisticsHandler.CalledTweetApi();
if (tweet == null) return null; //TODO: test this
return _tweetExtractor.Extract(tweet);
return _tweetExtractor.Extract(tweet);
}

public ExtractedTweet[] GetTimeline(string username, int nberTweets, long fromTweetId = -1)
Expand Down
7 changes: 7 additions & 0 deletions src/BirdsiteLive.sln
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BirdsiteLive.Domain.Tests",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BirdsiteLive.Pipeline.Tests", "Tests\BirdsiteLive.Pipeline.Tests\BirdsiteLive.Pipeline.Tests.csproj", "{BF51CA81-5A7A-46F8-B4FB-861C6BE59298}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BirdsiteLive.DAL.Tests", "Tests\BirdsiteLive.DAL.Tests\BirdsiteLive.DAL.Tests.csproj", "{5A1E3EB5-6CBB-470D-8A0D-10F8C18353D5}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -101,6 +103,10 @@ Global
{BF51CA81-5A7A-46F8-B4FB-861C6BE59298}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BF51CA81-5A7A-46F8-B4FB-861C6BE59298}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BF51CA81-5A7A-46F8-B4FB-861C6BE59298}.Release|Any CPU.Build.0 = Release|Any CPU
{5A1E3EB5-6CBB-470D-8A0D-10F8C18353D5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5A1E3EB5-6CBB-470D-8A0D-10F8C18353D5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5A1E3EB5-6CBB-470D-8A0D-10F8C18353D5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5A1E3EB5-6CBB-470D-8A0D-10F8C18353D5}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -119,6 +125,7 @@ Global
{2A8CC30D-D775-47D1-9388-F72A5C32DE2A} = {DA3C160C-4811-4E26-A5AD-42B81FAF2D7C}
{F544D745-89A8-4DEA-B61C-A7E6C53C1D63} = {A32D3458-09D0-4E0A-BA4B-8C411B816B94}
{BF51CA81-5A7A-46F8-B4FB-861C6BE59298} = {A32D3458-09D0-4E0A-BA4B-8C411B816B94}
{5A1E3EB5-6CBB-470D-8A0D-10F8C18353D5} = {A32D3458-09D0-4E0A-BA4B-8C411B816B94}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {69E8DCAD-4C37-4010-858F-5F94E6FBABCE}
Expand Down
2 changes: 1 addition & 1 deletion src/BirdsiteLive/BirdsiteLive.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<TargetFramework>netcoreapp3.1</TargetFramework>
<UserSecretsId>d21486de-a812-47eb-a419-05682bb68856</UserSecretsId>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
<Version>0.10.1</Version>
<Version>0.11.0</Version>
</PropertyGroup>

<ItemGroup>
Expand Down
Loading

0 comments on commit cb4883c

Please sign in to comment.