diff --git a/framework/Volo.Abp.sln b/framework/Volo.Abp.sln
index 3811a80ee86..aa0a3f12228 100644
--- a/framework/Volo.Abp.sln
+++ b/framework/Volo.Abp.sln
@@ -323,6 +323,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.BlobStoring.Aws",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.BlobStoring.Aws.Tests", "test\Volo.Abp.BlobStoring.Aws.Tests\Volo.Abp.BlobStoring.Aws.Tests.csproj", "{2CD3B26A-CA81-4279-8D5D-6A594517BB3F}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.Kafka", "src\Volo.Abp.Kafka\Volo.Abp.Kafka.csproj", "{2A864049-9CD5-4493-8CDB-C408474D43D4}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.EventBus.Kafka", "src\Volo.Abp.EventBus.Kafka\Volo.Abp.EventBus.Kafka.csproj", "{C1D891B0-AE83-42CB-987D-425A2787DE78}"
+EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.GlobalFeatures", "src\Volo.Abp.GlobalFeatures\Volo.Abp.GlobalFeatures.csproj", "{04F44063-C952-403A-815F-EFB778BDA125}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.GlobalFeatures.Tests", "test\Volo.Abp.GlobalFeatures.Tests\Volo.Abp.GlobalFeatures.Tests.csproj", "{231F1581-AA21-44C3-BF27-51EB3AD5355C}"
@@ -965,6 +969,14 @@ Global
{2CD3B26A-CA81-4279-8D5D-6A594517BB3F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2CD3B26A-CA81-4279-8D5D-6A594517BB3F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2CD3B26A-CA81-4279-8D5D-6A594517BB3F}.Release|Any CPU.Build.0 = Release|Any CPU
+ {2A864049-9CD5-4493-8CDB-C408474D43D4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {2A864049-9CD5-4493-8CDB-C408474D43D4}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {2A864049-9CD5-4493-8CDB-C408474D43D4}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {2A864049-9CD5-4493-8CDB-C408474D43D4}.Release|Any CPU.Build.0 = Release|Any CPU
+ {C1D891B0-AE83-42CB-987D-425A2787DE78}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {C1D891B0-AE83-42CB-987D-425A2787DE78}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {C1D891B0-AE83-42CB-987D-425A2787DE78}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {C1D891B0-AE83-42CB-987D-425A2787DE78}.Release|Any CPU.Build.0 = Release|Any CPU
{04F44063-C952-403A-815F-EFB778BDA125}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{04F44063-C952-403A-815F-EFB778BDA125}.Debug|Any CPU.Build.0 = Debug|Any CPU
{04F44063-C952-403A-815F-EFB778BDA125}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -1136,6 +1148,8 @@ Global
{8E49687A-E69F-49F2-8DB0-428D0883A937} = {447C8A77-E5F0-4538-8687-7383196D04EA}
{50968CDE-1029-4051-B2E5-B69D0ECF2A18} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{2CD3B26A-CA81-4279-8D5D-6A594517BB3F} = {447C8A77-E5F0-4538-8687-7383196D04EA}
+ {2A864049-9CD5-4493-8CDB-C408474D43D4} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
+ {C1D891B0-AE83-42CB-987D-425A2787DE78} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{04F44063-C952-403A-815F-EFB778BDA125} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{231F1581-AA21-44C3-BF27-51EB3AD5355C} = {447C8A77-E5F0-4538-8687-7383196D04EA}
EndGlobalSection
diff --git a/framework/src/Volo.Abp.EventBus.Kafka/FodyWeavers.xml b/framework/src/Volo.Abp.EventBus.Kafka/FodyWeavers.xml
new file mode 100644
index 00000000000..bc5a74a236f
--- /dev/null
+++ b/framework/src/Volo.Abp.EventBus.Kafka/FodyWeavers.xml
@@ -0,0 +1,3 @@
+
+
+
diff --git a/framework/src/Volo.Abp.EventBus.Kafka/FodyWeavers.xsd b/framework/src/Volo.Abp.EventBus.Kafka/FodyWeavers.xsd
new file mode 100644
index 00000000000..3f3946e282d
--- /dev/null
+++ b/framework/src/Volo.Abp.EventBus.Kafka/FodyWeavers.xsd
@@ -0,0 +1,30 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed.
+
+
+
+
+ A comma-separated list of error codes that can be safely ignored in assembly verification.
+
+
+
+
+ 'false' to turn off automatic generation of the XML Schema file.
+
+
+
+
+
\ No newline at end of file
diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo.Abp.EventBus.Kafka.csproj b/framework/src/Volo.Abp.EventBus.Kafka/Volo.Abp.EventBus.Kafka.csproj
new file mode 100644
index 00000000000..42b02937813
--- /dev/null
+++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo.Abp.EventBus.Kafka.csproj
@@ -0,0 +1,16 @@
+
+
+
+
+
+
+ netstandard2.0
+
+
+
+
+
+
+
+
+
diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/AbpEventBusKafkaModule.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/AbpEventBusKafkaModule.cs
new file mode 100644
index 00000000000..e0e408cb6ba
--- /dev/null
+++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/AbpEventBusKafkaModule.cs
@@ -0,0 +1,27 @@
+using Microsoft.Extensions.DependencyInjection;
+using Volo.Abp.Kafka;
+using Volo.Abp.Modularity;
+
+namespace Volo.Abp.EventBus.Kafka
+{
+ [DependsOn(
+ typeof(AbpEventBusModule),
+ typeof(AbpKafkaModule))]
+ public class AbpEventBusKafkaModule : AbpModule
+ {
+ public override void ConfigureServices(ServiceConfigurationContext context)
+ {
+ var configuration = context.Services.GetConfiguration();
+
+ Configure(configuration.GetSection("Kafka:EventBus"));
+ }
+
+ public override void OnApplicationInitialization(ApplicationInitializationContext context)
+ {
+ context
+ .ServiceProvider
+ .GetRequiredService()
+ .Initialize();
+ }
+ }
+}
diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/AbpKafkaEventBusOptions.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/AbpKafkaEventBusOptions.cs
new file mode 100644
index 00000000000..3aff9699fb4
--- /dev/null
+++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/AbpKafkaEventBusOptions.cs
@@ -0,0 +1,12 @@
+namespace Volo.Abp.EventBus.Kafka
+{
+ public class AbpKafkaEventBusOptions
+ {
+
+ public string ConnectionName { get; set; }
+
+ public string TopicName { get; set; }
+
+ public string GroupId { get; set; }
+ }
+}
diff --git a/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs
new file mode 100644
index 00000000000..fd6d773952f
--- /dev/null
+++ b/framework/src/Volo.Abp.EventBus.Kafka/Volo/Abp/EventBus/Kafka/KafkaDistributedEventBus.cs
@@ -0,0 +1,208 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Confluent.Kafka;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
+using Volo.Abp.DependencyInjection;
+using Volo.Abp.EventBus.Distributed;
+using Volo.Abp.Kafka;
+using Volo.Abp.MultiTenancy;
+using Volo.Abp.Threading;
+
+namespace Volo.Abp.EventBus.Kafka
+{
+ [Dependency(ReplaceServices = true)]
+ [ExposeServices(typeof(IDistributedEventBus), typeof(KafkaDistributedEventBus))]
+ public class KafkaDistributedEventBus : EventBusBase, IDistributedEventBus, ISingletonDependency
+ {
+ protected AbpKafkaEventBusOptions AbpKafkaEventBusOptions { get; }
+ protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; }
+ protected IKafkaMessageConsumerFactory MessageConsumerFactory { get; }
+ protected IKafkaSerializer Serializer { get; }
+ protected IProducerPool ProducerPool { get; }
+ protected ConcurrentDictionary> HandlerFactories { get; }
+ protected ConcurrentDictionary EventTypes { get; }
+ protected IKafkaMessageConsumer Consumer { get; private set; }
+
+ public KafkaDistributedEventBus(
+ IServiceScopeFactory serviceScopeFactory,
+ ICurrentTenant currentTenant,
+ IOptions abpKafkaEventBusOptions,
+ IKafkaMessageConsumerFactory messageConsumerFactory,
+ IOptions abpDistributedEventBusOptions,
+ IKafkaSerializer serializer,
+ IProducerPool producerPool)
+ : base(serviceScopeFactory, currentTenant)
+ {
+ AbpKafkaEventBusOptions = abpKafkaEventBusOptions.Value;
+ AbpDistributedEventBusOptions = abpDistributedEventBusOptions.Value;
+ MessageConsumerFactory = messageConsumerFactory;
+ Serializer = serializer;
+ ProducerPool = producerPool;
+
+ HandlerFactories = new ConcurrentDictionary>();
+ EventTypes = new ConcurrentDictionary();
+ }
+
+ public void Initialize()
+ {
+ Consumer = MessageConsumerFactory.Create(
+ AbpKafkaEventBusOptions.TopicName,
+ AbpKafkaEventBusOptions.GroupId,
+ AbpKafkaEventBusOptions.ConnectionName);
+
+ Consumer.OnMessageReceived(ProcessEventAsync);
+
+ SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
+ }
+
+ private async Task ProcessEventAsync(Message message)
+ {
+ var eventName = message.Key;
+ var eventType = EventTypes.GetOrDefault(eventName);
+ if (eventType == null)
+ {
+ return;
+ }
+
+ var eventData = Serializer.Deserialize(message.Value, eventType);
+
+ await TriggerHandlersAsync(eventType, eventData);
+ }
+
+ public IDisposable Subscribe(IDistributedEventHandler handler) where TEvent : class
+ {
+ return Subscribe(typeof(TEvent), handler);
+ }
+
+ public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
+ {
+ var handlerFactories = GetOrCreateHandlerFactories(eventType);
+
+ if (factory.IsInFactories(handlerFactories))
+ {
+ return NullDisposable.Instance;
+ }
+
+ handlerFactories.Add(factory);
+
+ return new EventHandlerFactoryUnregistrar(this, eventType, factory);
+ }
+
+ ///
+ public override void Unsubscribe(Func action)
+ {
+ Check.NotNull(action, nameof(action));
+
+ GetOrCreateHandlerFactories(typeof(TEvent))
+ .Locking(factories =>
+ {
+ factories.RemoveAll(
+ factory =>
+ {
+ var singleInstanceFactory = factory as SingleInstanceHandlerFactory;
+ if (singleInstanceFactory == null)
+ {
+ return false;
+ }
+
+ var actionHandler = singleInstanceFactory.HandlerInstance as ActionEventHandler;
+ if (actionHandler == null)
+ {
+ return false;
+ }
+
+ return actionHandler.Action == action;
+ });
+ });
+ }
+
+ ///
+ public override void Unsubscribe(Type eventType, IEventHandler handler)
+ {
+ GetOrCreateHandlerFactories(eventType)
+ .Locking(factories =>
+ {
+ factories.RemoveAll(
+ factory =>
+ factory is SingleInstanceHandlerFactory handlerFactory &&
+ handlerFactory.HandlerInstance == handler
+ );
+ });
+ }
+
+ ///
+ public override void Unsubscribe(Type eventType, IEventHandlerFactory factory)
+ {
+ GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Remove(factory));
+ }
+
+ ///
+ public override void UnsubscribeAll(Type eventType)
+ {
+ GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear());
+ }
+
+ public override async Task PublishAsync(Type eventType, object eventData)
+ {
+ var eventName = EventNameAttribute.GetNameOrDefault(eventType);
+ var body = Serializer.Serialize(eventData);
+
+ var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName);
+
+ await producer.ProduceAsync(
+ AbpKafkaEventBusOptions.TopicName,
+ new Message
+ {
+ Key = eventName, Value = body
+ });
+ }
+
+ private List GetOrCreateHandlerFactories(Type eventType)
+ {
+ return HandlerFactories.GetOrAdd(
+ eventType,
+ type =>
+ {
+ var eventName = EventNameAttribute.GetNameOrDefault(type);
+ EventTypes[eventName] = type;
+ return new List();
+ }
+ );
+ }
+
+ protected override IEnumerable GetHandlerFactories(Type eventType)
+ {
+ var handlerFactoryList = new List();
+
+ foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))
+ )
+ {
+ handlerFactoryList.Add(
+ new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
+ }
+
+ return handlerFactoryList.ToArray();
+ }
+
+ private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
+ {
+ //Should trigger same type
+ if (handlerEventType == targetEventType)
+ {
+ return true;
+ }
+
+ //Should trigger for inherited types
+ if (handlerEventType.IsAssignableFrom(targetEventType))
+ {
+ return true;
+ }
+
+ return false;
+ }
+ }
+}
diff --git a/framework/src/Volo.Abp.Kafka/FodyWeavers.xml b/framework/src/Volo.Abp.Kafka/FodyWeavers.xml
new file mode 100644
index 00000000000..be0de3a9084
--- /dev/null
+++ b/framework/src/Volo.Abp.Kafka/FodyWeavers.xml
@@ -0,0 +1,3 @@
+
+
+
\ No newline at end of file
diff --git a/framework/src/Volo.Abp.Kafka/FodyWeavers.xsd b/framework/src/Volo.Abp.Kafka/FodyWeavers.xsd
new file mode 100644
index 00000000000..3f3946e282d
--- /dev/null
+++ b/framework/src/Volo.Abp.Kafka/FodyWeavers.xsd
@@ -0,0 +1,30 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed.
+
+
+
+
+ A comma-separated list of error codes that can be safely ignored in assembly verification.
+
+
+
+
+ 'false' to turn off automatic generation of the XML Schema file.
+
+
+
+
+
\ No newline at end of file
diff --git a/framework/src/Volo.Abp.Kafka/Volo.Abp.Kafka.csproj b/framework/src/Volo.Abp.Kafka/Volo.Abp.Kafka.csproj
new file mode 100644
index 00000000000..e91f42d7b28
--- /dev/null
+++ b/framework/src/Volo.Abp.Kafka/Volo.Abp.Kafka.csproj
@@ -0,0 +1,17 @@
+
+
+
+
+
+
+ netstandard2.0
+
+
+
+
+
+
+
+
+
+
diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaModule.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaModule.cs
new file mode 100644
index 00000000000..d69e9243490
--- /dev/null
+++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaModule.cs
@@ -0,0 +1,31 @@
+using Microsoft.Extensions.DependencyInjection;
+using Volo.Abp.Json;
+using Volo.Abp.Modularity;
+using Volo.Abp.Threading;
+
+namespace Volo.Abp.Kafka
+{
+ [DependsOn(
+ typeof(AbpJsonModule),
+ typeof(AbpThreadingModule)
+ )]
+ public class AbpKafkaModule : AbpModule
+ {
+ public override void ConfigureServices(ServiceConfigurationContext context)
+ {
+ var configuration = context.Services.GetConfiguration();
+ Configure(configuration.GetSection("Kafka"));
+ }
+
+ public override void OnApplicationShutdown(ApplicationShutdownContext context)
+ {
+ context.ServiceProvider
+ .GetRequiredService()
+ .Dispose();
+
+ context.ServiceProvider
+ .GetRequiredService()
+ .Dispose();
+ }
+ }
+}
diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs
new file mode 100644
index 00000000000..1769d8a076f
--- /dev/null
+++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/AbpKafkaOptions.cs
@@ -0,0 +1,22 @@
+using System;
+using Confluent.Kafka;
+using Confluent.Kafka.Admin;
+
+namespace Volo.Abp.Kafka
+{
+ public class AbpKafkaOptions
+ {
+ public KafkaConnections Connections { get; }
+
+ public Action ConfigureProducer { get; set; }
+
+ public Action ConfigureConsumer { get; set; }
+
+ public Action ConfigureTopic { get; set; }
+
+ public AbpKafkaOptions()
+ {
+ Connections = new KafkaConnections();
+ }
+ }
+}
diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ConsumerPool.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ConsumerPool.cs
new file mode 100644
index 00000000000..b2995d1079a
--- /dev/null
+++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ConsumerPool.cs
@@ -0,0 +1,109 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using Confluent.Kafka;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Abstractions;
+using Microsoft.Extensions.Options;
+using Volo.Abp.DependencyInjection;
+
+namespace Volo.Abp.Kafka
+{
+ public class ConsumerPool : IConsumerPool, ISingletonDependency
+ {
+ protected AbpKafkaOptions Options { get; }
+
+ protected ConcurrentDictionary> Consumers { get; }
+
+ protected TimeSpan TotalDisposeWaitDuration { get; set; } = TimeSpan.FromSeconds(10);
+
+ public ILogger Logger { get; set; }
+
+ private bool _isDisposed;
+
+ public ConsumerPool(IOptions options)
+ {
+ Options = options.Value;
+
+ Consumers = new ConcurrentDictionary>();
+ Logger = new NullLogger();
+ }
+
+ public virtual IConsumer Get(string groupId, string connectionName = null)
+ {
+ connectionName ??= KafkaConnections.DefaultConnectionName;
+
+ return Consumers.GetOrAdd(
+ connectionName, connection =>
+ {
+ var config = new ConsumerConfig(Options.Connections.GetOrDefault(connection))
+ {
+ GroupId = groupId,
+ EnableAutoCommit = false
+ };
+
+ Options.ConfigureConsumer?.Invoke(config);
+
+ return new ConsumerBuilder(config).Build();
+ }
+ );
+ }
+
+ public void Dispose()
+ {
+ if (_isDisposed)
+ {
+ return;
+ }
+
+ _isDisposed = true;
+
+ if (!Consumers.Any())
+ {
+ Logger.LogDebug($"Disposed consumer pool with no consumers in the pool.");
+ return;
+ }
+
+ var poolDisposeStopwatch = Stopwatch.StartNew();
+
+ Logger.LogInformation($"Disposing consumer pool ({Consumers.Count} consumers).");
+
+ var remainingWaitDuration = TotalDisposeWaitDuration;
+
+ foreach (var consumer in Consumers.Values)
+ {
+ var poolItemDisposeStopwatch = Stopwatch.StartNew();
+
+ try
+ {
+ consumer.Close();
+ consumer.Dispose();
+ }
+ catch
+ {
+ }
+
+ poolItemDisposeStopwatch.Stop();
+
+ remainingWaitDuration = remainingWaitDuration > poolItemDisposeStopwatch.Elapsed
+ ? remainingWaitDuration.Subtract(poolItemDisposeStopwatch.Elapsed)
+ : TimeSpan.Zero;
+ }
+
+ poolDisposeStopwatch.Stop();
+
+ Logger.LogInformation(
+ $"Disposed Kafka Consumer Pool ({Consumers.Count} consumers in {poolDisposeStopwatch.Elapsed.TotalMilliseconds:0.00} ms).");
+
+ if (poolDisposeStopwatch.Elapsed.TotalSeconds > 5.0)
+ {
+ Logger.LogWarning(
+ $"Disposing Kafka Consumer Pool got time greather than expected: {poolDisposeStopwatch.Elapsed.TotalMilliseconds:0.00} ms.");
+ }
+
+ Consumers.Clear();
+ }
+ }
+}
diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IConsumerPool.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IConsumerPool.cs
new file mode 100644
index 00000000000..94c8a551e2a
--- /dev/null
+++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IConsumerPool.cs
@@ -0,0 +1,10 @@
+using System;
+using Confluent.Kafka;
+
+namespace Volo.Abp.Kafka
+{
+ public interface IConsumerPool : IDisposable
+ {
+ IConsumer Get(string groupId, string connectionName = null);
+ }
+}
diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumer.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumer.cs
new file mode 100644
index 00000000000..87872b31a23
--- /dev/null
+++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumer.cs
@@ -0,0 +1,11 @@
+using System;
+using System.Threading.Tasks;
+using Confluent.Kafka;
+
+namespace Volo.Abp.Kafka
+{
+ public interface IKafkaMessageConsumer
+ {
+ void OnMessageReceived(Func, Task> callback);
+ }
+}
diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumerFactory.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumerFactory.cs
new file mode 100644
index 00000000000..2b01b5a935a
--- /dev/null
+++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaMessageConsumerFactory.cs
@@ -0,0 +1,19 @@
+namespace Volo.Abp.Kafka
+{
+ public interface IKafkaMessageConsumerFactory
+ {
+ ///
+ /// Creates a new .
+ /// Avoid to create too many consumers since they are
+ /// not disposed until end of the application.
+ ///
+ ///
+ ///
+ ///
+ ///
+ IKafkaMessageConsumer Create(
+ string topicName,
+ string groupId,
+ string connectionName = null);
+ }
+}
diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaSerializer.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaSerializer.cs
new file mode 100644
index 00000000000..58e718831c8
--- /dev/null
+++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IKafkaSerializer.cs
@@ -0,0 +1,11 @@
+using System;
+
+namespace Volo.Abp.Kafka
+{
+ public interface IKafkaSerializer
+ {
+ byte[] Serialize(object obj);
+
+ object Deserialize(byte[] value, Type type);
+ }
+}
diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IProducerPool.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IProducerPool.cs
new file mode 100644
index 00000000000..2f94f23e2a6
--- /dev/null
+++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/IProducerPool.cs
@@ -0,0 +1,10 @@
+using System;
+using Confluent.Kafka;
+
+namespace Volo.Abp.Kafka
+{
+ public interface IProducerPool : IDisposable
+ {
+ IProducer Get(string connectionName = null);
+ }
+}
diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaConnections.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaConnections.cs
new file mode 100644
index 00000000000..159c7986918
--- /dev/null
+++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaConnections.cs
@@ -0,0 +1,35 @@
+using System;
+using System.Collections.Generic;
+using Confluent.Kafka;
+using JetBrains.Annotations;
+
+namespace Volo.Abp.Kafka
+{
+ [Serializable]
+ public class KafkaConnections : Dictionary
+ {
+ public const string DefaultConnectionName = "Default";
+
+ [NotNull]
+ public ClientConfig Default
+ {
+ get => this[DefaultConnectionName];
+ set => this[DefaultConnectionName] = Check.NotNull(value, nameof(value));
+ }
+
+ public KafkaConnections()
+ {
+ Default = new ClientConfig();
+ }
+
+ public ClientConfig GetOrDefault(string connectionName)
+ {
+ if (TryGetValue(connectionName, out var connectionFactory))
+ {
+ return connectionFactory;
+ }
+
+ return Default;
+ }
+ }
+}
\ No newline at end of file
diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs
new file mode 100644
index 00000000000..c22ca6756b8
--- /dev/null
+++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumer.cs
@@ -0,0 +1,156 @@
+using System;
+using System.Collections.Concurrent;
+using System.Linq;
+using System.Threading.Tasks;
+using Confluent.Kafka;
+using Confluent.Kafka.Admin;
+using JetBrains.Annotations;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Abstractions;
+using Microsoft.Extensions.Options;
+using Volo.Abp.DependencyInjection;
+using Volo.Abp.ExceptionHandling;
+using Volo.Abp.Threading;
+
+namespace Volo.Abp.Kafka
+{
+ public class KafkaMessageConsumer : IKafkaMessageConsumer, ITransientDependency, IDisposable
+ {
+ public ILogger Logger { get; set; }
+
+ protected IConsumerPool ConsumerPool { get; }
+
+ protected IExceptionNotifier ExceptionNotifier { get; }
+
+ protected AbpKafkaOptions Options { get; }
+
+ protected ConcurrentBag, Task>> Callbacks { get; }
+
+ protected IConsumer Consumer { get; private set; }
+
+ protected string ConnectionName { get; private set; }
+
+ protected string GroupId { get; private set; }
+
+ protected string TopicName { get; private set; }
+
+ public KafkaMessageConsumer(
+ IConsumerPool consumerPool,
+ IExceptionNotifier exceptionNotifier,
+ IOptions options)
+ {
+ ConsumerPool = consumerPool;
+ ExceptionNotifier = exceptionNotifier;
+ Options = options.Value;
+ Logger = NullLogger.Instance;
+
+ Callbacks = new ConcurrentBag, Task>>();
+ }
+
+ public virtual void Initialize(
+ [NotNull] string topicName,
+ [NotNull] string groupId,
+ string connectionName = null)
+ {
+ Check.NotNull(topicName, nameof(topicName));
+ Check.NotNull(groupId, nameof(groupId));
+ TopicName = topicName;
+ ConnectionName = connectionName ?? KafkaConnections.DefaultConnectionName;
+ GroupId = groupId;
+
+ AsyncHelper.RunSync(CreateTopicAsync);
+ Consume();
+ }
+
+ public virtual void OnMessageReceived(Func, Task> callback)
+ {
+ Callbacks.Add(callback);
+ }
+
+ protected virtual async Task CreateTopicAsync()
+ {
+ using (var adminClient = new AdminClientBuilder(Options.Connections.GetOrDefault(ConnectionName)).Build())
+ {
+ var topic = new TopicSpecification
+ {
+ Name = TopicName,
+ NumPartitions = 1,
+ ReplicationFactor = 1
+ };
+
+ Options.ConfigureTopic?.Invoke(topic);
+
+ try
+ {
+ await adminClient.CreateTopicsAsync(new[] {topic});
+ }
+ catch (CreateTopicsException e)
+ {
+ if (!e.Error.Reason.Contains($"Topic '{TopicName}' already exists"))
+ {
+ throw;
+ }
+ }
+ }
+ }
+
+ protected virtual void Consume()
+ {
+ Consumer = ConsumerPool.Get(GroupId, ConnectionName);
+
+ Task.Factory.StartNew(async () =>
+ {
+ Consumer.Subscribe(TopicName);
+
+ while (true)
+ {
+ try
+ {
+ var consumeResult = Consumer.Consume();
+
+ if (consumeResult.IsPartitionEOF)
+ {
+ continue;
+ }
+
+ await HandleIncomingMessage(consumeResult);
+ }
+ catch (ConsumeException ex)
+ {
+ Logger.LogException(ex, LogLevel.Warning);
+ AsyncHelper.RunSync(() => ExceptionNotifier.NotifyAsync(ex, logLevel: LogLevel.Warning));
+ }
+ }
+ });
+ }
+
+ protected virtual async Task HandleIncomingMessage(ConsumeResult consumeResult)
+ {
+ try
+ {
+ foreach (var callback in Callbacks)
+ {
+ await callback(consumeResult.Message);
+ }
+
+ Consumer.Commit(consumeResult);
+ }
+ catch (Exception ex)
+ {
+ Logger.LogException(ex);
+ await ExceptionNotifier.NotifyAsync(ex);
+ }
+ }
+
+ public virtual void Dispose()
+ {
+ if (Consumer == null)
+ {
+ return;
+ }
+
+ Consumer.Close();
+ Consumer.Dispose();
+ }
+ }
+}
diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs
new file mode 100644
index 00000000000..fb08aa41444
--- /dev/null
+++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/KafkaMessageConsumerFactory.cs
@@ -0,0 +1,32 @@
+using System;
+using System.Threading.Tasks;
+using Microsoft.Extensions.DependencyInjection;
+using Volo.Abp.DependencyInjection;
+
+namespace Volo.Abp.Kafka
+{
+ public class KafkaMessageConsumerFactory : IKafkaMessageConsumerFactory, ISingletonDependency, IDisposable
+ {
+ protected IServiceScope ServiceScope { get; }
+
+ public KafkaMessageConsumerFactory(IServiceScopeFactory serviceScopeFactory)
+ {
+ ServiceScope = serviceScopeFactory.CreateScope();
+ }
+
+ public IKafkaMessageConsumer Create(
+ string topicName,
+ string groupId,
+ string connectionName = null)
+ {
+ var consumer = ServiceScope.ServiceProvider.GetRequiredService();
+ consumer.Initialize(topicName, groupId, connectionName);
+ return consumer;
+ }
+
+ public void Dispose()
+ {
+ ServiceScope?.Dispose();
+ }
+ }
+}
diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs
new file mode 100644
index 00000000000..c4b4b9ead08
--- /dev/null
+++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/ProducerPool.cs
@@ -0,0 +1,102 @@
+using System;
+using System.Collections.Concurrent;
+using System.Diagnostics;
+using System.Linq;
+using Confluent.Kafka;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Abstractions;
+using Microsoft.Extensions.Options;
+using Volo.Abp.DependencyInjection;
+
+namespace Volo.Abp.Kafka
+{
+ public class ProducerPool : IProducerPool, ISingletonDependency
+ {
+ protected AbpKafkaOptions Options { get; }
+
+ protected ConcurrentDictionary> Producers { get; }
+
+ protected TimeSpan TotalDisposeWaitDuration { get; set; } = TimeSpan.FromSeconds(10);
+
+ public ILogger Logger { get; set; }
+
+ private bool _isDisposed;
+
+ public ProducerPool(IOptions options)
+ {
+ Options = options.Value;
+
+ Producers = new ConcurrentDictionary>();
+ Logger = new NullLogger();
+ }
+
+ public virtual IProducer Get(string connectionName = null)
+ {
+ connectionName ??= KafkaConnections.DefaultConnectionName;
+
+ return Producers.GetOrAdd(
+ connectionName, connection =>
+ {
+ var config = Options.Connections.GetOrDefault(connection);
+
+ Options.ConfigureProducer?.Invoke(new ProducerConfig(config));
+
+ return new ProducerBuilder(config).Build();
+ });
+ }
+
+ public void Dispose()
+ {
+ if (_isDisposed)
+ {
+ return;
+ }
+
+ _isDisposed = true;
+
+ if (!Producers.Any())
+ {
+ Logger.LogDebug($"Disposed producer pool with no producers in the pool.");
+ return;
+ }
+
+ var poolDisposeStopwatch = Stopwatch.StartNew();
+
+ Logger.LogInformation($"Disposing producer pool ({Producers.Count} producers).");
+
+ var remainingWaitDuration = TotalDisposeWaitDuration;
+
+ foreach (var producer in Producers.Values)
+ {
+ var poolItemDisposeStopwatch = Stopwatch.StartNew();
+
+ try
+ {
+ producer.Dispose();
+ }
+ catch
+ {
+ }
+
+ poolItemDisposeStopwatch.Stop();
+
+ remainingWaitDuration = remainingWaitDuration > poolItemDisposeStopwatch.Elapsed
+ ? remainingWaitDuration.Subtract(poolItemDisposeStopwatch.Elapsed)
+ : TimeSpan.Zero;
+ }
+
+ poolDisposeStopwatch.Stop();
+
+ Logger.LogInformation(
+ $"Disposed Kafka Producer Pool ({Producers.Count} producers in {poolDisposeStopwatch.Elapsed.TotalMilliseconds:0.00} ms).");
+
+ if (poolDisposeStopwatch.Elapsed.TotalSeconds > 5.0)
+ {
+ Logger.LogWarning(
+ $"Disposing Kafka Producer Pool got time greather than expected: {poolDisposeStopwatch.Elapsed.TotalMilliseconds:0.00} ms.");
+ }
+
+ Producers.Clear();
+ }
+ }
+}
diff --git a/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/Utf8JsonKafkaSerializer.cs b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/Utf8JsonKafkaSerializer.cs
new file mode 100644
index 00000000000..a04125f8a68
--- /dev/null
+++ b/framework/src/Volo.Abp.Kafka/Volo/Abp/Kafka/Utf8JsonKafkaSerializer.cs
@@ -0,0 +1,27 @@
+using System;
+using System.Text;
+using Volo.Abp.DependencyInjection;
+using Volo.Abp.Json;
+
+namespace Volo.Abp.Kafka
+{
+ public class Utf8JsonKafkaSerializer : IKafkaSerializer, ITransientDependency
+ {
+ private readonly IJsonSerializer _jsonSerializer;
+
+ public Utf8JsonKafkaSerializer(IJsonSerializer jsonSerializer)
+ {
+ _jsonSerializer = jsonSerializer;
+ }
+
+ public byte[] Serialize(object obj)
+ {
+ return Encoding.UTF8.GetBytes(_jsonSerializer.Serialize(obj));
+ }
+
+ public object Deserialize(byte[] value, Type type)
+ {
+ return _jsonSerializer.Deserialize(type, Encoding.UTF8.GetString(value));
+ }
+ }
+}
diff --git a/nupkg/common.ps1 b/nupkg/common.ps1
index 58b8cf83cff..7d571f67094 100644
--- a/nupkg/common.ps1
+++ b/nupkg/common.ps1
@@ -84,6 +84,7 @@ $projects = (
"framework/src/Volo.Abp.EntityFrameworkCore.SqlServer",
"framework/src/Volo.Abp.EventBus",
"framework/src/Volo.Abp.EventBus.RabbitMQ",
+ "framework/src/Volo.Abp.EventBus.Kafka",
"framework/src/Volo.Abp.Features",
"framework/src/Volo.Abp.FluentValidation",
"framework/src/Volo.Abp.GlobalFeatures",
@@ -123,6 +124,7 @@ $projects = (
"framework/src/Volo.Abp.Validation.Abstractions",
"framework/src/Volo.Abp.Validation",
"framework/src/Volo.Abp.VirtualFileSystem",
+ "framework/src/Volo.Abp.Kafka"
# modules/account
"modules/account/src/Volo.Abp.Account.Application.Contracts",