Skip to content

Commit

Permalink
Fix simplified ordered consuming when a delivery policy was set. (#1251)
Browse files Browse the repository at this point in the history
* Fix simplified ordered consuming when a delivery policy was set.

* more testing for ordered with start time or start sequence

* more testing for ordered with start time or start sequence
  • Loading branch information
scottf authored Nov 15, 2024
1 parent 87cdd45 commit 1bf06d5
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 71 deletions.
12 changes: 6 additions & 6 deletions src/main/java/io/nats/client/impl/NatsJetStreamImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,13 @@ ConsumerConfiguration consumerConfigurationForOrdered(
String consumerName,
Long inactiveThreshold)
{
ConsumerConfiguration.Builder builder =
ConsumerConfiguration.builder(originalCc)
.deliverSubject(newDeliverSubject)
.startTime(null); // clear start time in case it was originally set
ConsumerConfiguration.Builder builder = ConsumerConfiguration.builder(originalCc).deliverSubject(newDeliverSubject);

if (lastStreamSeq > 0) {
builder.deliverPolicy(DeliverPolicy.ByStartSequence)
.startSequence(Math.max(1, lastStreamSeq + 1));
builder
.deliverPolicy(DeliverPolicy.ByStartSequence)
.startSequence(Math.max(1, lastStreamSeq + 1))
.startTime(null); // clear start time in case it was originally set
}

if (consumerName != null && consumerCreate290Available) {
Expand All @@ -213,6 +212,7 @@ ConsumerConfiguration consumerConfigurationForOrdered(
if (inactiveThreshold != null) {
builder.inactiveThreshold(inactiveThreshold);
}

return builder.build();
}

Expand Down
7 changes: 7 additions & 0 deletions src/test/java/io/nats/client/impl/JetStreamTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,13 @@ public static void jsPublish(JetStream js, String subject, String prefix, int st
}
}

public static void jsPublish(JetStream js, String subject, int startId, int count, long sleep) throws IOException, JetStreamApiException {
for (int x = 0; x < count; x++) {
js.publish(NatsMessage.builder().subject(subject).data((dataBytes(startId++))).build());
sleep(sleep);
}
}

public static void jsPublish(JetStream js, String subject, int startId, int count) throws IOException, JetStreamApiException {
for (int x = 0; x < count; x++) {
js.publish(NatsMessage.builder().subject(subject).data((dataBytes(startId++))).build());
Expand Down
222 changes: 157 additions & 65 deletions src/test/java/io/nats/client/impl/SimplificationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.*;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -265,7 +266,7 @@ public void testIterableConsumer() throws Exception {
int stopCount = 500;
// create the consumer then use it
try (IterableConsumer consumer = consumerContext.iterate()) {
_testIterable(js, stopCount, consumer, tsc.subject());
_testIterableBasic(js, stopCount, consumer, tsc.subject());
}

// coverage
Expand All @@ -275,6 +276,44 @@ public void testIterableConsumer() throws Exception {
});
}

@Test
public void testOrderedConsumerDeliverPolices() throws Exception {
jsServer.run(TestBase::atLeast2_9_1, nc -> {
// Setup
JetStream js = nc.jetStream();
JetStreamManagement jsm = nc.jetStreamManagement();

TestingStreamContainer tsc = new TestingStreamContainer(jsm);

jsPublish(js, tsc.subject(), 101, 3, 100);
ZonedDateTime startTime = getStartTimeFirstMessage(js, tsc);

StreamContext sctx = nc.getStreamContext(tsc.stream);

// test a start time
OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration()
.filterSubject(tsc.subject())
.deliverPolicy(DeliverPolicy.ByStartTime)
.startTime(startTime);
OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ);
try (IterableConsumer consumer = occtx.iterate()) {
Message m = consumer.nextMessage(1000);
assertEquals(2, m.metaData().streamSequence());
}

// test a start sequence
occ = new OrderedConsumerConfiguration()
.filterSubject(tsc.subject())
.deliverPolicy(DeliverPolicy.ByStartSequence)
.startSequence(2);
occtx = sctx.createOrderedConsumer(occ);
try (IterableConsumer consumer = occtx.iterate()) {
Message m = consumer.nextMessage(1000);
assertEquals(2, m.metaData().streamSequence());
}
});
}

@Test
public void testOrderedIterableConsumerBasic() throws Exception {
jsServer.run(TestBase::atLeast2_9_1, nc -> {
Expand All @@ -288,12 +327,12 @@ public void testOrderedIterableConsumerBasic() throws Exception {
OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(tsc.subject());
OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ);
try (IterableConsumer consumer = occtx.iterate()) {
_testIterable(js, stopCount, consumer, tsc.subject());
_testIterableBasic(js, stopCount, consumer, tsc.subject());
}
});
}

private static void _testIterable(JetStream js, int stopCount, IterableConsumer consumer, String subject) throws InterruptedException {
private static void _testIterableBasic(JetStream js, int stopCount, IterableConsumer consumer, String subject) throws InterruptedException {
AtomicInteger count = new AtomicInteger();
Thread consumeThread = new Thread(() -> {
try {
Expand Down Expand Up @@ -615,28 +654,49 @@ public void testOrderedBehaviorNext() throws Exception {
JetStream js = nc.jetStream();
JetStreamManagement jsm = nc.jetStreamManagement();

// Get this in place before subscriptions are made
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new;

TestingStreamContainer tsc = new TestingStreamContainer(jsm);
StreamContext sctx = js.getStreamContext(tsc.stream);
jsPublish(js, tsc.subject(), 101, 6);

OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(tsc.subject());
OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ);
// Loop through the messages to make sure I get stream sequence 1 to 6
int expectedStreamSeq = 1;
while (expectedStreamSeq <= 6) {
Message m = occtx.next(1000);
if (m != null) {
assertEquals(expectedStreamSeq, m.metaData().streamSequence());
assertEquals(1, m.metaData().consumerSequence());
++expectedStreamSeq;
}
}
jsPublish(js, tsc.subject(), 101, 6, 100);
ZonedDateTime startTime = getStartTimeFirstMessage(js, tsc);

// New pomm factory in place before each subscription is made
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new;
_testOrderedNext(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject()));

((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new;
_testOrderedNext(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
.deliverPolicy(DeliverPolicy.ByStartTime).startTime(startTime));

((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new;
_testOrderedNext(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
.deliverPolicy(DeliverPolicy.ByStartSequence).startSequence(2));
});
}

private static ZonedDateTime getStartTimeFirstMessage(JetStream js, TestingStreamContainer tsc) throws IOException, JetStreamApiException, InterruptedException {
ZonedDateTime startTime;
JetStreamSubscription sub = js.subscribe(tsc.subject());
Message mt = sub.nextMessage(1000);
startTime = mt.metaData().timestamp().plus(30, ChronoUnit.MILLIS);
sub.unsubscribe();
return startTime;
}

private static void _testOrderedNext(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws IOException, JetStreamApiException, InterruptedException, JetStreamStatusCheckedException {
OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ);
// Loop through the messages to make sure I get stream sequence 1 to 6
while (expectedStreamSeq <= 6) {
Message m = occtx.next(1000);
if (m != null) {
assertEquals(expectedStreamSeq, m.metaData().streamSequence());
assertEquals(1, m.metaData().consumerSequence());
++expectedStreamSeq;
}
}
}

public static long CS_FOR_SS_3 = 3;
public static class PullOrderedTestDropSimulator extends PullOrderedMessageManager {
@SuppressWarnings("ClassEscapesDefinedScope")
public PullOrderedTestDropSimulator(NatsConnection conn, NatsJetStream js, String stream, SubscribeOptions so, ConsumerConfiguration serverCC, boolean queueMode, boolean syncMode) {
Expand All @@ -646,8 +706,8 @@ public PullOrderedTestDropSimulator(NatsConnection conn, NatsJetStream js, Strin
@Override
protected Boolean beforeQueueProcessorImpl(NatsMessage msg) {
if (msg.isJetStream()
&& msg.metaData().streamSequence() == 2
&& msg.metaData().consumerSequence() == 2)
&& msg.metaData().streamSequence() == 3
&& msg.metaData().consumerSequence() == CS_FOR_SS_3)
{
return false;
}
Expand All @@ -663,39 +723,55 @@ public void testOrderedBehaviorFetch() throws Exception {
JetStream js = nc.jetStream();
JetStreamManagement jsm = nc.jetStreamManagement();

// Get this in place before subscriptions are made
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new;

TestingStreamContainer tsc = new TestingStreamContainer(jsm);
StreamContext sctx = js.getStreamContext(tsc.stream);
jsPublish(js, tsc.subject(), 101, 5);
OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(tsc.subject());
OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ);
int expectedStreamSeq = 1;
FetchConsumeOptions fco = FetchConsumeOptions.builder().maxMessages(6).expiresIn(1000).build();
try (FetchConsumer fcon = occtx.fetch(fco)) {
Message m = fcon.nextMessage();
while (m != null) {
assertEquals(expectedStreamSeq++, m.metaData().streamSequence());
m = fcon.nextMessage();
}
// we know this because the simulator is designed to fail the first time at the second message
assertEquals(2, expectedStreamSeq);
// fetch failure will stop the consumer, but make sure it's done b/c with ordered
// I can't have more than one consuming at a time.
while (!fcon.isFinished()) {
sleep(1);
}

jsPublish(js, tsc.subject(), 101, 6, 100);
ZonedDateTime startTime = getStartTimeFirstMessage(js, tsc);

// New pomm factory in place before each subscription is made
// Set the Consumer Sequence For Stream Sequence 3 statically for ease
CS_FOR_SS_3 = 3;
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new;
_testOrderedFetch(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject()));

CS_FOR_SS_3 = 2;
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new;
_testOrderedFetch(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
.deliverPolicy(DeliverPolicy.ByStartTime).startTime(startTime));

CS_FOR_SS_3 = 2;
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new;
_testOrderedFetch(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
.deliverPolicy(DeliverPolicy.ByStartSequence).startSequence(2));
});
}

private static void _testOrderedFetch(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws Exception {
OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ);
FetchConsumeOptions fco = FetchConsumeOptions.builder().maxMessages(6).expiresIn(1000).build();
try (FetchConsumer fcon = occtx.fetch(fco)) {
Message m = fcon.nextMessage();
while (m != null) {
assertEquals(expectedStreamSeq++, m.metaData().streamSequence());
m = fcon.nextMessage();
}
// this should finish without error
try (FetchConsumer fcon = occtx.fetch(fco)) {
Message m = fcon.nextMessage();
while (expectedStreamSeq <= 5) {
assertEquals(expectedStreamSeq++, m.metaData().streamSequence());
m = fcon.nextMessage();
}
// we know this because the simulator is designed to fail the first time at the third message
assertEquals(3, expectedStreamSeq);
// fetch failure will stop the consumer, but make sure it's done b/c with ordered
// I can't have more than one consuming at a time.
while (!fcon.isFinished()) {
sleep(1);
}
});
}
// this should finish without error
try (FetchConsumer fcon = occtx.fetch(fco)) {
Message m = fcon.nextMessage();
while (expectedStreamSeq <= 6) {
assertEquals(expectedStreamSeq++, m.metaData().streamSequence());
m = fcon.nextMessage();
}
}
}

@Test
Expand All @@ -705,25 +781,41 @@ public void testOrderedBehaviorIterable() throws Exception {
JetStream js = nc.jetStream();
JetStreamManagement jsm = nc.jetStreamManagement();

// Get this in place before subscriptions are made
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new;

TestingStreamContainer tsc = new TestingStreamContainer(jsm);
StreamContext sctx = js.getStreamContext(tsc.stream);
jsPublish(js, tsc.subject(), 101, 5);
OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(tsc.subject());
OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ);
try (IterableConsumer icon = occtx.iterate()) {
// Loop through the messages to make sure I get stream sequence 1 to 5
int expectedStreamSeq = 1;
while (expectedStreamSeq <= 5) {
Message m = icon.nextMessage(Duration.ofSeconds(1)); // use duration version here for coverage
if (m != null) {
assertEquals(expectedStreamSeq++, m.metaData().streamSequence());
}

jsPublish(js, tsc.subject(), 101, 6, 100);
ZonedDateTime startTime = getStartTimeFirstMessage(js, tsc);

// New pomm factory in place before each subscription is made
// Set the Consumer Sequence For Stream Sequence 3 statically for ease
CS_FOR_SS_3 = 3;
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new;
_testOrderedIterate(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject()));

CS_FOR_SS_3 = 2;
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new;
_testOrderedIterate(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
.deliverPolicy(DeliverPolicy.ByStartTime).startTime(startTime));

CS_FOR_SS_3 = 2;
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new;
_testOrderedIterate(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
.deliverPolicy(DeliverPolicy.ByStartSequence).startSequence(2));
});
}

private static void _testOrderedIterate(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws Exception {
OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ);
try (IterableConsumer icon = occtx.iterate()) {
// Loop through the messages to make sure I get stream sequence 1 to 5
while (expectedStreamSeq <= 5) {
Message m = icon.nextMessage(Duration.ofSeconds(1)); // use duration version here for coverage
if (m != null) {
assertEquals(expectedStreamSeq++, m.metaData().streamSequence());
}
}
});
}
}

@Test
Expand Down

0 comments on commit 1bf06d5

Please sign in to comment.