From 4d88e1786cba6683b68783f62613f88da25ebf4c Mon Sep 17 00:00:00 2001 From: zongtanghu Date: Mon, 13 Jul 2020 16:51:07 +0800 Subject: [PATCH 1/5] [ISSUE##2859]Replace some usage of EventDispatcher for ConfigCacheService and LongPollingService. --- .../model/event/LocalDataChangeEvent.java | 4 +- .../server/service/ConfigCacheService.java | 89 ++++++++++--------- .../server/service/LongPollingService.java | 70 ++++++++------- 3 files changed, 86 insertions(+), 77 deletions(-) diff --git a/config/src/main/java/com/alibaba/nacos/config/server/model/event/LocalDataChangeEvent.java b/config/src/main/java/com/alibaba/nacos/config/server/model/event/LocalDataChangeEvent.java index 493783ff600..9a99c4e5d70 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/model/event/LocalDataChangeEvent.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/model/event/LocalDataChangeEvent.java @@ -16,7 +16,7 @@ package com.alibaba.nacos.config.server.model.event; -import com.alibaba.nacos.config.server.utils.event.EventDispatcher.Event; +import com.alibaba.nacos.common.notify.Event; import java.util.List; @@ -25,7 +25,7 @@ * * @author Nacos */ -public class LocalDataChangeEvent implements Event { +public class LocalDataChangeEvent extends Event { public final String groupKey; diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/ConfigCacheService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/ConfigCacheService.java index 40983c0361b..0a525b28566 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/ConfigCacheService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/ConfigCacheService.java @@ -16,6 +16,7 @@ package com.alibaba.nacos.config.server.service; +import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.common.utils.MD5Utils; import com.alibaba.nacos.config.server.constant.Constants; import com.alibaba.nacos.config.server.model.CacheItem; @@ -26,7 +27,6 @@ import com.alibaba.nacos.config.server.utils.GroupKey; import com.alibaba.nacos.config.server.utils.GroupKey2; import com.alibaba.nacos.config.server.utils.PropertyUtil; -import com.alibaba.nacos.config.server.utils.event.EventDispatcher; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,12 +67,12 @@ public static boolean hasGroupKey(String groupKey) { /** * Save config file and update md5 value in cache. * - * @param dataId dataId string value. - * @param group group string value. - * @param tenant tenant string value. - * @param content content string value. + * @param dataId dataId string value. + * @param group group string value. + * @param tenant tenant string value. + * @param content content string value. * @param lastModifiedTs lastModifiedTs. - * @param type file type. + * @param type file type. * @return dumpChange success or not. */ public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs, @@ -120,12 +120,12 @@ public static boolean dump(String dataId, String group, String tenant, String co /** * Save config file and update md5 value in cache. * - * @param dataId dataId string value. - * @param group group string value. - * @param tenant tenant string value. - * @param content content string value. + * @param dataId dataId string value. + * @param group group string value. + * @param tenant tenant string value. + * @param content content string value. * @param lastModifiedTs lastModifiedTs. - * @param betaIps betaIps string value. + * @param betaIps betaIps string value. * @return dumpChange success or not. */ public static boolean dumpBeta(String dataId, String group, String tenant, String content, long lastModifiedTs, @@ -165,12 +165,12 @@ public static boolean dumpBeta(String dataId, String group, String tenant, Strin /** * Save config file and update md5 value in cache. * - * @param dataId dataId string value. - * @param group group string value. - * @param tenant tenant string value. - * @param content content string value. + * @param dataId dataId string value. + * @param group group string value. + * @param tenant tenant string value. + * @param content content string value. * @param lastModifiedTs lastModifiedTs. - * @param tag tag string value. + * @param tag tag string value. * @return dumpChange success or not. */ public static boolean dumpTag(String dataId, String group, String tenant, String tag, String content, @@ -209,10 +209,10 @@ public static boolean dumpTag(String dataId, String group, String tenant, String /** * Save config file and update md5 value in cache. * - * @param dataId dataId string value. - * @param group group string value. - * @param tenant tenant string value. - * @param content content string value. + * @param dataId dataId string value. + * @param group group string value. + * @param tenant tenant string value. + * @param content content string value. * @param lastModifiedTs lastModifiedTs. * @return dumpChange success or not. */ @@ -313,6 +313,7 @@ public static void reloadConfig() { /** * Check md5. + * * @return return diff result list. */ public static List checkMd5() { @@ -343,20 +344,20 @@ public static List checkMd5() { * Delete config file, and delete cache. * * @param dataId dataId string value. - * @param group group string value. + * @param group group string value. * @param tenant tenant string value. * @return remove success or not. */ public static boolean remove(String dataId, String group, String tenant) { final String groupKey = GroupKey2.getKey(dataId, group, tenant); final int lockResult = tryWriteLock(groupKey); - + // If data is non-existent. if (0 == lockResult) { DUMP_LOG.info("[remove-ok] {} not exist.", groupKey); return true; } - + // try to lock failed if (lockResult < 0) { DUMP_LOG.warn("[remove-error] write lock failed. {}", groupKey); @@ -368,7 +369,7 @@ public static boolean remove(String dataId, String group, String tenant) { DiskUtil.removeConfigInfo(dataId, group, tenant); } CACHE.remove(groupKey); - EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey)); + NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey)); return true; } finally { @@ -380,7 +381,7 @@ public static boolean remove(String dataId, String group, String tenant) { * Delete beta config file, and delete cache. * * @param dataId dataId string value. - * @param group group string value. + * @param group group string value. * @param tenant tenant string value. * @return remove success or not. */ @@ -393,7 +394,7 @@ public static boolean removeBeta(String dataId, String group, String tenant) { DUMP_LOG.info("[remove-ok] {} not exist.", groupKey); return true; } - + // try to lock failed if (lockResult < 0) { DUMP_LOG.warn("[remove-error] write lock failed. {}", groupKey); @@ -404,7 +405,7 @@ public static boolean removeBeta(String dataId, String group, String tenant) { if (!PropertyUtil.isDirectRead()) { DiskUtil.removeConfigInfo4Beta(dataId, group, tenant); } - EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey, true, CACHE.get(groupKey).getIps4Beta())); + NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey, true, CACHE.get(groupKey).getIps4Beta())); CACHE.get(groupKey).setBeta(false); CACHE.get(groupKey).setIps4Beta(null); CACHE.get(groupKey).setMd54Beta(Constants.NULL); @@ -418,21 +419,21 @@ public static boolean removeBeta(String dataId, String group, String tenant) { * Delete tag config file, and delete cache. * * @param dataId dataId string value. - * @param group group string value. + * @param group group string value. * @param tenant tenant string value. - * @param tag tag string value. + * @param tag tag string value. * @return remove success or not. */ public static boolean removeTag(String dataId, String group, String tenant, String tag) { final String groupKey = GroupKey2.getKey(dataId, group, tenant); final int lockResult = tryWriteLock(groupKey); - + // If data is non-existent. if (0 == lockResult) { DUMP_LOG.info("[remove-ok] {} not exist.", groupKey); return true; } - + // try to lock failed if (lockResult < 0) { DUMP_LOG.warn("[remove-error] write lock failed. {}", groupKey); @@ -447,7 +448,7 @@ public static boolean removeTag(String dataId, String group, String tenant, Stri CacheItem ci = CACHE.get(groupKey); ci.tagMd5.remove(tag); ci.tagLastModifiedTs.remove(tag); - EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey, false, null, tag)); + NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey, false, null, tag)); return true; } finally { releaseWriteLock(groupKey); @@ -457,8 +458,8 @@ public static boolean removeTag(String dataId, String group, String tenant, Stri /** * Update md5 value. * - * @param groupKey groupKey string value. - * @param md5 md5 string value. + * @param groupKey groupKey string value. + * @param md5 md5 string value. * @param lastModifiedTs lastModifiedTs long value. */ public static void updateMd5(String groupKey, String md5, long lastModifiedTs) { @@ -466,16 +467,16 @@ public static void updateMd5(String groupKey, String md5, long lastModifiedTs) { if (cache.md5 == null || !cache.md5.equals(md5)) { cache.md5 = md5; cache.lastModifiedTs = lastModifiedTs; - EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey)); + NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey)); } } /** * Update Beta md5 value. * - * @param groupKey groupKey string value. - * @param md5 md5 string value. - * @param ips4Beta ips4Beta List. + * @param groupKey groupKey string value. + * @param md5 md5 string value. + * @param ips4Beta ips4Beta List. * @param lastModifiedTs lastModifiedTs long value. */ public static void updateBetaMd5(String groupKey, String md5, List ips4Beta, long lastModifiedTs) { @@ -485,16 +486,16 @@ public static void updateBetaMd5(String groupKey, String md5, List ips4B cache.md54Beta = md5; cache.lastModifiedTs4Beta = lastModifiedTs; cache.ips4Beta = ips4Beta; - EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey, true, ips4Beta)); + NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey, true, ips4Beta)); } } /** * Update tag md5 value. * - * @param groupKey groupKey string value. - * @param tag tag string value. - * @param md5 md5 string value. + * @param groupKey groupKey string value. + * @param tag tag string value. + * @param md5 md5 string value. * @param lastModifiedTs lastModifiedTs long value. */ public static void updateTagMd5(String groupKey, String tag, String md5, long lastModifiedTs) { @@ -510,13 +511,13 @@ public static void updateTagMd5(String groupKey, String tag, String md5, long la } else { cache.tagLastModifiedTs.put(tag, lastModifiedTs); } - EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey, false, null, tag)); + NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey, false, null, tag)); return; } if (cache.tagMd5.get(tag) == null || !cache.tagMd5.get(tag).equals(md5)) { cache.tagMd5.put(tag, md5); cache.tagLastModifiedTs.put(tag, lastModifiedTs); - EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey, false, null, tag)); + NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey, false, null, tag)); } } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java index f6035356cc1..0f237aafb86 100755 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java @@ -16,6 +16,9 @@ package com.alibaba.nacos.config.server.service; +import com.alibaba.nacos.common.notify.Event; +import com.alibaba.nacos.common.notify.NotifyCenter; +import com.alibaba.nacos.common.notify.listener.Subscriber; import com.alibaba.nacos.common.utils.CollectionUtils; import com.alibaba.nacos.common.utils.ExceptionUtil; import com.alibaba.nacos.config.server.model.SampleResult; @@ -25,8 +28,6 @@ import com.alibaba.nacos.config.server.utils.LogUtil; import com.alibaba.nacos.config.server.utils.MD5Util; import com.alibaba.nacos.config.server.utils.RequestUtil; -import com.alibaba.nacos.config.server.utils.event.EventDispatcher.AbstractEventListener; -import com.alibaba.nacos.config.server.utils.event.EventDispatcher.Event; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; @@ -61,7 +62,7 @@ * @author Nacos */ @Service -public class LongPollingService extends AbstractEventListener { +public class LongPollingService { private static final int FIXED_POLLING_INTERVAL_MS = 10000; @@ -126,8 +127,8 @@ public SampleResult getSubscribleInfoByIp(String clientIp) { } /** - * Aggregate the sampling IP and monitoring configuration information in the sampling results. - * There is no problem for the merging strategy to cover the previous one with the latter. + * Aggregate the sampling IP and monitoring configuration information in the sampling results. There is no problem + * for the merging strategy to cover the previous one with the latter. * * @param sampleResults sample Results. * @return Results. @@ -147,6 +148,7 @@ public SampleResult mergeSampleResult(List sampleResults) { /** * Collect application subscribe configinfos. + * * @return configinfos results. */ public Map> collectApplicationSubscribeConfigInfos() { @@ -232,9 +234,9 @@ private ClientLongPolling getClientPollingRecord(String clientIp) { /** * Add LongPollingClient. * - * @param req HttpServletRequest. - * @param rsp HttpServletResponse. - * @param clientMd5Map clientMd5Map. + * @param req HttpServletRequest. + * @param rsp HttpServletResponse. + * @param clientMd5Map clientMd5Map. * @param probeRequestSize probeRequestSize. */ public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map clientMd5Map, @@ -245,7 +247,7 @@ public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER); String tag = req.getHeader("Vipserver-Tag"); int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500); - + // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout. long timeout = Math.max(10000, Long.parseLong(str) - delayTime); if (isFixedPolling()) { @@ -279,25 +281,6 @@ public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag)); } - @Override - public List> interest() { - List> eventTypes = new ArrayList>(); - eventTypes.add(LocalDataChangeEvent.class); - return eventTypes; - } - - @Override - public void onEvent(Event event) { - if (isFixedPolling()) { - // Ignore. - } else { - if (event instanceof LocalDataChangeEvent) { - LocalDataChangeEvent evt = (LocalDataChangeEvent) event; - scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps)); - } - } - } - public static boolean isSupportLongPolling(HttpServletRequest req) { return null != req.getHeader(LONG_POLLING_HEADER); } @@ -316,6 +299,31 @@ public Thread newThread(Runnable r) { } }); scheduler.scheduleWithFixedDelay(new StatTask(), 0L, 10L, TimeUnit.SECONDS); + + // Register LocalDataChangeEvent to NotifyCenter. + NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize); + + // Register A Subscriber. + NotifyCenter.registerSubscriber(new Subscriber() { + + @Override + public void onEvent(Event event) { + if (isFixedPolling()) { + // Ignore. + } else { + if (event instanceof LocalDataChangeEvent) { + LocalDataChangeEvent evt = (LocalDataChangeEvent) event; + scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps)); + } + } + } + + @Override + public Class subscribeType() { + return LocalDataChangeEvent.class; + } + }); + } public static final String LONG_POLLING_HEADER = "Long-Pulling-Timeout"; @@ -403,7 +411,7 @@ public void run() { public void run() { try { getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis()); - + // Delete subsciber's relations. allSubs.remove(ClientLongPolling.this); @@ -439,7 +447,7 @@ public void run() { } void sendResponse(List changedGroups) { - + // Cancel time out task. if (null != asyncTimeoutFuture) { asyncTimeoutFuture.cancel(false); @@ -449,7 +457,7 @@ void sendResponse(List changedGroups) { void generateResponse(List changedGroups) { if (null == changedGroups) { - + // Tell web container to send http response. asyncContext.complete(); return; From 43e765516c7267fd2a6c4e7bcdc96d5c38e8cee3 Mon Sep 17 00:00:00 2001 From: zongtanghu Date: Mon, 13 Jul 2020 17:49:48 +0800 Subject: [PATCH 2/5] [ISSUE##2859]Replace some usage of EventDispatcher for AsyncNotifyService and ConfigChangePublisher. --- .../model/event/ConfigDataChangeEvent.java | 4 +- .../server/service/ConfigChangePublisher.java | 5 +- .../server/service/LongPollingService.java | 2 +- .../service/merge/MergeTaskProcessor.java | 19 ++--- .../service/notify/AsyncNotifyService.java | 73 ++++++++++--------- .../service/ConfigChangePublisherTest.java | 28 ++++--- 6 files changed, 70 insertions(+), 61 deletions(-) diff --git a/config/src/main/java/com/alibaba/nacos/config/server/model/event/ConfigDataChangeEvent.java b/config/src/main/java/com/alibaba/nacos/config/server/model/event/ConfigDataChangeEvent.java index 66be77ffeb5..3d816932a8f 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/model/event/ConfigDataChangeEvent.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/model/event/ConfigDataChangeEvent.java @@ -16,7 +16,7 @@ package com.alibaba.nacos.config.server.model.event; -import com.alibaba.nacos.config.server.utils.event.EventDispatcher.Event; +import com.alibaba.nacos.common.notify.Event; import org.apache.commons.lang3.StringUtils; /** @@ -24,7 +24,7 @@ * * @author Nacos */ -public class ConfigDataChangeEvent implements Event { +public class ConfigDataChangeEvent extends Event { public final boolean isBeta; diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/ConfigChangePublisher.java b/config/src/main/java/com/alibaba/nacos/config/server/service/ConfigChangePublisher.java index d9279218de4..99fdeaf0b95 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/ConfigChangePublisher.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/ConfigChangePublisher.java @@ -16,9 +16,9 @@ package com.alibaba.nacos.config.server.service; +import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.config.server.model.event.ConfigDataChangeEvent; import com.alibaba.nacos.config.server.utils.PropertyUtil; -import com.alibaba.nacos.config.server.utils.event.EventDispatcher; import com.alibaba.nacos.core.utils.ApplicationUtils; /** @@ -30,13 +30,14 @@ public class ConfigChangePublisher { /** * Notify ConfigChange. + * * @param event ConfigDataChangeEvent instance. */ public static void notifyConfigChange(ConfigDataChangeEvent event) { if (PropertyUtil.isEmbeddedStorage() && !ApplicationUtils.getStandaloneMode()) { return; } - EventDispatcher.fireEvent(event); + NotifyCenter.publishEvent(event); } } diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java index 0f237aafb86..a028324a766 100755 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java @@ -303,7 +303,7 @@ public Thread newThread(Runnable r) { // Register LocalDataChangeEvent to NotifyCenter. NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize); - // Register A Subscriber. + // Register A Subscriber to subscribe LocalDataChangeEvent. NotifyCenter.registerSubscriber(new Subscriber() { @Override diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeTaskProcessor.java b/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeTaskProcessor.java index 1e9f4fe6d01..0ec8667153d 100755 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeTaskProcessor.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/merge/MergeTaskProcessor.java @@ -16,6 +16,7 @@ package com.alibaba.nacos.config.server.service.merge; +import com.alibaba.nacos.common.notify.NotifyCenter; import com.alibaba.nacos.config.server.constant.Constants; import com.alibaba.nacos.config.server.manager.AbstractTask; import com.alibaba.nacos.config.server.manager.TaskProcessor; @@ -27,7 +28,6 @@ import com.alibaba.nacos.config.server.service.trace.ConfigTraceService; import com.alibaba.nacos.config.server.utils.ContentUtils; import com.alibaba.nacos.config.server.utils.TimeUtils; -import com.alibaba.nacos.config.server.utils.event.EventDispatcher; import com.alibaba.nacos.core.utils.InetUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -79,8 +79,9 @@ public boolean process(String taskType, AbstractTask task) { persistService.insertOrUpdate(null, null, cf, time, null); - LOGGER.info("[merge-ok] {}, {}, size={}, length={}, md5={}, content={}", dataId, group, datumList.size(), - cf.getContent().length(), cf.getMd5(), ContentUtils.truncateContent(cf.getContent())); + LOGGER.info("[merge-ok] {}, {}, size={}, length={}, md5={}, content={}", dataId, group, + datumList.size(), cf.getContent().length(), cf.getMd5(), + ContentUtils.truncateContent(cf.getContent())); ConfigTraceService .logPersistenceEvent(dataId, group, tenant, null, time.getTime(), InetUtils.getSelfIp(), @@ -93,14 +94,14 @@ public boolean process(String taskType, AbstractTask task) { persistService.removeConfigInfoTag(dataId, group, tenant, tag, clientIp, null); } - LOGGER.warn("[merge-delete] delete config info because no datum. dataId=" + dataId + ", groupId=" + group); + LOGGER.warn( + "[merge-delete] delete config info because no datum. dataId=" + dataId + ", groupId=" + group); ConfigTraceService .logPersistenceEvent(dataId, group, tenant, null, time.getTime(), InetUtils.getSelfIp(), ConfigTraceService.PERSISTENCE_EVENT_REMOVE, null); } - - EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime())); + NotifyCenter.publishEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime())); } catch (Exception e) { mergeService.addMergeTask(dataId, group, tenant, mergeTask.getClientIp()); @@ -113,9 +114,9 @@ public boolean process(String taskType, AbstractTask task) { /** * merge datumList {@link ConfigInfoAggr}. * - * @param dataId data id - * @param group group - * @param tenant tenant + * @param dataId data id + * @param group group + * @param tenant tenant * @param datumList datumList * @return {@link ConfigInfo} */ diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java index d53febeada4..a4d3546b2bf 100644 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/notify/AsyncNotifyService.java @@ -16,14 +16,15 @@ package com.alibaba.nacos.config.server.service.notify; +import com.alibaba.nacos.common.notify.Event; +import com.alibaba.nacos.common.notify.NotifyCenter; +import com.alibaba.nacos.common.notify.listener.Subscriber; import com.alibaba.nacos.config.server.constant.Constants; import com.alibaba.nacos.config.server.monitor.MetricsMonitor; import com.alibaba.nacos.config.server.model.event.ConfigDataChangeEvent; import com.alibaba.nacos.config.server.service.trace.ConfigTraceService; import com.alibaba.nacos.config.server.utils.LogUtil; import com.alibaba.nacos.config.server.utils.PropertyUtil; -import com.alibaba.nacos.config.server.utils.event.EventDispatcher.AbstractEventListener; -import com.alibaba.nacos.config.server.utils.event.EventDispatcher.Event; import com.alibaba.nacos.core.cluster.Member; import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.core.utils.ApplicationUtils; @@ -45,10 +46,8 @@ import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.text.MessageFormat; -import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; -import java.util.List; import java.util.Queue; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -62,42 +61,46 @@ * @author Nacos */ @Service -public class AsyncNotifyService extends AbstractEventListener { - - @Override - public List> interest() { - List> types = new ArrayList>(); - // Trigger configuration change synchronization notification - types.add(ConfigDataChangeEvent.class); - return types; - } - - @Override - public void onEvent(Event event) { - - // Generate ConfigDataChangeEvent concurrently - if (event instanceof ConfigDataChangeEvent) { - ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event; - long dumpTs = evt.lastModifiedTs; - String dataId = evt.dataId; - String group = evt.group; - String tenant = evt.tenant; - String tag = evt.tag; - Collection ipList = memberManager.allMembers(); - - // In fact, any type of queue here can be - Queue queue = new LinkedList(); - for (Member member : ipList) { - queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(), evt.isBeta)); - } - EXECUTOR.execute(new AsyncTask(httpclient, queue)); - } - } +public class AsyncNotifyService { @Autowired public AsyncNotifyService(ServerMemberManager memberManager) { this.memberManager = memberManager; httpclient.start(); + + // Register ConfigDataChangeEvent to NotifyCenter. + NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize); + + // Register A Subscriber to subscribe ConfigDataChangeEvent. + NotifyCenter.registerSubscriber(new Subscriber() { + + @Override + public void onEvent(Event event) { + // Generate ConfigDataChangeEvent concurrently + if (event instanceof ConfigDataChangeEvent) { + ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event; + long dumpTs = evt.lastModifiedTs; + String dataId = evt.dataId; + String group = evt.group; + String tenant = evt.tenant; + String tag = evt.tag; + Collection ipList = memberManager.allMembers(); + + // In fact, any type of queue here can be + Queue queue = new LinkedList(); + for (Member member : ipList) { + queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(), + evt.isBeta)); + } + EXECUTOR.execute(new AsyncTask(httpclient, queue)); + } + } + + @Override + public Class subscribeType() { + return ConfigDataChangeEvent.class; + } + }); } public Executor getExecutor() { diff --git a/config/src/test/java/com/alibaba/nacos/config/server/service/ConfigChangePublisherTest.java b/config/src/test/java/com/alibaba/nacos/config/server/service/ConfigChangePublisherTest.java index a6e97ab74d9..d685215bfb8 100644 --- a/config/src/test/java/com/alibaba/nacos/config/server/service/ConfigChangePublisherTest.java +++ b/config/src/test/java/com/alibaba/nacos/config/server/service/ConfigChangePublisherTest.java @@ -16,33 +16,36 @@ package com.alibaba.nacos.config.server.service; +import com.alibaba.nacos.common.notify.Event; +import com.alibaba.nacos.common.notify.NotifyCenter; +import com.alibaba.nacos.common.notify.listener.Subscriber; import com.alibaba.nacos.config.server.model.event.ConfigDataChangeEvent; import com.alibaba.nacos.config.server.utils.PropertyUtil; -import com.alibaba.nacos.config.server.utils.event.EventDispatcher; import com.alibaba.nacos.core.utils.ApplicationUtils; import org.junit.Assert; import org.junit.Test; -import java.util.Collections; -import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; public class ConfigChangePublisherTest { @Test - public void testConfigChangeNotify() { + public void testConfigChangeNotify() throws InterruptedException { AtomicReference reference = new AtomicReference<>(); - EventDispatcher.addEventListener(new EventDispatcher.AbstractEventListener() { + NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize); + NotifyCenter.registerSubscriber(new Subscriber() { + @Override - public List> interest() { - return Collections.singletonList(ConfigDataChangeEvent.class); + public void onEvent(Event event) { + reference.set((ConfigDataChangeEvent) event); } @Override - public void onEvent(EventDispatcher.Event event) { - reference.set((ConfigDataChangeEvent) event); + public Class subscribeType() { + return ConfigDataChangeEvent.class; } }); @@ -52,33 +55,34 @@ public void onEvent(EventDispatcher.Event event) { ConfigChangePublisher .notifyConfigChange(new ConfigDataChangeEvent("chuntaojun", "chuntaojun", System.currentTimeMillis())); + Thread.sleep(2000); Assert.assertNotNull(reference.get()); reference.set(null); // nacos is standalone mode and use external storage ApplicationUtils.setIsStandalone(true); PropertyUtil.setEmbeddedStorage(false); - ConfigChangePublisher .notifyConfigChange(new ConfigDataChangeEvent("chuntaojun", "chuntaojun", System.currentTimeMillis())); + Thread.sleep(2000); Assert.assertNotNull(reference.get()); reference.set(null); // nacos is cluster mode and use embedded storage ApplicationUtils.setIsStandalone(false); PropertyUtil.setEmbeddedStorage(true); - ConfigChangePublisher .notifyConfigChange(new ConfigDataChangeEvent("chuntaojun", "chuntaojun", System.currentTimeMillis())); + Thread.sleep(2000); Assert.assertNull(reference.get()); reference.set(null); // nacos is cluster mode and use external storage ApplicationUtils.setIsStandalone(false); PropertyUtil.setEmbeddedStorage(false); - ConfigChangePublisher .notifyConfigChange(new ConfigDataChangeEvent("chuntaojun", "chuntaojun", System.currentTimeMillis())); + Thread.sleep(2000); Assert.assertNotNull(reference.get()); reference.set(null); } From c414bc03f9e190fdf945edb141e75226fa8ab7a4 Mon Sep 17 00:00:00 2001 From: zongtanghu Date: Mon, 13 Jul 2020 18:52:23 +0800 Subject: [PATCH 3/5] [ISSUE#3179]fix typo. --- .../nacos/config/server/service/ConfigChangePublisherTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/config/src/test/java/com/alibaba/nacos/config/server/service/ConfigChangePublisherTest.java b/config/src/test/java/com/alibaba/nacos/config/server/service/ConfigChangePublisherTest.java index d685215bfb8..224e1e6fa68 100644 --- a/config/src/test/java/com/alibaba/nacos/config/server/service/ConfigChangePublisherTest.java +++ b/config/src/test/java/com/alibaba/nacos/config/server/service/ConfigChangePublisherTest.java @@ -25,7 +25,6 @@ import org.junit.Assert; import org.junit.Test; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; public class ConfigChangePublisherTest { From 31af7d7f4051a4ab2032ded58a8f181b5f2c69e3 Mon Sep 17 00:00:00 2001 From: zongtanghu Date: Mon, 13 Jul 2020 19:08:43 +0800 Subject: [PATCH 4/5] [ISSUE#3179]fix typo. --- .../server/service/LongPollingService.java | 21 +------------------ 1 file changed, 1 insertion(+), 20 deletions(-) diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java index bf6ab09a47d..37d3e1f4c3a 100755 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java @@ -278,25 +278,6 @@ public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag)); } - @Override - public List> interest() { - List> eventTypes = new ArrayList>(); - eventTypes.add(LocalDataChangeEvent.class); - return eventTypes; - } - - @Override - public void onEvent(Event event) { - if (isFixedPolling()) { - // Ignore. - } else { - if (event instanceof LocalDataChangeEvent) { - LocalDataChangeEvent evt = (LocalDataChangeEvent) event; - ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps)); - } - } - } - public static boolean isSupportLongPolling(HttpServletRequest req) { return null != req.getHeader(LONG_POLLING_HEADER); } @@ -306,7 +287,7 @@ public LongPollingService() { allSubs = new ConcurrentLinkedQueue(); ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS); - + // Register LocalDataChangeEvent to NotifyCenter. NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize); From 8a84835af1883f0d0676be4a4fb68f8b2602f5d5 Mon Sep 17 00:00:00 2001 From: zongtanghu Date: Mon, 13 Jul 2020 19:32:01 +0800 Subject: [PATCH 5/5] [ISSUE#3179]fix typo. --- .../alibaba/nacos/config/server/service/LongPollingService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java b/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java index 37d3e1f4c3a..a6a20bddaa6 100755 --- a/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java +++ b/config/src/main/java/com/alibaba/nacos/config/server/service/LongPollingService.java @@ -29,6 +29,7 @@ import com.alibaba.nacos.config.server.utils.LogUtil; import com.alibaba.nacos.config.server.utils.MD5Util; import com.alibaba.nacos.config.server.utils.RequestUtil; + import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service;