Skip to content

Commit

Permalink
Support callback service listeners anyway when listeners added. (#13136)
Browse files Browse the repository at this point in the history
* Support callback service listeners anyway when listeners added.

* Fix UT.
  • Loading branch information
KomachiSion authored Feb 26, 2025
1 parent e6f46f0 commit f03cd12
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,8 @@ private ServiceInfo getServiceInfoBySubscribe(String serviceName, String groupNa
return serviceInfo;
}

private ServiceInfo tryToSubscribe(String serviceName, String groupName, ServiceInfo cachedServiceInfo) throws NacosException {
private ServiceInfo tryToSubscribe(String serviceName, String groupName, ServiceInfo cachedServiceInfo)
throws NacosException {
// not found in cache, service never subscribed.
if (null == cachedServiceInfo) {
return clientProxy.subscribe(serviceName, groupName, StringUtils.EMPTY);
Expand Down Expand Up @@ -475,8 +476,8 @@ private void doSubscribe(String serviceName, String groupName, String clusters,
return;
}
NamingSelectorWrapper wrapper = new NamingSelectorWrapper(serviceName, groupName, clusters, selector, listener);
notifyIfSubscribed(serviceName, groupName, wrapper);
changeNotifier.registerListener(groupName, serviceName, wrapper);
notifyIfSubscribed(serviceName, groupName, wrapper);
clientProxy.subscribe(serviceName, groupName, Constants.NULL);
}

Expand Down Expand Up @@ -583,8 +584,9 @@ private void checkAndStripGroupNamePrefix(Instance instance, String groupName) t
}
}

private void notifyIfSubscribed(String serviceName, String groupName, NamingSelectorWrapper wrapper) {
if (changeNotifier.isSubscribed(groupName, serviceName)) {
private void notifyIfSubscribed(String serviceName, String groupName, NamingSelectorWrapper wrapper)
throws NacosException {
if (clientProxy.isSubscribed(serviceName, groupName, StringUtils.EMPTY)) {
NAMING_LOGGER.warn(
"Duplicate subscribe for groupName: {}, serviceName: {}; directly use current cached to notify.",
groupName, serviceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.alibaba.nacos.client.selector.ListenerInvoker;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER;

Expand All @@ -34,12 +35,15 @@ public class NamingListenerInvoker implements ListenerInvoker<NamingEvent> {

private final EventListener listener;

private final AtomicBoolean invoked = new AtomicBoolean(false);

public NamingListenerInvoker(EventListener listener) {
this.listener = listener;
}

@Override
public void invoke(NamingEvent event) {
invoked.set(true);
logInvoke(event);
if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {
((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(event));
Expand All @@ -53,6 +57,11 @@ private void logInvoke(NamingEvent event) {
event.getServiceName(), listener.toString());
}

@Override
public boolean isInvoked() {
return invoked.get();
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,27 @@ public void notifyListener(T event) {
}
E newEvent = buildListenerEvent(event);
if (isCallable(newEvent)) {
listener.invoke(newEvent);
// lock listener to make sure isInvoked is thread safe.
synchronized (listener) {
listener.invoke(newEvent);
}
}
}

/**
* Notify listener If the listener is not invoked.
*
* @param event original event
*/
public void notifyIfListenerIfNotNotified(T event) {
if (!isSelectable(event)) {
return;
}
E newEvent = buildListenerEvent(event);
synchronized (listener) {
if (!listener.isInvoked()) {
listener.invoke(newEvent);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,11 @@ public interface ListenerInvoker<E> {
* @param event event
*/
void invoke(E event);

/**
* Mark the listener whether invoked once. It should return {@code true} after {@link #invoke(E)} called at lease once.
*
* @return {@code true} if this listener has invoked at least once, {@code false} otherwise
*/
boolean isInvoked();
}
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ void testSubscribeWithNullListener() throws NacosException {
@Test
void testSubscribeDuplicate() throws NacosException {
String serviceName = "service1";
when(changeNotifier.isSubscribed(Constants.DEFAULT_GROUP, serviceName)).thenReturn(true);
when(proxy.isSubscribed(serviceName, Constants.DEFAULT_GROUP, StringUtils.EMPTY)).thenReturn(true);
ServiceInfo serviceInfo = new ServiceInfo(Constants.DEFAULT_GROUP + "@@" + serviceName);
serviceInfo.addHost(new Instance());
when(serviceInfoHolder.getServiceInfo(serviceName, Constants.DEFAULT_GROUP, "")).thenReturn(serviceInfo);
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
</issueManagement>

<properties>
<revision>2.5.0</revision>
<revision>2.5.1-SNAPSHOT</revision>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- Compiler settings properties -->
Expand Down

0 comments on commit f03cd12

Please sign in to comment.