Skip to content

Commit

Permalink
Merge pull request #488 from ONLYOFFICE/feature/support-elasticache
Browse files Browse the repository at this point in the history
Feature/support elasticache
  • Loading branch information
alexeybannov authored Jan 14, 2022
2 parents a1ff0a8 + 0796169 commit a764862
Show file tree
Hide file tree
Showing 160 changed files with 1,184 additions and 749 deletions.
36 changes: 23 additions & 13 deletions build/build.backend.bat
Original file line number Diff line number Diff line change
@@ -1,23 +1,33 @@
@echo off

PUSHD %~dp0..
dotnet build ASC.Web.slnf /fl1 /flp1:LogFile=build/ASC.Web.log;Verbosity=Normal
echo.
echo Install nodejs projects dependencies...
echo Start build backend...
echo.

cd /D "%~dp0"
call runasadmin.bat "%~dpnx0"

if %errorlevel% == 0 (
for /R "build\scripts\" %%f in (*.bat) do (
echo Run script %%~nxf...
echo.
call build\scripts\%%~nxf
)
)
call start\stop.bat nopause
dotnet build ..\asc.web.slnf /fl1 /flp1:logfile=asc.web.log;verbosity=normal
echo.

call start\start.bat nopause

echo install nodejs projects dependencies...
echo.

POPD

if "%1"=="nopause" goto start
for /R "scripts\" %%f in (*.bat) do (
echo Run script %%~nxf...
echo.
call scripts\%%~nxf
)

echo.

if "%1"=="nopause" goto end
pause
:start

)

:end
2 changes: 1 addition & 1 deletion build/build.static.bat
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
@echo off

PUSHD %~dp0
cd /D "%~dp0"
call runasadmin.bat "%~dpnx0"

if %errorlevel% == 0 (
Expand Down
10 changes: 3 additions & 7 deletions build/buildAndDeploy.bat
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,20 @@ echo "##########################################################"

echo.

PUSHD %~dp0
setlocal EnableDelayedExpansion

cd /D "%~dp0"
call runasadmin.bat "%~dpnx0"

if %errorlevel% == 0 (

call start\stop.bat nopause

echo "FRONT-END static"
call build.static.bat nopause

echo "BACK-END"
call build.backend.bat nopause

call start\start.bat nopause

echo.

pause
)

)
2 changes: 1 addition & 1 deletion build/scripts/urlshortener.bat
Original file line number Diff line number Diff line change
@@ -1 +1 @@
yarn install --cwd common/ASC.UrlShortener/ --frozen-lockfile
yarn install --cwd %~dp0../../common/ASC.UrlShortener/ --frozen-lockfile
22 changes: 18 additions & 4 deletions common/ASC.Api.Core/Core/BaseStartup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http.Extensions;
using Microsoft.AspNetCore.HttpOverrides;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Mvc.Authorization;
Expand All @@ -34,6 +35,8 @@
using NLog;
using NLog.Extensions.Logging;

using StackExchange.Redis.Extensions.Core.Configuration;

namespace ASC.Api.Core
{
public abstract class BaseStartup
Expand All @@ -56,7 +59,7 @@ public BaseStartup(IConfiguration configuration, IHostEnvironment hostEnvironmen
if (bool.TryParse(Configuration["core:products"], out var loadProducts))
{
LoadProducts = loadProducts;
}
}
}

public virtual void ConfigureServices(IServiceCollection services)
Expand All @@ -82,8 +85,8 @@ public virtual void ConfigureServices(IServiceCollection services)
{
options.JsonSerializerOptions.Converters.Add(c);
}
}
};
}
};

services.AddControllers()
.AddXmlSerializerFormatters()
Expand All @@ -101,7 +104,18 @@ public virtual void ConfigureServices(IServiceCollection services)
DIHelper.TryAdd<CookieAuthHandler>();
DIHelper.TryAdd<WebhooksGlobalFilterAttribute>();

DIHelper.TryAdd(typeof(ICacheNotify<>), typeof(KafkaCache<>));
var redisConfiguration = Configuration.GetSection("Redis").Get<RedisConfiguration>();

if (redisConfiguration != null)
{
DIHelper.TryAdd(typeof(ICacheNotify<>), typeof(RedisCache<>));
}
else
{
DIHelper.TryAdd(typeof(ICacheNotify<>), typeof(MemoryCacheNotify<>));
}


DIHelper.TryAdd(typeof(IWebhookPublisher), typeof(WebhookPublisher));

if (LoadProducts)
Expand Down
14 changes: 8 additions & 6 deletions common/ASC.Common/ASC.Common.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
<PackageReference Include="Autofac.Configuration" Version="6.0.0" />
<PackageReference Include="AutoMapper" Version="10.1.1" />
<PackageReference Include="Confluent.Kafka" Version="1.4.3" />
<PackageReference Include="Google.Protobuf" Version="3.14.0" />
<PackageReference Include="Grpc.Tools" Version="2.32.0">
<PackageReference Include="Google.Protobuf" Version="3.19.1" />
<PackageReference Include="Grpc.Tools" Version="2.40.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
Expand All @@ -55,12 +55,14 @@
</PackageReference> -->
<PackageReference Include="NLog" Version="4.7.10" />
<PackageReference Include="NVelocity" Version="1.2.0" />
<PackageReference Include="StackExchange.Redis" Version="2.2.88" />
<PackageReference Include="StackExchange.Redis.Extensions.Core" Version="7.2.1" />
</ItemGroup>
<ItemGroup>
<Protobuf Include="protos\AscCacheItem.proto" />
<Protobuf Include="protos\NotifyItem.proto" />
<Protobuf Include="protos\DistributedTaskCache.proto" />
<Protobuf Include="protos\DistributedTaskCancelation.proto" />
<Protobuf Include="protos\asc_cache_item.proto" />
<Protobuf Include="protos\notify_item.proto" />
<Protobuf Include="protos\distributed_task_cache.proto" />
<Protobuf Include="protos\distributed_task_cancelation.proto" />
</ItemGroup>
<ItemGroup>
<Folder Include="Notify\" />
Expand Down
2 changes: 1 addition & 1 deletion common/ASC.Common/Caching/AscCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public AscCacheNotify(ICacheNotify<AscCacheItem> cacheNotify)

public void ClearCache()
{
CacheNotify.Publish(new AscCacheItem { Id = ByteString.CopyFrom(Guid.NewGuid().ToByteArray()) }, CacheNotifyAction.Any);
CacheNotify.Publish(new AscCacheItem { Id = Guid.NewGuid().ToString() }, CacheNotifyAction.Any);
}

public static void OnClearCache()
Expand Down
70 changes: 7 additions & 63 deletions common/ASC.Common/Caching/KafkaCache.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -24,8 +23,6 @@ namespace ASC.Common.Caching
private ILog Log { get; set; }
private ConcurrentDictionary<string, CancellationTokenSource> Cts { get; set; }
private ConcurrentDictionary<string, Action<T>> Actions { get; set; }
private MemoryCacheNotify<T> MemoryCacheNotify { get; set; }
private string ChannelName { get; } = $"ascchannel{typeof(T).Name}";
private ProtobufSerializer<T> ValueSerializer { get; } = new ProtobufSerializer<T>();
private ProtobufDeserializer<T> ValueDeserializer { get; } = new ProtobufDeserializer<T>();
private ProtobufSerializer<AscCacheItem> KeySerializer { get; } = new ProtobufSerializer<AscCacheItem>();
Expand All @@ -41,26 +38,13 @@ public KafkaCache(ConfigurationExtension configuration, IOptionsMonitor<ILog> op
Key = Guid.NewGuid();

var settings = configuration.GetSetting<KafkaSettings>("kafka");
if (settings != null && !string.IsNullOrEmpty(settings.BootstrapServers))
{
ClientConfig = new ClientConfig { BootstrapServers = settings.BootstrapServers };
AdminClientConfig = new AdminClientConfig { BootstrapServers = settings.BootstrapServers };
}
else
{
MemoryCacheNotify = new MemoryCacheNotify<T>();
}

ClientConfig = new ClientConfig { BootstrapServers = settings.BootstrapServers };
AdminClientConfig = new AdminClientConfig { BootstrapServers = settings.BootstrapServers };
}

public void Publish(T obj, CacheNotifyAction cacheNotifyAction)
{
if (ClientConfig == null)
{
MemoryCacheNotify.Publish(obj, cacheNotifyAction);
return;
}

try
{
if (Producer == null)
Expand All @@ -84,7 +68,7 @@ public void Publish(T obj, CacheNotifyAction cacheNotifyAction)
Value = obj,
Key = new AscCacheItem
{
Id = ByteString.CopyFrom(Key.ToByteArray())
Id = Key.ToString()
}
};

Expand All @@ -102,12 +86,8 @@ public void Publish(T obj, CacheNotifyAction cacheNotifyAction)

public void Subscribe(Action<T> onchange, CacheNotifyAction cacheNotifyAction)
{
if (ClientConfig == null)
{
MemoryCacheNotify.Subscribe(onchange, cacheNotifyAction);
return;
}
var channelName = GetChannelName(cacheNotifyAction);

Cts[channelName] = new CancellationTokenSource();
Actions[channelName] = onchange;

Expand Down Expand Up @@ -137,7 +117,7 @@ void action()
}
}).Wait();
}
catch(AggregateException)
catch (AggregateException)
{

}
Expand All @@ -159,7 +139,7 @@ void action()
try
{
var cr = c.Consume(Cts[channelName].Token);
if (cr != null && cr.Message != null && cr.Message.Value != null && !(new Guid(cr.Message.Key.Id.ToByteArray())).Equals(Key) && Actions.TryGetValue(channelName, out var act))
if (cr != null && cr.Message != null && cr.Message.Value != null && !(new Guid(cr.Message.Key.Id)).Equals(Key) && Actions.TryGetValue(channelName, out var act))
{
try
{
Expand Down Expand Up @@ -189,7 +169,7 @@ void action()

private string GetChannelName(CacheNotifyAction cacheNotifyAction)
{
return $"{ChannelName}{cacheNotifyAction}";
return $"asc:channel:{cacheNotifyAction}:{typeof(T).FullName}".ToLower();
}

public void Unsubscribe(CacheNotifyAction action)
Expand Down Expand Up @@ -231,40 +211,4 @@ public class KafkaSettings
{
public string BootstrapServers { get; set; }
}

public class MemoryCacheNotify<T> : ICacheNotify<T> where T : IMessage<T>, new()
{
private readonly Dictionary<string, List<Action<T>>> actions = new Dictionary<string, List<Action<T>>>();

public void Publish(T obj, CacheNotifyAction action)
{
if (actions.TryGetValue(GetKey(action), out var onchange) && onchange != null)
{
foreach (var a in onchange)
{
a(obj);
}
}
}

public void Subscribe(Action<T> onchange, CacheNotifyAction notifyAction)
{
if (onchange != null)
{
var key = GetKey(notifyAction);
actions.TryAdd(key, new List<Action<T>>());
actions[key].Add(onchange);
}
}

public void Unsubscribe(CacheNotifyAction action)
{
actions.Remove(GetKey(action));
}

private string GetKey(CacheNotifyAction cacheNotifyAction)
{
return $"{typeof(T).Name}{cacheNotifyAction}";
}
}
}
48 changes: 48 additions & 0 deletions common/ASC.Common/Caching/MemoryCacheNotify.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

using Google.Protobuf;

namespace ASC.Common.Caching
{
[Singletone]
public class MemoryCacheNotify<T> : ICacheNotify<T> where T : IMessage<T>, new()
{
private readonly ConcurrentDictionary<string, List<Action<T>>> _actions;

public MemoryCacheNotify()
{
_actions = new ConcurrentDictionary<string, List<Action<T>>>();
}

public void Publish(T obj, CacheNotifyAction action)
{
if (_actions.TryGetValue(GetKey(action), out var onchange) && onchange != null)
{
Parallel.ForEach(onchange, a => a(obj));
}
}

public void Subscribe(Action<T> onchange, CacheNotifyAction notifyAction)
{
if (onchange != null)
{
var key = GetKey(notifyAction);
_actions.TryAdd(key, new List<Action<T>>());
_actions[key].Add(onchange);
}
}

public void Unsubscribe(CacheNotifyAction action)
{
_actions.TryRemove(GetKey(action), out _);
}

private string GetKey(CacheNotifyAction cacheNotifyAction)
{
return $"asc:channel:{cacheNotifyAction}:{typeof(T).FullName}".ToLower();
}
}
}
Loading

0 comments on commit a764862

Please sign in to comment.