Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SessionReceiverOptions/SessionProcessorOptions #12383

Merged
merged 6 commits into from
Jun 4, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ byte[] state = await receiver.GetSessionStateAsync();
// create a receiver specifying a particular session
ServiceBusSessionReceiver receiver = await client.CreateSessionReceiverAsync(
queueName,
sessionId: "Session2");
new ServiceBusSessionReceiverOptions
{
SessionId = "Session2"
});

// the received message is a different type as it contains some service set properties
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ messageBatch.TryAdd(
await sender.SendAsync(messageBatch);

// get the options to use for configuring the processor
var options = new ServiceBusProcessorOptions
var options = new ServiceBusSessionProcessorOptions
{
// By default after the message handler returns, the processor will complete the message
// If I want more fine-grained control over settlement, I can set this to false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ messageBatch.TryAdd(
await sender.SendAsync(messageBatch);

// get the options to use for configuring the processor
var options = new ServiceBusProcessorOptions
var options = new ServiceBusSessionProcessorOptions
{
// By default after the message handler returns, the processor will complete the message
// If I want more fine-grained control over settlement, I can set this to false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@ internal class AmqpConnectionScope : TransportConnectionScope
/// <summary>The URI scheme to apply when using web sockets for service communication.</summary>
private const string WebSocketsUriScheme = "wss";

/// <summary>The string formatting mask to apply to the service endpoint to consume events for a given consumer group and partition.</summary>
private const string ConsumerPathSuffixMask = "{0}/ConsumerGroups/{1}/Partitions/{2}";

/// <summary>The string formatting mask to apply to the service endpoint to publish events for a given partition.</summary>
private const string PartitionProducerPathSuffixMask = "{0}/Partitions/{1}";

/// <summary>
/// The version of AMQP to use within the scope.
/// </summary>
Expand Down Expand Up @@ -160,9 +154,11 @@ public AmqpConnectionScope(Uri serviceEndpoint,
Transport = transport;
Proxy = proxy;
TokenProvider = new CbsTokenProvider(new ServiceBusTokenCredential(credential, serviceEndpoint.ToString()), OperationCancellationSource.Token);
Id = identifier ?? $"{ ServiceEndpoint }-{ Guid.NewGuid().ToString("D").Substring(0, 8) }";
Id = identifier ?? $"{ ServiceEndpoint }-{ Guid.NewGuid().ToString("D", CultureInfo.InvariantCulture).Substring(0, 8) }";

#pragma warning disable CA2214 // Do not call overridable methods in constructors. This internal method is virtual for testing purposes.
Task<AmqpConnection> connectionFactory(TimeSpan timeout) => CreateAndOpenConnectionAsync(AmqpVersion, ServiceEndpoint, Transport, Proxy, Id, timeout);
#pragma warning restore CA2214 // Do not call overridable methods in constructors
ActiveConnection = new FaultTolerantAmqpObject<AmqpConnection>(
connectionFactory,
CloseConnection);
Expand Down Expand Up @@ -411,7 +407,9 @@ protected virtual async Task<AmqpConnection> CreateAndOpenConnectionAsync(
// Create the CBS link that will be used for authorization. The act of creating the link will associate
// it with the connection.

#pragma warning disable CA1806 // Do not ignore method results
new AmqpCbsLink(connection);
#pragma warning restore CA1806 // Do not ignore method results

// When the connection is closed, close each of the links associated with it.

Expand Down Expand Up @@ -1102,7 +1100,7 @@ private static void ValidateTransport(ServiceBusTransportType transport)
{
if ((transport != ServiceBusTransportType.AmqpTcp) && (transport != ServiceBusTransportType.AmqpWebSockets))
{
throw new ArgumentException(nameof(transport), string.Format(CultureInfo.CurrentCulture, Resources.UnknownConnectionType, transport));
throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, Resources.UnknownConnectionType, transport), nameof(transport));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ public static Exception ToMessagingContractException(this Error error, bool conn

public static Exception ToMessagingContractException(string condition, string message, bool connectionError = false)
{
if (string.Equals(condition, AmqpClientConstants.TimeoutError.Value))
if (string.Equals(condition, AmqpClientConstants.TimeoutError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ServiceBusException(message, ServiceBusException.FailureReason.ServiceTimeout);
}

if (string.Equals(condition, AmqpErrorCode.NotFound.Value))
if (string.Equals(condition, AmqpErrorCode.NotFound.Value, StringComparison.InvariantCultureIgnoreCase))
{
if (connectionError)
{
Expand All @@ -108,68 +108,68 @@ public static Exception ToMessagingContractException(string condition, string me
return new ServiceBusException(message, ServiceBusException.FailureReason.MessagingEntityNotFound);
}

if (string.Equals(condition, AmqpErrorCode.NotImplemented.Value))
if (string.Equals(condition, AmqpErrorCode.NotImplemented.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new NotSupportedException(message);
}

if (string.Equals(condition, AmqpErrorCode.NotAllowed.Value))
if (string.Equals(condition, AmqpErrorCode.NotAllowed.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new InvalidOperationException(message);
}

if (string.Equals(condition, AmqpErrorCode.UnauthorizedAccess.Value) ||
string.Equals(condition, AmqpClientConstants.AuthorizationFailedError.Value))
if (string.Equals(condition, AmqpErrorCode.UnauthorizedAccess.Value, StringComparison.InvariantCultureIgnoreCase) ||
string.Equals(condition, AmqpClientConstants.AuthorizationFailedError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ServiceBusException(message, ServiceBusException.FailureReason.Unauthorized);
}

if (string.Equals(condition, AmqpClientConstants.ServerBusyError.Value))
if (string.Equals(condition, AmqpClientConstants.ServerBusyError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ServiceBusException(message, ServiceBusException.FailureReason.ServiceBusy);
}

if (string.Equals(condition, AmqpClientConstants.ArgumentError.Value))
if (string.Equals(condition, AmqpClientConstants.ArgumentError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ArgumentException(message);
}

if (string.Equals(condition, AmqpClientConstants.ArgumentOutOfRangeError.Value))
if (string.Equals(condition, AmqpClientConstants.ArgumentOutOfRangeError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ArgumentOutOfRangeException(message);
}

if (string.Equals(condition, AmqpClientConstants.EntityDisabledError.Value))
if (string.Equals(condition, AmqpClientConstants.EntityDisabledError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ServiceBusException(message, ServiceBusException.FailureReason.MessagingEntityDisabled);
}

if (string.Equals(condition, AmqpClientConstants.MessageLockLostError.Value))
if (string.Equals(condition, AmqpClientConstants.MessageLockLostError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ServiceBusException(message, ServiceBusException.FailureReason.MessageLockLost);
}

if (string.Equals(condition, AmqpClientConstants.SessionLockLostError.Value))
if (string.Equals(condition, AmqpClientConstants.SessionLockLostError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ServiceBusException(message, ServiceBusException.FailureReason.SessionLockLost);
}

if (string.Equals(condition, AmqpErrorCode.ResourceLimitExceeded.Value))
if (string.Equals(condition, AmqpErrorCode.ResourceLimitExceeded.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ServiceBusException(message, ServiceBusException.FailureReason.QuotaExceeded);
}

if (string.Equals(condition, AmqpErrorCode.MessageSizeExceeded.Value))
if (string.Equals(condition, AmqpErrorCode.MessageSizeExceeded.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ServiceBusException(message, ServiceBusException.FailureReason.MessageSizeExceeded);
}

if (string.Equals(condition, AmqpClientConstants.MessageNotFoundError.Value))
if (string.Equals(condition, AmqpClientConstants.MessageNotFoundError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ServiceBusException(message, ServiceBusException.FailureReason.MessageNotFound);
}

if (string.Equals(condition, AmqpClientConstants.SessionCannotBeLockedError.Value))
if (string.Equals(condition, AmqpClientConstants.SessionCannotBeLockedError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ServiceBusException(message, ServiceBusException.FailureReason.SessionCannotBeLocked);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ internal static class AmqpMessageConverter
private const string SequenceNumberName = "x-opt-sequence-number";
private const string EnqueueSequenceNumberName = "x-opt-enqueue-sequence-number";
private const string LockedUntilName = "x-opt-locked-until";
private const string PublisherName = "x-opt-publisher";
private const string PartitionKeyName = "x-opt-partition-key";
private const string PartitionIdName = "x-opt-partition-id";
private const string ViaPartitionKeyName = "x-opt-via-partition-key";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ namespace Azure.Messaging.ServiceBus.Amqp
/// </summary>
///
/// <seealso cref="Azure.Messaging.ServiceBus.Core.TransportReceiver" />
#pragma warning disable CA1001 // Types that own disposable fields should be disposable
internal class AmqpReceiver : TransportReceiver
#pragma warning restore CA1001 // Types that own disposable fields should be disposable
{
/// <summary>Indicates whether or not this instance has been closed.</summary>
private bool _closed = false;
Expand Down Expand Up @@ -181,13 +183,13 @@ private async Task<ReceivingAmqpLink> OpenReceiverLinkAsync(
}
}

private void CloseLink(ReceivingAmqpLink link)
private static void CloseLink(ReceivingAmqpLink link)
{
link.Session?.SafeClose();
link.SafeClose();
}

private void CloseLink(RequestResponseAmqpLink link)
private static void CloseLink(RequestResponseAmqpLink link)
{
link.Session?.SafeClose();
link.SafeClose();
Expand Down Expand Up @@ -734,18 +736,18 @@ private async Task DisposeMessageRequestResponseAsync(
}
}

private Outcome GetAbandonOutcome(IDictionary<string, object> propertiesToModify) =>
private static Outcome GetAbandonOutcome(IDictionary<string, object> propertiesToModify) =>
GetModifiedOutcome(propertiesToModify, false);

private Outcome GetDeferOutcome(IDictionary<string, object> propertiesToModify) =>
private static Outcome GetDeferOutcome(IDictionary<string, object> propertiesToModify) =>
GetModifiedOutcome(propertiesToModify, true);

private List<ArraySegment<byte>> ConvertLockTokensToDeliveryTags(IEnumerable<Guid> lockTokens)
private static List<ArraySegment<byte>> ConvertLockTokensToDeliveryTags(IEnumerable<Guid> lockTokens)
{
return lockTokens.Select(lockToken => new ArraySegment<byte>(lockToken.ToByteArray())).ToList();
}

private Outcome GetModifiedOutcome(
private static Outcome GetModifiedOutcome(
IDictionary<string, object> propertiesToModify,
bool undeliverableHere)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

namespace Azure.Messaging.ServiceBus.Amqp
{
#pragma warning disable CA1001 // Types that own disposable fields should be disposable. AmqpRuleManager does not own connection scope.
internal class AmqpRuleManager : TransportRuleManager
#pragma warning restore CA1001 // Types that own disposable fields should be disposable
{
/// <summary>
/// The path of the Service Bus subscription to which the rule manager is bound.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Globalization;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -25,7 +26,9 @@ namespace Azure.Messaging.ServiceBus.Amqp
///
/// <seealso cref="Azure.Messaging.ServiceBus.Core.TransportSender" />
///
#pragma warning disable CA1001 // Types that own disposable fields should be disposable. The AmqpSender doesn't own the connection scope.
internal class AmqpSender : TransportSender
#pragma warning restore CA1001 // Types that own disposable fields should be disposable
{
/// <summary>Indicates whether or not this instance has been closed.</summary>
private bool _closed = false;
Expand Down Expand Up @@ -239,7 +242,7 @@ internal virtual async Task SendBatchInternalAsync(
using (AmqpMessage batchMessage = messageFactory())
{

string messageHash = batchMessage.GetHashCode().ToString();
string messageHash = batchMessage.GetHashCode().ToString(CultureInfo.InvariantCulture);

ArraySegment<byte> transactionId = AmqpConstants.NullBinary;
Transaction ambientTransaction = Transaction.Current;
Expand All @@ -258,7 +261,7 @@ internal virtual async Task SendBatchInternalAsync(

if (batchMessage.SerializedMessageSize > MaxMessageSize)
{
throw new ServiceBusException(string.Format(Resources.MessageSizeExceeded, messageHash, batchMessage.SerializedMessageSize, MaxMessageSize, _entityPath), ServiceBusException.FailureReason.MessageSizeExceeded);
throw new ServiceBusException(string.Format(CultureInfo.InvariantCulture, Resources.MessageSizeExceeded, messageHash, batchMessage.SerializedMessageSize, MaxMessageSize, _entityPath), ServiceBusException.FailureReason.MessageSizeExceeded);
}

// Attempt to send the message batch.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Azure.Messaging.ServiceBus.Amqp.Framing
{
internal sealed class AmqpCorrelationFilterCodec : AmqpFilterCodec
{
public static readonly string Name = AmqpConstants.Vendor + ":correlation-filter:list";
public const string Name = AmqpConstants.Vendor + ":correlation-filter:list";
public const ulong Code = 0x000001370000009;
private const int Fields = 9;
private AmqpMap properties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Azure.Messaging.ServiceBus.Amqp.Framing
{
internal sealed class AmqpEmptyRuleActionCodec : AmqpRuleActionCodec
{
public static readonly string Name = AmqpConstants.Vendor + ":empty-rule-action:list";
public const string Name = AmqpConstants.Vendor + ":empty-rule-action:list";
public const ulong Code = 0x0000013700000005;
private const int Fields = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Azure.Messaging.ServiceBus.Amqp.Framing
{
internal sealed class AmqpFalseFilterCodec : AmqpFilterCodec
{
public static readonly string Name = AmqpConstants.Vendor + ":false-filter:list";
public const string Name = AmqpConstants.Vendor + ":false-filter:list";
public const ulong Code = 0x000001370000008;

public AmqpFalseFilterCodec() : base(Name, Code) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Azure.Messaging.ServiceBus.Amqp.Framing
{
internal sealed class AmqpRuleDescriptionCodec : DescribedList
{
public static readonly string Name = AmqpConstants.Vendor + ":rule-description:list";
public const string Name = AmqpConstants.Vendor + ":rule-description:list";
public const ulong Code = 0x0000013700000004;
private const int Fields = 4;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Azure.Messaging.ServiceBus.Amqp.Framing
{
internal sealed class AmqpSqlFilterCodec : AmqpFilterCodec
{
public static readonly string Name = AmqpConstants.Vendor + ":sql-filter:list";
public const string Name = AmqpConstants.Vendor + ":sql-filter:list";
public const ulong Code = 0x000001370000006;
private const int Fields = 2;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Azure.Messaging.ServiceBus.Amqp.Framing
{
internal sealed class AmqpSqlRuleActionCodec : AmqpRuleActionCodec
{
public static readonly string Name = AmqpConstants.Vendor + ":sql-rule-action:list";
public const string Name = AmqpConstants.Vendor + ":sql-rule-action:list";
public const ulong Code = 0x0000013700000006;
private const int Fields = 2;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Azure.Messaging.ServiceBus.Amqp.Framing
{
internal sealed class AmqpTrueFilterCodec : AmqpFilterCodec
{
public static readonly string Name = AmqpConstants.Vendor + ":true-filter:list";
public const string Name = AmqpConstants.Vendor + ":true-filter:list";
public const ulong Code = 0x000001370000007;

public AmqpTrueFilterCodec() : base(Name, Code) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
<Version>7.0.0-preview.3</Version>
<PackageTags>Azure;Service Bus;ServiceBus;.NET;AMQP;$(PackageCommonTags)</PackageTags>
<TargetFrameworks>$(RequiredTargetFrameworks)</TargetFrameworks>
<EnableFxCopAnalyzers>false</EnableFxCopAnalyzers>
<EnableApiCompat>false</EnableApiCompat>
<GenerateAPIListing>false</GenerateAPIListing>
</PropertyGroup>
Expand Down
Loading