Skip to content

Commit

Permalink
Bug 35954327 - [35945522->23.09.1] Topics channels pause receiving fr…
Browse files Browse the repository at this point in the history
…om publishers if a subscriber is killed

(merge ce/main -> ce/23.09 104356)
RQ job.9.20231103165702.26800

[git-p4: depot-paths = "//dev/coherence-ce/release/coherence-ce-v23.09/": change = 104457]
  • Loading branch information
thegridman committed Nov 3, 2023
1 parent e66cfd1 commit 73a955c
Show file tree
Hide file tree
Showing 10 changed files with 446 additions and 29 deletions.
4 changes: 4 additions & 0 deletions prj/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ modules: ## Build Coherence modules
docker: ## Build the Coherence image (does not rebuild Coherence jars)
mvn clean install -Pmodules,-coherence,docker -pl coherence-docker/ -nsu $(ARGS)

.PHONY: docker-no-graal
docker-no-graal: ## Build the Coherence image (does not rebuild Coherence jars)
mvn clean install -Pmodules,-coherence,docker -pl coherence-docker/ -nsu -Dgraal.image.skip=true $(ARGS)

.PHONY: docker-test
docker-test: ## Build the Coherence image (does not rebuild Coherence image)
mvn verify -Pmodules,-coherence,docker-test -pl coherence-docker/ -nsu $(ARGS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1379,16 +1379,22 @@ public PollProcessor.Result pollFromPageHead(BinaryEntry<Subscription.Key, Subsc
return PollProcessor.Result.unknownSubscriber();
}

PagedTopicSubscription pagedTopicSubscription = getPagedTopicSubscription(entrySubscription);

// check whether the channel count has changed
Subscription subscriptionZero = peekSubscriptionZero(entrySubscription);
if (cChannel != subscriptionZero.getLatestChannelCount())
int cChannelActual = getChannelCount();
if (cChannel != cChannelActual)
{
// the channel count has changed to force the subscriber to reconnect, which will refresh allocations
return PollProcessor.Result.unknownSubscriber();
}

// Get the subscription
// If it exists, the PagedTopicSubscription is a more consistent holder of channel allocations and subscriptions.
Subscription subscription = entrySubscription.getValue();
SubscriberId owner = subscription.getOwningSubscriber();
SubscriberId owner = pagedTopicSubscription != null
? pagedTopicSubscription.getOwningSubscriber(nChannel)
: subscription.getOwningSubscriber();

if (!Objects.equals(owner, subscriberId))
{
Expand Down Expand Up @@ -2398,23 +2404,11 @@ protected PagedTopicStatistics getStatistics()
return mgr.getStatistics(f_sName);
}

/**
* Return the read-only {@link Subscription} for channel zero.
*
* @param entry the current subscription {@link BinaryEntry}
*
* @return the read-only {@link Subscription} for channel zero
*/
protected Subscription peekSubscriptionZero(BinaryEntry<Subscription.Key, Subscription> entry)
protected PagedTopicSubscription getPagedTopicSubscription(BinaryEntry<Subscription.Key, Subscription> entry)
{
Subscription.Key key = entry.getKey();
if (key.getChannelId() == 0)
{
return entry.getValue();
}
Binary binKeyZero = toBinaryKey(new Subscription.Key(getPartition(), 0, key.getGroupId()));
BinaryEntry<Subscription.Key, Subscription> entryZero = peekBackingMapEntry(PagedTopicCaches.Names.SUBSCRIPTIONS, binKeyZero);
return entryZero.getValue();
SubscriberGroupId groupId = entry.getKey().getGroupId();
long lId = f_service.getSubscriptionId(f_sName, groupId);
return f_service.getSubscription(lId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.EventListener;
Expand Down Expand Up @@ -1545,7 +1544,7 @@ private Map<Integer, Position> seekInternal(Map<Integer, Position> mapPosition)
.filter(c -> !isOwner(c))
.toList();

if (listUnallocated.size() > 0)
if (!listUnallocated.isEmpty())
{
throw new IllegalStateException("Subscriber is not allocated channels " + listUnallocated);
}
Expand Down Expand Up @@ -1654,7 +1653,7 @@ private void ensureActiveAnOwnedChannels(int... anChannel)
.boxed()
.toList();

if (listUnallocated.size() != 0)
if (!listUnallocated.isEmpty())
{
throw new IllegalArgumentException("One or more channels are not allocated to this subscriber " + listUnallocated);
}
Expand Down Expand Up @@ -4663,7 +4662,7 @@ public interface StateListener
/**
* Optional queue of prefetched values which can be used to fulfil future receive requests.
*/
protected ConcurrentLinkedDeque<CommittableElement> m_queueValuesPrefetched = new ConcurrentLinkedDeque<>();
private final ConcurrentLinkedDeque<CommittableElement> m_queueValuesPrefetched = new ConcurrentLinkedDeque<>();

/**
* Queue of pending receive awaiting values.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,27 @@ public int getChannelCount()
return m_aChannelAllocation == null ? 0 : m_aChannelAllocation.length;
}

/**
* Return the {@link SubscriberId} for the subscriber that
* owns the specified channel.
*
* @param nChannel the channel to obtain the owner
*
* @return the {@link SubscriberId} for the subscriber that
* owns the specified channel or {@code null} if there
* is no owner or the channel does not exist
*/
public SubscriberId getOwningSubscriber(int nChannel)
{
long[] aChannelAllocation = m_aChannelAllocation;
if (aChannelAllocation.length > nChannel)
{
long lId = aChannelAllocation[nChannel];
return m_mapSubscriber.get(lId);
}
return null;
}

/**
* Returns the channel allocations for this subscription.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,9 @@ public void setLastPolledSubscriber(SubscriberId id)
}

/**
* Returns {@code true} if a given subscriber is allocated to this owns the channel,
* or zero if no subscriber owns the channel.
* Returns the {@link SubscriberId} that owns this {@link Subscription}.
*
* @return the identifier of the subscriber that owns the channel
* @return the {@link SubscriberId} that owns this {@link Subscription}
*/
public SubscriberId getOwningSubscriber()
{
Expand Down Expand Up @@ -559,8 +558,10 @@ long[] getChannelAllocations()
* Update this subscription.
*
* @param subscription the {@link PagedTopicSubscription} to update the state from
*
* @return this updated {@link Subscription}
*/
public void update(PagedTopicSubscription subscription)
public Subscription update(PagedTopicSubscription subscription)
{
if (subscription != null)
{
Expand All @@ -587,6 +588,7 @@ public void update(PagedTopicSubscription subscription)
f_lock.unlock();
}
}
return this;
}

/**
Expand Down Expand Up @@ -1102,5 +1104,5 @@ public void writeExternal(PofWriter out) throws IOException
/**
* A lock to control access to internal state.
*/
private final Lock f_lock = new ReentrantLock();
private final transient Lock f_lock = new ReentrantLock();
}
2 changes: 1 addition & 1 deletion prj/coherence-docker/build-images.sh
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ fi
# Ensure there is a builder image set
if [ "${BUILDER_IMAGE}" == "" ]
then
BUILDER_IMAGE="container-registry.oracle.com/os/oraclelinux:8"
BUILDER_IMAGE="ghcr.io/oracle/oraclelinux:8"
fi

# we must use docker format to use health checks
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright (c) 2000, 2023, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
*/
package topics.bug_35945522;

import com.oracle.bedrock.junit.CoherenceClusterResource;
import com.oracle.bedrock.options.LaunchLogging;
import com.oracle.bedrock.runtime.coherence.CoherenceCluster;
import com.oracle.bedrock.runtime.coherence.CoherenceClusterBuilder;
import com.oracle.bedrock.runtime.coherence.CoherenceClusterMember;
import com.oracle.bedrock.runtime.coherence.options.ClusterName;
import com.oracle.bedrock.runtime.coherence.options.LocalHost;
import com.oracle.bedrock.runtime.coherence.options.LocalStorage;
import com.oracle.bedrock.runtime.coherence.options.Logging;
import com.oracle.bedrock.runtime.coherence.options.RoleName;
import com.oracle.bedrock.runtime.coherence.options.WellKnownAddress;
import com.oracle.bedrock.runtime.java.options.ClassName;
import com.oracle.bedrock.runtime.java.options.HeapSize;
import com.oracle.bedrock.runtime.java.options.IPv4Preferred;
import com.oracle.bedrock.runtime.options.DisplayName;
import com.oracle.bedrock.runtime.options.StabilityPredicate;
import com.oracle.bedrock.testsupport.deferred.Eventually;
import com.oracle.bedrock.testsupport.junit.TestLogs;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import topics.NamedTopicTests;

import java.util.Map;
import java.util.TreeMap;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.number.OrderingComparison.greaterThan;

/**
* This test is related to BUG 35945522.
* </p>
* Specifically, the bug related to a QA test where 17 subscriber JVMs were started,
* one for each channel. A publisher was started that published, round-robin, to
* each channel. After a short time, a subscriber was killed. The issue was that
* after reallocation of channels, some of the subscribers stopped receiving any
* messages, even though they had channel allocations (which had changed).
*/
@SuppressWarnings("resource")
public class Bug35945522Tests
{
@Ignore("Disabled until we fix the intermittent failure")
@Test
public void shouldContinueToReceive() throws Exception
{
Eventually.assertDeferred(this::getChannelCount, is(not(-1)));
int cChannel = getChannelCount();
assertThat(cChannel, is(not(-1)));

CoherenceClusterBuilder builder = new CoherenceClusterBuilder()
.with(cluster.getCommonOptions())
.include(cChannel, CoherenceClusterMember.class,
ClassName.of(SubscriberMain.class),
LocalStorage.disabled(),
DisplayName.of("subscriber"),
RoleName.of("subscriber"))
.include(1, CoherenceClusterMember.class,
ClassName.of(PublisherMain.class),
LocalStorage.disabled(),
DisplayName.of("publisher"),
RoleName.of("publisher"));

CoherenceCluster pubSub = builder.build();

CoherenceClusterMember publisher = pubSub.stream()
.filter(m -> m.getName()
.startsWith("publisher"))
.findFirst()
.orElse(null);

assertThat(publisher, is(notNullValue()));

TreeMap<Long, CoherenceClusterMember> mapSub = new TreeMap<>();

// assert member is subscribed - it has a subscriber id
for (CoherenceClusterMember member : pubSub.getAll("subscriber"))
{
Eventually.assertDeferred(() -> member.invoke(SubscriberMain.GET_SUBSCRIBER_ID), is(notNullValue()));
mapSub.put(member.invoke(SubscriberMain.GET_SUBSCRIBER_ID), member);
}

// assert each subscriber has one channel
for (CoherenceClusterMember member : mapSub.values())
{
Eventually.assertDeferred(() -> member.invoke(SubscriberMain.GET_CHANNEL_COUNT), is(1));
}

// publish 2 messages to each channel
assertThat(publisher.invoke(new PublisherMain.Publish()), is(cChannel));
assertThat(publisher.invoke(new PublisherMain.Publish()), is(cChannel));

// wait for the subscribers to receive them
for (CoherenceClusterMember member : mapSub.values())
{
Eventually.assertDeferred(() -> member.invoke(SubscriberMain.GET_RECEIVED_COUNT), is(2));
}

// kill a subscriber from the middle of the set of subscribers
Map.Entry<Long, CoherenceClusterMember> entryKill = mapSub.entrySet().stream()
.skip(10)
.findFirst()
.orElse(null);

assertThat(entryKill, is(notNullValue()));
entryKill.getValue().close();

mapSub.remove(entryKill.getKey());

// publish 2 more messages to each channel
assertThat(publisher.invoke(new PublisherMain.Publish()), is(cChannel));
assertThat(publisher.invoke(new PublisherMain.Publish()), is(cChannel));

// wait for the remaining subscribers to receive more messages
for (CoherenceClusterMember member : mapSub.values())
{
Eventually.assertDeferred(() -> member.invoke(SubscriberMain.GET_RECEIVED_COUNT), is(greaterThan(2)));
}
}

private int getChannelCount()
{
return cluster.getCluster()
.findAny()
.map(member -> member.invoke(new PublisherMain.GetChannelCount()))
.orElse(-1);
}

@ClassRule
public static TestLogs s_testLogs = new TestLogs(NamedTopicTests.class);

@ClassRule
public static CoherenceClusterResource cluster =
new CoherenceClusterResource()
.with(ClusterName.of("Bug35945522Tests"),
HeapSize.of(64, HeapSize.Units.MB, 512, HeapSize.Units.MB),
Logging.atMax(),
LocalHost.only(),
WellKnownAddress.loopback(),
IPv4Preferred.yes(),
s_testLogs,
LaunchLogging.disabled(),
StabilityPredicate.of(CoherenceCluster.Predicates.isCoherenceRunning()))
.include(3,
CoherenceClusterMember.class,
DisplayName.of("storage"),
RoleName.of("storage"));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright (c) 2000, 2023, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
*/

package topics.bug_35945522;

public interface Constants
{
String TOPIC_NAME = "test";

String GROUP_NAME = "test-group";
}
Loading

0 comments on commit 73a955c

Please sign in to comment.