Skip to content

Commit

Permalink
EdgeHub: Hook up client product info to upstream connections (#1103)
Browse files Browse the repository at this point in the history
* Add support ProductInfo store

* Fix ProductInfo check

* Add support for AMQP and fix tests

* Fix tests

* Add tests

* Add Tests

* Fix Indentity factory tests

* Update edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Service/DependencyManager.cs

Co-Authored-By: varunpuranik <[email protected]>

* Update edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Amqp/linkhandlers/LinkHandler.cs

Co-Authored-By: varunpuranik <[email protected]>

* Change string to EdgeHub

* Fix build
  • Loading branch information
varunpuranik authored May 16, 2019
1 parent 3dff47f commit 749b9b7
Show file tree
Hide file tree
Showing 43 changed files with 455 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ public DeviceBoundLinkHandler(
Uri requestUri,
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter)
IMessageConverter<AmqpMessage> messageConverter,
IProductInfoStore productInfoStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ public EventsLinkHandler(
Uri requestUri,
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter)
IMessageConverter<AmqpMessage> messageConverter,
IProductInfoStore productInfoStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore)
{
}

Expand Down Expand Up @@ -58,7 +59,7 @@ protected override async Task OnMessageReceived(AmqpMessage amqpMessage)
};

List<IMessage> outgoingMessages = messages.Select(m => this.MessageConverter.ToMessage(m)).ToList();
outgoingMessages.ForEach(m => this.AddMessageSystemProperties(m));
outgoingMessages.ForEach(this.AddMessageSystemProperties);
await this.DeviceListener.ProcessDeviceMessageBatchAsync(outgoingMessages);
Events.ProcessedMessages(messages, this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Amqp.LinkHandlers
public abstract class LinkHandler : ILinkHandler
{
readonly IConnectionHandler connectionHandler;
readonly IProductInfoStore productInfoStore;

protected LinkHandler(
IIdentity identity,
IAmqpLink link,
Uri requestUri,
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter)
IMessageConverter<AmqpMessage> messageConverter,
IProductInfoStore productInfoStore)
{
this.Identity = Preconditions.CheckNotNull(identity, nameof(identity));
this.MessageConverter = Preconditions.CheckNotNull(messageConverter, nameof(messageConverter));
Expand All @@ -30,6 +32,13 @@ protected LinkHandler(
this.LinkUri = Preconditions.CheckNotNull(requestUri, nameof(requestUri));
this.Link.SafeAddClosed(this.OnLinkClosed);
this.connectionHandler = Preconditions.CheckNotNull(connectionHandler, nameof(connectionHandler));
this.productInfoStore = Preconditions.CheckNotNull(productInfoStore, nameof(productInfoStore));

string clientVersion = null;
if (this.Link.Settings?.Properties?.TryGetValue(IotHubAmqpProperty.ClientVersion, out clientVersion) ?? false)
{
this.ClientVersion = Option.Maybe(clientVersion);
}
}

public IAmqpLink Link { get; }
Expand All @@ -50,6 +59,8 @@ protected LinkHandler(

protected IDictionary<string, string> BoundVariables { get; }

protected Option<string> ClientVersion { get; }

public async Task OpenAsync(TimeSpan timeout)
{
if (!await this.Authenticate())
Expand All @@ -72,7 +83,7 @@ public async Task CloseAsync(TimeSpan timeout)

protected abstract Task OnOpenAsync(TimeSpan timeout);

protected Task<bool> Authenticate()
protected async Task<bool> Authenticate()
{
IAmqpAuthenticator amqpAuth;
IAmqpConnection connection = this.Link.Session.Connection;
Expand All @@ -91,7 +102,15 @@ protected Task<bool> Authenticate()
throw new InvalidOperationException($"Unable to find authentication mechanism for AMQP connection for identity {this.Identity.Id}");
}

return amqpAuth.AuthenticateAsync(this.Identity.Id);
bool authenticated = await amqpAuth.AuthenticateAsync(this.Identity.Id);
if (authenticated)
{
await this.ClientVersion
.Filter(c => !string.IsNullOrWhiteSpace(c))
.ForEachAsync(c => this.productInfoStore.SetProductInfo(this.Identity.Id, c));
}

return authenticated;
}

protected virtual void OnLinkClosed(object sender, EventArgs args)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ public class LinkHandlerProvider : ILinkHandlerProvider
readonly IMessageConverter<AmqpMessage> twinMessageConverter;
readonly IMessageConverter<AmqpMessage> methodMessageConverter;
readonly IIdentityProvider identityProvider;
readonly IProductInfoStore productInfoStore;
readonly IDictionary<(UriPathTemplate Template, bool IsReceiver), LinkType> templatesList;

public LinkHandlerProvider(
IMessageConverter<AmqpMessage> messageConverter,
IMessageConverter<AmqpMessage> twinMessageConverter,
IMessageConverter<AmqpMessage> methodMessageConverter,
IIdentityProvider identityProvider)
: this(messageConverter, twinMessageConverter, methodMessageConverter, identityProvider, DefaultTemplatesList)
IIdentityProvider identityProvider,
IProductInfoStore productInfoStore)
: this(messageConverter, twinMessageConverter, methodMessageConverter, identityProvider, productInfoStore, DefaultTemplatesList)
{
}

Expand All @@ -50,12 +52,14 @@ public LinkHandlerProvider(
IMessageConverter<AmqpMessage> twinMessageConverter,
IMessageConverter<AmqpMessage> methodMessageConverter,
IIdentityProvider identityProvider,
IProductInfoStore productInfoStore,
IDictionary<(UriPathTemplate Template, bool IsReceiver), LinkType> templatesList)
{
this.messageConverter = Preconditions.CheckNotNull(messageConverter, nameof(messageConverter));
this.twinMessageConverter = Preconditions.CheckNotNull(twinMessageConverter, nameof(twinMessageConverter));
this.methodMessageConverter = Preconditions.CheckNotNull(methodMessageConverter, nameof(methodMessageConverter));
this.identityProvider = Preconditions.CheckNotNull(identityProvider, nameof(identityProvider));
this.productInfoStore = Preconditions.CheckNotNull(productInfoStore, nameof(productInfoStore));
this.templatesList = Preconditions.CheckNotNull(templatesList, nameof(templatesList));
}

Expand All @@ -82,25 +86,25 @@ internal ILinkHandler GetLinkHandler(LinkType linkType, IAmqpLink link, Uri uri,
switch (linkType)
{
case LinkType.C2D:
return new DeviceBoundLinkHandler(identity, link as ISendingAmqpLink, uri, boundVariables, connectionHandler, this.messageConverter);
return new DeviceBoundLinkHandler(identity, link as ISendingAmqpLink, uri, boundVariables, connectionHandler, this.messageConverter, this.productInfoStore);

case LinkType.Events:
return new EventsLinkHandler(identity, link as IReceivingAmqpLink, uri, boundVariables, connectionHandler, this.messageConverter);
return new EventsLinkHandler(identity, link as IReceivingAmqpLink, uri, boundVariables, connectionHandler, this.messageConverter, this.productInfoStore);

case LinkType.ModuleMessages:
return new ModuleMessageLinkHandler(identity, link as ISendingAmqpLink, uri, boundVariables, connectionHandler, this.messageConverter);
return new ModuleMessageLinkHandler(identity, link as ISendingAmqpLink, uri, boundVariables, connectionHandler, this.messageConverter, this.productInfoStore);

case LinkType.MethodSending:
return new MethodSendingLinkHandler(identity, link as ISendingAmqpLink, uri, boundVariables, connectionHandler, this.methodMessageConverter);
return new MethodSendingLinkHandler(identity, link as ISendingAmqpLink, uri, boundVariables, connectionHandler, this.methodMessageConverter, this.productInfoStore);

case LinkType.MethodReceiving:
return new MethodReceivingLinkHandler(identity, link as IReceivingAmqpLink, uri, boundVariables, connectionHandler, this.methodMessageConverter);
return new MethodReceivingLinkHandler(identity, link as IReceivingAmqpLink, uri, boundVariables, connectionHandler, this.methodMessageConverter, this.productInfoStore);

case LinkType.TwinReceiving:
return new TwinReceivingLinkHandler(identity, link as IReceivingAmqpLink, uri, boundVariables, connectionHandler, this.twinMessageConverter);
return new TwinReceivingLinkHandler(identity, link as IReceivingAmqpLink, uri, boundVariables, connectionHandler, this.twinMessageConverter, this.productInfoStore);

case LinkType.TwinSending:
return new TwinSendingLinkHandler(identity, link as ISendingAmqpLink, uri, boundVariables, connectionHandler, this.twinMessageConverter);
return new TwinSendingLinkHandler(identity, link as ISendingAmqpLink, uri, boundVariables, connectionHandler, this.twinMessageConverter, this.productInfoStore);

default:
throw new InvalidOperationException($"Invalid link type {linkType}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ public MethodReceivingLinkHandler(
Uri requestUri,
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter)
IMessageConverter<AmqpMessage> messageConverter,
IProductInfoStore productInfoStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ public MethodSendingLinkHandler(
Uri requestUri,
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter)
IMessageConverter<AmqpMessage> messageConverter,
IProductInfoStore productInfoStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ public ModuleMessageLinkHandler(
Uri requestUri,
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter)
IMessageConverter<AmqpMessage> messageConverter,
IProductInfoStore productInfoStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ protected ReceivingLinkHandler(
Uri requestUri,
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter)
IMessageConverter<AmqpMessage> messageConverter,
IProductInfoStore productInfoStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore)
{
Preconditions.CheckArgument(link.IsReceiver, $"Link {requestUri} cannot receive");
this.ReceivingLink = link;
this.sendMessageProcessor = new ActionBlock<AmqpMessage>(s => this.ProcessMessageAsync(s));
this.sendMessageProcessor = new ActionBlock<AmqpMessage>(this.ProcessMessageAsync);
}

protected IReceivingAmqpLink ReceivingLink { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ protected SendingLinkHandler(
Uri requestUri,
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter)
IMessageConverter<AmqpMessage> messageConverter,
IProductInfoStore productInfoStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore)
{
Preconditions.CheckArgument(!link.IsReceiver, $"Link {requestUri} cannot send");
this.SendingAmqpLink = link;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ public TwinReceivingLinkHandler(
Uri requestUri,
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter)
IMessageConverter<AmqpMessage> messageConverter,
IProductInfoStore productInfoStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ public TwinSendingLinkHandler(
Uri requestUri,
IDictionary<string, string> boundVariables,
IConnectionHandler connectionHandler,
IMessageConverter<AmqpMessage> messageConverter)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter)
IMessageConverter<AmqpMessage> messageConverter,
IProductInfoStore productInfoStore)
: base(identity, link, requestUri, boundVariables, connectionHandler, messageConverter, productInfoStore)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ class ClientTokenCloudConnection : CloudConnection, IClientTokenCloudConnection
ICloudListener cloudListener,
TimeSpan idleTimeout,
bool closeOnIdleTimeout,
TimeSpan operationTimeout)
TimeSpan operationTimeout,
string productInfo)
: base(
identity,
connectionStatusChangedHandler,
Expand All @@ -45,7 +46,8 @@ class ClientTokenCloudConnection : CloudConnection, IClientTokenCloudConnection
cloudListener,
idleTimeout,
closeOnIdleTimeout,
operationTimeout)
operationTimeout,
productInfo)
{
}

Expand All @@ -60,7 +62,8 @@ public static async Task<ClientTokenCloudConnection> Create(
ICloudListener cloudListener,
TimeSpan idleTimeout,
bool closeOnIdleTimeout,
TimeSpan operationTimeout)
TimeSpan operationTimeout,
string productInfo)
{
Preconditions.CheckNotNull(tokenCredentials, nameof(tokenCredentials));
var cloudConnection = new ClientTokenCloudConnection(
Expand All @@ -72,7 +75,8 @@ public static async Task<ClientTokenCloudConnection> Create(
cloudListener,
idleTimeout,
closeOnIdleTimeout,
operationTimeout);
operationTimeout,
productInfo);
ITokenProvider tokenProvider = new ClientTokenBasedTokenProvider(tokenCredentials, cloudConnection);
ICloudProxy cloudProxy = await cloudConnection.CreateNewCloudProxyAsync(tokenProvider);
cloudConnection.cloudProxy = Option.Some(cloudProxy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class CloudConnection : ICloudConnection
readonly TimeSpan idleTimeout;
readonly bool closeOnIdleTimeout;
readonly TimeSpan operationTimeout;
readonly string productInfo;
Option<ICloudProxy> cloudProxy;

protected CloudConnection(
Expand All @@ -34,7 +35,8 @@ protected CloudConnection(
ICloudListener cloudListener,
TimeSpan idleTimeout,
bool closeOnIdleTimeout,
TimeSpan operationTimeout)
TimeSpan operationTimeout,
string productInfo)
{
this.Identity = Preconditions.CheckNotNull(identity, nameof(identity));
this.ConnectionStatusChangedHandler = connectionStatusChangedHandler;
Expand All @@ -46,6 +48,7 @@ protected CloudConnection(
this.closeOnIdleTimeout = closeOnIdleTimeout;
this.cloudProxy = Option.None<ICloudProxy>();
this.operationTimeout = operationTimeout;
this.productInfo = productInfo;
}

public Option<ICloudProxy> CloudProxy => this.GetCloudProxy().Filter(cp => cp.IsActive);
Expand All @@ -70,7 +73,8 @@ public static async Task<CloudConnection> Create(
ITokenProvider tokenProvider,
TimeSpan idleTimeout,
bool closeOnIdleTimeout,
TimeSpan operationTimeout)
TimeSpan operationTimeout,
string productInfo)
{
Preconditions.CheckNotNull(tokenProvider, nameof(tokenProvider));
var cloudConnection = new CloudConnection(
Expand All @@ -82,7 +86,8 @@ public static async Task<CloudConnection> Create(
cloudListener,
idleTimeout,
closeOnIdleTimeout,
operationTimeout);
operationTimeout,
productInfo);
ICloudProxy cloudProxy = await cloudConnection.CreateNewCloudProxyAsync(tokenProvider);
cloudConnection.cloudProxy = Option.Some(cloudProxy);
return cloudConnection;
Expand Down Expand Up @@ -113,14 +118,10 @@ async Task<IClient> ConnectToIoTHub(ITokenProvider newTokenProvider)

client.SetOperationTimeoutInMilliseconds((uint)this.operationTimeout.TotalMilliseconds);
client.SetConnectionStatusChangedHandler(this.InternalConnectionStatusChangesHandler);

// TODO: Add support for ProductInfo
/*
if (!string.IsNullOrWhiteSpace(newCredentials.ProductInfo))
if (!string.IsNullOrWhiteSpace(this.productInfo))
{
client.SetProductInfo(newCredentials.ProductInfo);
client.SetProductInfo(this.productInfo);
}
*/

await client.OpenAsync();
Events.CreateDeviceClientSuccess(this.transportSettingsList, this.operationTimeout, this.Identity);
Expand Down
Loading

0 comments on commit 749b9b7

Please sign in to comment.