Skip to content

Commit

Permalink
Extract non 211 Part 2 (#1242)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Oct 15, 2024
1 parent dd8a3a1 commit 1fbee9b
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 64 deletions.
3 changes: 1 addition & 2 deletions src/main/java/io/nats/client/FeatureOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ public abstract class FeatureOptions {

private final JetStreamOptions jso;

@SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars
protected FeatureOptions(Builder b) {
protected FeatureOptions(Builder<?, ?> b) {
jso = b.jsoBuilder.build();
}

Expand Down
3 changes: 1 addition & 2 deletions src/main/java/io/nats/client/SubscribeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ public abstract class SubscribeOptions {
protected final long pendingByteLimit; // Only applicable for non dispatched (sync) push consumers.
protected final String name;

@SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars
protected SubscribeOptions(Builder builder, boolean isPull,
protected SubscribeOptions(Builder<?, ?> builder, boolean isPull,
String deliverSubject, String deliverGroup,
long pendingMessageLimit, long pendingByteLimit) {

Expand Down
18 changes: 9 additions & 9 deletions src/test/java/io/nats/client/impl/JetStreamGeneralTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ public void testBindPush() throws Exception {

jsPublish(js, tsc.subject(), 1, 1);
PushSubscribeOptions pso = PushSubscribeOptions.builder()
.durable(tsc.name())
.durable(tsc.consumerName())
.build();
JetStreamSubscription s = js.subscribe(tsc.subject(), pso);
Message m = s.nextMessage(DEFAULT_TIMEOUT);
Expand All @@ -474,7 +474,7 @@ public void testBindPush() throws Exception {
jsPublish(js, tsc.subject(), 2, 1);
pso = PushSubscribeOptions.builder()
.stream(tsc.stream)
.durable(tsc.name())
.durable(tsc.consumerName())
.bind(true)
.build();
s = js.subscribe(tsc.subject(), pso);
Expand All @@ -485,7 +485,7 @@ public void testBindPush() throws Exception {
unsubscribeEnsureNotBound(s);

jsPublish(js, tsc.subject(), 3, 1);
pso = PushSubscribeOptions.bind(tsc.stream, tsc.name());
pso = PushSubscribeOptions.bind(tsc.stream, tsc.consumerName());
s = js.subscribe(tsc.subject(), pso);
m = s.nextMessage(DEFAULT_TIMEOUT);
assertNotNull(m);
Expand All @@ -495,7 +495,7 @@ public void testBindPush() throws Exception {
() -> PushSubscribeOptions.builder().stream(tsc.stream).bind(true).build());

assertThrows(IllegalArgumentException.class,
() -> PushSubscribeOptions.builder().durable(tsc.name()).bind(true).build());
() -> PushSubscribeOptions.builder().durable(tsc.consumerName()).bind(true).build());

assertThrows(IllegalArgumentException.class,
() -> PushSubscribeOptions.builder().stream(EMPTY).bind(true).build());
Expand All @@ -514,7 +514,7 @@ public void testBindPull() throws Exception {
jsPublish(js, tsc.subject(), 1, 1);

PullSubscribeOptions pso = PullSubscribeOptions.builder()
.durable(tsc.name())
.durable(tsc.consumerName())
.build();
JetStreamSubscription s = js.subscribe(tsc.subject(), pso);
s.pull(1);
Expand All @@ -527,7 +527,7 @@ public void testBindPull() throws Exception {
jsPublish(js, tsc.subject(), 2, 1);
pso = PullSubscribeOptions.builder()
.stream(tsc.stream)
.durable(tsc.name())
.durable(tsc.consumerName())
.bind(true)
.build();
s = js.subscribe(tsc.subject(), pso);
Expand All @@ -539,7 +539,7 @@ public void testBindPull() throws Exception {
unsubscribeEnsureNotBound(s);

jsPublish(js, tsc.subject(), 3, 1);
pso = PullSubscribeOptions.bind(tsc.stream, tsc.name());
pso = PullSubscribeOptions.bind(tsc.stream, tsc.consumerName());
s = js.subscribe(tsc.subject(), pso);
s.pull(1);
m = s.nextMessage(DEFAULT_TIMEOUT);
Expand Down Expand Up @@ -960,9 +960,9 @@ public void testInternalLookupConsumerInfoCoverage() throws Exception {
// - consumer not found
// - stream does not exist
JetStreamSubscription sub = js.subscribe(tsc.subject());
assertNull(((NatsJetStream)js).lookupConsumerInfo(tsc.stream, tsc.name()));
assertNull(((NatsJetStream)js).lookupConsumerInfo(tsc.stream, tsc.consumerName()));
assertThrows(JetStreamApiException.class,
() -> ((NatsJetStream)js).lookupConsumerInfo(stream(999), tsc.name()));
() -> ((NatsJetStream)js).lookupConsumerInfo(stream(999), tsc.consumerName()));
});
}

Expand Down
24 changes: 12 additions & 12 deletions src/test/java/io/nats/client/impl/JetStreamManagementTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ public void testAddPausedConsumer() throws Exception {

ZonedDateTime pauseUntil = ZonedDateTime.now(ZONE_ID_GMT).plusMinutes(2);
ConsumerConfiguration cc = ConsumerConfiguration.builder()
.durable(tsc.name())
.durable(tsc.consumerName())
.pauseUntil(pauseUntil)
.build();

Expand All @@ -849,7 +849,7 @@ public void testPauseResumeConsumer() throws Exception {
assertEquals(0, list.size());

ConsumerConfiguration cc = ConsumerConfiguration.builder()
.durable(tsc.name())
.durable(tsc.consumerName())
.build();

// durable and name can both be null
Expand Down Expand Up @@ -886,9 +886,9 @@ public void testPauseResumeConsumer() throws Exception {
ci = jsm.getConsumerInfo(tsc.stream, ci.getName());
assertFalse(ci.getPaused());

assertThrows(JetStreamApiException.class, () -> jsm.pauseConsumer(stream(), tsc.name(), pauseUntil));
assertThrows(JetStreamApiException.class, () -> jsm.pauseConsumer(stream(), tsc.consumerName(), pauseUntil));
assertThrows(JetStreamApiException.class, () -> jsm.pauseConsumer(tsc.stream, name(), pauseUntil));
assertThrows(JetStreamApiException.class, () -> jsm.resumeConsumer(stream(), tsc.name()));
assertThrows(JetStreamApiException.class, () -> jsm.resumeConsumer(stream(), tsc.consumerName()));
assertThrows(JetStreamApiException.class, () -> jsm.resumeConsumer(tsc.stream, name()));
});
}
Expand Down Expand Up @@ -1009,7 +1009,7 @@ public void testConsumerMetadata() throws Exception {
TestingStreamContainer tsc = new TestingStreamContainer(jsm);

ConsumerConfiguration cc = ConsumerConfiguration.builder()
.durable(tsc.name())
.durable(tsc.consumerName())
.metadata(metaData)
.build();

Expand Down Expand Up @@ -1065,14 +1065,14 @@ public void testGetConsumerInfo() throws Exception {
jsServer.run(nc -> {
JetStreamManagement jsm = nc.jetStreamManagement();
TestingStreamContainer tsc = new TestingStreamContainer(jsm);
assertThrows(JetStreamApiException.class, () -> jsm.getConsumerInfo(tsc.stream, tsc.name()));
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.name()).build();
assertThrows(JetStreamApiException.class, () -> jsm.getConsumerInfo(tsc.stream, tsc.consumerName()));
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.consumerName()).build();
ConsumerInfo ci = jsm.addOrUpdateConsumer(tsc.stream, cc);
assertEquals(tsc.stream, ci.getStreamName());
assertEquals(tsc.name(), ci.getName());
ci = jsm.getConsumerInfo(tsc.stream, tsc.name());
assertEquals(tsc.consumerName(), ci.getName());
ci = jsm.getConsumerInfo(tsc.stream, tsc.consumerName());
assertEquals(tsc.stream, ci.getStreamName());
assertEquals(tsc.name(), ci.getName());
assertEquals(tsc.consumerName(), ci.getName());
assertThrows(JetStreamApiException.class, () -> jsm.getConsumerInfo(tsc.stream, durable(999)));
if (nc.getServerInfo().isSameOrNewerThanVersion("2.10")) {
assertNotNull(ci.getTimestamp());
Expand Down Expand Up @@ -1228,14 +1228,14 @@ public void testConsumerReplica() throws Exception {
TestingStreamContainer tsc = new TestingStreamContainer(nc);

final ConsumerConfiguration cc0 = ConsumerConfiguration.builder()
.durable(tsc.name())
.durable(tsc.consumerName())
.build();
ConsumerInfo ci = jsm.addOrUpdateConsumer(tsc.stream, cc0);
// server returns 0 when value is not set
assertEquals(0, ci.getConsumerConfiguration().getNumReplicas());

final ConsumerConfiguration cc1 = ConsumerConfiguration.builder()
.durable(tsc.name())
.durable(tsc.consumerName())
.numReplicas(1)
.build();
ci = jsm.addOrUpdateConsumer(tsc.stream, cc1);
Expand Down
26 changes: 13 additions & 13 deletions src/test/java/io/nats/client/impl/JetStreamPullTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ public void testFetch() throws Exception {
.build();

PullSubscribeOptions options = PullSubscribeOptions.builder()
.durable(tsc.name())
.durable(tsc.consumerName())
.configuration(cc)
.build();

JetStreamSubscription sub = js.subscribe(tsc.subject(), options);
assertSubscription(sub, tsc.stream, tsc.name(), null, true);
assertSubscription(sub, tsc.stream, tsc.consumerName(), null, true);
nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server

List<Message> messages = sub.fetch(10, fetchDur);
Expand Down Expand Up @@ -139,12 +139,12 @@ public void testIterate() throws Exception {
.build();

PullSubscribeOptions options = PullSubscribeOptions.builder()
.durable(tsc.name())
.durable(tsc.consumerName())
.configuration(cc)
.build();

JetStreamSubscription sub = js.subscribe(tsc.subject(), options);
assertSubscription(sub, tsc.stream, tsc.name(), null, true);
assertSubscription(sub, tsc.stream, tsc.consumerName(), null, true);
nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server

Iterator<Message> iterator = sub.iterate(10, fetchDur);
Expand Down Expand Up @@ -218,11 +218,11 @@ public void testBasic() throws Exception {
TestingStreamContainer tsc = new TestingStreamContainer(nc);

// Build our subscription options.
PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.name()).build();
PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.consumerName()).build();

// Subscribe synchronously.
JetStreamSubscription sub = js.subscribe(tsc.subject(), options);
assertSubscription(sub, tsc.stream, tsc.name(), null, true);
assertSubscription(sub, tsc.stream, tsc.consumerName(), null, true);
nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server

// publish some amount of messages, but not entire pull size
Expand Down Expand Up @@ -317,11 +317,11 @@ public void testNoWait() throws Exception {
TestingStreamContainer tsc = new TestingStreamContainer(nc);

// Build our subscription options.
PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.name()).build();
PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.consumerName()).build();

// Subscribe synchronously.
JetStreamSubscription sub = js.subscribe(tsc.subject(), options);
assertSubscription(sub, tsc.stream, tsc.name(), null, true);
assertSubscription(sub, tsc.stream, tsc.consumerName(), null, true);
nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server

// publish 10 messages
Expand Down Expand Up @@ -391,11 +391,11 @@ public void testPullExpires() throws Exception {
TestingStreamContainer tsc = new TestingStreamContainer(nc);

// Build our subscription options.
PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.name()).build();
PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.consumerName()).build();

// Subscribe synchronously.
JetStreamSubscription sub = js.subscribe(tsc.subject(), options);
assertSubscription(sub, tsc.stream, tsc.name(), null, true);
assertSubscription(sub, tsc.stream, tsc.consumerName(), null, true);
nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server

long expires = 500; // millis
Expand Down Expand Up @@ -574,7 +574,7 @@ public void testAckWaitTimeout() throws Exception {
.ackWait(1500)
.build();
PullSubscribeOptions pso = PullSubscribeOptions.builder()
.durable(tsc.name())
.durable(tsc.consumerName())
.configuration(cc)
.build();

Expand Down Expand Up @@ -1108,10 +1108,10 @@ public void testReader() throws Exception {
JetStream js = nc.jetStream();

// Pre define a consumer
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.name()).filterSubjects(tsc.subject()).build();
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.consumerName()).filterSubjects(tsc.subject()).build();
jsm.addOrUpdateConsumer(tsc.stream, cc);

PullSubscribeOptions so = PullSubscribeOptions.bind(tsc.stream, tsc.name());
PullSubscribeOptions so = PullSubscribeOptions.bind(tsc.stream, tsc.consumerName());
JetStreamSubscription sub = js.subscribe(tsc.subject(), so);
JetStreamReader reader = sub.reader(500, 125);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void testQueueSubWorkflow() throws Exception {
// - the PushSubscribeOptions can be re-used since all the subscribers are the same
// - use a concurrent integer to track all the messages received
// - have a list of subscribers and threads so I can track them
PushSubscribeOptions pso = PushSubscribeOptions.builder().durable(tsc.name()).build();
PushSubscribeOptions pso = PushSubscribeOptions.builder().durable(tsc.consumerName()).build();
AtomicInteger allReceived = new AtomicInteger();
List<JsQueueSubscriber> subscribers = new ArrayList<>();
List<Thread> subThreads = new ArrayList<>();
Expand Down
6 changes: 3 additions & 3 deletions src/test/java/io/nats/client/impl/JetStreamTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,11 @@ public String subject(Object variant) {
return subjects.computeIfAbsent(variant, TestBase::subject);
}

public String name() {
return name(defaultNameVariant);
public String consumerName() {
return consumerName(defaultNameVariant);
}

public String name(Object variant) {
public String consumerName(Object variant) {
return names.computeIfAbsent(variant, TestBase::name);
}
}
Expand Down
42 changes: 21 additions & 21 deletions src/test/java/io/nats/client/impl/SimplificationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -268,11 +268,11 @@ public void testIterableConsumer() throws Exception {
JetStream js = nc.jetStream();

// Pre define a consumer
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.name()).build();
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.consumerName()).build();
jsm.addOrUpdateConsumer(tsc.stream, cc);

// Consumer[Context]
ConsumerContext consumerContext = js.getConsumerContext(tsc.stream, tsc.name());
ConsumerContext consumerContext = js.getConsumerContext(tsc.stream, tsc.consumerName());

int stopCount = 500;
// create the consumer then use it
Expand Down Expand Up @@ -355,11 +355,11 @@ public void testConsumeWithHandler() throws Exception {
jsPublish(js, tsc.subject(), 2500);

// Pre define a consumer
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.name()).build();
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.consumerName()).build();
jsm.addOrUpdateConsumer(tsc.stream, cc);

// Consumer[Context]
ConsumerContext consumerContext = js.getConsumerContext(tsc.stream, tsc.name());
ConsumerContext consumerContext = js.getConsumerContext(tsc.stream, tsc.consumerName());

int stopCount = 500;

Expand Down Expand Up @@ -428,30 +428,30 @@ public void testCoverage() throws Exception {
JetStream js = nc.jetStream();

// Pre define a consumer
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.name(1)).build());
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.name(2)).build());
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.name(3)).build());
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.name(4)).build());
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.consumerName(1)).build());
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.consumerName(2)).build());
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.consumerName(3)).build());
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.consumerName(4)).build());

// Stream[Context]
StreamContext sctx1 = nc.getStreamContext(tsc.stream);
nc.getStreamContext(tsc.stream, JetStreamOptions.DEFAULT_JS_OPTIONS);
js.getStreamContext(tsc.stream);

// Consumer[Context]
ConsumerContext cctx1 = nc.getConsumerContext(tsc.stream, tsc.name(1));
ConsumerContext cctx2 = nc.getConsumerContext(tsc.stream, tsc.name(2), JetStreamOptions.DEFAULT_JS_OPTIONS);
ConsumerContext cctx3 = js.getConsumerContext(tsc.stream, tsc.name(3));
ConsumerContext cctx4 = sctx1.getConsumerContext(tsc.name(4));
ConsumerContext cctx5 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(tsc.name(5)).build());
ConsumerContext cctx6 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(tsc.name(6)).build());

after(cctx1.iterate(), tsc.name(1), true);
after(cctx2.iterate(ConsumeOptions.DEFAULT_CONSUME_OPTIONS), tsc.name(2), true);
after(cctx3.consume(m -> {}), tsc.name(3), true);
after(cctx4.consume(ConsumeOptions.DEFAULT_CONSUME_OPTIONS, m -> {}), tsc.name(4), true);
after(cctx5.fetchMessages(1), tsc.name(5), false);
after(cctx6.fetchBytes(1000), tsc.name(6), false);
ConsumerContext cctx1 = nc.getConsumerContext(tsc.stream, tsc.consumerName(1));
ConsumerContext cctx2 = nc.getConsumerContext(tsc.stream, tsc.consumerName(2), JetStreamOptions.DEFAULT_JS_OPTIONS);
ConsumerContext cctx3 = js.getConsumerContext(tsc.stream, tsc.consumerName(3));
ConsumerContext cctx4 = sctx1.getConsumerContext(tsc.consumerName(4));
ConsumerContext cctx5 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(tsc.consumerName(5)).build());
ConsumerContext cctx6 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(tsc.consumerName(6)).build());

after(cctx1.iterate(), tsc.consumerName(1), true);
after(cctx2.iterate(ConsumeOptions.DEFAULT_CONSUME_OPTIONS), tsc.consumerName(2), true);
after(cctx3.consume(m -> {}), tsc.consumerName(3), true);
after(cctx4.consume(ConsumeOptions.DEFAULT_CONSUME_OPTIONS, m -> {}), tsc.consumerName(4), true);
after(cctx5.fetchMessages(1), tsc.consumerName(5), false);
after(cctx6.fetchBytes(1000), tsc.consumerName(6), false);
});
}

Expand Down
Loading

0 comments on commit 1fbee9b

Please sign in to comment.