Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address spring-messaging-azure-service api review comments #27770

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -287,8 +288,11 @@ private ServiceBusProcessorFactory getProcessorFactory() {

processorFactoryCustomizers.forEach(customizer -> customizer.customize(this.processorFactory));

this.processorFactory.addListener((name, subscription, client) -> {
String instrumentationName = name + "/" + getGroup(subscription);
this.processorFactory.addListener((name, client) -> {
String subscriptionName = client.getSubscriptionName();
boolean isTopic = StringUtils.hasText(subscriptionName);
String entityName = isTopic ? client.getTopicName() : client.getQueueName();
String instrumentationName = entityName + "/" + getGroup(subscriptionName);
Instrumentation instrumentation = new ServiceBusProcessorInstrumentation(instrumentationName, CONSUMER, Duration.ofMinutes(2));
instrumentation.setStatus(Instrumentation.Status.UP);
instrumentationManager.addHealthInstrumentation(instrumentation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,17 @@ interface Listener {
* The callback method that the processor has been added.
*
* @param name the name for the processor.
* @param subscription the subscription for the processor.
* @param client the client for the processor.
*/
void processorAdded(String name, String subscription, ServiceBusProcessorClient client);
void processorAdded(String name, ServiceBusProcessorClient client);

/**
* The default callback method that the processor has been removed.
*
* @param name the name for the processor.
* @param subscription the subscription for the processor.
* @param client the client for the processor.
*/
default void processorRemoved(String name, String subscription, ServiceBusProcessorClient client) {
default void processorRemoved(String name, ServiceBusProcessorClient client) {
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import com.azure.core.credential.TokenCredential;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.spring.cloud.core.implementation.util.AzureSpringIdentifier;
import com.azure.spring.cloud.core.credential.AzureCredentialResolver;
import com.azure.spring.cloud.core.customizer.AzureServiceClientBuilderCustomizer;
import com.azure.spring.cloud.core.implementation.util.AzureSpringIdentifier;
import com.azure.spring.cloud.service.implementation.servicebus.factory.ServiceBusProcessorClientBuilderFactory;
import com.azure.spring.cloud.service.implementation.servicebus.factory.ServiceBusSessionProcessorClientBuilderFactory;
import com.azure.spring.cloud.service.listener.MessageListener;
Expand Down Expand Up @@ -199,7 +199,7 @@ private ServiceBusProcessorClient doCreateProcessor(String name,
client = builder.buildProcessorClient();
}

this.listeners.forEach(l -> l.processorAdded(k.getDestination(), k.getGroup(), client));
this.listeners.forEach(l -> l.processorAdded(buildProcessorName(k), client));
return client;
});
}
Expand Down Expand Up @@ -228,7 +228,7 @@ public void setDefaultAzureCredential(TokenCredential defaultAzureCredential) {
private void close(Map<ConsumerIdentifier, ServiceBusProcessorClient> map, Consumer<ServiceBusProcessorClient> close) {
map.forEach((t, p) -> {
try {
listeners.forEach(l -> l.processorRemoved(t.getDestination(), t.getGroup(), p));
listeners.forEach(l -> l.processorRemoved(buildProcessorName(t), p));
close.accept(p);
} catch (Exception ex) {
LOGGER.warn("Failed to clean service bus queue client factory", ex);
Expand Down Expand Up @@ -288,6 +288,11 @@ private void customizeBuilder(String entityName, String subscription,
.forEach(customizer -> customizer.getSessionCustomizer().customize(builder));
}

private String buildProcessorName(ConsumerIdentifier k) {
String group = k.getGroup();
return k.getDestination() + "/" + (group == null ? "" : group);
}

public static class ServiceBusProcessClientBuilderCustomizer {

private final AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusProcessorClientBuilder> noneSessionCustomizer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ void setUp() {
this.processorFactory = new DefaultServiceBusNamespaceProcessorFactory(namespaceProperties);
queueProcessorAddedTimes = 0;
topicProcessorAddedTimes = 0;
this.processorFactory.addListener((name, subscription, client) -> {
if (subscription == null) {
this.processorFactory.addListener((name, client) -> {
if (client.getSubscriptionName() == null) {
queueProcessorAddedTimes++;
} else {
topicProcessorAddedTimes++;
Expand Down