Skip to content

Commit

Permalink
fix the issue that release messages might be missed in certain scenarios
Browse files Browse the repository at this point in the history
nobodyiam committed Jul 22, 2021
1 parent 9bd4703 commit f9c69db
Showing 3 changed files with 143 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -59,6 +59,7 @@ Apollo 1.9.0
* [support release apollo-client-config-data](https://github.com/ctripcorp/apollo/pull/3822)
* [Reduce bootstrap time in the situation with large properties](https://github.com/ctripcorp/apollo/pull/3816)
* [docs: English catalog in sidebar](https://github.com/ctripcorp/apollo/pull/3831)
* [fix the issue that release messages might be missed in certain scenarios](https://github.com/ctripcorp/apollo/pull/3819)
------------------
All issues and pull requests are [here](https://github.com/ctripcorp/apollo/milestone/6?closed=1)

Original file line number Diff line number Diff line change
@@ -16,7 +16,12 @@
*/
package com.ctrip.framework.apollo.biz.message;

import com.google.common.collect.Maps;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -40,19 +45,22 @@
*/
public class ReleaseMessageScanner implements InitializingBean {
private static final Logger logger = LoggerFactory.getLogger(ReleaseMessageScanner.class);
private static final int missingReleaseMessageMaxAge = 10; // hardcoded to 10, could be configured via BizConfig if necessary
@Autowired
private BizConfig bizConfig;
@Autowired
private ReleaseMessageRepository releaseMessageRepository;
private int databaseScanInterval;
private List<ReleaseMessageListener> listeners;
private ScheduledExecutorService executorService;
private final List<ReleaseMessageListener> listeners;
private final ScheduledExecutorService executorService;
private final Map<Long, Integer> missingReleaseMessages; // missing release message id => age counter
private long maxIdScanned;

public ReleaseMessageScanner() {
listeners = Lists.newCopyOnWriteArrayList();
executorService = Executors.newScheduledThreadPool(1, ApolloThreadFactory
.create("ReleaseMessageScanner", true));
missingReleaseMessages = Maps.newHashMap();
}

@Override
@@ -62,6 +70,7 @@ public void afterPropertiesSet() throws Exception {
executorService.scheduleWithFixedDelay(() -> {
Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
try {
scanMissingMessages();
scanMessages();
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
@@ -108,10 +117,51 @@ private boolean scanAndSendMessages() {
}
fireMessageScanned(releaseMessages);
int messageScanned = releaseMessages.size();
maxIdScanned = releaseMessages.get(messageScanned - 1).getId();
long newMaxIdScanned = releaseMessages.get(messageScanned - 1).getId();
// check id gaps, possible reasons are release message not committed yet or already rolled back
if (newMaxIdScanned - maxIdScanned > messageScanned) {
recordMissingReleaseMessageIds(releaseMessages, maxIdScanned);
}
maxIdScanned = newMaxIdScanned;
return messageScanned == 500;
}

private void scanMissingMessages() {
Set<Long> missingReleaseMessageIds = missingReleaseMessages.keySet();
Iterable<ReleaseMessage> releaseMessages = releaseMessageRepository
.findAllById(missingReleaseMessageIds);
fireMessageScanned(releaseMessages);
releaseMessages.forEach(releaseMessage -> {
missingReleaseMessageIds.remove(releaseMessage.getId());
});
growAndCleanMissingMessages();
}

private void growAndCleanMissingMessages() {
Iterator<Entry<Long, Integer>> iterator = missingReleaseMessages.entrySet()
.iterator();
while (iterator.hasNext()) {
Entry<Long, Integer> entry = iterator.next();
if (entry.getValue() > missingReleaseMessageMaxAge) {
iterator.remove();
} else {
entry.setValue(entry.getValue() + 1);
}
}
}

private void recordMissingReleaseMessageIds(List<ReleaseMessage> messages, long startId) {
for (ReleaseMessage message : messages) {
long currentId = message.getId();
if (currentId - startId > 1) {
for (long i = startId + 1; i < currentId; i++) {
missingReleaseMessages.putIfAbsent(i, 1);
}
}
startId = currentId;
}
}

/**
* find largest message id as the current start point
* @return current largest message id
@@ -125,7 +175,7 @@ private long loadLargestMessageId() {
* Notify listeners with messages loaded
* @param messages
*/
private void fireMessageScanned(List<ReleaseMessage> messages) {
private void fireMessageScanned(Iterable<ReleaseMessage> messages) {
for (ReleaseMessage message : messages) {
for (ReleaseMessageListener listener : listeners) {
try {
Original file line number Diff line number Diff line change
@@ -18,20 +18,25 @@

import com.ctrip.framework.apollo.biz.config.BizConfig;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.SettableFuture;

import com.ctrip.framework.apollo.biz.AbstractUnitTest;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.repository.ReleaseMessageRepository;

import java.util.ArrayList;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.springframework.test.util.ReflectionTestUtils;

import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.mockito.Mockito.when;

/**
@@ -54,6 +59,10 @@ public void setUp() throws Exception {
databaseScanInterval = 100; //100 ms
when(bizConfig.releaseMessageScanIntervalInMilli()).thenReturn(databaseScanInterval);
releaseMessageScanner.afterPropertiesSet();

Awaitility.reset();
Awaitility.setDefaultTimeout(databaseScanInterval * 5, TimeUnit.MILLISECONDS);
Awaitility.setDefaultPollInterval(databaseScanInterval, TimeUnit.MILLISECONDS);
}

@Test
@@ -91,7 +100,86 @@ public void testScanMessageAndNotifyMessageListener() throws Exception {

assertEquals(anotherMessage, anotherListenerMessage.getMessage());
assertEquals(anotherId, anotherListenerMessage.getId());
}

@Test
public void testScanMessageWithGapAndNotifyMessageListener() throws Exception {
String someMessage = "someMessage";
long someId = 1;
ReleaseMessage someReleaseMessage = assembleReleaseMessage(someId, someMessage);

String someMissingMessage = "someMissingMessage";
long someMissingId = 2;
ReleaseMessage someMissingReleaseMessage = assembleReleaseMessage(someMissingId, someMissingMessage);

String anotherMessage = "anotherMessage";
long anotherId = 3;
ReleaseMessage anotherReleaseMessage = assembleReleaseMessage(anotherId, anotherMessage);

String anotherMissingMessage = "anotherMissingMessage";
long anotherMissingId = 4;
ReleaseMessage anotherMissingReleaseMessage = assembleReleaseMessage(anotherMissingId, anotherMissingMessage);

long someRolledBackId = 5;

String yetAnotherMessage = "yetAnotherMessage";
long yetAnotherId = 6;
ReleaseMessage yetAnotherReleaseMessage = assembleReleaseMessage(yetAnotherId, yetAnotherMessage);

ArrayList<ReleaseMessage> receivedMessage = Lists.newArrayList();
SettableFuture<ReleaseMessage> someListenerFuture = SettableFuture.create();
ReleaseMessageListener someListener = (message, channel) -> receivedMessage.add(message);
releaseMessageScanner.addMessageListener(someListener);

when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(0L)).thenReturn(
Lists.newArrayList(someReleaseMessage));

await().untilAsserted(() -> {
assertEquals(1, receivedMessage.size());
assertSame(someReleaseMessage, receivedMessage.get(0));
});

when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(someId)).thenReturn(
Lists.newArrayList(anotherReleaseMessage));

await().untilAsserted(() -> {
assertEquals(2, receivedMessage.size());
assertSame(someReleaseMessage, receivedMessage.get(0));
assertSame(anotherReleaseMessage, receivedMessage.get(1));
});

when(releaseMessageRepository.findAllById(Sets.newHashSet(someMissingId)))
.thenReturn(Lists.newArrayList(someMissingReleaseMessage));

await().untilAsserted(() -> {
assertEquals(3, receivedMessage.size());
assertSame(someReleaseMessage, receivedMessage.get(0));
assertSame(anotherReleaseMessage, receivedMessage.get(1));
assertSame(someMissingReleaseMessage, receivedMessage.get(2));
});

when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(anotherId)).thenReturn(
Lists.newArrayList(yetAnotherReleaseMessage));

await().untilAsserted(() -> {
assertEquals(4, receivedMessage.size());
assertSame(someReleaseMessage, receivedMessage.get(0));
assertSame(anotherReleaseMessage, receivedMessage.get(1));
assertSame(someMissingReleaseMessage, receivedMessage.get(2));
assertSame(yetAnotherReleaseMessage, receivedMessage.get(3));
});

when(releaseMessageRepository.findAllById(Sets.newHashSet(anotherMissingId, someRolledBackId)))
.thenReturn(Lists.newArrayList(anotherMissingReleaseMessage));

await().untilAsserted(() -> {
assertEquals(5, receivedMessage.size());
assertSame(someReleaseMessage, receivedMessage.get(0));
assertSame(anotherReleaseMessage, receivedMessage.get(1));
assertSame(someMissingReleaseMessage, receivedMessage.get(2));
assertSame(yetAnotherReleaseMessage, receivedMessage.get(3));
assertSame(anotherMissingReleaseMessage, receivedMessage.get(4));
});
}

private ReleaseMessage assembleReleaseMessage(long id, String message) {

0 comments on commit f9c69db

Please sign in to comment.