Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug Service Bus binder consumer cannot be automatically created Topic/Subscriptions #30722

Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions sdk/spring/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ This section includes changes in `spring-cloud-azure-autoconfigure` module.
#### Features Added
- Support auto start-up for the auto-configured Service Bus Processor Client by enabling a new property of `spring.cloud.azure.servicebus.processor.auto-startup`. [#29997](https://github.com/Azure/azure-sdk-for-java/issues/29997)

### Spring Cloud Azure Resource Manager

#### Bugs Fixed
- Fix the Service Bus stream binder cannot automatically create Topic/Subscriptions from consumer. [#30722](https://github.com/Azure/azure-sdk-for-java/pull/30722).

## 4.3.0 (2022-06-29)
- This release is compatible with Spring Boot 2.5.0-2.5.14, 2.6.0-2.6.9, 2.7.0-2.7.1. (Note: 2.5.x (x>14), 2.6.y (y>9) and 2.7.z (z>1) should be supported, but they aren't tested with this release.)
- This release is compatible with Spring Cloud 2020.0.3-2020.0.5, 2021.0.0-2021.0.3. (Note: 2020.0.x (x>5) and 2021.0.y (y>3) should be supported, but they aren't tested with this release.)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.core.management.exception.ManagementException;
import com.azure.resourcemanager.AzureResourceManager;
import com.azure.resourcemanager.servicebus.models.ServiceBusSubscription;
import com.azure.resourcemanager.servicebus.models.Topic;
import com.azure.spring.cloud.core.properties.resource.AzureResourceMetadata;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
Expand All @@ -16,11 +17,20 @@
public class ServiceBusTopicSubscriptionCrud extends AbstractResourceCrud<ServiceBusSubscription,
Tuple3<String, String, String>> {


private ServiceBusTopicCrud serviceBusTopicCrud;
public ServiceBusTopicSubscriptionCrud(AzureResourceManager azureResourceManager,
AzureResourceMetadata azureResourceMetadata) {
this(azureResourceManager, azureResourceMetadata,
new ServiceBusTopicCrud(azureResourceManager, azureResourceMetadata));
}

ServiceBusTopicSubscriptionCrud(AzureResourceManager azureResourceManager,
AzureResourceMetadata azureResourceMetadata,
ServiceBusTopicCrud serviceBusTopicCrud) {
super(azureResourceManager, azureResourceMetadata);
this.serviceBusTopicCrud = serviceBusTopicCrud;
}

@Override
String getResourceName(Tuple3<String, String, String> key) {
return key.getT3();
Expand All @@ -34,8 +44,9 @@ String getResourceType() {
@Override
public ServiceBusSubscription internalGet(Tuple3<String, String, String> subscriptionCoordinate) {
try {
return new ServiceBusTopicCrud(this.resourceManager, this.resourceMetadata)
.get(Tuples.of(subscriptionCoordinate.getT1(), subscriptionCoordinate.getT2()))
Topic topic = this.serviceBusTopicCrud
.get(Tuples.of(subscriptionCoordinate.getT1(), subscriptionCoordinate.getT2()));
return topic == null ? null : topic
.subscriptions()
.getByName(subscriptionCoordinate.getT2());
} catch (ManagementException e) {
Expand All @@ -49,7 +60,7 @@ public ServiceBusSubscription internalGet(Tuple3<String, String, String> subscri

@Override
public ServiceBusSubscription internalCreate(Tuple3<String, String, String> subscriptionCoordinate) {
return new ServiceBusTopicCrud(this.resourceManager, this.resourceMetadata)
return this.serviceBusTopicCrud
.getOrCreate(Tuples.of(subscriptionCoordinate.getT1(), subscriptionCoordinate.getT2()))
.subscriptions()
.define(subscriptionCoordinate.getT3())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.spring.cloud.resourcemanager.implementation.crud;

import com.azure.core.management.exception.ManagementException;
import com.azure.resourcemanager.servicebus.models.ServiceBusNamespace;
import com.azure.resourcemanager.servicebus.models.ServiceBusNamespaces;
import com.azure.resourcemanager.servicebus.models.ServiceBusSubscription;
import com.azure.resourcemanager.servicebus.models.ServiceBusSubscriptions;
import com.azure.resourcemanager.servicebus.models.Topic;
import com.azure.resourcemanager.servicebus.models.Topics;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class ServiceBusTopicSubscriptionCrudTests extends AbstractResourceCrudTests<ServiceBusSubscription,
Tuple3<String, String, String>> {

private static final String NAMESPACE = "namespace";
private static final String TOPIC_NAME = "topic";
private static final String SUBSCRIPTION_NAME = "subscription";

@Override
AbstractResourceCrud<ServiceBusSubscription, Tuple3<String, String, String>> getResourceCrud() {
return new ServiceBusTopicSubscriptionCrud(resourceManager, resourceMetadata);
}

@Override
void getStubManagementException(int statusCode, String message) {
ServiceBusNamespaces namespaces = mock(ServiceBusNamespaces.class);
ServiceBusNamespace namespace = mock(ServiceBusNamespace.class);

when(resourceManager.serviceBusNamespaces()).thenReturn(namespaces);
ManagementException exception = getManagementException(statusCode, message);
when(namespaces.getByResourceGroup(resourceMetadata.getResourceGroup(), getKey().getT1()))
.thenReturn(namespace);

Topics topics = mock(Topics.class);
Topic topic = mock(Topic.class);
ServiceBusSubscriptions serviceBusSubscriptions = mock(ServiceBusSubscriptions.class);
when(namespace.topics()).thenReturn(topics);
when(topics.getByName(getKey().getT2())).thenReturn(topic);
when(topic.subscriptions()).thenReturn(serviceBusSubscriptions);
when(serviceBusSubscriptions.getByName(getKey().getT2())).thenThrow(exception);
hui1110 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
void createStubManagementException() {
ServiceBusNamespaces namespaces = mock(ServiceBusNamespaces.class);
ServiceBusNamespace namespace = mock(ServiceBusNamespace.class);
when(resourceManager.serviceBusNamespaces()).thenReturn(namespaces);
ManagementException exception = getManagementException(500, "Create service bus namespace exception");
when(namespaces.getByResourceGroup(resourceMetadata.getResourceGroup(), getKey().getT1()))
.thenReturn(namespace);

Topics topics = mock(Topics.class);
Topic topic = mock(Topic.class);
ServiceBusSubscriptions serviceBusSubscriptions = mock(ServiceBusSubscriptions.class);
when(namespace.topics()).thenReturn(topics);
when(topics.getByName(getKey().getT2())).thenReturn(topic);
when(topic.subscriptions()).thenReturn(serviceBusSubscriptions);
when(serviceBusSubscriptions.getByName(getKey().getT3())).thenThrow(exception);
hui1110 marked this conversation as resolved.
Show resolved Hide resolved

ServiceBusTopicCrud serviceBusTopicCrud = mock(ServiceBusTopicCrud.class);
when(serviceBusTopicCrud.get(Tuples.of(NAMESPACE, TOPIC_NAME))).thenReturn(null);

ServiceBusSubscription.DefinitionStages.Blank define = mock(ServiceBusSubscription.DefinitionStages.Blank.class);
when(serviceBusSubscriptions.define(SUBSCRIPTION_NAME)).thenReturn(define);
when(define.create()).thenThrow(exception);

ServiceBusSubscription.DefinitionStages.WithCreate create = mock(ServiceBusSubscription.DefinitionStages.WithCreate.class);
when(create.create()).thenThrow(exception);
}

@Override
Tuple3<String, String, String> getKey() {
return Tuples.of(NAMESPACE, TOPIC_NAME, SUBSCRIPTION_NAME);
}

@Test
void topicDoesNotExistReturnNull() {
ServiceBusTopicCrud topicCrud = mock(ServiceBusTopicCrud.class);
ServiceBusTopicSubscriptionCrud topicSubCrud = new ServiceBusTopicSubscriptionCrud(this.resourceManager,
this.resourceMetadata, topicCrud);

when(topicCrud.get(Tuples.of(NAMESPACE, TOPIC_NAME))).thenReturn(null);

hui1110 marked this conversation as resolved.
Show resolved Hide resolved
ServiceBusSubscription actualGet = topicSubCrud.get(getKey());
Assertions.assertNull(actualGet);
}
hui1110 marked this conversation as resolved.
Show resolved Hide resolved

@Test
void topicDoesNotExistReturnNullToCreate() {
ServiceBusTopicCrud topicCrud = mock(ServiceBusTopicCrud.class);
ServiceBusTopicSubscriptionCrud topicSubCrud = new ServiceBusTopicSubscriptionCrud(this.resourceManager,
this.resourceMetadata, topicCrud);

Topic topic = mock(Topic.class);
ServiceBusSubscriptions serviceBusSubscriptions = mock(ServiceBusSubscriptions.class);
when(topic.subscriptions()).thenReturn(serviceBusSubscriptions);

ServiceBusSubscription.DefinitionStages.Blank define =
mock(ServiceBusSubscription.DefinitionStages.Blank.class);
when(serviceBusSubscriptions.define(SUBSCRIPTION_NAME)).thenReturn(define);
ServiceBusSubscription serviceBusSubscription = mock(ServiceBusSubscription.class);
when(define.create()).thenReturn(serviceBusSubscription);
when(topicCrud.get(Tuples.of(NAMESPACE, TOPIC_NAME))).thenReturn(null);
when(topicCrud.getOrCreate(Tuples.of(NAMESPACE, TOPIC_NAME))).thenReturn(topic);

ServiceBusSubscription actualGet = topicSubCrud.get(getKey());
ServiceBusSubscription actualCreate = topicSubCrud.create(getKey());
Assertions.assertNull(actualGet);
Assertions.assertNotNull(actualCreate);
Assertions.assertEquals(serviceBusSubscription, actualCreate);
}

}