Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topic filter sample #20816

Merged
merged 7 commits into from
May 4, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.CommandLine;
using System.CommandLine.Invocation;
using System.Threading.Tasks;
using Azure.Identity;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

namespace TopicSubscriptionWithRuleOperationsSample
{
public class Program
{
private const string TopicName = "TopicSubscriptionWithRuleOperationsSample";

private const string NoFilterSubscriptionName = "NoFilterSubscription";
private const string SqlFilterOnlySubscriptionName = "SqlFilterOnlySubscription";
private const string SqlFilterWithActionSubscriptionName = "SqlFilterWithActionSubscription";
private const string CorrelationFilterSubscriptionName = "CorrelationFilterSubscription";

private static ServiceBusClient s_client;
private static ServiceBusAdministrationClient s_adminClient;
private static ServiceBusSender s_sender;

public static async Task Main(string[] args)
{
var command = new RootCommand("Demonstrates the Topic Filters feature of Azure Service Bus.")
{
new Option<string>(
alias: "--namespace",
description: "Fully qualified Service Bus Queue namespace to use") {Name = "FullyQualifiedNamespace"},
new Option<string>(
alias: "--connection-variable",
description: "The name of an environment variable containing the connection string to use.") {Name = "Connection"},
};
command.Handler = CommandHandler.Create<string, string>(RunAsync);
await command.InvokeAsync(args);
}

private static async Task RunAsync(string fullyQualifiedNamespace, string connection)
{
if (!string.IsNullOrEmpty(connection))
{
s_client = new ServiceBusClient(Environment.GetEnvironmentVariable(connection));
s_adminClient = new ServiceBusAdministrationClient(Environment.GetEnvironmentVariable(connection));
}
else if (!string.IsNullOrEmpty(fullyQualifiedNamespace))
{
var defaultAzureCredential = new DefaultAzureCredential();
s_client = new ServiceBusClient(fullyQualifiedNamespace, defaultAzureCredential);
s_adminClient = new ServiceBusAdministrationClient(fullyQualifiedNamespace, defaultAzureCredential);
}
else
{
throw new ArgumentException(
"Either a fully qualified namespace or a connection string environment variable must be specified.");
}

Console.WriteLine($"Creating topic {TopicName}");
await s_adminClient.CreateTopicAsync(TopicName);

s_sender = s_client.CreateSender(TopicName);

// First Subscription is already created with default rule. Leave as is.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider defining what a filter is and why we'd want to use them? Alternative: point at existing documentation with a (very) brief description

Console.WriteLine($"Creating subscription {NoFilterSubscriptionName}");
await s_adminClient.CreateSubscriptionAsync(TopicName, NoFilterSubscriptionName);

Console.WriteLine($"SubscriptionName: {NoFilterSubscriptionName}, Removing and re-adding Default Rule");
await s_adminClient.DeleteRuleAsync(TopicName, NoFilterSubscriptionName, RuleProperties.DefaultRuleName);
await s_adminClient.CreateRuleAsync(TopicName, NoFilterSubscriptionName,
new CreateRuleOptions(RuleProperties.DefaultRuleName, new TrueRuleFilter()));

// 2nd Subscription: Add SqlFilter on Subscription 2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's a SqlFilter and what's its significance? What are we filtering on? Why are we removing the default rule?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A link to the documentation for the SQL Rule syntax would also be super helpful for folks here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We link to the docs in the README, but I can add links in the comments as well. The comments (and code) was more or less copied as-is from https://github.com/Azure/azure-service-bus/tree/master/samples/DotNet/GettingStarted/Microsoft.Azure.ServiceBus/TopicSubscriptionWithRuleOperationsSample

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even just a mention of something like "go see the README if you want to know stuffs" would be good by me. When we had code-forward samples initially, we saw questions coming up in issues that we had answered in the README or elsewhere; people seemed to be jumping to the code and skipping the docs.

// Delete Default Rule.
// Add the required SqlFilter Rule
// Note: Does not apply to this sample but if there are multiple rules configured for a
// single subscription, then one message is delivered to the subscription when any of the
// rule matches. If more than one rules match and if there is no `SqlRuleAction` set for the
// rule, then only one message will be delivered to the subscription. If more than one rules
// match and there is a `SqlRuleAction` specified for the rule, then one message per `SqlRuleAction`
// is delivered to the subscription.
Console.WriteLine($"Creating subscription {SqlFilterOnlySubscriptionName}");
await s_adminClient.CreateSubscriptionAsync(TopicName, SqlFilterOnlySubscriptionName);

Console.WriteLine($"SubscriptionName: {SqlFilterOnlySubscriptionName}, Removing Default Rule and Adding SqlFilter");
await s_adminClient.DeleteRuleAsync(TopicName, SqlFilterOnlySubscriptionName, RuleProperties.DefaultRuleName);
await s_adminClient.CreateRuleAsync(
TopicName,
SqlFilterOnlySubscriptionName,
new CreateRuleOptions {Name = "RedSqlRule", Filter = new SqlRuleFilter("Color = 'Red'")});


// 3rd Subscription: Add SqlFilter and SqlRuleAction on Subscription 3
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same general thoughts as above. This does a good job of demonstrating the mechanics but even with knowledge of the subject matter, it's not clear from the comments what we're filtering for and what makes it significant for the purposes of demonstration in this sample.

I realize that we're focusing on the "how" but this topic is esoteric enough that I wonder if those who already know the details of "what" and "why" aren't already well positioned to understand the "how" form documentation. I think it would be beneficial if we added some additional context and details to assume less familiarity and knowledge of readers.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really think that what you are looking for should be covered by the service docs. I don't think we'd want to repeat the same thing using slightly different language in 4 repos across 4 languages. The goal here is to provide samples that can be linked from these service docs for users who want to see how the feature can be implemented.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've got nothing against just pointing back to the docs, I'm just looking at it from the perspective of someone browsing our repository and seeing the sample without context.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well isn't that what the README is for 😄

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'd think...

// Delete Default Rule
// Add the required SqlFilter Rule and Action
Console.WriteLine($"Creating subscription {SqlFilterWithActionSubscriptionName}");
await s_adminClient.CreateSubscriptionAsync(TopicName, SqlFilterWithActionSubscriptionName);

Console.WriteLine(
$"SubscriptionName: {SqlFilterWithActionSubscriptionName}, Removing Default Rule and Adding SqlFilter and SqlRuleAction");
await s_adminClient.DeleteRuleAsync(TopicName, SqlFilterWithActionSubscriptionName, RuleProperties.DefaultRuleName);
await s_adminClient.CreateRuleAsync(
TopicName,
SqlFilterWithActionSubscriptionName,
new CreateRuleOptions
{
Name = "BlueSqlRule",
Filter = new SqlRuleFilter("Color = 'Blue'"),
Action = new SqlRuleAction("SET Color = 'BlueProcessed'")
});

// 4th Subscription: Add Correlation Filter on Subscription 4
Console.WriteLine($"Creating subscription {CorrelationFilterSubscriptionName}");
await s_adminClient.CreateSubscriptionAsync(TopicName, CorrelationFilterSubscriptionName);

Console.WriteLine($"SubscriptionName: {CorrelationFilterSubscriptionName}, Removing Default Rule and Adding CorrelationFilter");
await s_adminClient.DeleteRuleAsync(TopicName, CorrelationFilterSubscriptionName, RuleProperties.DefaultRuleName);
await s_adminClient.CreateRuleAsync(
TopicName,
CorrelationFilterSubscriptionName,
new CreateRuleOptions
{
Name = "ImportantCorrelationRule", Filter = new CorrelationRuleFilter {Subject = "Red", CorrelationId = "important"}
});

// Get Rules on Subscription, called here only for one subscription as example
var rules = s_adminClient.GetRulesAsync(TopicName, CorrelationFilterSubscriptionName);
await foreach (var rule in rules)
{
Console.WriteLine(
$"GetRules:: SubscriptionName: {CorrelationFilterSubscriptionName}, CorrelationFilter Name: {rule.Name}, Rule: {rule.Filter}");
}

// Send messages to Topic
await SendMessagesAsync();

// Receive messages from 'NoFilterSubscription'. Should receive all 9 messages
Console.ForegroundColor = ConsoleColor.Yellow;
await ReceiveMessagesAsync(NoFilterSubscriptionName);

Console.ForegroundColor = ConsoleColor.Red;
// Receive messages from 'sqlFilterOnlySubscription'. Should receive all messages with Color = 'Red' i.e 3 messages
await ReceiveMessagesAsync(SqlFilterOnlySubscriptionName);

Console.ForegroundColor = ConsoleColor.Blue;
// Receive messages from 'SqlFilterWithActionSubscription'. Should receive all messages with Color = 'Blue'
// i.e 3 messages AND all messages should have color set to 'BlueProcessed'
await ReceiveMessagesAsync(SqlFilterWithActionSubscriptionName);

Console.ForegroundColor = ConsoleColor.DarkMagenta;
// Receive messages from 'CorrelationFilterSubscription'. Should receive all messages with Color = 'Red' and CorrelationId = "important"
// i.e 1 message
await ReceiveMessagesAsync(CorrelationFilterSubscriptionName);
Console.ResetColor();

Console.WriteLine("======================================================================");
Console.WriteLine("Completed Receiving all messages... Press any key to clean up and exit");
Console.WriteLine("======================================================================");

Console.ReadKey();

Console.WriteLine("Disposing sender");
await s_sender.CloseAsync();
Console.WriteLine("Disposing client");
await s_client.DisposeAsync();

Console.WriteLine("Deleting topic");
// Deleting the topic will handle deleting all the subscriptions as well.
await s_adminClient.DeleteTopicAsync(TopicName);
}

private static async Task SendMessagesAsync()
{
Console.WriteLine($"==========================================================================");
Console.WriteLine("Sending Messages to Topic");
try
{
await Task.WhenAll(
SendMessageAsync(subject: "Red"),
SendMessageAsync(subject: "Blue"),
SendMessageAsync(subject: "Red", correlationId: "important"),
SendMessageAsync(subject: "Blue", correlationId: "important"),
SendMessageAsync(subject: "Red", correlationId: "notimportant"),
SendMessageAsync(subject: "Blue", correlationId: "notimportant"),
SendMessageAsync(subject: "Green"),
SendMessageAsync(subject: "Green", correlationId: "important"),
SendMessageAsync(subject: "Green", correlationId: "notimportant")
);
}
catch (Exception exception)
{
Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
}
}

private static async Task SendMessageAsync(string subject, string correlationId = null)
{
ServiceBusMessage message = new() {Subject = subject};
message.ApplicationProperties.Add("Color", subject);

if (correlationId != null)
{
message.CorrelationId = correlationId;
}

await s_sender.SendMessageAsync(message);
Console.WriteLine($"Sent Message:: Label: {message.Subject}, CorrelationId: {message.CorrelationId}");
}

private static async Task ReceiveMessagesAsync(string subscriptionName)
{
await using ServiceBusReceiver subscriptionReceiver = s_client.CreateReceiver(
TopicName,
subscriptionName,
new ServiceBusReceiverOptions {ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete});

Console.WriteLine($"==========================================================================");
Console.WriteLine($"{DateTime.Now} :: Receiving Messages From Subscription: {subscriptionName}");
int receivedMessageCount = 0;
while (true)
{
var receivedMessage = await subscriptionReceiver.ReceiveMessageAsync(TimeSpan.FromSeconds(1));
if (receivedMessage != null)
{
receivedMessage.ApplicationProperties.TryGetValue("Color", out object colorProperty);
Console.WriteLine($"Color Property = {colorProperty}, CorrelationId = {receivedMessage.CorrelationId}");
receivedMessageCount++;
}
else
{
break;
}
}

Console.WriteLine($"{DateTime.Now} :: Received '{receivedMessageCount}' Messages From Subscription: {subscriptionName}");
Console.WriteLine($"==========================================================================");
await subscriptionReceiver.CloseAsync();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
---
page_type: sample
languages:
- csharp
products:
- azure
- azure-service-bus
name: Explore topic filters in Azure Service Bus
description: This sample shows how to apply topic filters to subscriptions.
---

#
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved

This sample shows how to apply topic filters to subscriptions.

## What is a Topic filter?

[Topic filters](https://docs.microsoft.com/azure/service-bus-messaging/topic-filters), or rules, can be applied to subscriptions to allow subscribers to define which messages they want to receive from a topic.

## Sample Code

The sample implements four scenarios:

* Create a subscription with no filters. Technically, all subscriptions are created with the default `TrueFilter`. In the sample, we remove and re-add this filter.
jsquire marked this conversation as resolved.
Show resolved Hide resolved

* Create a subscription with a SQL filter against a user-defined property. SQL filters hold a SQL-like conditional expression that is evaluated in the broker against user-defined or system properties. If the expression evaluates to `true`, the message is delivered to the subscription.

* Create a subscription with a SQL filter and SQL action. In this scenario, we define a SQL filter along with a SQL-like expression that performs an action on the received message, for any messages that makes it through the filter expression.

* Create a subscription with a Correlation filter. A correlation filter provides a strongly typed model for matching against the properties of a received message.

The sample code is further documented inline in the `Program.cs` C# file.

## Prerequisites
In order to run the sample, you will need a Service Bus Namespace. For more information on getting setup see the [Getting Started](https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/servicebus/Azure.Messaging.ServiceBus#getting-started) section of the Service Bus library Readme. Once you have a Service Bus Namespace, you will need to create a queue that can be used for the sample.

## Building the Sample

To build the sample:

1. Install [.NET Core 3.1](https://dot.net) or newer.
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved

2. Run in the project directory:

```bash
dotnet build
```

## Running the Sample

You can either run the executable you just build, or build and run the project at the same time. There are two ways to authenticate in the sample.
The sample will automatically create the topic and subscriptions for you as well as delete them at the end of the run.

### Use Azure Activity Directory
You can use any of the [authentication mechanisms](https://docs.microsoft.com/dotnet/api/overview/azure/identity-readme?view=azure-dotnet) that the `DefaultAzureCredential` from the Azure.Identity supports.

To run the sample using Azure Identity:

```bash
dotnet run -- --namespace <fully qualified namespace>
```
### Use a Service Bus connection string
The other way to run the sample is by specifying an environment variable that contains the connection string for the namespace you wish to use:

```bash
dotnet run -- --connection-variable <environment variable name>
```


Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
<PackageReference Include="Azure.Identity" Version="1.2.1" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.2.0-beta.2" />
<PackageReference Include="System.CommandLine" Version="2.0.0-beta1.21216.1" />
</ItemGroup>

</Project>