Skip to content

Commit

Permalink
Extract from main 2.11 branch things not specific to 2.11 Part 3 (#1243)
Browse files Browse the repository at this point in the history
* Extract 2.11 changes part 3

* remove tests too
  • Loading branch information
scottf authored Oct 16, 2024
1 parent 1fbee9b commit dabca74
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 419 deletions.
43 changes: 0 additions & 43 deletions src/main/java/io/nats/client/JetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/**
* JetStream Management context for creation and access to streams and consumers in NATS.
Expand Down Expand Up @@ -325,48 +324,6 @@ public interface JetStreamManagement {
*/
MessageInfo getNextMessage(String streamName, long seq, String subject) throws IOException, JetStreamApiException;

/**
* Request a batch of messages using a {@link MessageBatchGetRequest}.
* <p>
* This API is currently EXPERIMENTAL and is subject to change.
*
* @param streamName the name of the stream
* @param messageBatchGetRequest the request details
* @return a list containing {@link MessageInfo}
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
List<MessageInfo> fetchMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException;

/**
* Request a batch of messages using a {@link MessageBatchGetRequest}.
* <p>
* This API is currently EXPERIMENTAL and is subject to change.
*
* @param streamName the name of the stream
* @param messageBatchGetRequest the request details
* @return a queue used to asynchronously receive {@link MessageInfo}
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
LinkedBlockingQueue<MessageInfo> queueMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException;

/**
* Request a batch of messages using a {@link MessageBatchGetRequest}.
* <p>
* This API is currently EXPERIMENTAL and is subject to change.
*
* @param streamName the name of the stream
* @param messageBatchGetRequest the request details
* @param handler the handler used for receiving {@link MessageInfo}
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
void requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException;

/**
* Deletes a message, overwriting the message data with garbage
* This can be considered an expensive (time-consuming) operation, but is more secure.
Expand Down
44 changes: 0 additions & 44 deletions src/main/java/io/nats/client/api/MessageInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@
*/
public class MessageInfo extends ApiResponse<MessageInfo> {

/**
* Message returned as a response in {@link MessageBatchGetRequest} to signal end of data.
*/
public static final MessageInfo EOD = new MessageInfo(null, false);

private final boolean direct;
private final String subject;
private final long seq;
Expand All @@ -45,7 +40,6 @@ public class MessageInfo extends ApiResponse<MessageInfo> {
private final Headers headers;
private final String stream;
private final long lastSeq;
private final long numPending;

/**
* Create a Message Info
Expand All @@ -57,25 +51,6 @@ public MessageInfo(Message msg) {
this(msg, null, false);
}

/**
* Create a Message Info
* This signature is public for testing purposes and is not intended to be used externally.
* @param error the error
* @param direct true if the object is being created from a get direct api call instead of the standard get message
*/
public MessageInfo(Error error, boolean direct) {
super(error);
this.direct = direct;
subject = null;
data = null;
seq = -1;
time = null;
headers = null;
stream = null;
lastSeq = -1;
numPending = -1;
}

/**
* Create a Message Info
* This signature is public for testing purposes and is not intended to be used externally.
Expand All @@ -102,14 +77,6 @@ public MessageInfo(Message msg, String streamName, boolean direct) {
else {
lastSeq = JsonUtils.safeParseLong(tempLastSeq, -1);
}
String tempNumPending = msgHeaders.getLast(NATS_NUM_PENDING);
if (tempNumPending == null) {
numPending = -1;
}
else {
// Num pending is +1 since it includes EOB message, correct that here.
numPending = Long.parseLong(tempNumPending) - 1;
}
// these are control headers, not real headers so don't give them to the user.
headers = new Headers(msgHeaders, true, MESSAGE_INFO_HEADERS);
}
Expand All @@ -121,7 +88,6 @@ else if (hasError()) {
headers = null;
stream = null;
lastSeq = -1;
numPending = -1;
}
else {
JsonValue mjv = readValue(jv, MESSAGE);
Expand All @@ -133,7 +99,6 @@ else if (hasError()) {
headers = hdrBytes == null ? null : new IncomingHeadersProcessor(hdrBytes).getHeaders();
stream = streamName;
lastSeq = -1;
numPending = -1;
}
}

Expand Down Expand Up @@ -193,14 +158,6 @@ public long getLastSeq() {
return lastSeq;
}

/**
* Amount of pending messages that can be requested with a subsequent batch request.
* @return number of pending messages
*/
public long getNumPending() {
return numPending;
}

@Override
public String toString() {
StringBuilder sb = JsonUtils.beginJsonPrefixed("\"MessageInfo\":");
Expand All @@ -217,7 +174,6 @@ public String toString() {
JsonUtils.addField(sb, TIME, time);
JsonUtils.addField(sb, STREAM, stream);
JsonUtils.addField(sb, LAST_SEQ, lastSeq);
JsonUtils.addField(sb, NUM_PENDING, numPending);
JsonUtils.addField(sb, SUBJECT, subject);
JsonUtils.addField(sb, HDRS, headers);
return JsonUtils.endJson(sb).toString();
Expand Down
108 changes: 0 additions & 108 deletions src/main/java/io/nats/client/impl/NatsJetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,12 @@
import io.nats.client.*;
import io.nats.client.api.Error;
import io.nats.client.api.*;
import io.nats.client.support.Status;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

import static io.nats.client.support.NatsJetStreamClientError.JsAllowDirectRequired;
import static io.nats.client.support.NatsJetStreamClientError.JsDirectBatchGet211NotAvailable;
import static io.nats.client.support.Validator.*;

public class NatsJetStreamManagement extends NatsJetStreamImpl implements JetStreamManagement {
Expand Down Expand Up @@ -345,109 +340,6 @@ private MessageInfo _getMessage(String streamName, MessageGetRequest messageGetR
}
}

/**
* {@inheritDoc}
*/
@Override
public List<MessageInfo> fetchMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException {
validateMessageBatchGetRequest(streamName, messageBatchGetRequest);
List<MessageInfo> results = new ArrayList<>();
_requestMessageBatch(streamName, messageBatchGetRequest, msg -> {
if (msg != MessageInfo.EOD) {
results.add(msg);
}
});
return results;
}

/**
* {@inheritDoc}
*/
@Override
public LinkedBlockingQueue<MessageInfo> queueMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException {
validateMessageBatchGetRequest(streamName, messageBatchGetRequest);
final LinkedBlockingQueue<MessageInfo> q = new LinkedBlockingQueue<>();
conn.getOptions().getExecutor().submit(() -> _requestMessageBatch(streamName, messageBatchGetRequest, q::add));
return q;
}

/**
* {@inheritDoc}
*/
@Override
public void requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException {
validateMessageBatchGetRequest(streamName, messageBatchGetRequest);
_requestMessageBatch(streamName, messageBatchGetRequest, handler);
}

public void _requestMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) {
Subscription sub = null;
try {
String replyTo = conn.createInbox();
sub = conn.subscribe(replyTo);

String requestSubject = prependPrefix(String.format(JSAPI_DIRECT_GET, streamName));
conn.publish(requestSubject, replyTo, messageBatchGetRequest.serialize());

long maxTimeMillis = getTimeout().toMillis();
long timeLeft = maxTimeMillis;
long start = System.currentTimeMillis();
while (true) {
Message msg = sub.nextMessage(timeLeft);
if (msg == null) {
break;
}
if (msg.isStatusMessage()) {
Status status = msg.getStatus();
// Report error, otherwise successful status.
if (status.getCode() < 200 || status.getCode() > 299) {
MessageInfo messageInfo = new MessageInfo(Error.convert(status), true);
handler.onMessageInfo(messageInfo);
}
break;
}

Headers headers = msg.getHeaders();
if (headers == null || headers.getLast(NATS_NUM_PENDING) == null) {
throw JsDirectBatchGet211NotAvailable.instance();
}

MessageInfo messageInfo = new MessageInfo(msg, streamName, true);
handler.onMessageInfo(messageInfo);
timeLeft = maxTimeMillis - (System.currentTimeMillis() - start);
}
} catch (InterruptedException e) {
// sub.nextMessage was fetching one message
// and data is not completely read
// so it seems like this is an error condition
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
try {
handler.onMessageInfo(MessageInfo.EOD);
} catch (Exception ignore) {
}
try {
//noinspection DataFlowIssue
sub.unsubscribe();
} catch (Exception ignore) {
}
}
}

private void validateMessageBatchGetRequest(String streamName, MessageBatchGetRequest messageBatchGetRequest) throws IOException, JetStreamApiException {
validateNotNull(messageBatchGetRequest, "Message Batch Get Request");

if (!directBatchGet211Available) {
throw JsDirectBatchGet211NotAvailable.instance();
}

CachedStreamInfo csi = getCachedStreamInfo(streamName);
if (!csi.allowDirect) {
throw JsAllowDirectRequired.instance();
}
}

/**
* {@inheritDoc}
*/
Expand Down
Loading

0 comments on commit dabca74

Please sign in to comment.