Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.11.0 PR #67

Merged
merged 18 commits into from
Jan 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion VARIABLES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ You can configure some of BirdsiteLIVE's settings via environment variables (tho
## Instance customization

* `Instance:Name` (default: BirdsiteLIVE) the name of the instance
* `Instance:ResolveMentionsInProfiles` (default: true) to enable or disable mentions parsing in profile's description. Resolving it will consume more User's API calls since newly discovered account can also contain references to others accounts as well. On a big instance it is recommended to disable it.
* `Instance:ResolveMentionsInProfiles` (default: true) to enable or disable mentions parsing in profile's description. Resolving it will consume more User's API calls since newly discovered account can also contain references to others accounts as well. On a big instance it is recommended to disable it.
* `Instance:PublishReplies` (default: false) to enable or disable replies publishing.
2 changes: 2 additions & 0 deletions src/BirdsiteLive.Common/Settings/InstanceSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,7 @@ public class InstanceSettings
public string Domain { get; set; }
public string AdminEmail { get; set; }
public bool ResolveMentionsInProfiles { get; set; }
public bool PublishReplies { get; set; }
public int MaxUsersCapacity { get; set; }
}
}
4 changes: 4 additions & 0 deletions src/BirdsiteLive.Pipeline/BirdsiteLive.Pipeline.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,8 @@
<ProjectReference Include="..\DataAccessLayers\BirdsiteLive.DAL\BirdsiteLive.DAL.csproj" />
</ItemGroup>

<ItemGroup>
<Folder Include="Tools\" />
</ItemGroup>

</Project>
11 changes: 9 additions & 2 deletions src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -45,7 +46,13 @@ public async Task<UserWithTweetsToSync[]> ProcessAsync(SyncTwitterUser[] syncTwi
else if (tweets.Length > 0 && user.LastTweetPostedId == -1)
{
var tweetId = tweets.Last().Id;
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId);
var now = DateTime.UtcNow;
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, now);
}
else
{
var now = DateTime.UtcNow;
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, now);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,28 @@
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using BirdsiteLive.Common.Extensions;
using BirdsiteLive.Common.Settings;
using BirdsiteLive.DAL.Contracts;
using BirdsiteLive.DAL.Models;
using BirdsiteLive.Pipeline.Contracts;
using BirdsiteLive.Pipeline.Tools;
using Microsoft.Extensions.Logging;

namespace BirdsiteLive.Pipeline.Processors
{
public class RetrieveTwitterUsersProcessor : IRetrieveTwitterUsersProcessor
{
private readonly ITwitterUserDal _twitterUserDal;
private readonly IMaxUsersNumberProvider _maxUsersNumberProvider;
private readonly ILogger<RetrieveTwitterUsersProcessor> _logger;

public int WaitFactor = 1000 * 60; //1 min

#region Ctor
public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, ILogger<RetrieveTwitterUsersProcessor> logger)
public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, IMaxUsersNumberProvider maxUsersNumberProvider, ILogger<RetrieveTwitterUsersProcessor> logger)
{
_twitterUserDal = twitterUserDal;
_maxUsersNumberProvider = maxUsersNumberProvider;
_logger = logger;
}
#endregion
Expand All @@ -33,7 +38,8 @@ public async Task GetTwitterUsersAsync(BufferBlock<SyncTwitterUser[]> twitterUse

try
{
var users = await _twitterUserDal.GetAllTwitterUsersAsync();
var maxUsersNumber = await _maxUsersNumberProvider.GetMaxUsersNumberAsync();
var users = await _twitterUserDal.GetAllTwitterUsersAsync(maxUsersNumber);

var userCount = users.Any() ? users.Length : 1;
var splitNumber = (int) Math.Ceiling(userCount / 15d);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Linq;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BirdsiteLive.DAL.Contracts;
Expand All @@ -23,7 +24,8 @@ public async Task ProcessAsync(UserWithTweetsToSync userWithTweetsToSync, Cancel
var userId = userWithTweetsToSync.User.Id;
var lastPostedTweet = userWithTweetsToSync.Tweets.Select(x => x.Id).Max();
var minimumSync = userWithTweetsToSync.Followers.Select(x => x.FollowingsSyncStatus[userId]).Min();
await _twitterUserDal.UpdateTwitterUserAsync(userId, lastPostedTweet, minimumSync);
var now = DateTime.UtcNow;
await _twitterUserDal.UpdateTwitterUserAsync(userId, lastPostedTweet, minimumSync, now);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using BirdsiteLive.Common.Settings;
using BirdsiteLive.DAL.Contracts;
using BirdsiteLive.DAL.Models;
using BirdsiteLive.Domain;
Expand All @@ -21,15 +22,17 @@ public class SendTweetsToInboxTask : ISendTweetsToInboxTask
private readonly IActivityPubService _activityPubService;
private readonly IStatusService _statusService;
private readonly IFollowersDal _followersDal;
private readonly InstanceSettings _settings;
private readonly ILogger<SendTweetsToInboxTask> _logger;


#region Ctor
public SendTweetsToInboxTask(IActivityPubService activityPubService, IStatusService statusService, IFollowersDal followersDal, ILogger<SendTweetsToInboxTask> logger)
public SendTweetsToInboxTask(IActivityPubService activityPubService, IStatusService statusService, IFollowersDal followersDal, InstanceSettings settings, ILogger<SendTweetsToInboxTask> logger)
{
_activityPubService = activityPubService;
_statusService = statusService;
_followersDal = followersDal;
_settings = settings;
_logger = logger;
}
#endregion
Expand All @@ -52,8 +55,13 @@ public async Task ExecuteAsync(IEnumerable<ExtractedTweet> tweets, Follower foll
{
try
{
var note = _statusService.GetStatus(user.Acct, tweet);
await _activityPubService.PostNewNoteActivity(note, user.Acct, tweet.Id.ToString(), follower.Host, inbox);
if (!tweet.IsReply ||
tweet.IsReply && tweet.IsThread ||
_settings.PublishReplies)
{
var note = _statusService.GetStatus(user.Acct, tweet);
await _activityPubService.PostNewNoteActivity(note, user.Acct, tweet.Id.ToString(), follower.Host, inbox);
}
}
catch (ArgumentException e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using BirdsiteLive.Common.Settings;
using BirdsiteLive.DAL.Contracts;
using BirdsiteLive.DAL.Models;
using BirdsiteLive.Domain;
Expand All @@ -20,14 +21,16 @@ public class SendTweetsToSharedInboxTask : ISendTweetsToSharedInboxTask
private readonly IStatusService _statusService;
private readonly IActivityPubService _activityPubService;
private readonly IFollowersDal _followersDal;
private readonly InstanceSettings _settings;
private readonly ILogger<SendTweetsToSharedInboxTask> _logger;

#region Ctor
public SendTweetsToSharedInboxTask(IActivityPubService activityPubService, IStatusService statusService, IFollowersDal followersDal, ILogger<SendTweetsToSharedInboxTask> logger)
public SendTweetsToSharedInboxTask(IActivityPubService activityPubService, IStatusService statusService, IFollowersDal followersDal, InstanceSettings settings, ILogger<SendTweetsToSharedInboxTask> logger)
{
_activityPubService = activityPubService;
_statusService = statusService;
_followersDal = followersDal;
_settings = settings;
_logger = logger;
}
#endregion
Expand All @@ -52,8 +55,13 @@ public async Task ExecuteAsync(ExtractedTweet[] tweets, SyncTwitterUser user, st
{
try
{
var note = _statusService.GetStatus(user.Acct, tweet);
await _activityPubService.PostNewNoteActivity(note, user.Acct, tweet.Id.ToString(), host, inbox);
if (!tweet.IsReply ||
tweet.IsReply && tweet.IsThread ||
_settings.PublishReplies)
{
var note = _statusService.GetStatus(user.Acct, tweet);
await _activityPubService.PostNewNoteActivity(note, user.Acct, tweet.Id.ToString(), host, inbox);
}
}
catch (ArgumentException e)
{
Expand All @@ -66,7 +74,7 @@ public async Task ExecuteAsync(ExtractedTweet[] tweets, SyncTwitterUser user, st
throw;
}
}

syncStatus = tweet.Id;
}
}
Expand Down
15 changes: 11 additions & 4 deletions src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@ public class StatusPublicationPipeline : IStatusPublicationPipeline
private readonly IRetrieveTweetsProcessor _retrieveTweetsProcessor;
private readonly IRetrieveFollowersProcessor _retrieveFollowersProcessor;
private readonly ISendTweetsToFollowersProcessor _sendTweetsToFollowersProcessor;
private readonly ISaveProgressionProcessor _saveProgressionProcessor;
private readonly ILogger<StatusPublicationPipeline> _logger;

#region Ctor
public StatusPublicationPipeline(IRetrieveTweetsProcessor retrieveTweetsProcessor, IRetrieveTwitterUsersProcessor retrieveTwitterAccountsProcessor, IRetrieveFollowersProcessor retrieveFollowersProcessor, ISendTweetsToFollowersProcessor sendTweetsToFollowersProcessor, ILogger<StatusPublicationPipeline> logger)
public StatusPublicationPipeline(IRetrieveTweetsProcessor retrieveTweetsProcessor, IRetrieveTwitterUsersProcessor retrieveTwitterAccountsProcessor, IRetrieveFollowersProcessor retrieveFollowersProcessor, ISendTweetsToFollowersProcessor sendTweetsToFollowersProcessor, ISaveProgressionProcessor saveProgressionProcessor, ILogger<StatusPublicationPipeline> logger)
{
_retrieveTweetsProcessor = retrieveTweetsProcessor;
_retrieveTwitterAccountsProcessor = retrieveTwitterAccountsProcessor;
_retrieveFollowersProcessor = retrieveFollowersProcessor;
_sendTweetsToFollowersProcessor = sendTweetsToFollowersProcessor;
_saveProgressionProcessor = saveProgressionProcessor;

_logger = logger;
}
#endregion
Expand All @@ -41,22 +44,26 @@ public async Task ExecuteAsync(CancellationToken ct)
var retrieveTweetsBufferBlock = new BufferBlock<UserWithTweetsToSync[]>(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct });
var retrieveFollowersBlock = new TransformManyBlock<UserWithTweetsToSync[], UserWithTweetsToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct));
var retrieveFollowersBufferBlock = new BufferBlock<UserWithTweetsToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
var sendTweetsToFollowersBlock = new ActionBlock<UserWithTweetsToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct });
var sendTweetsToFollowersBlock = new TransformBlock<UserWithTweetsToSync, UserWithTweetsToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct });
var sendTweetsToFollowersBufferBlock = new BufferBlock<UserWithTweetsToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
var saveProgressionBlock = new ActionBlock<UserWithTweetsToSync>(async x => await _saveProgressionProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct });

// Link pipeline
twitterUsersBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveTweetsBlock.LinkTo(retrieveTweetsBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveTweetsBufferBlock.LinkTo(retrieveFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveFollowersBlock.LinkTo(retrieveFollowersBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveFollowersBufferBlock.LinkTo(sendTweetsToFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true });
sendTweetsToFollowersBlock.LinkTo(sendTweetsToFollowersBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
sendTweetsToFollowersBufferBlock.LinkTo(saveProgressionBlock, new DataflowLinkOptions { PropagateCompletion = true });

// Launch twitter user retriever
var retrieveTwitterAccountsTask = _retrieveTwitterAccountsProcessor.GetTwitterUsersAsync(twitterUsersBufferBlock, ct);

// Wait
await Task.WhenAny(new[] { retrieveTwitterAccountsTask, sendTweetsToFollowersBlock.Completion });
await Task.WhenAny(new[] { retrieveTwitterAccountsTask, saveProgressionBlock.Completion });

var ex = retrieveTwitterAccountsTask.IsFaulted ? retrieveTwitterAccountsTask.Exception : sendTweetsToFollowersBlock.Completion.Exception;
var ex = retrieveTwitterAccountsTask.IsFaulted ? retrieveTwitterAccountsTask.Exception : saveProgressionBlock.Completion.Exception;
_logger.LogCritical(ex, "An error occurred, pipeline stopped");
}
}
Expand Down
49 changes: 49 additions & 0 deletions src/BirdsiteLive.Pipeline/Tools/MaxUsersNumberProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using System.Threading.Tasks;
using BirdsiteLive.Common.Settings;
using BirdsiteLive.DAL.Contracts;

namespace BirdsiteLive.Pipeline.Tools
{
public interface IMaxUsersNumberProvider
{
Task<int> GetMaxUsersNumberAsync();
}

public class MaxUsersNumberProvider : IMaxUsersNumberProvider
{
private readonly InstanceSettings _instanceSettings;
private readonly ITwitterUserDal _twitterUserDal;

private int _totalUsersCount = -1;
private int _warmUpIterations;

#region Ctor
public MaxUsersNumberProvider(InstanceSettings instanceSettings, ITwitterUserDal twitterUserDal)
{
_instanceSettings = instanceSettings;
_twitterUserDal = twitterUserDal;
}
#endregion

public async Task<int> GetMaxUsersNumberAsync()
{
// Init data
if (_totalUsersCount == -1)
{
_totalUsersCount = await _twitterUserDal.GetTwitterUsersCountAsync();
var warmUpMaxCapacity = _instanceSettings.MaxUsersCapacity / 4;
_warmUpIterations = warmUpMaxCapacity == 0 ? 0 : (int)(_totalUsersCount / (float)warmUpMaxCapacity);
}

// Return if warm up ended
if (_warmUpIterations <= 0) return _instanceSettings.MaxUsersCapacity;

// Calculate warm up value
var maxUsers = _warmUpIterations > 0
? _instanceSettings.MaxUsersCapacity / 4
: _instanceSettings.MaxUsersCapacity;
_warmUpIterations--;
return maxUsers;
}
}
}
4 changes: 3 additions & 1 deletion src/BirdsiteLive.Twitter/Extractors/TweetExtractor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ public ExtractedTweet Extract(ITweet tweet)
InReplyToAccount = tweet.InReplyToScreenName,
MessageContent = ExtractMessage(tweet),
Media = ExtractMedia(tweet.Media),
CreatedAt = tweet.CreatedAt.ToUniversalTime()
CreatedAt = tweet.CreatedAt.ToUniversalTime(),
IsReply = tweet.InReplyToUserId != null,
IsThread = tweet.InReplyToUserId != null && tweet.InReplyToUserId == tweet.CreatedBy.Id
};
return extractedTweet;
}
Expand Down
2 changes: 2 additions & 0 deletions src/BirdsiteLive.Twitter/Models/ExtractedTweet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@ public class ExtractedTweet
public ExtractedMedia[] Media { get; set; }
public DateTime CreatedAt { get; set; }
public string InReplyToAccount { get; set; }
public bool IsReply { get; set; }
public bool IsThread { get; set; }
}
}
2 changes: 1 addition & 1 deletion src/BirdsiteLive.Twitter/TwitterTweetsService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public ExtractedTweet GetTweet(long statusId)
var tweet = Tweet.GetTweet(statusId);
_statisticsHandler.CalledTweetApi();
if (tweet == null) return null; //TODO: test this
return _tweetExtractor.Extract(tweet);
return _tweetExtractor.Extract(tweet);
}

public ExtractedTweet[] GetTimeline(string username, int nberTweets, long fromTweetId = -1)
Expand Down
7 changes: 7 additions & 0 deletions src/BirdsiteLive.sln
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BirdsiteLive.Domain.Tests",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BirdsiteLive.Pipeline.Tests", "Tests\BirdsiteLive.Pipeline.Tests\BirdsiteLive.Pipeline.Tests.csproj", "{BF51CA81-5A7A-46F8-B4FB-861C6BE59298}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BirdsiteLive.DAL.Tests", "Tests\BirdsiteLive.DAL.Tests\BirdsiteLive.DAL.Tests.csproj", "{5A1E3EB5-6CBB-470D-8A0D-10F8C18353D5}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -101,6 +103,10 @@ Global
{BF51CA81-5A7A-46F8-B4FB-861C6BE59298}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BF51CA81-5A7A-46F8-B4FB-861C6BE59298}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BF51CA81-5A7A-46F8-B4FB-861C6BE59298}.Release|Any CPU.Build.0 = Release|Any CPU
{5A1E3EB5-6CBB-470D-8A0D-10F8C18353D5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5A1E3EB5-6CBB-470D-8A0D-10F8C18353D5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5A1E3EB5-6CBB-470D-8A0D-10F8C18353D5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5A1E3EB5-6CBB-470D-8A0D-10F8C18353D5}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -119,6 +125,7 @@ Global
{2A8CC30D-D775-47D1-9388-F72A5C32DE2A} = {DA3C160C-4811-4E26-A5AD-42B81FAF2D7C}
{F544D745-89A8-4DEA-B61C-A7E6C53C1D63} = {A32D3458-09D0-4E0A-BA4B-8C411B816B94}
{BF51CA81-5A7A-46F8-B4FB-861C6BE59298} = {A32D3458-09D0-4E0A-BA4B-8C411B816B94}
{5A1E3EB5-6CBB-470D-8A0D-10F8C18353D5} = {A32D3458-09D0-4E0A-BA4B-8C411B816B94}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {69E8DCAD-4C37-4010-858F-5F94E6FBABCE}
Expand Down
2 changes: 1 addition & 1 deletion src/BirdsiteLive/BirdsiteLive.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<TargetFramework>netcoreapp3.1</TargetFramework>
<UserSecretsId>d21486de-a812-47eb-a419-05682bb68856</UserSecretsId>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
<Version>0.10.1</Version>
<Version>0.11.0</Version>
</PropertyGroup>

<ItemGroup>
Expand Down
Loading