Skip to content

Commit

Permalink
Merge pull request #66 from NicolasConstant/topic_add-last-sync
Browse files Browse the repository at this point in the history
Topic add last sync
  • Loading branch information
NicolasConstant authored Jan 23, 2021
2 parents 32df020 + 1f7bc89 commit 9971433
Show file tree
Hide file tree
Showing 28 changed files with 682 additions and 122 deletions.
1 change: 1 addition & 0 deletions src/BirdsiteLive.Common/Settings/InstanceSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ public class InstanceSettings
public string AdminEmail { get; set; }
public bool ResolveMentionsInProfiles { get; set; }
public bool PublishReplies { get; set; }
public int MaxUsersCapacity { get; set; }
}
}
4 changes: 4 additions & 0 deletions src/BirdsiteLive.Pipeline/BirdsiteLive.Pipeline.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,8 @@
<ProjectReference Include="..\DataAccessLayers\BirdsiteLive.DAL\BirdsiteLive.DAL.csproj" />
</ItemGroup>

<ItemGroup>
<Folder Include="Tools\" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -45,7 +46,8 @@ public async Task<UserWithTweetsToSync[]> ProcessAsync(SyncTwitterUser[] syncTwi
else if (tweets.Length > 0 && user.LastTweetPostedId == -1)
{
var tweetId = tweets.Last().Id;
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId);
var now = DateTime.UtcNow;
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, now);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,28 @@
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using BirdsiteLive.Common.Extensions;
using BirdsiteLive.Common.Settings;
using BirdsiteLive.DAL.Contracts;
using BirdsiteLive.DAL.Models;
using BirdsiteLive.Pipeline.Contracts;
using BirdsiteLive.Pipeline.Tools;
using Microsoft.Extensions.Logging;

namespace BirdsiteLive.Pipeline.Processors
{
public class RetrieveTwitterUsersProcessor : IRetrieveTwitterUsersProcessor
{
private readonly ITwitterUserDal _twitterUserDal;
private readonly IMaxUsersNumberProvider _maxUsersNumberProvider;
private readonly ILogger<RetrieveTwitterUsersProcessor> _logger;

public int WaitFactor = 1000 * 60; //1 min

#region Ctor
public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, ILogger<RetrieveTwitterUsersProcessor> logger)
public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, IMaxUsersNumberProvider maxUsersNumberProvider, ILogger<RetrieveTwitterUsersProcessor> logger)
{
_twitterUserDal = twitterUserDal;
_maxUsersNumberProvider = maxUsersNumberProvider;
_logger = logger;
}
#endregion
Expand All @@ -33,7 +38,8 @@ public async Task GetTwitterUsersAsync(BufferBlock<SyncTwitterUser[]> twitterUse

try
{
var users = await _twitterUserDal.GetAllTwitterUsersAsync();
var maxUsersNumber = await _maxUsersNumberProvider.GetMaxUsersNumberAsync();
var users = await _twitterUserDal.GetAllTwitterUsersAsync(maxUsersNumber);

var userCount = users.Any() ? users.Length : 1;
var splitNumber = (int) Math.Ceiling(userCount / 15d);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Linq;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BirdsiteLive.DAL.Contracts;
Expand All @@ -23,7 +24,8 @@ public async Task ProcessAsync(UserWithTweetsToSync userWithTweetsToSync, Cancel
var userId = userWithTweetsToSync.User.Id;
var lastPostedTweet = userWithTweetsToSync.Tweets.Select(x => x.Id).Max();
var minimumSync = userWithTweetsToSync.Followers.Select(x => x.FollowingsSyncStatus[userId]).Min();
await _twitterUserDal.UpdateTwitterUserAsync(userId, lastPostedTweet, minimumSync);
var now = DateTime.UtcNow;
await _twitterUserDal.UpdateTwitterUserAsync(userId, lastPostedTweet, minimumSync, now);
}
}
}
49 changes: 49 additions & 0 deletions src/BirdsiteLive.Pipeline/Tools/MaxUsersNumberProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using System.Threading.Tasks;
using BirdsiteLive.Common.Settings;
using BirdsiteLive.DAL.Contracts;

namespace BirdsiteLive.Pipeline.Tools
{
public interface IMaxUsersNumberProvider
{
Task<int> GetMaxUsersNumberAsync();
}

public class MaxUsersNumberProvider : IMaxUsersNumberProvider
{
private readonly InstanceSettings _instanceSettings;
private readonly ITwitterUserDal _twitterUserDal;

private int _totalUsersCount = -1;
private int _warmUpIterations;

#region Ctor
public MaxUsersNumberProvider(InstanceSettings instanceSettings, ITwitterUserDal twitterUserDal)
{
_instanceSettings = instanceSettings;
_twitterUserDal = twitterUserDal;
}
#endregion

public async Task<int> GetMaxUsersNumberAsync()
{
// Init data
if (_totalUsersCount == -1)
{
_totalUsersCount = await _twitterUserDal.GetTwitterUsersCountAsync();
var warmUpMaxCapacity = _instanceSettings.MaxUsersCapacity / 4;
_warmUpIterations = warmUpMaxCapacity == 0 ? 0 : (int)(_totalUsersCount / (float)warmUpMaxCapacity);
}

// Return if warm up ended
if (_warmUpIterations <= 0) return _instanceSettings.MaxUsersCapacity;

// Calculate warm up value
var maxUsers = _warmUpIterations > 0
? _instanceSettings.MaxUsersCapacity / 4
: _instanceSettings.MaxUsersCapacity;
_warmUpIterations--;
return maxUsers;
}
}
}
7 changes: 7 additions & 0 deletions src/BirdsiteLive.sln
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BirdsiteLive.Domain.Tests",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BirdsiteLive.Pipeline.Tests", "Tests\BirdsiteLive.Pipeline.Tests\BirdsiteLive.Pipeline.Tests.csproj", "{BF51CA81-5A7A-46F8-B4FB-861C6BE59298}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BirdsiteLive.DAL.Tests", "Tests\BirdsiteLive.DAL.Tests\BirdsiteLive.DAL.Tests.csproj", "{5A1E3EB5-6CBB-470D-8A0D-10F8C18353D5}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -101,6 +103,10 @@ Global
{BF51CA81-5A7A-46F8-B4FB-861C6BE59298}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BF51CA81-5A7A-46F8-B4FB-861C6BE59298}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BF51CA81-5A7A-46F8-B4FB-861C6BE59298}.Release|Any CPU.Build.0 = Release|Any CPU
{5A1E3EB5-6CBB-470D-8A0D-10F8C18353D5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5A1E3EB5-6CBB-470D-8A0D-10F8C18353D5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5A1E3EB5-6CBB-470D-8A0D-10F8C18353D5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5A1E3EB5-6CBB-470D-8A0D-10F8C18353D5}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -119,6 +125,7 @@ Global
{2A8CC30D-D775-47D1-9388-F72A5C32DE2A} = {DA3C160C-4811-4E26-A5AD-42B81FAF2D7C}
{F544D745-89A8-4DEA-B61C-A7E6C53C1D63} = {A32D3458-09D0-4E0A-BA4B-8C411B816B94}
{BF51CA81-5A7A-46F8-B4FB-861C6BE59298} = {A32D3458-09D0-4E0A-BA4B-8C411B816B94}
{5A1E3EB5-6CBB-470D-8A0D-10F8C18353D5} = {A32D3458-09D0-4E0A-BA4B-8C411B816B94}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {69E8DCAD-4C37-4010-858F-5F94E6FBABCE}
Expand Down
2 changes: 1 addition & 1 deletion src/BirdsiteLive/BirdsiteLive.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<TargetFramework>netcoreapp3.1</TargetFramework>
<UserSecretsId>d21486de-a812-47eb-a419-05682bb68856</UserSecretsId>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
<Version>0.10.1</Version>
<Version>0.11.0</Version>
</PropertyGroup>

<ItemGroup>
Expand Down
25 changes: 6 additions & 19 deletions src/BirdsiteLive/Services/FederationService.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BirdsiteLive.DAL;
using BirdsiteLive.DAL.Contracts;
using BirdsiteLive.Pipeline;
using Microsoft.Extensions.Hosting;
Expand All @@ -9,36 +11,21 @@ namespace BirdsiteLive.Services
{
public class FederationService : BackgroundService
{
private readonly IDbInitializerDal _dbInitializerDal;
private readonly IDatabaseInitializer _databaseInitializer;
private readonly IStatusPublicationPipeline _statusPublicationPipeline;

#region Ctor
public FederationService(IDbInitializerDal dbInitializerDal, IStatusPublicationPipeline statusPublicationPipeline)
public FederationService(IDatabaseInitializer databaseInitializer, IStatusPublicationPipeline statusPublicationPipeline)
{
_dbInitializerDal = dbInitializerDal;
_databaseInitializer = databaseInitializer;
_statusPublicationPipeline = statusPublicationPipeline;
}
#endregion

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await DbInitAsync();
await _databaseInitializer.InitAndMigrateDbAsync();
await _statusPublicationPipeline.ExecuteAsync(stoppingToken);
}

private async Task DbInitAsync()
{
var currentVersion = await _dbInitializerDal.GetCurrentDbVersionAsync();
var mandatoryVersion = _dbInitializerDal.GetMandatoryDbVersion();

if (currentVersion == null)
{
await _dbInitializerDal.InitDbAsync();
}
else if (currentVersion != mandatoryVersion)
{
throw new NotImplementedException();
}
}
}
}
3 changes: 2 additions & 1 deletion src/BirdsiteLive/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
"Domain": "domain.name",
"AdminEmail": "[email protected]",
"ResolveMentionsInProfiles": true,
"PublishReplies": false
"PublishReplies": false,
"MaxUsersCapacity": 1400
},
"Db": {
"Type": "postgres",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ internal class DbVersion
public class DbInitializerPostgresDal : PostgresBase, IDbInitializerDal
{
private readonly PostgresTools _tools;
private readonly Version _currentVersion = new Version(1,0);
private readonly Version _currentVersion = new Version(2, 0);
private const string DbVersionType = "db-version";

#region Ctor
Expand All @@ -32,7 +32,7 @@ public DbInitializerPostgresDal(PostgresSettings settings, PostgresTools tools)
_tools = tools;
}
#endregion

public async Task<Version> GetCurrentDbVersionAsync()
{
var query = $"SELECT * FROM {_settings.DbVersionTableName} WHERE type = @type";
Expand Down Expand Up @@ -65,17 +65,7 @@ public Version GetMandatoryDbVersion()
return _currentVersion;
}

public Tuple<Version, Version>[] GetMigrationPatterns()
{
return new Tuple<Version, Version>[0];
}

public Task MigrateDbAsync(Version from, Version to)
{
throw new NotImplementedException();
}

public async Task InitDbAsync()
public async Task<Version> InitDbAsync()
{
// Create version table
var createVersion = $@"CREATE TABLE {_settings.DbVersionTableName}
Expand Down Expand Up @@ -124,13 +114,53 @@ data JSONB
await _tools.ExecuteRequestAsync(createCachedTweets);

// Insert version to db
var firstVersion = new Version(1, 0);
using (var dbConnection = Connection)
{
dbConnection.Open();

await dbConnection.ExecuteAsync(
$"INSERT INTO {_settings.DbVersionTableName} (type,major,minor) VALUES(@type,@major,@minor)",
new { type = DbVersionType, major = _currentVersion.Major, minor = _currentVersion.Minor });
new { type = DbVersionType, major = firstVersion.Major, minor = firstVersion.Minor });
}

return firstVersion;
}

public Tuple<Version, Version>[] GetMigrationPatterns()
{
return new[]
{
new Tuple<Version, Version>(new Version(1,0), new Version(2,0))
};
}

public async Task<Version> MigrateDbAsync(Version from, Version to)
{
if (from == new Version(1, 0) && to == new Version(2, 0))
{
var addLastSync = $@"ALTER TABLE {_settings.TwitterUserTableName} ADD lastSync TIMESTAMP (2) WITHOUT TIME ZONE";
await _tools.ExecuteRequestAsync(addLastSync);

var addIndex = $@"CREATE INDEX IF NOT EXISTS lastsync_twitteruser ON {_settings.TwitterUserTableName}(lastSync)";
await _tools.ExecuteRequestAsync(addIndex);

await UpdateDbVersionAsync(to);
return to;
}

throw new NotImplementedException();
}

private async Task UpdateDbVersionAsync(Version newVersion)
{
using (var dbConnection = Connection)
{
dbConnection.Open();

await dbConnection.ExecuteAsync(
$"UPDATE {_settings.DbVersionTableName} SET major = @major, minor = @minor WHERE type = @type",
new { type = DbVersionType, major = newVersion.Major, minor = newVersion.Minor });
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,32 +62,33 @@ public async Task<int> GetTwitterUsersCountAsync()
}
}

public async Task<SyncTwitterUser[]> GetAllTwitterUsersAsync()
public async Task<SyncTwitterUser[]> GetAllTwitterUsersAsync(int maxNumber)
{
var query = $"SELECT * FROM {_settings.TwitterUserTableName}";
var query = $"SELECT * FROM {_settings.TwitterUserTableName} ORDER BY lastSync ASC LIMIT @maxNumber";

using (var dbConnection = Connection)
{
dbConnection.Open();

var result = await dbConnection.QueryAsync<SyncTwitterUser>(query);
var result = await dbConnection.QueryAsync<SyncTwitterUser>(query, new { maxNumber });
return result.ToArray();
}
}

public async Task UpdateTwitterUserAsync(int id, long lastTweetPostedId, long lastTweetSynchronizedForAllFollowersId)
public async Task UpdateTwitterUserAsync(int id, long lastTweetPostedId, long lastTweetSynchronizedForAllFollowersId, DateTime lastSync)
{
if(id == default) throw new ArgumentException("id");
if(lastTweetPostedId == default) throw new ArgumentException("lastTweetPostedId");
if(lastTweetSynchronizedForAllFollowersId == default) throw new ArgumentException("lastTweetSynchronizedForAllFollowersId");

var query = $"UPDATE {_settings.TwitterUserTableName} SET lastTweetPostedId = @lastTweetPostedId, lastTweetSynchronizedForAllFollowersId = @lastTweetSynchronizedForAllFollowersId WHERE id = @id";
if(lastSync == default) throw new ArgumentException("lastSync");

var query = $"UPDATE {_settings.TwitterUserTableName} SET lastTweetPostedId = @lastTweetPostedId, lastTweetSynchronizedForAllFollowersId = @lastTweetSynchronizedForAllFollowersId, lastSync = @lastSync WHERE id = @id";

using (var dbConnection = Connection)
{
dbConnection.Open();

await dbConnection.QueryAsync(query, new { id, lastTweetPostedId, lastTweetSynchronizedForAllFollowersId });
await dbConnection.QueryAsync(query, new { id, lastTweetPostedId, lastTweetSynchronizedForAllFollowersId, lastSync = lastSync.ToUniversalTime() });
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,11 @@ public PostgresTools(PostgresSettings settings)

public async Task ExecuteRequestAsync(string request)
{
try
using (var conn = new NpgsqlConnection(_settings.ConnString))
using (var cmd = new NpgsqlCommand(request, conn))
{
using (var conn = new NpgsqlConnection(_settings.ConnString))
using (var cmd = new NpgsqlCommand(request, conn))
{
await conn.OpenAsync();
await cmd.ExecuteNonQueryAsync();
}
}
catch (Exception e)
{
Console.WriteLine(e);
await conn.OpenAsync();
await cmd.ExecuteNonQueryAsync();
}
}
}
Expand Down
Loading

0 comments on commit 9971433

Please sign in to comment.