From b8a6b7b4e8953b82ec38c20860c7b20afe280cbe Mon Sep 17 00:00:00 2001 From: hongli750210 Date: Tue, 8 Jun 2021 18:15:51 +0800 Subject: [PATCH 01/11] Fix [Service Bus] Sample needed for Dead Letter Queues by LiHong 202106081815 --- .../azure-messaging-servicebus/pom.xml | 5 + .../servicebus/DeadletterQueueSample.java | 183 ++++++++++++++++++ 2 files changed, 188 insertions(+) create mode 100644 sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java diff --git a/sdk/servicebus/azure-messaging-servicebus/pom.xml b/sdk/servicebus/azure-messaging-servicebus/pom.xml index 2301d5e47afe..729c10fe85a2 100644 --- a/sdk/servicebus/azure-messaging-servicebus/pom.xml +++ b/sdk/servicebus/azure-messaging-servicebus/pom.xml @@ -104,6 +104,11 @@ 3.9.0 test + + com.google.code.gson + gson + [2.8.2,] + diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java new file mode 100644 index 000000000000..e492c3592038 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java @@ -0,0 +1,183 @@ +package com.azure.messaging.servicebus; + +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import org.junit.jupiter.api.Test; +import reactor.core.Disposable; + +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.nio.charset.StandardCharsets.UTF_8; + +public class DeadletterQueueSample { + String connectionString = System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING"); + String queueName = System.getenv("AZURE_SERVICEBUS_SAMPLE_QUEUE_NAME"); + + static final Gson GSON = new Gson(); + + /** + * Main method to show how to update properties of Service Bus Queue. + * + * @param args Unused arguments to the program. + */ + public static void main(String[] args) throws InterruptedException { + DeadletterQueueSample sample = new DeadletterQueueSample(); + sample.run(); + } + + /** + * run method to invoke this demo on how to dead letter within a Service Bus Queue. + * + * @throws InterruptedException If the program is unable to sleep while waiting for the receive to complete. + */ + @Test + public void run() throws InterruptedException { + ServiceBusSenderAsyncClient senderAsyncClient = new ServiceBusClientBuilder() + .connectionString(connectionString) + .sender() + .queueName(queueName) + .buildAsyncClient(); + + // max delivery-count scenario + sendMessagesAsync(senderAsyncClient, 1); + exceedMaxDelivery(connectionString, queueName); + + // fix-up scenario + sendMessagesAsync(senderAsyncClient, Integer.MAX_VALUE); + this.receiveMessagesAsync(connectionString, queueName); + this.PickUpAndFixDeadletters(connectionString, queueName, senderAsyncClient); + + senderAsyncClient.close(); + } + + void sendMessagesAsync(ServiceBusSenderAsyncClient senderAsyncClient, int maxMessages) { + List> data = + GSON.fromJson( + "[" + + "{'name' = 'Einstein', 'firstName' = 'Albert'}," + + "{'name' = 'Heisenberg', 'firstName' = 'Werner'}," + + "{'name' = 'Curie', 'firstName' = 'Marie'}," + + "{'name' = 'Hawking', 'firstName' = 'Steven'}," + + "{'name' = 'Newton', 'firstName' = 'Isaac'}," + + "{'name' = 'Bohr', 'firstName' = 'Niels'}," + + "{'name' = 'Faraday', 'firstName' = 'Michael'}," + + "{'name' = 'Galilei', 'firstName' = 'Galileo'}," + + "{'name' = 'Kepler', 'firstName' = 'Johannes'}," + + "{'name' = 'Kopernikus', 'firstName' = 'Nikolaus'}" + + "]", + new TypeToken>>() { + }.getType()); + + for (int i = 0; i < Math.min(data.size(), maxMessages); i++) { + final String messageId = Integer.toString(i); + ServiceBusMessage message = new ServiceBusMessage(GSON.toJson(data.get(i), Map.class).getBytes(UTF_8)); + message.setContentType("application/json"); + message.setSubject(i % 2 == 0 ? "Scientist" : "Physicist"); + message.setMessageId(messageId); + message.setTimeToLive(Duration.ofMinutes(2)); + System.out.printf("Message sending: Id = %s\n", message.getMessageId()); + senderAsyncClient.sendMessage(message) + .doOnSuccess(onSuccess -> System.out.printf("\tMessage acknowledged: Id = %s\n", message.getMessageId())) + .block(); + } + } + + void exceedMaxDelivery(String connectionString, String queueName) throws InterruptedException { + ServiceBusReceiverAsyncClient receiverAsyncClient + = new ServiceBusClientBuilder() + .connectionString(connectionString) + .receiver() + .queueName(queueName) + .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) + .buildAsyncClient(); + Disposable disposable = receiverAsyncClient.receiveMessages().subscribe(receiveMessage -> { + System.out.printf("Picked up message; DeliveryCount %d\n", receiveMessage.getDeliveryCount()); + receiverAsyncClient.abandon(receiveMessage); + }); + Thread.sleep(10000); + disposable.dispose(); + receiverAsyncClient.close(); + + Thread.sleep(120000); + + ServiceBusReceiverAsyncClient deadletterReceiverAsyncClient + = new ServiceBusClientBuilder() + .connectionString(connectionString) + .receiver() + .queueName(queueName.concat("/$deadletterqueue")) + .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) + .buildAsyncClient(); + disposable = deadletterReceiverAsyncClient.receiveMessages().subscribe(receiveMessage -> { + System.out.printf("\nDeadletter message:\n"); + receiveMessage.getApplicationProperties().keySet().forEach(key -> System.out.printf("\t%s=%s\n", key, receiveMessage.getApplicationProperties().get(key))); + deadletterReceiverAsyncClient.complete(receiveMessage); + }); + Thread.sleep(10000); + disposable.dispose(); + deadletterReceiverAsyncClient.close(); + } + + void receiveMessagesAsync(String connectionString, String queueName) { + ServiceBusReceiverAsyncClient receiverAsyncClient + = new ServiceBusClientBuilder() + .connectionString(connectionString) + .receiver() + .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) + .queueName(queueName) + .buildAsyncClient(); + + receiverAsyncClient.receiveMessages().subscribe(receiveMessage -> { + if (receiveMessage.getSubject() != null && + receiveMessage.getContentType() != null && + receiveMessage.getSubject().contentEquals("Scientist") && + receiveMessage.getContentType().contentEquals("application/json")) { + byte[] body = receiveMessage.getBody().toBytes(); + Map scientist = GSON.fromJson(new String(body, UTF_8), Map.class); + System.out.printf( + "\n\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = %s, \n\t\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\t\tEnqueuedTimeUtc = %s," + + "\n\t\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\t\tContentType = \"%s\", \n\t\t\t\t\t\tContent: [ firstName = %s, name = %s ]\n", + receiveMessage.getMessageId(), + receiveMessage.getSequenceNumber(), + receiveMessage.getEnqueuedTime(), + receiveMessage.getExpiresAt(), + receiveMessage.getContentType(), + scientist != null ? scientist.get("firstName") : "", + scientist != null ? scientist.get("name") : ""); + } else { + receiverAsyncClient.deadLetter(receiveMessage); + } + receiverAsyncClient.complete(receiveMessage); + }); + } + + void PickUpAndFixDeadletters(String connectionString, String queueName, ServiceBusSenderAsyncClient resubmitSender) { + ServiceBusReceiverAsyncClient receiverAsyncClient + = new ServiceBusClientBuilder() + .connectionString(connectionString) + .receiver() + .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) + .queueName(queueName.concat("/$deadletterqueue")) + .buildAsyncClient(); + + receiverAsyncClient.receiveMessages().subscribe(receiveMessage -> { + if (receiveMessage.getSubject() != null && receiveMessage.getSubject().contentEquals("Physicist")) { + ServiceBusMessage resubmitMessage = new ServiceBusMessage(receiveMessage.getBody()); + System.out.printf( + "\n\t\tFixing: \n\t\t\tMessageId = %s, \n\t\t\tSequenceNumber = %s, \n\t\t\tLabel = %s\n", + receiveMessage.getMessageId(), + receiveMessage.getSequenceNumber(), + receiveMessage.getSubject()); + resubmitMessage.setMessageId(receiveMessage.getMessageId()); + resubmitMessage.setSubject("Scientist"); + resubmitMessage.setContentType(receiveMessage.getContentType()); + resubmitMessage.setTimeToLive(Duration.ofMinutes(2)); + resubmitSender.sendMessage(resubmitMessage); + } + receiverAsyncClient.complete(receiveMessage); + }); + } +} From c80e6788e8ccb8bc0b6977ad15d0dbdf0c43d1a7 Mon Sep 17 00:00:00 2001 From: hongli750210 Date: Wed, 9 Jun 2021 09:11:02 +0800 Subject: [PATCH 02/11] Fixed issue#20452 [Service Bus] Sample needed for Dead Letter Queues 202106090910 --- sdk/servicebus/azure-messaging-servicebus/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/pom.xml b/sdk/servicebus/azure-messaging-servicebus/pom.xml index 729c10fe85a2..f432f41d06fe 100644 --- a/sdk/servicebus/azure-messaging-servicebus/pom.xml +++ b/sdk/servicebus/azure-messaging-servicebus/pom.xml @@ -107,7 +107,7 @@ com.google.code.gson gson - [2.8.2,] + 2.8.2 From 1ca8ab7f6d87c7d9f56a797764248835727dc945 Mon Sep 17 00:00:00 2001 From: hongli750210 Date: Wed, 9 Jun 2021 10:10:25 +0800 Subject: [PATCH 03/11] Fix ci exception for issue#20452 [Service Bus] Sample needed for Dead Letter Queues by lihong 202106091010 --- sdk/servicebus/azure-messaging-servicebus/pom.xml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/pom.xml b/sdk/servicebus/azure-messaging-servicebus/pom.xml index f432f41d06fe..d970d2ca835a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/pom.xml +++ b/sdk/servicebus/azure-messaging-servicebus/pom.xml @@ -54,6 +54,11 @@ azure-core-http-netty 1.9.2 + + com.google.code.gson + gson + 2.8.6 + @@ -104,11 +109,6 @@ 3.9.0 test - - com.google.code.gson - gson - 2.8.2 - From 14dae0c724380b1f0ab50ef07d3b811b7a58ec22 Mon Sep 17 00:00:00 2001 From: hongli750210 Date: Wed, 9 Jun 2021 15:20:01 +0800 Subject: [PATCH 04/11] Fixed issue#20452 [Service Bus] Sample needed for Dead Letter Queues by lihong 202106091519 --- .../servicebus/DeadletterQueueSample.java | 44 ++++++++++++++++--- 1 file changed, 37 insertions(+), 7 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java index e492c3592038..02c98b55d3d6 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java @@ -1,10 +1,12 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + package com.azure.messaging.servicebus; import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import org.junit.jupiter.api.Test; -import reactor.core.Disposable; import java.time.Duration; import java.util.HashMap; @@ -13,6 +15,9 @@ import static java.nio.charset.StandardCharsets.UTF_8; +/** + * Sample demonstrates how to dead letter within a Service Bus Queue. + */ public class DeadletterQueueSample { String connectionString = System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING"); String queueName = System.getenv("AZURE_SERVICEBUS_SAMPLE_QUEUE_NAME"); @@ -20,9 +25,10 @@ public class DeadletterQueueSample { static final Gson GSON = new Gson(); /** - * Main method to show how to update properties of Service Bus Queue. + * Main method to show how to dead letter within a Service Bus Queue. * * @param args Unused arguments to the program. + * @throws InterruptedException If the program is unable to sleep while waiting for the receive to complete. */ public static void main(String[] args) throws InterruptedException { DeadletterQueueSample sample = new DeadletterQueueSample(); @@ -30,7 +36,7 @@ public static void main(String[] args) throws InterruptedException { } /** - * run method to invoke this demo on how to dead letter within a Service Bus Queue. + * Run method to invoke this demo on how to dead letter within a Service Bus Queue. * * @throws InterruptedException If the program is unable to sleep while waiting for the receive to complete. */ @@ -54,6 +60,12 @@ public void run() throws InterruptedException { senderAsyncClient.close(); } + /** + * Send {@link ServiceBusMessage messages} to an Azure Service Bus Queue. + * + * @Param senderAsyncClient Service Bus Sender Async Client + * @Param maxMessages Maximum Number Of Messages + */ void sendMessagesAsync(ServiceBusSenderAsyncClient senderAsyncClient, int maxMessages) { List> data = GSON.fromJson( @@ -86,6 +98,13 @@ void sendMessagesAsync(ServiceBusSenderAsyncClient senderAsyncClient, int maxMes } } + /** + * Receive {@link ServiceBusMessage messages} and dead letter within a Service Bus Queue + * + * @Param connectionString Service Bus Connection String + * @Param queueName Queue Name + * @throws InterruptedException If the program is unable to sleep while waiting for the receive to complete. + */ void exceedMaxDelivery(String connectionString, String queueName) throws InterruptedException { ServiceBusReceiverAsyncClient receiverAsyncClient = new ServiceBusClientBuilder() @@ -94,12 +113,11 @@ void exceedMaxDelivery(String connectionString, String queueName) throws Interru .queueName(queueName) .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) .buildAsyncClient(); - Disposable disposable = receiverAsyncClient.receiveMessages().subscribe(receiveMessage -> { + receiverAsyncClient.receiveMessages().subscribe(receiveMessage -> { System.out.printf("Picked up message; DeliveryCount %d\n", receiveMessage.getDeliveryCount()); receiverAsyncClient.abandon(receiveMessage); }); Thread.sleep(10000); - disposable.dispose(); receiverAsyncClient.close(); Thread.sleep(120000); @@ -111,16 +129,21 @@ void exceedMaxDelivery(String connectionString, String queueName) throws Interru .queueName(queueName.concat("/$deadletterqueue")) .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) .buildAsyncClient(); - disposable = deadletterReceiverAsyncClient.receiveMessages().subscribe(receiveMessage -> { + deadletterReceiverAsyncClient.receiveMessages().subscribe(receiveMessage -> { System.out.printf("\nDeadletter message:\n"); receiveMessage.getApplicationProperties().keySet().forEach(key -> System.out.printf("\t%s=%s\n", key, receiveMessage.getApplicationProperties().get(key))); deadletterReceiverAsyncClient.complete(receiveMessage); }); Thread.sleep(10000); - disposable.dispose(); deadletterReceiverAsyncClient.close(); } + /** + * Receive {@link ServiceBusMessage messages} and dead letter within a Service Bus Queue + * + * @Param connectionString Service Bus Connection String + * @Param queueName Queue Name + */ void receiveMessagesAsync(String connectionString, String queueName) { ServiceBusReceiverAsyncClient receiverAsyncClient = new ServiceBusClientBuilder() @@ -154,6 +177,13 @@ void receiveMessagesAsync(String connectionString, String queueName) { }); } + /** + * Receive dead letter {@link ServiceBusMessage messages} and resend its. + * + * @Param connectionString Service Bus Connection String + * @Param queueName Queue Name + * @Param resubmitSender Service Bus Send Async Client + */ void PickUpAndFixDeadletters(String connectionString, String queueName, ServiceBusSenderAsyncClient resubmitSender) { ServiceBusReceiverAsyncClient receiverAsyncClient = new ServiceBusClientBuilder() From 8cf107980d31b7065f8c0dfd196e3141eee19c7e Mon Sep 17 00:00:00 2001 From: hongli750210 Date: Wed, 9 Jun 2021 15:43:34 +0800 Subject: [PATCH 05/11] Fixed issue#20451 [Service Bus] Sample needed for Topic Filters For Java by lihong 2021060911543 --- .../messaging/servicebus/DeadletterQueueSample.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java index 02c98b55d3d6..4c6c14111326 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java @@ -16,7 +16,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; /** - * Sample demonstrates how to dead letter within a Service Bus Queue. + * Sample demonstrates how to dead letter within an Azure Service Bus Queue. */ public class DeadletterQueueSample { String connectionString = System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING"); @@ -25,7 +25,7 @@ public class DeadletterQueueSample { static final Gson GSON = new Gson(); /** - * Main method to show how to dead letter within a Service Bus Queue. + * Main method to show how to dead letter within an Azure Service Bus Queue. * * @param args Unused arguments to the program. * @throws InterruptedException If the program is unable to sleep while waiting for the receive to complete. @@ -36,7 +36,7 @@ public static void main(String[] args) throws InterruptedException { } /** - * Run method to invoke this demo on how to dead letter within a Service Bus Queue. + * Run method to invoke this demo on how to dead letter within an Azure Service Bus Queue. * * @throws InterruptedException If the program is unable to sleep while waiting for the receive to complete. */ @@ -99,7 +99,7 @@ void sendMessagesAsync(ServiceBusSenderAsyncClient senderAsyncClient, int maxMes } /** - * Receive {@link ServiceBusMessage messages} and dead letter within a Service Bus Queue + * Receive {@link ServiceBusMessage messages} and dead letter it within an Azure Service Bus Queue * * @Param connectionString Service Bus Connection String * @Param queueName Queue Name @@ -139,7 +139,7 @@ void exceedMaxDelivery(String connectionString, String queueName) throws Interru } /** - * Receive {@link ServiceBusMessage messages} and dead letter within a Service Bus Queue + * Receive {@link ServiceBusMessage messages} and dead letter its within an Azure Service Bus Queue * * @Param connectionString Service Bus Connection String * @Param queueName Queue Name From affaae683f9e6fcbb02ee126f4e2787232b69916 Mon Sep 17 00:00:00 2001 From: hongli750210 Date: Thu, 10 Jun 2021 14:41:08 +0800 Subject: [PATCH 06/11] Fix comment for issue#20452 [Service Bus] Sample needed for Dead Letter Queues by lihong 202106101441 --- .../azure-messaging-servicebus/pom.xml | 5 - .../servicebus/DeadletterQueueSample.java | 105 ++++++++++-------- 2 files changed, 57 insertions(+), 53 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/pom.xml b/sdk/servicebus/azure-messaging-servicebus/pom.xml index d970d2ca835a..2301d5e47afe 100644 --- a/sdk/servicebus/azure-messaging-servicebus/pom.xml +++ b/sdk/servicebus/azure-messaging-servicebus/pom.xml @@ -54,11 +54,6 @@ azure-core-http-netty 1.9.2 - - com.google.code.gson - gson - 2.8.6 - diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java index 4c6c14111326..7d1d8db3afa4 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java @@ -4,14 +4,14 @@ package com.azure.messaging.servicebus; import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; +import com.nimbusds.oauth2.sdk.ParseException; +import com.nimbusds.oauth2.sdk.util.JSONObjectUtils; +import net.minidev.json.JSONObject; import org.junit.jupiter.api.Test; import java.time.Duration; -import java.util.HashMap; +import java.util.ArrayList; import java.util.List; -import java.util.Map; import static java.nio.charset.StandardCharsets.UTF_8; @@ -22,8 +22,6 @@ public class DeadletterQueueSample { String connectionString = System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING"); String queueName = System.getenv("AZURE_SERVICEBUS_SAMPLE_QUEUE_NAME"); - static final Gson GSON = new Gson(); - /** * Main method to show how to dead letter within an Azure Service Bus Queue. * @@ -42,70 +40,67 @@ public static void main(String[] args) throws InterruptedException { */ @Test public void run() throws InterruptedException { - ServiceBusSenderAsyncClient senderAsyncClient = new ServiceBusClientBuilder() + ServiceBusSenderClient senderClient = new ServiceBusClientBuilder() .connectionString(connectionString) .sender() .queueName(queueName) - .buildAsyncClient(); + .buildClient(); // max delivery-count scenario - sendMessagesAsync(senderAsyncClient, 1); - exceedMaxDelivery(connectionString, queueName); + sendMessagesAsync(senderClient, 1); + deadLetterByExceedingMaxDelivery(connectionString, queueName); // fix-up scenario - sendMessagesAsync(senderAsyncClient, Integer.MAX_VALUE); - this.receiveMessagesAsync(connectionString, queueName); - this.PickUpAndFixDeadletters(connectionString, queueName, senderAsyncClient); + sendMessagesAsync(senderClient, Integer.MAX_VALUE); + this.receiveAndDeadletterMessagesAsync(connectionString, queueName); + this.pickUpAndFixDeadletters(connectionString, queueName, senderClient); - senderAsyncClient.close(); + senderClient.close(); } /** * Send {@link ServiceBusMessage messages} to an Azure Service Bus Queue. * - * @Param senderAsyncClient Service Bus Sender Async Client + * @Param senderAsyncClient Service Bus Sender Client * @Param maxMessages Maximum Number Of Messages */ - void sendMessagesAsync(ServiceBusSenderAsyncClient senderAsyncClient, int maxMessages) { - List> data = - GSON.fromJson( - "[" + - "{'name' = 'Einstein', 'firstName' = 'Albert'}," + - "{'name' = 'Heisenberg', 'firstName' = 'Werner'}," + - "{'name' = 'Curie', 'firstName' = 'Marie'}," + - "{'name' = 'Hawking', 'firstName' = 'Steven'}," + - "{'name' = 'Newton', 'firstName' = 'Isaac'}," + - "{'name' = 'Bohr', 'firstName' = 'Niels'}," + - "{'name' = 'Faraday', 'firstName' = 'Michael'}," + - "{'name' = 'Galilei', 'firstName' = 'Galileo'}," + - "{'name' = 'Kepler', 'firstName' = 'Johannes'}," + - "{'name' = 'Kopernikus', 'firstName' = 'Nikolaus'}" + - "]", - new TypeToken>>() { - }.getType()); - - for (int i = 0; i < Math.min(data.size(), maxMessages); i++) { + void sendMessagesAsync(ServiceBusSenderClient senderAsyncClient, int maxMessages) { + List messageList = new ArrayList(){{ + add(createServiceBusMessage("{\"name\" : \"Einstein\", \"firstName\" : \"Albert\"}")); + add(createServiceBusMessage("{\"name\" : \"Heisenberg\", \"firstName\" : \"Werner\"}")); + add(createServiceBusMessage("{\"name\" : \"Curie\", \"firstName\" : \"Marie\"}")); + add(createServiceBusMessage("{\"name\" : \"Hawking\", \"firstName\" : \"Steven\"}")); + add(createServiceBusMessage("{\"name\" : \"Newton\", \"firstName\" : \"Isaac\"}")); + add(createServiceBusMessage("{\"name\" : \"Bohr\", \"firstName\" : \"Niels\"}")); + add(createServiceBusMessage("{\"name\" : \"Faraday\", \"firstName\" : \"Michael\"}")); + add(createServiceBusMessage("{\"name\" : \"Galilei\", \"firstName\" : \"Galileo\"}")); + add(createServiceBusMessage("{\"name\" : \"Kepler\", \"firstName\" : \"Johannes\"}")); + add(createServiceBusMessage("{\"name\" : \"Kopernikus\", \"firstName\" : \"Nikolaus\"}")); + }}; + for (int i = 0; i < Math.min(messageList.size(), maxMessages); i++) { final String messageId = Integer.toString(i); - ServiceBusMessage message = new ServiceBusMessage(GSON.toJson(data.get(i), Map.class).getBytes(UTF_8)); + ServiceBusMessage message = messageList.get(i); message.setContentType("application/json"); message.setSubject(i % 2 == 0 ? "Scientist" : "Physicist"); message.setMessageId(messageId); message.setTimeToLive(Duration.ofMinutes(2)); - System.out.printf("Message sending: Id = %s\n", message.getMessageId()); - senderAsyncClient.sendMessage(message) - .doOnSuccess(onSuccess -> System.out.printf("\tMessage acknowledged: Id = %s\n", message.getMessageId())) - .block(); + System.out.printf("\tMessage sending: Id = %s\n", message.getMessageId()); + senderAsyncClient.sendMessage(message); + System.out.printf("\tMessage acknowledged: Id = %s\n", message.getMessageId()); } } /** - * Receive {@link ServiceBusMessage messages} and dead letter it within an Azure Service Bus Queue + * Receive {@link ServiceBusMessage messages} and return {@link ServiceBusMessage messages} back to the queue. + * When the time to life of the {@link ServiceBusMessage messages} expires, + * the {@link ServiceBusMessage messages} will be dumped as dead letters into the dead letter queue. + * We can receive these {@link ServiceBusMessage messages} from the dead letter queue. * * @Param connectionString Service Bus Connection String * @Param queueName Queue Name * @throws InterruptedException If the program is unable to sleep while waiting for the receive to complete. */ - void exceedMaxDelivery(String connectionString, String queueName) throws InterruptedException { + void deadLetterByExceedingMaxDelivery(String connectionString, String queueName) throws InterruptedException { ServiceBusReceiverAsyncClient receiverAsyncClient = new ServiceBusClientBuilder() .connectionString(connectionString) @@ -115,6 +110,7 @@ void exceedMaxDelivery(String connectionString, String queueName) throws Interru .buildAsyncClient(); receiverAsyncClient.receiveMessages().subscribe(receiveMessage -> { System.out.printf("Picked up message; DeliveryCount %d\n", receiveMessage.getDeliveryCount()); + // return message back to the queue receiverAsyncClient.abandon(receiveMessage); }); Thread.sleep(10000); @@ -139,12 +135,12 @@ void exceedMaxDelivery(String connectionString, String queueName) throws Interru } /** - * Receive {@link ServiceBusMessage messages} and dead letter its within an Azure Service Bus Queue + * Receive {@link ServiceBusMessage messages} and transfer to the dead letter queue as a dead letter. * * @Param connectionString Service Bus Connection String * @Param queueName Queue Name */ - void receiveMessagesAsync(String connectionString, String queueName) { + void receiveAndDeadletterMessagesAsync(String connectionString, String queueName) { ServiceBusReceiverAsyncClient receiverAsyncClient = new ServiceBusClientBuilder() .connectionString(connectionString) @@ -159,7 +155,12 @@ void receiveMessagesAsync(String connectionString, String queueName) { receiveMessage.getSubject().contentEquals("Scientist") && receiveMessage.getContentType().contentEquals("application/json")) { byte[] body = receiveMessage.getBody().toBytes(); - Map scientist = GSON.fromJson(new String(body, UTF_8), Map.class); + JSONObject jsonObject = null; + try { + jsonObject = JSONObjectUtils.parse(new String(body, UTF_8)); + } catch (ParseException e) { + e.printStackTrace(); + } System.out.printf( "\n\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = %s, \n\t\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\t\tEnqueuedTimeUtc = %s," + "\n\t\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\t\tContentType = \"%s\", \n\t\t\t\t\t\tContent: [ firstName = %s, name = %s ]\n", @@ -168,8 +169,8 @@ void receiveMessagesAsync(String connectionString, String queueName) { receiveMessage.getEnqueuedTime(), receiveMessage.getExpiresAt(), receiveMessage.getContentType(), - scientist != null ? scientist.get("firstName") : "", - scientist != null ? scientist.get("name") : ""); + jsonObject != null ? jsonObject.get("firstName") : "", + jsonObject != null ? jsonObject.get("name") : ""); } else { receiverAsyncClient.deadLetter(receiveMessage); } @@ -182,9 +183,9 @@ void receiveMessagesAsync(String connectionString, String queueName) { * * @Param connectionString Service Bus Connection String * @Param queueName Queue Name - * @Param resubmitSender Service Bus Send Async Client + * @Param resubmitSender Service Bus Send Client */ - void PickUpAndFixDeadletters(String connectionString, String queueName, ServiceBusSenderAsyncClient resubmitSender) { + void pickUpAndFixDeadletters(String connectionString, String queueName, ServiceBusSenderClient resubmitSender) { ServiceBusReceiverAsyncClient receiverAsyncClient = new ServiceBusClientBuilder() .connectionString(connectionString) @@ -210,4 +211,12 @@ void PickUpAndFixDeadletters(String connectionString, String queueName, ServiceB receiverAsyncClient.complete(receiveMessage); }); } + + /** + * Create a {@link ServiceBusMessage} for add to a {@link ServiceBusMessageBatch}. + */ + static ServiceBusMessage createServiceBusMessage(String label) { + ServiceBusMessage message = new ServiceBusMessage(label.getBytes(UTF_8)); + return message; + } } From 2851da9cde96087ba991d295131e2b00eabce14a Mon Sep 17 00:00:00 2001 From: hongli750210 Date: Fri, 11 Jun 2021 12:58:39 +0800 Subject: [PATCH 07/11] Fix ci exception for issue#20452 [Service Bus] Sample needed for Dead Letter Queues by lihong 202106111257 --- .../servicebus/DeadletterQueueSample.java | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java index 7d1d8db3afa4..e32754a992c0 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java @@ -65,18 +65,18 @@ public void run() throws InterruptedException { * @Param maxMessages Maximum Number Of Messages */ void sendMessagesAsync(ServiceBusSenderClient senderAsyncClient, int maxMessages) { - List messageList = new ArrayList(){{ - add(createServiceBusMessage("{\"name\" : \"Einstein\", \"firstName\" : \"Albert\"}")); - add(createServiceBusMessage("{\"name\" : \"Heisenberg\", \"firstName\" : \"Werner\"}")); - add(createServiceBusMessage("{\"name\" : \"Curie\", \"firstName\" : \"Marie\"}")); - add(createServiceBusMessage("{\"name\" : \"Hawking\", \"firstName\" : \"Steven\"}")); - add(createServiceBusMessage("{\"name\" : \"Newton\", \"firstName\" : \"Isaac\"}")); - add(createServiceBusMessage("{\"name\" : \"Bohr\", \"firstName\" : \"Niels\"}")); - add(createServiceBusMessage("{\"name\" : \"Faraday\", \"firstName\" : \"Michael\"}")); - add(createServiceBusMessage("{\"name\" : \"Galilei\", \"firstName\" : \"Galileo\"}")); - add(createServiceBusMessage("{\"name\" : \"Kepler\", \"firstName\" : \"Johannes\"}")); - add(createServiceBusMessage("{\"name\" : \"Kopernikus\", \"firstName\" : \"Nikolaus\"}")); - }}; + List messageList = new ArrayList(); + messageList.add(createServiceBusMessage("{\"name\" : \"Einstein\", \"firstName\" : \"Albert\"}")); + messageList.add(createServiceBusMessage("{\"name\" : \"Heisenberg\", \"firstName\" : \"Werner\"}")); + messageList.add(createServiceBusMessage("{\"name\" : \"Curie\", \"firstName\" : \"Marie\"}")); + messageList.add(createServiceBusMessage("{\"name\" : \"Hawking\", \"firstName\" : \"Steven\"}")); + messageList.add(createServiceBusMessage("{\"name\" : \"Newton\", \"firstName\" : \"Isaac\"}")); + messageList.add(createServiceBusMessage("{\"name\" : \"Bohr\", \"firstName\" : \"Niels\"}")); + messageList.add(createServiceBusMessage("{\"name\" : \"Faraday\", \"firstName\" : \"Michael\"}")); + messageList.add(createServiceBusMessage("{\"name\" : \"Galilei\", \"firstName\" : \"Galileo\"}")); + messageList.add(createServiceBusMessage("{\"name\" : \"Kepler\", \"firstName\" : \"Johannes\"}")); + messageList.add(createServiceBusMessage("{\"name\" : \"Kopernikus\", \"firstName\" : \"Nikolaus\"}")); + for (int i = 0; i < Math.min(messageList.size(), maxMessages); i++) { final String messageId = Integer.toString(i); ServiceBusMessage message = messageList.get(i); @@ -150,10 +150,10 @@ void receiveAndDeadletterMessagesAsync(String connectionString, String queueName .buildAsyncClient(); receiverAsyncClient.receiveMessages().subscribe(receiveMessage -> { - if (receiveMessage.getSubject() != null && - receiveMessage.getContentType() != null && - receiveMessage.getSubject().contentEquals("Scientist") && - receiveMessage.getContentType().contentEquals("application/json")) { + if (receiveMessage.getSubject() != null + && receiveMessage.getContentType() != null + && receiveMessage.getSubject().contentEquals("Scientist") + && receiveMessage.getContentType().contentEquals("application/json")) { byte[] body = receiveMessage.getBody().toBytes(); JSONObject jsonObject = null; try { @@ -162,8 +162,8 @@ void receiveAndDeadletterMessagesAsync(String connectionString, String queueName e.printStackTrace(); } System.out.printf( - "\n\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = %s, \n\t\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\t\tEnqueuedTimeUtc = %s," + - "\n\t\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\t\tContentType = \"%s\", \n\t\t\t\t\t\tContent: [ firstName = %s, name = %s ]\n", + "\n\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = %s, \n\t\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\t\tEnqueuedTimeUtc = %s," + + "\n\t\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\t\tContentType = \"%s\", \n\t\t\t\t\t\tContent: [ firstName = %s, name = %s ]\n", receiveMessage.getMessageId(), receiveMessage.getSequenceNumber(), receiveMessage.getEnqueuedTime(), From 78fac904776cea8558c810b92d5777d26940cbf7 Mon Sep 17 00:00:00 2001 From: hongli750210 Date: Thu, 17 Jun 2021 09:07:18 +0800 Subject: [PATCH 08/11] Fix comment for issue#20452 [Service Bus] Sample needed for Dead Letter Queues by lihong 202106170907 --- .../com/azure/messaging/servicebus/DeadletterQueueSample.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java index e32754a992c0..71b010cb9b2d 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java @@ -64,7 +64,7 @@ public void run() throws InterruptedException { * @Param senderAsyncClient Service Bus Sender Client * @Param maxMessages Maximum Number Of Messages */ - void sendMessagesAsync(ServiceBusSenderClient senderAsyncClient, int maxMessages) { + void sendMessagesAsync(ServiceBusSenderClient senderClient, int maxMessages) { List messageList = new ArrayList(); messageList.add(createServiceBusMessage("{\"name\" : \"Einstein\", \"firstName\" : \"Albert\"}")); messageList.add(createServiceBusMessage("{\"name\" : \"Heisenberg\", \"firstName\" : \"Werner\"}")); @@ -85,7 +85,7 @@ void sendMessagesAsync(ServiceBusSenderClient senderAsyncClient, int maxMessages message.setMessageId(messageId); message.setTimeToLive(Duration.ofMinutes(2)); System.out.printf("\tMessage sending: Id = %s\n", message.getMessageId()); - senderAsyncClient.sendMessage(message); + senderClient.sendMessage(message); System.out.printf("\tMessage acknowledged: Id = %s\n", message.getMessageId()); } } From 8cdca2ab14e5f0799c7c352ebe7269ea65f1281a Mon Sep 17 00:00:00 2001 From: hongli750210 Date: Tue, 29 Jun 2021 18:16:35 +0800 Subject: [PATCH 09/11] Fix comment for issue#20452 [Service Bus] Sample needed for Dead Letter Queues 202106291816 by lihong --- .../com/azure/messaging/servicebus/DeadletterQueueSample.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java index 71b010cb9b2d..57f790bb7961 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java @@ -21,6 +21,7 @@ public class DeadletterQueueSample { String connectionString = System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING"); String queueName = System.getenv("AZURE_SERVICEBUS_SAMPLE_QUEUE_NAME"); + static final ObjectMapper objectMapper = new ObjectMapper(); /** * Main method to show how to dead letter within an Azure Service Bus Queue. @@ -111,7 +112,7 @@ void deadLetterByExceedingMaxDelivery(String connectionString, String queueName) receiverAsyncClient.receiveMessages().subscribe(receiveMessage -> { System.out.printf("Picked up message; DeliveryCount %d\n", receiveMessage.getDeliveryCount()); // return message back to the queue - receiverAsyncClient.abandon(receiveMessage); + receiverAsyncClient.abandon(receiveMessage).subscribe(); }); Thread.sleep(10000); receiverAsyncClient.close(); From a9f6914f77f058357ae20505e0f4c14071b8c16d Mon Sep 17 00:00:00 2001 From: hongli750210 Date: Tue, 29 Jun 2021 18:21:52 +0800 Subject: [PATCH 10/11] Fix comment for issue#20452 [Service Bus] Sample needed for Dead Letter Queues 202106291821 by lihong --- .../servicebus/DeadletterQueueSample.java | 220 ++++++++++-------- 1 file changed, 126 insertions(+), 94 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java index 57f790bb7961..1b84f1a49c00 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java @@ -4,13 +4,13 @@ package com.azure.messaging.servicebus; import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; -import com.nimbusds.oauth2.sdk.ParseException; -import com.nimbusds.oauth2.sdk.util.JSONObjectUtils; -import net.minidev.json.JSONObject; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; import java.time.Duration; -import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import static java.nio.charset.StandardCharsets.UTF_8; @@ -29,7 +29,7 @@ public class DeadletterQueueSample { * @param args Unused arguments to the program. * @throws InterruptedException If the program is unable to sleep while waiting for the receive to complete. */ - public static void main(String[] args) throws InterruptedException { + public static void main(String[] args) throws InterruptedException, JsonProcessingException { DeadletterQueueSample sample = new DeadletterQueueSample(); sample.run(); } @@ -40,7 +40,7 @@ public static void main(String[] args) throws InterruptedException { * @throws InterruptedException If the program is unable to sleep while waiting for the receive to complete. */ @Test - public void run() throws InterruptedException { + public void run() throws InterruptedException, JsonProcessingException { ServiceBusSenderClient senderClient = new ServiceBusClientBuilder() .connectionString(connectionString) .sender() @@ -48,13 +48,13 @@ public void run() throws InterruptedException { .buildClient(); // max delivery-count scenario - sendMessagesAsync(senderClient, 1); + sendMessages(senderClient, 1); deadLetterByExceedingMaxDelivery(connectionString, queueName); // fix-up scenario - sendMessagesAsync(senderClient, Integer.MAX_VALUE); - this.receiveAndDeadletterMessagesAsync(connectionString, queueName); - this.pickUpAndFixDeadletters(connectionString, queueName, senderClient); + sendMessages(senderClient, Integer.MAX_VALUE); + this.receiveAndDeadletterMessagesAsync(connectionString, queueName).block(); + this.pickUpAndFixDeadletters(connectionString, queueName, senderClient).block(); senderClient.close(); } @@ -65,29 +65,29 @@ public void run() throws InterruptedException { * @Param senderAsyncClient Service Bus Sender Client * @Param maxMessages Maximum Number Of Messages */ - void sendMessagesAsync(ServiceBusSenderClient senderClient, int maxMessages) { - List messageList = new ArrayList(); - messageList.add(createServiceBusMessage("{\"name\" : \"Einstein\", \"firstName\" : \"Albert\"}")); - messageList.add(createServiceBusMessage("{\"name\" : \"Heisenberg\", \"firstName\" : \"Werner\"}")); - messageList.add(createServiceBusMessage("{\"name\" : \"Curie\", \"firstName\" : \"Marie\"}")); - messageList.add(createServiceBusMessage("{\"name\" : \"Hawking\", \"firstName\" : \"Steven\"}")); - messageList.add(createServiceBusMessage("{\"name\" : \"Newton\", \"firstName\" : \"Isaac\"}")); - messageList.add(createServiceBusMessage("{\"name\" : \"Bohr\", \"firstName\" : \"Niels\"}")); - messageList.add(createServiceBusMessage("{\"name\" : \"Faraday\", \"firstName\" : \"Michael\"}")); - messageList.add(createServiceBusMessage("{\"name\" : \"Galilei\", \"firstName\" : \"Galileo\"}")); - messageList.add(createServiceBusMessage("{\"name\" : \"Kepler\", \"firstName\" : \"Johannes\"}")); - messageList.add(createServiceBusMessage("{\"name\" : \"Kopernikus\", \"firstName\" : \"Nikolaus\"}")); - + void sendMessages(ServiceBusSenderClient senderClient, int maxMessages) throws JsonProcessingException { + List messageList = Arrays.asList( + new Person("Einstein", "Albert"), + new Person("Heisenberg", "Werner"), + new Person("Curie", "Marie"), + new Person("Hawking", "Steven"), + new Person("Newton", "Isaac"), + new Person("Bohr", "Niels"), + new Person("Faraday", "Michael"), + new Person("Galilei", "Galileo"), + new Person("Kepler", "Johannes"), + new Person("Kopernikus", "Nikolaus") + ); for (int i = 0; i < Math.min(messageList.size(), maxMessages); i++) { final String messageId = Integer.toString(i); - ServiceBusMessage message = messageList.get(i); + ServiceBusMessage message = new ServiceBusMessage(objectMapper.writeValueAsString(messageList.get(i))); message.setContentType("application/json"); message.setSubject(i % 2 == 0 ? "Scientist" : "Physicist"); message.setMessageId(messageId); message.setTimeToLive(Duration.ofMinutes(2)); - System.out.printf("\tMessage sending: Id = %s\n", message.getMessageId()); + System.out.printf("\tMessage sending: Id = %s%n", message.getMessageId()); senderClient.sendMessage(message); - System.out.printf("\tMessage acknowledged: Id = %s\n", message.getMessageId()); + System.out.printf("\tMessage acknowledged: Id = %s%n", message.getMessageId()); } } @@ -102,15 +102,14 @@ void sendMessagesAsync(ServiceBusSenderClient senderClient, int maxMessages) { * @throws InterruptedException If the program is unable to sleep while waiting for the receive to complete. */ void deadLetterByExceedingMaxDelivery(String connectionString, String queueName) throws InterruptedException { - ServiceBusReceiverAsyncClient receiverAsyncClient - = new ServiceBusClientBuilder() + ServiceBusReceiverAsyncClient receiverAsyncClient = new ServiceBusClientBuilder() .connectionString(connectionString) .receiver() .queueName(queueName) .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) .buildAsyncClient(); receiverAsyncClient.receiveMessages().subscribe(receiveMessage -> { - System.out.printf("Picked up message; DeliveryCount %d\n", receiveMessage.getDeliveryCount()); + System.out.printf("Picked up message; DeliveryCount %d%n", receiveMessage.getDeliveryCount()); // return message back to the queue receiverAsyncClient.abandon(receiveMessage).subscribe(); }); @@ -127,9 +126,9 @@ void deadLetterByExceedingMaxDelivery(String connectionString, String queueName) .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) .buildAsyncClient(); deadletterReceiverAsyncClient.receiveMessages().subscribe(receiveMessage -> { - System.out.printf("\nDeadletter message:\n"); - receiveMessage.getApplicationProperties().keySet().forEach(key -> System.out.printf("\t%s=%s\n", key, receiveMessage.getApplicationProperties().get(key))); - deadletterReceiverAsyncClient.complete(receiveMessage); + System.out.printf("%nDeadletter message:%n"); + receiveMessage.getApplicationProperties().keySet().forEach(key -> System.out.printf("\t%s=%s%n", key, receiveMessage.getApplicationProperties().get(key))); + deadletterReceiverAsyncClient.complete(receiveMessage).subscribe(); }); Thread.sleep(10000); deadletterReceiverAsyncClient.close(); @@ -141,41 +140,47 @@ void deadLetterByExceedingMaxDelivery(String connectionString, String queueName) * @Param connectionString Service Bus Connection String * @Param queueName Queue Name */ - void receiveAndDeadletterMessagesAsync(String connectionString, String queueName) { - ServiceBusReceiverAsyncClient receiverAsyncClient - = new ServiceBusClientBuilder() - .connectionString(connectionString) - .receiver() - .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) - .queueName(queueName) - .buildAsyncClient(); + Mono receiveAndDeadletterMessagesAsync(String connectionString, String queueName) { + Mono createReceiver = Mono.fromCallable(() -> { + return new ServiceBusClientBuilder() + .connectionString(connectionString) + .receiver() + .queueName(queueName) + .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) + .buildAsyncClient(); + }); - receiverAsyncClient.receiveMessages().subscribe(receiveMessage -> { - if (receiveMessage.getSubject() != null - && receiveMessage.getContentType() != null - && receiveMessage.getSubject().contentEquals("Scientist") - && receiveMessage.getContentType().contentEquals("application/json")) { - byte[] body = receiveMessage.getBody().toBytes(); - JSONObject jsonObject = null; - try { - jsonObject = JSONObjectUtils.parse(new String(body, UTF_8)); - } catch (ParseException e) { - e.printStackTrace(); + return Mono.usingWhen(createReceiver, receiver -> { + return receiver.receiveMessages().flatMap(receiveMessage -> { + if (receiveMessage.getSubject() != null + && receiveMessage.getContentType() != null + && receiveMessage.getSubject().contentEquals("Scientist") + && receiveMessage.getContentType().contentEquals("application/json")) { + byte[] body = receiveMessage.getBody().toBytes(); + Person person = null; + try { + person = objectMapper.readValue(new String(body, UTF_8), Person.class); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + System.out.printf( + "%n\t\t\t\tMessage received: %n\t\t\t\t\t\tMessageId = %s, %n\t\t\t\t\t\tSequenceNumber = %s, %n\t\t\t\t\t\tEnqueuedTimeUtc = %s," + + "%n\t\t\t\t\t\tExpiresAtUtc = %s, %n\t\t\t\t\t\tContentType = \"%s\", %n\t\t\t\t\t\tContent: [ firstName = %s, name = %s ]%n", + receiveMessage.getMessageId(), + receiveMessage.getSequenceNumber(), + receiveMessage.getEnqueuedTime(), + receiveMessage.getExpiresAt(), + receiveMessage.getContentType(), + person != null ? person.getFirstName() : "", + person != null ? person.getName() : ""); + } else { + return receiver.deadLetter(receiveMessage); } - System.out.printf( - "\n\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = %s, \n\t\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\t\tEnqueuedTimeUtc = %s," - + "\n\t\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\t\tContentType = \"%s\", \n\t\t\t\t\t\tContent: [ firstName = %s, name = %s ]\n", - receiveMessage.getMessageId(), - receiveMessage.getSequenceNumber(), - receiveMessage.getEnqueuedTime(), - receiveMessage.getExpiresAt(), - receiveMessage.getContentType(), - jsonObject != null ? jsonObject.get("firstName") : "", - jsonObject != null ? jsonObject.get("name") : ""); - } else { - receiverAsyncClient.deadLetter(receiveMessage); - } - receiverAsyncClient.complete(receiveMessage); + return receiver.complete(receiveMessage); + }).then(); + }, receiver -> { + receiver.close(); + return Mono.empty(); }); } @@ -186,38 +191,65 @@ void receiveAndDeadletterMessagesAsync(String connectionString, String queueName * @Param queueName Queue Name * @Param resubmitSender Service Bus Send Client */ - void pickUpAndFixDeadletters(String connectionString, String queueName, ServiceBusSenderClient resubmitSender) { - ServiceBusReceiverAsyncClient receiverAsyncClient - = new ServiceBusClientBuilder() - .connectionString(connectionString) - .receiver() - .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) - .queueName(queueName.concat("/$deadletterqueue")) - .buildAsyncClient(); + Mono pickUpAndFixDeadletters(String connectionString, String queueName, ServiceBusSenderClient resubmitSender) { + Mono createReceiver = Mono.fromCallable(() -> { + return new ServiceBusClientBuilder() + .connectionString(connectionString) + .receiver() + .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) + .queueName(queueName.concat("/$deadletterqueue")) + .buildAsyncClient(); + }); - receiverAsyncClient.receiveMessages().subscribe(receiveMessage -> { - if (receiveMessage.getSubject() != null && receiveMessage.getSubject().contentEquals("Physicist")) { - ServiceBusMessage resubmitMessage = new ServiceBusMessage(receiveMessage.getBody()); - System.out.printf( - "\n\t\tFixing: \n\t\t\tMessageId = %s, \n\t\t\tSequenceNumber = %s, \n\t\t\tLabel = %s\n", - receiveMessage.getMessageId(), - receiveMessage.getSequenceNumber(), - receiveMessage.getSubject()); - resubmitMessage.setMessageId(receiveMessage.getMessageId()); - resubmitMessage.setSubject("Scientist"); - resubmitMessage.setContentType(receiveMessage.getContentType()); - resubmitMessage.setTimeToLive(Duration.ofMinutes(2)); - resubmitSender.sendMessage(resubmitMessage); - } - receiverAsyncClient.complete(receiveMessage); + return Mono.usingWhen(createReceiver, receiver -> { + return receiver.receiveMessages().flatMap(receiveMessage -> { + if (receiveMessage.getSubject() != null && receiveMessage.getSubject().contentEquals("Physicist")) { + ServiceBusMessage resubmitMessage = new ServiceBusMessage(receiveMessage.getBody()); + System.out.printf( + "%n\t\tFixing: %n\t\t\tMessageId = %s, %n\t\t\tSequenceNumber = %s, %n\t\t\tLabel = %s%n", + receiveMessage.getMessageId(), + receiveMessage.getSequenceNumber(), + receiveMessage.getSubject()); + resubmitMessage.setMessageId(receiveMessage.getMessageId()); + resubmitMessage.setSubject("Scientist"); + resubmitMessage.setContentType(receiveMessage.getContentType()); + resubmitMessage.setTimeToLive(Duration.ofMinutes(2)); + resubmitSender.sendMessage(resubmitMessage); + } + return receiver.complete(receiveMessage); + }).then(); + }, receiver -> { + receiver.close(); + return Mono.empty(); }); } - /** - * Create a {@link ServiceBusMessage} for add to a {@link ServiceBusMessageBatch}. - */ - static ServiceBusMessage createServiceBusMessage(String label) { - ServiceBusMessage message = new ServiceBusMessage(label.getBytes(UTF_8)); - return message; + private static final class Person { + private String name; + private String firstName; + + Person() { + } + + Person(String name, String firstName) { + this.name = name; + this.firstName = firstName; + } + + public String getName() { + return name; + } + + public String getFirstName() { + return firstName; + } + + public void setName(String name) { + this.name = name; + } + + public void setFirstName(String firstName) { + this.firstName = firstName; + } } } From 000580dd565f65d648cb8baac5afa12a227467f0 Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 30 Jun 2021 15:30:17 -0700 Subject: [PATCH 11/11] Fix sample. --- .../servicebus/DeadletterQueueSample.java | 490 +++++++++++------- 1 file changed, 312 insertions(+), 178 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java index 1b84f1a49c00..a55aa2d0aeab 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/DeadletterQueueSample.java @@ -4,6 +4,8 @@ package com.azure.messaging.servicebus; import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; +import com.azure.messaging.servicebus.models.SubQueue; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Test; @@ -12,244 +14,376 @@ import java.time.Duration; import java.util.Arrays; import java.util.List; - -import static java.nio.charset.StandardCharsets.UTF_8; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; /** - * Sample demonstrates how to dead letter within an Azure Service Bus Queue. + * The sample demonstrates how to use dead letter queues: + *
    + *
  • + * Scenario 1: Send a message and then retrieve and abandon the message until the maximum delivery count is + * exhausted and the message is automatically dead-lettered. + *
  • + *
  • + * Scenario 2: Send a set of messages, and explicitly dead-letter messages that do not match a certain criterion + * and would therefore not be processed correctly. The messages are then picked up from the dead-letter queue, are + * automatically corrected, and resubmitted. + *
  • + *
+ * + * @see Dead-letter + * Queue */ public class DeadletterQueueSample { - String connectionString = System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING"); - String queueName = System.getenv("AZURE_SERVICEBUS_SAMPLE_QUEUE_NAME"); - static final ObjectMapper objectMapper = new ObjectMapper(); + private final List personList = Arrays.asList( + new Person("Einstein", "Albert"), + new Person("Heisenberg", "Werner"), + new Person("Curie", "Marie"), + new Person("Hawking", "Steven"), + new Person("Newton", "Isaac"), + new Person("Bohr", "Niels"), + new Person("Faraday", "Michael"), + new Person("Galilei", "Galileo"), + new Person("Kepler", "Johannes"), + new Person("Kopernikus", "Nikolaus") + ); /** * Main method to show how to dead letter within an Azure Service Bus Queue. * * @param args Unused arguments to the program. - * @throws InterruptedException If the program is unable to sleep while waiting for the receive to complete. */ - public static void main(String[] args) throws InterruptedException, JsonProcessingException { - DeadletterQueueSample sample = new DeadletterQueueSample(); + public static void main(String[] args) { + final DeadletterQueueSample sample = new DeadletterQueueSample(); sample.run(); } /** * Run method to invoke this demo on how to dead letter within an Azure Service Bus Queue. - * - * @throws InterruptedException If the program is unable to sleep while waiting for the receive to complete. */ @Test - public void run() throws InterruptedException, JsonProcessingException { - ServiceBusSenderClient senderClient = new ServiceBusClientBuilder() - .connectionString(connectionString) - .sender() - .queueName(queueName) - .buildClient(); - - // max delivery-count scenario - sendMessages(senderClient, 1); - deadLetterByExceedingMaxDelivery(connectionString, queueName); - - // fix-up scenario - sendMessages(senderClient, Integer.MAX_VALUE); - this.receiveAndDeadletterMessagesAsync(connectionString, queueName).block(); - this.pickUpAndFixDeadletters(connectionString, queueName, senderClient).block(); - - senderClient.close(); - } + public void run() { + // The connection string value can be obtained by: + // 1. Going to your Service Bus namespace in Azure Portal. + // 2. Go to "Shared access policies" + // 3. Copy the connection string for the "RootManageSharedAccessKey" policy. + // The 'connectionString' format is shown below. + // 1. "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}" + // 2. "<>" will look similar to "{your-namespace}.servicebus.windows.net" + // 3. "queueName" will be the name of the Service Bus queue instance you created + // inside the Service Bus namespace. + final String connectionString = System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING"); + final String queueName = System.getenv("AZURE_SERVICEBUS_SAMPLE_QUEUE_NAME"); - /** - * Send {@link ServiceBusMessage messages} to an Azure Service Bus Queue. - * - * @Param senderAsyncClient Service Bus Sender Client - * @Param maxMessages Maximum Number Of Messages - */ - void sendMessages(ServiceBusSenderClient senderClient, int maxMessages) throws JsonProcessingException { - List messageList = Arrays.asList( - new Person("Einstein", "Albert"), - new Person("Heisenberg", "Werner"), - new Person("Curie", "Marie"), - new Person("Hawking", "Steven"), - new Person("Newton", "Isaac"), - new Person("Bohr", "Niels"), - new Person("Faraday", "Michael"), - new Person("Galilei", "Galileo"), - new Person("Kepler", "Johannes"), - new Person("Kopernikus", "Nikolaus") - ); - for (int i = 0; i < Math.min(messageList.size(), maxMessages); i++) { - final String messageId = Integer.toString(i); - ServiceBusMessage message = new ServiceBusMessage(objectMapper.writeValueAsString(messageList.get(i))); - message.setContentType("application/json"); - message.setSubject(i % 2 == 0 ? "Scientist" : "Physicist"); - message.setMessageId(messageId); - message.setTimeToLive(Duration.ofMinutes(2)); - System.out.printf("\tMessage sending: Id = %s%n", message.getMessageId()); - senderClient.sendMessage(message); - System.out.printf("\tMessage acknowledged: Id = %s%n", message.getMessageId()); + final ServiceBusClientBuilder builder = new ServiceBusClientBuilder().connectionString(connectionString); + + try (ServiceBusSenderClient sender = builder.sender().queueName(queueName).buildClient()) { + // Scenario 1: Dead letters a message by abandoning it until the MaxDeliveryCount is exceeded. + sendMessages(sender, 1); + deadLetterByExceedingMaxDelivery(builder, queueName).block(); + receiveAndCompleteDeadLetterQueueMessages(builder, queueName).block(); + + // Scenario 2: Dead letters a message by explicitly invoking receiver.deadLetter(). + sendMessages(sender, personList.size()); + receiveAndDeadletterMessages(builder, queueName).block(); + receiveAndFixDeadLetterQueueMessages(builder, queueName, sender).block(); } } /** - * Receive {@link ServiceBusMessage messages} and return {@link ServiceBusMessage messages} back to the queue. - * When the time to life of the {@link ServiceBusMessage messages} expires, - * the {@link ServiceBusMessage messages} will be dumped as dead letters into the dead letter queue. - * We can receive these {@link ServiceBusMessage messages} from the dead letter queue. + * Sends {@link ServiceBusMessage messages} to an Azure Service Bus Queue. * - * @Param connectionString Service Bus Connection String - * @Param queueName Queue Name - * @throws InterruptedException If the program is unable to sleep while waiting for the receive to complete. + * @param sender Sender client. + * @param maxMessages Maximum number of messages to send. */ - void deadLetterByExceedingMaxDelivery(String connectionString, String queueName) throws InterruptedException { - ServiceBusReceiverAsyncClient receiverAsyncClient = new ServiceBusClientBuilder() - .connectionString(connectionString) - .receiver() - .queueName(queueName) - .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) - .buildAsyncClient(); - receiverAsyncClient.receiveMessages().subscribe(receiveMessage -> { - System.out.printf("Picked up message; DeliveryCount %d%n", receiveMessage.getDeliveryCount()); - // return message back to the queue - receiverAsyncClient.abandon(receiveMessage).subscribe(); - }); - Thread.sleep(10000); - receiverAsyncClient.close(); - - Thread.sleep(120000); - - ServiceBusReceiverAsyncClient deadletterReceiverAsyncClient - = new ServiceBusClientBuilder() - .connectionString(connectionString) - .receiver() - .queueName(queueName.concat("/$deadletterqueue")) - .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) - .buildAsyncClient(); - deadletterReceiverAsyncClient.receiveMessages().subscribe(receiveMessage -> { - System.out.printf("%nDeadletter message:%n"); - receiveMessage.getApplicationProperties().keySet().forEach(key -> System.out.printf("\t%s=%s%n", key, receiveMessage.getApplicationProperties().get(key))); - deadletterReceiverAsyncClient.complete(receiveMessage).subscribe(); - }); - Thread.sleep(10000); - deadletterReceiverAsyncClient.close(); + private void sendMessages(ServiceBusSenderClient sender, int maxMessages) { + final int numberOfMessages = Math.min(personList.size(), maxMessages); + + final List serviceBusMessages = IntStream.range(0, numberOfMessages) + .mapToObj(index -> { + final Person person = personList.get(index); + + return new ServiceBusMessage(person.toJson()) + .setContentType("application/json") + .setSubject(index % 2 == 0 ? "Scientist" : "Physicist") + .setMessageId(Integer.toString(index)) + .setTimeToLive(Duration.ofMinutes(2)); + }).collect(Collectors.toList()); + + sender.sendMessages(serviceBusMessages); } /** - * Receive {@link ServiceBusMessage messages} and transfer to the dead letter queue as a dead letter. + * Scenario 1: Part 1 * - * @Param connectionString Service Bus Connection String - * @Param queueName Queue Name + *

+ * Receive {@link ServiceBusMessage messages} and return the {@link ServiceBusMessage messages} back to the queue. + * When the max number of deliveries for each {@link ServiceBusMessage message} expires, then it is moved into the + * dead letter queue. + *

+ * + * @param builder Service Bus client builder. + * @param queueName Name of the queue to receive from. + * + * @return A Mono that completes when all messages in queue have been processed.(because a message has not been + * received in the last 30 seconds). */ - Mono receiveAndDeadletterMessagesAsync(String connectionString, String queueName) { - Mono createReceiver = Mono.fromCallable(() -> { - return new ServiceBusClientBuilder() - .connectionString(connectionString) - .receiver() + private Mono deadLetterByExceedingMaxDelivery(ServiceBusClientBuilder builder, String queueName) { + // This Mono creates an async receiver, and continues receiving from that queue until it has not seen a message + // for 30 seconds. + // + // When it receives any messages, it abandons them so they are returned to the queue to be re-received again. + // When a message is abandoned, its delivery count is incremented. If the delivery count exceeds the + // MaxDeliveryCount for a queue, the message is placed in the deadletter queue for that particular queue. + return Mono.using(() -> { + return builder.receiver() .queueName(queueName) .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) .buildAsyncClient(); - }); + }, receiver -> { + // Continue to receive messages until no message has been seen for 30 seconds. + return receiver.receiveMessages() + .timeout(Duration.ofSeconds(30)) + .flatMap(message -> { + System.out.printf("Received message. Sequence # %s; DeliveryCount %d%n", message.getSequenceNumber(), + message.getDeliveryCount()); - return Mono.usingWhen(createReceiver, receiver -> { - return receiver.receiveMessages().flatMap(receiveMessage -> { - if (receiveMessage.getSubject() != null - && receiveMessage.getContentType() != null - && receiveMessage.getSubject().contentEquals("Scientist") - && receiveMessage.getContentType().contentEquals("application/json")) { - byte[] body = receiveMessage.getBody().toBytes(); - Person person = null; - try { - person = objectMapper.readValue(new String(body, UTF_8), Person.class); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } - System.out.printf( - "%n\t\t\t\tMessage received: %n\t\t\t\t\t\tMessageId = %s, %n\t\t\t\t\t\tSequenceNumber = %s, %n\t\t\t\t\t\tEnqueuedTimeUtc = %s," - + "%n\t\t\t\t\t\tExpiresAtUtc = %s, %n\t\t\t\t\t\tContentType = \"%s\", %n\t\t\t\t\t\tContent: [ firstName = %s, name = %s ]%n", - receiveMessage.getMessageId(), - receiveMessage.getSequenceNumber(), - receiveMessage.getEnqueuedTime(), - receiveMessage.getExpiresAt(), - receiveMessage.getContentType(), - person != null ? person.getFirstName() : "", - person != null ? person.getName() : ""); - } else { - return receiver.deadLetter(receiveMessage); - } - return receiver.complete(receiveMessage); - }).then(); + return receiver.abandon(message); + }) + .onErrorResume(TimeoutException.class, exception -> { + System.out.println("No messages received after 30 seconds. Queue is empty."); + return Mono.empty(); + }) + .then(); }, receiver -> { + // When the receiving operation is completed, close the client. receiver.close(); - return Mono.empty(); }); } /** - * Receive dead letter {@link ServiceBusMessage messages} and resend its. + * Scenario 1: Part 2 + * + *

+ * This method continues to receive messages from the dead letter queue, then completes them. + *

+ * + * @param builder Service Bus client builder. + * @param queueName Name of the queue to receive from. * - * @Param connectionString Service Bus Connection String - * @Param queueName Queue Name - * @Param resubmitSender Service Bus Send Client + * @return A Mono that completes when all messages in queue have been processed (because a message has not been + * received in the last 30 seconds). */ - Mono pickUpAndFixDeadletters(String connectionString, String queueName, ServiceBusSenderClient resubmitSender) { - Mono createReceiver = Mono.fromCallable(() -> { - return new ServiceBusClientBuilder() - .connectionString(connectionString) - .receiver() - .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) - .queueName(queueName.concat("/$deadletterqueue")) + private Mono receiveAndCompleteDeadLetterQueueMessages(ServiceBusClientBuilder builder, String queueName) { + return Mono.using(() -> { + return builder.receiver() + .queueName(queueName) + .subQueue(SubQueue.DEAD_LETTER_QUEUE) .buildAsyncClient(); + }, deadLetterQueueReceiver -> { + // Continue to receive messages until no message has been seen for 30 seconds. + return deadLetterQueueReceiver.receiveMessages() + .timeout(Duration.ofSeconds(30)) + .flatMap(message -> { + System.out.printf("Received message from dead-letter queue. Sequence #: %s. DeliveryCount %d%n", + message.getSequenceNumber(), message.getDeliveryCount()); + + // When messages are dead lettered, there are system properties that can be set to denote the + // reason why via fields such as dead-letter reason and dead-letter description. + System.out.printf("Dead-Letter Reason: %s. Description: %s. Source: %s%n", + message.getDeadLetterReason(), message.getDeadLetterErrorDescription(), + message.getDeadLetterSource()); + + System.out.println("Application properties:"); + message.getApplicationProperties().forEach((key, value) -> + System.out.printf("\t%s=%s%n", key, value)); + + // We complete the messages we received from the queue. + // In a real example, you may do some sort of computation to understand why previous attempts to + // process the message failed. This is shown in Scenario 2. + return deadLetterQueueReceiver.complete(message); + }) + .onErrorResume(TimeoutException.class, exception -> { + System.out.println("No messages received after 30 seconds. Dead-letter queue is empty."); + return Mono.empty(); + }) + .then(); + }, deadLetterQueueReceiver -> { + // When the receiving operation is completed, close the client. + deadLetterQueueReceiver.close(); }); + } - return Mono.usingWhen(createReceiver, receiver -> { - return receiver.receiveMessages().flatMap(receiveMessage -> { - if (receiveMessage.getSubject() != null && receiveMessage.getSubject().contentEquals("Physicist")) { - ServiceBusMessage resubmitMessage = new ServiceBusMessage(receiveMessage.getBody()); - System.out.printf( - "%n\t\tFixing: %n\t\t\tMessageId = %s, %n\t\t\tSequenceNumber = %s, %n\t\t\tLabel = %s%n", - receiveMessage.getMessageId(), - receiveMessage.getSequenceNumber(), - receiveMessage.getSubject()); - resubmitMessage.setMessageId(receiveMessage.getMessageId()); - resubmitMessage.setSubject("Scientist"); - resubmitMessage.setContentType(receiveMessage.getContentType()); - resubmitMessage.setTimeToLive(Duration.ofMinutes(2)); - resubmitSender.sendMessage(resubmitMessage); - } - return receiver.complete(receiveMessage); - }).then(); - }, receiver -> { - receiver.close(); - return Mono.empty(); + /** + * Scenario 2: Part 1 + * + *

+ * Receives {@link ServiceBusMessage messages} and dead letters them if it has a subject of "Scientist" and content + * type of "application/json". This is to simulate that the message may be malformed or didn't contain the right + * content, so we dead letter it so other receivers don't process this message. + *

+ * + * @param builder Service Bus client builder. + * @param queueName Name of queue to receive messages from. + * + * @return A Mono that completes when all the messages in the queue have been processed (because a message has not + * been received in the last 30 seconds). + */ + private Mono receiveAndDeadletterMessages(ServiceBusClientBuilder builder, String queueName) { + return Mono.using( + () -> { + // Creates the async receiver. + return builder.receiver() + .queueName(queueName) + .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) + .buildAsyncClient(); + }, + receiver -> { + // The scope of the receiver resource is to receive messages for a duration of `receiveDuration` then + // complete or dead-letter the message based on its content-type and subject. + return receiver.receiveMessages() + .timeout(Duration.ofSeconds(30)) + .flatMap(message -> { + final String subject = message.getSubject(); + final String contentType = message.getContentType(); + final Person person; + try { + person = Person.fromJson(message.getBody().toString()); + } catch (RuntimeException e) { + return Mono.error(new RuntimeException("Could not deserialize message: " + + message.getSequenceNumber(), e)); + } + + System.out.printf("Received message. SequenceNumber = %s. EnqueuedTimeUtc = %s. " + + "ExpiresAtUtc = %s. ContentType = %s. Content: [ %s ]%n", + message.getSequenceNumber(), message.getEnqueuedTime(), message.getExpiresAt(), + message.getContentType(), person); + + // Simulates that something could not be processed in the message, so we dead letter it. + if ("Scientist".equals(subject) && "application/json".equals(contentType)) { + return receiver.complete(message); + } else { + return receiver.deadLetter(message); + } + }) + .onErrorResume(TimeoutException.class, exception -> { + System.out.println("No messages received after 30 seconds. Queue is empty."); + return Mono.empty(); + }) + .then(); + }, + receiver -> { + // When the receiving operation is completed, close the client. + receiver.close(); + }); + } + + /** + * Scenario 2: Part 2 + * + *

+ * Receives {@link ServiceBusMessage messages} from the dead letter queue, it fixes up the message then resends the + * fixed message to the queue again. This simulates messages that may have errors in them, were dead-lettered, + * and reprocessed in the dead-letter queue so the data is correct again. + *

+ * + * @param builder Service Bus client builder. + * @param queueName Name of queue to receive messages from. + * @param resubmitSender Service Bus sender client. When messages are fixed, they are published via this sender. + * + * @return A Mono that completes when all the messages in the queue have been processed (because a message has not + * been received in the last 30 seconds). + */ + private Mono receiveAndFixDeadLetterQueueMessages(ServiceBusClientBuilder builder, String queueName, + ServiceBusSenderClient resubmitSender) { + + return Mono.using(() -> { + return builder.receiver() + .queueName(queueName) + .subQueue(SubQueue.DEAD_LETTER_QUEUE) + .buildAsyncClient(); + }, deadLetterQueueReceiver -> { + // Continue to receive messages until no message has been seen for 30 seconds. + return deadLetterQueueReceiver.receiveMessages() + .timeout(Duration.ofSeconds(30)) + .flatMap(message -> { + if ("Physicist".equals(message.getSubject())) { + System.out.printf("Fixing DLQ message. MessageId = %s. SequenceNumber = %s. Subject = %s%n", + message.getMessageId(), message.getSequenceNumber(), message.getSubject()); + + // Create a copy of this message and then set the subject to the correct one. + ServiceBusMessage resubmitMessage = new ServiceBusMessage(message) + .setSubject("Scientist"); + + // Sending that corrected message. + resubmitSender.sendMessage(resubmitMessage); + } else { + System.out.printf("Message resubmission is not required. MessageId = %s. SequenceNumber = %s. " + + "Subject = %s%n", + message.getMessageId(), message.getSequenceNumber(), message.getSubject()); + } + + return deadLetterQueueReceiver.complete(message); + }) + .onErrorResume(TimeoutException.class, exception -> { + System.out.println("No messages received after 30 seconds. Dead-letter queue is empty."); + return Mono.empty(); + }) + .then(); + }, deadLetterQueueReceiver -> { + // When the receiving operation is completed, close the client. + deadLetterQueueReceiver.close(); }); } private static final class Person { - private String name; - private String firstName; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - Person() { - } + private final String lastName; + private final String firstName; - Person(String name, String firstName) { - this.name = name; + Person(@JsonProperty String lastName, @JsonProperty String firstName) { + this.lastName = lastName; this.firstName = firstName; } - public String getName() { - return name; + String getLastName() { + return lastName; } - public String getFirstName() { + String getFirstName() { return firstName; } - public void setName(String name) { - this.name = name; + /** + * Serializes an item into its JSON string equivalent. + * + * @return The JSON representation. + * + * @throws RuntimeException if the person could not be serialized. + */ + String toJson() { + try { + return OBJECT_MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException("Could not serialize object.", e); + } } - public void setFirstName(String firstName) { - this.firstName = firstName; + /** + * Deserializes a JSON string into a Person. + * + * @return The corresponding person. + * + * @throws RuntimeException if the JSON string could not be deserialized. + */ + private static Person fromJson(String json) { + try { + return OBJECT_MAPPER.readValue(json, Person.class); + } catch (JsonProcessingException e) { + throw new RuntimeException("Could not deserialize object.", e); + } } } }