Skip to content

Commit

Permalink
Merge pull request #9 from keesschollaart81/dev
Browse files Browse the repository at this point in the history
V1.1.0
  • Loading branch information
keesschollaart81 authored Nov 3, 2018
2 parents 7055710 + 2f11980 commit 286c3aa
Show file tree
Hide file tree
Showing 20 changed files with 162 additions and 123 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Config;

namespace CaseOnline.Azure.WebJobs.Extensions.Mqtt.Bindings
{
public interface IMqttConfigurationParser
{
MqttConfiguration Parse(MqttBaseAttribute mqttAttribute);
}
}
Original file line number Diff line number Diff line change
@@ -1,60 +1,59 @@
using System;
using System;
using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Config;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Extensions.Logging;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;

namespace CaseOnline.Azure.WebJobs.Extensions.Mqtt.Bindings
{
/// <summary>
/// Converts a <see cref="MqttTriggerAttribute"/> to a <see cref="MqttConfiguration"/>.
/// Parses a <see cref="MqttTriggerAttribute"/> to a <see cref="MqttConfiguration"/>.
/// </summary>
public class AttributeToConfigConverter
public class MqttConfigurationParser : IMqttConfigurationParser
{
private const string DefaultAppsettingsKeyForConnectionString = "MqttConnection";

private readonly TimeSpan _detaultReconnectTime = TimeSpan.FromSeconds(5);
private readonly IRquireMqttConnection _mqttTriggerAttribute;
private readonly TimeSpan _defaultReconnectTime = TimeSpan.FromSeconds(5);
private readonly INameResolver _nameResolver;
private readonly ILogger _logger;

/// <summary>
/// Initializes a new instance of the <see cref="AttributeToConfigConverter"/> class.
/// Initializes a new instance of the <see cref="MqttConfigurationParser"/> class.
/// </summary>
/// <param name="source">The trigger attribute.</param>
/// <param name="nameResolver">The name resolver.</param>
/// <param name="logger">The logger.</param>
public AttributeToConfigConverter(IRquireMqttConnection source, INameResolver nameResolver, ILogger logger)
{
_mqttTriggerAttribute = source;
/// <param name="loggerFactory">The logger factory.</param>
public MqttConfigurationParser(INameResolver nameResolver, ILoggerFactory loggerFactory)
{
_nameResolver = nameResolver;
_logger = logger;
_logger = loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("Mqtt"));
}

/// <summary>
/// Gets the MQTT configuration from this attribute.
/// Gets the MQTT configuration from the given attribute.
/// </summary>
/// <param name="mqttAttribute">The attribute to parse from.</param>
/// <returns>
/// The configuration.
/// </returns>
public MqttConfiguration GetMqttConfiguration()
public MqttConfiguration Parse(MqttBaseAttribute mqttAttribute)
{
return _mqttTriggerAttribute.MqttConfigCreatorType == null
? GetConfigurationViaAttributeValues()
: GetConfigurationViaCustomConfigCreator();
return mqttAttribute.MqttConfigCreatorType == null
? GetConfigurationViaAttributeValues(mqttAttribute)
: GetConfigurationViaCustomConfigCreator(mqttAttribute);
}

private MqttConfiguration GetConfigurationViaAttributeValues()
private MqttConfiguration GetConfigurationViaAttributeValues(MqttBaseAttribute mqttAttribute)
{
var name = _mqttTriggerAttribute.ConnectionString ?? DefaultAppsettingsKeyForConnectionString;
var connectionString = _nameResolver.Resolve(_mqttTriggerAttribute.ConnectionString) ?? _nameResolver.Resolve(DefaultAppsettingsKeyForConnectionString);
var name = mqttAttribute.ConnectionString ?? DefaultAppsettingsKeyForConnectionString;
var connectionString = _nameResolver.Resolve(mqttAttribute.ConnectionString) ?? _nameResolver.Resolve(DefaultAppsettingsKeyForConnectionString);
var mqttConnectionString = new MqttConnectionString(connectionString);

var mqttClientOptions = GetMqttClientOptions(mqttConnectionString);

var managedMqttClientOptions = new ManagedMqttClientOptionsBuilder()
.WithAutoReconnectDelay(_detaultReconnectTime)
.WithAutoReconnectDelay(_defaultReconnectTime)
.WithClientOptions(mqttClientOptions)
.Build();

Expand Down Expand Up @@ -82,17 +81,17 @@ private IMqttClientOptions GetMqttClientOptions(MqttConnectionString connectionS
return mqttClientOptionsBuilder.Build();
}

private MqttConfiguration GetConfigurationViaCustomConfigCreator()
private MqttConfiguration GetConfigurationViaCustomConfigCreator(MqttBaseAttribute mqttAttribute)
{
CustomMqttConfig customConfig;
try
{
var customConfigCreator = (ICreateMqttConfig)Activator.CreateInstance(_mqttTriggerAttribute.MqttConfigCreatorType);
var customConfigCreator = (ICreateMqttConfig)Activator.CreateInstance(mqttAttribute.MqttConfigCreatorType);
customConfig = customConfigCreator.Create(_nameResolver, _logger);
}
catch (Exception ex)
{
throw new InvalidCustomConfigCreatorException($"Unexpected exception while getting creating a config via type {_mqttTriggerAttribute.MqttConfigCreatorType.FullName}", ex);
throw new InvalidCustomConfigCreatorException($"Unexpected exception while getting creating a config via type {mqttAttribute.MqttConfigCreatorType.FullName}", ex);
}

return new MqttConfiguration(customConfig.Name, customConfig.Options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Threading.Tasks;
using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Config;
using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Messaging;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host.Triggers;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -45,12 +44,19 @@ public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext contex
}

_logger.LogDebug($"Creating binding for parameter '{context.Parameter.Name}'");
try
{
var mqttTriggerBinding = GetMqttTriggerBinding(context.Parameter, mqttTriggerAttribute);

ITriggerBinding mqttTriggerBinding = GetMqttTriggerBinding(context.Parameter, mqttTriggerAttribute);

_logger.LogDebug($"Succesfully created binding for parameter '{context.Parameter.Name}'");
_logger.LogDebug($"Succesfully created binding for parameter '{context.Parameter.Name}'");

return Task.FromResult(mqttTriggerBinding);
return Task.FromResult(mqttTriggerBinding);
}
catch (Exception ex)
{
_logger.LogError(ex, $"Unhandled exception while binding trigger '{context.Parameter.Name}'");
throw;
}
}

private static MqttTriggerAttribute GetMqttTriggerAttribute(ParameterInfo parameter)
Expand All @@ -70,7 +76,7 @@ private static MqttTriggerAttribute GetMqttTriggerAttribute(ParameterInfo parame
return mqttTriggerAttribute;
}

private MqttTriggerBinding GetMqttTriggerBinding(ParameterInfo parameter, MqttTriggerAttribute mqttTriggerAttribute)
private ITriggerBinding GetMqttTriggerBinding(ParameterInfo parameter, MqttTriggerAttribute mqttTriggerAttribute)
{
TopicFilter[] topics;
var mqttConnection = _connectionFactory.GetMqttConnection(mqttTriggerAttribute);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
<PackageProjectUrl>https://github.com/keesschollaart81/CaseOnline.Azure.WebJobs.Extensions.Mqtt</PackageProjectUrl>
<RepositoryUrl>https://github.com/keesschollaart81/CaseOnline.Azure.WebJobs.Extensions.Mqtt</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<AllowedOutputExtensionsInPackageBuildOutputFolder>$(AllowedOutputExtensionsInPackageBuildOutputFolder);.pdb</AllowedOutputExtensionsInPackageBuildOutputFolder>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
<PackageLicenseUrl>https://github.com/keesschollaart81/CaseOnline.Azure.WebJobs.Extensions.Mqtt/blob/master/LICENSE</PackageLicenseUrl>
<VersionPrefix>1.0.0</VersionPrefix>
<VersionPrefix>1.1.0</VersionPrefix>
<VersionSuffix></VersionSuffix>
<PackageTags>azure functions mqtt webjobs trigger binding function webjob</PackageTags>
<AssemblyName>CaseOnline.Azure.WebJobs.Extensions.Mqtt</AssemblyName>
Expand All @@ -27,7 +30,7 @@ For more information, please visit https://github.com/keesschollaart81/CaseOnlin
<PackageIconUrl>https://raw.githubusercontent.com/keesschollaart81/CaseOnline.Azure.WebJobs.Extensions.Mqtt/master/logo.png</PackageIconUrl>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<CodeAnalysisRuleSet>analyzers.ruleset</CodeAnalysisRuleSet>
<Features>IOperation</Features>
<TreatWarningsAsErrors>True</TreatWarningsAsErrors>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<DebugType>Full</DebugType>
Expand All @@ -37,21 +40,22 @@ For more information, please visit https://github.com/keesschollaart81/CaseOnlin
<AdditionalFiles Include="stylecop.json" Link="stylecop.json" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="2.8.2" />
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="2.8.4" />
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
<PackageReference Include="SonarAnalyzer.CSharp" Version="7.5.0.6605">
<PackageReference Include="SonarAnalyzer.CSharp" Version="7.8.0.7320">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
<PackageReference Include="StyleCop.Analyzers" Version="1.1.0-beta008">
<PackageReference Include="StyleCop.Analyzers" Version="1.1.0-beta009">
<PrivateAssets>All</PrivateAssets>
</PackageReference>
<PackageReference Include="Microsoft.CodeAnalysis.FxCopAnalyzers" Version="2.6.1">
<PackageReference Include="Microsoft.CodeAnalysis.FxCopAnalyzers" Version="2.6.2">
<PrivateAssets>All</PrivateAssets>
</PackageReference>
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions" Version="3.0.0" />
<PackageReference Include="MQTTnet" Version="2.8.2" />
<PackageReference Include="MQTTnet" Version="2.8.4" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0-beta-63127-02" PrivateAssets="All" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ public interface IMqttConnectionFactory
{
Task DisconnectAll();

MqttConnection GetMqttConnection(IRquireMqttConnection attribute);
MqttConnection GetMqttConnection(MqttBaseAttribute attribute);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Threading.Tasks;
using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Bindings;
using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Listeners;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Extensions.Logging;

Expand All @@ -14,45 +13,61 @@ public class MqttConnectionFactory : IMqttConnectionFactory
{
private readonly ILogger _logger;
private readonly IManagedMqttClientFactory _mqttFactory;
private readonly INameResolver _nameResolver;
private readonly IMqttConfigurationParser _mqttConfigurationParser;

private readonly ConcurrentDictionary<string, MqttConnection> _mqttConnections = new ConcurrentDictionary<string, MqttConnection>();
private readonly ConcurrentDictionary<string, MqttConnectionEntry> _mqttConnections = new ConcurrentDictionary<string, MqttConnectionEntry>();

public MqttConnectionFactory(ILoggerFactory loggerFactory, IManagedMqttClientFactory mqttFactory, INameResolver nameResolver)
public MqttConnectionFactory(ILoggerFactory loggerFactory, IManagedMqttClientFactory mqttFactory, IMqttConfigurationParser mqttConfigurationParser)
{
_mqttFactory = mqttFactory;
_nameResolver = nameResolver;
_mqttConfigurationParser = mqttConfigurationParser;
_logger = loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("Mqtt"));
}

public MqttConnectionFactory()
public MqttConnection GetMqttConnection(MqttBaseAttribute attribute)
{
}

public MqttConnection GetMqttConnection(IRquireMqttConnection attribute)
{
var attributeToConfigConverter = new AttributeToConfigConverter(attribute, _nameResolver, _logger);
var mqttConfiguration = attributeToConfigConverter.GetMqttConfiguration();
var mqttConfiguration = _mqttConfigurationParser.Parse(attribute);
if (_mqttConnections.ContainsKey(mqttConfiguration.Name) && attribute is MqttTriggerAttribute)
{
throw new InvalidOperationException($"Error setting up listener for this attribute. Connectionstring '{mqttConfiguration.Name}' is already used by another Trigger. Connections can only be reused for output bindings. Each trigger needs it own connectionstring");
if (_mqttConnections[mqttConfiguration.Name].UsedByTrigger)
{
throw new InvalidOperationException($"Error setting up listener for this attribute. Connectionstring '{mqttConfiguration.Name}' is already used by another Trigger. Connections can only be reused for output bindings. Each trigger needs it own connectionstring");
}
else
{
_mqttConnections[mqttConfiguration.Name].UsedByTrigger = true;
}
}
var connection = _mqttConnections.GetOrAdd(mqttConfiguration.Name, (c) => new MqttConnection(_mqttFactory, mqttConfiguration, _logger));
return connection;
var connection = _mqttConnections.GetOrAdd(mqttConfiguration.Name, (c) => new MqttConnectionEntry(new MqttConnection(_mqttFactory, mqttConfiguration, _logger)));
return connection.MqttConnection;
}

internal bool AllConnectionsConnected()
{
return _mqttConnections.All(x => x.Value.ConnectionState == ConnectionState.Connected);
return _mqttConnections.All(x => x.Value.MqttConnection.ConnectionState == ConnectionState.Connected);
}

public async Task DisconnectAll()
{
foreach (var connection in _mqttConnections)
{
await connection.Value.StopAsync().ConfigureAwait(false);
connection.Value.Dispose();
await connection.Value.MqttConnection.StopAsync().ConfigureAwait(false);
connection.Value.MqttConnection.Dispose();
}
_mqttConnections.Clear();
}

private class MqttConnectionEntry
{
public MqttConnectionEntry(MqttConnection mqttConnection)
{
MqttConnection = mqttConnection;
UsedByTrigger = false;
}

public MqttConnection MqttConnection { get; set; }

public bool UsedByTrigger { get; set; }
}
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Bindings;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Description;
using Microsoft.Azure.WebJobs.Host.Config;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging;

namespace CaseOnline.Azure.WebJobs.Extensions.Mqtt.Config
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Bindings;
using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Listeners;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.DependencyInjection;
Expand All @@ -25,6 +26,7 @@ public static IWebJobsBuilder AddMqtt(this IWebJobsBuilder builder)
builder.Services.AddTransient<IMqttClientFactory, MqttFactory>();
builder.Services.AddTransient<IManagedMqttClientFactory, ManagedMqttClientFactory>();
builder.Services.AddSingleton<IMqttConnectionFactory, MqttConnectionFactory>();
builder.Services.AddTransient<IMqttConfigurationParser, MqttConfigurationParser>();
builder.AddExtension<MqttExtensionConfigProvider>();

return builder;
Expand Down

This file was deleted.

9 changes: 2 additions & 7 deletions src/CaseOnline.Azure.WebJobs.Extensions.Mqtt/MqttAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,14 @@ namespace CaseOnline.Azure.WebJobs.Extensions.Mqtt
{
[AttributeUsage(AttributeTargets.Parameter | AttributeTargets.ReturnValue)]
[Binding]
public class MqttAttribute : Attribute, IRquireMqttConnection
public class MqttAttribute : MqttBaseAttribute
{
public MqttAttribute()
{
}

public MqttAttribute(Type mqttConfigCreatorType)
public MqttAttribute(Type mqttConfigCreatorType) : base(mqttConfigCreatorType)
{
MqttConfigCreatorType = mqttConfigCreatorType;
}

public string ConnectionString { get; set; }

public Type MqttConfigCreatorType { get; }
}
}
20 changes: 20 additions & 0 deletions src/CaseOnline.Azure.WebJobs.Extensions.Mqtt/MqttBaseAttribute.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;

namespace CaseOnline.Azure.WebJobs.Extensions.Mqtt
{
public abstract class MqttBaseAttribute : Attribute
{
protected MqttBaseAttribute()
{
}

protected MqttBaseAttribute(Type mqttConfigCreatorType)
{
MqttConfigCreatorType = mqttConfigCreatorType;
}

public string ConnectionString { get; set; }

public Type MqttConfigCreatorType { get; protected set; }
}
}
Loading

0 comments on commit 286c3aa

Please sign in to comment.