From 22ae6ca99346e4e81957563e4b387e64c45bb989 Mon Sep 17 00:00:00 2001 From: liangshiwei Date: Wed, 12 Aug 2020 20:30:09 +0800 Subject: [PATCH 1/2] Add Kafka event bus integration --- framework/Volo.Abp.sln | 14 ++ .../Volo.Abp.EventBus.Kafka/FodyWeavers.xml | 3 + .../Volo.Abp.EventBus.Kafka/FodyWeavers.xsd | 30 +++ .../Volo.Abp.EventBus.Kafka.csproj | 16 ++ .../EventBus/Kafka/AbpEventBusKafkaModule.cs | 27 +++ .../EventBus/Kafka/AbpKafkaEventBusOptions.cs | 12 + .../Kafka/KafkaDistributedEventBus.cs | 208 ++++++++++++++++++ framework/src/Volo.Abp.Kafka/FodyWeavers.xml | 3 + framework/src/Volo.Abp.Kafka/FodyWeavers.xsd | 30 +++ .../src/Volo.Abp.Kafka/Volo.Abp.Kafka.csproj | 17 ++ .../Volo/Abp/Kafka/AbpKafkaModule.cs | 31 +++ .../Volo/Abp/Kafka/AbpKafkaOptions.cs | 22 ++ .../Volo/Abp/Kafka/ConsumerPool.cs | 105 +++++++++ .../Volo/Abp/Kafka/IConsumerPool.cs | 10 + .../Volo/Abp/Kafka/IKafkaMessageConsumer.cs | 11 + .../Abp/Kafka/IKafkaMessageConsumerFactory.cs | 19 ++ .../Volo/Abp/Kafka/IKafkaSerializer.cs | 11 + .../Volo/Abp/Kafka/IProducerPool.cs | 10 + .../Volo/Abp/Kafka/KafkaConnections.cs | 35 +++ .../Volo/Abp/Kafka/KafkaMessageConsumer.cs | 156 +++++++++++++ .../Abp/Kafka/KafkaMessageConsumerFactory.cs | 32 +++ .../Volo/Abp/Kafka/ProducerPool.cs | 97 ++++++++ .../Volo/Abp/Kafka/Utf8JsonKafkaSerializer.cs | 27 +++ nupkg/common.ps1 | 2 + 24 files changed, 928 insertions(+) create mode 100644 framework/src/Volo.Abp.EventBus.Kafka/FodyWeavers.xml create mode 100644 framework/src/Volo.Abp.EventBus.Kafka/FodyWeavers.xsd create mode 100644 framework/src/Volo.Abp.EventBus.Kafka/Volo.Abp.EventBus.Kafka.csproj create mode 100644 framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/AbpEventBusKafkaModule.cs create mode 100644 framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/AbpKafkaEventBusOptions.cs create mode 100644 framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs create mode 100644 framework/src/Volo.Abp.Kafka/FodyWeavers.xml create mode 100644 framework/src/Volo.Abp.Kafka/FodyWeavers.xsd create mode 100644 framework/src/Volo.Abp.Kafka/Volo.Abp.Kafka.csproj create mode 100644 framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaModule.cs create mode 100644 framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs create mode 100644 framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ConsumerPool.cs create mode 100644 framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IConsumerPool.cs create mode 100644 framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumer.cs create mode 100644 framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumerFactory.cs create mode 100644 framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaSerializer.cs create mode 100644 framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IProducerPool.cs create mode 100644 framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaConnections.cs create mode 100644 framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs create mode 100644 framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs create mode 100644 framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs create mode 100644 framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/Utf8JsonKafkaSerializer.cs diff --git a/framework/Volo.Abp.sln b/framework/Volo.Abp.sln index 5778b054ed5..2b381f43159 100644 --- a/framework/Volo.Abp.sln +++ b/framework/Volo.Abp.sln @@ -323,6 +323,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.BlobStoring.Aws", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.BlobStoring.Aws.Tests", "test\Volo.Abp.BlobStoring.Aws.Tests\Volo.Abp.BlobStoring.Aws.Tests.csproj", "{2CD3B26A-CA81-4279-8D5D-6A594517BB3F}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.Kafka", "src\Volo.Abp.Kafka\Volo.Abp.Kafka.csproj", "{2A864049-9CD5-4493-8CDB-C408474D43D4}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.EventBus.Kafka", "src\Volo.Abp.EventBus.Kafka\Volo.Abp.EventBus.Kafka.csproj", "{C1D891B0-AE83-42CB-987D-425A2787DE78}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -961,6 +965,14 @@ Global {2CD3B26A-CA81-4279-8D5D-6A594517BB3F}.Debug|Any CPU.Build.0 = Debug|Any CPU {2CD3B26A-CA81-4279-8D5D-6A594517BB3F}.Release|Any CPU.ActiveCfg = Release|Any CPU {2CD3B26A-CA81-4279-8D5D-6A594517BB3F}.Release|Any CPU.Build.0 = Release|Any CPU + {2A864049-9CD5-4493-8CDB-C408474D43D4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {2A864049-9CD5-4493-8CDB-C408474D43D4}.Debug|Any CPU.Build.0 = Debug|Any CPU + {2A864049-9CD5-4493-8CDB-C408474D43D4}.Release|Any CPU.ActiveCfg = Release|Any CPU + {2A864049-9CD5-4493-8CDB-C408474D43D4}.Release|Any CPU.Build.0 = Release|Any CPU + {C1D891B0-AE83-42CB-987D-425A2787DE78}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C1D891B0-AE83-42CB-987D-425A2787DE78}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C1D891B0-AE83-42CB-987D-425A2787DE78}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C1D891B0-AE83-42CB-987D-425A2787DE78}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1124,6 +1136,8 @@ Global {8E49687A-E69F-49F2-8DB0-428D0883A937} = {447C8A77-E5F0-4538-8687-7383196D04EA} {50968CDE-1029-4051-B2E5-B69D0ECF2A18} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6} {2CD3B26A-CA81-4279-8D5D-6A594517BB3F} = {447C8A77-E5F0-4538-8687-7383196D04EA} + {2A864049-9CD5-4493-8CDB-C408474D43D4} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6} + {C1D891B0-AE83-42CB-987D-425A2787DE78} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {BB97ECF4-9A84-433F-A80B-2A3285BDD1D5} diff --git a/framework/src/Volo.Abp.EventBus.Kafka/FodyWeavers.xml b/framework/src/Volo.Abp.EventBus.Kafka/FodyWeavers.xml new file mode 100644 index 00000000000..bc5a74a236f --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Kafka/FodyWeavers.xml @@ -0,0 +1,3 @@ + + + diff --git a/framework/src/Volo.Abp.EventBus.Kafka/FodyWeavers.xsd b/framework/src/Volo.Abp.EventBus.Kafka/FodyWeavers.xsd new file mode 100644 index 00000000000..3f3946e282d --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Kafka/FodyWeavers.xsd @@ -0,0 +1,30 @@ + + + + + + + + + + + + + + + 'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed. + + + + + A comma-separated list of error codes that can be safely ignored in assembly verification. + + + + + 'false' to turn off automatic generation of the XML Schema file. + + + + + \ No newline at end of file diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo.Abp.EventBus.Kafka.csproj b/framework/src/Volo.Abp.EventBus.Kafka/Volo.Abp.EventBus.Kafka.csproj new file mode 100644 index 00000000000..42b02937813 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo.Abp.EventBus.Kafka.csproj @@ -0,0 +1,16 @@ + + + + + + + netstandard2.0 + + + + + + + + + diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/AbpEventBusKafkaModule.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/AbpEventBusKafkaModule.cs new file mode 100644 index 00000000000..e0e408cb6ba --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/AbpEventBusKafkaModule.cs @@ -0,0 +1,27 @@ +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.Kafka; +using Volo.Abp.Modularity; + +namespace Volo.Abp.EventBus.Kafka +{ + [DependsOn( + typeof(AbpEventBusModule), + typeof(AbpKafkaModule))] + public class AbpEventBusKafkaModule : AbpModule + { + public override void ConfigureServices(ServiceConfigurationContext context) + { + var configuration = context.Services.GetConfiguration(); + + Configure(configuration.GetSection("Kafka:EventBus")); + } + + public override void OnApplicationInitialization(ApplicationInitializationContext context) + { + context + .ServiceProvider + .GetRequiredService() + .Initialize(); + } + } +} diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/AbpKafkaEventBusOptions.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/AbpKafkaEventBusOptions.cs new file mode 100644 index 00000000000..3aff9699fb4 --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/AbpKafkaEventBusOptions.cs @@ -0,0 +1,12 @@ +namespace Volo.Abp.EventBus.Kafka +{ + public class AbpKafkaEventBusOptions + { + + public string ConnectionName { get; set; } + + public string TopicName { get; set; } + + public string GroupId { get; set; } + } +} diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs new file mode 100644 index 00000000000..fd6d773952f --- /dev/null +++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs @@ -0,0 +1,208 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Confluent.Kafka; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using Volo.Abp.DependencyInjection; +using Volo.Abp.EventBus.Distributed; +using Volo.Abp.Kafka; +using Volo.Abp.MultiTenancy; +using Volo.Abp.Threading; + +namespace Volo.Abp.EventBus.Kafka +{ + [Dependency(ReplaceServices = true)] + [ExposeServices(typeof(IDistributedEventBus), typeof(KafkaDistributedEventBus))] + public class KafkaDistributedEventBus : EventBusBase, IDistributedEventBus, ISingletonDependency + { + protected AbpKafkaEventBusOptions AbpKafkaEventBusOptions { get; } + protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; } + protected IKafkaMessageConsumerFactory MessageConsumerFactory { get; } + protected IKafkaSerializer Serializer { get; } + protected IProducerPool ProducerPool { get; } + protected ConcurrentDictionary> HandlerFactories { get; } + protected ConcurrentDictionary EventTypes { get; } + protected IKafkaMessageConsumer Consumer { get; private set; } + + public KafkaDistributedEventBus( + IServiceScopeFactory serviceScopeFactory, + ICurrentTenant currentTenant, + IOptions abpKafkaEventBusOptions, + IKafkaMessageConsumerFactory messageConsumerFactory, + IOptions abpDistributedEventBusOptions, + IKafkaSerializer serializer, + IProducerPool producerPool) + : base(serviceScopeFactory, currentTenant) + { + AbpKafkaEventBusOptions = abpKafkaEventBusOptions.Value; + AbpDistributedEventBusOptions = abpDistributedEventBusOptions.Value; + MessageConsumerFactory = messageConsumerFactory; + Serializer = serializer; + ProducerPool = producerPool; + + HandlerFactories = new ConcurrentDictionary>(); + EventTypes = new ConcurrentDictionary(); + } + + public void Initialize() + { + Consumer = MessageConsumerFactory.Create( + AbpKafkaEventBusOptions.TopicName, + AbpKafkaEventBusOptions.GroupId, + AbpKafkaEventBusOptions.ConnectionName); + + Consumer.OnMessageReceived(ProcessEventAsync); + + SubscribeHandlers(AbpDistributedEventBusOptions.Handlers); + } + + private async Task ProcessEventAsync(Message message) + { + var eventName = message.Key; + var eventType = EventTypes.GetOrDefault(eventName); + if (eventType == null) + { + return; + } + + var eventData = Serializer.Deserialize(message.Value, eventType); + + await TriggerHandlersAsync(eventType, eventData); + } + + public IDisposable Subscribe(IDistributedEventHandler handler) where TEvent : class + { + return Subscribe(typeof(TEvent), handler); + } + + public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) + { + var handlerFactories = GetOrCreateHandlerFactories(eventType); + + if (factory.IsInFactories(handlerFactories)) + { + return NullDisposable.Instance; + } + + handlerFactories.Add(factory); + + return new EventHandlerFactoryUnregistrar(this, eventType, factory); + } + + /// + public override void Unsubscribe(Func action) + { + Check.NotNull(action, nameof(action)); + + GetOrCreateHandlerFactories(typeof(TEvent)) + .Locking(factories => + { + factories.RemoveAll( + factory => + { + var singleInstanceFactory = factory as SingleInstanceHandlerFactory; + if (singleInstanceFactory == null) + { + return false; + } + + var actionHandler = singleInstanceFactory.HandlerInstance as ActionEventHandler; + if (actionHandler == null) + { + return false; + } + + return actionHandler.Action == action; + }); + }); + } + + /// + public override void Unsubscribe(Type eventType, IEventHandler handler) + { + GetOrCreateHandlerFactories(eventType) + .Locking(factories => + { + factories.RemoveAll( + factory => + factory is SingleInstanceHandlerFactory handlerFactory && + handlerFactory.HandlerInstance == handler + ); + }); + } + + /// + public override void Unsubscribe(Type eventType, IEventHandlerFactory factory) + { + GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Remove(factory)); + } + + /// + public override void UnsubscribeAll(Type eventType) + { + GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear()); + } + + public override async Task PublishAsync(Type eventType, object eventData) + { + var eventName = EventNameAttribute.GetNameOrDefault(eventType); + var body = Serializer.Serialize(eventData); + + var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName); + + await producer.ProduceAsync( + AbpKafkaEventBusOptions.TopicName, + new Message + { + Key = eventName, Value = body + }); + } + + private List GetOrCreateHandlerFactories(Type eventType) + { + return HandlerFactories.GetOrAdd( + eventType, + type => + { + var eventName = EventNameAttribute.GetNameOrDefault(type); + EventTypes[eventName] = type; + return new List(); + } + ); + } + + protected override IEnumerable GetHandlerFactories(Type eventType) + { + var handlerFactoryList = new List(); + + foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)) + ) + { + handlerFactoryList.Add( + new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value)); + } + + return handlerFactoryList.ToArray(); + } + + private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType) + { + //Should trigger same type + if (handlerEventType == targetEventType) + { + return true; + } + + //Should trigger for inherited types + if (handlerEventType.IsAssignableFrom(targetEventType)) + { + return true; + } + + return false; + } + } +} diff --git a/framework/src/Volo.Abp.Kafka/FodyWeavers.xml b/framework/src/Volo.Abp.Kafka/FodyWeavers.xml new file mode 100644 index 00000000000..be0de3a9084 --- /dev/null +++ b/framework/src/Volo.Abp.Kafka/FodyWeavers.xml @@ -0,0 +1,3 @@ + + + \ No newline at end of file diff --git a/framework/src/Volo.Abp.Kafka/FodyWeavers.xsd b/framework/src/Volo.Abp.Kafka/FodyWeavers.xsd new file mode 100644 index 00000000000..3f3946e282d --- /dev/null +++ b/framework/src/Volo.Abp.Kafka/FodyWeavers.xsd @@ -0,0 +1,30 @@ + + + + + + + + + + + + + + + 'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed. + + + + + A comma-separated list of error codes that can be safely ignored in assembly verification. + + + + + 'false' to turn off automatic generation of the XML Schema file. + + + + + \ No newline at end of file diff --git a/framework/src/Volo.Abp.Kafka/Volo.Abp.Kafka.csproj b/framework/src/Volo.Abp.Kafka/Volo.Abp.Kafka.csproj new file mode 100644 index 00000000000..e91f42d7b28 --- /dev/null +++ b/framework/src/Volo.Abp.Kafka/Volo.Abp.Kafka.csproj @@ -0,0 +1,17 @@ + + + + + + + netstandard2.0 + + + + + + + + + + diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaModule.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaModule.cs new file mode 100644 index 00000000000..d69e9243490 --- /dev/null +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaModule.cs @@ -0,0 +1,31 @@ +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.Json; +using Volo.Abp.Modularity; +using Volo.Abp.Threading; + +namespace Volo.Abp.Kafka +{ + [DependsOn( + typeof(AbpJsonModule), + typeof(AbpThreadingModule) + )] + public class AbpKafkaModule : AbpModule + { + public override void ConfigureServices(ServiceConfigurationContext context) + { + var configuration = context.Services.GetConfiguration(); + Configure(configuration.GetSection("Kafka")); + } + + public override void OnApplicationShutdown(ApplicationShutdownContext context) + { + context.ServiceProvider + .GetRequiredService() + .Dispose(); + + context.ServiceProvider + .GetRequiredService() + .Dispose(); + } + } +} diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs new file mode 100644 index 00000000000..1769d8a076f --- /dev/null +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs @@ -0,0 +1,22 @@ +using System; +using Confluent.Kafka; +using Confluent.Kafka.Admin; + +namespace Volo.Abp.Kafka +{ + public class AbpKafkaOptions + { + public KafkaConnections Connections { get; } + + public Action ConfigureProducer { get; set; } + + public Action ConfigureConsumer { get; set; } + + public Action ConfigureTopic { get; set; } + + public AbpKafkaOptions() + { + Connections = new KafkaConnections(); + } + } +} diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ConsumerPool.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ConsumerPool.cs new file mode 100644 index 00000000000..07af447e1a3 --- /dev/null +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ConsumerPool.cs @@ -0,0 +1,105 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using Confluent.Kafka; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using Volo.Abp.DependencyInjection; + +namespace Volo.Abp.Kafka +{ + public class ConsumerPool : IConsumerPool, ISingletonDependency + { + protected AbpKafkaOptions Options { get; } + + protected ConcurrentDictionary> Consumers { get; } + + protected TimeSpan TotalDisposeWaitDuration { get; set; } = TimeSpan.FromSeconds(10); + + public ILogger Logger { get; set; } + + private bool _isDisposed; + + public ConsumerPool(IOptions options) + { + Options = options.Value; + + Consumers = new ConcurrentDictionary>(); + Logger = new NullLogger(); + } + + public virtual IConsumer Get(string groupId, string connectionName = null) + { + connectionName ??= KafkaConnections.DefaultConnectionName; + var config = new ConsumerConfig(Options.Connections.GetOrDefault(connectionName)) + { + GroupId = groupId, + EnableAutoCommit = false + }; + + Options.ConfigureConsumer?.Invoke(config); + + return Consumers.GetOrAdd( + connectionName, + new ConsumerBuilder(config).Build()); + } + + public void Dispose() + { + if (_isDisposed) + { + return; + } + + _isDisposed = true; + + if (!Consumers.Any()) + { + Logger.LogDebug($"Disposed consumer pool with no consumers in the pool."); + return; + } + + var poolDisposeStopwatch = Stopwatch.StartNew(); + + Logger.LogInformation($"Disposing consumer pool ({Consumers.Count} consumers)."); + + var remainingWaitDuration = TotalDisposeWaitDuration; + + foreach (var consumer in Consumers.Values) + { + var poolItemDisposeStopwatch = Stopwatch.StartNew(); + + try + { + consumer.Close(); + consumer.Dispose(); + } + catch + { + } + + poolItemDisposeStopwatch.Stop(); + + remainingWaitDuration = remainingWaitDuration > poolItemDisposeStopwatch.Elapsed + ? remainingWaitDuration.Subtract(poolItemDisposeStopwatch.Elapsed) + : TimeSpan.Zero; + } + + poolDisposeStopwatch.Stop(); + + Logger.LogInformation( + $"Disposed Kafka Consumer Pool ({Consumers.Count} consumers in {poolDisposeStopwatch.Elapsed.TotalMilliseconds:0.00} ms)."); + + if (poolDisposeStopwatch.Elapsed.TotalSeconds > 5.0) + { + Logger.LogWarning( + $"Disposing Kafka Consumer Pool got time greather than expected: {poolDisposeStopwatch.Elapsed.TotalMilliseconds:0.00} ms."); + } + + Consumers.Clear(); + } + } +} diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IConsumerPool.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IConsumerPool.cs new file mode 100644 index 00000000000..94c8a551e2a --- /dev/null +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IConsumerPool.cs @@ -0,0 +1,10 @@ +using System; +using Confluent.Kafka; + +namespace Volo.Abp.Kafka +{ + public interface IConsumerPool : IDisposable + { + IConsumer Get(string groupId, string connectionName = null); + } +} diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumer.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumer.cs new file mode 100644 index 00000000000..87872b31a23 --- /dev/null +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumer.cs @@ -0,0 +1,11 @@ +using System; +using System.Threading.Tasks; +using Confluent.Kafka; + +namespace Volo.Abp.Kafka +{ + public interface IKafkaMessageConsumer + { + void OnMessageReceived(Func, Task> callback); + } +} diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumerFactory.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumerFactory.cs new file mode 100644 index 00000000000..2b01b5a935a --- /dev/null +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumerFactory.cs @@ -0,0 +1,19 @@ +namespace Volo.Abp.Kafka +{ + public interface IKafkaMessageConsumerFactory + { + /// + /// Creates a new . + /// Avoid to create too many consumers since they are + /// not disposed until end of the application. + /// + /// + /// + /// + /// + IKafkaMessageConsumer Create( + string topicName, + string groupId, + string connectionName = null); + } +} diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaSerializer.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaSerializer.cs new file mode 100644 index 00000000000..58e718831c8 --- /dev/null +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaSerializer.cs @@ -0,0 +1,11 @@ +using System; + +namespace Volo.Abp.Kafka +{ + public interface IKafkaSerializer + { + byte[] Serialize(object obj); + + object Deserialize(byte[] value, Type type); + } +} diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IProducerPool.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IProducerPool.cs new file mode 100644 index 00000000000..2f94f23e2a6 --- /dev/null +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IProducerPool.cs @@ -0,0 +1,10 @@ +using System; +using Confluent.Kafka; + +namespace Volo.Abp.Kafka +{ + public interface IProducerPool : IDisposable + { + IProducer Get(string connectionName = null); + } +} diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaConnections.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaConnections.cs new file mode 100644 index 00000000000..159c7986918 --- /dev/null +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaConnections.cs @@ -0,0 +1,35 @@ +using System; +using System.Collections.Generic; +using Confluent.Kafka; +using JetBrains.Annotations; + +namespace Volo.Abp.Kafka +{ + [Serializable] + public class KafkaConnections : Dictionary + { + public const string DefaultConnectionName = "Default"; + + [NotNull] + public ClientConfig Default + { + get => this[DefaultConnectionName]; + set => this[DefaultConnectionName] = Check.NotNull(value, nameof(value)); + } + + public KafkaConnections() + { + Default = new ClientConfig(); + } + + public ClientConfig GetOrDefault(string connectionName) + { + if (TryGetValue(connectionName, out var connectionFactory)) + { + return connectionFactory; + } + + return Default; + } + } +} \ No newline at end of file diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs new file mode 100644 index 00000000000..c22ca6756b8 --- /dev/null +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs @@ -0,0 +1,156 @@ +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Threading.Tasks; +using Confluent.Kafka; +using Confluent.Kafka.Admin; +using JetBrains.Annotations; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using Volo.Abp.DependencyInjection; +using Volo.Abp.ExceptionHandling; +using Volo.Abp.Threading; + +namespace Volo.Abp.Kafka +{ + public class KafkaMessageConsumer : IKafkaMessageConsumer, ITransientDependency, IDisposable + { + public ILogger Logger { get; set; } + + protected IConsumerPool ConsumerPool { get; } + + protected IExceptionNotifier ExceptionNotifier { get; } + + protected AbpKafkaOptions Options { get; } + + protected ConcurrentBag, Task>> Callbacks { get; } + + protected IConsumer Consumer { get; private set; } + + protected string ConnectionName { get; private set; } + + protected string GroupId { get; private set; } + + protected string TopicName { get; private set; } + + public KafkaMessageConsumer( + IConsumerPool consumerPool, + IExceptionNotifier exceptionNotifier, + IOptions options) + { + ConsumerPool = consumerPool; + ExceptionNotifier = exceptionNotifier; + Options = options.Value; + Logger = NullLogger.Instance; + + Callbacks = new ConcurrentBag, Task>>(); + } + + public virtual void Initialize( + [NotNull] string topicName, + [NotNull] string groupId, + string connectionName = null) + { + Check.NotNull(topicName, nameof(topicName)); + Check.NotNull(groupId, nameof(groupId)); + TopicName = topicName; + ConnectionName = connectionName ?? KafkaConnections.DefaultConnectionName; + GroupId = groupId; + + AsyncHelper.RunSync(CreateTopicAsync); + Consume(); + } + + public virtual void OnMessageReceived(Func, Task> callback) + { + Callbacks.Add(callback); + } + + protected virtual async Task CreateTopicAsync() + { + using (var adminClient = new AdminClientBuilder(Options.Connections.GetOrDefault(ConnectionName)).Build()) + { + var topic = new TopicSpecification + { + Name = TopicName, + NumPartitions = 1, + ReplicationFactor = 1 + }; + + Options.ConfigureTopic?.Invoke(topic); + + try + { + await adminClient.CreateTopicsAsync(new[] {topic}); + } + catch (CreateTopicsException e) + { + if (!e.Error.Reason.Contains($"Topic '{TopicName}' already exists")) + { + throw; + } + } + } + } + + protected virtual void Consume() + { + Consumer = ConsumerPool.Get(GroupId, ConnectionName); + + Task.Factory.StartNew(async () => + { + Consumer.Subscribe(TopicName); + + while (true) + { + try + { + var consumeResult = Consumer.Consume(); + + if (consumeResult.IsPartitionEOF) + { + continue; + } + + await HandleIncomingMessage(consumeResult); + } + catch (ConsumeException ex) + { + Logger.LogException(ex, LogLevel.Warning); + AsyncHelper.RunSync(() => ExceptionNotifier.NotifyAsync(ex, logLevel: LogLevel.Warning)); + } + } + }); + } + + protected virtual async Task HandleIncomingMessage(ConsumeResult consumeResult) + { + try + { + foreach (var callback in Callbacks) + { + await callback(consumeResult.Message); + } + + Consumer.Commit(consumeResult); + } + catch (Exception ex) + { + Logger.LogException(ex); + await ExceptionNotifier.NotifyAsync(ex); + } + } + + public virtual void Dispose() + { + if (Consumer == null) + { + return; + } + + Consumer.Close(); + Consumer.Dispose(); + } + } +} diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs new file mode 100644 index 00000000000..fb08aa41444 --- /dev/null +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs @@ -0,0 +1,32 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Volo.Abp.DependencyInjection; + +namespace Volo.Abp.Kafka +{ + public class KafkaMessageConsumerFactory : IKafkaMessageConsumerFactory, ISingletonDependency, IDisposable + { + protected IServiceScope ServiceScope { get; } + + public KafkaMessageConsumerFactory(IServiceScopeFactory serviceScopeFactory) + { + ServiceScope = serviceScopeFactory.CreateScope(); + } + + public IKafkaMessageConsumer Create( + string topicName, + string groupId, + string connectionName = null) + { + var consumer = ServiceScope.ServiceProvider.GetRequiredService(); + consumer.Initialize(topicName, groupId, connectionName); + return consumer; + } + + public void Dispose() + { + ServiceScope?.Dispose(); + } + } +} diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs new file mode 100644 index 00000000000..77c8898d429 --- /dev/null +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs @@ -0,0 +1,97 @@ +using System; +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Linq; +using Confluent.Kafka; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using Volo.Abp.DependencyInjection; + +namespace Volo.Abp.Kafka +{ + public class ProducerPool : IProducerPool, ISingletonDependency + { + protected AbpKafkaOptions Options { get; } + + protected ConcurrentDictionary> Producers { get; } + + protected TimeSpan TotalDisposeWaitDuration { get; set; } = TimeSpan.FromSeconds(10); + + public ILogger Logger { get; set; } + + private bool _isDisposed; + + public ProducerPool(IOptions options) + { + Options = options.Value; + + Producers = new ConcurrentDictionary>(); + Logger = new NullLogger(); + } + + public virtual IProducer Get(string connectionName = null) + { + connectionName ??= KafkaConnections.DefaultConnectionName; + var config = Options.Connections.GetOrDefault(connectionName); + + Options.ConfigureProducer?.Invoke(new ProducerConfig(config)); + + return Producers.GetOrAdd( + connectionName, + new ProducerBuilder(config).Build()); + } + + public void Dispose() + { + if (_isDisposed) + { + return; + } + + _isDisposed = true; + + if (!Producers.Any()) + { + Logger.LogDebug($"Disposed producer pool with no producers in the pool."); + return; + } + + var poolDisposeStopwatch = Stopwatch.StartNew(); + + Logger.LogInformation($"Disposing producer pool ({Producers.Count} producers)."); + + var remainingWaitDuration = TotalDisposeWaitDuration; + + foreach (var producer in Producers.Values) + { + var poolItemDisposeStopwatch = Stopwatch.StartNew(); + + try + { + producer.Dispose(); + } + catch + { + } + + poolItemDisposeStopwatch.Stop(); + + remainingWaitDuration = remainingWaitDuration > poolItemDisposeStopwatch.Elapsed + ? remainingWaitDuration.Subtract(poolItemDisposeStopwatch.Elapsed) + : TimeSpan.Zero; + } + + poolDisposeStopwatch.Stop(); + + Logger.LogInformation($"Disposed Kafka Producer Pool ({Producers.Count} producers in {poolDisposeStopwatch.Elapsed.TotalMilliseconds:0.00} ms)."); + + if(poolDisposeStopwatch.Elapsed.TotalSeconds > 5.0) + { + Logger.LogWarning($"Disposing Kafka Producer Pool got time greather than expected: {poolDisposeStopwatch.Elapsed.TotalMilliseconds:0.00} ms."); + } + + Producers.Clear(); + } + } +} diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/Utf8JsonKafkaSerializer.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/Utf8JsonKafkaSerializer.cs new file mode 100644 index 00000000000..a04125f8a68 --- /dev/null +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/Utf8JsonKafkaSerializer.cs @@ -0,0 +1,27 @@ +using System; +using System.Text; +using Volo.Abp.DependencyInjection; +using Volo.Abp.Json; + +namespace Volo.Abp.Kafka +{ + public class Utf8JsonKafkaSerializer : IKafkaSerializer, ITransientDependency + { + private readonly IJsonSerializer _jsonSerializer; + + public Utf8JsonKafkaSerializer(IJsonSerializer jsonSerializer) + { + _jsonSerializer = jsonSerializer; + } + + public byte[] Serialize(object obj) + { + return Encoding.UTF8.GetBytes(_jsonSerializer.Serialize(obj)); + } + + public object Deserialize(byte[] value, Type type) + { + return _jsonSerializer.Deserialize(type, Encoding.UTF8.GetString(value)); + } + } +} diff --git a/nupkg/common.ps1 b/nupkg/common.ps1 index 99244d5d6ce..39d27004116 100644 --- a/nupkg/common.ps1 +++ b/nupkg/common.ps1 @@ -84,6 +84,7 @@ $projects = ( "framework/src/Volo.Abp.EntityFrameworkCore.SqlServer", "framework/src/Volo.Abp.EventBus", "framework/src/Volo.Abp.EventBus.RabbitMQ", + "framework/src/Volo.Abp.EventBus.Kafka", "framework/src/Volo.Abp.Features", "framework/src/Volo.Abp.FluentValidation", "framework/src/Volo.Abp.Guids", @@ -122,6 +123,7 @@ $projects = ( "framework/src/Volo.Abp.Validation.Abstractions", "framework/src/Volo.Abp.Validation", "framework/src/Volo.Abp.VirtualFileSystem", + "framework/src/Volo.Abp.Kafka" # modules/account "modules/account/src/Volo.Abp.Account.Application.Contracts", From 0e25b18aad27aded5468e693d56a94d5c23287ab Mon Sep 17 00:00:00 2001 From: liangshiwei Date: Fri, 14 Aug 2020 15:12:05 +0800 Subject: [PATCH 2/2] Update kafka connection pool --- .../Volo/Abp/Kafka/ConsumerPool.cs | 22 ++++++++++-------- .../Volo/Abp/Kafka/ProducerPool.cs | 23 +++++++++++-------- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ConsumerPool.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ConsumerPool.cs index 07af447e1a3..b2995d1079a 100644 --- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ConsumerPool.cs +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ConsumerPool.cs @@ -34,17 +34,21 @@ public ConsumerPool(IOptions options) public virtual IConsumer Get(string groupId, string connectionName = null) { connectionName ??= KafkaConnections.DefaultConnectionName; - var config = new ConsumerConfig(Options.Connections.GetOrDefault(connectionName)) - { - GroupId = groupId, - EnableAutoCommit = false - }; - - Options.ConfigureConsumer?.Invoke(config); return Consumers.GetOrAdd( - connectionName, - new ConsumerBuilder(config).Build()); + connectionName, connection => + { + var config = new ConsumerConfig(Options.Connections.GetOrDefault(connection)) + { + GroupId = groupId, + EnableAutoCommit = false + }; + + Options.ConfigureConsumer?.Invoke(config); + + return new ConsumerBuilder(config).Build(); + } + ); } public void Dispose() diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs index 77c8898d429..c4b4b9ead08 100644 --- a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs +++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs @@ -33,13 +33,16 @@ public ProducerPool(IOptions options) public virtual IProducer Get(string connectionName = null) { connectionName ??= KafkaConnections.DefaultConnectionName; - var config = Options.Connections.GetOrDefault(connectionName); - - Options.ConfigureProducer?.Invoke(new ProducerConfig(config)); - + return Producers.GetOrAdd( - connectionName, - new ProducerBuilder(config).Build()); + connectionName, connection => + { + var config = Options.Connections.GetOrDefault(connection); + + Options.ConfigureProducer?.Invoke(new ProducerConfig(config)); + + return new ProducerBuilder(config).Build(); + }); } public void Dispose() @@ -84,11 +87,13 @@ public void Dispose() poolDisposeStopwatch.Stop(); - Logger.LogInformation($"Disposed Kafka Producer Pool ({Producers.Count} producers in {poolDisposeStopwatch.Elapsed.TotalMilliseconds:0.00} ms)."); + Logger.LogInformation( + $"Disposed Kafka Producer Pool ({Producers.Count} producers in {poolDisposeStopwatch.Elapsed.TotalMilliseconds:0.00} ms)."); - if(poolDisposeStopwatch.Elapsed.TotalSeconds > 5.0) + if (poolDisposeStopwatch.Elapsed.TotalSeconds > 5.0) { - Logger.LogWarning($"Disposing Kafka Producer Pool got time greather than expected: {poolDisposeStopwatch.Elapsed.TotalMilliseconds:0.00} ms."); + Logger.LogWarning( + $"Disposing Kafka Producer Pool got time greather than expected: {poolDisposeStopwatch.Elapsed.TotalMilliseconds:0.00} ms."); } Producers.Clear();