Skip to content

Commit

Permalink
Javadocs03 (#1221)
Browse files Browse the repository at this point in the history
* New sytle JS API doc updates. ConsumerContext pull only.

* createConsumerContext fail fast on push consumer

* Revert "createConsumerContext fail fast on push consumer"

This reverts commit 1c65472.

* Formatting

* Removed reference to rejected fail fast change

* Typos
  • Loading branch information
roeschter authored Sep 9, 2024
1 parent b4a484b commit 346cc29
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 57 deletions.
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/BaseConsumerContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

/**
* The Consumer Context provides a convenient interface around a defined JetStream Consumer
* <p> Note: ConsumerContext requires a <b>pull consumer</b>.
*/
public interface BaseConsumerContext {
/**
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/nats/client/ConsumerContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

/**
* The Consumer Context provides a convenient interface around a defined JetStream Consumer
* <p> Note: ConsumerContext requires a <b>pull consumer</b>.
* <p> For basic usage examples see {@link JetStream JetStream}
*/
public interface ConsumerContext extends BaseConsumerContext {
/**
Expand Down
103 changes: 51 additions & 52 deletions src/main/java/io/nats/client/JetStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,76 +25,75 @@

/**
* JetStream context for access to streams and consumers in NATS.
*
*
* <h3>Basic usage</h3>
*
* <p>{@link #publish(String, byte[]) JetStream.Publish} will send a message on the specified subject, waiting for acknowledgement.
*
* <p>{@link #publish(String, byte[]) JetStream.Publish} will send a message on the specified subject, waiting for acknowledgement.
* A <b>503 No responders</b> error will be received if no stream is listening on said subject.
*
* <p>{@link #publishAsync(String, byte[]) PublishAsync} will not wait for acknowledgement but return a {@link CompletableFuture CompletableFuture},
*
* <p>{@link #publishAsync(String, byte[]) PublishAsync} will not wait for acknowledgement but return a {@link CompletableFuture CompletableFuture},
* which can be checked for acknowledgement at a later point.
*
* <p> Use {@link #getStreamContext(String ) getStreamContext(String)} to access a simplified API for <b>consuming/subscribing</b> messages from Jetstream.
* It is <b>recommened</b> to manage consumers explicitely through {@link StreamContext StreamContext} or {@link JetStreamManagement JetStreamManagement}
*
* <p>{@link #subscribe(String)} is a convenience method for implicitly creating a consumer on a stream and receiving messages. This method should be used for ephemeral (not durable) conusmers.
* It can create a named durable consumers though Options, but we prefer to avoid creating durable consumers implictly.
* It is <b>recommened</b> to manage consumers explicitely through {@link StreamContext StreamContext} or {@link JetStreamManagement JetStreamManagement}
*
* {@link ConsumerContext ConsumerContext} based subscription.
*
*
* <p> Use {@link #getStreamContext(String ) getStreamContext(String)} to access a simplified API for <b>consuming/subscribing</b> messages from Jetstream.
* It is <b>recommened</b> to manage consumers explicitly through {@link StreamContext StreamContext} or {@link JetStreamManagement JetStreamManagement}
*
* <p>{@link #subscribe(String)} is a convenience method for implicitly creating a consumer on a stream and receiving messages. This method should be used for ephemeral (not durable) conusmers.
* It can create a named durable consumers though Options, but we prefer to avoid creating durable consumers implictly.
* It is <b>recommened</b> to manage consumers explicitly through {@link StreamContext StreamContext} and {@link ConsumerContext ConsumerContext} or {@link JetStreamManagement JetStreamManagement}
*
*
* <h3>Recommended usage for creating streams, consumers, publish and listen on a stream</h3>
* <pre>
* io.nats.client.Connection nc = Nats.connect();
*
* //Setting up a stream and a consumer
*
* //Setting up a stream and a consumer
* JetStreamManagement jsm = nc.jetStreamManagement();
* StreamConfiguration sc = StreamConfiguration.builder()
* .name("my-stream")
* .storageType(StorageType.File)
* .subjects("foo.*", "bar.*")
* .build();
*
*
* jsm.addStream(sc);
*
*
* ConsumerConfiguration consumerConfig = ConsumerConfiguration.builder()
* .durable("my-consumer")
* .build();
*
*
* jsm.createConsumer("my-stream", consumerConfig);
*
* //Listening and publishing
*
* //Listening and publishing
* io.nats.client.JetStream js = nc.jetStream();
* ConsumerContext consumerContext = js.getConsumerContext("my-stream", "my-consumer");
* ConsumerContext consumerContext = js.getConsumerContext("my-stream", "my-consumer");
* MessageConsumer mc = consumerContext.consume(
* msg -&gt; {
* System.out.println(" Received " + msg.getSubject());
* msg.ack();
* });
*
*
* js.publish("foo.joe", "Hello World".getBytes());
*
*
* //Wait a moment, then stop the MessageConsumer
* Thread.sleep(3000);
* mc.stop();
*
*
* </pre>
*
*
* <h3>Recommended usage of asynchronous publishing</h3>
*
* Jetstream messages can be published asynchronously, returning a CompletableFuture.
* Note that you need to check the Future eventually otherwise the delivery guarantee is the same a regular {@link Connection#publish(String, byte[]) Connection.Publish}
*
*
* Jetstream messages can be published asynchronously, returning a CompletableFuture.
* Note that you need to check the Future eventually otherwise the delivery guarantee is the same a regular {@link Connection#publish(String, byte[]) Connection.Publish}
*
* <p>We are publishing a batch of 100 messages and check for completion afterwards.
*
*
* <pre>
* int COUNT = 100;
* java.util.concurrent.CompletableFuture&lt;?&gt;[] acks = new java.util.concurrent.CompletableFuture&lt;?&gt;[COUNT];
*
*
* for( int i=0; i&lt;COUNT; i++ ) {
* acks[i] = js.publishAsync("foo.joe", ("Hello "+i).getBytes());
* }
*
*
* //Acknowledgments may arrive out of sequence, but CompletableFuture is handling this for us.
* for( int i=0; i&lt;COUNT; i++ ) {
* try {
Expand All @@ -103,11 +102,11 @@
* //Retry or handle error
* }
* }
*
*
* //Now we may send anther batch
*
*
* </pre>
*
*
*/
public interface JetStream {

Expand Down Expand Up @@ -545,7 +544,7 @@ public interface JetStream {

/**
* Create an asynchronous subscription to the specified subject in the mode of pull, with additional options.
*
*
* @param subscribeSubject The subject to subscribe to
* Can be null or empty when the options have a ConsumerConfiguration that supplies a filter subject.
* @param dispatcher The dispatcher to handle this subscription
Expand All @@ -560,25 +559,25 @@ public interface JetStream {

/**
* Get a stream context for a specific named stream. Verifies that the stream exists.
*
* <p><b>Recommended usage:</b> {@link StreamContext StreamContext} and {@link ConsumerContext ConsumerContext} are the preferred way to interact with existing streams and consume from streams.
* {@link JetStreamManagement JetStreamManagement} should be used to create streams and consumers. {@link ConsumerContext#consume ConsumerContext.consume()} supports both push and pull consumers transparently.
*
*
* <p><b>Recommended usage:</b> {@link StreamContext StreamContext} and {@link ConsumerContext ConsumerContext} are the preferred way to interact with existing streams and consume from streams.
* {@link JetStreamManagement JetStreamManagement} should be used to create streams and consumers. Note that {@link ConsumerContext#consume ConsumerContext.consume()} only supports both pull consumers.
*
* <pre>
* nc = Nats.connect();
* Jetstream js = nc.jetStream();
* StreamContext streamContext = js.getStreamContext("my-stream");
* ConsumerContext consumerContext = streamContext.getConsumerContext("my-consumer");
* // Or
* // ConsumerContext consumerContext = js.getConsumerContext("my-stream", "my-consumer");
* // Or
* // ConsumerContext consumerContext = js.getConsumerContext("my-stream", "my-consumer");
* consumerContext.consume(
* msg -&gt; {
* System.out.println(" Received " + msg.getSubject());
* msg.ack();
* });
* </pre>
*
*
* </pre>
*
*
* @param streamName the name of the stream
* @return a StreamContext object
* @throws IOException covers various communication issues with the NATS
Expand All @@ -589,16 +588,16 @@ public interface JetStream {

/**
* Get a consumer context for a specific named stream and specific named consumer.
*
* <p><b>Recommended usage:</b> See {@link #getStreamContext(String) getStreamContext(String)}
*
* <p> Note that ConsumerContext expects a <b>pull consumer</b>.
* <p><b>Recommended usage:</b> See {@link #getStreamContext(String) getStreamContext(String)}
*
* Verifies that the stream and consumer exist.
* @param streamName the name of the stream
* @param consumerName the name of the consumer
* @return a ConsumerContext object
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws JetStreamApiException the request had an error related to the data.
*/
ConsumerContext getConsumerContext(String streamName, String consumerName) throws IOException, JetStreamApiException;
ConsumerContext getConsumerContext(String streamName, String consumerName) throws IOException, JetStreamApiException;
}
4 changes: 3 additions & 1 deletion src/main/java/io/nats/client/JetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

/**
* JetStream Management context for creation and access to streams and consumers in NATS.
* <p> Using JetStream Management is the <b>recommended</b> way of managing Jetstream resources.
* <p> Basic usage examples can be found in {@link JetStream JetStream}
*/
public interface JetStreamManagement {

Expand Down Expand Up @@ -219,7 +221,7 @@ public interface JetStreamManagement {

/**
* Get a list of stream names that have subjects matching the subject filter.
*
*
* @param subjectFilter the subject. Wildcards are allowed.
* @return The list of stream names matching the subject filter. May be empty, will not be null.
* @throws IOException covers various communication issues with the NATS
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/nats/client/JetStreamReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
import java.time.Duration;

/**
* This interface provides push like ability for a pull consumer. Was the pre-cursor to simplified consume.
* This interface provides a simple iterative access to a pull consumer.
* <p>Note: This interface is superseded by {@link ConsumerContext ConsumerContext}. For examples for <b>recommended usage</b> see {@link JetStream JetStream}.
*/
public interface JetStreamReader {
/**
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/io/nats/client/StreamContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
/**
* The Stream Context provide a set of operations for managing the stream
* and its contents and for managing consumers.
* <p> For basic usage examples see {@link JetStream JetStream}
*/
public interface StreamContext {
/**
Expand Down Expand Up @@ -74,21 +75,23 @@ public interface StreamContext {
/**
* Get a consumer context for the context's stream and specific named consumer.
* Verifies that the consumer exists.
* <p> Note that ConsumerContext expects a <b>pull consumer</b>.
* @param consumerName the name of the consumer
* @return a ConsumerContext object
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws JetStreamApiException the request had an error related to the data.
*/
ConsumerContext getConsumerContext(String consumerName) throws IOException, JetStreamApiException;

/**
* Management function to create or update a consumer on this stream.
* <p> Note that ConsumerContext expects a <b>pull consumer</b>.
* @param config the consumer configuration to use.
* @return a ConsumerContext object
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws JetStreamApiException the request had an error related to the data.
*/
ConsumerContext createOrUpdateConsumer(ConsumerConfiguration config) throws IOException, JetStreamApiException;

Expand Down
9 changes: 9 additions & 0 deletions src/main/java/io/nats/client/api/AckPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,17 @@
* Represents the Ack Policy of a consumer
*/
public enum AckPolicy {
/**
* Messages are acknowledged as soon as the server sends them. Clients do not need to ack.
*/
None("none"),
/**
* All messages with a sequence number less than the message acked are also acknowledged. E.g. reading a batch of messages 1 .. 100. Ack on message 100 will acknowledge 1 .. 99 as well.
*/
All("all"),
/**
* Each message must be acknowledged individually. Message can be acked out of sequence and create gaps of unacknowledged messages in the consumer.
*/
Explicit("explicit");

private String policy;
Expand Down
8 changes: 7 additions & 1 deletion src/main/java/io/nats/client/api/ConsumerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package io.nats.client.api;

import io.nats.client.Connection;
import io.nats.client.JetStream;
import io.nats.client.PullSubscribeOptions;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.support.*;
Expand All @@ -32,6 +34,8 @@
* The ConsumerConfiguration class specifies the configuration for creating a JetStream consumer on the client and
* if necessary the server.
* Options are created using a ConsumerConfiguration.Builder.
* <p>ConsumerConfiguration is intended to be used with {@link io.nats.client.JetStreamManagement#createConsumer(String, ConsumerConfiguration) JetStreamManagement.createConsumer()}.
* <P> By default this will create a <b>pull consumer</b> unless {@link ConsumerConfiguration.Builder#deliverSubject(String) ConsumerConfiguration.Builder.deliverSubject(String) } is set.
*/
public class ConsumerConfiguration implements JsonSerializable {
@Deprecated
Expand Down Expand Up @@ -159,7 +163,8 @@ protected ConsumerConfiguration(Builder b)
* Returns a JSON representation of this consumer configuration.
* @return json consumer configuration json string
*/
public String toJson() {
@Override
public String toJson() {
StringBuilder sb = beginJson();
JsonUtils.addField(sb, DESCRIPTION, description);
JsonUtils.addField(sb, DURABLE_NAME, durable);
Expand Down Expand Up @@ -816,6 +821,7 @@ public Builder deliverPolicy(DeliverPolicy policy) {

/**
* Sets the subject to deliver messages to.
* <p> By setting the deliverySubject this configuration will create a <b>push consumer</b>. When left empty or set to NULL a pull consumer will be created.
* @param subject the subject.
* @return the builder
*/
Expand Down

0 comments on commit 346cc29

Please sign in to comment.