Skip to content

Commit

Permalink
Unsubscribe from connection loss events when closing service client A…
Browse files Browse the repository at this point in the history
…MQP connection (#3237)

There is no reason to notify users that the connection was lost when they are actively trying to close the connection.

I also made the other callbacks in these AMQP using clients work asynchronously just like #3224 did for the file upload notification processor callbacks.
  • Loading branch information
timtay-microsoft authored Apr 6, 2023
1 parent d5693a0 commit dd658b0
Show file tree
Hide file tree
Showing 13 changed files with 123 additions and 51 deletions.
4 changes: 2 additions & 2 deletions e2e/LongHaul/service/IotHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public async Task ReceiveMessageFeedbacksAsync(CancellationToken ct)
// It is important to note that receiver only gets feedback messages when the device is actively running and acting on messages.
_logger.Trace("Starting to listen to cloud-to-device feedback messages...", TraceSeverity.Verbose);

AcknowledgementType OnC2dMessageAck(FeedbackBatch feedbackMessages)
Task<AcknowledgementType> OnC2dMessageAck(FeedbackBatch feedbackMessages)
{
foreach (FeedbackRecord feedbackRecord in feedbackMessages.Records)
{
Expand All @@ -146,7 +146,7 @@ AcknowledgementType OnC2dMessageAck(FeedbackBatch feedbackMessages)
_totalFeedbackMessagesReceivedCount += feedbackMessages.Records.Count();
_logger.Metric(TotalFeedbackMessagesReceivedCount, _totalFeedbackMessagesReceivedCount);

return AcknowledgementType.Complete;
return Task.FromResult(AcknowledgementType.Complete);
}

s_serviceClient.MessageFeedback.MessageFeedbackProcessor = OnC2dMessageAck;
Expand Down
61 changes: 24 additions & 37 deletions e2e/Tests/iothub/service/FileUploadNotificationE2ETest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,48 +119,35 @@ async Task<AcknowledgementType> OnFileUploadNotificationReceived(FileUploadNotif
}

[TestMethod]
[DataRow(IotHubTransportProtocol.Tcp)]
[DataRow(IotHubTransportProtocol.WebSocket)]
public async Task FileUploadNotification_ErrorProcessor_ReceivesNotifications(IotHubTransportProtocol protocol)
public async Task FileUploadNotification_CloseGracefully_DoesNotExecuteConnectionLoss()
{
var options = new IotHubServiceClientOptions
// arrange
using var sender = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionString);
bool connectionLossEventExecuted = false;
Func<ErrorContext, Task> OnConnectionLost = delegate
{
Protocol = protocol
// There is a small chance that this test's connection is interrupted by an actual
// network failure (when this callback should be executed), but the operations
// tested here are so quick that it should be safe to ignore that possibility.
connectionLossEventExecuted = true;
return Task.CompletedTask;
};
sender.FileUploadNotifications.ErrorProcessor = OnConnectionLost;

using var serviceClient = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionString, options);

try
{
var errorProcessorNotified = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
serviceClient.FileUploadNotifications.FileUploadNotificationProcessor = (_) => Task.FromResult(_defaultAcknowledgementType);
serviceClient.FileUploadNotifications.ErrorProcessor = (errorContext) =>
{
VerboseTestLogger.WriteLine("Error processor fired.");
errorProcessorNotified.TrySetResult(true);
return Task.CompletedTask;
};

VerboseTestLogger.WriteLine("Opening client...");
await serviceClient.FileUploadNotifications.OpenAsync().ConfigureAwait(false);
VerboseTestLogger.WriteLine("Client opened.");

VerboseTestLogger.WriteLine("Client closing...");
await serviceClient.FileUploadNotifications.CloseAsync().ConfigureAwait(false);
VerboseTestLogger.WriteLine("Client closed.");

// The open file upload notification processor should be able to receive more than one
// file upload notification without closing and re-opening as long as there is more
// than one file upload notification to consume.
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(TestTimeoutMilliseconds));
await errorProcessorNotified.WaitAsync(cts.Token).ConfigureAwait(false);
errorProcessorNotified.Task.IsCompleted.Should().BeTrue();
}
finally
{
serviceClient.FileUploadNotifications.ErrorProcessor = null;
await serviceClient.FileUploadNotifications.CloseAsync().ConfigureAwait(false);
Task<AcknowledgementType> OnFileUploadNotificationReceivedAsync(FileUploadNotification fileUploadNotification)
{
// No file upload notifications belong to this test, so abandon any that it may receive
return Task.FromResult(AcknowledgementType.Abandon);
}
sender.FileUploadNotifications.FileUploadNotificationProcessor = OnFileUploadNotificationReceivedAsync;
await sender.FileUploadNotifications.OpenAsync().ConfigureAwait(false);

// act
await sender.FileUploadNotifications.CloseAsync().ConfigureAwait(false);

// assert
connectionLossEventExecuted.Should().BeFalse(
"One or more connection lost events were reported by the error processor unexpectedly");
}

private async Task UploadFile(string fileName, CancellationToken ct)
Expand Down
38 changes: 36 additions & 2 deletions e2e/Tests/iothub/service/MessageFeedbackReceiverE2ETest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ Task<MessageAcknowledgement> OnC2DMessageReceived(IncomingMessage message)
if (feedback.Records.Any(x => x.OriginalMessageId == message.MessageId))
{
feedbackMessageReceived.TrySetResult(true);
return AcknowledgementType.Complete;
return Task.FromResult(AcknowledgementType.Complete);
}

// Same hub as other tests, so we don't want to complete messages that aren't meant for us.
return AcknowledgementType.Abandon;
return Task.FromResult(AcknowledgementType.Abandon);
};
await serviceClient.MessageFeedback.OpenAsync().ConfigureAwait(false);

Expand Down Expand Up @@ -100,5 +100,39 @@ Task<MessageAcknowledgement> OnC2DMessageReceived(IncomingMessage message)
}
}
}

[TestMethod]
public async Task MessageFeedbackReceiver_CloseGracefully_DoesNotExecuteConnectionLoss()
{
// arrange
using var sender = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionString);
bool connectionLossEventExecuted = false;
Func<ErrorContext, Task> OnConnectionLost = delegate
{
// There is a small chance that this test's connection is interrupted by an actual
// network failure (when this callback should be executed), but the operations
// tested here are so quick that it should be safe to ignore that possibility.
connectionLossEventExecuted = true;
return Task.CompletedTask;
};
sender.MessageFeedback.ErrorProcessor = OnConnectionLost;

Task<AcknowledgementType> OnFeedbackMessageReceivedAsync(FeedbackBatch feedbackBatch)
{
// No feedback messages belong to this test, so abandon any that it may receive
return Task.FromResult(AcknowledgementType.Abandon);
}
sender.MessageFeedback.MessageFeedbackProcessor = OnFeedbackMessageReceivedAsync;

await sender.MessageFeedback.OpenAsync().ConfigureAwait(false);

// act
await sender.MessageFeedback.CloseAsync().ConfigureAwait(false);

// assert
Assert.IsFalse(
connectionLossEventExecuted,
"One or more connection lost events were reported by the error processor unexpectedly");
}
}
}
26 changes: 26 additions & 0 deletions e2e/Tests/iothub/service/MessagingClientE2ETests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -417,5 +417,31 @@ await deviceClient
// assert
actualPayloadString.Should().Be(payload);
}

[TestMethod]
public async Task MessagingClient_CloseGracefully_DoesNotExecuteConnectionLoss()
{
// arrange
using var sender = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionString);
bool connectionLossEventExecuted = false;
Func<ErrorContext, Task> OnConnectionLost = delegate
{
// There is a small chance that this test's connection is interrupted by an actual
// network failure (when this callback should be executed), but the operations
// tested here are so quick that it should be safe to ignore that possibility.
connectionLossEventExecuted = true;
return Task.CompletedTask;
};
sender.Messages.ErrorProcessor = OnConnectionLost;

await sender.Messages.OpenAsync().ConfigureAwait(false);

// act
await sender.Messages.CloseAsync().ConfigureAwait(false);

// assert
connectionLossEventExecuted.Should().BeFalse(
"One or more connection lost events were reported by the error processor unexpectedly");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private async Task ReceiveMessageFeedbacksAsync(CancellationToken token)
// It is important to note that receiver only gets feedback messages when the device is actively running and acting on messages.
_logger.LogInformation("Starting to listen to feedback messages");

AcknowledgementType OnC2dMessageAck(FeedbackBatch feedbackMessages)
Task<AcknowledgementType> OnC2dMessageAck(FeedbackBatch feedbackMessages)
{
AcknowledgementType ackType = AcknowledgementType.Abandon;

Expand All @@ -72,7 +72,7 @@ AcknowledgementType OnC2dMessageAck(FeedbackBatch feedbackMessages)
_logger.LogInformation($"\tDevice {feedbackRecord.DeviceId} acted on message: {feedbackRecord.OriginalMessageId} with status: {feedbackRecord.StatusCode}");
}

return ackType;
return Task.FromResult(ackType);
}

s_serviceClient.MessageFeedback.MessageFeedbackProcessor = OnC2dMessageAck;
Expand Down
2 changes: 2 additions & 0 deletions iothub/service/src/Amqp/AmqpConnectionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ internal virtual async Task CloseAsync(CancellationToken cancellationToken)

try
{
_connection.Closed -= _connectionLossHandler;

_cbsSession?.Close(); // not async because the cbs link type only has a sync close API

if (_workerSession != null)
Expand Down
1 change: 1 addition & 0 deletions iothub/service/src/Amqp/AmqpReceivingLinkHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ internal async Task CloseAsync(CancellationToken cancellationToken)
{
if (_receivingLink != null)
{
_receivingLink.Closed -= _connectionLossHandler;
await _receivingLink.CloseAsync(cancellationToken);
}
}
Expand Down
1 change: 1 addition & 0 deletions iothub/service/src/Amqp/AmqpSendingLinkHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public async Task CloseAsync(CancellationToken cancellationToken)
{
if (_sendingLink != null)
{
_sendingLink.Closed -= _connectionLossHandler;
await _sendingLink.CloseAsync(cancellationToken).ConfigureAwait(false);
}
}
Expand Down
2 changes: 2 additions & 0 deletions iothub/service/src/Amqp/AmqpSessionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ internal async Task CloseAsync(CancellationToken cancellationToken)

try
{
_session.Closed -= _connectionLossHandler;

if (_sendingLinkHandler != null)
{
await _sendingLinkHandler.CloseAsync(cancellationToken).ConfigureAwait(false);
Expand Down
15 changes: 11 additions & 4 deletions iothub/service/src/Feedback/MessageFeedbackProcessorClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ internal MessageFeedbackProcessorClient(
/// }
/// </code>
/// </example>
public Func<FeedbackBatch, AcknowledgementType> MessageFeedbackProcessor { get; set; }
public Func<FeedbackBatch, Task<AcknowledgementType>> MessageFeedbackProcessor { get; set; }

/// <summary>
/// The callback to be executed when the connection is lost.
Expand All @@ -101,12 +101,19 @@ internal MessageFeedbackProcessorClient(
///
/// public void OnConnectionLost(ErrorContext errorContext)
/// {
/// // Add reconnection logic as needed
/// Console.WriteLine("Feedback message processor connection lost")
///
/// // Add reconnection logic as needed, for example:
/// await serviceClient.MessageFeedbackProcessor.OpenAsync();
/// }
/// </code>
/// </example>
public Action<ErrorContext> ErrorProcessor { get; set; }
/// <remarks>
/// This callback will not receive events once <see cref="CloseAsync(CancellationToken)"/> is called.
/// This callback will start receiving events again once <see cref="OpenAsync(CancellationToken)"/> is called.
/// This callback will persist across any number of open/close/open calls, so it does not need to be set before each open call.
/// </remarks>
public Func<ErrorContext, Task> ErrorProcessor { get; set; }

/// <summary>
/// Open the connection and start receiving acknowledgements for messages sent.
Expand Down Expand Up @@ -223,7 +230,7 @@ private async void OnFeedbackMessageReceivedAsync(AmqpMessage amqpMessage)
amqpMessage.Properties.UserId.Count)
};

AcknowledgementType ack = MessageFeedbackProcessor.Invoke(feedbackBatch);
AcknowledgementType ack = await MessageFeedbackProcessor.Invoke(feedbackBatch);
if (ack == AcknowledgementType.Complete)
{
await _amqpConnection.CompleteMessageAsync(amqpMessage.DeliveryTag).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ internal FileUploadNotificationProcessorClient(
/// }
/// </code>
/// </example>
/// <remarks>
/// This callback will not receive events once <see cref="CloseAsync(CancellationToken)"/> is called.
/// This callback will start receiving events again once <see cref="OpenAsync(CancellationToken)"/> is called.
/// This callback will persist across any number of open/close/open calls, so it does not need to be set before each open call.
/// </remarks>
public Func<ErrorContext, Task> ErrorProcessor { get; set; }

/// <summary>
Expand Down
13 changes: 10 additions & 3 deletions iothub/service/src/Messaging/MessagesClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,19 @@ internal MessagesClient(
///
/// public void OnConnectionLost(ErrorContext errorContext)
/// {
/// // Add reconnection logic as needed
/// Console.WriteLine("Messaging client connection lost")
/// Console.WriteLine("Messaging client connection lost");
///
/// // Add reconnection logic as needed, for example:
/// await serviceClient.Messaging.OpenAsync();
/// }
/// </code>
/// </example>
public Action<ErrorContext> ErrorProcessor { get; set; }
/// <remarks>
/// This callback will not receive events once <see cref="CloseAsync(CancellationToken)"/> is called.
/// This callback will start receiving events again once <see cref="OpenAsync(CancellationToken)"/> is called.
/// This callback will persist across any number of open/close/open calls, so it does not need to be set before each open call.
/// </remarks>
public Func<ErrorContext, Task> ErrorProcessor { get; set; }

/// <summary>
/// Open the connection. Must be done before any cloud-to-device messages can be sent.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public async Task MessageFeedbackProcessorClient_OpenAsync_Ok()
s_retryHandler,
mockAmqpConnectionHandler.Object);

AcknowledgementType messageFeedbackProcessor(FeedbackBatch FeedbackBatch) => AcknowledgementType.Complete;
Task<AcknowledgementType> messageFeedbackProcessor(FeedbackBatch FeedbackBatch) => Task.FromResult(AcknowledgementType.Complete);

messageFeedbackProcessorClient.MessageFeedbackProcessor = messageFeedbackProcessor;

Expand Down

0 comments on commit dd658b0

Please sign in to comment.