Skip to content

Commit

Permalink
Merge pull request #5034 from abpframework/liangshiwei/Eventbus-kafka
Browse files Browse the repository at this point in the history
Add Kafka event bus integration
  • Loading branch information
hikalkan authored Aug 29, 2020
2 parents 78ae854 + 296fee3 commit 120804d
Show file tree
Hide file tree
Showing 24 changed files with 937 additions and 0 deletions.
14 changes: 14 additions & 0 deletions framework/Volo.Abp.sln
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.BlobStoring.Aws",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.BlobStoring.Aws.Tests", "test\Volo.Abp.BlobStoring.Aws.Tests\Volo.Abp.BlobStoring.Aws.Tests.csproj", "{2CD3B26A-CA81-4279-8D5D-6A594517BB3F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.Kafka", "src\Volo.Abp.Kafka\Volo.Abp.Kafka.csproj", "{2A864049-9CD5-4493-8CDB-C408474D43D4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.EventBus.Kafka", "src\Volo.Abp.EventBus.Kafka\Volo.Abp.EventBus.Kafka.csproj", "{C1D891B0-AE83-42CB-987D-425A2787DE78}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.GlobalFeatures", "src\Volo.Abp.GlobalFeatures\Volo.Abp.GlobalFeatures.csproj", "{04F44063-C952-403A-815F-EFB778BDA125}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.GlobalFeatures.Tests", "test\Volo.Abp.GlobalFeatures.Tests\Volo.Abp.GlobalFeatures.Tests.csproj", "{231F1581-AA21-44C3-BF27-51EB3AD5355C}"
Expand Down Expand Up @@ -965,6 +969,14 @@ Global
{2CD3B26A-CA81-4279-8D5D-6A594517BB3F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2CD3B26A-CA81-4279-8D5D-6A594517BB3F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2CD3B26A-CA81-4279-8D5D-6A594517BB3F}.Release|Any CPU.Build.0 = Release|Any CPU
{2A864049-9CD5-4493-8CDB-C408474D43D4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2A864049-9CD5-4493-8CDB-C408474D43D4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2A864049-9CD5-4493-8CDB-C408474D43D4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2A864049-9CD5-4493-8CDB-C408474D43D4}.Release|Any CPU.Build.0 = Release|Any CPU
{C1D891B0-AE83-42CB-987D-425A2787DE78}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C1D891B0-AE83-42CB-987D-425A2787DE78}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C1D891B0-AE83-42CB-987D-425A2787DE78}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C1D891B0-AE83-42CB-987D-425A2787DE78}.Release|Any CPU.Build.0 = Release|Any CPU
{04F44063-C952-403A-815F-EFB778BDA125}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{04F44063-C952-403A-815F-EFB778BDA125}.Debug|Any CPU.Build.0 = Debug|Any CPU
{04F44063-C952-403A-815F-EFB778BDA125}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -1136,6 +1148,8 @@ Global
{8E49687A-E69F-49F2-8DB0-428D0883A937} = {447C8A77-E5F0-4538-8687-7383196D04EA}
{50968CDE-1029-4051-B2E5-B69D0ECF2A18} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{2CD3B26A-CA81-4279-8D5D-6A594517BB3F} = {447C8A77-E5F0-4538-8687-7383196D04EA}
{2A864049-9CD5-4493-8CDB-C408474D43D4} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{C1D891B0-AE83-42CB-987D-425A2787DE78} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{04F44063-C952-403A-815F-EFB778BDA125} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{231F1581-AA21-44C3-BF27-51EB3AD5355C} = {447C8A77-E5F0-4538-8687-7383196D04EA}
EndGlobalSection
Expand Down
3 changes: 3 additions & 0 deletions framework/src/Volo.Abp.EventBus.Kafka/FodyWeavers.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<Weavers xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="FodyWeavers.xsd">
<ConfigureAwait ContinueOnCapturedContext="false" />
</Weavers>
30 changes: 30 additions & 0 deletions framework/src/Volo.Abp.EventBus.Kafka/FodyWeavers.xsd
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="utf-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<!-- This file was generated by Fody. Manual changes to this file will be lost when your project is rebuilt. -->
<xs:element name="Weavers">
<xs:complexType>
<xs:all>
<xs:element name="ConfigureAwait" minOccurs="0" maxOccurs="1">
<xs:complexType>
<xs:attribute name="ContinueOnCapturedContext" type="xs:boolean" />
</xs:complexType>
</xs:element>
</xs:all>
<xs:attribute name="VerifyAssembly" type="xs:boolean">
<xs:annotation>
<xs:documentation>'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="VerifyIgnoreCodes" type="xs:string">
<xs:annotation>
<xs:documentation>A comma-separated list of error codes that can be safely ignored in assembly verification.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="GenerateXsd" type="xs:boolean">
<xs:annotation>
<xs:documentation>'false' to turn off automatic generation of the XML Schema file.</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:complexType>
</xs:element>
</xs:schema>
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">

<Import Project="..\..\..\configureawait.props" />
<Import Project="..\..\..\common.props" />

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<RootNamespace />
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Volo.Abp.EventBus\Volo.Abp.EventBus.csproj" />
<ProjectReference Include="..\Volo.Abp.Kafka\Volo.Abp.Kafka.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.Kafka;
using Volo.Abp.Modularity;

namespace Volo.Abp.EventBus.Kafka
{
[DependsOn(
typeof(AbpEventBusModule),
typeof(AbpKafkaModule))]
public class AbpEventBusKafkaModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
var configuration = context.Services.GetConfiguration();

Configure<AbpKafkaEventBusOptions>(configuration.GetSection("Kafka:EventBus"));
}

public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
context
.ServiceProvider
.GetRequiredService<KafkaDistributedEventBus>()
.Initialize();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace Volo.Abp.EventBus.Kafka
{
public class AbpKafkaEventBusOptions
{

public string ConnectionName { get; set; }

public string TopicName { get; set; }

public string GroupId { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Kafka;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Threading;

namespace Volo.Abp.EventBus.Kafka
{
[Dependency(ReplaceServices = true)]
[ExposeServices(typeof(IDistributedEventBus), typeof(KafkaDistributedEventBus))]
public class KafkaDistributedEventBus : EventBusBase, IDistributedEventBus, ISingletonDependency
{
protected AbpKafkaEventBusOptions AbpKafkaEventBusOptions { get; }
protected AbpDistributedEventBusOptions AbpDistributedEventBusOptions { get; }
protected IKafkaMessageConsumerFactory MessageConsumerFactory { get; }
protected IKafkaSerializer Serializer { get; }
protected IProducerPool ProducerPool { get; }
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
protected ConcurrentDictionary<string, Type> EventTypes { get; }
protected IKafkaMessageConsumer Consumer { get; private set; }

public KafkaDistributedEventBus(
IServiceScopeFactory serviceScopeFactory,
ICurrentTenant currentTenant,
IOptions<AbpKafkaEventBusOptions> abpKafkaEventBusOptions,
IKafkaMessageConsumerFactory messageConsumerFactory,
IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions,
IKafkaSerializer serializer,
IProducerPool producerPool)
: base(serviceScopeFactory, currentTenant)
{
AbpKafkaEventBusOptions = abpKafkaEventBusOptions.Value;
AbpDistributedEventBusOptions = abpDistributedEventBusOptions.Value;
MessageConsumerFactory = messageConsumerFactory;
Serializer = serializer;
ProducerPool = producerPool;

HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
EventTypes = new ConcurrentDictionary<string, Type>();
}

public void Initialize()
{
Consumer = MessageConsumerFactory.Create(
AbpKafkaEventBusOptions.TopicName,
AbpKafkaEventBusOptions.GroupId,
AbpKafkaEventBusOptions.ConnectionName);

Consumer.OnMessageReceived(ProcessEventAsync);

SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
}

private async Task ProcessEventAsync(Message<string, byte[]> message)
{
var eventName = message.Key;
var eventType = EventTypes.GetOrDefault(eventName);
if (eventType == null)
{
return;
}

var eventData = Serializer.Deserialize(message.Value, eventType);

await TriggerHandlersAsync(eventType, eventData);
}

public IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler) where TEvent : class
{
return Subscribe(typeof(TEvent), handler);
}

public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
var handlerFactories = GetOrCreateHandlerFactories(eventType);

if (factory.IsInFactories(handlerFactories))
{
return NullDisposable.Instance;
}

handlerFactories.Add(factory);

return new EventHandlerFactoryUnregistrar(this, eventType, factory);
}

/// <inheritdoc/>
public override void Unsubscribe<TEvent>(Func<TEvent, Task> action)
{
Check.NotNull(action, nameof(action));

GetOrCreateHandlerFactories(typeof(TEvent))
.Locking(factories =>
{
factories.RemoveAll(
factory =>
{
var singleInstanceFactory = factory as SingleInstanceHandlerFactory;
if (singleInstanceFactory == null)
{
return false;
}

var actionHandler = singleInstanceFactory.HandlerInstance as ActionEventHandler<TEvent>;
if (actionHandler == null)
{
return false;
}

return actionHandler.Action == action;
});
});
}

/// <inheritdoc/>
public override void Unsubscribe(Type eventType, IEventHandler handler)
{
GetOrCreateHandlerFactories(eventType)
.Locking(factories =>
{
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory handlerFactory &&
handlerFactory.HandlerInstance == handler
);
});
}

/// <inheritdoc/>
public override void Unsubscribe(Type eventType, IEventHandlerFactory factory)
{
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Remove(factory));
}

/// <inheritdoc/>
public override void UnsubscribeAll(Type eventType)
{
GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Clear());
}

public override async Task PublishAsync(Type eventType, object eventData)
{
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
var body = Serializer.Serialize(eventData);

var producer = ProducerPool.Get(AbpKafkaEventBusOptions.ConnectionName);

await producer.ProduceAsync(
AbpKafkaEventBusOptions.TopicName,
new Message<string, byte[]>
{
Key = eventName, Value = body
});
}

private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
{
return HandlerFactories.GetOrAdd(
eventType,
type =>
{
var eventName = EventNameAttribute.GetNameOrDefault(type);
EventTypes[eventName] = type;
return new List<IEventHandlerFactory>();
}
);
}

protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
{
var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>();

foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key))
)
{
handlerFactoryList.Add(
new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
}

return handlerFactoryList.ToArray();
}

private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
{
//Should trigger same type
if (handlerEventType == targetEventType)
{
return true;
}

//Should trigger for inherited types
if (handlerEventType.IsAssignableFrom(targetEventType))
{
return true;
}

return false;
}
}
}
3 changes: 3 additions & 0 deletions framework/src/Volo.Abp.Kafka/FodyWeavers.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<Weavers xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="FodyWeavers.xsd">
<ConfigureAwait ContinueOnCapturedContext="false" />
</Weavers>
30 changes: 30 additions & 0 deletions framework/src/Volo.Abp.Kafka/FodyWeavers.xsd
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="utf-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<!-- This file was generated by Fody. Manual changes to this file will be lost when your project is rebuilt. -->
<xs:element name="Weavers">
<xs:complexType>
<xs:all>
<xs:element name="ConfigureAwait" minOccurs="0" maxOccurs="1">
<xs:complexType>
<xs:attribute name="ContinueOnCapturedContext" type="xs:boolean" />
</xs:complexType>
</xs:element>
</xs:all>
<xs:attribute name="VerifyAssembly" type="xs:boolean">
<xs:annotation>
<xs:documentation>'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="VerifyIgnoreCodes" type="xs:string">
<xs:annotation>
<xs:documentation>A comma-separated list of error codes that can be safely ignored in assembly verification.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="GenerateXsd" type="xs:boolean">
<xs:annotation>
<xs:documentation>'false' to turn off automatic generation of the XML Schema file.</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:complexType>
</xs:element>
</xs:schema>
17 changes: 17 additions & 0 deletions framework/src/Volo.Abp.Kafka/Volo.Abp.Kafka.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">

<Import Project="..\..\..\configureawait.props" />
<Import Project="..\..\..\common.props" />

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<RootNamespace />
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.5.0" />
<ProjectReference Include="..\Volo.Abp.Json\Volo.Abp.Json.csproj" />
<ProjectReference Include="..\Volo.Abp.Threading\Volo.Abp.Threading.csproj" />
</ItemGroup>

</Project>
Loading

0 comments on commit 120804d

Please sign in to comment.