From bb2d0e7d83671d2f37f1f436b7ebab8c36e8b5d9 Mon Sep 17 00:00:00 2001 From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com> Date: Tue, 4 May 2021 13:33:37 -0700 Subject: [PATCH] Topic filter sample (#20816) * Topic filter sample * Update README.md * Update README.md * Fix link * PR FB * PR fb --- .../samples/TopicFilters/Program.cs | 226 ++++++++++++++++++ .../samples/TopicFilters/README.md | 69 ++++++ .../samples/TopicFilters/TopicFilters.csproj | 13 + 3 files changed, 308 insertions(+) create mode 100644 sdk/servicebus/Azure.Messaging.ServiceBus/samples/TopicFilters/Program.cs create mode 100644 sdk/servicebus/Azure.Messaging.ServiceBus/samples/TopicFilters/README.md create mode 100644 sdk/servicebus/Azure.Messaging.ServiceBus/samples/TopicFilters/TopicFilters.csproj diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/TopicFilters/Program.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/TopicFilters/Program.cs new file mode 100644 index 0000000000000..4ad5174be26c4 --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/TopicFilters/Program.cs @@ -0,0 +1,226 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Collections.Generic; +using System.CommandLine; +using System.CommandLine.Invocation; +using System.Threading.Tasks; +using Azure.Identity; +using Azure.Messaging.ServiceBus; +using Azure.Messaging.ServiceBus.Administration; + +namespace TopicSubscriptionWithRuleOperationsSample +{ + public class Program + { + private const string TopicName = "TopicSubscriptionWithRuleOperationsSample"; + + private const string NoFilterSubscriptionName = "NoFilterSubscription"; + private const string SqlFilterOnlySubscriptionName = "RedSqlFilterSubscription"; + private const string SqlFilterWithActionSubscriptionName = "BlueSqlFilterWithActionSubscription"; + private const string CorrelationFilterSubscriptionName = "ImportantCorrelationFilterSubscription"; + + private static ServiceBusClient s_client; + private static ServiceBusAdministrationClient s_adminClient; + private static ServiceBusSender s_sender; + + public static async Task Main(string[] args) + { + var command = new RootCommand("Demonstrates the Topic Filters feature of Azure Service Bus.") + { + new Option( + alias: "--namespace", + description: "Fully qualified Service Bus Queue namespace to use") {Name = "FullyQualifiedNamespace"}, + new Option( + alias: "--connection-variable", + description: "The name of an environment variable containing the connection string to use.") {Name = "Connection"}, + }; + command.Handler = CommandHandler.Create(RunAsync); + await command.InvokeAsync(args); + } + + private static async Task RunAsync(string fullyQualifiedNamespace, string connection) + { + if (!string.IsNullOrEmpty(connection)) + { + s_client = new ServiceBusClient(Environment.GetEnvironmentVariable(connection)); + s_adminClient = new ServiceBusAdministrationClient(Environment.GetEnvironmentVariable(connection)); + } + else if (!string.IsNullOrEmpty(fullyQualifiedNamespace)) + { + var defaultAzureCredential = new DefaultAzureCredential(); + s_client = new ServiceBusClient(fullyQualifiedNamespace, defaultAzureCredential); + s_adminClient = new ServiceBusAdministrationClient(fullyQualifiedNamespace, defaultAzureCredential); + } + else + { + throw new ArgumentException( + "Either a fully qualified namespace or a connection string environment variable must be specified."); + } + + Console.WriteLine($"Creating topic {TopicName}"); + await s_adminClient.CreateTopicAsync(TopicName); + + s_sender = s_client.CreateSender(TopicName); + + // First Subscription is already created with default rule. Leave as is. + Console.WriteLine($"Creating subscription {NoFilterSubscriptionName}"); + await s_adminClient.CreateSubscriptionAsync(TopicName, NoFilterSubscriptionName); + + Console.WriteLine($"SubscriptionName: {NoFilterSubscriptionName}, Removing and re-adding Default Rule"); + await s_adminClient.DeleteRuleAsync(TopicName, NoFilterSubscriptionName, RuleProperties.DefaultRuleName); + await s_adminClient.CreateRuleAsync(TopicName, NoFilterSubscriptionName, + new CreateRuleOptions(RuleProperties.DefaultRuleName, new TrueRuleFilter())); + + // 2nd Subscription: Add SqlFilter on Subscription 2 + // In this scenario, rather than deleting the default rule after creating the subscription, + // we will create the subscription along with our desired rule in a single operation. + // See https://docs.microsoft.com/en-us/azure/service-bus-messaging/topic-filters to learn more about topic filters. + Console.WriteLine($"Creating subscription {SqlFilterOnlySubscriptionName}"); + await s_adminClient.CreateSubscriptionAsync( + new CreateSubscriptionOptions(TopicName, SqlFilterOnlySubscriptionName), + new CreateRuleOptions { Name = "RedSqlRule", Filter = new SqlRuleFilter("Color = 'Red'") }); + + // 3rd Subscription: Add the SqlFilter Rule and Action + // See https://docs.microsoft.com/en-us/azure/service-bus-messaging/topic-filters#actions to learn more about actions. + Console.WriteLine($"Creating subscription {SqlFilterWithActionSubscriptionName}"); + await s_adminClient.CreateSubscriptionAsync( + new CreateSubscriptionOptions(TopicName, SqlFilterWithActionSubscriptionName), + new CreateRuleOptions + { + Name = "BlueSqlRule", + Filter = new SqlRuleFilter("Color = 'Blue'"), + Action = new SqlRuleAction("SET Color = 'BlueProcessed'") + }); + + // 4th Subscription: Add Correlation Filter on Subscription 4 + Console.WriteLine($"Creating subscription {CorrelationFilterSubscriptionName}"); + await s_adminClient.CreateSubscriptionAsync( + new CreateSubscriptionOptions(TopicName, CorrelationFilterSubscriptionName), + new CreateRuleOptions + { + Name = "ImportantCorrelationRule", + Filter = new CorrelationRuleFilter { Subject = "Red", CorrelationId = "important" } + }); + + // Get Rules on Subscription, called here only for one subscription as example + var rules = s_adminClient.GetRulesAsync(TopicName, CorrelationFilterSubscriptionName); + await foreach (var rule in rules) + { + Console.WriteLine( + $"GetRules:: SubscriptionName: {CorrelationFilterSubscriptionName}, CorrelationFilter Name: {rule.Name}, Rule: {rule.Filter}"); + } + + // Send messages to Topic + await SendMessagesAsync(); + + // Receive messages from 'NoFilterSubscription'. Should receive all 9 messages + await ReceiveMessagesAsync(NoFilterSubscriptionName); + + // Receive messages from 'SqlFilterOnlySubscription'. Should receive all messages with Color = 'Red' i.e 3 messages + await ReceiveMessagesAsync(SqlFilterOnlySubscriptionName); + + // Receive messages from 'SqlFilterWithActionSubscription'. Should receive all messages with Color = 'Blue' + // i.e 3 messages AND all messages should have color set to 'BlueProcessed' + await ReceiveMessagesAsync(SqlFilterWithActionSubscriptionName); + + // Receive messages from 'CorrelationFilterSubscription'. Should receive all messages with Color = 'Red' and CorrelationId = "important" + // i.e 1 message + await ReceiveMessagesAsync(CorrelationFilterSubscriptionName); + Console.ResetColor(); + + Console.WriteLine("======================================================================="); + Console.WriteLine("Completed Receiving all messages. Disposing clients and deleting topic."); + Console.WriteLine("======================================================================="); + + Console.WriteLine("Disposing sender"); + await s_sender.CloseAsync(); + Console.WriteLine("Disposing client"); + await s_client.DisposeAsync(); + + Console.WriteLine("Deleting topic"); + + // Deleting the topic will handle deleting all the subscriptions as well. + await s_adminClient.DeleteTopicAsync(TopicName); + } + + private static async Task SendMessagesAsync() + { + Console.WriteLine($"=========================================================================="); + Console.WriteLine("Creating messages to send to Topic"); + List messages = new (); + messages.Add(CreateMessage(subject: "Red")); + messages.Add(CreateMessage(subject: "Blue")); + messages.Add(CreateMessage(subject: "Red", correlationId: "important")); + messages.Add(CreateMessage(subject: "Blue", correlationId: "important")); + messages.Add(CreateMessage(subject: "Red", correlationId: "notimportant")); + messages.Add(CreateMessage(subject: "Blue", correlationId: "notimportant")); + messages.Add(CreateMessage(subject: "Green")); + messages.Add(CreateMessage(subject: "Green", correlationId: "important")); + messages.Add(CreateMessage(subject: "Green", correlationId: "notimportant")); + + Console.WriteLine("Sending messages to send to Topic"); + await s_sender.SendMessagesAsync(messages); + Console.WriteLine($"=========================================================================="); + } + + private static ServiceBusMessage CreateMessage(string subject, string correlationId = null) + { + ServiceBusMessage message = new() {Subject = subject}; + message.ApplicationProperties.Add("Color", subject); + + if (correlationId != null) + { + message.CorrelationId = correlationId; + } + + PrintMessage(message); + + return message; + } + + private static void PrintMessage(ServiceBusMessage message) + { + Console.ForegroundColor = (ConsoleColor) Enum.Parse(typeof(ConsoleColor), message.Subject); + Console.WriteLine($"Created message with color: {message.ApplicationProperties["Color"]}, CorrelationId: {message.CorrelationId}"); + Console.ResetColor(); + } + + private static void PrintReceivedMessage(ServiceBusReceivedMessage message) + { + Console.ForegroundColor = (ConsoleColor) Enum.Parse(typeof(ConsoleColor), message.Subject); + Console.WriteLine($"Received message with color: {message.ApplicationProperties["Color"]}, CorrelationId: {message.CorrelationId}"); + Console.ResetColor(); + } + + private static async Task ReceiveMessagesAsync(string subscriptionName) + { + await using ServiceBusReceiver subscriptionReceiver = s_client.CreateReceiver( + TopicName, + subscriptionName, + new ServiceBusReceiverOptions {ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete}); + + Console.WriteLine($"=========================================================================="); + Console.WriteLine($"{DateTime.Now} :: Receiving Messages From Subscription: {subscriptionName}"); + int receivedMessageCount = 0; + while (true) + { + var receivedMessage = await subscriptionReceiver.ReceiveMessageAsync(TimeSpan.FromSeconds(1)); + if (receivedMessage != null) + { + PrintReceivedMessage(receivedMessage); + receivedMessageCount++; + } + else + { + break; + } + } + + Console.WriteLine($"{DateTime.Now} :: Received '{receivedMessageCount}' Messages From Subscription: {subscriptionName}"); + Console.WriteLine($"=========================================================================="); + await subscriptionReceiver.CloseAsync(); + } + } +} diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/TopicFilters/README.md b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/TopicFilters/README.md new file mode 100644 index 0000000000000..a2f24c49f5120 --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/TopicFilters/README.md @@ -0,0 +1,69 @@ +--- +page_type: sample +languages: +- csharp +products: +- azure +- azure-service-bus +name: Explore topic filters in Azure Service Bus +description: This sample shows how to apply topic filters to subscriptions. +--- + +# Topic Filters + +This sample shows how to apply topic filters to subscriptions. + +## What is a Topic Filter? + +[Topic filters](https://docs.microsoft.com/azure/service-bus-messaging/topic-filters), or rules, can be applied to subscriptions to allow subscribers to define which messages they want to receive from a topic. For instance, certain subscribers may only be interested in processing messages that fit a certain pattern. Rather than create separate topics for each type of message, or add filtering client side within the application, an application can use a single topic and add filtering logic in the subcriptions to achieve the same result. This is more efficient than filtering client-side as the messages that don't match the filter do not go over the wire. It is also generally more simple and flexible than creating separate topics for each type of message, as it provides a more decoupled architecture between sender and receiver. To learn more, see the [usage patterns](https://docs.microsoft.com/azure/service-bus-messaging/topic-filters#usage-patterns) section of the topic filters docs. + +## Sample Code + +The sample implements four scenarios: + +* Create a subscription with no filters. Technically, all subscriptions are created with the default `TrueFilter`. In the sample, we remove and re-add this filter to demonstrate that all subscriptions will have this filter by default. + +* Create a subscription with a SQL filter against a user-defined property. SQL filters hold a SQL-like conditional expression that is evaluated in the broker against user-defined or system properties. If the expression evaluates to `true`, the message is delivered to the subscription. + +* Create a subscription with a SQL filter and SQL action. In this scenario, we define a SQL filter along with a SQL expression that performs an action on the received message, for any messages that makes it through the filter expression. + +* Create a subscription with a Correlation filter. A correlation filter provides a strongly typed model for matching against the properties of a received message. Correlation filters are recommended over SQL filters as they are more efficient. + +The sample code is further documented inline in the `Program.cs` C# file. + +## Prerequisites +In order to run the sample, you will need a Service Bus Namespace. For more information on getting setup see the [Getting Started](https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/servicebus/Azure.Messaging.ServiceBus#getting-started) section of the Service Bus library Readme. Once you have a Service Bus Namespace, you will need to create a queue that can be used for the sample. + +## Building the Sample + +To build the sample: + +1. Install [.NET 5.0](https://dot.net) or newer. + +2. Run in the project directory: + + ```bash + dotnet build + ``` + +## Running the Sample + +You can either run the executable you just build, or build and run the project at the same time. There are two ways to authenticate in the sample. +The sample will automatically create the topic and subscriptions for you as well as delete them at the end of the run. + +### Use Azure Activity Directory +You can use any of the [authentication mechanisms](https://docs.microsoft.com/dotnet/api/overview/azure/identity-readme?view=azure-dotnet) that the `DefaultAzureCredential` from the Azure.Identity supports. + +To run the sample using Azure Identity: + +```bash +dotnet run -- --namespace +``` +### Use a Service Bus connection string +The other way to run the sample is by specifying an environment variable that contains the connection string for the namespace you wish to use: + +```bash +dotnet run -- --connection-variable +``` + + diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/samples/TopicFilters/TopicFilters.csproj b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/TopicFilters/TopicFilters.csproj new file mode 100644 index 0000000000000..66f513f6f920f --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/samples/TopicFilters/TopicFilters.csproj @@ -0,0 +1,13 @@ + + + Exe + net5.0 + + + + + + + + +