Skip to content

Commit

Permalink
Fixing the name resolver for custom config provider (addresses #12)
Browse files Browse the repository at this point in the history
  • Loading branch information
keesschollaart81 committed Mar 22, 2019
1 parent b35390b commit 3641511
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
using System.Threading.Tasks;
using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Config;
using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Messaging;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Triggers;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Extensions.Logging;
Expand All @@ -17,16 +19,19 @@ namespace CaseOnline.Azure.WebJobs.Extensions.Mqtt.Bindings
public class MqttTriggerAttributeBindingProvider : ITriggerBindingProvider
{
private readonly IMqttConnectionFactory _connectionFactory;
private readonly INameResolver _nameResolver;
private readonly ILogger _logger;

/// <summary>
/// Initializes a new instance of the <see cref="MqttTriggerAttribute"/>.
/// </summary>
/// <param name="connectionFactory">the connection factory.</param>
/// <param name="loggerFactory">The loggerFactory.</param>
internal MqttTriggerAttributeBindingProvider(IMqttConnectionFactory connectionFactory, ILoggerFactory loggerFactory)
/// <param name="nameResolver">The nameResolver.</param>
internal MqttTriggerAttributeBindingProvider(IMqttConnectionFactory connectionFactory, ILoggerFactory loggerFactory, INameResolver nameResolver)
{
_connectionFactory = connectionFactory;
_nameResolver = nameResolver;
_logger = loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("Mqtt"));
}

Expand Down Expand Up @@ -82,7 +87,11 @@ private ITriggerBinding GetMqttTriggerBinding(ParameterInfo parameter, MqttTrigg
var mqttConnection = _connectionFactory.GetMqttConnection(mqttTriggerAttribute);
try
{
topics = mqttTriggerAttribute.Topics.Select(t => new TopicFilter(t, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)).ToArray();
topics = mqttTriggerAttribute.Topics.Select(t =>
{
var topicString = (mqttTriggerAttribute.MqttConfigCreatorType != null) ? _nameResolver.ResolveWholeString(t) : t;
return new TopicFilter(topicString, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);
}).ToArray();
}
catch (Exception ex)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Bindings;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Description;
using Microsoft.Azure.WebJobs.Host.Config;
using Microsoft.Extensions.Logging;
Expand All @@ -13,11 +14,13 @@ public class MqttExtensionConfigProvider : IExtensionConfigProvider
{
private readonly ILoggerFactory _loggerFactory;
private readonly IMqttConnectionFactory _mqttConnectionFactory;
private readonly INameResolver _nameResolver;

public MqttExtensionConfigProvider(ILoggerFactory loggerFactory, IMqttConnectionFactory mqttConnectionFactory)
public MqttExtensionConfigProvider(ILoggerFactory loggerFactory, IMqttConnectionFactory mqttConnectionFactory, INameResolver nameResolver)
{
_loggerFactory = loggerFactory;
_mqttConnectionFactory = mqttConnectionFactory;
_nameResolver = nameResolver;
}

/// <summary>
Expand All @@ -32,7 +35,7 @@ public void Initialize(ExtensionConfigContext context)
return new MqttMessageCollector(_mqttConnectionFactory.GetMqttConnection(attr));
});

var bindingProvider = new MqttTriggerAttributeBindingProvider(_mqttConnectionFactory, _loggerFactory);
var bindingProvider = new MqttTriggerAttributeBindingProvider(_mqttConnectionFactory, _loggerFactory, _nameResolver);
context.AddBindingRule<MqttTriggerAttribute>()
.BindToTrigger(bindingProvider);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ public static void Testert2([MqttTrigger("test/topic2")] IMqttMessage mqttMessag
}
}


[Fact]
public async Task MultipleTriggersReceiveOwnMessages()
{
Expand Down Expand Up @@ -251,5 +250,40 @@ public static void Testert2([MqttTrigger("test/topic2", ConnectionString = "Mqtt
LastReceivedMessageFunction2 = mqttMessage;
}
}

[Fact]
public async Task CustomMqttConfigProviderGetsTriggered()
{
var message = new MqttApplicationMessageBuilder()
.WithTopic("TestTopic/random")
.WithPayload("{ \"test\":\"case\" }")
.WithAtLeastOnceQoS()
.Build();

using (var mqttServer = await MqttServerHelper.Get(_logger))
using (var jobHost = await JobHostHelper<CustomMqttConfigProviderTestFunction>.RunFor(_testLoggerProvider))
{
await mqttServer.PublishAsync(message);

await WaitFor(() => CustomMqttConfigProviderTestFunction.CallCount >= 1);
}

Assert.Equal(1, CustomMqttConfigProviderTestFunction.CallCount);
Assert.Equal("TestTopic/random", CustomMqttConfigProviderTestFunction.LastReceivedMessage.Topic);
var messageBody = Encoding.UTF8.GetString(CustomMqttConfigProviderTestFunction.LastReceivedMessage.GetMessage());
Assert.Equal("{ \"test\":\"case\" }", messageBody);
}

private class CustomMqttConfigProviderTestFunction
{
public static int CallCount = 0;
public static IMqttMessage LastReceivedMessage;

public static void Testert([MqttTrigger(typeof(TestMqttConfigProvider), "%TopicName%/#")] IMqttMessage mqttMessage)
{
CallCount++;
LastReceivedMessage = mqttMessage;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Config;
using MQTTnet.Extensions.ManagedClient;

namespace CaseOnline.Azure.WebJobs.Extensions.Mqtt.Tests.Helpers
{
public class MqttConfigExample : CustomMqttConfig
{
public override IManagedMqttClientOptions Options { get; }

public override string Name { get; }

public MqttConfigExample(string name, IManagedMqttClientOptions options)
{
Options = options;
Name = name;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Client;
using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Config;
using CaseOnline.Azure.WebJobs.Extensions.Mqtt.Bindings;

namespace CaseOnline.Azure.WebJobs.Extensions.Mqtt.Tests.Helpers
{
public class TestMqttConfigProvider : ICreateMqttConfig
{
public CustomMqttConfig Create(INameResolver nameResolver, ILogger logger)
{
var connectionString = new MqttConnectionString(nameResolver.Resolve("MqttConnectionWithCustomClientId"));

var options = new ManagedMqttClientOptionsBuilder()
.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
.WithClientOptions(new MqttClientOptionsBuilder()
.WithClientId(connectionString.ClientId.ToString())
.WithTcpServer(connectionString.Server, connectionString.Port)
.WithCredentials(connectionString.Username, connectionString.Password)
.Build())
.Build();


return new MqttConfigExample("CustomConnection", options);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@
"MqttConnectionWithCustomClientId": "Server=localhost;ClientId=Custom",
"MqttConnectionWithCustomPort": "Server=localhost;Port=1337",
"MqttConnectionWithUsernameAndPassword": "Server=localhost;Username=admin;Password=Welkom123",
"MqttConnectionWithTls": "Server=localhost;Tls=True"
"MqttConnectionWithTls": "Server=localhost;Tls=True",
"TopicName": "TestTopic"
}

0 comments on commit 3641511

Please sign in to comment.